You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/01 11:12:16 UTC
[28/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f
Implement fabric_util:stream_start/2
This is a utility function that handles gathering the start of a rexi
stream. It will return a single full hash ring of shards or an error.
BugzId: 21755
Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/f0c675e9
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/f0c675e9
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/f0c675e9
Branch: refs/heads/windsor-merge-121
Commit: f0c675e993b3d138eaff4d5cd8c2c3a9e7318564
Parents: 03086f3
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Sep 6 07:26:32 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 13:55:43 2014 +0100
----------------------------------------------------------------------
src/fabric_util.erl | 52 ++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 52 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/f0c675e9/src/fabric_util.erl
----------------------------------------------------------------------
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index c93efda..f11abe3 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -16,6 +16,7 @@
update_counter/3, remove_ancestors/2, create_monitors/1, kv/2,
remove_down_workers/2]).
-export([request_timeout/0]).
+-export([stream_start/2]).
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
@@ -44,6 +45,57 @@ submit_jobs(Shards, Module, EndPoint, ExtraArgs) ->
%% Calls rexi:kill(Node, Ref) for each #shard{} record in Workers, using the
%% record's node and ref fields, and returns the list of those calls' results.
%% List elements that do not match the #shard{} pattern are skipped.
cleanup(Workers) ->
[rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers].
+%% Gathers the start of a rexi stream for the given workers.
+%%
+%% Every worker starts out marked `waiting`; handle_stream_start/3 is the
+%% callback that rexi_utils:recv/6 drives with that accumulator. When recv
+%% returns {ok, Streams}, rexi:stream_start/1 is called on the value stored
+%% for each remaining worker and {ok, Workers} is returned with the keys of
+%% Streams. Any other result from rexi_utils:recv/6 is returned unchanged.
+%%
+%% Crashes with badmatch if fabric_view:is_progress_possible/1 does not
+%% return true for the collected workers.
+stream_start(Workers0, Keypos) ->
+    InitialAcc = fabric_dict:init(Workers0, waiting),
+    Result = rexi_utils:recv(
+        Workers0,
+        Keypos,
+        fun handle_stream_start/3,
+        InitialAcc,
+        request_timeout(),
+        infinity
+    ),
+    case Result of
+        {ok, Streams} ->
+            true = fabric_view:is_progress_possible(Streams),
+            % Only now are the workers told to start streaming.
+            AckOne = fun(Worker, From, Acked) ->
+                rexi:stream_start(From),
+                [Worker | Acked]
+            end,
+            {ok, fabric_dict:fold(AckOne, [], Streams)};
+        Other ->
+            Other
+    end.
+
+%% Message callback handed to rexi_utils:recv/6 by stream_start/2.
+%%
+%% Pending is the fabric_dict accumulator: each worker maps either to the
+%% atom `waiting` or to the From value received with its rexi_STREAM_INIT.
+%% Returns {ok, Pending1} to keep receiving, {stop, Pending1} once no entry
+%% is `waiting` any more, or {error, Reason} when progress is not possible.
+%% Any unrecognised message makes the process exit with
+%% {invalid_stream_start, Msg}.
+handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, Pending) ->
+    case fabric_util:remove_down_workers(Pending, NodeRef) of
+        {ok, Remaining} ->
+            {ok, Remaining};
+        error ->
+            {error, {nodedown, <<"progress not possible">>}}
+    end;
+handle_stream_start({rexi_EXIT, Reason}, Worker, Pending) ->
+    Remaining = fabric_dict:erase(Worker, Pending),
+    case fabric_view:is_progress_possible(Remaining) of
+        true -> {ok, Remaining};
+        false -> {error, fabric_util:error_info(Reason)}
+    end;
+handle_stream_start(rexi_STREAM_INIT, {Worker, From}, Pending) ->
+    case fabric_dict:lookup_element(Worker, Pending) of
+        undefined ->
+            % No entry is left for this worker: it lost the race with
+            % other copies of its partition, so its stream is cancelled.
+            rexi:stream_cancel(From),
+            {ok, Pending};
+        waiting ->
+            % The worker is deliberately not acked here, so it does not
+            % start sending rows before the caller is ready for them.
+            Claimed = fabric_view:remove_overlapping_shards(
+                Worker,
+                fabric_dict:store(Worker, From, Pending)
+            ),
+            Tag =
+                case fabric_dict:any(waiting, Claimed) of
+                    true -> ok;
+                    false -> stop
+                end,
+            {Tag, Claimed}
+    end;
+handle_stream_start(Unexpected, _, _) ->
+    exit({invalid_stream_start, Unexpected}).
+
%% Shorthand for rexi_utils:recv/6 that passes request_timeout() as the fifth
%% argument and the atom infinity as the sixth; the result is returned as-is.
recv(Workers, Keypos, Fun, Acc0) ->
rexi_utils:recv(Workers, Keypos, Fun, Acc0, request_timeout(), infinity).