You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/01 16:34:38 UTC

[17/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Allow shard replacements in stream_start

This adds fabric_util:stream_start/4, which accepts a start function and
a proplist of replacement shards keyed by range. If a worker exits due
to maintenance mode and its loss prevents progress from being made, the
list of replacement shards is searched for replacements for that range.
If any are found, the start function is called for each possible
replacement. The start function takes the possible replacement and
should return the new worker.

BugzId: 20423


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/f2256977
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/f2256977
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/f2256977

Branch: refs/heads/windsor-merge
Commit: f225697757ed07fd344016f1a6a85674b800686c
Parents: 0f45478
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Sep 12 10:23:48 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100

----------------------------------------------------------------------
 src/fabric_util.erl | 82 +++++++++++++++++++++++++++++++++++-------------
 1 file changed, 60 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/f2256977/src/fabric_util.erl
----------------------------------------------------------------------
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index f11abe3..fb6cd8b 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -16,13 +16,19 @@
         update_counter/3, remove_ancestors/2, create_monitors/1, kv/2,
         remove_down_workers/2]).
 -export([request_timeout/0]).
--export([stream_start/2]).
+-export([stream_start/2, stream_start/4]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
+-record(stream_acc, {
+    workers,      % fabric_dict of worker => waiting | From
+    start_fun,    % fun(Replacement) -> NewWorker, or undefined
+    replacements  % [{Range, [Replacement]}], or undefined
+}).
+
 remove_down_workers(Workers, BadNode) ->
     Filter = fun(#shard{node = Node}, _) -> Node =/= BadNode end,
     NewWorkers = fabric_dict:filter(Filter, Workers),
@@ -45,12 +51,19 @@ submit_jobs(Shards, Module, EndPoint, ExtraArgs) ->
 cleanup(Workers) ->
     [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers].
 
-stream_start(Workers0, Keypos) ->
+stream_start(Workers, Keypos) -> % no start fun, no replacement shards
+    stream_start(Workers, Keypos, undefined, undefined).
+
+stream_start(Workers0, Keypos, StartFun, Replacements) ->
     Fun = fun handle_stream_start/3,
-    Acc = fabric_dict:init(Workers0, waiting),
+    Acc = #stream_acc{
+        workers = fabric_dict:init(Workers0, waiting),
+        start_fun = StartFun,
+        replacements = Replacements
+    },
     Timeout = request_timeout(),
     case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
-        {ok, Workers} ->
+        {ok, #stream_acc{workers=Workers}} ->
             true = fabric_view:is_progress_possible(Workers),
             AckedWorkers = fabric_dict:fold(fun(Worker, From, WorkerAcc) ->
                 rexi:stream_start(From),
@@ -61,36 +74,61 @@ stream_start(Workers0, Keypos) ->
             Else
     end.
 
-handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
-    case fabric_util:remove_down_workers(State, NodeRef) of
-    {ok, NewState} ->
-        {ok, NewState};
+handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, St) ->
+    case fabric_util:remove_down_workers(St#stream_acc.workers, NodeRef) of % drop NodeRef's workers
+    {ok, Workers} -> % keep waiting on the remaining workers
+        {ok, St#stream_acc{workers=Workers}};
     error ->
         Reason = {nodedown, <<"progress not possible">>},
         {error, Reason}
     end;
-handle_stream_start({rexi_EXIT, Reason}, Worker, State) ->
-    NewState = fabric_dict:erase(Worker, State),
-    case fabric_view:is_progress_possible(NewState) of
-    true ->
-        {ok, NewState};
-    false ->
+handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
+    Workers = fabric_dict:erase(Worker, St#stream_acc.workers),
+    case {fabric_view:is_progress_possible(Workers), Reason} of
+    {true, _} ->
+        {ok, St#stream_acc{workers=Workers}};
+    {false, {maintenance_mode, _Node}} when St#stream_acc.replacements =/= undefined ->
+        % Progress is impossible without this worker's range: look up
+        % replacement shards for the range and start them if present.
+        case lists:keytake(Worker#shard.range, 1, St#stream_acc.replacements) of
+            {value, {_Range, WorkerReplacements}, NewReplacements} ->
+                FinalWorkers = lists:foldl(fun(Repl, NewWorkers) ->
+                    NewWorker = (St#stream_acc.start_fun)(Repl),
+                    fabric_dict:store(NewWorker, waiting, NewWorkers)
+                end, Workers, WorkerReplacements),
+                % Assert that the replacement workers give us back
+                % the opportunity to make progress.
+                true = fabric_view:is_progress_possible(FinalWorkers),
+                NewRefs = fabric_dict:fetch_keys(FinalWorkers),
+                {new_refs, NewRefs, St#stream_acc{
+                    workers=FinalWorkers,
+                    replacements=NewReplacements
+                }};
+            false ->
+                % Progress isn't possible and there are no replacements
+                % for this range, so we're dead in the water.
+                Error = {nodedown, <<"progress not possible">>},
+                {error, Error}
+        end;
+    {false, _} -> % incl. maintenance_mode when replacements is undefined
         {error, fabric_util:error_info(Reason)}
     end;
-handle_stream_start(rexi_STREAM_INIT, {Worker, From}, State) ->
-    case fabric_dict:lookup_element(Worker, State) of
+handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) -> % worker awaits our ack
+    case fabric_dict:lookup_element(Worker, St#stream_acc.workers) of
     undefined ->
         % This worker lost the race with other partition copies, terminate
         rexi:stream_cancel(From),
-        {ok, State};
+        {ok, St};
     waiting ->
         % Don't ack the worker yet so they don't start sending us
         % rows until we're ready
-        NewState0 = fabric_dict:store(Worker, From, State),
-        NewState1 = fabric_view:remove_overlapping_shards(Worker, NewState0),
-        case fabric_dict:any(waiting, NewState1) of
-            true -> {ok, NewState1};
-            false -> {stop, NewState1}
+        Workers0 = fabric_dict:store(Worker, From, St#stream_acc.workers), % From is acked later
+        Workers1 = fabric_view:remove_overlapping_shards(Worker, Workers0), % presumably drops rival copies
+        case fabric_dict:any(waiting, Workers1) of
+            true ->
+                {ok, St#stream_acc{workers=Workers1}};
+            false -> % no worker left waiting: stop receiving
+                {stop, St#stream_acc{workers=Workers1}}
         end
     end;
 handle_stream_start(Else, _, _) ->