You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/03/06 06:35:03 UTC

[couchdb] branch shard-split updated: [fixup|fabric] introduce RingOpts to fabric_ring

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch shard-split
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/shard-split by this push:
     new fbfb116  [fixup|fabric] introduce RingOpts to fabric_ring
fbfb116 is described below

commit fbfb11657ab943690ad4fd4a2da3017c55dc168b
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Wed Mar 6 01:28:26 2019 -0500

    [fixup|fabric] introduce RingOpts to fabric_ring
    
    RingOpts customizes when a ring is considered complete. In the default case,
    when RingOpts is [] it operates on a "clean" range with copies having the same
    min and max ends. Ex: [00-ff], [00-7f], [80-ff], [00-ff].
    
    When RingOpts is [{any, [#shard{}]}] then any shard in the list of shards is
    considered to return a valid result. In this case copies might not all have the
    same matching ends. Ex: [00-ff], [00-7f], [00-ff]. This is basically to handle
    the case when we have a partitioned db shard that has a split copy.
---
 src/fabric/include/fabric.hrl              |   3 +-
 src/fabric/src/fabric_db_doc_count.erl     |   2 +-
 src/fabric/src/fabric_db_info.erl          |   2 +-
 src/fabric/src/fabric_design_doc_count.erl |   2 +-
 src/fabric/src/fabric_group_info.erl       |   2 +-
 src/fabric/src/fabric_ring.erl             | 294 ++++++++++++++++++++++-------
 src/fabric/src/fabric_streams.erl          |  29 ++-
 src/fabric/src/fabric_view.erl             |  10 +-
 src/fabric/src/fabric_view_all_docs.erl    |   4 +-
 src/fabric/src/fabric_view_map.erl         |   5 +-
 src/fabric/src/fabric_view_reduce.erl      |   5 +-
 11 files changed, 264 insertions(+), 94 deletions(-)

diff --git a/src/fabric/include/fabric.hrl b/src/fabric/include/fabric.hrl
index 8b71003..2a4da8b 100644
--- a/src/fabric/include/fabric.hrl
+++ b/src/fabric/include/fabric.hrl
@@ -38,7 +38,8 @@
     workers,
     ready,
     start_fun,
-    replacements
+    replacements,
+    ring_opts
 }).
 
 -record(view_row, {key, id, value, doc, worker}).
diff --git a/src/fabric/src/fabric_db_doc_count.erl b/src/fabric/src/fabric_db_doc_count.erl
index 1ea31f6..a91014b 100644
--- a/src/fabric/src/fabric_db_doc_count.erl
+++ b/src/fabric/src/fabric_db_doc_count.erl
@@ -41,7 +41,7 @@ handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Resps}) ->
     end;
 
 handle_message({rexi_EXIT, Reason}, Shard, {Counters, Resps}) ->
-    case fabric_ring:worker_exited(Shard, Counters, Resps) of
+    case fabric_ring:handle_error(Shard, Counters, Resps) of
         {ok, Counters1} -> {ok, {Counters1, Resps}};
         error -> {error, Reason}
     end;
diff --git a/src/fabric/src/fabric_db_info.erl b/src/fabric/src/fabric_db_info.erl
index b98d7ce..bb7a353 100644
--- a/src/fabric/src/fabric_db_info.erl
+++ b/src/fabric/src/fabric_db_info.erl
@@ -55,7 +55,7 @@ handle_message({rexi_DOWN, _, {_,NodeRef},_}, _, {Counters, Resps, CInfo}) ->
     end;
 
 handle_message({rexi_EXIT, Reason}, Shard, {Counters, Resps, CInfo}) ->
-    case fabric_ring:worker_exited(Shard, Counters, Resps) of
+    case fabric_ring:handle_error(Shard, Counters, Resps) of
         {ok, Counters1} -> {ok, {Counters1, Resps, CInfo}};
         error -> {error, Reason}
     end;
diff --git a/src/fabric/src/fabric_design_doc_count.erl b/src/fabric/src/fabric_design_doc_count.erl
index 2522e91..b0efc30 100644
--- a/src/fabric/src/fabric_design_doc_count.erl
+++ b/src/fabric/src/fabric_design_doc_count.erl
@@ -41,7 +41,7 @@ handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Resps}) ->
     end;
 
 handle_message({rexi_EXIT, Reason}, Shard, {Counters, Resps}) ->
-    case fabric_ring:worker_exited(Shard, Counters, Resps) of
+    case fabric_ring:handle_error(Shard, Counters, Resps) of
         {ok, Counters1} -> {ok, {Counters1, Resps}};
         error -> {error, Reason}
     end;
