You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2021/09/10 15:11:57 UTC

[couchdb] branch 3.x updated: Fix splitting shards with large purge sequences

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch 3.x
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/3.x by this push:
     new 9f08191  Fix splitting shards with large purge sequences
9f08191 is described below

commit 9f081914fe1fd7f31c2c1c7b3ead89427cf342f3
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Thu Sep 9 22:33:26 2021 -0400

    Fix splitting shards with large purge sequences
    
    Previously, if the source db purge sequence > `purge_infos_limit`, shard
    splitting would crash with the `{{invalid_start_purge_seq,0},
    [{couch_bt_engine,fold_purge_infos,5...` error. That was because purge
    sequences were always copied starting from 0. That would only work as long as
    the total number of purges stayed below the purge_infos_limit threshold. In
    order to correctly gather the purge sequences, the start sequence must be
    based off of the actual oldest sequence currently available.
    
    An example of how it should be done is in the `mem3_rpc` module, when loading
    purge infos [0], so here we do exactly the same. The `MinSeq - 1` logic is also
    evident by inspecting the fold_purge_infos [1] function.
    
    The test sets up the exact scenario as described above: reduces the purge info
    limit to 10 then purges 20 documents. By purging more than the limit, we ensure
    the starting sequence is now != 0. However, the purge sequence btree is
    actually trimmed down during compaction. That is why there are a few extra
    helper functions to ensure compaction runs and finishes before shard splitting
    starts.
    
    Fixes: https://github.com/apache/couchdb/issues/3738
    
    [0] https://github.com/apache/couchdb/blob/4ea9f1ea1a2078162d0e281948b56469228af3f7/src/mem3/src/mem3_rpc.erl#L206-L207
    [1] https://github.com/apache/couchdb/blob/3.x/src/couch/src/couch_bt_engine.erl#L625-L637
---
 src/couch/src/couch_db_split.erl          |  3 +-
 src/mem3/test/eunit/mem3_reshard_test.erl | 81 +++++++++++++++++++++++++++++++
 2 files changed, 83 insertions(+), 1 deletion(-)

diff --git a/src/couch/src/couch_db_split.erl b/src/couch/src/couch_db_split.erl
index 3a1f98d..1aa86fb 100644
--- a/src/couch/src/couch_db_split.erl
+++ b/src/couch/src/couch_db_split.erl
@@ -301,7 +301,8 @@ copy_meta(#state{source_db = SourceDb, targets = Targets} = State) ->
 
 
 copy_purge_info(#state{source_db = Db} = State) ->
-    {ok, NewState} = couch_db:fold_purge_infos(Db, 0, fun purge_cb/2, State),
+    Seq = max(0, couch_db:get_oldest_purge_seq(Db) - 1),
+    {ok, NewState} = couch_db:fold_purge_infos(Db, Seq, fun purge_cb/2, State),
     Targets = maps:map(fun(_, #target{} = T) ->
         commit_purge_infos(T)
     end, NewState#state.targets),
diff --git a/src/mem3/test/eunit/mem3_reshard_test.erl b/src/mem3/test/eunit/mem3_reshard_test.erl
index 7cd6b1f..1122590 100644
--- a/src/mem3/test/eunit/mem3_reshard_test.erl
+++ b/src/mem3/test/eunit/mem3_reshard_test.erl
@@ -65,6 +65,7 @@ mem3_reshard_db_test_() ->
                 fun setup/0, fun teardown/1,
                 [
                     fun split_one_shard/1,
+                    fun split_shard_with_lots_of_purges/1,
                     fun update_docs_before_topoff1/1,
                     fun indices_are_built/1,
                     fun split_partitioned_db/1,
@@ -140,6 +141,56 @@ split_one_shard(#{db1 := Db}) ->
     end)}.
 
 
+% Test to check that a shard with a high number of purges can be split
+split_shard_with_lots_of_purges(#{db1 := Db}) ->
+    {timeout, ?TIMEOUT, ?_test(begin
+        % Set a low purge infos limit, we are planning to overrun it
+        set_purge_infos_limit(Db, 10),
+
+        % Add docs 1..20 and purge them
+        add_test_docs(Db, #{docs => [1, 20]}),
+        IdRevs = maps:fold(fun(Id, #{<<"_rev">> := Rev}, Acc) ->
+            [{Id, [Rev]} | Acc]
+        end, [], get_all_docs(Db)),
+        ?assertMatch({ok, _}, purge_docs(Db, IdRevs)),
+
+        % Compact to trim the purge sequence
+        ok = compact(Db),
+
+        % Add some extra docs, these won't be purged
+        add_test_docs(Db, #{docs => [21, 30]}),
+        Docs0 = get_all_docs(Db),
+
+        % Save db info before splitting
+        DbInfo0 = get_db_info(Db),
+
+        % Split the one shard
+        [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+        {ok, JobId} = mem3_reshard:start_split_job(Shard),
+        wait_state(JobId, completed),
+
+        % Perform some basic checks that the shard was split
+        Shards1 = lists:sort(mem3:local_shards(Db)),
+        ?assertEqual(2, length(Shards1)),
+        [#shard{range = R1}, #shard{range = R2}] = Shards1,
+        ?assertEqual([16#00000000, 16#7fffffff], R1),
+        ?assertEqual([16#80000000, 16#ffffffff], R2),
+
+        % Check metadata bits after the split
+        ?assertEqual(10, get_purge_infos_limit(Db)),
+
+        DbInfo1 = get_db_info(Db),
+        Docs1 = get_all_docs(Db),
+
+        % When comparing db infos, ignore update sequences; they won't be the
+        % same since there are more shards involved after the split
+        ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)),
+
+        % Finally compare that the documents are still there after the split
+        ?assertEqual(Docs0, Docs1)
+    end)}.
+
+
 % This test checks that document added while the shard is being split are not
 % lost. Topoff1 state happens before indices are built
 update_docs_before_topoff1(#{db1 := Db}) ->
@@ -556,6 +607,36 @@ set_purge_infos_limit(DbName, Limit) ->
     end).
 
 
+purge_docs(DbName, DocIdRevs) ->
+    with_proc(fun() ->
+        fabric:purge_docs(DbName, DocIdRevs, [])
+    end).
+
+
+compact(DbName) ->
+    InitFileSize = get_db_file_size(DbName),
+    ok = with_proc(fun() -> fabric:compact(DbName) end),
+    test_util:wait(fun() ->
+        case {compact_running(DbName), get_db_file_size(DbName)} of
+            {true, _} -> wait;
+            {false, FileSize} when FileSize == InitFileSize -> wait;
+            {false, FileSize} when FileSize < InitFileSize -> ok
+        end
+    end, 5000, 200).
+
+
+compact_running(DbName) ->
+    {ok, DbInfo} = with_proc(fun() -> fabric:get_db_info(DbName) end),
+    #{<<"compact_running">> := CompactRunning} = to_map(DbInfo),
+    CompactRunning.
+
+
+get_db_file_size(DbName) ->
+    {ok, DbInfo} = with_proc(fun() -> fabric:get_db_info(DbName) end),
+    #{<<"sizes">> := #{<<"file">> := FileSize}} = to_map(DbInfo),
+    FileSize.
+
+
 set_security(DbName, SecObj) ->
     with_proc(fun() -> fabric:set_security(DbName, SecObj) end).