You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/01 14:06:10 UTC
[09/11] git commit: Use gen_server:cast rather than rexi RPCs
Use gen_server:cast rather than rexi RPCs
Prior to this commit, the global_changes_listener would send updates to
the global_changes_server via a rexi RPC which only did a
gen_server:call. This was needlessly heavyweight and caused
gen_server:call timeout errors when the global_changes_server was slow
(usually due to slow disk IO). This commit removes the rexi RPC and
replaces it with a gen_server:cast.
Since the global_changes_server handlers that call maybe_update_docs no
longer need to reply, this commit also removes the format_reply function
as it's unnecessary; maybe_update_docs now returns the {noreply, ...}
tuple itself.
BugzID: 28242
Project: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/commit/b226a241
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/tree/b226a241
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/diff/b226a241
Branch: refs/heads/windsor-merge
Commit: b226a241a8cda786d16e951bb9ffaf374c657268
Parents: 2b2005a
Author: Benjamin Bastian <be...@gmail.com>
Authored: Wed Feb 19 13:41:28 2014 -0800
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 13:03:55 2014 +0100
----------------------------------------------------------------------
src/global_changes_listener.erl | 3 +-
src/global_changes_server.erl | 55 ++++++++++++++++--------------------
2 files changed, 25 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/b226a241/src/global_changes_listener.erl
----------------------------------------------------------------------
diff --git a/src/global_changes_listener.erl b/src/global_changes_listener.erl
index d836f2b..ccee705 100644
--- a/src/global_changes_listener.erl
+++ b/src/global_changes_listener.erl
@@ -114,8 +114,7 @@ maybe_send_updates(#state{update_db=true}=State) ->
Grouped ->
dict:map(fun(Node, Docs) ->
Metric = [global_changes, rpcs],
- MFA = {global_changes_server, update_docs, [Docs]},
- rexi:cast(Node, MFA)
+ global_changes_server:update_docs(Node, Docs)
end, Grouped)
catch error:database_does_not_exist ->
ok
http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/b226a241/src/global_changes_server.erl
----------------------------------------------------------------------
diff --git a/src/global_changes_server.erl b/src/global_changes_server.erl
index 619313f..966d866 100644
--- a/src/global_changes_server.erl
+++ b/src/global_changes_server.erl
@@ -19,7 +19,7 @@
]).
-export([
- update_docs/1
+ update_docs/2
]).
@@ -67,20 +67,23 @@ terminate(_Reason, _Srv) ->
ok.
-handle_call(_Msg, _From, #state{update_db=false}=State) ->
- {reply, ok, State};
-handle_call({update_docs, DocIds}, _From, State) ->
+handle_call(_Msg, _From, State) ->
+ {reply, ok, State}.
+
+
+handle_cast(_Msg, #state{update_db=false}=State) ->
+ {noreply, State};
+handle_cast({update_docs, DocIds}, State) ->
Pending = sets:union(sets:from_list(DocIds), State#state.pending_updates),
NewState = State#state{
pending_updates=Pending,
pending_update_count=sets:size(Pending)
},
- format_reply(reply, maybe_update_docs(NewState)).
-
+ maybe_update_docs(NewState);
handle_cast({set_max_write_delay, MaxWriteDelay}, State) ->
NewState = State#state{max_write_delay=MaxWriteDelay},
- format_reply(noreply, maybe_update_docs(NewState));
+ maybe_update_docs(NewState);
handle_cast({set_update_db, Boolean}, State0) ->
% If turning update_db off, clear out server state
State = case {Boolean, State0#state.update_db} of
@@ -94,9 +97,9 @@ handle_cast({set_update_db, Boolean}, State0) ->
_ ->
State0#state{update_db=Boolean}
end,
- format_reply(noreply, maybe_update_docs(State));
+ maybe_update_docs(State);
handle_cast(_Msg, State) ->
- format_reply(noreply, maybe_update_docs(State)).
+ maybe_update_docs(State).
handle_info(start_listener, State) ->
@@ -104,13 +107,13 @@ handle_info(start_listener, State) ->
NewState = State#state{
handler_ref=erlang:monitor(process, Handler)
},
- format_reply(noreply, maybe_update_docs(NewState));
+ maybe_update_docs(NewState);
handle_info({'DOWN', Ref, _, _, Reason}, #state{handler_ref=Ref}=State) ->
couch_log:error("global_changes_listener terminated: ~w", [Reason]),
erlang:send_after(5000, self(), start_listener),
- format_reply(noreply, maybe_update_docs(State));
+ maybe_update_docs(State);
handle_info(_, State) ->
- format_reply(noreply, maybe_update_docs(State)).
+ maybe_update_docs(State).
code_change(_OldVsn, State, _Extra) ->
@@ -118,13 +121,13 @@ code_change(_OldVsn, State, _Extra) ->
maybe_update_docs(#state{pending_update_count=0}=State) ->
- State;
+ {noreply, State};
maybe_update_docs(#state{update_db=true}=State) ->
#state{max_write_delay=MaxWriteDelay, last_update_time=LastUpdateTime} = State,
Now = os:timestamp(),
case LastUpdateTime of
undefined ->
- {State#state{last_update_time=Now}, MaxWriteDelay};
+ {noreply, State#state{last_update_time=Now}, MaxWriteDelay};
_ ->
Delta = round(timer:now_diff(Now, LastUpdateTime)/1000),
if Delta >= MaxWriteDelay ->
@@ -147,23 +150,23 @@ maybe_update_docs(#state{update_db=true}=State) ->
Count = State#state.pending_update_count,
catch error:database_does_not_exist ->
- ok
+ {noreply, State}
end,
- State#state{
+ {noreply, State#state{
pending_updates=sets:new(),
pending_update_count=0,
last_update_time=undefined
- };
+ }};
true ->
- {State, MaxWriteDelay-Delta}
+ {noreply, State, MaxWriteDelay-Delta}
end
end;
maybe_update_docs(State) ->
- State.
+ {noreply, State}.
-update_docs(Updates) ->
- gen_server:call(?MODULE, {update_docs, Updates}).
+update_docs(Node, Updates) ->
+ gen_server:cast({?MODULE, Node}, {update_docs, Updates}).
group_ids_by_shard(DbName, DocIds) ->
@@ -179,16 +182,6 @@ group_ids_by_shard(DbName, DocIds) ->
end, dict:new(), DocIds).
-format_reply(reply, #state{}=State) ->
- {reply, ok, State};
-format_reply(reply, {State, Timeout}) ->
- {reply, ok, State, Timeout};
-format_reply(noreply, #state{}=State) ->
- {noreply, State};
-format_reply(noreply, {State, Timeout}) ->
- {noreply, State, Timeout}.
-
-
get_docs_locally(Shard, Ids) ->
lists:map(fun(Id) ->
DocInfo = couch_db:get_doc_info(Shard, Id),