You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/28 14:20:44 UTC

[09/50] fabric commit: updated refs/heads/master to a71701c

Update _changes coordinator to use new RPC APIs

BugzId: 21755


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/c0d13fb6
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/c0d13fb6
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/c0d13fb6

Branch: refs/heads/master
Commit: c0d13fb66b02763d73e420bcb1b6cf8ab492319c
Parents: 6462e84
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Sep 4 15:59:39 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 155 +++++++++++++++------------------------
 1 file changed, 59 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/c0d13fb6/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 8d162ce..69d9d09 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -130,9 +130,30 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
             end, find_replacement_shards(Shard, AllLiveShards))
         end
     end, unpack_seqs(PackedSeqs, DbName)),
-    {Workers, _} = lists:unzip(Seqs),
-    RexiMon = fabric_util:create_monitors(Workers),
+    {Workers0, _} = lists:unzip(Seqs),
+    RexiMon = fabric_util:create_monitors(Workers0),
+    try
+        case fabric_util:stream_start(Workers0, #shard.ref) of
+            {ok, Workers} ->
+                try
+                    LiveSeqs = lists:filter(fun({W, _S}) ->
+                        lists:member(W, Workers)
+                    end, Seqs),
+                    send_changes(DbName, Workers, LiveSeqs, ChangesArgs,
+                            Callback, AccIn, Timeout)
+                after
+                    fabric_util:cleanup(Workers)
+                end;
+            Else ->
+                Callback(Else, AccIn)
+        end
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+send_changes(DbName, Workers, Seqs, ChangesArgs, Callback, AccIn, Timeout) ->
     State = #collector{
+        db_name = DbName,
         query_args = ChangesArgs,
         callback = Callback,
         counters = orddict:from_list(Seqs),
@@ -141,12 +162,7 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
         rows = Seqs % store sequence positions instead
     },
     %% TODO: errors need to be handled here
-    try
-        receive_results(Workers, State, Timeout, Callback)
-    after
-        rexi_monitor:stop(RexiMon),
-        fabric_util:cleanup(Workers)
-    end.
+    receive_results(Workers, State, Timeout, Callback).
 
 receive_results(Workers, State, Timeout, Callback) ->
     case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, State,
@@ -159,25 +175,11 @@ receive_results(Workers, State, Timeout, Callback) ->
         {ok, NewState}
     end.
 
-handle_message({rexi_DOWN, _, {_, NodeRef}, _}, nil, State) ->
-    fabric_view:remove_down_shards(State, NodeRef);
+handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
+    fabric_view:check_down_shards(State, NodeRef);
 
 handle_message({rexi_EXIT, Reason}, Worker, State) ->
