Allow shard replacements in stream_start

This adds fabric_util:stream_start/4, which accepts a start function and
a proplist of replacement shards keyed by range. If a worker exits due
to maintenance mode and that exit prevents progress from being made, the
list of replacement shards is searched for replacements covering the
worker's range. If any are found, the start function is called once for
each possible replacement. The start function takes a replacement shard
and should return the new worker.

BugzId: 20423


Branch: refs/heads/windsor-merge
Commit: f225697757ed07fd344016f1a6a85674b800686c
Parents: 0f45478
Author: Paul J. Davis <>
Authored: Thu Sep 12 10:23:48 2013 -0500
Committer: Robert Newson <>
Committed: Fri Aug 1 15:33:41 2014 +0100

 src/fabric_util.erl | 82 +++++++++++++++++++++++++++++++++++-------------
 1 file changed, 60 insertions(+), 22 deletions(-)
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index f11abe3..fb6cd8b 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -16,13 +16,19 @@
         update_counter/3, remove_ancestors/2, create_monitors/1, kv/2,
+-export([stream_start/2, stream_start/4]).
+-record(stream_acc, {
+    workers,
+    start_fun,
+    replacements
 remove_down_workers(Workers, BadNode) ->
     Filter = fun(#shard{node = Node}, _) -> Node =/= BadNode end,
     NewWorkers = fabric_dict:filter(Filter, Workers),
@@ -45,12 +51,19 @@ submit_jobs(Shards, Module, EndPoint, ExtraArgs) ->
 cleanup(Workers) ->
     [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers].
-stream_start(Workers0, Keypos) ->
+stream_start(Workers, Keypos) ->
+    stream_start(Workers, Keypos, undefined, undefined).
+stream_start(Workers0, Keypos, StartFun, Replacements) ->
     Fun = fun handle_stream_start/3,
-    Acc = fabric_dict:init(Workers0, waiting),
+    Acc = #stream_acc{
+        workers = fabric_dict:init(Workers0, waiting),
+        start_fun = StartFun,
+        replacements = Replacements
+    },
     Timeout = request_timeout(),
     case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
-        {ok, Workers} ->
+        {ok, #stream_acc{workers=Workers}} ->
             true = fabric_view:is_progress_possible(Workers),
             AckedWorkers = fabric_dict:fold(fun(Worker, From, WorkerAcc) ->
@@ -61,36 +74,61 @@ stream_start(Workers0, Keypos) ->
-handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
-    case fabric_util:remove_down_workers(State, NodeRef) of
-    {ok, NewState} ->
-        {ok, NewState};
+handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, St) ->
+    case fabric_util:remove_down_workers(St#stream_acc.workers, NodeRef) of
+    {ok, Workers} ->
+        {ok, St#stream_acc{workers=Workers}};
     error ->
         Reason = {nodedown, <<"progress not possible">>},
         {error, Reason}
-handle_stream_start({rexi_EXIT, Reason}, Worker, State) ->
-    NewState = fabric_dict:erase(Worker, State),
-    case fabric_view:is_progress_possible(NewState) of
-    true ->
-        {ok, NewState};
-    false ->
+handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
+    Workers = fabric_dict:erase(Worker, St#stream_acc.workers),
+    case {fabric_view:is_progress_possible(Workers), Reason} of
+    {true, _} ->
+        {ok, St#stream_acc{workers=Workers}};
+    {false, {maintenance_mode, _Node}} ->
+        % Check if we have replacements for this range
+        % and start the new workers if so.
+        case lists:keytake(Worker#shard.range, 1, St#stream_acc.replacements) of
+            {value, {_Range, WorkerReplacements}, NewReplacements} ->
+                FinalWorkers = lists:foldl(fun(Repl, NewWorkers) ->
+                    NewWorker = (St#stream_acc.start_fun)(Repl),
+                    fabric_dict:store(NewWorker, waiting, NewWorkers)
+                end, Workers, WorkerReplacements),
+                % Assert that our replaced worker provides us
+                % the opportunity to make progress.
+                true = fabric_view:is_progress_possible(FinalWorkers),
+                NewRefs = fabric_dict:fetch_keys(FinalWorkers),
+                {new_refs, NewRefs, St#stream_acc{
+                    workers=FinalWorkers,
+                    replacements=NewReplacements
+                }};
+            false ->
+                % If progress isn't possible and we don't have any
+                % replacements then we're dead in the water.
+                Error = {nodedown, <<"progress not possible">>},
+                {error, Error}
+        end;
+    {false, _} ->
         {error, fabric_util:error_info(Reason)}
-handle_stream_start(rexi_STREAM_INIT, {Worker, From}, State) ->
-    case fabric_dict:lookup_element(Worker, State) of
+handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) ->
+    case fabric_dict:lookup_element(Worker, St#stream_acc.workers) of
     undefined ->
         % This worker lost the race with other partition copies, terminate
-        {ok, State};
+        {ok, St};
     waiting ->
         % Don't ack the worker yet so they don't start sending us
         % rows until we're ready
-        NewState0 = fabric_dict:store(Worker, From, State),
-        NewState1 = fabric_view:remove_overlapping_shards(Worker, NewState0),
-        case fabric_dict:any(waiting, NewState1) of
-            true -> {ok, NewState1};
-            false -> {stop, NewState1}
+        Workers0 = fabric_dict:store(Worker, From, St#stream_acc.workers),
+        Workers1 = fabric_view:remove_overlapping_shards(Worker, Workers0),
+        case fabric_dict:any(waiting, Workers1) of
+            true ->
+                {ok, St#stream_acc{workers=Workers1}};
+            false ->
+                {stop, St#stream_acc{workers=Workers1}}
 handle_stream_start(Else, _, _) ->