You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/01 16:34:35 UTC

[14/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Update map view coordinator to use new RPC APIs

BugzId: 21755


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/11112ad1
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/11112ad1
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/11112ad1

Branch: refs/heads/windsor-merge
Commit: 11112ad1655426fd5689a6172866631e18a62eea
Parents: 216ffb3
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Sep 6 07:26:52 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_map.erl | 100 +++++++++++++++++++++++--------------------
 1 file changed, 54 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/11112ad1/src/fabric_view_map.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl
index eb30179..cf70568 100644
--- a/src/fabric_view_map.erl
+++ b/src/fabric_view_map.erl
@@ -23,9 +23,29 @@ go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
     {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
     go(DbName, DDoc, View, Args, Callback, Acc0);
 
-go(DbName, DDoc, View, Args, Callback, Acc0) ->
+go(DbName, DDoc, View, Args, Callback, Acc) ->
     Shards = fabric_view:get_shards(DbName, Args),
-    Workers = fabric_util:submit_jobs(Shards, map_view, [DDoc, View, Args]),
+    Workers0 = fabric_util:submit_jobs(
+            Shards, fabric_rpc, map_view, [DDoc, View, Args]),
+    RexiMon = fabric_util:create_monitors(Workers0),
+    try
+        case fabric_util:stream_start(Workers0, #shard.ref) of
+            {ok, Workers} ->
+                try
+                    go(DbName, Workers, Args, Callback, Acc)
+                after
+                    fabric_util:cleanup(Workers)
+                end;
+            {timeout, _} ->
+                Callback({error, timeout}, Acc);
+            {error, Error} ->
+                Callback({error, Error}, Acc)
+        end
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+go(DbName, Workers, Args, Callback, Acc0) ->
     #mrargs{limit = Limit, skip = Skip, keys = Keys} = Args,
     State = #collector{
         db_name=DbName,
@@ -38,8 +58,7 @@ go(DbName, DDoc, View, Args, Callback, Acc0) ->
         sorted = Args#mrargs.sorted,
         user_acc = Acc0
     },
-    RexiMon = fabric_util:create_monitors(Workers),
-    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
+    case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
         State, infinity, 1000 * 60 * 60) of
     {ok, NewState} ->
         {ok, NewState#collector.user_acc};
@@ -47,24 +66,18 @@ go(DbName, DDoc, View, Args, Callback, Acc0) ->
         Callback({error, timeout}, NewState#collector.user_acc);
     {error, Resp} ->
         {ok, Resp}
-    after
-        rexi_monitor:stop(RexiMon),
-        fabric_util:cleanup(Workers)
     end.
 
 handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
-    fabric_view:remove_down_shards(State, NodeRef);
+    fabric_view:check_down_shards(State, NodeRef);
 
 handle_message({rexi_EXIT, Reason}, Worker, State) ->
-    #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
-    Counters = fabric_dict:erase(Worker, Counters0),
-    case fabric_view:is_progress_possible(Counters) of
-    true ->
-        {ok, State#collector{counters = Counters}};
-    false ->
-        {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
-        {error, Resp}
-    end;
+    fabric_view:handle_worker_exit(State, Worker, Reason);
+
+handle_message({rexi_EXIT, Reason}, _, State) ->
+    #collector{callback=Callback, user_acc=Acc} = State,
+    {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
+    {error, Resp};
 
 handle_message({meta, Meta0}, {Worker, From}, State) ->
     Tot = couch_util:get_value(total, Meta0, 0),
@@ -76,35 +89,30 @@ handle_message({meta, Meta0}, {Worker, From}, State) ->
         offset = Offset0,
         user_acc = AccIn
     } = State,
-    case fabric_dict:lookup_element(Worker, Counters0) of
-    undefined ->
-        % this worker lost the race with other partition copies, terminate
-        gen_server:reply(From, stop),
-        {ok, State};
-    0 ->
-        gen_server:reply(From, ok),
-        Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
-        Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1),
-        Total = Total0 + Tot,
-        Offset = Offset0 + Off,
-        case fabric_dict:any(0, Counters2) of
-        true ->
-            {ok, State#collector{
-                counters = Counters2,
-                total_rows = Total,
-                offset = Offset
-            }};
-        false ->
-            FinalOffset = erlang:min(Total, Offset+State#collector.skip),
-            Meta = [{total, Total}, {offset, FinalOffset}],
-            {Go, Acc} = Callback({meta, Meta}, AccIn),
-            {Go, State#collector{
-                counters = fabric_dict:decrement_all(Counters2),
-                total_rows = Total,
-                offset = FinalOffset,
-                user_acc = Acc
-            }}
-        end
+    % Assert that we don't have other messages from this
+    % worker when the meta (total and offset) message arrives.
+    0 = fabric_dict:lookup_element(Worker, Counters0),
+    rexi:stream_ack(From),
+    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+    Total = Total0 + Tot,
+    Offset = Offset0 + Off,
+    case fabric_dict:any(0, Counters1) of
+    true ->
+        {ok, State#collector{
+            counters = Counters1,
+            total_rows = Total,
+            offset = Offset
+        }};
+    false ->
+        FinalOffset = erlang:min(Total, Offset+State#collector.skip),
+        Meta = [{total, Total}, {offset, FinalOffset}],
+        {Go, Acc} = Callback({meta, Meta}, AccIn),
+        {Go, State#collector{
+            counters = fabric_dict:decrement_all(Counters1),
+            total_rows = Total,
+            offset = FinalOffset,
+            user_acc = Acc
+        }}
     end;
 
 handle_message(#view_row{}, {_, _}, #collector{limit=0} = State) ->