You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/01 11:12:16 UTC

[28/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Implement fabric_util:stream_start/2

This is a utility function that handles gathering the start of a rexi
stream. It will return a single full hash ring of shards or an error.

BugzId: 21755


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/f0c675e9
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/f0c675e9
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/f0c675e9

Branch: refs/heads/windsor-merge-121
Commit: f0c675e993b3d138eaff4d5cd8c2c3a9e7318564
Parents: 03086f3
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Sep 6 07:26:32 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 13:55:43 2014 +0100

----------------------------------------------------------------------
 src/fabric_util.erl | 52 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 52 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/f0c675e9/src/fabric_util.erl
----------------------------------------------------------------------
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index c93efda..f11abe3 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -16,6 +16,7 @@
         update_counter/3, remove_ancestors/2, create_monitors/1, kv/2,
         remove_down_workers/2]).
 -export([request_timeout/0]).
+-export([stream_start/2]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
@@ -44,6 +45,57 @@ submit_jobs(Shards, Module, EndPoint, ExtraArgs) ->
 cleanup(Workers) ->
     [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers].
 
+stream_start(Workers0, Keypos) ->  % Gather the start of a rexi stream: {ok, AckedWorkers} or recv's non-ok result
+    Fun = fun handle_stream_start/3,  % per-message callback handed to rexi_utils:recv/6
+    Acc = fabric_dict:init(Workers0, waiting),  % initial state; 'waiting' entries are matched in handle_stream_start/3
+    Timeout = request_timeout(),
+    case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
+        {ok, Workers} ->
+            true = fabric_view:is_progress_possible(Workers),  % assertive match: badmatch crash if not true
+            AckedWorkers = fabric_dict:fold(fun(Worker, From, WorkerAcc) ->
+                rexi:stream_start(From),  % the ack that handle_stream_start/3 deliberately held back
+                [Worker | WorkerAcc]
+            end, [], Workers),
+            {ok, AckedWorkers};
+        Else ->  % anything other than {ok, _} is returned to the caller unchanged
+            Else
+    end.
+
+handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->  % recv callback; rexi_DOWN: drop workers for NodeRef
+    case fabric_util:remove_down_workers(State, NodeRef) of
+    {ok, NewState} ->
+        {ok, NewState};
+    error ->  % remove_down_workers/2 gave 'error': report nodedown to the caller
+        Reason = {nodedown, <<"progress not possible">>},
+        {error, Reason}
+    end;
+handle_stream_start({rexi_EXIT, Reason}, Worker, State) ->  % one worker exited: erase it, continue only if possible
+    NewState = fabric_dict:erase(Worker, State),
+    case fabric_view:is_progress_possible(NewState) of
+    true ->
+        {ok, NewState};
+    false ->
+        {error, fabric_util:error_info(Reason)}
+    end;
+handle_stream_start(rexi_STREAM_INIT, {Worker, From}, State) ->  % a worker announced its stream; From identifies it
+    case fabric_dict:lookup_element(Worker, State) of
+    undefined ->
+        % This worker lost the race with other partition copies, so cancel its stream
+        rexi:stream_cancel(From),
+        {ok, State};
+    waiting ->
+        % Don't ack the worker yet so they don't start sending us
+        % rows until we're ready
+        NewState0 = fabric_dict:store(Worker, From, State),  % replace 'waiting' with From; stream_start/2 acks it later
+        NewState1 = fabric_view:remove_overlapping_shards(Worker, NewState0),
+        case fabric_dict:any(waiting, NewState1) of  % presumably true while some entry is still 'waiting' - confirm
+            true -> {ok, NewState1};
+            false -> {stop, NewState1}
+        end
+    end;
+handle_stream_start(Else, _, _) ->  % any other message is unexpected here: exit the calling process
+    exit({invalid_stream_start, Else}).
+
 recv(Workers, Keypos, Fun, Acc0) ->
     rexi_utils:recv(Workers, Keypos, Fun, Acc0, request_timeout(), infinity).