You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/01 16:34:44 UTC

[23/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Collect pending counts and report sum at end

BugzID: 24236


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/7826ce5f
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/7826ce5f
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/7826ce5f

Branch: refs/heads/windsor-merge
Commit: 7826ce5fa9058bbc1c85ac49be6c6fbb369f5606
Parents: 9de1096
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Thu Oct 17 16:33:14 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:42 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 65 +++++++++++++++++++++++++++++++++-------
 1 file changed, 54 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/7826ce5f/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 36cb945..60dc54c 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -61,7 +61,7 @@ go(DbName, "normal", Options, Callback, Acc0) ->
     case validate_start_seq(DbName, Since) of
     ok ->
         {ok, Acc} = Callback(start, Acc0),
-        {ok, #collector{counters=Seqs, user_acc=AccOut}} = send_changes(
+        {ok, Collector} = send_changes(
             DbName,
             Args,
             Callback,
@@ -69,7 +69,8 @@ go(DbName, "normal", Options, Callback, Acc0) ->
             Acc,
             5000
         ),
-        Callback({stop, pack_seqs(Seqs)}, AccOut);
+        #collector{counters=Seqs, user_acc=AccOut, offset=Offset} = Collector,
+        Callback({stop, pack_seqs(Seqs), pending_count(Offset)}, AccOut);
     Error ->
         Callback(Error, Acc0)
     end.
@@ -77,12 +78,17 @@ go(DbName, "normal", Options, Callback, Acc0) ->
 keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen, T0) ->
     #changes_args{limit=Limit, feed=Feed, heartbeat=Heartbeat} = Args,
     {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn, Timeout),
-    #collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector,
+    #collector{
+        limit = Limit2,
+        counters = NewSeqs,
+        offset = Offset,
+        user_acc = AccOut
+    } = Collector,
     LastSeq = pack_seqs(NewSeqs),
     MaintenanceMode = config:get("cloudant", "maintenance_mode"),
     if Limit > Limit2, Feed == "longpoll";
       MaintenanceMode == "true"; MaintenanceMode == "nolb" ->
-        Callback({stop, LastSeq}, AccOut);
+        Callback({stop, LastSeq, pending_count(Offset)}, AccOut);
     true ->
         WaitForUpdate = wait_db_updated(UpListen),
         AccumulatedTime = timer:now_diff(os:timestamp(), T0) div 1000,
@@ -94,9 +100,9 @@ keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen, T0)
         end,
         case {Heartbeat, AccumulatedTime > Max, WaitForUpdate} of
         {undefined, _, timeout} ->
-            Callback({stop, LastSeq}, AccOut);
+            Callback({stop, LastSeq, pending_count(Offset)}, AccOut);
         {_, true, timeout} ->
-            Callback({stop, LastSeq}, AccOut);
+            Callback({stop, LastSeq, pending_count(Offset)}, AccOut);
         _ ->
             {ok, AccTimeout} = Callback(timeout, AccOut),
             ?MODULE:keep_sending_changes(
@@ -183,6 +189,7 @@ send_changes(DbName, Workers, Seqs, ChangesArgs, Callback, AccIn, Timeout) ->
         counters = orddict:from_list(Seqs),
         user_acc = AccIn,
         limit = ChangesArgs#changes_args.limit,
+        offset = fabric_dict:init(Workers, null),
         rows = Seqs % store sequence positions instead
     },
     %% TODO: errors need to be handled here
@@ -207,10 +214,29 @@ handle_message({rexi_EXIT, Reason}, Worker, State) ->
 
 % Temporary upgrade clause - Case 24236
 handle_message({complete, Key}, Worker, State) when is_tuple(Key) ->
