You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/01 16:34:35 UTC
[14/49] fabric commit: updated refs/heads/windsor-merge to b1c0030
Update map view coordinator to use new RPC APIs
BugzId: 21755
Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/11112ad1
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/11112ad1
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/11112ad1
Branch: refs/heads/windsor-merge
Commit: 11112ad1655426fd5689a6172866631e18a62eea
Parents: 216ffb3
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Sep 6 07:26:52 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100
----------------------------------------------------------------------
src/fabric_view_map.erl | 100 +++++++++++++++++++++++--------------------
1 file changed, 54 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/11112ad1/src/fabric_view_map.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl
index eb30179..cf70568 100644
--- a/src/fabric_view_map.erl
+++ b/src/fabric_view_map.erl
@@ -23,9 +23,29 @@ go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
{ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
go(DbName, DDoc, View, Args, Callback, Acc0);
-go(DbName, DDoc, View, Args, Callback, Acc0) ->
+go(DbName, DDoc, View, Args, Callback, Acc) ->
Shards = fabric_view:get_shards(DbName, Args),
- Workers = fabric_util:submit_jobs(Shards, map_view, [DDoc, View, Args]),
+ Workers0 = fabric_util:submit_jobs(
+ Shards, fabric_rpc, map_view, [DDoc, View, Args]),
+ RexiMon = fabric_util:create_monitors(Workers0),
+ try
+ case fabric_util:stream_start(Workers0, #shard.ref) of
+ {ok, Workers} ->
+ try
+ go(DbName, Workers, Args, Callback, Acc)
+ after
+ fabric_util:cleanup(Workers)
+ end;
+ {timeout, _} ->
+ Callback({error, timeout}, Acc);
+ {error, Error} ->
+ Callback({error, Error}, Acc)
+ end
+ after
+ rexi_monitor:stop(RexiMon)
+ end.
+
+go(DbName, Workers, Args, Callback, Acc0) ->
#mrargs{limit = Limit, skip = Skip, keys = Keys} = Args,
State = #collector{
db_name=DbName,
@@ -38,8 +58,7 @@ go(DbName, DDoc, View, Args, Callback, Acc0) ->
sorted = Args#mrargs.sorted,
user_acc = Acc0
},
- RexiMon = fabric_util:create_monitors(Workers),
- try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
+ case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
State, infinity, 1000 * 60 * 60) of
{ok, NewState} ->
{ok, NewState#collector.user_acc};
@@ -47,24 +66,18 @@ go(DbName, DDoc, View, Args, Callback, Acc0) ->
Callback({error, timeout}, NewState#collector.user_acc);
{error, Resp} ->
{ok, Resp}
- after
- rexi_monitor:stop(RexiMon),
- fabric_util:cleanup(Workers)
end.
handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
- fabric_view:remove_down_shards(State, NodeRef);
+ fabric_view:check_down_shards(State, NodeRef);
handle_message({rexi_EXIT, Reason}, Worker, State) ->
- #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
- Counters = fabric_dict:erase(Worker, Counters0),
- case fabric_view:is_progress_possible(Counters) of
- true ->
- {ok, State#collector{counters = Counters}};
- false ->
- {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
- {error, Resp}
- end;
+ fabric_view:handle_worker_exit(State, Worker, Reason);
+
+handle_message({rexi_EXIT, Reason}, _, State) ->
+ #collector{callback=Callback, user_acc=Acc} = State,
+ {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
+ {error, Resp};
handle_message({meta, Meta0}, {Worker, From}, State) ->
Tot = couch_util:get_value(total, Meta0, 0),
@@ -76,35 +89,30 @@ handle_message({meta, Meta0}, {Worker, From}, State) ->
offset = Offset0,
user_acc = AccIn
} = State,
- case fabric_dict:lookup_element(Worker, Counters0) of
- undefined ->
- % this worker lost the race with other partition copies, terminate
- gen_server:reply(From, stop),
- {ok, State};
- 0 ->
- gen_server:reply(From, ok),
- Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
- Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1),
- Total = Total0 + Tot,
- Offset = Offset0 + Off,
- case fabric_dict:any(0, Counters2) of
- true ->
- {ok, State#collector{
- counters = Counters2,
- total_rows = Total,
- offset = Offset
- }};
- false ->
- FinalOffset = erlang:min(Total, Offset+State#collector.skip),
- Meta = [{total, Total}, {offset, FinalOffset}],
- {Go, Acc} = Callback({meta, Meta}, AccIn),
- {Go, State#collector{
- counters = fabric_dict:decrement_all(Counters2),
- total_rows = Total,
- offset = FinalOffset,
- user_acc = Acc
- }}
- end
+ % Assert that we don't have other messages from this
+ % worker when the meta message (total and offset) arrives.
+ 0 = fabric_dict:lookup_element(Worker, Counters0),
+ rexi:stream_ack(From),
+ Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+ Total = Total0 + Tot,
+ Offset = Offset0 + Off,
+ case fabric_dict:any(0, Counters1) of
+ true ->
+ {ok, State#collector{
+ counters = Counters1,
+ total_rows = Total,
+ offset = Offset
+ }};
+ false ->
+ FinalOffset = erlang:min(Total, Offset+State#collector.skip),
+ Meta = [{total, Total}, {offset, FinalOffset}],
+ {Go, Acc} = Callback({meta, Meta}, AccIn),
+ {Go, State#collector{
+ counters = fabric_dict:decrement_all(Counters1),
+ total_rows = Total,
+ offset = FinalOffset,
+ user_acc = Acc
+ }}
end;
handle_message(#view_row{}, {_, _}, #collector{limit=0} = State) ->