You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/03/06 06:35:03 UTC
[couchdb] branch shard-split updated: [fixup|fabric] introduce
RingOpts to fabric_ring
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch shard-split
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/shard-split by this push:
new fbfb116 [fixup|fabric] introduce RingOpts to fabric_ring
fbfb116 is described below
commit fbfb11657ab943690ad4fd4a2da3017c55dc168b
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Wed Mar 6 01:28:26 2019 -0500
[fixup|fabric] introduce RingOpts to fabric_ring
RingOpts customizes when a ring is considered complete. In the default case,
when RingOpts is [] it operates on a "clean" range with copies having the same
min and max ends. Ex: [00-ff], [00-7f], [80-ff], [00-ff].
When RingOpts is [{any, [#shard{}]}] then any shard in the list of shards is
considered to return a valid result. In this case copies might not all have the
same matching ends. Ex: [00-ff], [00-7f], [00-ff]. This is basically to handle
the case when we have a partitioned db shard that has a split copy.
---
src/fabric/include/fabric.hrl | 3 +-
src/fabric/src/fabric_db_doc_count.erl | 2 +-
src/fabric/src/fabric_db_info.erl | 2 +-
src/fabric/src/fabric_design_doc_count.erl | 2 +-
src/fabric/src/fabric_group_info.erl | 2 +-
src/fabric/src/fabric_ring.erl | 294 ++++++++++++++++++++++-------
src/fabric/src/fabric_streams.erl | 29 ++-
src/fabric/src/fabric_view.erl | 10 +-
src/fabric/src/fabric_view_all_docs.erl | 4 +-
src/fabric/src/fabric_view_map.erl | 5 +-
src/fabric/src/fabric_view_reduce.erl | 5 +-
11 files changed, 264 insertions(+), 94 deletions(-)
diff --git a/src/fabric/include/fabric.hrl b/src/fabric/include/fabric.hrl
index 8b71003..2a4da8b 100644
--- a/src/fabric/include/fabric.hrl
+++ b/src/fabric/include/fabric.hrl
@@ -38,7 +38,8 @@
workers,
ready,
start_fun,
- replacements
+ replacements,
+ ring_opts
}).
-record(view_row, {key, id, value, doc, worker}).
diff --git a/src/fabric/src/fabric_db_doc_count.erl b/src/fabric/src/fabric_db_doc_count.erl
index 1ea31f6..a91014b 100644
--- a/src/fabric/src/fabric_db_doc_count.erl
+++ b/src/fabric/src/fabric_db_doc_count.erl
@@ -41,7 +41,7 @@ handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Resps}) ->
end;
handle_message({rexi_EXIT, Reason}, Shard, {Counters, Resps}) ->
- case fabric_ring:worker_exited(Shard, Counters, Resps) of
+ case fabric_ring:handle_error(Shard, Counters, Resps) of
{ok, Counters1} -> {ok, {Counters1, Resps}};
error -> {error, Reason}
end;
diff --git a/src/fabric/src/fabric_db_info.erl b/src/fabric/src/fabric_db_info.erl
index b98d7ce..bb7a353 100644
--- a/src/fabric/src/fabric_db_info.erl
+++ b/src/fabric/src/fabric_db_info.erl
@@ -55,7 +55,7 @@ handle_message({rexi_DOWN, _, {_,NodeRef},_}, _, {Counters, Resps, CInfo}) ->
end;
handle_message({rexi_EXIT, Reason}, Shard, {Counters, Resps, CInfo}) ->
- case fabric_ring:worker_exited(Shard, Counters, Resps) of
+ case fabric_ring:handle_error(Shard, Counters, Resps) of
{ok, Counters1} -> {ok, {Counters1, Resps, CInfo}};
error -> {error, Reason}
end;
diff --git a/src/fabric/src/fabric_design_doc_count.erl b/src/fabric/src/fabric_design_doc_count.erl
index 2522e91..b0efc30 100644
--- a/src/fabric/src/fabric_design_doc_count.erl
+++ b/src/fabric/src/fabric_design_doc_count.erl
@@ -41,7 +41,7 @@ handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Resps}) ->
end;
handle_message({rexi_EXIT, Reason}, Shard, {Counters, Resps}) ->
- case fabric_ring:worker_exited(Shard, Counters, Resps) of
+ case fabric_ring:handle_error(Shard, Counters, Resps) of
{ok, Counters1} -> {ok, {Counters1, Resps}};
error -> {error, Reason}
end;
diff --git a/src/fabric/src/fabric_group_info.erl b/src/fabric/src/fabric_group_info.erl
index 61f7c76..be50742 100644
--- a/src/fabric/src/fabric_group_info.erl
+++ b/src/fabric/src/fabric_group_info.erl
@@ -47,7 +47,7 @@ handle_message({rexi_DOWN, _, {_,NodeRef},_}, _, {Counters, Resps, USet}) ->
end;
handle_message({rexi_EXIT, Reason}, Shard, {Counters, Resps, USet}) ->
- case fabric_ring:worker_exited(Shard, Counters, Resps) of
+ case fabric_ring:handle_error(Shard, Counters, Resps) of
{ok, Counters1} -> {ok, {Counters1, Resps, USet}};
error -> {error, Reason}
end;
diff --git a/src/fabric/src/fabric_ring.erl b/src/fabric/src/fabric_ring.erl
index 668249a..e3168e9 100644
--- a/src/fabric/src/fabric_ring.erl
+++ b/src/fabric/src/fabric_ring.erl
@@ -15,12 +15,14 @@
-export([
is_progress_possible/1,
+ is_progress_possible/2,
get_shard_replacements/2,
- worker_exited/3,
node_down/3,
+ node_down/4,
handle_error/3,
- handle_response/4
-
+ handle_error/4,
+ handle_response/4,
+ handle_response/5
]).
@@ -29,12 +31,21 @@
-type fabric_dict() :: [{#shard{}, any()}].
+-type ring_opts() :: [atom() | tuple()].
%% @doc looks for a fully covered keyrange in the list of counters
-spec is_progress_possible(fabric_dict()) -> boolean().
is_progress_possible(Counters) ->
- mem3_util:get_ring(get_worker_ranges(Counters)) =/= [].
+ is_progress_possible(Counters, []).
+
+
+%% @doc looks for a fully covered keyrange in the list of counters
+%% This version takes ring options to configure how progress will
+%% be checked. By default, [], checks that the full ring is covered.
+-spec is_progress_possible(fabric_dict(), ring_opts()) -> boolean().
+is_progress_possible(Counters, RingOpts) ->
+ is_progress_possible(Counters, [], 0, ?RING_END, RingOpts).
-spec get_shard_replacements(binary(), [#shard{}]) -> [#shard{}].
@@ -46,23 +57,20 @@ get_shard_replacements(DbName, UsedShards0) ->
get_shard_replacements_int(AllLiveShards -- UsedShards, UsedShards).
--spec worker_exited(#shard{}, fabric_dict(), [{any(), #shard{}, any()}]) ->
+-spec node_down(node(), fabric_dict(), fabric_dict()) ->
{ok, fabric_dict()} | error.
-worker_exited(Shard, Workers, Responses) ->
- Workers1 = fabric_dict:erase(Shard, Workers),
- case is_progress_possible(Workers1, Responses) of
- true -> {ok, Workers1};
- false -> error
- end.
+node_down(Node, Workers, Responses) ->
+ node_down(Node, Workers, Responses, []).
--spec node_down(node(), fabric_dict(), fabric_dict()) ->
+-spec node_down(node(), fabric_dict(), fabric_dict(), ring_opts()) ->
{ok, fabric_dict()} | error.
-node_down(Node, Workers, Responses) ->
+node_down(Node, Workers, Responses, RingOpts) ->
+ {B, E} = range_bounds(Workers, Responses),
Workers1 = fabric_dict:filter(fun(#shard{node = N}, _) ->
N =/= Node
end, Workers),
- case is_progress_possible(Workers1, Responses) of
+ case is_progress_possible(Workers1, Responses, B, E, RingOpts) of
true -> {ok, Workers1};
false -> error
end.
@@ -71,8 +79,15 @@ node_down(Node, Workers, Responses) ->
-spec handle_error(#shard{}, fabric_dict(), fabric_dict()) ->
{ok, fabric_dict()} | error.
handle_error(Shard, Workers, Responses) ->
+ handle_error(Shard, Workers, Responses, []).
+
+
+-spec handle_error(#shard{}, fabric_dict(), fabric_dict(), ring_opts()) ->
+ {ok, fabric_dict()} | error.
+handle_error(Shard, Workers, Responses, RingOpts) ->
+ {B, E} = range_bounds(Workers, Responses),
Workers1 = fabric_dict:erase(Shard, Workers),
- case is_progress_possible(Workers1, Responses) of
+ case is_progress_possible(Workers1, Responses, B, E, RingOpts) of
true -> {ok, Workers1};
false -> error
end.
@@ -81,43 +96,107 @@ handle_error(Shard, Workers, Responses) ->
-spec handle_response(#shard{}, any(), fabric_dict(), fabric_dict()) ->
{ok, {fabric_dict(), fabric_dict()}} | {stop, fabric_dict()}.
handle_response(Shard, Response, Workers, Responses) ->
- handle_response(Shard, Response, Workers, Responses, fun stop_workers/1).
+ handle_response(Shard, Response, Workers, Responses, []).
+
+
+-spec handle_response(#shard{}, any(), fabric_dict(), fabric_dict(),
+ ring_opts()) ->
+ {ok, {fabric_dict(), fabric_dict()}} | {stop, fabric_dict()}.
+handle_response(Shard, Response, Workers, Responses, RingOpts) ->
+ handle_response(Shard, Response, Workers, Responses, RingOpts,
+ fun stop_workers/1).
--spec handle_response(#shard{}, any(), fabric_dict(), fabric_dict(), fun()) ->
- {ok, {fabric_dict(), fabric_dict()}} | {stop, list()}.
-handle_response(Shard, Response, Workers, Responses, CleanupCb) ->
+% Worker response handler. Gets responses from shards and puts them in the list
+% until they complete a full ring. Then kill unused responses and remaining
+% workers.
+%
+% How a ring "completes" is driven by RingOpts:
+%
+% * When RingOpts is [] (the default case) responses must form a "clean"
+% ring, where all copies at the start of the range and end of the range must
+% have the same boundary values.
+%
+% * When RingOpts is [{any, [#shard{}]}] responses are accepted from any of
+% the provided list of shards. This type of ring might be used when querying
+% a partitioned database. As soon as a result from any of the shards
+% arrives, result collection stops.
+%
+handle_response(Shard, Response, Workers, Responses, RingOpts, CleanupCb) ->
Workers1 = fabric_dict:erase(Shard, Workers),
- #shard{range = [B, E]} = Shard,
- Responses1 = [{{B, E}, Shard, Response} | Responses],
- ResponseRanges = lists:map(fun({R, _, _}) -> R end, Responses1),
- {MinB, MaxE} = get_range_bounds(Workers, ResponseRanges),
- case mem3_util:get_ring(ResponseRanges, MinB, MaxE) of
+ case RingOpts of
+ [] ->
+ #shard{range = [B, E]} = Shard,
+ Responses1 = [{{B, E}, Shard, Response} | Responses],
+ handle_response_ring(Workers1, Responses1, CleanupCb);
+ [{any, Any}] ->
+ handle_response_any(Shard, Response, Workers1, Any, CleanupCb)
+ end.
+
+
+handle_response_ring(Workers, Responses, CleanupCb) ->
+ {MinB, MaxE} = range_bounds(Workers, Responses),
+ Ranges = lists:map(fun({R, _, _}) -> R end, Responses),
+ case mem3_util:get_ring(Ranges, MinB, MaxE) of
[] ->
- {ok, {Workers1, Responses1}};
+ {ok, {Workers, Responses}};
Ring ->
% Return one response per range in the ring. The
% response list is reversed before sorting so that the
% first shard copy to reply is first. We use keysort
% because it is documented as being stable so that
% we keep the relative order of duplicate shards
- SortedResponses = lists:keysort(1, lists:reverse(Responses1)),
+ SortedResponses = lists:keysort(1, lists:reverse(Responses)),
UsedResponses = get_responses(Ring, SortedResponses),
% Kill all the remaining workers as well as the redundant responses
- stop_unused_workers(Workers1, Responses1, UsedResponses, CleanupCb),
- {stop, UsedResponses}
+ stop_unused_workers(Workers, Responses, UsedResponses, CleanupCb),
+ {stop, fabric_dict:from_list(UsedResponses)}
+ end.
+
+
+handle_response_any(Shard, Response, Workers, Any, CleanupCb) ->
+ case lists:member(Shard#shard{ref = undefined}, Any) of
+ true ->
+ stop_unused_workers(Workers, [], [], CleanupCb),
+ {stop, fabric_dict:from_list([{Shard, Response}])};
+ false ->
+ {ok, {Workers, []}}
end.
-% This version combines workers that are still waiting and the ones that have
-% responded already.
--spec is_progress_possible(fabric_dict(), [{any(), #shard{}, any()}]) ->
- boolean().
-is_progress_possible(Counters, Responses) ->
+% Check if the workers still waiting and the already received responses could
+% still form a continuous range. The range won't always be the full ring, and
+% the bounds are computed based on the minimum interval beginning and the
+% maximum interval end.
+%
+% There is also a special case where, even if the ring cannot be formed, an
+% overlap between all the shards means progress is still considered possible.
+% This is essentially to allow for split partitioned shards: when one shard
+% copy on a node was split, the set of ranges
+% might look like: 00-ff, 00-ff, 07-ff. Even if both 00-ff workers exit,
+% progress can still be made with the remaining 07-ff copy.
+%
+-spec is_progress_possible(fabric_dict(), [{any(), #shard{}, any()}],
+ non_neg_integer(), non_neg_integer(), ring_opts()) -> boolean().
+is_progress_possible([], [], _, _, _) ->
+ false;
+
+is_progress_possible(Counters, Responses, MinB, MaxE, []) ->
ResponseRanges = lists:map(fun({{B, E}, _, _}) -> {B, E} end, Responses),
- {MinB, MaxE} = get_range_bounds(Counters, ResponseRanges),
- Ranges = get_worker_ranges(Counters) ++ ResponseRanges,
- mem3_util:get_ring(Ranges, MinB, MaxE) =/= [].
+ Ranges = worker_ranges(Counters) ++ ResponseRanges,
+ mem3_util:get_ring(Ranges, MinB, MaxE) =/= [];
+
+is_progress_possible(Counters, Responses, _, _, [{any, AnyShards}]) ->
+ InAny = fun(S) -> lists:member(S#shard{ref = undefined}, AnyShards) end,
+ case fabric_dict:filter(fun(S, _) -> InAny(S) end, Counters) of
+ [] ->
+ case lists:filter(fun({_, S, _}) -> InAny(S) end, Responses) of
+ [] -> false;
+ [_ | _] -> true
+ end;
+ [_ | _] ->
+ true
+ end.
get_shard_replacements_int(UnusedShards, UsedShards) ->
@@ -144,16 +223,17 @@ get_shard_replacements_int(UnusedShards, UsedShards) ->
end, [], UsedShards).
--spec get_worker_ranges(fabric_dict()) -> [{integer(), integer()}].
-get_worker_ranges(Workers) ->
+-spec worker_ranges(fabric_dict()) -> [{integer(), integer()}].
+worker_ranges(Workers) ->
Ranges = fabric_dict:fold(fun(#shard{range=[X, Y]}, _, Acc) ->
[{X, Y} | Acc]
end, [], Workers),
lists:usort(Ranges).
-get_range_bounds(Workers, ResponseRanges) ->
- Ranges = get_worker_ranges(Workers) ++ ResponseRanges,
+range_bounds(Workers, Responses) ->
+ RespRanges = lists:map(fun({R, _, _}) -> R end, Responses),
+ Ranges = worker_ranges(Workers) ++ RespRanges,
{Bs, Es} = lists:unzip(Ranges),
{lists:min(Bs), lists:max(Es)}.
@@ -172,9 +252,9 @@ stop_unused_workers(_, _, _, undefined) ->
ok;
stop_unused_workers(Workers, AllResponses, UsedResponses, CleanupCb) ->
- WorkerShards = [S || {S, _V} <- Workers],
- Used = [S || {S, _V} <- UsedResponses],
- Unused = [S || {_R, S, _V} <- AllResponses, not lists:member(S, Used)],
+ WorkerShards = [S || {S, _} <- Workers],
+ Used = [S || {S, _} <- UsedResponses],
+ Unused = [S || {_, S, _} <- AllResponses, not lists:member(S, Used)],
CleanupCb(WorkerShards ++ Unused).
@@ -184,26 +264,71 @@ stop_workers(Shards) when is_list(Shards) ->
% Unit tests
-is_progress_possible_test() ->
+is_progress_possible_full_range_test() ->
+ % a base case
+ ?assertEqual(false, is_progress_possible([], [], 0, 0, [])),
T1 = [[0, ?RING_END]],
- ?assertEqual(is_progress_possible(mk_cnts(T1)), true),
+ ?assertEqual(true, is_progress_possible(mk_cnts(T1))),
T2 = [[0, 10], [11, 20], [21, ?RING_END]],
- ?assertEqual(is_progress_possible(mk_cnts(T2)), true),
+ ?assertEqual(true, is_progress_possible(mk_cnts(T2))),
% gap
T3 = [[0, 10], [12, ?RING_END]],
- ?assertEqual(is_progress_possible(mk_cnts(T3)), false),
+ ?assertEqual(false, is_progress_possible(mk_cnts(T3))),
% outside range
T4 = [[1, 10], [11, 20], [21, ?RING_END]],
- ?assertEqual(is_progress_possible(mk_cnts(T4)), false),
+ ?assertEqual(false, is_progress_possible(mk_cnts(T4))),
% outside range
T5 = [[0, 10], [11, 20], [21, ?RING_END + 1]],
- ?assertEqual(is_progress_possible(mk_cnts(T5)), false),
+ ?assertEqual(false, is_progress_possible(mk_cnts(T5))),
% possible progress but with backtracking
T6 = [[0, 10], [11, 20], [0, 5], [6, 21], [21, ?RING_END]],
- ?assertEqual(is_progress_possible(mk_cnts(T6)), true),
+ ?assertEqual(true, is_progress_possible(mk_cnts(T6))),
% not possible, overlap is not exact
T7 = [[0, 10], [13, 20], [21, ?RING_END], [9, 12]],
- ?assertEqual(is_progress_possible(mk_cnts(T7)), false).
+ ?assertEqual(false, is_progress_possible(mk_cnts(T7))).
+
+
+is_progress_possible_with_responses_test() ->
+ C1 = mk_cnts([[0, ?RING_END]]),
+ ?assertEqual(true, is_progress_possible(C1, [], 0, ?RING_END, [])),
+ % check for gaps
+ C2 = mk_cnts([[5, 6], [7, 8]]),
+ ?assertEqual(true, is_progress_possible(C2, [], 5, 8, [])),
+ ?assertEqual(false, is_progress_possible(C2, [], 4, 8, [])),
+ ?assertEqual(false, is_progress_possible(C2, [], 5, 7, [])),
+ ?assertEqual(false, is_progress_possible(C2, [], 4, 9, [])),
+ % check for uneven shard range copies
+ C3 = mk_cnts([[2, 5], [2, 10]]),
+ ?assertEqual(true, is_progress_possible(C3, [], 2, 10, [])),
+ ?assertEqual(false, is_progress_possible(C3, [], 2, 11, [])),
+ ?assertEqual(false, is_progress_possible(C3, [], 3, 10, [])),
+ % they overlap but still not a proper ring
+ C4 = mk_cnts([[2, 4], [3, 7], [6, 10]]),
+ ?assertEqual(false, is_progress_possible(C4, [], 2, 10, [])),
+ % some of the ranges are in responses
+ RS1 = mk_resps([{"n1", 7, 8, 42}]),
+ C5 = mk_cnts([[5, 6]]),
+ ?assertEqual(true, is_progress_possible(C5, RS1, 5, 8, [])),
+ ?assertEqual(false, is_progress_possible([], RS1, 5, 8, [])),
+ ?assertEqual(true, is_progress_possible([], RS1, 7, 8, [])).
+
+
+is_progress_possible_with_ring_opts_test() ->
+ Opts = [{any, [mk_shard("n1", [0, 5]), mk_shard("n2", [3, 10])]}],
+ C1 = [{mk_shard("n1", [0, ?RING_END]), nil}],
+ RS1 = mk_resps([{"n1", 3, 10, 42}]),
+ ?assertEqual(false, is_progress_possible(C1, [], 0, ?RING_END, Opts)),
+ ?assertEqual(false, is_progress_possible([], [], 0, ?RING_END, Opts)),
+ ?assertEqual(false, is_progress_possible([], RS1, 0, ?RING_END, Opts)),
+ % explicitly accept only the shard specified in the ring options
+ ?assertEqual(false, is_progress_possible([], RS1, 3, 10, [{any, []}])),
+ % need to match the node exactly
+ ?assertEqual(false, is_progress_possible([], RS1, 3, 10, Opts)),
+ RS2 = mk_resps([{"n2", 3, 10, 42}]),
+ ?assertEqual(true, is_progress_possible([], RS2, 3, 10, Opts)),
+ % assert that counters can fill the ring not just the response
+ C2 = [{mk_shard("n1", [0, 5]), nil}],
+ ?assertEqual(true, is_progress_possible(C2, [], 0, ?RING_END, Opts)).
get_shard_replacements_test() ->
@@ -233,13 +358,13 @@ handle_response_basic_test() ->
Workers1 = fabric_dict:init([Shard1, Shard2], nil),
- Result1 = handle_response(Shard1, 42, Workers1, [], undefined),
+ Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined),
?assertMatch({ok, {_, _}}, Result1),
{ok, {Workers2, Responses1}} = Result1,
?assertEqual(fabric_dict:erase(Shard1, Workers1), Workers2),
?assertEqual([{{0, 1}, Shard1, 42}], Responses1),
- Result2 = handle_response(Shard2, 43, Workers2, Responses1, undefined),
+ Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined),
?assertEqual({stop, [{Shard1, 42}, {Shard2, 43}]}, Result2).
@@ -249,13 +374,13 @@ handle_response_incomplete_ring_test() ->
Workers1 = fabric_dict:init([Shard1, Shard2], nil),
- Result1 = handle_response(Shard1, 42, Workers1, [], undefined),
+ Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined),
?assertMatch({ok, {_, _}}, Result1),
{ok, {Workers2, Responses1}} = Result1,
?assertEqual(fabric_dict:erase(Shard1, Workers1), Workers2),
?assertEqual([{{0, 1}, Shard1, 42}], Responses1),
- Result2 = handle_response(Shard2, 43, Workers2, Responses1, undefined),
+ Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined),
?assertEqual({stop, [{Shard1, 42}, {Shard2, 43}]}, Result2).
@@ -266,15 +391,15 @@ handle_response_test_multiple_copies_test() ->
Workers1 = fabric_dict:init([Shard1, Shard2, Shard3], nil),
- Result1 = handle_response(Shard1, 42, Workers1, [], undefined),
+ Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined),
?assertMatch({ok, {_, _}}, Result1),
{ok, {Workers2, Responses1}} = Result1,
- Result2 = handle_response(Shard2, 43, Workers2, Responses1, undefined),
+ Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined),
?assertMatch({ok, {_, _}}, Result2),
{ok, {Workers3, Responses2}} = Result2,
- Result3 = handle_response(Shard3, 44, Workers3, Responses2, undefined),
+ Result3 = handle_response(Shard3, 44, Workers3, Responses2, [], undefined),
% Use the value (42) to distinguish between [0, 1] copies. In reality
% they should have the same value but here we need to assert that copy
% that responded first is included in the ring.
@@ -287,22 +412,46 @@ handle_response_test_backtracking_test() ->
Shard3 = mk_shard("n2", [2, ?RING_END]),
Shard4 = mk_shard("n3", [0, 1]),
- Workers1 = fabric_dict:init([Shard1, Shard2, Shard3, Shard3], nil),
+ Workers1 = fabric_dict:init([Shard1, Shard2, Shard3, Shard4], nil),
- Result1 = handle_response(Shard1, 42, Workers1, [], undefined),
+ Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined),
?assertMatch({ok, {_, _}}, Result1),
{ok, {Workers2, Responses1}} = Result1,
- Result2 = handle_response(Shard2, 43, Workers2, Responses1, undefined),
+ Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined),
?assertMatch({ok, {_, _}}, Result2),
{ok, {Workers3, Responses2}} = Result2,
- Result3 = handle_response(Shard3, 44, Workers3, Responses2, undefined),
+ Result3 = handle_response(Shard3, 44, Workers3, Responses2, [], undefined),
?assertMatch({ok, {_, _}}, Result3),
{ok, {Workers4, Responses3}} = Result3,
- Result4 = handle_response(Shard4, 45, Workers4, Responses3, undefined),
- ?assertEqual({stop, [{Shard4, 45}, {Shard3, 44}]}, Result4).
+ Result4 = handle_response(Shard4, 45, Workers4, Responses3, [], undefined),
+ ?assertEqual({stop, [{Shard3, 44}, {Shard4, 45}]}, Result4).
+
+
+handle_response_test_ring_opts_test() ->
+ Shard1 = mk_shard("n1", [0, 5]),
+ Shard2 = mk_shard("n2", [0, 1]),
+ Shard3 = mk_shard("n3", [0, 1]),
+
+ Opts = [{any, [mk_shard("n3", [0, 1])]}],
+
+ ShardList = [Shard1, Shard2, Shard3],
+ WithRefs = [S#shard{ref = make_ref()} || S <- ShardList],
+ Workers1 = fabric_dict:init(WithRefs, nil),
+
+ Result1 = handle_response(Shard1, 42, Workers1, [], Opts, undefined),
+ ?assertMatch({ok, {_, _}}, Result1),
+ {ok, {Workers2, []}} = Result1,
+
+ % Still waiting because the node doesn't match
+ Result2 = handle_response(Shard2, 43, Workers2, [], Opts, undefined),
+ ?assertMatch({ok, {_, _}}, Result2),
+ {ok, {Workers3, []}} = Result2,
+
+ Result3 = handle_response(Shard3, 44, Workers3, [], Opts, undefined),
+ ?assertEqual({stop, [{Shard3, 44}]}, Result3).
handle_error_test() ->
@@ -313,7 +462,7 @@ handle_error_test() ->
Workers1 = fabric_dict:init([Shard1, Shard2, Shard3, Shard4], nil),
- Result1 = handle_response(Shard1, 42, Workers1, [], undefined),
+ Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined),
?assertMatch({ok, {_, _}}, Result1),
{ok, {Workers2, Responses1}} = Result1,
@@ -322,10 +471,9 @@ handle_error_test() ->
{ok, Workers3} = Result2,
?assertEqual(fabric_dict:erase(Shard2, Workers2), Workers3),
- Result3 = handle_response(Shard3, 44, Workers3, Responses1, undefined),
+ Result3 = handle_response(Shard3, 44, Workers3, Responses1, [], undefined),
?assertMatch({ok, {_, _}}, Result3),
{ok, {Workers4, Responses3}} = Result3,
-
?assertEqual(error, handle_error(Shard4, Workers4, Responses3)).
@@ -337,11 +485,11 @@ node_down_test() ->
Workers1 = fabric_dict:init([Shard1, Shard2, Shard3, Shard4], nil),
- Result1 = handle_response(Shard1, 42, Workers1, [], undefined),
+ Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined),
?assertMatch({ok, {_, _}}, Result1),
{ok, {Workers2, Responses1}} = Result1,
- Result2 = handle_response(Shard2, 43, Workers2, Responses1, undefined),
+ Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined),
?assertMatch({ok, {_, _}}, Result2),
{ok, {Workers3, Responses2}} = Result2,
@@ -350,7 +498,7 @@ node_down_test() ->
{ok, Workers4} = Result3,
?assertEqual([{Shard3, nil}, {Shard4, nil}], Workers4),
- Result4 = handle_response(Shard3, 44, Workers4, Responses2, undefined),
+ Result4 = handle_response(Shard3, 44, Workers4, Responses2, [], undefined),
?assertMatch({ok, {_, _}}, Result4),
{ok, {Workers5, Responses3}} = Result4,
@@ -362,7 +510,11 @@ node_down_test() ->
mk_cnts(Ranges) ->
Shards = lists:map(fun mk_shard/1, Ranges),
- orddict:from_list([{Shard,nil} || Shard <- Shards]).
+ fabric_dict:init([S#shard{ref = make_ref()} || S <- Shards], nil).
+
+
+mk_resps(RangeNameVals) ->
+ [{{B, E}, mk_shard(Name, [B, E]), V} || {Name, B, E, V} <- RangeNameVals].
mk_shard([B, E]) when is_integer(B), is_integer(E) ->
diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl
index a4c9df6..d0a549d 100644
--- a/src/fabric/src/fabric_streams.erl
+++ b/src/fabric/src/fabric_streams.erl
@@ -14,7 +14,9 @@
-export([
start/2,
+ start/3,
start/4,
+ start/5,
cleanup/1
]).
@@ -28,13 +30,23 @@
start(Workers, Keypos) ->
start(Workers, Keypos, undefined, undefined).
-start(Workers0, Keypos, StartFun, Replacements) ->
+
+start(Workers, Keypos, RingOpts) ->
+ start(Workers, Keypos, undefined, undefined, RingOpts).
+
+
+start(Workers, Keypos, StartFun, Replacements) ->
+ start(Workers, Keypos, StartFun, Replacements, []).
+
+
+start(Workers0, Keypos, StartFun, Replacements, RingOpts) ->
Fun = fun handle_stream_start/3,
Acc = #stream_acc{
workers = fabric_dict:init(Workers0, waiting),
ready = [],
start_fun = StartFun,
- replacements = Replacements
+ replacements = Replacements,
+ ring_opts = RingOpts
},
spawn_worker_cleaner(self(), Workers0),
Timeout = fabric_util:request_timeout(),
@@ -65,8 +77,8 @@ cleanup(Workers) ->
handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, St) ->
- #stream_acc{workers = Workers, ready = Ready} = St,
- case fabric_ring:node_down(NodeRef, Workers, Ready) of
+ #stream_acc{workers = Workers, ready = Ready, ring_opts = RingOpts} = St,
+ case fabric_ring:node_down(NodeRef, Workers, Ready, RingOpts) of
{ok, Workers1} ->
{ok, St#stream_acc{workers = Workers1}};
error ->
@@ -77,9 +89,10 @@ handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
#stream_acc{
workers = Workers,
ready = Ready,
- replacements = Replacements
+ replacements = Replacements,
+ ring_opts = RingOpts
} = St,
- case {fabric_ring:worker_exited(Worker, Workers, Ready), Reason} of
+ case {fabric_ring:handle_error(Worker, Workers, Ready, RingOpts), Reason} of
{{ok, Workers1}, _Reason} ->
{ok, St#stream_acc{workers = Workers1}};
{error, {maintenance_mode, _Node}} when Replacements /= undefined ->
@@ -110,14 +123,14 @@ handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
end;
handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) ->
- #stream_acc{workers = Workers, ready = Ready} = St,
+ #stream_acc{workers = Workers, ready = Ready, ring_opts = RingOpts} = St,
case fabric_dict:lookup_element(Worker, Workers) of
undefined ->
% This worker lost the race with other partition copies, terminate
rexi:stream_cancel(From),
{ok, St};
waiting ->
- case fabric_ring:handle_response(Worker, From, Workers, Ready) of
+ case fabric_ring:handle_response(Worker, From, Workers, Ready, RingOpts) of
{ok, {Workers1, Ready1}} ->
% Don't have a full ring yet. Keep getting responses
{ok, St#stream_acc{workers = Workers1, ready = Ready1}};
diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl
index eb31f53..193f6e7 100644
--- a/src/fabric/src/fabric_view.erl
+++ b/src/fabric/src/fabric_view.erl
@@ -280,13 +280,15 @@ get_shards(Db, #mrargs{} = Args) ->
% request.
case {Args#mrargs.stable, Partition} of
{true, undefined} ->
- mem3:ushards(DbName);
+ {mem3:ushards(DbName), []};
{true, Partition} ->
- mem3:ushards(DbName, couch_partition:shard_key(Partition));
+ Shards = mem3:ushards(DbName, couch_partition:shard_key(Partition)),
+ {Shards, [{any, Shards}]};
{false, undefined} ->
- mem3:shards(DbName);
+ {mem3:shards(DbName), []};
{false, Partition} ->
- mem3:shards(DbName, couch_partition:shard_key(Partition))
+ Shards = mem3:shards(DbName, couch_partition:shard_key(Partition)),
+ {Shards, [{any, Shards}]}
end.
maybe_update_others(DbName, DDoc, ShardsInvolved, ViewName,
diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl
index 9049eaa..1d87e3d 100644
--- a/src/fabric/src/fabric_view_all_docs.erl
+++ b/src/fabric/src/fabric_view_all_docs.erl
@@ -23,12 +23,12 @@
go(Db, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) ->
{CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(QueryArgs),
DbName = fabric:dbname(Db),
- Shards = shards(Db, QueryArgs),
+ {Shards, RingOpts} = shards(Db, QueryArgs),
Workers0 = fabric_util:submit_jobs(
Shards, fabric_rpc, all_docs, [Options, WorkerArgs]),
RexiMon = fabric_util:create_monitors(Workers0),
try
- case fabric_streams:start(Workers0, #shard.ref) of
+ case fabric_streams:start(Workers0, #shard.ref, RingOpts) of
{ok, Workers} ->
try
go(DbName, Options, Workers, CoordArgs, Callback, Acc)
diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl
index 022bec8..5a5cc13 100644
--- a/src/fabric/src/fabric_view_map.erl
+++ b/src/fabric/src/fabric_view_map.erl
@@ -26,7 +26,7 @@ go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo)
go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
DbName = fabric:dbname(Db),
- Shards = fabric_view:get_shards(Db, Args),
+ {Shards, RingOpts} = fabric_view:get_shards(Db, Args),
{CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args),
DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, View, Args),
@@ -38,7 +38,8 @@ go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
Workers0 = fabric_util:submit_jobs(Shards, fabric_rpc, map_view, RPCArgs),
RexiMon = fabric_util:create_monitors(Workers0),
try
- case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls) of
+ case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls,
+ RingOpts) of
{ok, ddoc_updated} ->
Callback({error, ddoc_updated}, Acc);
{ok, Workers} ->
diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl
index f98bd10..a432b2c 100644
--- a/src/fabric/src/fabric_view_reduce.erl
+++ b/src/fabric/src/fabric_view_reduce.erl
@@ -25,7 +25,7 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) -
go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
DbName = fabric:dbname(Db),
- Shards = fabric_view:get_shards(Db, Args),
+ {Shards, RingOpts} = fabric_view:get_shards(Db, Args),
{CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args),
DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
RPCArgs = [DocIdAndRev, VName, WorkerArgs],
@@ -37,7 +37,8 @@ go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
Workers0 = fabric_util:submit_jobs(Shards,fabric_rpc,reduce_view,RPCArgs),
RexiMon = fabric_util:create_monitors(Workers0),
try
- case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls) of
+ case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls,
+ RingOpts) of
{ok, ddoc_updated} ->
Callback({error, ddoc_updated}, Acc);
{ok, Workers} ->