2017/09/15 14:29:52

FEEDBACK ONLY: Compactor optimize emsort

davisp commented on a change in pull request #806: FEEDBACK ONLY: Compactor optimize emsort

 File path: src/couch/src/couch_db_updater.erl
 @@ -1065,83 +1039,156 @@ merge_lookups([FDI | RestInfos], Lookups) ->
 check_md5(Md5, Md5) -> ok;
 check_md5(_, _) -> throw(md5_mismatch).
-copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
-    DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
-    LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
-    % COUCHDB-968, make sure we prune duplicates during compaction
-    NewInfos0 = lists:usort(fun(#full_doc_info{id=A}, #full_doc_info{id=B}) ->
-        A =< B
-    end, merge_lookups(MixedInfos, LookupResults)),
-    NewInfos1 = lists:map(fun(Info) ->
-        {NewRevTree, FinalAcc} = couch_key_tree:mapfold(fun
-            (_Rev, #leaf{ptr=Sp}=Leaf, leaf, SizesAcc) ->
-                {Body, AttInfos} = copy_doc_attachments(Db, Sp, DestFd),
-                % In the future, we should figure out how to do this for
-                % upgrade purposes.
-                EJsonBody = case is_binary(Body) of
-                    true ->
-                        couch_compress:decompress(Body);
-                    false ->
-                        Body
-                end,
-                SummaryChunk = make_doc_summary(NewDb, {Body, AttInfos}),
-                ExternalSize = ?term_size(EJsonBody),
-                {ok, Pos, SummarySize} = couch_file:append_raw_chunk(
-                    DestFd, SummaryChunk),
-                AttSizes = [{element(3,A), element(4,A)} || A <- AttInfos],
-                NewLeaf = Leaf#leaf{
-                    ptr = Pos,
-                    sizes = #size_info{
-                        active = SummarySize,
-                        external = ExternalSize
-                    },
-                    atts = AttSizes
-                },
-                {NewLeaf, add_sizes(leaf, NewLeaf, SizesAcc)};
-            (_Rev, _Leaf, branch, SizesAcc) ->
-                {?REV_MISSING, SizesAcc}
-        end, {0, 0, []}, Info#full_doc_info.rev_tree),
-        {FinalAS, FinalES, FinalAtts} = FinalAcc,
-        TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts),
-        NewActiveSize = FinalAS + TotalAttSize,
-        NewExternalSize = FinalES + TotalAttSize,
-        Info#full_doc_info{
-            rev_tree = NewRevTree,
-            sizes = #size_info{
-                active = NewActiveSize,
-                external = NewExternalSize
-            }
-        }
-    end, NewInfos0),
+        couch_db_updater_ev:event(Name)).
+-define(COMP_EVENT(Name), ignore).
-    NewInfos = stem_full_doc_infos(Db, NewInfos1),
-    RemoveSeqs =
-    case Retry of
-    nil ->
-        [];
-    OldDocIdTree ->
-        % Compaction is being rerun to catch up to writes during the
-        % first pass. This means we may have docs that already exist
-        % in the seq_tree in the .data file. Here we lookup any old
-        % update_seqs so that they can be removed.
-        Ids = [Id || #full_doc_info{id=Id} <- NewInfos],
-        Existing = couch_btree:lookup(OldDocIdTree, Ids),
-        [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
+start_copy_compact(#db{}=Db) ->
+    erlang:put(io_priority, {db_compact,}),
+    couch_log:debug("Compaction process spawned for db \"~s\"", []),
+    ?COMP_EVENT(init),
+    {ok, InitCompSt} = open_compaction_files(Db),
+    ?COMP_EVENT(files_opened),
+    Stages = [
+        fun copy_purge_info/1,
+        fun copy_compact/1,
+        fun commit_compaction_data/1,
+        fun sort_meta_data/1,
+        fun commit_compaction_data/1,
+        fun copy_meta_data/1,
+        fun compact_final_sync/1,
+        fun verify_compaction/1
+    ],
+    FinalCompSt = lists:foldl(fun(Stage, CompSt) ->
+        Stage(CompSt)
+    end, InitCompSt, Stages),
+    #comp_st{
+        new_db = FinalNewDb,
+        meta_fd = MetaFd
+    } = FinalCompSt,
+    close_db(FinalNewDb),
+    ok = couch_file:close(MetaFd),
+    ?COMP_EVENT(before_notify),
+    gen_server:cast(Db#db.main_pid, {compact_done, FinalNewDb#db.filepath}).
+open_compaction_files(OldDb) ->
+    #db{
+        name = DbName,
+        filepath = DbFilePath,
+        options = Options,
+        header = SrcHdr
+    } = OldDb,
+    DataFile = DbFilePath ++ "",
+    MetaFile = DbFilePath ++ ".compact.meta",
+    {ok, DataFd, DataHdr} = open_compaction_file(DataFile),
+    {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile),
+    DataHdrIsDbHdr = couch_db_header:is_header(DataHdr),
+    CompSt = case {DataHdr, MetaHdr} of
+        {#comp_header{}=A, #comp_header{}=A} ->
+            % We're restarting a compaction that did not finish
+            % before trying to swap out with the original db
+            DbHeader = A#comp_header.db_header,
+            Db0 = init_db(DbName, DataFile, DataFd, DbHeader, Options),
+            Db1 = bind_emsort(Db0, MetaFd, A#comp_header.meta_st),
+            #comp_st{
+                old_db = OldDb,
+                new_db = Db1,
+                meta_fd = MetaFd,
+                retry = Db0#db.id_tree
+            };
+        _ when DataHdrIsDbHdr ->
+            % We tried to swap out the compaction but there were
+            % writes to the database during compaction. Start
+            % a compaction retry.
+            ok = reset_compaction_file(MetaFd, couch_db_header:from(SrcHdr)),
+            Db0 = init_db(DbName, DataFile, DataFd, DataHdr, Options),
+            Db1 = bind_emsort(Db0, MetaFd, nil),
+            #comp_st{
+                old_db = OldDb,
+                new_db = Db1,
+                meta_fd = MetaFd,
+                retry = Db0#db.id_tree
+            };
+        _ ->
+            % We're starting a compaction from scratch
+            Header = couch_db_header:from(SrcHdr),
+            ok = reset_compaction_file(DataFd, Header),
+            ok = reset_compaction_file(MetaFd, Header),
+            Db0 = init_db(DbName, DataFile, DataFd, Header, Options),
+            Db1 = bind_emsort(Db0, MetaFd, nil),
+            #comp_st{
+                old_db = OldDb,
+                new_db = Db1,
+                meta_fd = MetaFd,
+                retry = nil
+            }
+    unlink(DataFd),
 Review comment:
   init_db takes the monitor for DataFd. It's similar to how things work when opening a database normally. This is an artefact of not having a gen_server:start_monitor call.
