You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/07 18:01:04 UTC

[02/10] git commit: Use gen_server:cast rather than rexi RPCs

Use gen_server:cast rather than rexi RPCs

Prior to this commit, the global_changes_listener sent updates to
the global_changes_server via a rexi RPC whose only job was to make a
gen_server:call. This was needlessly heavyweight and caused
gen_server:call timeout errors when the global_changes_server was slow
(usually due to slow disk IO). This commit removes the rexi RPC and
replaces it with a gen_server:cast.

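Since update_docs is now handled as a cast, no global_changes_server
handler that goes through maybe_update_docs needs to send a reply any
more. maybe_update_docs therefore returns the gen_server noreply tuple
directly, and this commit removes the now-unnecessary format_reply
function.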

BugzID: 28242


Project: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/commit/db52f100
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/tree/db52f100
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/diff/db52f100

Branch: refs/heads/windsor-merge
Commit: db52f10059d46e736898a8de45ebc689b7ea4254
Parents: c0f0382
Author: Benjamin Bastian <be...@gmail.com>
Authored: Wed Feb 19 13:41:28 2014 -0800
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Aug 7 17:00:36 2014 +0100

----------------------------------------------------------------------
 src/global_changes_listener.erl |  3 +-
 src/global_changes_server.erl   | 55 ++++++++++++++++--------------------
 2 files changed, 25 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/db52f100/src/global_changes_listener.erl
----------------------------------------------------------------------
diff --git a/src/global_changes_listener.erl b/src/global_changes_listener.erl
index ae38245..1aa4ea4 100644
--- a/src/global_changes_listener.erl
+++ b/src/global_changes_listener.erl
@@ -112,8 +112,7 @@ maybe_send_updates(#state{update_db=true}=State) ->
             try group_updates_by_node(State#state.dbname, Updates) of
                 Grouped ->
                     dict:map(fun(Node, Docs) ->
-                        MFA = {global_changes_server, update_docs, [Docs]},
-                        rexi:cast(Node, MFA)
+                        global_changes_server:update_docs(Node, Docs)
                     end, Grouped)
             catch error:database_does_not_exist ->
                 ok

http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/db52f100/src/global_changes_server.erl
----------------------------------------------------------------------
diff --git a/src/global_changes_server.erl b/src/global_changes_server.erl
index d4c951e..d30a319 100644
--- a/src/global_changes_server.erl
+++ b/src/global_changes_server.erl
@@ -19,7 +19,7 @@
 ]).
 
 -export([
-    update_docs/1
+    update_docs/2
 ]).
 
 
@@ -67,20 +67,23 @@ terminate(_Reason, _Srv) ->
     ok.
 
 
-handle_call(_Msg, _From, #state{update_db=false}=State) ->
-    {reply, ok, State};
-handle_call({update_docs, DocIds}, _From, State) ->
+handle_call(_Msg, _From, State) ->
+    {reply, ok, State}.
+
+
+handle_cast(_Msg, #state{update_db=false}=State) ->
+    {noreply, State};
+handle_cast({update_docs, DocIds}, State) ->
     Pending = sets:union(sets:from_list(DocIds), State#state.pending_updates),
     NewState = State#state{
         pending_updates=Pending,
         pending_update_count=sets:size(Pending)
     },
-    format_reply(reply, maybe_update_docs(NewState)).
-
+    maybe_update_docs(NewState);
 
 handle_cast({set_max_write_delay, MaxWriteDelay}, State) ->
     NewState = State#state{max_write_delay=MaxWriteDelay},
-    format_reply(noreply, maybe_update_docs(NewState));
+    maybe_update_docs(NewState);
 handle_cast({set_update_db, Boolean}, State0) ->
     % If turning update_db off, clear out server state
     State = case {Boolean, State0#state.update_db} of
@@ -94,9 +97,9 @@ handle_cast({set_update_db, Boolean}, State0) ->
         _ ->
             State0#state{update_db=Boolean}
     end,
-    format_reply(noreply, maybe_update_docs(State));
+    maybe_update_docs(State);
 handle_cast(_Msg, State) ->
-    format_reply(noreply, maybe_update_docs(State)).
+    maybe_update_docs(State).
 
 
 handle_info(start_listener, State) ->
@@ -104,13 +107,13 @@ handle_info(start_listener, State) ->
     NewState = State#state{
         handler_ref=erlang:monitor(process, Handler)
     },
-    format_reply(noreply, maybe_update_docs(NewState));
+    maybe_update_docs(NewState);
 handle_info({'DOWN', Ref, _, _, Reason}, #state{handler_ref=Ref}=State) ->
     couch_log:error("global_changes_listener terminated: ~w", [Reason]),
     erlang:send_after(5000, self(), start_listener),
-    format_reply(noreply, maybe_update_docs(State));
+    maybe_update_docs(State);
 handle_info(_, State) ->
-    format_reply(noreply, maybe_update_docs(State)).
+    maybe_update_docs(State).
 
 
 code_change(_OldVsn, State, _Extra) ->
@@ -118,13 +121,13 @@ code_change(_OldVsn, State, _Extra) ->
 
 
 maybe_update_docs(#state{pending_update_count=0}=State) ->
-    State;
+    {noreply, State};
 maybe_update_docs(#state{update_db=true}=State) ->
     #state{max_write_delay=MaxWriteDelay, last_update_time=LastUpdateTime} = State,
     Now = os:timestamp(),
     case LastUpdateTime of
     undefined ->
-        {State#state{last_update_time=Now}, MaxWriteDelay};
+        {noreply, State#state{last_update_time=Now}, MaxWriteDelay};
     _ ->
         Delta = round(timer:now_diff(Now, LastUpdateTime)/1000),
         if Delta >= MaxWriteDelay ->
@@ -145,23 +148,23 @@ maybe_update_docs(#state{update_db=true}=State) ->
                     fabric:update_docs(State#state.dbname, Docs, [])
                 end)
             catch error:database_does_not_exist ->
-                ok
+                {noreply, State}
             end,
-            State#state{
+            {noreply, State#state{
                 pending_updates=sets:new(),
                 pending_update_count=0,
                 last_update_time=undefined
-            };
+            }};
         true ->
-            {State, MaxWriteDelay-Delta}
+            {noreply, State, MaxWriteDelay-Delta}
         end
     end;
 maybe_update_docs(State) ->
-    State.
+    {noreply, State}.
 
 
-update_docs(Updates) ->
-    gen_server:call(?MODULE, {update_docs, Updates}).
+update_docs(Node, Updates) ->
+    gen_server:cast({?MODULE, Node}, {update_docs, Updates}).
 
 
 group_ids_by_shard(DbName, DocIds) ->
@@ -177,16 +180,6 @@ group_ids_by_shard(DbName, DocIds) ->
     end, dict:new(), DocIds).
 
 
-format_reply(reply, #state{}=State) ->
-    {reply, ok, State};
-format_reply(reply, {State, Timeout}) ->
-    {reply, ok, State, Timeout};
-format_reply(noreply, #state{}=State) ->
-    {noreply, State};
-format_reply(noreply, {State, Timeout}) ->
-    {noreply, State, Timeout}.
-
-
 get_docs_locally(Shard, Ids) ->
     lists:map(fun(Id) ->
         DocInfo = couch_db:get_doc_info(Shard, Id),