You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/01 16:29:49 UTC
[09/23] fabric commit: updated refs/heads/windsor-merge to 4ec3f11
Collect pending counts and report sum at end
BugzID: 24236
Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/d55908fa
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/d55908fa
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/d55908fa
Branch: refs/heads/windsor-merge
Commit: d55908faea746cf598b10a5c381a4f785abade5f
Parents: 78d4553
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Thu Oct 17 16:33:14 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:07:06 2014 +0100
----------------------------------------------------------------------
src/fabric_view_changes.erl | 65 +++++++++++++++++++++++++++++++++-------
1 file changed, 54 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/d55908fa/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 672ec2d..11478de 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -61,7 +61,7 @@ go(DbName, "normal", Options, Callback, Acc0) ->
case validate_start_seq(DbName, Since) of
ok ->
{ok, Acc} = Callback(start, Acc0),
- {ok, #collector{counters=Seqs, user_acc=AccOut}} = send_changes(
+ {ok, Collector} = send_changes(
DbName,
Args,
Callback,
@@ -69,7 +69,8 @@ go(DbName, "normal", Options, Callback, Acc0) ->
Acc,
5000
),
- Callback({stop, pack_seqs(Seqs)}, AccOut);
+ #collector{counters=Seqs, user_acc=AccOut, offset=Offset} = Collector,
+ Callback({stop, pack_seqs(Seqs), pending_count(Offset)}, AccOut);
Error ->
Callback(Error, Acc0)
end.
@@ -77,12 +78,17 @@ go(DbName, "normal", Options, Callback, Acc0) ->
keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen, T0) ->
#changes_args{limit=Limit, feed=Feed, heartbeat=Heartbeat} = Args,
{ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn, Timeout),
- #collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector,
+ #collector{
+ limit = Limit2,
+ counters = NewSeqs,
+ offset = Offset,
+ user_acc = AccOut
+ } = Collector,
LastSeq = pack_seqs(NewSeqs),
MaintenanceMode = config:get("cloudant", "maintenance_mode"),
if Limit > Limit2, Feed == "longpoll";
MaintenanceMode == "true"; MaintenanceMode == "nolb" ->
- Callback({stop, LastSeq}, AccOut);
+ Callback({stop, LastSeq, pending_count(Offset)}, AccOut);
true ->
WaitForUpdate = wait_db_updated(UpListen),
AccumulatedTime = timer:now_diff(os:timestamp(), T0) div 1000,
@@ -94,9 +100,9 @@ keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen, T0)
end,
case {Heartbeat, AccumulatedTime > Max, WaitForUpdate} of
{undefined, _, timeout} ->
- Callback({stop, LastSeq}, AccOut);
+ Callback({stop, LastSeq, pending_count(Offset)}, AccOut);
{_, true, timeout} ->
- Callback({stop, LastSeq}, AccOut);
+ Callback({stop, LastSeq, pending_count(Offset)}, AccOut);
_ ->
{ok, AccTimeout} = Callback(timeout, AccOut),
?MODULE:keep_sending_changes(
@@ -183,6 +189,7 @@ send_changes(DbName, Workers, Seqs, ChangesArgs, Callback, AccIn, Timeout) ->
counters = orddict:from_list(Seqs),
user_acc = AccIn,
limit = ChangesArgs#changes_args.limit,
+ offset = fabric_dict:init(Workers, null),
rows = Seqs % store sequence positions instead
},
%% TODO: errors need to be handled here
@@ -207,10 +214,29 @@ handle_message({rexi_EXIT, Reason}, Worker, State) ->
% Temporary upgrade clause - Case 24236
handle_message({complete, Key}, Worker, State) when is_tuple(Key) ->
- handle_message({complete, [{seq, Key}]}, Worker, State);
+ handle_message({complete, [{seq, Key}, {pending, 0}]}, Worker, State);
+
+handle_message({change, Props}, {Worker, _}, #collector{limit=0} = State) ->
+ O0 = State#collector.offset,
+ O1 = case fabric_dict:lookup_element(Worker, O0) of
+ null ->
+ % Use Pending+1 because we're ignoring this row in the response
+ Pending = couch_util:get_value(pending, Props),
+ fabric_dict:store(Worker, Pending+1, O0);
+ _ ->
+ O0
+ end,
+ maybe_stop(State#collector{offset = O1});
-handle_message(_, _, #collector{limit=0} = State) ->
- {stop, State};
+handle_message({complete, Props}, Worker, #collector{limit=0} = State) ->
+ O0 = State#collector.offset,
+ O1 = case fabric_dict:lookup_element(Worker, O0) of
+ null ->
+ fabric_dict:store(Worker, couch_util:get_value(pending,Props), O0);
+ _ ->
+ O0
+ end,
+ maybe_stop(State#collector{offset = O1});
handle_message(#change{} = Row, {Worker, From}, St) ->
Change = {change, [
@@ -226,11 +252,13 @@ handle_message({change, Props}, {Worker, From}, St) ->
#collector{
callback = Callback,
counters = S0,
+ offset = O0,
limit = Limit,
user_acc = AccIn
} = St,
true = fabric_dict:is_key(Worker, S0),
S1 = fabric_dict:store(Worker, couch_util:get_value(seq, Props), S0),
+ O1 = fabric_dict:store(Worker, couch_util:get_value(pending, Props), O0),
% Temporary hack for FB 23637
Interval = erlang:get(changes_seq_interval),
if (Interval == undefined) orelse (Limit rem Interval == 0) ->
@@ -240,7 +268,7 @@ handle_message({change, Props}, {Worker, From}, St) ->
end,
{Go, Acc} = Callback(changes_row(Props2), AccIn),
rexi:stream_ack(From),
- {Go, St#collector{counters=S1, limit=Limit-1, user_acc=Acc}};
+ {Go, St#collector{counters=S1, offset=O1, limit=Limit-1, user_acc=Acc}};
handle_message({no_pass, Seq}, {Worker, From}, St) ->
#collector{counters = S0} = St,
@@ -253,11 +281,13 @@ handle_message({complete, Props}, Worker, State) ->
Key = couch_util:get_value(seq, Props),
#collector{
counters = S0,
+ offset = O0,
total_rows = Completed % override
} = State,
true = fabric_dict:is_key(Worker, S0),
S1 = fabric_dict:store(Worker, Key, S0),
- NewState = State#collector{counters=S1, total_rows=Completed+1},
+ O1 = fabric_dict:store(Worker, couch_util:get_value(pending, Props), O0),
+ NewState = State#collector{counters=S1, offset=O1, total_rows=Completed+1},
% We're relying on S1 having exactly the number of workers that
% are participating in this response. With the new stream_start
% that's a bit more obvious but historically it wasn't quite
@@ -270,6 +300,7 @@ handle_message({complete, Props}, Worker, State) ->
end,
{Go, NewState}.
+
make_replacement_arg(Node, {Seq, Uuid}) ->
{replace, Node, Uuid, Seq};
make_replacement_arg(Node, {Seq, Uuid, _}) ->
@@ -279,6 +310,15 @@ make_replacement_arg(Node, {Seq, Uuid, _}) ->
make_replacement_arg(_, _) ->
0.
+maybe_stop(#collector{offset = Offset} = State) ->
+ case fabric_dict:any(null, Offset) of
+ false ->
+ {stop, State};
+ true ->
+ % Wait till we've heard from everyone to compute pending count
+ {ok, State}
+ end.
+
make_changes_args(#changes_args{style=Style, filter=undefined}=Args) ->
Args#changes_args{filter = Style};
make_changes_args(Args) ->
@@ -310,6 +350,9 @@ collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) ->
end
end.
+pending_count(Dict) ->
+ fabric_dict:fold(fun(_Worker, C, Acc) -> C+Acc end, 0, Dict).
+
pack_seqs(Workers) ->
SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers],
SeqSum = lists:sum([seq(S) || {_,_,S} <- SeqList]),