diff --git a/src/fabric/src/fabric_group_info.erl b/src/fabric/src/fabric_group_info.erl
index 61f7c76..be50742 100644
--- a/src/fabric/src/fabric_group_info.erl
+++ b/src/fabric/src/fabric_group_info.erl
@@ -47,7 +47,7 @@ handle_message({rexi_DOWN, _, {_,NodeRef},_}, _, {Counters, Resps, USet}) ->
     end;
 
 handle_message({rexi_EXIT, Reason}, Shard, {Counters, Resps, USet}) ->
-    case fabric_ring:worker_exited(Shard, Counters, Resps) of
+    case fabric_ring:handle_error(Shard, Counters, Resps) of
         {ok, Counters1} -> {ok, {Counters1, Resps, USet}};
         error -> {error, Reason}
     end;
diff --git a/src/fabric/src/fabric_ring.erl b/src/fabric/src/fabric_ring.erl
index 668249a..e3168e9 100644
--- a/src/fabric/src/fabric_ring.erl
+++ b/src/fabric/src/fabric_ring.erl
@@ -15,12 +15,14 @@
 
 -export([
     is_progress_possible/1,
+    is_progress_possible/2,
     get_shard_replacements/2,
-    worker_exited/3,
     node_down/3,
+    node_down/4,
     handle_error/3,
-    handle_response/4
-
+    handle_error/4,
+    handle_response/4,
+    handle_response/5
 ]).
 
 