-    #collector{
-        callback=Callback,
-        counters=Counters0,
-        rows = Seqs0,
-        user_acc=Acc
-    } = State,
-    Counters = fabric_dict:erase(Worker, Counters0),
-    Seqs = fabric_dict:erase(Worker, Seqs0),
-    case fabric_view:is_progress_possible(Counters) of
-    true ->
-        {ok, State#collector{counters = Counters, rows=Seqs}};
-    false ->
-        {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
-        {error, Resp}
-    end;
+    fabric_view:handle_worker_exit(State, Worker, Reason);
 
 handle_message(_, _, #collector{limit=0} = State) ->
     {stop, State};
@@ -190,84 +192,45 @@ handle_message(#change{key=Key} = Row0, {Worker, From}, St) ->
         limit = Limit,
         user_acc = AccIn
     } = St,
-    case fabric_dict:lookup_element(Worker, S0) of
-    undefined ->
-        % this worker lost the race with other partition copies, terminate it
-        gen_server:reply(From, stop),
-        {ok, St};
-    _ ->
-        S1 = fabric_dict:store(Worker, Key, S0),
-        S2 = fabric_view:remove_overlapping_shards(Worker, S1),
-        % this check should not be necessary at all, as holes in the ranges
-        % created from DOWN messages would have led to errors
-        case fabric_view:is_progress_possible(S2) of
-        true ->
-            % Temporary hack for FB 23637
-            Interval = erlang:get(changes_seq_interval),
-            if (Interval == undefined) orelse (Limit rem Interval == 0) ->
-                Row = Row0#change{key = pack_seqs(S2)};
-            true ->
-                Row = Row0#change{key = null}
-            end,
-            {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
-            gen_server:reply(From, Go),
-            {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}};
-        false ->
-            Reason = {range_not_covered, <<"progress not possible">>},
-            Callback({error, Reason}, AccIn),
-            gen_server:reply(From, stop),
-            {stop, St#collector{counters=S2}}
-        end
-    end;
+    true = fabric_dict:is_key(Worker, S0),
+    S1 = fabric_dict:store(Worker, Key, S0),
+    % Temporary hack for FB 23637
+    Interval = erlang:get(changes_seq_interval),
+    if (Interval == undefined) orelse (Limit rem Interval == 0) ->
+        Row = Row0#change{key = pack_seqs(S1)};
+    true ->
+        Row = Row0#change{key = null}
+    end,
+    {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
+    rexi:stream_ack(From),
+    {Go, St#collector{counters=S1, limit=Limit-1, user_acc=Acc}};
 
 handle_message({no_pass, Seq}, {Worker, From}, St) ->
-    #collector{
-        counters = S0
-    } = St,
-    case fabric_dict:lookup_element(Worker, S0) of
-    undefined ->
-        % this worker lost the race with other partition copies, terminate it
-        gen_server:reply(From, stop),
-        {ok, St};
-    _ ->
-        S1 = fabric_dict:store(Worker, Seq, S0),
-        S2 = fabric_view:remove_overlapping_shards(Worker, S1),
-        gen_server:reply(From, ok),
-        {ok, St#collector{counters=S2}}
-    end;
+    #collector{counters = S0} = St,
+    true = fabric_dict:is_key(Worker, S0),
+    S1 = fabric_dict:store(Worker, Seq, S0),
+    rexi:stream_ack(From),
+    {ok, St#collector{counters=S1}};
 
 handle_message({complete, Key}, Worker, State) ->
     #collector{
-        callback = Callback,
         counters = S0,
-        total_rows = Completed, % override
-        user_acc = Acc
+        total_rows = Completed % override
     } = State,
-    case fabric_dict:lookup_element(Worker, S0) of
-    undefined ->
-        {ok, State};
-    _ ->
-        S1 = fabric_dict:store(Worker, Key, S0),
-        % unlikely to have overlaps here, but possible w/ filters
-        S2 = fabric_view:remove_overlapping_shards(Worker, S1),
-        NewState = State#collector{counters=S2, total_rows=Completed+1},
-        case fabric_dict:size(S2) =:= (Completed+1) of
-        true ->
-            % check ranges are covered, again this should not be neccessary
-            % as any holes in the ranges due to DOWN messages would have errored
-            % out sooner
-            case fabric_view:is_progress_possible(S2) of
-            true ->
-                {stop, NewState};
-            false ->
-                Reason = {range_not_covered, <<"progress not possible">>},
-                Callback({error, Reason}, Acc),
-                {stop, NewState}
-            end;
-        false ->
-            {ok, NewState}
-        end
-    end.
+    true = fabric_dict:is_key(Worker, S0),
+    S1 = fabric_dict:store(Worker, Key, S0),
+    NewState = State#collector{counters=S1, total_rows=Completed+1},
+    % We're relying on S1 having exactly the number of workers that
+    % are participating in this response. With the new stream_start
+    % that's a bit more obvious but historically it wasn't quite
+    % so clear. The Completed variable is just a hacky override
+    % of the total_rows field in the #collector{} record.
+    NumWorkers = fabric_dict:size(S1),
+    Go = case NumWorkers =:= (Completed+1) of
+        true -> stop;
+        false -> ok
+    end,
+    {Go, NewState}.
 
 make_changes_args(#changes_args{style=Style, filter=undefined}=Args) ->
     Args#changes_args{filter = Style};