Posted to commits@couchdb.apache.org by va...@apache.org on 2023/05/27 08:03:48 UTC

[couchdb] 01/01: Fix purge infos replicating to the wrong shards during shard splitting.

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch internal-replicator-purge-info-bug
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit ba7e219b3a3873e3e0d2fe71c53a67291687d034
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Sat May 27 03:35:45 2023 -0400

    Fix purge infos replicating to the wrong shards during shard splitting.
    
    Previously, the internal replicator (mem3_rep) replicated purge infos
    to/from all the target shards. Instead, it should push/pull purge infos
    only to the target ranges they belong to, as determined by the database's
    hash function.
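    
    A minimal, hedged sketch of the filtering idea (filter_purges_for_range is
    an illustrative helper name; the actual change uses the same
    mem3_reshard_job:pickfun/3 call shown in the mem3_rep diff below):
    
        %% Keep only purge infos whose doc id hashes into SrcRange.
        filter_purges_for_range(Infos, SrcRange, HashFun) ->
            BelongsFun = fun({_UUID, Id, _Revs}) when is_binary(Id) ->
                mem3_reshard_job:pickfun(Id, [SrcRange], HashFun) =:= SrcRange
            end,
            lists:filter(BelongsFun, Infos).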
    
    Users experienced this error as a failure when a database containing
    purges was split twice in a row. For example, if a Q=8 database is split
    to Q=16, then split again from Q=16 to Q=32, the second split operation
    might fail with a `split_state:initial_copy ...{{badkey,not_in_range}`
    error. The misplaced purge infos are noticed only during the second split,
    when the initial copy phase crashes because some purge infos hash to
    neither of the two target ranges. Moreover, the crash leads to repeated
    retries, which generate a huge job history log.
    
    The fix consists of three improvements:
    
      1) The internal replicator is updated to filter purge infos based on the
         db hash function.
    
      2) Account for the fact that some users' dbs might already contain
         misplaced purge infos. Since it's a known bug, we anticipate that
         error and ignore misplaced purge infos during the second shard split
         operation, emitting a warning in the logs (see the sketch after this
         list).
    
      3) Make similar range errors fatal, and emit a clear error in the logs
         and the job history so any future range errors are immediately
         obvious.
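    
    For point 2, a hedged sketch of how misplaced purge infos on a shard could
    be spotted (misplaced_purge_ids is a hypothetical helper; it reuses
    mem3:belongs/2 and the purge-info fold pattern from the tests below):
    
        %% Collect doc ids of purge infos which no longer hash into the
        %% range of ShardName.
        misplaced_purge_ids(ShardName) when is_binary(ShardName) ->
            FoldFun = fun({_PSeq, _UUID, Id, _Revs}, Acc) ->
                case mem3:belongs(ShardName, Id) of
                    true -> {ok, Acc};
                    false -> {ok, [Id | Acc]}
                end
            end,
            couch_util:with_db(ShardName, fun(Db) ->
                PSeq = max(0, couch_db:get_oldest_purge_seq(Db) - 1),
                {ok, Ids} = couch_db:fold_purge_infos(Db, PSeq, FoldFun, []),
                Ids
            end).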
    
    Fixes #4624
---
 src/couch/src/couch_db_split.erl          |  47 ++++++++++---
 src/mem3/src/mem3_rep.erl                 |  67 +++++++++++++-----
 src/mem3/src/mem3_reshard_job.erl         |  13 ++++
 src/mem3/test/eunit/mem3_reshard_test.erl | 108 +++++++++++++++++++++++++++++-
 4 files changed, 204 insertions(+), 31 deletions(-)

diff --git a/src/couch/src/couch_db_split.erl b/src/couch/src/couch_db_split.erl
index d219e3731..dce91877c 100644
--- a/src/couch/src/couch_db_split.erl
+++ b/src/couch/src/couch_db_split.erl
@@ -62,7 +62,10 @@ split(Source, #{} = Targets, PickFun) when
             catch
                 throw:{target_create_error, DbName, Error, TargetDbs} ->
                     cleanup_targets(TargetDbs, Engine),
-                    {error, {target_create_error, DbName, Error}}
+                    {error, {target_create_error, DbName, Error}};
+                throw:{range_error, Context, DocId, TargetDbs} ->
+                    cleanup_targets(TargetDbs, Engine),
+                    {error, {range_error, Context, DocId, maps:keys(TargetDbs)}}
             after
                 couch_db:close(SourceDb)
             end;
@@ -208,10 +211,20 @@ delete_target(DbName, Engine) ->
     DelOpt = [{context, compaction}, sync],
     couch_db_engine:delete(Engine, RootDir, Filepath, DelOpt).
 
-pick_target(DocId, #state{} = State, #{} = Targets) ->
+pick_target(DocId, #state{} = State, #{} = Targets, Context) when
+    is_binary(DocId), is_atom(Context)
+->
     #state{pickfun = PickFun, hashfun = HashFun} = State,
-    Key = PickFun(DocId, maps:keys(Targets), HashFun),
-    {Key, maps:get(Key, Targets)}.
+    TargetRanges = maps:keys(Targets),
+    Key = PickFun(DocId, TargetRanges, HashFun),
+    case Key of
+        not_in_range ->
+            % We found a document, or purge info which doesn't hash to
+            % any of the target ranges. Stop, and raise a fatal exception.
+            throw({range_error, Context, DocId, Targets});
+        [B, E] when is_integer(B), is_integer(E), B =< E, is_map_key(Key, Targets) ->
+            {Key, map_get(Key, Targets)}
+    end.
 
 set_targets_update_seq(#state{targets = Targets} = State) ->
     Seq = couch_db:get_update_seq(State#state.source_db),
@@ -337,11 +350,23 @@ acc_and_flush(Item, #target{} = Target, MaxBuffer, FlushCb) ->
         false -> Target1
     end.
 
-purge_cb({_PSeq, _UUID, Id, _Revs} = PI, #state{targets = Targets} = State) ->
-    {Key, Target} = pick_target(Id, State, Targets),
-    MaxBuffer = State#state.max_buffer_size,
-    Target1 = acc_and_flush(PI, Target, MaxBuffer, fun commit_purge_infos/1),
-    {ok, State#state{targets = Targets#{Key => Target1}}}.
+purge_cb({PSeq, UUID, Id, _Revs} = PI, #state{} = State) ->
+    #state{source_db = Db, targets = Targets} = State,
+    try
+        {Key, Target} = pick_target(Id, State, Targets, purge_info_copy),
+        MaxBuffer = State#state.max_buffer_size,
+        Target1 = acc_and_flush(PI, Target, MaxBuffer, fun commit_purge_infos/1),
+        {ok, State#state{targets = Targets#{Key => Target1}}}
+    catch
+        throw:{range_error, purge_info_copy, _, _} ->
+            % Before 3.4, due to a bug in internal replicator, it was possible
+            % for purge infos which don't belong to the source shard to end up
+            % there during, or after shard splitting. We choose to emit a warning
+            % and ignore the misplaced purge info record.
+            LogMsg = "~p : ignore misplaced purge info pseq:~p uuid:~p doc_id:~p shard:~p",
+            couch_log:warning(LogMsg, [?MODULE, PSeq, UUID, Id, couch_db:name(Db)]),
+            {ok, State}
+    end.
 
 commit_purge_infos(#target{buffer = [], db = Db} = Target) ->
     Target#target{db = Db};
@@ -367,7 +392,7 @@ changes_cb(#doc_info{id = Id}, #state{source_db = Db} = State) ->
     changes_cb(FDI, State);
 changes_cb(#full_doc_info{id = Id} = FDI, #state{} = State) ->
     #state{source_db = SourceDb, targets = Targets} = State,
-    {Key, Target} = pick_target(Id, State, Targets),
+    {Key, Target} = pick_target(Id, State, Targets, changes),
     FDI1 = process_fdi(FDI, SourceDb, Target#target.db),
     MaxBuffer = State#state.max_buffer_size,
     Target1 = acc_and_flush(FDI1, Target, MaxBuffer, fun commit_docs/1),
@@ -500,7 +525,7 @@ copy_local_docs(#state{source_db = Db, targets = Targets} = State) ->
                 <<?LOCAL_DOC_PREFIX, _/binary>> ->
                     % Users' and replicator app's checkpoints go to their
                     % respective shards based on the general hashing algorithm
-                    {Key, Target} = pick_target(Id, State, Acc),
+                    {Key, Target} = pick_target(Id, State, Acc, local_docs),
                     #target{buffer = Docs} = Target,
                     Acc#{Key => Target#target{buffer = [Doc | Docs]}}
             end,
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index d186f9e38..3cc381494 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -314,12 +314,18 @@ repl(#acc{db = Db0} = Acc0) ->
             {ok, Count}
     end.
 
-pull_purges_multi(#acc{source = Source} = Acc0) ->
-    #acc{batch_size = Count, seq = UpdateSeq, targets = Targets0} = Acc0,
+pull_purges_multi(#acc{} = Acc0) ->
+    #acc{
+        source = Source,
+        targets = Targets0,
+        batch_size = Count,
+        seq = UpdateSeq,
+        hashfun = HashFun
+    } = Acc0,
     with_src_db(Acc0, fun(Db) ->
         Targets = maps:map(
             fun(_, #tgt{} = T) ->
-                pull_purges(Db, Count, Source, T)
+                pull_purges(Db, Count, Source, T, HashFun)
             end,
             reset_remaining(Targets0)
         ),
@@ -343,7 +349,7 @@ pull_purges_multi(#acc{source = Source} = Acc0) ->
         end
     end).
 
-pull_purges(Db, Count, SrcShard, #tgt{} = Tgt0) ->
+pull_purges(Db, Count, #shard{} = SrcShard, #tgt{} = Tgt0, HashFun) ->
     #tgt{shard = TgtShard} = Tgt0,
     SrcUUID = couch_db:get_uuid(Db),
     #shard{node = TgtNode, name = TgtDbName} = TgtShard,
@@ -354,18 +360,33 @@ pull_purges(Db, Count, SrcShard, #tgt{} = Tgt0) ->
         Infos == [] ->
             ok;
         true ->
-            {ok, _} = couch_db:purge_docs(Db, Infos, [?REPLICATED_CHANGES]),
+            % When shard ranges are split it's possible to pull purges from a
+            % larger target range to a smaller source range, we don't want to
+            % pull purges which don't belong on the source, so we filter them
+            % out using the same pickfun which we use when picking documents
+            #shard{range = SrcRange} = SrcShard,
+            BelongsFun = fun({_UUID, Id, _Revs}) when is_binary(Id) ->
+                mem3_reshard_job:pickfun(Id, [SrcRange], HashFun) =:= SrcRange
+            end,
+            Infos1 = lists:filter(BelongsFun, Infos),
+            {ok, _} = couch_db:purge_docs(Db, Infos1, [?REPLICATED_CHANGES]),
             Body = purge_cp_body(SrcShard, TgtShard, ThroughSeq),
             mem3_rpc:save_purge_checkpoint(TgtNode, TgtDbName, LocalPurgeId, Body)
     end,
     Tgt#tgt{remaining = max(0, Remaining)}.
 
-push_purges_multi(#acc{source = SrcShard} = Acc) ->
-    #acc{batch_size = BatchSize, seq = UpdateSeq, targets = Targets0} = Acc,
+push_purges_multi(#acc{} = Acc) ->
+    #acc{
+        source = SrcShard,
+        targets = Targets0,
+        batch_size = BatchSize,
+        seq = UpdateSeq,
+        hashfun = HashFun
+    } = Acc,
     with_src_db(Acc, fun(Db) ->
         Targets = maps:map(
             fun(_, #tgt{} = T) ->
-                push_purges(Db, BatchSize, SrcShard, T)
+                push_purges(Db, BatchSize, SrcShard, T, HashFun)
             end,
             reset_remaining(Targets0)
         ),
@@ -385,9 +406,9 @@ push_purges_multi(#acc{source = SrcShard} = Acc) ->
         end
     end).
 
-push_purges(Db, BatchSize, SrcShard, Tgt) ->
+push_purges(Db, BatchSize, SrcShard, Tgt, HashFun) ->
     #tgt{shard = TgtShard, purgeid = LocalPurgeId} = Tgt,
-    #shard{node = TgtNode, name = TgtDbName} = TgtShard,
+    #shard{node = TgtNode, range = TgtRange, name = TgtDbName} = TgtShard,
     StartSeq =
         case couch_db:open_doc(Db, LocalPurgeId, []) of
             {ok, #doc{body = {Props}}} ->
@@ -396,15 +417,25 @@ push_purges(Db, BatchSize, SrcShard, Tgt) ->
                 Oldest = couch_db:get_oldest_purge_seq(Db),
                 erlang:max(0, Oldest - 1)
         end,
+    BelongsFun = fun(Id) when is_binary(Id) ->
+        mem3_reshard_job:pickfun(Id, [TgtRange], HashFun) =:= TgtRange
+    end,
     FoldFun = fun({PSeq, UUID, Id, Revs}, {Count, Infos, _}) ->
-        NewCount = Count + length(Revs),
-        NewInfos = [{UUID, Id, Revs} | Infos],
-        Status =
-            if
-                NewCount < BatchSize -> ok;
-                true -> stop
-            end,
-        {Status, {NewCount, NewInfos, PSeq}}
+        case BelongsFun(Id) of
+            true ->
+                NewCount = Count + length(Revs),
+                NewInfos = [{UUID, Id, Revs} | Infos],
+                Status =
+                    if
+                        NewCount < BatchSize -> ok;
+                        true -> stop
+                    end,
+                {Status, {NewCount, NewInfos, PSeq}};
+            false ->
+                % In case of split shard ranges, purges, like documents, will
+                % belong only to one target
+                {ok, {Count, Infos, PSeq}}
+        end
     end,
     InitAcc = {0, [], StartSeq},
     {ok, {_, Infos, ThroughSeq}} =
diff --git a/src/mem3/src/mem3_reshard_job.erl b/src/mem3/src/mem3_reshard_job.erl
index 25ff1cf19..4c93bf98f 100644
--- a/src/mem3/src/mem3_reshard_job.erl
+++ b/src/mem3/src/mem3_reshard_job.erl
@@ -265,6 +265,19 @@ handle_worker_exit(#job{} = Job, _Pid, {error, missing_target}) ->
     couch_log:error(Msg, [?MODULE, jobfmt(Job)]),
     kill_workers(Job),
     exit({error, missing_target});
+handle_worker_exit(#job{} = Job, _Pid, {error, {range_error, _, _, _}} = Reason) ->
+    Msg1 = "~p fatal range error job:~p error:~p",
+    couch_log:error(Msg1, [?MODULE, jobfmt(Job), Reason]),
+    kill_workers(Job),
+    case lists:member(Job#job.split_state, ?CLEAN_TARGET_STATES) of
+        true ->
+            Msg2 = "~p cleaning target after db was deleted ~p",
+            couch_log:error(Msg2, [?MODULE, jobfmt(Job)]),
+            reset_target(Job),
+            exit(Reason);
+        false ->
+            exit(Reason)
+    end;
 handle_worker_exit(#job{} = Job0, _Pid, Reason) ->
     couch_log:error("~p worker error ~p ~p", [?MODULE, jobfmt(Job0), Reason]),
     kill_workers(Job0),
diff --git a/src/mem3/test/eunit/mem3_reshard_test.erl b/src/mem3/test/eunit/mem3_reshard_test.erl
index be539b47a..4376ee48c 100644
--- a/src/mem3/test/eunit/mem3_reshard_test.erl
+++ b/src/mem3/test/eunit/mem3_reshard_test.erl
@@ -77,7 +77,9 @@ mem3_reshard_db_test_() ->
                     fun retries_work/1,
                     fun target_reset_in_initial_copy/1,
                     fun split_an_incomplete_shard_map/1,
-                    fun target_shards_are_locked/1
+                    fun target_shards_are_locked/1,
+                    fun doc_in_bad_range_on_source/1,
+                    fun purge_info_in_bad_range_on_source/1
                 ]
             }
         }
@@ -88,7 +90,7 @@ mem3_reshard_db_test_() ->
 split_one_shard(#{db1 := Db}) ->
     {timeout, ?TIMEOUT,
         ?_test(begin
-            DocSpec = #{docs => 10, delete => [5, 9], mrview => 1, local => 1},
+            DocSpec = #{docs => 10, delete => [5, 9], mrview => 1, local => 10},
             add_test_docs(Db, DocSpec),
 
             % Save documents before the split
@@ -107,6 +109,7 @@ split_one_shard(#{db1 := Db}) ->
 
             % Split the one shard
             [#shard{name = Shard}] = lists:sort(mem3:local_shards(Db)),
+            SrcDocIds = get_shard_doc_ids(Shard),
             {ok, JobId} = mem3_reshard:start_split_job(Shard),
             wait_state(JobId, completed),
 
@@ -117,6 +120,16 @@ split_one_shard(#{db1 := Db}) ->
             ?assertEqual([16#00000000, 16#7fffffff], R1),
             ?assertEqual([16#80000000, 16#ffffffff], R2),
 
+            % Check that docs are on the shards where they belong to
+            [#shard{name = SN1}, #shard{name = SN2}] = Shards1,
+            DocIds1 = get_shard_doc_ids(SN1),
+            DocIds2 = get_shard_doc_ids(SN2),
+            [?assert(mem3:belongs(SN1, Id)) || Id <- DocIds1],
+            [?assert(mem3:belongs(SN2, Id)) || Id <- DocIds2],
+
+            % None of the docs or purges were dropped
+            ?assertEqual(lists:sort(SrcDocIds), lists:sort(DocIds1 ++ DocIds2)),
+
             % Check metadata bits after the split
             ?assertEqual(942, get_revs_limit(Db)),
             ?assertEqual(943, get_purge_infos_limit(Db)),
@@ -173,6 +186,9 @@ split_shard_with_lots_of_purges(#{db1 := Db}) ->
 
             % Split the one shard
             [#shard{name = Shard}] = lists:sort(mem3:local_shards(Db)),
+            % Get purge infos before the split
+            SrcPurges = get_purge_infos(Shard),
+            SrcDocIds = get_shard_doc_ids(Shard),
             {ok, JobId} = mem3_reshard:start_split_job(Shard),
             wait_state(JobId, completed),
 
@@ -183,6 +199,21 @@ split_shard_with_lots_of_purges(#{db1 := Db}) ->
             ?assertEqual([16#00000000, 16#7fffffff], R1),
             ?assertEqual([16#80000000, 16#ffffffff], R2),
 
+            % Check that purges and docs are on the shards where they belong to
+            [#shard{name = SN1}, #shard{name = SN2}] = Shards1,
+            TgtPurges1 = get_purge_infos(SN1),
+            TgtPurges2 = get_purge_infos(SN2),
+            DocIds1 = get_shard_doc_ids(SN1),
+            DocIds2 = get_shard_doc_ids(SN2),
+            [?assert(mem3:belongs(SN1, Id)) || Id <- TgtPurges1],
+            [?assert(mem3:belongs(SN2, Id)) || Id <- TgtPurges2],
+            [?assert(mem3:belongs(SN1, Id)) || Id <- DocIds1],
+            [?assert(mem3:belongs(SN2, Id)) || Id <- DocIds2],
+
+            % None of the docs or purges were dropped
+            ?assertEqual(lists:sort(SrcDocIds), lists:sort(DocIds1 ++ DocIds2)),
+            ?assertEqual(lists:sort(SrcPurges), lists:sort(TgtPurges1 ++ TgtPurges2)),
+
             % Check metadata bits after the split
             ?assertEqual(10, get_purge_infos_limit(Db)),
 
@@ -619,6 +650,53 @@ target_shards_are_locked(#{db1 := Db}) ->
             wait_state(JobId, completed)
         end)}.
 
+% Source somehow got a bad doc which doesn't belong there
+doc_in_bad_range_on_source(#{db1 := Db}) ->
+    {timeout, ?TIMEOUT,
+        ?_test(begin
+            DocSpec = #{docs => 10, mrview => 0, local => 1},
+            add_test_docs(Db, DocSpec),
+
+            % Split first shard
+            [#shard{name = Shard1}] = lists:sort(mem3:local_shards(Db)),
+            {ok, JobId1} = mem3_reshard:start_split_job(Shard1),
+            wait_state(JobId1, completed),
+
+            % Split the first range again but before doing that insert
+            % a doc in the shard with a doc id that wouldn't belong to
+            % any target ranges
+            [#shard{name = Shard2}, _] = lists:sort(mem3:local_shards(Db)),
+            add_shard_doc(Shard2, <<"4">>, [{<<"in_the_wrong">>, <<"shard_range">>}]),
+            {ok, JobId2} = mem3_reshard:start_split_job(Shard2),
+            wait_state(JobId2, failed),
+            {ok, {JobProps}} = mem3_reshard:job(JobId2),
+            StateInfo = proplists:get_value(state_info, JobProps),
+            ?assertMatch({[{reason, <<"{error,{range_error", _/binary>>}]}, StateInfo)
+        end)}.
+
+% Source has a bad doc but we expect that due to a bug in <3.4 so we
+% skip over it and move on
+purge_info_in_bad_range_on_source(#{db1 := Db}) ->
+    {timeout, ?TIMEOUT,
+        ?_test(begin
+            DocSpec = #{docs => 10, mrview => 0, local => 1},
+            add_test_docs(Db, DocSpec),
+
+            % Split first shard
+            [#shard{name = Shard1}] = lists:sort(mem3:local_shards(Db)),
+            {ok, JobId1} = mem3_reshard:start_split_job(Shard1),
+            wait_state(JobId1, completed),
+
+            % Split the first range again but before doing that insert
+            % a purge info in the shard with an id which wouldn't belong to
+            % any target ranges
+            [#shard{name = Shard2}, _] = lists:sort(mem3:local_shards(Db)),
+            PurgeInfo = {couch_uuids:new(), <<"4">>, [{1, <<"a">>}]},
+            add_shard_purge_info(Shard2, PurgeInfo),
+            {ok, JobId2} = mem3_reshard:start_split_job(Shard2),
+            wait_state(JobId2, completed)
+        end)}.
+
 intercept_state(State) ->
     TestPid = self(),
     meck:new(mem3_reshard_job, [passthrough]),
@@ -988,3 +1066,29 @@ atts(Size) when is_integer(Size), Size >= 1 ->
             {data, Data}
         ])
     ].
+
+get_purge_infos(ShardName) when is_binary(ShardName) ->
+    FoldFun = fun({_Seq, _UUID, Id, _Revs}, Acc) -> {ok, [Id | Acc]} end,
+    couch_util:with_db(ShardName, fun(Db) ->
+        PSeq = max(0, couch_db:get_oldest_purge_seq(Db) - 1),
+        {ok, Res} = couch_db:fold_purge_infos(Db, PSeq, FoldFun, []),
+        Res
+    end).
+
+get_shard_doc_ids(ShardName) when is_binary(ShardName) ->
+    FoldFun = fun(#full_doc_info{id = Id}, Acc) -> {ok, [Id | Acc]} end,
+    couch_util:with_db(ShardName, fun(Db) ->
+        {ok, Res} = couch_db:fold_docs(Db, FoldFun, [], []),
+        Res
+    end).
+
+add_shard_doc(ShardName, DocId, Props) ->
+    couch_util:with_db(ShardName, fun(Db) ->
+        Doc = couch_doc:from_json_obj({[{<<"_id">>, DocId}] ++ Props}),
+        couch_db:update_doc(Db, Doc, [])
+    end).
+
+add_shard_purge_info(ShardName, {UUID, Id, Revs}) ->
+    couch_util:with_db(ShardName, fun(Db) ->
+        couch_db:purge_docs(Db, [{UUID, Id, Revs}])
+    end).