-    handle_message({complete, [{seq, Key}]}, Worker, State);
+    handle_message({complete, [{seq, Key}, {pending, 0}]}, Worker, State);
+
+handle_message({change, Props}, {Worker, _}, #collector{limit=0} = State) ->
+    O0 = State#collector.offset,
+    O1 = case fabric_dict:lookup_element(Worker, O0) of
+        null ->
+            % Use Pending+1 because we're ignoring this row in the response
+            Pending = couch_util:get_value(pending, Props),
+            fabric_dict:store(Worker, Pending+1, O0);
+        _ ->
+            O0
+    end,
+    maybe_stop(State#collector{offset = O1});
 
-handle_message(_, _, #collector{limit=0} = State) ->
-    {stop, State};
+handle_message({complete, Props}, Worker, #collector{limit=0} = State) ->
+    O0 = State#collector.offset,
+    O1 = case fabric_dict:lookup_element(Worker, O0) of
+        null ->
+            fabric_dict:store(Worker, couch_util:get_value(pending,Props), O0);
+        _ ->
+            O0
+    end,
+    maybe_stop(State#collector{offset = O1});
 
 handle_message(#change{} = Row, {Worker, From}, St) ->
     Change = {change, [
@@ -226,11 +252,13 @@ handle_message({change, Props}, {Worker, From}, St) ->
     #collector{
         callback = Callback,
         counters = S0,
+        offset = O0,
         limit = Limit,
         user_acc = AccIn
     } = St,
     true = fabric_dict:is_key(Worker, S0),
     S1 = fabric_dict:store(Worker, couch_util:get_value(seq, Props), S0),
+    O1 = fabric_dict:store(Worker, couch_util:get_value(pending, Props), O0),
     % Temporary hack for FB 23637
     Interval = erlang:get(changes_seq_interval),
     if (Interval == undefined) orelse (Limit rem Interval == 0) ->
@@ -240,7 +268,7 @@ handle_message({change, Props}, {Worker, From}, St) ->
     end,
     {Go, Acc} = Callback(changes_row(Props2), AccIn),
     rexi:stream_ack(From),
-    {Go, St#collector{counters=S1, limit=Limit-1, user_acc=Acc}};
+    {Go, St#collector{counters=S1, offset=O1, limit=Limit-1, user_acc=Acc}};
 
 handle_message({no_pass, Seq}, {Worker, From}, St) ->
     #collector{counters = S0} = St,
@@ -253,11 +281,13 @@ handle_message({complete, Props}, Worker, State) ->
     Key = couch_util:get_value(seq, Props),
     #collector{
         counters = S0,
+        offset = O0,
         total_rows = Completed % override
     } = State,
     true = fabric_dict:is_key(Worker, S0),
     S1 = fabric_dict:store(Worker, Key, S0),
-    NewState = State#collector{counters=S1, total_rows=Completed+1},
+    O1 = fabric_dict:store(Worker, couch_util:get_value(pending, Props), O0),
+    NewState = State#collector{counters=S1, offset=O1, total_rows=Completed+1},
     % We're relying on S1 having exactly the numnber of workers that
     % are participtaing in this response. With the new stream_start
     % that's a bit more obvious but historically it wasn't quite
@@ -270,6 +300,7 @@ handle_message({complete, Props}, Worker, State) ->
     end,
     {Go, NewState}.
 
+
 make_replacement_arg(Node, {Seq, Uuid}) ->
     {replace, Node, Uuid, Seq};
 make_replacement_arg(Node, {Seq, Uuid, _}) ->
@@ -279,6 +310,15 @@ make_replacement_arg(Node, {Seq, Uuid, _}) ->
 make_replacement_arg(_, _) ->
     0.
 
+maybe_stop(#collector{offset = Offset} = State) ->
+    case fabric_dict:any(null, Offset) of
+        false ->
+            {stop, State};
+        true ->
+            % Wait till we've heard from everyone to compute pending count
+            {ok, State}
+    end.
+
 make_changes_args(#changes_args{style=Style, filter=undefined}=Args) ->
     Args#changes_args{filter = Style};
 make_changes_args(Args) ->
@@ -310,6 +350,9 @@ collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) ->
         end
     end.
 
+pending_count(Dict) ->
+    fabric_dict:fold(fun(_Worker, C, Acc) -> C+Acc end, 0, Dict).
+
 pack_seqs(Workers) ->
     SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers],
     SeqSum = lists:sum([seq(S) || {_,_,S} <- SeqList]),