You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/08/16 21:15:25 UTC

[couchdb] 01/02: Copy the docid tree from the old db

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch compactor-rewrite
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 9c4c41ac22577e6ec8c46c44f0efc13c38712d3d
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Aug 16 13:47:38 2017 -0500

    Copy the docid tree from the old db
    
    This optimizes the first pass of compaction to avoid writing data to the
    emsort structure stored in the .compact.meta file. Instead of writing
    data there, we just re-read the docid tree from the original
    database.
---
 src/couch/src/couch_db_updater.erl | 164 ++++++++++++++++++++++++++++++++-----
 1 file changed, 142 insertions(+), 22 deletions(-)

diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index a83e3b4..221b771 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -26,6 +26,7 @@
 -record(comp_st, {
     old_db,
     new_db,
+    new_id_tree,
     meta_fd,
     retry
 }).
@@ -1049,10 +1050,9 @@ start_copy_compact(#db{}=Db) ->
         fun copy_purge_info/1,
         fun copy_compact/1,
         fun commit_compaction_data/1,
-        fun sort_meta_data/1,
-        fun commit_compaction_data/1,
-        fun copy_meta_data/1,
-        fun compact_final_sync/1
+        fun copy_doc_ids/1,
+        fun compact_final_sync/1,
+        fun verify_compaction/1
     ],
 
     FinalCompSt = lists:foldl(fun(Stage, CompSt) ->
@@ -1092,8 +1092,9 @@ open_compaction_files(OldDb) ->
             #comp_st{
                 old_db = OldDb,
                 new_db = Db1,
+                new_id_tree = Db0#db.id_tree,
                 meta_fd = MetaFd,
-                retry = Db0#db.id_tree
+                retry = (couch_btree:size(Db0#db.id_tree) > 0)
             };
         _ when DataHdrIsDbHdr ->
             % We tried to swap out the compaction but there were
@@ -1105,8 +1106,9 @@ open_compaction_files(OldDb) ->
             #comp_st{
                 old_db = OldDb,
                 new_db = Db1,
+                new_id_tree = Db0#db.id_tree,
                 meta_fd = MetaFd,
-                retry = Db0#db.id_tree
+                retry = true
             };
         _ ->
             % We're starting a compaction from scratch
@@ -1118,8 +1120,9 @@ open_compaction_files(OldDb) ->
             #comp_st{
                 old_db = OldDb,
                 new_db = Db1,
+                new_id_tree = Db0#db.id_tree,
                 meta_fd = MetaFd,
-                retry = nil
+                retry = false
             }
     end,
     unlink(DataFd),
