You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2022/10/01 03:56:31 UTC

[couchdb] branch fabric-handle-response-rmin created (now 0e6ba948d)

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a change to branch fabric-handle-response-rmin
in repository https://gitbox.apache.org/repos/asf/couchdb.git


      at 0e6ba948d Introduce {rmin, R} option for fabric:handle_response/4,5

This branch includes the following new commits:

     new 0e6ba948d Introduce {rmin, R} option for fabric:handle_response/4,5

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[couchdb] 01/01: Introduce {rmin, R} option for fabric:handle_response/4,5

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch fabric-handle-response-rmin
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 0e6ba948dd2b1c191157ac44e5c1b818a3f40e77
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Fri Sep 30 23:39:08 2022 -0400

    Introduce {rmin, R} option for fabric:handle_response/4,5
    
    This option is useful when we want to accumulate at least R result copies for
    each document. Typically R=2 in a default 3 node cluster. Without shard
    splitting, this would simply be waiting for at least R copies returned from
    each range. However, in the case when shard copies don't have exactly the same
    range boundaries, the correct logic is to try to completely cover the ring at
    least R times. The algorithm for checking if the ring can be covered is already
    implemented, we just add it as an option to fabric:handle_response/4,5.
    
    This is essentially another preparatory PR to implement the optimized
    `fabric:bulk_get(...)` API as described in issue #4183 and it's just split out
    as a separate commit as it's a fairly standalone change.
    
    Additionally, had noticed that the unit tests in fabric_ring didn't cover the
    cleanup callback function, so added tests to make sure we exercise that code
    path.
    
    Emacs + erlang_ls also noticed the unused header included, so that's why that
    was removed.
---
 src/fabric/src/fabric_ring.erl | 132 ++++++++++++++++++++++++++++++++++++++++-
 src/mem3/src/mem3_util.erl     |  32 ++++++++--
 2 files changed, 157 insertions(+), 7 deletions(-)

diff --git a/src/fabric/src/fabric_ring.erl b/src/fabric/src/fabric_ring.erl
index 9349efb90..6a16275e0 100644
--- a/src/fabric/src/fabric_ring.erl
+++ b/src/fabric/src/fabric_ring.erl
@@ -24,7 +24,6 @@
     handle_response/5
 ]).
 