@@ -29,12 +31,21 @@
 
 
 -type fabric_dict() :: [{#shard{}, any()}].
+-type ring_opts() :: [atom() | tuple()].
 
 
 %% @doc looks for a fully covered keyrange in the list of counters
 -spec is_progress_possible(fabric_dict()) -> boolean().
 is_progress_possible(Counters) ->
-    mem3_util:get_ring(get_worker_ranges(Counters)) =/= [].
+    is_progress_possible(Counters, []).
+
+
+%% @doc looks for a fully covered keyrange in the list of counters
+%% This version takes ring options to configure how progress will
+%% be checked. By default, [], checks that the full ring is covered.
+-spec is_progress_possible(fabric_dict(), ring_opts()) -> boolean().
+is_progress_possible(Counters, RingOpts) ->
+    is_progress_possible(Counters, [], 0, ?RING_END, RingOpts).
 
 
 -spec get_shard_replacements(binary(), [#shard{}]) -> [#shard{}].
@@ -46,23 +57,20 @@ get_shard_replacements(DbName, UsedShards0) ->
     get_shard_replacements_int(AllLiveShards -- UsedShards, UsedShards).
 
 
--spec worker_exited(#shard{}, fabric_dict(), [{any(), #shard{}, any()}]) ->
+-spec node_down(node(), fabric_dict(), fabric_dict()) ->
     {ok, fabric_dict()} | error.
-worker_exited(Shard, Workers, Responses) ->
-    Workers1 = fabric_dict:erase(Shard, Workers),
-    case is_progress_possible(Workers1, Responses) of
-        true -> {ok, Workers1};
-        false -> error
-    end.
+node_down(Node, Workers, Responses) ->
+    node_down(Node, Workers, Responses, []).
 
 
--spec node_down(node(), fabric_dict(), fabric_dict()) ->
+-spec node_down(node(), fabric_dict(), fabric_dict(), ring_opts()) ->
     {ok, fabric_dict()} | error.
-node_down(Node, Workers, Responses) ->
+node_down(Node, Workers, Responses, RingOpts) ->
+    {B, E} = range_bounds(Workers, Responses),
     Workers1 = fabric_dict:filter(fun(#shard{node = N}, _) ->
         N =/= Node
     end, Workers),
-    case is_progress_possible(Workers1, Responses) of
+    case is_progress_possible(Workers1, Responses, B, E, RingOpts) of
         true -> {ok, Workers1};
         false -> error
     end.
@@ -71,8 +79,15 @@ node_down(Node, Workers, Responses) ->
 -spec handle_error(#shard{}, fabric_dict(), fabric_dict()) ->
     {ok, fabric_dict()} | error.
 handle_error(Shard, Workers, Responses) ->
+    handle_error(Shard, Workers, Responses, []).
+
+
+-spec handle_error(#shard{}, fabric_dict(), fabric_dict(), ring_opts()) ->
+    {ok, fabric_dict()} | error.
+handle_error(Shard, Workers, Responses, RingOpts) ->
+    {B, E} = range_bounds(Workers, Responses),
     Workers1 = fabric_dict:erase(Shard, Workers),
-    case is_progress_possible(Workers1, Responses) of
+    case is_progress_possible(Workers1, Responses, B, E, RingOpts) of
         true -> {ok, Workers1};
         false -> error
     end.
@@ -81,43 +96,107 @@ handle_error(Shard, Workers, Responses) ->
 -spec handle_response(#shard{}, any(), fabric_dict(), fabric_dict()) ->
     {ok, {fabric_dict(), fabric_dict()}} | {stop, fabric_dict()}.
 handle_response(Shard, Response, Workers, Responses) ->
-    handle_response(Shard, Response, Workers, Responses, fun stop_workers/1).
+    handle_response(Shard, Response, Workers, Responses, []).
+
+
+-spec handle_response(#shard{}, any(), fabric_dict(), fabric_dict(),
+        ring_opts()) ->
+    {ok, {fabric_dict(), fabric_dict()}} | {stop, fabric_dict()}.
+handle_response(Shard, Response, Workers, Responses, RingOpts) ->
+    handle_response(Shard, Response, Workers, Responses, RingOpts,
+        fun stop_workers/1).
 
 
--spec handle_response(#shard{}, any(), fabric_dict(), fabric_dict(), fun()) ->
-    {ok, {fabric_dict(), fabric_dict()}} | {stop, list()}.
-handle_response(Shard, Response, Workers, Responses, CleanupCb) ->
+% Worker response handler. Gets responses from shards and puts them in the list
+% until they complete a full ring. Then kill unused responses and remaining
+% workers.
+%
+% How a ring "completes" is driven by RingOpts:
+%
+%  * When RingOpts is [] (the default case) responses must form a "clean"
+%    ring, where all copies at the start of the range and end of the range must
+%    have the same boundary values.
+%
+%  * When RingOpts is [{any, [#shard{}]}] responses are accepted from any of
+%    the provided list of shards. This type of ring might be used when querying
+%    a partitioned database. As soon as a result from any of the shards
+%    arrives, result collection stops.
+%
+handle_response(Shard, Response, Workers, Responses, RingOpts, CleanupCb) ->
     Workers1 = fabric_dict:erase(Shard, Workers),
-    #shard{range = [B, E]} = Shard,
-    Responses1 = [{{B, E}, Shard, Response} | Responses],
-    ResponseRanges = lists:map(fun({R, _, _}) -> R end, Responses1),
-    {MinB, MaxE} = get_range_bounds(Workers, ResponseRanges),
-    case mem3_util:get_ring(ResponseRanges, MinB, MaxE) of
+    case RingOpts of
+        [] ->
+            #shard{range = [B, E]} = Shard,
+            Responses1 = [{{B, E}, Shard, Response} | Responses],
+            handle_response_ring(Workers1, Responses1, CleanupCb);
+        [{any, Any}] ->
+            handle_response_any(Shard, Response, Workers1, Any, CleanupCb)
+    end.
+
+
+handle_response_ring(Workers, Responses, CleanupCb) ->
+    {MinB, MaxE} = range_bounds(Workers, Responses),
+    Ranges = lists:map(fun({R, _, _}) -> R end, Responses),
+    case mem3_util:get_ring(Ranges, MinB, MaxE) of
         [] ->
-            {ok, {Workers1, Responses1}};
+            {ok, {Workers, Responses}};
         Ring ->
             % Return one response per range in the ring. The
             % response list is reversed before sorting so that the
             % first shard copy to reply is first. We use keysort
             % because it is documented as being stable so that
             % we keep the relative order of duplicate shards
-            SortedResponses = lists:keysort(1, lists:reverse(Responses1)),
+            SortedResponses = lists:keysort(1, lists:reverse(Responses)),
             UsedResponses = get_responses(Ring, SortedResponses),
             % Kill all the remaining workers as well as the redunant responses
-            stop_unused_workers(Workers1, Responses1, UsedResponses, CleanupCb),
-            {stop, UsedResponses}
+            stop_unused_workers(Workers, Responses, UsedResponses, CleanupCb),
+            {stop, fabric_dict:from_list(UsedResponses)}
+    end.
+
+
+handle_response_any(Shard, Response, Workers, Any, CleanupCb) ->
+    case lists:member(Shard#shard{ref = undefined}, Any) of
+        true ->
+            stop_unused_workers(Workers, [], [], CleanupCb),
+            {stop, fabric_dict:from_list([{Shard, Response}])};
+        false ->
+            {ok, {Workers, []}}
     end.
 
 
-% This version combines workers that are still waiting and the ones that have
-% responded already.
--spec is_progress_possible(fabric_dict(), [{any(), #shard{}, any()}]) ->
-    boolean().
-is_progress_possible(Counters, Responses) ->
+% Check if workers still waiting and the already received responses could
+% still form a continuous range. The range won't always be the full ring, and
+% the bounds are computed based on the minimum and maximum interval beginning
+% and ends.
+%
+% There is also a special case: even if the ring cannot be formed, progress
+% is still considered possible when there is an overlap between all the
+% shards. This is essentially to allow for split partitioned shards. When one
+% shard copy on a node was split, the set of ranges might look like:
+% 00-ff, 00-ff, 07-ff. Even if both 00-ff workers exit, progress can still be
+% made with the remaining 07-ff copy.
+%
+-spec is_progress_possible(fabric_dict(), [{any(), #shard{}, any()}],
+    non_neg_integer(), non_neg_integer(), ring_opts()) -> boolean().
+is_progress_possible([], [], _, _, _) ->
+    false;
+
+is_progress_possible(Counters, Responses, MinB, MaxE, []) ->
     ResponseRanges = lists:map(fun({{B, E}, _, _}) -> {B, E} end, Responses),
-    {MinB, MaxE} = get_range_bounds(Counters, ResponseRanges),
-    Ranges = get_worker_ranges(Counters) ++ ResponseRanges,
-    mem3_util:get_ring(Ranges, MinB, MaxE) =/= [].
+    Ranges = worker_ranges(Counters) ++ ResponseRanges,
+    mem3_util:get_ring(Ranges, MinB, MaxE) =/= [];
+
+is_progress_possible(Counters, Responses, _, _, [{any, AnyShards}]) ->
+    InAny = fun(S) -> lists:member(S#shard{ref = undefined}, AnyShards) end,
+    case fabric_dict:filter(fun(S, _) -> InAny(S) end, Counters) of
+        [] ->
+            case lists:filter(fun({_, S, _}) -> InAny(S) end, Responses) of
+                [] -> false;
+                [_ | _] -> true
+            end;
+        [_ | _] ->
+            true
+    end.
 
 
 get_shard_replacements_int(UnusedShards, UsedShards) ->
@@ -144,16 +223,17 @@ get_shard_replacements_int(UnusedShards, UsedShards) ->
     end, [], UsedShards).
 
 
--spec get_worker_ranges(fabric_dict()) -> [{integer(), integer()}].
-get_worker_ranges(Workers) ->
+-spec worker_ranges(fabric_dict()) -> [{integer(), integer()}].
+worker_ranges(Workers) ->
     Ranges = fabric_dict:fold(fun(#shard{range=[X, Y]}, _, Acc) ->
         [{X, Y} | Acc]
     end, [], Workers),
     lists:usort(Ranges).
 
 
-get_range_bounds(Workers, ResponseRanges) ->
-    Ranges = get_worker_ranges(Workers) ++ ResponseRanges,
+range_bounds(Workers, Responses) ->
+    RespRanges = lists:map(fun({R, _, _}) -> R end, Responses),
+    Ranges = worker_ranges(Workers) ++ RespRanges,
     {Bs, Es} = lists:unzip(Ranges),
     {lists:min(Bs), lists:max(Es)}.
 
@@ -172,9 +252,9 @@ stop_unused_workers(_, _, _, undefined) ->
     ok;
 
 stop_unused_workers(Workers, AllResponses, UsedResponses, CleanupCb) ->
-    WorkerShards = [S || {S, _V} <- Workers],
-    Used  = [S || {S, _V} <- UsedResponses],
-    Unused = [S || {_R, S, _V} <- AllResponses, not lists:member(S, Used)],
+    WorkerShards = [S || {S, _} <- Workers],
+    Used  = [S || {S, _} <- UsedResponses],
+    Unused = [S || {_, S, _} <- AllResponses, not lists:member(S, Used)],
     CleanupCb(WorkerShards ++ Unused).
 
 
@@ -184,26 +264,71 @@ stop_workers(Shards) when is_list(Shards) ->
 
 % Unit tests
 
-is_progress_possible_test() ->
+is_progress_possible_full_range_test() ->
+    % a base case
+    ?assertEqual(false, is_progress_possible([], [], 0, 0, [])),
     T1 = [[0, ?RING_END]],
-    ?assertEqual(is_progress_possible(mk_cnts(T1)), true),
+    ?assertEqual(true, is_progress_possible(mk_cnts(T1))),
     T2 = [[0, 10], [11, 20], [21, ?RING_END]],
-    ?assertEqual(is_progress_possible(mk_cnts(T2)), true),
+    ?assertEqual(true, is_progress_possible(mk_cnts(T2))),
     % gap
     T3 = [[0, 10], [12, ?RING_END]],
-    ?assertEqual(is_progress_possible(mk_cnts(T3)), false),
+    ?assertEqual(false, is_progress_possible(mk_cnts(T3))),
     % outside range
     T4 = [[1, 10], [11, 20], [21, ?RING_END]],
-    ?assertEqual(is_progress_possible(mk_cnts(T4)), false),
+    ?assertEqual(false, is_progress_possible(mk_cnts(T4))),
     % outside range
     T5 = [[0, 10], [11, 20], [21, ?RING_END + 1]],
-    ?assertEqual(is_progress_possible(mk_cnts(T5)), false),
+    ?assertEqual(false, is_progress_possible(mk_cnts(T5))),
     % possible progress but with backtracking
     T6 = [[0, 10], [11, 20], [0, 5], [6, 21], [21, ?RING_END]],
-    ?assertEqual(is_progress_possible(mk_cnts(T6)), true),
+    ?assertEqual(true, is_progress_possible(mk_cnts(T6))),
     % not possible, overlap is not exact
     T7 = [[0, 10], [13, 20], [21, ?RING_END], [9, 12]],
-    ?assertEqual(is_progress_possible(mk_cnts(T7)), false).
+    ?assertEqual(false, is_progress_possible(mk_cnts(T7))).
+
+
+is_progress_possible_with_responses_test() ->
+    C1 = mk_cnts([[0, ?RING_END]]),
+    ?assertEqual(true, is_progress_possible(C1, [], 0, ?RING_END, [])),
+    % check for gaps
+    C2 = mk_cnts([[5, 6], [7, 8]]),
+    ?assertEqual(true, is_progress_possible(C2, [], 5, 8, [])),
+    ?assertEqual(false, is_progress_possible(C2, [], 4, 8, [])),
+    ?assertEqual(false, is_progress_possible(C2, [], 5, 7, [])),
+    ?assertEqual(false, is_progress_possible(C2, [], 4, 9, [])),
+    % check for uneven shard range copies
+    C3 = mk_cnts([[2, 5], [2, 10]]),
+    ?assertEqual(true, is_progress_possible(C3, [], 2, 10, [])),
+    ?assertEqual(false, is_progress_possible(C3, [], 2, 11, [])),
+    ?assertEqual(false, is_progress_possible(C3, [], 3, 10, [])),
+    % they overlap but still not a proper ring
+    C4 = mk_cnts([[2, 4], [3, 7], [6, 10]]),
+    ?assertEqual(false, is_progress_possible(C4, [], 2, 10, [])),
+    % some of the ranges are in responses
+    RS1 = mk_resps([{"n1", 7, 8, 42}]),
+    C5 = mk_cnts([[5, 6]]),
+    ?assertEqual(true, is_progress_possible(C5, RS1, 5, 8, [])),
+    ?assertEqual(false, is_progress_possible([], RS1, 5, 8, [])),
+    ?assertEqual(true, is_progress_possible([], RS1, 7, 8, [])).
+
+
+is_progress_possible_with_ring_opts_test() ->
+    Opts = [{any, [mk_shard("n1", [0, 5]), mk_shard("n2", [3, 10])]}],
+    C1 = [{mk_shard("n1", [0, ?RING_END]), nil}],
+    RS1 = mk_resps([{"n1", 3, 10, 42}]),
+    ?assertEqual(false, is_progress_possible(C1, [], 0, ?RING_END, Opts)),
+    ?assertEqual(false, is_progress_possible([], [], 0, ?RING_END, Opts)),
+    ?assertEqual(false, is_progress_possible([], RS1, 0, ?RING_END, Opts)),
+    % explicitly accept only the shard specified in the ring options
+    ?assertEqual(false, is_progress_possible([], RS1, 3, 10, [{any, []}])),
+    % need to match the node exactly
+    ?assertEqual(false, is_progress_possible([], RS1, 3, 10, Opts)),
+    RS2 = mk_resps([{"n2", 3, 10, 42}]),
+    ?assertEqual(true, is_progress_possible([], RS2, 3, 10, Opts)),
+    % assert that counters can fill the ring not just the response
+    C2 = [{mk_shard("n1", [0, 5]), nil}],
+    ?assertEqual(true, is_progress_possible(C2, [], 0, ?RING_END, Opts)).
 
 
 get_shard_replacements_test() ->
@@ -233,13 +358,13 @@ handle_response_basic_test() ->
 
     Workers1 = fabric_dict:init([Shard1, Shard2], nil),
 
-    Result1 = handle_response(Shard1, 42, Workers1, [], undefined),
+    Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined),
     ?assertMatch({ok, {_, _}}, Result1),
     {ok, {Workers2, Responses1}} = Result1,
     ?assertEqual(fabric_dict:erase(Shard1, Workers1), Workers2),
     ?assertEqual([{{0, 1}, Shard1, 42}], Responses1),
 
-    Result2 = handle_response(Shard2, 43, Workers2, Responses1, undefined),
+    Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined),
     ?assertEqual({stop, [{Shard1, 42}, {Shard2, 43}]}, Result2).
 
 
@@ -249,13 +374,13 @@ handle_response_incomplete_ring_test() ->
 
     Workers1 = fabric_dict:init([Shard1, Shard2], nil),
 
-    Result1 = handle_response(Shard1, 42, Workers1, [], undefined),
+    Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined),
     ?assertMatch({ok, {_, _}}, Result1),
     {ok, {Workers2, Responses1}} = Result1,
     ?assertEqual(fabric_dict:erase(Shard1, Workers1), Workers2),
     ?assertEqual([{{0, 1}, Shard1, 42}], Responses1),
 
-    Result2 = handle_response(Shard2, 43, Workers2, Responses1, undefined),
+    Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined),
     ?assertEqual({stop, [{Shard1, 42}, {Shard2, 43}]}, Result2).
 
 
@@ -266,15 +391,15 @@ handle_response_test_multiple_copies_test() ->
 
     Workers1 = fabric_dict:init([Shard1, Shard2, Shard3], nil),
 
-    Result1 = handle_response(Shard1, 42, Workers1, [], undefined),
+    Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined),
     ?assertMatch({ok, {_, _}}, Result1),
     {ok, {Workers2, Responses1}} = Result1,
 
-    Result2 = handle_response(Shard2, 43, Workers2, Responses1, undefined),
+    Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined),
     ?assertMatch({ok, {_, _}}, Result2),
     {ok, {Workers3, Responses2}} = Result2,
 
-    Result3 = handle_response(Shard3, 44, Workers3, Responses2, undefined),
+    Result3 = handle_response(Shard3, 44, Workers3, Responses2, [], undefined),
     % Use the value (42) to distinguish between [0, 1] copies. In reality
     % they should have the same value but here we need to assert that copy
     % that responded first is included in the ring.
@@ -287,22 +412,46 @@ handle_response_test_backtracking_test() ->
     Shard3 = mk_shard("n2", [2, ?RING_END]),
     Shard4 = mk_shard("n3", [0, 1]),
 
-    Workers1 = fabric_dict:init([Shard1, Shard2, Shard3, Shard3], nil),
+    Workers1 = fabric_dict:init([Shard1, Shard2, Shard3, Shard4], nil),
 
-    Result1 = handle_response(Shard1, 42, Workers1, [], undefined),
+    Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined),
     ?assertMatch({ok, {_, _}}, Result1),
     {ok, {Workers2, Responses1}} = Result1,
 
-    Result2 = handle_response(Shard2, 43, Workers2, Responses1, undefined),
+    Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined),
     ?assertMatch({ok, {_, _}}, Result2),
     {ok, {Workers3, Responses2}} = Result2,
 
-    Result3 = handle_response(Shard3, 44, Workers3, Responses2, undefined),
+    Result3 = handle_response(Shard3, 44, Workers3, Responses2, [], undefined),
     ?assertMatch({ok, {_, _}}, Result3),
     {ok, {Workers4, Responses3}} = Result3,
 
-    Result4 = handle_response(Shard4, 45, Workers4, Responses3, undefined),
-    ?assertEqual({stop, [{Shard4, 45}, {Shard3, 44}]}, Result4).
+    Result4 = handle_response(Shard4, 45, Workers4, Responses3, [], undefined),
+    ?assertEqual({stop, [{Shard3, 44}, {Shard4, 45}]}, Result4).
+
+
+handle_response_test_ring_opts_test() ->
+    Shard1 = mk_shard("n1", [0, 5]),
+    Shard2 = mk_shard("n2", [0, 1]),
+    Shard3 = mk_shard("n3", [0, 1]),
+
+    Opts = [{any, [mk_shard("n3", [0, 1])]}],
+
+    ShardList = [Shard1, Shard2, Shard3],
+    WithRefs = [S#shard{ref = make_ref()} || S <- ShardList],
+    Workers1 = fabric_dict:init(WithRefs, nil),
+
+    Result1 = handle_response(Shard1, 42, Workers1, [], Opts, undefined),
+    ?assertMatch({ok, {_, _}}, Result1),
+    {ok, {Workers2, []}} = Result1,
+
+    % Still waiting because the node doesn't match
+    Result2 = handle_response(Shard2, 43, Workers2, [], Opts, undefined),
+    ?assertMatch({ok, {_, _}}, Result2),
+    {ok, {Workers3, []}} = Result2,
+
+    Result3 = handle_response(Shard3, 44, Workers3, [], Opts, undefined),
+    ?assertEqual({stop, [{Shard3, 44}]}, Result3).
 
 
 handle_error_test() ->
@@ -313,7 +462,7 @@ handle_error_test() ->
 
     Workers1 = fabric_dict:init([Shard1, Shard2, Shard3, Shard4], nil),
 
-    Result1 = handle_response(Shard1, 42, Workers1, [], undefined),
+    Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined),
     ?assertMatch({ok, {_, _}}, Result1),
     {ok, {Workers2, Responses1}} = Result1,
 
@@ -322,10 +471,9 @@ handle_error_test() ->
     {ok, Workers3} = Result2,
     ?assertEqual(fabric_dict:erase(Shard2, Workers2), Workers3),
 
-    Result3 = handle_response(Shard3, 44, Workers3, Responses1, undefined),
+    Result3 = handle_response(Shard3, 44, Workers3, Responses1, [], undefined),
     ?assertMatch({ok, {_, _}}, Result3),
     {ok, {Workers4, Responses3}} = Result3,
-
     ?assertEqual(error, handle_error(Shard4, Workers4, Responses3)).
 
 
@@ -337,11 +485,11 @@ node_down_test() ->
 
     Workers1 = fabric_dict:init([Shard1, Shard2, Shard3, Shard4], nil),
 
-    Result1 = handle_response(Shard1, 42, Workers1, [], undefined),
+    Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined),
     ?assertMatch({ok, {_, _}}, Result1),
     {ok, {Workers2, Responses1}} = Result1,
 
-    Result2 = handle_response(Shard2, 43, Workers2, Responses1, undefined),
+    Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined),
     ?assertMatch({ok, {_, _}}, Result2),
     {ok, {Workers3, Responses2}} = Result2,
 
@@ -350,7 +498,7 @@ node_down_test() ->
     {ok, Workers4} = Result3,
     ?assertEqual([{Shard3, nil}, {Shard4, nil}], Workers4),
 
-    Result4 = handle_response(Shard3, 44, Workers4, Responses2, undefined),
+    Result4 = handle_response(Shard3, 44, Workers4, Responses2, [], undefined),
     ?assertMatch({ok, {_, _}}, Result4),
     {ok, {Workers5, Responses3}} = Result4,
 
@@ -362,7 +510,11 @@ node_down_test() ->
 
 mk_cnts(Ranges) ->
     Shards = lists:map(fun mk_shard/1, Ranges),
-    orddict:from_list([{Shard,nil} || Shard <- Shards]).
+    fabric_dict:init([S#shard{ref = make_ref()} || S <- Shards], nil).
+
+
+mk_resps(RangeNameVals) ->
+    [{{B, E}, mk_shard(Name, [B, E]), V} || {Name, B, E, V} <- RangeNameVals].
 
 
 mk_shard([B, E]) when is_integer(B), is_integer(E) ->
diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl
index a4c9df6..d0a549d 100644
--- a/src/fabric/src/fabric_streams.erl
+++ b/src/fabric/src/fabric_streams.erl
@@ -14,7 +14,9 @@
 
 -export([
     start/2,
+    start/3,
     start/4,
+    start/5,
     cleanup/1
 ]).
 
@@ -28,13 +30,23 @@
 start(Workers, Keypos) ->
     start(Workers, Keypos, undefined, undefined).
 
-start(Workers0, Keypos, StartFun, Replacements) ->
+
+start(Workers, Keypos, RingOpts) ->
+    start(Workers, Keypos, undefined, undefined, RingOpts).
+
+
+start(Workers, Keypos, StartFun, Replacements) ->
+    start(Workers, Keypos, StartFun, Replacements, []).
+
+
+start(Workers0, Keypos, StartFun, Replacements, RingOpts) ->
     Fun = fun handle_stream_start/3,
     Acc = #stream_acc{
         workers = fabric_dict:init(Workers0, waiting),
         ready = [],
         start_fun = StartFun,
-        replacements = Replacements
+        replacements = Replacements,
+        ring_opts = RingOpts
     },
     spawn_worker_cleaner(self(), Workers0),
     Timeout = fabric_util:request_timeout(),
@@ -65,8 +77,8 @@ cleanup(Workers) ->
 
 
 handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, St) ->
-    #stream_acc{workers = Workers, ready = Ready} = St,
-    case fabric_ring:node_down(NodeRef, Workers, Ready) of
+    #stream_acc{workers = Workers, ready = Ready, ring_opts = RingOpts} = St,
+    case fabric_ring:node_down(NodeRef, Workers, Ready, RingOpts) of
         {ok, Workers1} ->
             {ok, St#stream_acc{workers = Workers1}};
         error ->
@@ -77,9 +89,10 @@ handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
     #stream_acc{
         workers = Workers,
         ready = Ready,
-        replacements = Replacements
+        replacements = Replacements,
+        ring_opts = RingOpts
     } = St,
-    case {fabric_ring:worker_exited(Worker, Workers, Ready), Reason} of
+    case {fabric_ring:handle_error(Worker, Workers, Ready, RingOpts), Reason} of
         {{ok, Workers1}, _Reason} ->
             {ok, St#stream_acc{workers = Workers1}};
         {error, {maintenance_mode, _Node}} when Replacements /= undefined ->
@@ -110,14 +123,14 @@ handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
     end;
 
 handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) ->
-    #stream_acc{workers = Workers, ready = Ready} = St,
+    #stream_acc{workers = Workers, ready = Ready, ring_opts = RingOpts} = St,
     case fabric_dict:lookup_element(Worker, Workers) of
     undefined ->
         % This worker lost the race with other partition copies, terminate
         rexi:stream_cancel(From),
         {ok, St};
     waiting ->
-        case fabric_ring:handle_response(Worker, From, Workers, Ready) of
+        case fabric_ring:handle_response(Worker, From, Workers, Ready, RingOpts) of
             {ok, {Workers1, Ready1}} ->
                 % Don't have a full ring yet. Keep getting responses
                 {ok, St#stream_acc{workers = Workers1, ready = Ready1}};
diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl
index eb31f53..193f6e7 100644
--- a/src/fabric/src/fabric_view.erl
+++ b/src/fabric/src/fabric_view.erl
@@ -280,13 +280,15 @@ get_shards(Db, #mrargs{} = Args) ->
     % request.
     case {Args#mrargs.stable, Partition} of
         {true, undefined} ->
-            mem3:ushards(DbName);
+            {mem3:ushards(DbName), []};
         {true, Partition} ->
-            mem3:ushards(DbName, couch_partition:shard_key(Partition));
+            Shards = mem3:ushards(DbName, couch_partition:shard_key(Partition)),
+            {Shards, [{any, Shards}]};
         {false, undefined} ->
-            mem3:shards(DbName);
+            {mem3:shards(DbName), []};
         {false, Partition} ->
-            mem3:shards(DbName, couch_partition:shard_key(Partition))
+            Shards = mem3:shards(DbName, couch_partition:shard_key(Partition)),
+            {Shards, [{any, Shards}]}
     end.
 
 maybe_update_others(DbName, DDoc, ShardsInvolved, ViewName,
diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl
index 9049eaa..1d87e3d 100644
--- a/src/fabric/src/fabric_view_all_docs.erl
+++ b/src/fabric/src/fabric_view_all_docs.erl
@@ -23,12 +23,12 @@
 go(Db, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) ->
     {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(QueryArgs),
     DbName = fabric:dbname(Db),
-    Shards = shards(Db, QueryArgs),
+    {Shards, RingOpts} = shards(Db, QueryArgs),
     Workers0 = fabric_util:submit_jobs(
             Shards, fabric_rpc, all_docs, [Options, WorkerArgs]),
     RexiMon = fabric_util:create_monitors(Workers0),
     try
-        case fabric_streams:start(Workers0, #shard.ref) of
+        case fabric_streams:start(Workers0, #shard.ref, RingOpts) of
             {ok, Workers} ->
                 try
                     go(DbName, Options, Workers, CoordArgs, Callback, Acc)
diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl
index 022bec8..5a5cc13 100644
--- a/src/fabric/src/fabric_view_map.erl
+++ b/src/fabric/src/fabric_view_map.erl
@@ -26,7 +26,7 @@ go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo)
 
 go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
     DbName = fabric:dbname(Db),
-    Shards = fabric_view:get_shards(Db, Args),
+    {Shards, RingOpts} = fabric_view:get_shards(Db, Args),
     {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args),
     DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
     fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, View, Args),
@@ -38,7 +38,8 @@ go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
     Workers0 = fabric_util:submit_jobs(Shards, fabric_rpc, map_view, RPCArgs),
     RexiMon = fabric_util:create_monitors(Workers0),
     try
-        case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls) of
+        case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls,
+                RingOpts) of
             {ok, ddoc_updated} ->
                 Callback({error, ddoc_updated}, Acc);
             {ok, Workers} ->
diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl
index f98bd10..a432b2c 100644
--- a/src/fabric/src/fabric_view_reduce.erl
+++ b/src/fabric/src/fabric_view_reduce.erl
@@ -25,7 +25,7 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) -
 
 go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
     DbName = fabric:dbname(Db),
-    Shards = fabric_view:get_shards(Db, Args),
+    {Shards, RingOpts} = fabric_view:get_shards(Db, Args),
     {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args),
     DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
     RPCArgs = [DocIdAndRev, VName, WorkerArgs],
@@ -37,7 +37,8 @@ go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
     Workers0 = fabric_util:submit_jobs(Shards,fabric_rpc,reduce_view,RPCArgs),
     RexiMon = fabric_util:create_monitors(Workers0),
     try
-        case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls) of
+        case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls,
+                RingOpts) of
             {ok, ddoc_updated} ->
                 Callback({error, ddoc_updated}, Acc);
             {ok, Workers} ->