@@ -1194,7 +1197,7 @@ copy_compact(#comp_st{} = CompSt) ->
         AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
         if AccUncopiedSize2 >= BufferSize ->
             NewDb2 = copy_docs(
-                Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
+                Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), CompSt),
             AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
             if AccCopiedSize2 >= CheckpointAfter ->
                 CommNewDb2 = commit_compaction_data(NewDb2#db{update_seq=Seq}),
@@ -1216,7 +1219,7 @@ copy_compact(#comp_st{} = CompSt) ->
         {changes_done, 0},
         {total_changes, TotalChanges}
     ],
-    case (Retry /= nil) and couch_task_status:is_task_added() of
+    case Retry and couch_task_status:is_task_added() of
     true ->
         couch_task_status:update([
             {retry, true},
@@ -1234,7 +1237,7 @@ copy_compact(#comp_st{} = CompSt) ->
             {NewDb, [], 0, 0},
             [{start_key, NewDb#db.update_seq + 1}]),
 
-    NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
+    NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), CompSt),
 
     % copy misc header values
     if NewDb3#db.security /= Db#db.security ->
@@ -1253,7 +1256,7 @@ copy_compact(#comp_st{} = CompSt) ->
     }.
 
 
-copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
+copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, CompSt) ->
     DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
     LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
     % COUCHDB-968, make sure we prune duplicates during compaction
@@ -1305,28 +1308,41 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
 
     NewInfos = stem_full_doc_infos(Db, NewInfos1),
     RemoveSeqs =
-    case Retry of
-    nil ->
-        [];
-    OldDocIdTree ->
+    case CompSt#comp_st.retry of
+    true ->
+        NewDocIdTree = CompSt#comp_st.new_id_tree,
         % Compaction is being rerun to catch up to writes during the
         % first pass. This means we may have docs that already exist
         % in the seq_tree in the .data file. Here we lookup any old
         % update_seqs so that they can be removed.
         Ids = [Id || #full_doc_info{id=Id} <- NewInfos],
-        Existing = couch_btree:lookup(OldDocIdTree, Ids),
-        [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
+        Existing = couch_btree:lookup(NewDocIdTree, Ids),
+        [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing];
+    false ->
+        []
     end,
 
     {ok, SeqTree} = couch_btree:add_remove(
             NewDb#db.seq_tree, NewInfos, RemoveSeqs),
 
-    FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
-        {{Id, Seq}, FDI}
-    end, NewInfos),
-    {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
+    NewIdEms = case CompSt#comp_st.retry of
+        false ->
+            % If we're not in a retry compaction then there's
+            % no reason for us to copy doc infos to the emsort
+            % structure. We can just re-use the id_tree in the
+            % old db record.
+            NewDb#db.id_tree;
+        true ->
+            % We have to store doc infos because of the possible
+            % changes in the update sequences.
+            FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
+                {{Id, Seq}, FDI}
+            end, NewInfos),
+            {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
+            IdEms
+    end,
     update_compact_task(length(NewInfos)),
-    NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
+    NewDb#db{id_tree=NewIdEms, seq_tree=SeqTree}.
 
 
 copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
@@ -1417,6 +1433,88 @@ bind_id_tree(Db, Fd, State) ->
     Db#db{id_tree=IdBtree}.
 
 
+copy_doc_ids(#comp_st{retry = false} = CompSt) ->
+    copy_from_tree(CompSt);
+copy_doc_ids(#comp_st{retry = true} = CompSt) ->
+    Stages = [
+        fun sort_meta_data/1,
+        fun commit_compaction_data/1,
+        fun copy_meta_data/1
+    ],
+    lists:foldl(fun(Stage, St) -> Stage(St) end, CompSt, Stages).
+
+
+copy_from_tree(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
+    #db{
+        fd = OldFd,
+        header = OldHdr
+    } = OldDb,
+
+    % We're reopening a custom view of the id_tree to
+    % avoid the work of creating complete `#full_doc_info{}`
+    % records.
+    Compression = couch_compress:get_compression_method(),
+    OldIdTreeState = couch_btree:get_state(OldDb#db.id_tree),
+    {ok, OldIdTree} = couch_btree:open(OldIdTreeState, OldFd, [
+        {split, fun ?MODULE:btree_by_id_split/1},
+        {join, fun compact_id_join/2},
+        {reduce, fun ?MODULE:btree_by_id_reduce/2},
+        {compression, Compression}
+    ]),
+
+    NewIdTree = CompSt#comp_st.new_id_tree,
+    NewSeqTree = NewDb#db.seq_tree,
+
+    BufferSize = list_to_integer(
+        config:get("database_compaction", "doc_buffer_size", "524288")),
+
+    EnumByIdFun = fun({DocId, UpdateSeq}, _, {DstIdTree, Batch, Count}) ->
+        case Count >= 1000 of
+            true ->
+                DstIdTree2 = flush_docid_batch(
+                        Batch, NewSeqTree, DstIdTree, BufferSize),
+                {ok, {DstIdTree2, [{DocId, UpdateSeq}], 1}};
+            false ->
+                {ok, {DstIdTree, [{DocId, UpdateSeq} | Batch], Count + 1}}
+        end
+    end,
+
+    {ok, _, {NewIdTree2, LastBatch, _}} =
+        couch_btree:foldl(OldIdTree, EnumByIdFun, {NewIdTree, [], 0}, []),
+    NewIdTree3 = flush_docid_batch(
+            LastBatch, NewSeqTree, NewIdTree2, BufferSize),
+    CompSt#comp_st{
+        new_db = NewDb#db{id_tree = NewIdTree3}
+    }.
+
+
+flush_docid_batch(DocIds, SeqTree, IdTree, MaxBufSize) ->
+    flush_docid_batch(DocIds, SeqTree, IdTree, MaxBufSize, [], 0).
+
+
+flush_docid_batch([], _, IdTree, _, Batch, _) ->
+    {ok, NewIdTree} = couch_btree:add(IdTree, Batch),
+    NewIdTree;
+
+flush_docid_batch(DocIds, SeqTree, IdTree, MaxBufSize, Buf, BufSize)
+        when BufSize >= MaxBufSize ->
+    {ok, NewIdTree} = couch_btree:add(IdTree, Buf),
+    flush_docid_batch(DocIds, SeqTree, NewIdTree, MaxBufSize, [], 0);
+
+flush_docid_batch(DocIds, SeqTree, IdTree, MaxBufSize, Buf, BufSize) ->
+    [{_Id, Seq} | Rest] = DocIds,
+    [{ok, #full_doc_info{} = FDI}] = couch_btree:lookup(SeqTree, [Seq]),
+    NewBuf = [FDI | Buf],
+    NewBufSize = BufSize + ?term_size(FDI),
+    flush_docid_batch(Rest, SeqTree, IdTree, MaxBufSize, NewBuf, NewBufSize).
+
+
+compact_id_join(Id, {HighSeq, _, _}) ->
+    {Id, HighSeq};
+compact_id_join(Id, {HighSeq, _, _, _}) ->
+    {Id, HighSeq}.
+
+
 sort_meta_data(#comp_st{new_db = Db0} = CompSt) ->
     {ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
     CompSt#comp_st{
@@ -1466,6 +1564,28 @@ compact_final_sync(#comp_st{new_db = NewDb0} = CompSt) ->
     }.
 
 
+verify_compaction(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
+    {ok, OldIdReds0} = couch_btree:full_reduce(OldDb#db.id_tree),
+    {ok, OldSeqReds} = couch_btree:full_reduce(OldDb#db.seq_tree),
+    {ok, NewIdReds0} = couch_btree:full_reduce(NewDb#db.id_tree),
+    {ok, NewSeqReds} = couch_btree:full_reduce(NewDb#db.seq_tree),
+    {
+        OldDocCount,
+        OldDelDocCount,
+        #size_info{external = OldExternalSize}
+    } = OldIdReds0,
+    OldIdReds = {OldDocCount, OldDelDocCount, OldExternalSize},
+    {
+        NewDocCount,
+        NewDelDocCount,
+        #size_info{external = NewExternalSize}
+    } = NewIdReds0,
+    NewIdReds = {NewDocCount, NewDelDocCount, NewExternalSize},
+    NewIdReds = OldIdReds,
+    NewSeqReds = OldSeqReds,
+    CompSt.
+
+
 merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
     #merge_st{
         id_tree=IdTree0,

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.