--include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
 
 -type fabric_dict() :: [{#shard{}, any()}].
@@ -118,6 +117,14 @@ handle_response(Shard, Response, Workers, Responses, RingOpts) ->
 %    ring, where all copies at the start of the range and end of the range must
 %    have the same boundary values.
 %
+%  * When RingOpts is [{rmin, R}], where R is an integer, accumulate responses
+%    until the ring can be completed at least R times. If shards had not been
+%    split, this would be equivalent to waiting until there are at least R
+%    responses for each shard range. Also of note is that [] is not exactly
+%    equivalent to [{rmin, 1}], as with [{rmin, 1}] it's possible that some
+%    shard ranges would return more than R copies while waiting for other
+%    ranges to respond.
+%
 %  * When RingOpts is [{any, [#shard{}]}] responses are accepted from any of
 %    the provided list of shards. This type of ring might be used when querying
 %    a partitioned database. As soon as a result from any of the shards
@@ -133,6 +140,10 @@ handle_response(Shard, Response, Workers, Responses, RingOpts, CleanupCb) ->
             #shard{range = [B, E]} = Shard,
             Responses1 = [{{B, E}, Shard, Response} | Responses],
             handle_response_ring(Workers1, Responses1, CleanupCb);
+        [{rmin, RMin}] when is_integer(RMin), RMin >= 1 ->
+            #shard{range = [B, E]} = Shard,
+            Responses1 = [{{B, E}, Shard, Response} | Responses],
+            handle_response_rmin(RMin, Workers1, Responses1, CleanupCb);
         [{any, Any}] ->
             handle_response_any(Shard, Response, Workers1, Any, CleanupCb);
         [all] ->
@@ -159,6 +170,18 @@ handle_response_ring(Workers, Responses, CleanupCb) ->
             {stop, fabric_dict:from_list(UsedResponses)}
     end.
 
+handle_response_rmin(RMin, Workers, Responses, CleanupCb) ->
+    {MinB, MaxE} = range_bounds(Workers, Responses),
+    Shards = lists:map(fun({_, S, _}) -> S end, Responses),
+    case mem3_util:calculate_max_n(Shards, MinB, MaxE) of
+        MaxN when is_integer(MaxN), MaxN < RMin ->
+            {ok, {Workers, Responses}};
+        MaxN when is_integer(MaxN), MaxN >= RMin ->
+            UsedResponses = lists:map(fun({_, S, R}) -> {S, R} end, Responses),
+            stop_unused_workers(Workers, Responses, UsedResponses, CleanupCb),
+            {stop, fabric_dict:from_list(UsedResponses)}
+    end.
+
 handle_response_any(Shard, Response, Workers, Any, CleanupCb) ->
     case lists:member(Shard#shard{ref = undefined}, Any) of
         true ->
@@ -448,6 +471,50 @@ handle_response_backtracking_test() ->
     Result4 = handle_response(Shard4, 45, Workers4, Responses3, [], undefined),
     ?assertEqual({stop, [{Shard3, 44}, {Shard4, 45}]}, Result4).
 
+handle_response_rmin_test() ->
+    Shard1 = mk_shard("n1", [0, 5]),
+    Shard2 = mk_shard("n1", [6, 9]),
+    Shard3 = mk_shard("n1", [10, ?RING_END]),
+    Shard4 = mk_shard("n2", [2, ?RING_END]),
+    Shard5 = mk_shard("n3", [0, 1]),
+
+    Workers1 = fabric_dict:init([Shard1, Shard2, Shard3, Shard4, Shard5], nil),
+
+    Opts = [{rmin, 2}],
+
+    Result1 = handle_response(Shard1, 101, Workers1, [], Opts, undefined),
+    ?assertMatch({ok, {_, _}}, Result1),
+    {ok, {Workers2, Responses1}} = Result1,
+
+    Result2 = handle_response(Shard3, 103, Workers2, Responses1, Opts, undefined),
+    ?assertMatch({ok, {_, _}}, Result2),
+    {ok, {Workers3, Responses2}} = Result2,
+
+    Result3 = handle_response(Shard4, 104, Workers3, Responses2, Opts, undefined),
+    ?assertMatch({ok, {_, _}}, Result3),
+    {ok, {Workers4, Responses3}} = Result3,
+
+    Result4 = handle_response(Shard5, 105, Workers4, Responses3, Opts, undefined),
+    % Even though Shard4 and Shard5 would complete a full ring, we're not done:
+    % we need two full rings since our rmin is 2.
+    ?assertMatch({ok, {_, _}}, Result4),
+    {ok, {Workers5, Responses4}} = Result4,
+
+    Result5 = handle_response(Shard2, 102, Workers5, Responses4, Opts, undefined),
+    ?assertMatch({stop, [_ | _]}, Result5),
+
+    {stop, FinalResponses} = Result5,
+    ?assertEqual(
+        [
+            {Shard1, 101},
+            {Shard2, 102},
+            {Shard3, 103},
+            {Shard4, 104},
+            {Shard5, 105}
+        ],
+        lists:sort(FinalResponses)
+    ).
+
 handle_response_ring_opts_any_test() ->
     Shard1 = mk_shard("n1", [0, 5]),
     Shard2 = mk_shard("n2", [0, 1]),
@@ -546,6 +613,64 @@ node_down_test() ->
 
     ?assertEqual(error, node_down(n3, Workers5, Responses3)).
 
+% Check that cleanup callback for fabric:handle_response/* gets called both to
+% kill workers which haven't returned or results which were not used (for
+% example if we wanted to stream for only the results we picked and throw the
+% other results away).
+%
+handle_response_cleanup_callback_test_() ->
+    {
+        foreach,
+        fun setup/0,
+        fun teardown/1,
+        [
+            ?TDEF_FE(t_cleanup_unused),
+            ?TDEF_FE(t_cleanup_not_returned)
+        ]
+    }.
+
+setup() ->
+    meck:new(rexi, [passthrough]),
+    meck:expect(rexi, kill_all, 1, ok),
+    ok.
+
+teardown(_) ->
+    meck:unload().
+
+t_cleanup_unused(_) ->
+    Ref1 = make_ref(),
+    Ref2 = make_ref(),
+    Ref3 = make_ref(),
+    Shard1 = mk_shard("n1", [0, 1], Ref1),
+    Shard2 = mk_shard("n2", [0, 1], Ref2),
+    Shard3 = mk_shard("n1", [2, ?RING_END], Ref3),
+
+    Workers1 = fabric_dict:init([Shard1, Shard2, Shard3], nil),
+    {ok, {Workers2, Responses1}} = handle_response(Shard1, 42, Workers1, []),
+    {ok, {Workers3, Responses2}} = handle_response(Shard2, 43, Workers2, Responses1),
+    {stop, [{_, 42}, {_, 44}]} = handle_response(Shard3, 44, Workers3, Responses2),
+    ?assertMatch(
+        [
+            {_, {rexi, kill_all, [[{n2, Ref2}]]}, ok}
+        ],
+        meck:history(rexi)
+    ).
+
+t_cleanup_not_returned(_) ->
+    Ref1 = make_ref(),
+    Ref2 = make_ref(),
+    Shard1 = mk_shard("n1", [0, ?RING_END], Ref1),
+    Shard2 = mk_shard("n2", [0, ?RING_END], Ref2),
+
+    Workers = fabric_dict:init([Shard1, Shard2], nil),
+    {stop, [{_, 42}]} = handle_response(Shard1, 42, Workers, []),
+    ?assertMatch(
+        [
+            {_, {rexi, kill_all, [[{n2, Ref2}]]}, ok}
+        ],
+        meck:history(rexi)
+    ).
+
 mk_cnts(Ranges) ->
     Shards = lists:map(fun mk_shard/1, Ranges),
     fabric_dict:init([S#shard{ref = make_ref()} || S <- Shards], nil).
@@ -557,8 +682,11 @@ mk_shard([B, E]) when is_integer(B), is_integer(E) ->
     #shard{range = [B, E]}.
 
 mk_shard(Name, Range) ->
+    mk_shard(Name, Range, undefined).
+
+mk_shard(Name, Range, Ref) ->
     Node = list_to_atom(Name),
     BName = list_to_binary(Name),
-    #shard{name = BName, node = Node, range = Range}.
+    #shard{name = BName, node = Node, range = Range, ref = Ref}.
 
 -endif.
diff --git a/src/mem3/src/mem3_util.erl b/src/mem3/src/mem3_util.erl
index 6a4ae1327..f05fe7378 100644
--- a/src/mem3/src/mem3_util.erl
+++ b/src/mem3/src/mem3_util.erl
@@ -43,7 +43,8 @@
     get_ring/4,
     non_overlapping_shards/1,
     non_overlapping_shards/3,
-    calculate_max_n/1
+    calculate_max_n/1,
+    calculate_max_n/3
 ]).
 
 %% do not use outside mem3.
@@ -502,6 +503,9 @@ non_overlapping_shards(Shards, Start, End) ->
 % across all the ranges. If the ring is incomplete it will return 0.
 % If there it is an n = 1 database, it should return 1, etc.
 calculate_max_n(Shards) ->
+    calculate_max_n(Shards, 0, ?RING_END).
+
+calculate_max_n(Shards, Start, End) when is_integer(Start), is_integer(End) ->
     Ranges = lists:map(
         fun(Shard) ->
             [B, E] = mem3:range(Shard),
@@ -509,13 +513,15 @@ calculate_max_n(Shards) ->
         end,
         Shards
     ),
-    calculate_max_n(Ranges, get_ring(Ranges), 0).
+    FirstRing = get_ring(Ranges, Start, End),
+    calculate_max_n(Ranges, FirstRing, Start, End, 0).
 
-calculate_max_n(_Ranges, [], N) ->
+calculate_max_n(_Ranges, [], _Start, _End, N) ->
     N;
-calculate_max_n(Ranges, Ring, N) ->
+calculate_max_n(Ranges, Ring, Start, End, N) ->
     NewRanges = Ranges -- Ring,
-    calculate_max_n(NewRanges, get_ring(NewRanges), N + 1).
+    NewRing = get_ring(NewRanges, Start, End),
+    calculate_max_n(NewRanges, NewRing, Start, End, N + 1).
 
 get_ring(Ranges) ->
     get_ring(Ranges, fun sort_ranges_fun/2, 0, ?RING_END).
@@ -752,6 +758,22 @@ calculate_max_n_test_() ->
         ]
     ].
 
+calculate_max_n_custom_range_test_() ->
+    [
+        ?_assertEqual(Res, calculate_max_n(Shards, B, E))
+     || {Res, Shards, B, E} <- [
+            {0, [], 1, 15},
+            {0, [], 0, 15},
+            {0, [shard(1, 10)], 1, 15},
+            {0, [shard(0, 8)], 1, 15},
+            {1, [shard(0, 15)], 0, 15},
+            {1, [shard(0, 15), shard(1, 15)], 0, 15},
+            {2, [shard(0, 15), shard(0, 15)], 0, 15},
+            {2, [shard(0, 1), shard(2, 15), shard(0, 15)], 0, 15},
+            {0, [shard(0, 3), shard(3, 15), shard(1, 15)], 0, 15}
+        ]
+    ].
+
 shard(Begin, End) ->
     #shard{range = [Begin, End]}.