You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/28 14:20:44 UTC
[09/50] fabric commit: updated refs/heads/master to a71701c
Update _changes coordinator to use new RPC APIs
BugzId: 21755
Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/c0d13fb6
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/c0d13fb6
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/c0d13fb6
Branch: refs/heads/master
Commit: c0d13fb66b02763d73e420bcb1b6cf8ab492319c
Parents: 6462e84
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Sep 4 15:59:39 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100
----------------------------------------------------------------------
src/fabric_view_changes.erl | 155 +++++++++++++++------------------------
1 file changed, 59 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/c0d13fb6/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 8d162ce..69d9d09 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -130,9 +130,30 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
end, find_replacement_shards(Shard, AllLiveShards))
end
end, unpack_seqs(PackedSeqs, DbName)),
- {Workers, _} = lists:unzip(Seqs),
- RexiMon = fabric_util:create_monitors(Workers),
+ {Workers0, _} = lists:unzip(Seqs),
+ RexiMon = fabric_util:create_monitors(Workers0),
+ try
+ case fabric_util:stream_start(Workers0, #shard.ref) of
+ {ok, Workers} ->
+ try
+ LiveSeqs = lists:filter(fun({W, _S}) ->
+ lists:member(W, Workers)
+ end, Seqs),
+ send_changes(DbName, Workers, LiveSeqs, ChangesArgs,
+ Callback, AccIn, Timeout)
+ after
+ fabric_util:cleanup(Workers)
+ end;
+ Else ->
+ Callback(Else, AccIn)
+ end
+ after
+ rexi_monitor:stop(RexiMon)
+ end.
+
+send_changes(DbName, Workers, Seqs, ChangesArgs, Callback, AccIn, Timeout) ->
State = #collector{
+ db_name = DbName,
query_args = ChangesArgs,
callback = Callback,
counters = orddict:from_list(Seqs),
@@ -141,12 +162,7 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
rows = Seqs % store sequence positions instead
},
%% TODO: errors need to be handled here
- try
- receive_results(Workers, State, Timeout, Callback)
- after
- rexi_monitor:stop(RexiMon),
- fabric_util:cleanup(Workers)
- end.
+ receive_results(Workers, State, Timeout, Callback).
receive_results(Workers, State, Timeout, Callback) ->
case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, State,
@@ -159,25 +175,11 @@ receive_results(Workers, State, Timeout, Callback) ->
{ok, NewState}
end.
-handle_message({rexi_DOWN, _, {_, NodeRef}, _}, nil, State) ->
- fabric_view:remove_down_shards(State, NodeRef);
+handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
+ fabric_view:check_down_shards(State, NodeRef);
handle_message({rexi_EXIT, Reason}, Worker, State) ->
- #collector{
- callback=Callback,
- counters=Counters0,
- rows = Seqs0,
- user_acc=Acc
- } = State,
- Counters = fabric_dict:erase(Worker, Counters0),
- Seqs = fabric_dict:erase(Worker, Seqs0),
- case fabric_view:is_progress_possible(Counters) of
- true ->
- {ok, State#collector{counters = Counters, rows=Seqs}};
- false ->
- {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
- {error, Resp}
- end;
+ fabric_view:handle_worker_exit(State, Worker, Reason);
handle_message(_, _, #collector{limit=0} = State) ->
{stop, State};
@@ -190,84 +192,45 @@ handle_message(#change{key=Key} = Row0, {Worker, From}, St) ->
limit = Limit,
user_acc = AccIn
} = St,
- case fabric_dict:lookup_element(Worker, S0) of
- undefined ->
- % this worker lost the race with other partition copies, terminate it
- gen_server:reply(From, stop),
- {ok, St};
- _ ->
- S1 = fabric_dict:store(Worker, Key, S0),
- S2 = fabric_view:remove_overlapping_shards(Worker, S1),
- % this check should not be necessary at all, as holes in the ranges
- % created from DOWN messages would have led to errors
- case fabric_view:is_progress_possible(S2) of
- true ->
- % Temporary hack for FB 23637
- Interval = erlang:get(changes_seq_interval),
- if (Interval == undefined) orelse (Limit rem Interval == 0) ->
- Row = Row0#change{key = pack_seqs(S2)};
- true ->
- Row = Row0#change{key = null}
- end,
- {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
- gen_server:reply(From, Go),
- {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}};
- false ->
- Reason = {range_not_covered, <<"progress not possible">>},
- Callback({error, Reason}, AccIn),
- gen_server:reply(From, stop),
- {stop, St#collector{counters=S2}}
- end
- end;
+ true = fabric_dict:is_key(Worker, S0),
+ S1 = fabric_dict:store(Worker, Key, S0),
+ % Temporary hack for FB 23637
+ Interval = erlang:get(changes_seq_interval),
+ if (Interval == undefined) orelse (Limit rem Interval == 0) ->
+ Row = Row0#change{key = pack_seqs(S1)};
+ true ->
+ Row = Row0#change{key = null}
+ end,
+ {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
+ rexi:stream_ack(From),
+ {Go, St#collector{counters=S1, limit=Limit-1, user_acc=Acc}};
handle_message({no_pass, Seq}, {Worker, From}, St) ->
- #collector{
- counters = S0
- } = St,
- case fabric_dict:lookup_element(Worker, S0) of
- undefined ->
- % this worker lost the race with other partition copies, terminate it
- gen_server:reply(From, stop),
- {ok, St};
- _ ->
- S1 = fabric_dict:store(Worker, Seq, S0),
- S2 = fabric_view:remove_overlapping_shards(Worker, S1),
- gen_server:reply(From, ok),
- {ok, St#collector{counters=S2}}
- end;
+ #collector{counters = S0} = St,
+ true = fabric_dict:is_key(Worker, S0),
+ S1 = fabric_dict:store(Worker, Seq, S0),
+ rexi:stream_ack(From),
+ {ok, St#collector{counters=S1}};
handle_message({complete, Key}, Worker, State) ->
#collector{
- callback = Callback,
counters = S0,
- total_rows = Completed, % override
- user_acc = Acc
+ total_rows = Completed % override
} = State,
- case fabric_dict:lookup_element(Worker, S0) of
- undefined ->
- {ok, State};
- _ ->
- S1 = fabric_dict:store(Worker, Key, S0),
- % unlikely to have overlaps here, but possible w/ filters
- S2 = fabric_view:remove_overlapping_shards(Worker, S1),
- NewState = State#collector{counters=S2, total_rows=Completed+1},
- case fabric_dict:size(S2) =:= (Completed+1) of
- true ->
- % check ranges are covered, again this should not be neccessary
- % as any holes in the ranges due to DOWN messages would have errored
- % out sooner
- case fabric_view:is_progress_possible(S2) of
- true ->
- {stop, NewState};
- false ->
- Reason = {range_not_covered, <<"progress not possible">>},
- Callback({error, Reason}, Acc),
- {stop, NewState}
- end;
- false ->
- {ok, NewState}
- end
- end.
+ true = fabric_dict:is_key(Worker, S0),
+ S1 = fabric_dict:store(Worker, Key, S0),
+ NewState = State#collector{counters=S1, total_rows=Completed+1},
+ % We're relying on S1 having exactly the number of workers that
+ % are participating in this response. With the new stream_start
+ % that's a bit more obvious but historically it wasn't quite
+ % so clear. The Completed variable is just a hacky override
+ % of the total_rows field in the #collector{} record.
+ NumWorkers = fabric_dict:size(S1),
+ Go = case NumWorkers =:= (Completed+1) of
+ true -> stop;
+ false -> ok
+ end,
+ {Go, NewState}.
make_changes_args(#changes_args{style=Style, filter=undefined}=Args) ->
Args#changes_args{filter = Style};