Posted to commits@couchdb.apache.org by da...@apache.org on 2017/08/28 18:46:57 UTC

[couchdb] branch compactor-id-tree-optimization updated (e3d7b28 -> ec5a512)

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a change to branch compactor-id-tree-optimization
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


    omit e3d7b28  Copy the docid tree from the old db
     new ec5a512  Copy the docid tree from the old db

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (e3d7b28)
            \
             N -- N -- N   refs/heads/compactor-id-tree-optimization (ec5a512)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/couch/test/couch_db_updater_tests.erl | 11 +++++++++++
 1 file changed, 11 insertions(+)

-- 
To stop receiving notification emails like this one, please contact
['"commits@couchdb.apache.org" <co...@couchdb.apache.org>'].

[couchdb] 01/01: Copy the docid tree from the old db

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch compactor-id-tree-optimization
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit ec5a512d2f9c6f440dfce70e407dcbe3d82941bd
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Aug 16 13:47:38 2017 -0500

    Copy the docid tree from the old db
    
    This optimizes the first pass of compaction to avoid writing data to the
    emsort structure stored in the .compact.meta file. Instead of
    writing data there, we re-read the docid tree from the original
    database.
    
    This also adds a fairly comprehensive test suite checking the ability of
    the compactor to properly react to various errors during compaction.
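    
    On a fresh (non-retry) pass the new docid tree can be rebuilt directly
    from the old one. A rough sketch of the idea, using a hypothetical
    helper name (the real code below adds batching, task status updates,
    and a lighter btree view that folds {DocId, Seq} pairs instead of
    full infos):
    
        rebuild_id_tree(OldIdTree, NewSeqTree, NewIdTree0) ->
            % For each {DocId, Seq} in the old db's id_tree, fetch the
            % already-copied #full_doc_info{} (with its new disk
            % pointers) from the new seq_tree, instead of spooling
            % infos through the .compact.meta emsort structure.
            FoldFun = fun({_DocId, Seq}, _Reds, AccIdTree) ->
                [{ok, FDI}] = couch_btree:lookup(NewSeqTree, [Seq]),
                {ok, AccIdTree2} = couch_btree:add(AccIdTree, [FDI]),
                {ok, AccIdTree2}
            end,
            {ok, _, NewIdTree1} =
                couch_btree:foldl(OldIdTree, FoldFun, NewIdTree0, []),
            NewIdTree1.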
---
 src/couch/src/couch_db_updater.erl        | 211 +++++++++++++++++--
 src/couch/test/couch_db_updater_ev.erl    | 106 ++++++++++
 src/couch/test/couch_db_updater_tests.erl | 324 ++++++++++++++++++++++++++++++
 3 files changed, 619 insertions(+), 22 deletions(-)

diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index a83e3b4..38a339e 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -26,6 +26,7 @@
 -record(comp_st, {
     old_db,
     new_db,
+    new_id_tree,
     meta_fd,
     retry
 }).
@@ -1039,20 +1040,29 @@ check_md5(Md5, Md5) -> ok;
 check_md5(_, _) -> throw(md5_mismatch).
 
 
+-ifdef(TEST).
+-define(COMP_EVENT(Name),
+        couch_db_updater_ev:event(Name)).
+-else.
+-define(COMP_EVENT(Name), ignore).
+-endif.
+
+
 start_copy_compact(#db{}=Db) ->
     erlang:put(io_priority, {db_compact, Db#db.name}),
     couch_log:debug("Compaction process spawned for db \"~s\"", [Db#db.name]),
 
+    ?COMP_EVENT(init),
     {ok, InitCompSt} = open_compaction_files(Db),
+    ?COMP_EVENT(files_opened),
 
     Stages = [
         fun copy_purge_info/1,
         fun copy_compact/1,
         fun commit_compaction_data/1,
-        fun sort_meta_data/1,
-        fun commit_compaction_data/1,
-        fun copy_meta_data/1,
-        fun compact_final_sync/1
+        fun copy_doc_ids/1,
+        fun compact_final_sync/1,
+        fun verify_compaction/1
     ],
 
     FinalCompSt = lists:foldl(fun(Stage, CompSt) ->
@@ -1067,6 +1077,7 @@ start_copy_compact(#db{}=Db) ->
     close_db(FinalNewDb),
     ok = couch_file:close(MetaFd),
 
+    ?COMP_EVENT(before_notify),
     gen_server:cast(Db#db.main_pid, {compact_done, FinalNewDb#db.filepath}).
 
 
@@ -1092,8 +1103,9 @@ open_compaction_files(OldDb) ->
             #comp_st{
                 old_db = OldDb,
                 new_db = Db1,
+                new_id_tree = Db0#db.id_tree,
                 meta_fd = MetaFd,
-                retry = Db0#db.id_tree
+                retry = (couch_btree:size(Db0#db.id_tree) > 0)
             };
         _ when DataHdrIsDbHdr ->
             % We tried to swap out the compaction but there were
@@ -1105,8 +1117,9 @@ open_compaction_files(OldDb) ->
             #comp_st{
                 old_db = OldDb,
                 new_db = Db1,
+                new_id_tree = Db0#db.id_tree,
                 meta_fd = MetaFd,
-                retry = Db0#db.id_tree
+                retry = true
             };
         _ ->
             % We're starting a compaction from scratch
@@ -1118,8 +1131,9 @@ open_compaction_files(OldDb) ->
             #comp_st{
                 old_db = OldDb,
                 new_db = Db1,
+                new_id_tree = Db0#db.id_tree,
                 meta_fd = MetaFd,
-                retry = nil
+                retry = false
             }
     end,
     unlink(DataFd),
@@ -1146,6 +1160,7 @@ reset_compaction_file(Fd, Header) ->
 
 
 copy_purge_info(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
+    ?COMP_EVENT(purge_init),
     OldHdr = OldDb#db.header,
     NewHdr = NewDb#db.header,
     OldPurgeSeq = couch_db_header:purge_seq(OldHdr),
@@ -1154,6 +1169,7 @@ copy_purge_info(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
         {ok, PurgedIdsRevs} = couch_db:get_last_purged(OldDb),
         Opts = [{compression, NewDb#db.compression}],
         {ok, Ptr, _} = couch_file:append_term(NewDb#db.fd, PurgedIdsRevs, Opts),
+        ?COMP_EVENT(purge_done),
         CompSt#comp_st{
             new_db = NewDb#db{
                 header = couch_db_header:set(NewHdr, [
@@ -1163,6 +1179,7 @@ copy_purge_info(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
             }
         };
     true ->
+        ?COMP_EVENT(purge_done),
         CompSt
     end.
 
@@ -1194,7 +1211,7 @@ copy_compact(#comp_st{} = CompSt) ->
         AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
         if AccUncopiedSize2 >= BufferSize ->
             NewDb2 = copy_docs(
-                Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
+                Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), CompSt),
             AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
             if AccCopiedSize2 >= CheckpointAfter ->
                 CommNewDb2 = commit_compaction_data(NewDb2#db{update_seq=Seq}),
@@ -1210,15 +1227,17 @@ copy_compact(#comp_st{} = CompSt) ->
 
     TaskProps0 = [
         {type, database_compaction},
+        {phase, copy_docs},
         {retry, (Retry /= nil)},
         {database, Db#db.name},
         {progress, 0},
         {changes_done, 0},
         {total_changes, TotalChanges}
     ],
-    case (Retry /= nil) and couch_task_status:is_task_added() of
+    case Retry and couch_task_status:is_task_added() of
     true ->
         couch_task_status:update([
+            {phase, copy_docs},
             {retry, true},
             {progress, 0},
             {changes_done, 0},
@@ -1229,12 +1248,14 @@ copy_compact(#comp_st{} = CompSt) ->
         couch_task_status:set_update_frequency(500)
     end,
 
+    ?COMP_EVENT(seq_init),
     {ok, _, {NewDb2, Uncopied, _, _}} =
         couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
             {NewDb, [], 0, 0},
             [{start_key, NewDb#db.update_seq + 1}]),
 
-    NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
+    NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), CompSt),
+    ?COMP_EVENT(seq_done),
 
     % copy misc header values
     if NewDb3#db.security /= Db#db.security ->
@@ -1253,7 +1274,7 @@ copy_compact(#comp_st{} = CompSt) ->
     }.
 
 
-copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
+copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, CompSt) ->
     DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
     LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
     % COUCHDB-968, make sure we prune duplicates during compaction
@@ -1294,6 +1315,7 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
         TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts),
         NewActiveSize = FinalAS + TotalAttSize,
         NewExternalSize = FinalES + TotalAttSize,
+        ?COMP_EVENT(seq_copy),
         Info#full_doc_info{
             rev_tree = NewRevTree,
             sizes = #size_info{
@@ -1305,28 +1327,41 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
 
     NewInfos = stem_full_doc_infos(Db, NewInfos1),
     RemoveSeqs =
-    case Retry of
-    nil ->
-        [];
-    OldDocIdTree ->
+    case CompSt#comp_st.retry of
+    true ->
+        NewDocIdTree = CompSt#comp_st.new_id_tree,
         % Compaction is being rerun to catch up to writes during the
         % first pass. This means we may have docs that already exist
+        % in the seq_tree in the .data file. Here we look up any old
         % update_seqs so that they can be removed.
         Ids = [Id || #full_doc_info{id=Id} <- NewInfos],
-        Existing = couch_btree:lookup(OldDocIdTree, Ids),
-        [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
+        Existing = couch_btree:lookup(NewDocIdTree, Ids),
+        [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing];
+    false ->
+        []
     end,
 
     {ok, SeqTree} = couch_btree:add_remove(
             NewDb#db.seq_tree, NewInfos, RemoveSeqs),
 
-    FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
-        {{Id, Seq}, FDI}
-    end, NewInfos),
-    {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
+    NewIdEms = case CompSt#comp_st.retry of
+        false ->
+            % If we're not in a retry compaction then there's
+            % no reason for us to copy doc infos to the emsort
+            % structure. We can just re-use the id_tree in the
+            % old db record.
+            NewDb#db.id_tree;
+        true ->
+            % We have to store doc infos because of the possible
+            % changes in the update sequences.
+            FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
+                {{Id, Seq}, FDI}
+            end, NewInfos),
+            {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
+            IdEms
+    end,
     update_compact_task(length(NewInfos)),
-    NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
+    NewDb#db{id_tree=NewIdEms, seq_tree=SeqTree}.
 
 
 copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
@@ -1417,8 +1452,105 @@ bind_id_tree(Db, Fd, State) ->
     Db#db{id_tree=IdBtree}.
 
 
+copy_doc_ids(#comp_st{retry = false} = CompSt) ->
+    copy_from_tree(CompSt);
+copy_doc_ids(#comp_st{retry = true} = CompSt) ->
+    Stages = [
+        fun sort_meta_data/1,
+        fun commit_compaction_data/1,
+        fun copy_meta_data/1
+    ],
+    lists:foldl(fun(Stage, St) -> Stage(St) end, CompSt, Stages).
+
+
+copy_from_tree(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
+    #db{
+        fd = OldFd,
+        header = OldHdr
+    } = OldDb,
+
+    TotalChanges = couch_db:count_changes_since(OldDb, 0),
+    couch_task_status:update([
+        {phase, copy_doc_ids},
+        {progress, 0},
+        {changes_done, 0},
+        {total_changes, TotalChanges}
+    ]),
+
+    % We're reopening a custom view of the id_tree to
+    % avoid the work of creating complete `#full_doc_info{}`
+    % records.
+    Compression = couch_compress:get_compression_method(),
+    OldIdTreeState = couch_btree:get_state(OldDb#db.id_tree),
+    {ok, OldIdTree} = couch_btree:open(OldIdTreeState, OldFd, [
+        {split, fun ?MODULE:btree_by_id_split/1},
+        {join, fun compact_id_join/2},
+        {reduce, fun ?MODULE:btree_by_id_reduce/2},
+        {compression, Compression}
+    ]),
+
+    NewIdTree = CompSt#comp_st.new_id_tree,
+    NewSeqTree = NewDb#db.seq_tree,
+
+    BufferSize = list_to_integer(
+        config:get("database_compaction", "doc_buffer_size", "524288")),
+
+    EnumByIdFun = fun({DocId, UpdateSeq}, _, {DstIdTree, Batch, Count}) ->
+        case Count >= 1000 of
+            true ->
+                DstIdTree2 = flush_docid_batch(
+                        Batch, NewSeqTree, DstIdTree, BufferSize),
+                update_compact_task(Count),
+                {ok, {DstIdTree2, [{DocId, UpdateSeq}], 1}};
+            false ->
+                {ok, {DstIdTree, [{DocId, UpdateSeq} | Batch], Count + 1}}
+        end
+    end,
+
+    ?COMP_EVENT(id_init),
+    {ok, _, {NewIdTree2, LastBatch, LastCount}} =
+        couch_btree:foldl(OldIdTree, EnumByIdFun, {NewIdTree, [], 0}, []),
+    NewIdTree3 = flush_docid_batch(
+            LastBatch, NewSeqTree, NewIdTree2, BufferSize),
+    ?COMP_EVENT(id_done),
+    update_compact_task(LastCount),
+    CompSt#comp_st{
+        new_db = NewDb#db{id_tree = NewIdTree3}
+    }.
+
+
+flush_docid_batch(DocIds, SeqTree, IdTree, MaxBufSize) ->
+    flush_docid_batch(DocIds, SeqTree, IdTree, MaxBufSize, [], 0).
+
+
+flush_docid_batch([], _, IdTree, _, Batch, _) ->
+    {ok, NewIdTree} = couch_btree:add(IdTree, Batch),
+    NewIdTree;
+
+flush_docid_batch(DocIds, SeqTree, IdTree, MaxBufSize, Buf, BufSize)
+        when BufSize >= MaxBufSize ->
+    {ok, NewIdTree} = couch_btree:add(IdTree, Buf),
+    flush_docid_batch(DocIds, SeqTree, NewIdTree, MaxBufSize, [], 0);
+
+flush_docid_batch(DocIds, SeqTree, IdTree, MaxBufSize, Buf, BufSize) ->
+    [{_Id, Seq} | Rest] = DocIds,
+    [{ok, #full_doc_info{} = FDI}] = couch_btree:lookup(SeqTree, [Seq]),
+    NewBuf = [FDI | Buf],
+    NewBufSize = BufSize + ?term_size(FDI),
+    ?COMP_EVENT(id_copy),
+    flush_docid_batch(Rest, SeqTree, IdTree, MaxBufSize, NewBuf, NewBufSize).
+
+
+compact_id_join(Id, {HighSeq, _, _}) ->
+    {Id, HighSeq};
+compact_id_join(Id, {HighSeq, _, _, _}) ->
+    {Id, HighSeq}.
+
+
 sort_meta_data(#comp_st{new_db = Db0} = CompSt) ->
+    ?COMP_EVENT(md_sort_init),
     {ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
+    ?COMP_EVENT(md_sort_done),
     CompSt#comp_st{
         new_db = Db0#db{
             id_tree = Ems
@@ -1445,11 +1577,13 @@ copy_meta_data(#comp_st{new_db = Db} = CompSt) ->
         rem_seqs=[],
         infos=[]
     },
+    ?COMP_EVENT(md_copy_init),
     Acc = merge_docids(Iter, Acc0),
     {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos),
     {ok, SeqTree} = couch_btree:add_remove(
         Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
     ),
+    ?COMP_EVENT(md_copy_done),
     CompSt#comp_st{
         new_db = Db#db{
             id_tree = IdTree,
@@ -1459,13 +1593,45 @@ copy_meta_data(#comp_st{new_db = Db} = CompSt) ->
 
 
 compact_final_sync(#comp_st{new_db = NewDb0} = CompSt) ->
+    ?COMP_EVENT(before_final_sync),
     NewHdr = db_to_header(NewDb0, NewDb0#db.header),
     NewDb1 = sync_header(NewDb0, NewHdr),
+    ?COMP_EVENT(after_final_sync),
     CompSt#comp_st{
         new_db = NewDb1
     }.
 
 
+verify_compaction(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
+    {ok, OldIdReds0} = couch_btree:full_reduce(OldDb#db.id_tree),
+    {ok, OldSeqReds} = couch_btree:full_reduce(OldDb#db.seq_tree),
+    {ok, NewIdReds0} = couch_btree:full_reduce(NewDb#db.id_tree),
+    {ok, NewSeqReds} = couch_btree:full_reduce(NewDb#db.seq_tree),
+    {
+        OldDocCount,
+        OldDelDocCount,
+        #size_info{external = OldExternalSize}
+    } = OldIdReds0,
+    OldIdReds = {OldDocCount, OldDelDocCount, OldExternalSize},
+    {
+        NewDocCount,
+        NewDelDocCount,
+        #size_info{external = NewExternalSize}
+    } = NewIdReds0,
+    NewIdReds = {NewDocCount, NewDelDocCount, NewExternalSize},
+    if NewIdReds == OldIdReds -> ok; true ->
+        Fmt1 = "Compacted id tree for ~s differs from source: ~p /= ~p",
+        couch_log:error(Fmt1, [couch_db:name(OldDb), NewIdReds, OldIdReds]),
+        exit({compaction_error, id_tree})
+    end,
+    if NewSeqReds == OldSeqReds -> ok; true ->
+        Fmt2 = "Compacted seq tree for ~s differs from source: ~p /= ~p",
+        couch_log:error(Fmt2, [couch_db:name(OldDb), NewSeqReds, OldSeqReds]),
+        exit({compaction_error, seq_tree})
+    end,
+    CompSt.
+
+
 merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
     #merge_st{
         id_tree=IdTree0,
@@ -1489,6 +1655,7 @@ merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
                 rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
                 curr = NewCurr
             },
+            ?COMP_EVENT(md_copy_row),
             merge_docids(NextIter, Acc1);
         {finished, FDI, Seqs} ->
             Acc#merge_st{
diff --git a/src/couch/test/couch_db_updater_ev.erl b/src/couch/test/couch_db_updater_ev.erl
new file mode 100644
index 0000000..a50b24c
--- /dev/null
+++ b/src/couch/test/couch_db_updater_ev.erl
@@ -0,0 +1,106 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_db_updater_ev).
+
+
+-export([
+    init/0,
+    terminate/0,
+    clear/0,
+
+    set_wait/1,
+    set_crash/1,
+
+    event/1
+]).
+
+
+-define(TAB, couch_db_updater_ev_tab).
+
+
+init() ->
+    ets:new(?TAB, [set, public, named_table]).
+
+
+terminate() ->
+    ets:delete(?TAB).
+
+
+clear() ->
+    ets:delete_all_objects(?TAB).
+
+
+set_wait(Event) ->
+    Self = self(),
+    WaitFun = fun(_) ->
+        receive
+            {Self, go} ->
+                Self ! {self(), ok}
+        end,
+        ets:delete(?TAB, Event)
+    end,
+    ContinueFun = fun(Pid) ->
+        Pid ! {Self, go},
+        receive {Pid, ok} -> ok end
+    end,
+    ets:insert(?TAB, {Event, WaitFun}),
+    {ok, ContinueFun}.
+
+
+set_crash(Event) ->
+    Reason = {couch_db_updater_ev_crash, Event},
+    CrashFun = fun(_) -> exit(Reason) end,
+    ets:insert(?TAB, {Event, CrashFun}),
+    {ok, Reason}.
+
+
+event(Event) ->
+    NewEvent = case Event of
+        seq_init ->
+            put(?MODULE, 0),
+            Event;
+        seq_copy ->
+            Count = get(?MODULE),
+            put(?MODULE, Count + 1),
+            {seq_copy, Count};
+        id_init ->
+            put(?MODULE, 0),
+            Event;
+        id_copy ->
+            Count = get(?MODULE),
+            put(?MODULE, Count + 1),
+            {id_copy, Count};
+        md_copy_init ->
+            put(?MODULE, 0),
+            Event;
+        md_copy_row ->
+            Count = get(?MODULE),
+            put(?MODULE, Count + 1),
+            {md_copy_row, Count};
+        _ ->
+            Event
+    end,
+    handle_event(NewEvent).
+
+
+handle_event(Event) ->
+    try
+        case ets:lookup(?TAB, Event) of
+            [{Event, ActionFun}] ->
+                ActionFun(Event);
+            [] ->
+                ok
+        end
+    catch error:badarg ->
+        ok
+    end.
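
For context, the tests in the next file drive this module roughly as
follows (condensed from run_static/2 below; db setup and the follow-up
successful compaction are elided):

    % Park the compactor at its first event, arm a crash at the event
    % under test, then release the compactor and wait for the db to go
    % down with the armed reason.
    {ok, ContinueFun} = couch_db_updater_ev:set_wait(init),
    {ok, Reason} = couch_db_updater_ev:set_crash(Event),
    Ref = couch_db:monitor(Db),
    {ok, CPid} = couch_db:start_compact(Db),
    ContinueFun(CPid),
    receive {'DOWN', Ref, _, _, Reason} -> ok end
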
diff --git a/src/couch/test/couch_db_updater_tests.erl b/src/couch/test/couch_db_updater_tests.erl
new file mode 100644
index 0000000..67e2b83
--- /dev/null
+++ b/src/couch/test/couch_db_updater_tests.erl
@@ -0,0 +1,324 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_db_updater_tests).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+
+-define(TIMEOUT_EUNIT, 60).
+-define(EV_MOD, couch_db_updater_ev).
+-define(INIT_DOCS, 2500).
+-define(WRITE_DOCS, 20).
+
+% The idea behind the tests in this module is to attempt to
+% cover the restart/recopy events during compaction
+% so that we can be as sure as possible that the compactor
+% is resilient to errors in the face of external conditions
+% (e.g., the VM rebooting). The single linear pass is easy enough
+% to prove; however, restarting is important enough that we don't
+% want to waste work if a VM happens to bounce a lot.
+%
+% To cover as many restart situations as possible, we have created
+% a number of events in the compactor code that are present only in
+% a test-compiled version of the module. These events can then
+% be used (via meck) to introduce errors and coordinate writes
+% to the database while compaction is in progress.
+
+% This list of events is where we'll insert our errors.
+
+events() ->
+    [
+        init,               % The compactor process is spawned
+        files_opened,       % After compaction files have opened
+
+        purge_init,         % Just before applying purge changes
+        purge_done,         % Just after finishing purge updates
+
+        % The firs phase is when we copy all document body and attachment
+        % data to the new database file in order of update sequence so
+        % that we can resume on crash.
+
+        seq_init,           % Before the first change is copied
+        {seq_copy, 0},      % After change N is copied
+        {seq_copy, ?INIT_DOCS div 2},
+        {seq_copy, ?INIT_DOCS - 2},
+        seq_done,           % After last change is copied
+
+        % The id copy phases come in two flavors. Before a compaction
+        % swap is attempted they're copied from the id_tree in the
+        % database being compacted. After a swap attempt they are
+        % stored in an emsort file on disk. Thus the two sets of
+        % related events here.
+
+        id_init,            % Just before the first docid is copied
+        {id_copy, 0},       % After docid N is copied
+        {id_copy, ?INIT_DOCS div 2},
+        {id_copy, ?INIT_DOCS - 2},
+        id_done,            % After all docids have been copied
+
+        md_sort_init,       % Just before metadata sort starts
+        md_sort_done,       % Just after metadata sort finishes
+        md_copy_init,       % Just before metadata copy starts
+        {md_copy_row, 0},   % After docid N is copied
+        {md_copy_row, ?WRITE_DOCS div 2},
+        {md_copy_row, ?WRITE_DOCS - 2},
+        md_copy_done,       % Just after the last docid is copied
+
+        % And then the final steps before we finish
+
+        before_final_sync,  % Just before final sync
+        after_final_sync,   % Just after the final sync
+        before_notify       % Just before the final notification
+    ].
+
+% Mark which events only happen when documents are present
+
+requires_docs({seq_copy, _}) -> true;
+requires_docs({id_copy, _}) -> true;
+requires_docs(md_sort_init) -> true;
+requires_docs(md_sort_done) -> true;
+requires_docs(md_copy_init) -> true;
+requires_docs({md_copy_row, _}) -> true;
+requires_docs(md_copy_done) -> true;
+requires_docs(_) -> false.
+
+
+% Mark which events only happen when there's write activity during
+% a compaction.
+
+requires_write(md_sort_init) -> true;
+requires_write(md_sort_done) -> true;
+requires_write(md_copy_init) -> true;
+requires_write({md_copy_row, _}) -> true;
+requires_write(md_copy_done) -> true;
+requires_write(_) -> false.
+
+
+setup() ->
+    purge_module(),
+    ?EV_MOD:init(),
+    test_util:start_couch().
+
+
+teardown(Ctx) ->
+    test_util:stop_couch(Ctx),
+    ?EV_MOD:terminate().
+
+
+start_empty_db_test(_Event) ->
+    ?EV_MOD:clear(),
+    DbName = ?tempdb(),
+    {ok, _} = couch_db:create(DbName, [?ADMIN_CTX]),
+    DbName.
+
+
+start_populated_db_test(Event) ->
+    DbName = start_empty_db_test(Event),
+    {ok, Db} = couch_db:open_int(DbName, []),
+    try
+        populate_db(Db, ?INIT_DOCS)
+    after
+        couch_db:close(Db)
+    end,
+    DbName.
+
+
+stop_test(_Event, DbName) ->
+    couch_server:delete(DbName, [?ADMIN_CTX]).
+
+
+static_empty_db_test_() ->
+    FiltFun = fun(E) ->
+        not (requires_docs(E) or requires_write(E))
+    end,
+    Events = lists:filter(FiltFun, events()) -- [init],
+    {
+        "Idle empty database",
+        {
+            setup,
+            fun setup/0,
+            fun teardown/1,
+            [
+                {
+                    foreachx,
+                    fun start_empty_db_test/1,
+                    fun stop_test/2,
+                    [{Event, fun run_static_init/2} || Event <- Events]
+                }
+            ]
+        }
+    }.
+
+
+static_populated_db_test_() ->
+    FiltFun = fun(E) -> not requires_write(E) end,
+    Events = lists:filter(FiltFun, events()) -- [init],
+    {
+        "Idle populated database",
+        {
+            setup,
+            fun setup/0,
+            fun teardown/1,
+            [
+                {
+                    foreachx,
+                    fun start_populated_db_test/1,
+                    fun stop_test/2,
+                    [{Event, fun run_static_init/2} || Event <- Events]
+                }
+            ]
+        }
+    }.
+
+
+dynamic_empty_db_test_() ->
+    FiltFun = fun(E) -> not requires_docs(E) end,
+    Events = lists:filter(FiltFun, events()) -- [init],
+    {
+        "Writes to empty database",
+        {
+            setup,
+            fun setup/0,
+            fun teardown/1,
+            [
+                {
+                    foreachx,
+                    fun start_empty_db_test/1,
+                    fun stop_test/2,
+                    [{Event, fun run_dynamic_init/2} || Event <- Events]
+                }
+            ]
+        }
+    }.
+
+
+dynamic_populated_db_test_() ->
+    Events = events() -- [init],
+    {
+        "Writes to populated database",
+        {
+            setup,
+            fun setup/0,
+            fun teardown/1,
+            [
+                {
+                    foreachx,
+                    fun start_populated_db_test/1,
+                    fun stop_test/2,
+                    [{Event, fun run_dynamic_init/2} || Event <- Events]
+                }
+            ]
+        }
+    }.
+
+
+run_static_init(Event, DbName) ->
+    Name = lists:flatten(io_lib:format("~p", [Event])),
+    Test = {timeout, ?TIMEOUT_EUNIT, ?_test(run_static(Event, DbName))},
+    {Name, Test}.
+
+
+run_static(Event, DbName) ->
+    {ok, ContinueFun} = ?EV_MOD:set_wait(init),
+    {ok, Reason} = ?EV_MOD:set_crash(Event),
+    {ok, Db} = couch_db:open_int(DbName, []),
+    Ref = couch_db:monitor(Db),
+    {ok, CPid} = couch_db:start_compact(Db),
+    ContinueFun(CPid),
+    receive
+        {'DOWN', Ref, _, _, Reason} ->
+            wait_db_cleared(Db)
+    end,
+    run_successful_compaction(DbName),
+    couch_db:close(Db).
+
+
+run_dynamic_init(Event, DbName) ->
+    Name = lists:flatten(io_lib:format("~p", [Event])),
+    Test = {timeout, ?TIMEOUT_EUNIT, ?_test(run_dynamic(Event, DbName))},
+    {Name, Test}.
+
+
+run_dynamic(Event, DbName) ->
+    {ok, ContinueFun} = ?EV_MOD:set_wait(init),
+    {ok, Reason} = ?EV_MOD:set_crash(Event),
+    {ok, Db} = couch_db:open_int(DbName, []),
+    Ref = couch_db:monitor(Db),
+    {ok, CPid} = couch_db:start_compact(Db),
+    ok = populate_db(Db, 10),
+    ContinueFun(CPid),
+    receive
+        {'DOWN', Ref, _, _, Reason} ->
+            wait_db_cleared(Db)
+    end,
+    run_successful_compaction(DbName),
+    couch_db:close(Db).
+
+
+run_successful_compaction(DbName) ->
+    ?EV_MOD:clear(),
+    {ok, ContinueFun} = ?EV_MOD:set_wait(init),
+    {ok, Db} = couch_db:open_int(DbName, []),
+    {ok, CPid} = couch_db:start_compact(Db),
+    Ref = erlang:monitor(process, CPid),
+    ContinueFun(CPid),
+    receive
+        {'DOWN', Ref, _, _, normal} -> ok
+    end,
+    ?assertMatch({ok, _}, gen_server:call(Db#db.main_pid, get_db)),
+    couch_db:close(Db).
+
+
+wait_db_cleared(Db) ->
+    wait_db_cleared(Db, 5).
+
+
+wait_db_cleared(Db, N) when N < 0 ->
+    erlang:error({db_clear_timeout, couch_db:name(Db)});
+
+wait_db_cleared(Db, N) ->
+    case ets:lookup(couch_dbs, couch_db:name(Db)) of
+        [] ->
+            ok;
+        [NewDb] when NewDb#db.main_pid /= Db#db.main_pid ->
+            ok;
+        _ ->
+            timer:sleep(100),
+            wait_db_cleared(Db, N - 1)
+    end.
+
+
+populate_db(_Db, NumDocs) when NumDocs =< 0 ->
+    ok;
+populate_db(Db, NumDocs) ->
+    String = [$a || _ <- lists:seq(1, erlang:min(NumDocs, 500))],
+    Docs = lists:map(
+        fun(_) ->
+            couch_doc:from_json_obj({[
+                {<<"_id">>, couch_uuids:random()},
+                {<<"string">>, list_to_binary(String)}
+            ]})
+        end,
+        lists:seq(1, erlang:min(NumDocs, 500))),
+    {ok, _} = couch_db:update_docs(Db, Docs, []),
+    populate_db(Db, NumDocs - 500).
+
+
+purge_module() ->
+    case code:which(couch_db_updater) of
+        cover_compiled ->
+            ok;
+        _ ->
+            code:delete(couch_db_updater),
+            code:purge(couch_db_updater)
+    end.

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.