Posted to commits@couchdb.apache.org by va...@apache.org on 2021/08/24 22:58:06 UTC

[couchdb] 01/01: Avoid change feed rewinds on shard moves

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch check-for-moved-shards
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 29abd4571e6590645768fd3a083b774d997e7ce6
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Tue Aug 24 18:44:21 2021 -0400

    Avoid change feed rewinds on shard moves
    
    When shards are moved, attempt to match missing nodes and ranges from the old
    changes sequence by inspecting current shard uuids.
    
    Previously, if a node and range were missing, we randomly picked a node in
    the appropriate range, so 1/3 of the time we might have hit the exact node,
    but 2/3 of the time we would end up with a complete changes feed rewind.
    
    Since update sequences contain the uuids of each of the shards, and assuming
    shard moves preserve the uuid (they are just plain file copies), it is
    possible to match on the uuid instead of guessing.
    
    Unfortunately, this involves a fabric worker scatter-gather operation to all
    shard copies. It should only happen when we get an incomplete sequence,
    though, and we rely on that happening rarely: mostly right after shards
    move, when users replay a changes feed sequence stashed under the old shard
    map configuration. Afterwards, users should start using the new last_seq
    values, which reflect the new shard map configuration and its new nodes.
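
    For example (hypothetical database name and sequence, trimmed for brevity),
    a client replaying a stashed changes sequence after a shard move:

        GET /mydb/_changes?since=776-g1AAAA...

    should now resume where it left off instead of rewinding to sequence 0,
    provided the moved shard files kept their uuids.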
---
 src/fabric/src/fabric.erl              |  7 +++-
 src/fabric/src/fabric_db_doc_count.erl |  1 +
 src/fabric/src/fabric_db_uuids.erl     | 65 ++++++++++++++++++++++++++++++++++
 src/fabric/src/fabric_ring.erl         | 20 ++++++++++-
 src/fabric/src/fabric_rpc.erl          |  5 ++-
 src/fabric/src/fabric_util.erl         | 49 +++++++++++++++++++++++++
 src/fabric/src/fabric_view_changes.erl | 43 +++++++++++++++++++++-
 7 files changed, 186 insertions(+), 4 deletions(-)

diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index 27fa8c0..2461579 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -37,7 +37,7 @@
 % miscellany
 -export([design_docs/1, reset_validation_funs/1, cleanup_index_files/0,
     cleanup_index_files/1, cleanup_index_files_all_nodes/1, dbname/1,
-    inactive_index_files/1]).
+    inactive_index_files/1, db_uuids/1]).
 
 -include_lib("fabric/include/fabric.hrl").
 
@@ -551,6 +551,11 @@ dbname(Db) ->
         erlang:error({illegal_database_name, Db})
     end.
 
+%% @doc get db shard uuids
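+%% Returns a map of shard uuids to #shard{} records gathered from all live
+%% shard copies. Example (hypothetical remsh session; values are made up):
+%%   > fabric:db_uuids(<<"mydb">>).
+%%   #{<<"3c24dfd0f69ca6cdb72b99d1f2b0d52c">> => #shard{node = 'dbcore@db1', ...}}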
+-spec db_uuids(dbname()) -> map().
+db_uuids(DbName) ->
+    fabric_db_uuids:go(dbname(DbName)).
+
 name(Thing) ->
     couch_util:to_binary(Thing).
 
diff --git a/src/fabric/src/fabric_db_doc_count.erl b/src/fabric/src/fabric_db_doc_count.erl
index a91014b..2258ee4 100644
--- a/src/fabric/src/fabric_db_doc_count.erl
+++ b/src/fabric/src/fabric_db_doc_count.erl
@@ -47,6 +47,7 @@ handle_message({rexi_EXIT, Reason}, Shard, {Counters, Resps}) ->
     end;
 
 handle_message({ok, Count}, Shard, {Counters, Resps}) ->
+    couch_log:error(" +++++++ ~p:~p@~B OK Count:~p Shard:~p Counters:~p Resps:~p", [?MODULE, ?FUNCTION_NAME, ?LINE, Count, Shard, Counters, Resps]),
     case fabric_ring:handle_response(Shard, Count, Counters, Resps) of
         {ok, {Counters1, Resps1}} ->
             {ok, {Counters1, Resps1}};
diff --git a/src/fabric/src/fabric_db_uuids.erl b/src/fabric/src/fabric_db_uuids.erl
new file mode 100644
index 0000000..e15671b
--- /dev/null
+++ b/src/fabric/src/fabric_db_uuids.erl
@@ -0,0 +1,65 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_db_uuids).
+
+-export([go/1]).
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
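+% Scatter a get_uuid call to all live shard copies and gather a response from
+% each one; the [all] ring option below makes fabric_ring wait for all workers.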
+go(DbName) when is_binary(DbName) ->
+    Shards = mem3:live_shards(DbName, [node() | nodes()]),
+    Workers = fabric_util:submit_jobs(Shards, get_uuid, []),
+    RexiMon = fabric_util:create_monitors(Shards),
+    Acc0 = {fabric_dict:init(Workers, nil), []},
+    try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
+        {timeout, {WorkersDict, _}} ->
+            DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil),
+            fabric_util:log_timeout(DefunctWorkers, "db_uuid_match"),
+            {error, timeout};
+        Else ->
+            Else
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Shard, {Cntrs, Res}) ->
+    case fabric_ring:node_down(NodeRef, Cntrs, Res, [all]) of
+        {ok, Cntrs1} -> {ok, {Cntrs1, Res}};
+        error -> {error, {nodedown, <<"progress not possible">>}}
+    end;
+
+handle_message({rexi_EXIT, Reason}, Shard, {Cntrs, Res}) ->
+    case fabric_ring:handle_error(Shard, Cntrs, Res, [all]) of
+        {ok, Cntrs1} -> {ok, {Cntrs1, Res}};
+        error -> {error, Reason}
+    end;
+
+handle_message(Uuid, Shard, {Cntrs, Res}) when is_binary(Uuid) ->
+    couch_log:error(" +++ ~p:~p@~B OK Uuid:~p Shard:~p Cntrs:~p Res:~p", [?MODULE, ?FUNCTION_NAME, ?LINE, Uuid, Shard, Cntrs, Res]),
+    case fabric_ring:handle_response(Shard, Uuid, Cntrs, Res, [all]) of
+        {ok, {Cntrs1, Res1}} ->
+            {ok, {Cntrs1, Res1}};
+        {stop, Res1} ->
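+            % All workers have responded: build a #{Uuid => #shard{}} map and
+            % clear the worker refs so the shards compare equal to mem3 records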
+            Uuids = fabric_dict:fold(fun(#shard{} = S, Id, #{} = Acc) ->
+                Acc#{Id => S#shard{ref = undefined}}
+            end, #{}, Res1),
+            {stop, Uuids}
+    end;
+
+handle_message(Reason, Shard, {Cntrs, Res}) ->
+    case fabric_ring:handle_error(Shard, Cntrs, Res, [all]) of
+        {ok, Cntrs1} -> {ok, {Cntrs1, Res}};
+        error -> {error, Reason}
+    end.
diff --git a/src/fabric/src/fabric_ring.erl b/src/fabric/src/fabric_ring.erl
index 110edb9..f3d4a7d 100644
--- a/src/fabric/src/fabric_ring.erl
+++ b/src/fabric/src/fabric_ring.erl
@@ -122,6 +122,9 @@ handle_response(Shard, Response, Workers, Responses, RingOpts) ->
 %    a partitioned database. As soon as a result from any of the shards
 %    arrives, result collection stops.
 %
+%  * When RingOpts is [all], responses are accepted until all the shards
+%    return results.
+%
 handle_response(Shard, Response, Workers, Responses, RingOpts, CleanupCb) ->
     Workers1 = fabric_dict:erase(Shard, Workers),
     case RingOpts of
@@ -130,7 +133,10 @@ handle_response(Shard, Response, Workers, Responses, RingOpts, CleanupCb) ->
             Responses1 = [{{B, E}, Shard, Response} | Responses],
             handle_response_ring(Workers1, Responses1, CleanupCb);
         [{any, Any}] ->
-            handle_response_any(Shard, Response, Workers1, Any, CleanupCb)
+            handle_response_any(Shard, Response, Workers1, Any, CleanupCb);
+        [all] ->
+            Responses1 = [{Shard, Response} | Responses],
+            handle_response_all(Workers1, Responses1)
     end.
 
 
@@ -164,6 +170,15 @@ handle_response_any(Shard, Response, Workers, Any, CleanupCb) ->
     end.
 
 
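+% With the [all] ring option, keep collecting until every remaining worker has
+% responded, then stop with the full set of responses.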
+handle_response_all(Workers, Responses) ->
+    case fabric_dict:size(Workers) =:= 0 of
+        true ->
+            {stop, fabric_dict:from_list(Responses)};
+        false ->
+            {ok, {Workers, Responses}}
+    end.
+
+
 % Check if workers still waiting and the already received responses could
 % still form a continuous range. The range won't always be the full ring, and
 % the bounds are computed based on the minimum and maximum interval beginning
@@ -186,6 +201,9 @@ is_progress_possible(Counters, Responses, MinB, MaxE, []) ->
     Ranges = fabric_util:worker_ranges(Counters) ++ ResponseRanges,
     mem3_util:get_ring(Ranges, MinB, MaxE) =/= [];
 
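+% With the [all] ring option progress is possible as long as any workers are
+% still running, since responses are accepted from every shard copy.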
+is_progress_possible(Counters, _Responses, _, _, [all]) ->
+    fabric_dict:size(Counters) > 0;
+
 is_progress_possible(Counters, Responses, _, _, [{any, AnyShards}]) ->
     InAny = fun(S) -> lists:member(S#shard{ref = undefined}, AnyShards) end,
     case fabric_dict:filter(fun(S, _) -> InAny(S) end, Counters) of
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 9ed8efd..3f8756a 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -26,7 +26,7 @@
 
 -export([get_db_info/2, get_doc_count/2, get_design_doc_count/2,
          get_update_seq/2, changes/4, map_view/5, reduce_view/5,
-         group_info/3, update_mrview/4]).
+         group_info/3, update_mrview/4, get_uuid/1]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("couch/include/couch_db.hrl").
@@ -318,6 +318,9 @@ compact(ShardName, DesignName) ->
     Ref = erlang:make_ref(),
     Pid ! {'$gen_call', {self(), Ref}, compact}.
 
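+% RPC target for fabric_db_uuids: return this shard file's uuid.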
+get_uuid(DbName) ->
+    with_db(DbName, [], {couch_db, get_uuid, []}).
+
 %%
 %% internal
 %%
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index 84ffef1..51ce566 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -23,6 +23,7 @@
 -export([validate_all_docs_args/2, validate_args/3]).
 -export([upgrade_mrargs/1]).
 -export([worker_ranges/1]).
+-export([isolate/1, isolate/2]).
 
 -compile({inline, [{doc_id_and_rev,1}]}).
 
@@ -345,3 +346,51 @@ worker_ranges(Workers) ->
         [{X, Y} | Acc]
     end, [], Workers),
     lists:usort(Ranges).
+
+
+% If we issue multiple fabric calls from the same process, we have to isolate
+% them so that in case of an error they don't pollute the process's dictionary
+% or its mailbox.
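+%
+% Example (mirrors the call site in fabric_view_changes):
+%   Timeout = fabric_util:request_timeout(),
+%   Uuids = fabric_util:isolate(fun() -> fabric:db_uuids(DbName) end, Timeout)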
+
+isolate(Fun) ->
+    isolate(Fun, infinity).
+
+
+isolate(Fun, Timeout) ->
+    {Pid, Ref} = erlang:spawn_monitor(fun() -> exit(do_isolate(Fun)) end),
+    receive
+        {'DOWN', Ref, _, _, {'$isolres', Res}} ->
+            Res;
+        {'DOWN', Ref, _, _, {'$isolerr', Tag, Reason, Stack}} ->
+            erlang:raise(Tag, Reason, Stack)
+    after Timeout ->
+        erlang:demonitor(Ref, [flush]),
+        exit(Pid, kill),
+        erlang:error(timeout)
+    end.
+
+
+% OTP_RELEASE is defined in OTP 21+ only
+-ifdef(OTP_RELEASE).
+
+
+do_isolate(Fun) ->
+    try
+        {'$isolres', Fun()}
+    catch Tag:Reason:Stack ->
+        {'$isolerr', Tag, Reason, Stack}
+    end.
+
+
+-else.
+
+
+do_isolate(Fun) ->
+    try
+        {'$isolres', Fun()}
+    catch Tag:Reason ->
+        % The Tag:Reason:Stack catch syntax is unavailable before OTP 21
+        Stack = erlang:get_stacktrace(),
+        {'$isolerr', Tag, Reason, Stack}
+    end.
+
+
+-endif.
diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl
index febbd31..9d0940d 100644
--- a/src/fabric/src/fabric_view_changes.erl
+++ b/src/fabric/src/fabric_view_changes.erl
@@ -453,12 +453,14 @@ do_unpack_seqs(Opaque, DbName) ->
         true ->
             Unpacked;
         false ->
+            Uuids = get_db_uuids(DbName),
             PotentialWorkers = lists:map(fun({Node, [A, B], Seq}) ->
                 case mem3:get_shard(DbName, Node, [A, B]) of
                     {ok, Shard} ->
                         {Shard, Seq};
                     {error, not_found} ->
-                        {#shard{node = Node, range = [A, B]}, Seq}
+                        Shard = replace_moved_shard(Node, [A, B], Seq, Uuids),
+                        {Shard, Seq}
                 end
             end, Deduped),
             Shards = mem3:shards(DbName),
@@ -495,6 +497,45 @@ get_old_seq(#shard{range=R}=Shard, SinceSeqs) ->
     end.
 
 
+get_db_uuids(DbName) ->
+    % Need to use an isolated process as we are performing a fabric call from
+    % another fabric call and there is a good chance we'd pollute the mailbox
+    % with returned messages
+    Timeout = fabric_util:request_timeout(),
+    Uuids = fabric_util:isolate(fun() -> fabric:db_uuids(DbName) end, Timeout),
+    % Trim uuids so we match exactly based on the currently configured
+    % uuid_prefix_len. The assumption is that we are getting an older sequence
+    % from the same cluster and we didn't tweak that relatively obscure config
+    % option in the meantime.
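+    % E.g. with the default length of 7, a full uuid like <<"3c24dfd0f69c...">>
+    % (hypothetical) is trimmed to <<"3c24dfd">>.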
+    PrefixLen = config:get_integer("fabric", "uuid_prefix_len", 7),
+    maps:fold(fun(Uuid, Shard, Acc) ->
+        TrimmedUuid = binary:part(Uuid, {0, PrefixLen}),
+        Acc#{TrimmedUuid => Shard}
+    end, #{}, Uuids).
+
+
+%% Determine if the missing shard moved to a new node. Do that by matching the
+%% uuids from the current shard map. If we cannot find a moved shard, we return
+%% the original node and range as a shard and hope for the best.
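+%% Unpacked seq entries arrive as a bare number, an older {Seq, Uuid} pair, or
+%% the current {Seq, Uuid, EpochNode} triple, e.g. (hypothetical values)
+%% {5, <<"3c24dfd">>, 'dbcore@db1.example.com'}.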
+replace_moved_shard(Node, Range, Seq, #{} = _Uuids) when is_number(Seq) ->
+    % Cannot figure out shard moves without uuid matching
+    #shard{node = Node, range = Range};
+replace_moved_shard(Node, Range, {Seq, Uuid}, #{} = Uuids) ->
+    % Compatibility case for an old seq format which didn't have epoch nodes
+    replace_moved_shard(Node, Range, {Seq, Uuid, Node}, Uuids);
+replace_moved_shard(Node, Range, {_Seq, Uuid, _EpochNode}, #{} = Uuids) ->
+    case Uuids of
+        #{Uuid := #shard{range = Range, node = NewNode}} ->
+            % Found a moved shard by matching both the uuid and the range
+            #shard{node = NewNode, range = Range};
+        #{} ->
+            % Did not find a moved shard, use the original node
+            #shard{node = Node, range = Range}
+    end.
+
+
 changes_row(Props0) ->
     Props1 = case couch_util:get_value(deleted, Props0) of
         true ->