Posted to commits@couchdb.apache.org by da...@apache.org on 2017/09/11 20:56:49 UTC

[couchdb] 01/06: Reorder compaction functions

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch compactor-optimize-emsort
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit a7a66c3b74a932adc5f71e4bc5d046928be61389
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Aug 16 11:23:07 2017 -0500

    Reorder compaction functions
    
    This is strictly a copy/paste move of code to give the compaction
    functions a more logical ordering: the flow now reads top down,
    from start to finish.
---
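Note: the reordered file reads top down along the call graph. As the
module's entry point, start_copy_compact drives copy_compact, which
flushes batches of doc infos through copy_docs, which in turn copies
each leaf's attachments via copy_doc_attachments. A compilable stub of
that shape, with real names and arities from the diff but bodies elided
to a single call each (illustrative only):

    -module(compact_order_sketch).
    -export([start_copy_compact/1]).

    %% Entry point; sits above the helpers it (indirectly) drives.
    start_copy_compact(Db) ->
        copy_compact(Db, Db, nil).

    %% Folds the seq_tree, flushing buffered doc infos in batches.
    copy_compact(Db, NewDb, Retry) ->
        copy_docs(Db, NewDb, [], Retry).

    %% Rewrites doc bodies and rev trees for one batch.
    copy_docs(Db, NewDb, _MixedInfos, _Retry) ->
        copy_doc_attachments(Db, no_ptr, NewDb).

    %% Streams attachment data from the old file to the new one.
    copy_doc_attachments(_SrcDb, _SrcSp, _DestFd) ->
        ok.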
 src/couch/src/couch_db_updater.erl | 373 +++++++++++++++++++------------------
 1 file changed, 188 insertions(+), 185 deletions(-)

diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 78e0b8c..f786048 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -1014,40 +1014,6 @@ sync_header(Db, NewHeader) ->
         waiting_delayed_commit=nil
     }.
 
-copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
-    {ok, {BodyData, BinInfos0}} = couch_db:read_doc(SrcDb, SrcSp),
-    BinInfos = case BinInfos0 of
-    _ when is_binary(BinInfos0) ->
-        couch_compress:decompress(BinInfos0);
-    _ when is_list(BinInfos0) ->
-        % pre 1.2 file format
-        BinInfos0
-    end,
-    % copy the bin values
-    NewBinInfos = lists:map(
-        fun({Name, Type, BinSp, AttLen, RevPos, ExpectedMd5}) ->
-            % 010 UPGRADE CODE
-            {NewBinSp, AttLen, AttLen, ActualMd5, _IdentityMd5} =
-                couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
-            check_md5(ExpectedMd5, ActualMd5),
-            {Name, Type, NewBinSp, AttLen, AttLen, RevPos, ExpectedMd5, identity};
-        ({Name, Type, BinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc1}) ->
-            {NewBinSp, AttLen, _, ActualMd5, _IdentityMd5} =
-                couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
-            check_md5(ExpectedMd5, ActualMd5),
-            Enc = case Enc1 of
-            true ->
-                % 0110 UPGRADE CODE
-                gzip;
-            false ->
-                % 0110 UPGRADE CODE
-                identity;
-            _ ->
-                Enc1
-            end,
-            {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc}
-        end, BinInfos),
-    {BodyData, NewBinInfos}.
 
 merge_lookups(Infos, []) ->
     Infos;
@@ -1065,157 +1031,6 @@ merge_lookups([FDI | RestInfos], Lookups) ->
 check_md5(Md5, Md5) -> ok;
 check_md5(_, _) -> throw(md5_mismatch).
 
-copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
-    DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
-    LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
-    % COUCHDB-968, make sure we prune duplicates during compaction
-    NewInfos0 = lists:usort(fun(#full_doc_info{id=A}, #full_doc_info{id=B}) ->
-        A =< B
-    end, merge_lookups(MixedInfos, LookupResults)),
-
-    NewInfos1 = lists:map(fun(Info) ->
-        {NewRevTree, FinalAcc} = couch_key_tree:mapfold(fun
-            (_Rev, #leaf{ptr=Sp}=Leaf, leaf, SizesAcc) ->
-                {Body, AttInfos} = copy_doc_attachments(Db, Sp, DestFd),
-                % In the future, we should figure out how to do this for
-                % upgrade purposes.
-                EJsonBody = case is_binary(Body) of
-                    true ->
-                        couch_compress:decompress(Body);
-                    false ->
-                        Body
-                end,
-                SummaryChunk = make_doc_summary(NewDb, {Body, AttInfos}),
-                ExternalSize = ?term_size(EJsonBody),
-                {ok, Pos, SummarySize} = couch_file:append_raw_chunk(
-                    DestFd, SummaryChunk),
-                AttSizes = [{element(3,A), element(4,A)} || A <- AttInfos],
-                NewLeaf = Leaf#leaf{
-                    ptr = Pos,
-                    sizes = #size_info{
-                        active = SummarySize,
-                        external = ExternalSize
-                    },
-                    atts = AttSizes
-                },
-                {NewLeaf, add_sizes(leaf, NewLeaf, SizesAcc)};
-            (_Rev, _Leaf, branch, SizesAcc) ->
-                {?REV_MISSING, SizesAcc}
-        end, {0, 0, []}, Info#full_doc_info.rev_tree),
-        {FinalAS, FinalES, FinalAtts} = FinalAcc,
-        TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts),
-        NewActiveSize = FinalAS + TotalAttSize,
-        NewExternalSize = FinalES + TotalAttSize,
-        Info#full_doc_info{
-            rev_tree = NewRevTree,
-            sizes = #size_info{
-                active = NewActiveSize,
-                external = NewExternalSize
-            }
-        }
-    end, NewInfos0),
-
-    NewInfos = stem_full_doc_infos(Db, NewInfos1),
-    RemoveSeqs =
-    case Retry of
-    nil ->
-        [];
-    OldDocIdTree ->
-        % Compaction is being rerun to catch up to writes during the
-        % first pass. This means we may have docs that already exist
-        % in the seq_tree in the .data file. Here we lookup any old
-        % update_seqs so that they can be removed.
-        Ids = [Id || #full_doc_info{id=Id} <- NewInfos],
-        Existing = couch_btree:lookup(OldDocIdTree, Ids),
-        [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
-    end,
-
-    {ok, SeqTree} = couch_btree:add_remove(
-            NewDb#db.seq_tree, NewInfos, RemoveSeqs),
-
-    FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
-        {{Id, Seq}, FDI}
-    end, NewInfos),
-    {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
-    update_compact_task(length(NewInfos)),
-    NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
-
-
-copy_compact(Db, NewDb0, Retry) ->
-    Compression = couch_compress:get_compression_method(),
-    NewDb = NewDb0#db{compression=Compression},
-    TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
-    BufferSize = list_to_integer(
-        config:get("database_compaction", "doc_buffer_size", "524288")),
-    CheckpointAfter = couch_util:to_integer(
-        config:get("database_compaction", "checkpoint_after",
-            BufferSize * 10)),
-
-    EnumBySeqFun =
-    fun(DocInfo, _Offset,
-            {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize}) ->
-
-        Seq = case DocInfo of
-            #full_doc_info{} -> DocInfo#full_doc_info.update_seq;
-            #doc_info{} -> DocInfo#doc_info.high_seq
-        end,
-
-        AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
-        if AccUncopiedSize2 >= BufferSize ->
-            NewDb2 = copy_docs(
-                Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
-            AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
-            if AccCopiedSize2 >= CheckpointAfter ->
-                CommNewDb2 = commit_compaction_data(NewDb2#db{update_seq=Seq}),
-                {ok, {CommNewDb2, [], 0, 0}};
-            true ->
-                {ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}}
-            end;
-        true ->
-            {ok, {AccNewDb, [DocInfo | AccUncopied], AccUncopiedSize2,
-                AccCopiedSize}}
-        end
-    end,
-
-    TaskProps0 = [
-        {type, database_compaction},
-        {database, Db#db.name},
-        {progress, 0},
-        {changes_done, 0},
-        {total_changes, TotalChanges}
-    ],
-    case (Retry =/= nil) and couch_task_status:is_task_added() of
-    true ->
-        couch_task_status:update([
-            {retry, true},
-            {progress, 0},
-            {changes_done, 0},
-            {total_changes, TotalChanges}
-        ]);
-    false ->
-        couch_task_status:add_task(TaskProps0),
-        couch_task_status:set_update_frequency(500)
-    end,
-
-    {ok, _, {NewDb2, Uncopied, _, _}} =
-        couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
-            {NewDb, [], 0, 0},
-            [{start_key, NewDb#db.update_seq + 1}]),
-
-    NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
-
-    % copy misc header values
-    if NewDb3#db.security /= Db#db.security ->
-        {ok, Ptr, _} = couch_file:append_term(
-            NewDb3#db.fd, Db#db.security,
-            [{compression, NewDb3#db.compression}]),
-        NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
-    true ->
-        NewDb4 = NewDb3
-    end,
-
-    commit_compaction_data(NewDb4#db{update_seq=Db#db.update_seq}).
-
 
 start_copy_compact(#db{}=Db) ->
     erlang:put(io_priority, {db_compact, Db#db.name}),
@@ -1307,6 +1122,194 @@ copy_purge_info(OldDb, NewDb) ->
     end.
 
 
+copy_compact(Db, NewDb0, Retry) ->
+    Compression = couch_compress:get_compression_method(),
+    NewDb = NewDb0#db{compression=Compression},
+    TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
+    BufferSize = list_to_integer(
+        config:get("database_compaction", "doc_buffer_size", "524288")),
+    CheckpointAfter = couch_util:to_integer(
+        config:get("database_compaction", "checkpoint_after",
+            BufferSize * 10)),
+
+    EnumBySeqFun =
+    fun(DocInfo, _Offset,
+            {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize}) ->
+
+        Seq = case DocInfo of
+            #full_doc_info{} -> DocInfo#full_doc_info.update_seq;
+            #doc_info{} -> DocInfo#doc_info.high_seq
+        end,
+
+        AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
+        if AccUncopiedSize2 >= BufferSize ->
+            NewDb2 = copy_docs(
+                Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
+            AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
+            if AccCopiedSize2 >= CheckpointAfter ->
+                CommNewDb2 = commit_compaction_data(NewDb2#db{update_seq=Seq}),
+                {ok, {CommNewDb2, [], 0, 0}};
+            true ->
+                {ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}}
+            end;
+        true ->
+            {ok, {AccNewDb, [DocInfo | AccUncopied], AccUncopiedSize2,
+                AccCopiedSize}}
+        end
+    end,
+
+    TaskProps0 = [
+        {type, database_compaction},
+        {database, Db#db.name},
+        {progress, 0},
+        {changes_done, 0},
+        {total_changes, TotalChanges}
+    ],
+    case (Retry =/= nil) and couch_task_status:is_task_added() of
+    true ->
+        couch_task_status:update([
+            {retry, true},
+            {progress, 0},
+            {changes_done, 0},
+            {total_changes, TotalChanges}
+        ]);
+    false ->
+        couch_task_status:add_task(TaskProps0),
+        couch_task_status:set_update_frequency(500)
+    end,
+
+    {ok, _, {NewDb2, Uncopied, _, _}} =
+        couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
+            {NewDb, [], 0, 0},
+            [{start_key, NewDb#db.update_seq + 1}]),
+
+    NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
+
+    % copy misc header values
+    if NewDb3#db.security /= Db#db.security ->
+        {ok, Ptr, _} = couch_file:append_term(
+            NewDb3#db.fd, Db#db.security,
+            [{compression, NewDb3#db.compression}]),
+        NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
+    true ->
+        NewDb4 = NewDb3
+    end,
+
+    commit_compaction_data(NewDb4#db{update_seq=Db#db.update_seq}).
+
+
+copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
+    DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
+    LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
+    % COUCHDB-968, make sure we prune duplicates during compaction
+    NewInfos0 = lists:usort(fun(#full_doc_info{id=A}, #full_doc_info{id=B}) ->
+        A =< B
+    end, merge_lookups(MixedInfos, LookupResults)),
+
+    NewInfos1 = lists:map(fun(Info) ->
+        {NewRevTree, FinalAcc} = couch_key_tree:mapfold(fun
+            (_Rev, #leaf{ptr=Sp}=Leaf, leaf, SizesAcc) ->
+                {Body, AttInfos} = copy_doc_attachments(Db, Sp, DestFd),
+                % In the future, we should figure out how to do this for
+                % upgrade purposes.
+                EJsonBody = case is_binary(Body) of
+                    true ->
+                        couch_compress:decompress(Body);
+                    false ->
+                        Body
+                end,
+                SummaryChunk = make_doc_summary(NewDb, {Body, AttInfos}),
+                ExternalSize = ?term_size(EJsonBody),
+                {ok, Pos, SummarySize} = couch_file:append_raw_chunk(
+                    DestFd, SummaryChunk),
+                AttSizes = [{element(3,A), element(4,A)} || A <- AttInfos],
+                NewLeaf = Leaf#leaf{
+                    ptr = Pos,
+                    sizes = #size_info{
+                        active = SummarySize,
+                        external = ExternalSize
+                    },
+                    atts = AttSizes
+                },
+                {NewLeaf, add_sizes(leaf, NewLeaf, SizesAcc)};
+            (_Rev, _Leaf, branch, SizesAcc) ->
+                {?REV_MISSING, SizesAcc}
+        end, {0, 0, []}, Info#full_doc_info.rev_tree),
+        {FinalAS, FinalES, FinalAtts} = FinalAcc,
+        TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts),
+        NewActiveSize = FinalAS + TotalAttSize,
+        NewExternalSize = FinalES + TotalAttSize,
+        Info#full_doc_info{
+            rev_tree = NewRevTree,
+            sizes = #size_info{
+                active = NewActiveSize,
+                external = NewExternalSize
+            }
+        }
+    end, NewInfos0),
+
+    NewInfos = stem_full_doc_infos(Db, NewInfos1),
+    RemoveSeqs =
+    case Retry of
+    nil ->
+        [];
+    OldDocIdTree ->
+        % Compaction is being rerun to catch up to writes during the
+        % first pass. This means we may have docs that already exist
+        % in the seq_tree in the .data file. Here we lookup any old
+        % update_seqs so that they can be removed.
+        Ids = [Id || #full_doc_info{id=Id} <- NewInfos],
+        Existing = couch_btree:lookup(OldDocIdTree, Ids),
+        [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
+    end,
+
+    {ok, SeqTree} = couch_btree:add_remove(
+            NewDb#db.seq_tree, NewInfos, RemoveSeqs),
+
+    FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
+        {{Id, Seq}, FDI}
+    end, NewInfos),
+    {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
+    update_compact_task(length(NewInfos)),
+    NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
+
+
+copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
+    {ok, {BodyData, BinInfos0}} = couch_db:read_doc(SrcDb, SrcSp),
+    BinInfos = case BinInfos0 of
+    _ when is_binary(BinInfos0) ->
+        couch_compress:decompress(BinInfos0);
+    _ when is_list(BinInfos0) ->
+        % pre 1.2 file format
+        BinInfos0
+    end,
+    % copy the bin values
+    NewBinInfos = lists:map(
+        fun({Name, Type, BinSp, AttLen, RevPos, ExpectedMd5}) ->
+            % 010 UPGRADE CODE
+            {NewBinSp, AttLen, AttLen, ActualMd5, _IdentityMd5} =
+                couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+            check_md5(ExpectedMd5, ActualMd5),
+            {Name, Type, NewBinSp, AttLen, AttLen, RevPos, ExpectedMd5, identity};
+        ({Name, Type, BinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc1}) ->
+            {NewBinSp, AttLen, _, ActualMd5, _IdentityMd5} =
+                couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+            check_md5(ExpectedMd5, ActualMd5),
+            Enc = case Enc1 of
+            true ->
+                % 0110 UPGRADE CODE
+                gzip;
+            false ->
+                % 0110 UPGRADE CODE
+                identity;
+            _ ->
+                Enc1
+            end,
+            {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc}
+        end, BinInfos),
+    {BodyData, NewBinInfos}.
+
+
 commit_compaction_data(#db{}=Db) ->
     % Compaction needs to write headers to both the data file
     % and the meta file so if we need to restart we can pick

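For readers tracing copy_compact above: its fold buffers doc infos until
doc_buffer_size bytes are queued, flushes the batch through copy_docs,
and writes a recovery header via commit_compaction_data once
checkpoint_after bytes have been copied since the last commit. A
self-contained sketch of that accumulate/flush/checkpoint pattern
(module and helper names here are hypothetical, not CouchDB's API):

    -module(checkpoint_fold_sketch).
    -export([run/3]).

    %% Fold over Items, buffering until BufferSize bytes accumulate,
    %% then flushing; checkpoint once CheckpointAfter bytes have been
    %% flushed since the last checkpoint.
    run(Items, BufferSize, CheckpointAfter) ->
        Acc0 = {[], 0, 0},  % {Buffered, BufferedSize, CopiedSinceCkpt}
        {Buffered, _, _} = lists:foldl(fun(Item, {Buf, BufSz, Copied}) ->
            ItemSz = erlang:external_size(Item),
            BufSz2 = BufSz + ItemSz,
            case BufSz2 >= BufferSize of
                true ->
                    flush(lists:reverse([Item | Buf])),
                    Copied2 = Copied + BufSz2,
                    case Copied2 >= CheckpointAfter of
                        true  -> checkpoint(), {[], 0, 0};
                        false -> {[], 0, Copied2}
                    end;
                false ->
                    {[Item | Buf], BufSz2, Copied}
            end
        end, Acc0, Items),
        flush(lists:reverse(Buffered)).  % flush the final partial batch

    flush(Batch) -> io:format("flushing ~p items~n", [length(Batch)]).
    checkpoint() -> io:format("checkpoint~n").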