Posted to commits@couchdb.apache.org by da...@apache.org on 2017/09/11 20:56:51 UTC

[couchdb] 03/06: Implement compactor test suite

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch compactor-optimize-emsort
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 98d35f2615c8170ba24d1bc7408be2f07cebd0dc
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Sep 6 16:26:29 2017 -0500

    Implement compactor test suite
---
 src/couch/src/couch_db_updater.erl        |  61 +++++-
 src/couch/test/couch_db_updater_ev.erl    | 106 ++++++++++
 src/couch/test/couch_db_updater_tests.erl | 317 ++++++++++++++++++++++++++++++
 3 files changed, 483 insertions(+), 1 deletion(-)

diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 3142516..6bcf727 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -1039,11 +1039,21 @@ check_md5(Md5, Md5) -> ok;
 check_md5(_, _) -> throw(md5_mismatch).
 
 
+-ifdef(TEST).
+-define(COMP_EVENT(Name),
+        couch_db_updater_ev:event(Name)).
+-else.
+-define(COMP_EVENT(Name), ignore).
+-endif.
+
+
 start_copy_compact(#db{}=Db) ->
     erlang:put(io_priority, {db_compact, Db#db.name}),
     couch_log:debug("Compaction process spawned for db \"~s\"", [Db#db.name]),
 
+    ?COMP_EVENT(init),
     {ok, InitCompSt} = open_compaction_files(Db),
+    ?COMP_EVENT(files_opened),
 
     Stages = [
         fun copy_purge_info/1,
@@ -1052,7 +1062,8 @@ start_copy_compact(#db{}=Db) ->
         fun sort_meta_data/1,
         fun commit_compaction_data/1,
         fun copy_meta_data/1,
-        fun compact_final_sync/1
+        fun compact_final_sync/1,
+        fun verify_compaction/1
     ],
 
     FinalCompSt = lists:foldl(fun(Stage, CompSt) ->
@@ -1067,6 +1078,7 @@ start_copy_compact(#db{}=Db) ->
     close_db(FinalNewDb),
     ok = couch_file:close(MetaFd),
 
+    ?COMP_EVENT(before_notify),
     gen_server:cast(Db#db.main_pid, {compact_done, FinalNewDb#db.filepath}).
 
 
@@ -1146,6 +1158,7 @@ reset_compaction_file(Fd, Header) ->
 
 
 copy_purge_info(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
+    ?COMP_EVENT(purge_init),
     OldHdr = OldDb#db.header,
     NewHdr = NewDb#db.header,
     OldPurgeSeq = couch_db_header:purge_seq(OldHdr),
@@ -1154,6 +1167,7 @@ copy_purge_info(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
         {ok, PurgedIdsRevs} = couch_db:get_last_purged(OldDb),
         Opts = [{compression, NewDb#db.compression}],
         {ok, Ptr, _} = couch_file:append_term(NewDb#db.fd, PurgedIdsRevs, Opts),
+        ?COMP_EVENT(purge_done),
         CompSt#comp_st{
             new_db = NewDb#db{
                 header = couch_db_header:set(NewHdr, [
@@ -1163,6 +1177,7 @@ copy_purge_info(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
             }
         };
     true ->
+        ?COMP_EVENT(purge_done),
         CompSt
     end.
 
@@ -1229,6 +1244,7 @@ copy_compact(#comp_st{} = CompSt) ->
         couch_task_status:set_update_frequency(500)
     end,
 
+    ?COMP_EVENT(seq_init),
     {ok, _, {NewDb2, Uncopied, _, _}} =
         couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
             {NewDb, [], 0, 0},
@@ -1236,6 +1252,8 @@ copy_compact(#comp_st{} = CompSt) ->
 
     NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
 
+    ?COMP_EVENT(seq_done),
+
     % copy misc header values
     if NewDb3#db.security /= Db#db.security ->
         {ok, Ptr, _} = couch_file:append_term(
@@ -1294,6 +1312,7 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
         TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts),
         NewActiveSize = FinalAS + TotalAttSize,
         NewExternalSize = FinalES + TotalAttSize,
+        ?COMP_EVENT(seq_copy),
         Info#full_doc_info{
             rev_tree = NewRevTree,
             sizes = #size_info{
@@ -1420,7 +1439,9 @@ bind_id_tree(Db, Fd, State) ->
 
 
 sort_meta_data(#comp_st{new_db = Db0} = CompSt) ->
+    ?COMP_EVENT(md_sort_init),
     {ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
+    ?COMP_EVENT(md_sort_done),
     CompSt#comp_st{
         new_db = Db0#db{
             id_tree = Ems
@@ -1447,11 +1468,13 @@ copy_meta_data(#comp_st{new_db = Db} = CompSt) ->
         rem_seqs=[],
         infos=[]
     },
+    ?COMP_EVENT(md_copy_init),
     Acc = merge_docids(Iter, Acc0),
     {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos),
     {ok, SeqTree} = couch_btree:add_remove(
         Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
     ),
+    ?COMP_EVENT(md_copy_done),
     CompSt#comp_st{
         new_db = Db#db{
             id_tree = IdTree,
@@ -1461,13 +1484,48 @@ copy_meta_data(#comp_st{new_db = Db} = CompSt) ->
 
 
 compact_final_sync(#comp_st{new_db = NewDb0} = CompSt) ->
+    ?COMP_EVENT(before_final_sync),
     NewHdr = db_to_header(NewDb0, NewDb0#db.header),
     NewDb1 = sync_header(NewDb0, NewHdr),
+    ?COMP_EVENT(after_final_sync),
     CompSt#comp_st{
         new_db = NewDb1
     }.
 
 
+verify_compaction(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
+    {ok, OldIdReds0} = couch_btree:full_reduce(OldDb#db.id_tree),
+    {ok, OldSeqReds} = couch_btree:full_reduce(OldDb#db.seq_tree),
+    {ok, NewIdReds0} = couch_btree:full_reduce(NewDb#db.id_tree),
+    {ok, NewSeqReds} = couch_btree:full_reduce(NewDb#db.seq_tree),
+    {
+        OldDocCount,
+        OldDelDocCount,
+        OldSizes
+    } = OldIdReds0,
+    #size_info{
+        external = OldExternalSize
+    } = upgrade_sizes(OldSizes),
+    OldIdReds = {OldDocCount, OldDelDocCount, OldExternalSize},
+    {
+        NewDocCount,
+        NewDelDocCount,
+        #size_info{external = NewExternalSize}
+    } = NewIdReds0,
+    NewIdReds = {NewDocCount, NewDelDocCount, NewExternalSize},
+    if NewIdReds == OldIdReds -> ok; true ->
+        Fmt1 = "Compacted id tree for ~s differs from source: ~p /= ~p",
+        couch_log:error(Fmt1, [couch_db:name(OldDb), NewIdReds, OldIdReds]),
+        exit({compaction_error, id_tree})
+    end,
+    if NewSeqReds == OldSeqReds -> ok; true ->
+        Fmt2 = "Compacted seq tree for ~s differs from source: ~p /= ~p",
+        couch_log:error(Fmt2, [couch_db:name(OldDb), NewSeqReds, OldSeqReds]),
+        exit({compaction_error, seq_tree})
+    end,
+    CompSt.
+
+
 merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
     #merge_st{
         id_tree=IdTree0,
@@ -1491,6 +1549,7 @@ merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
                 rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
                 curr = NewCurr
             },
+            ?COMP_EVENT(md_copy_row),
             merge_docids(NextIter, Acc1);
         {finished, FDI, Seqs} ->
             Acc#merge_st{
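
For context on the instrumentation above: when TEST is defined, each
?COMP_EVENT(Name) expands to a call into couch_db_updater_ev:event/1; in a
release build it expands to the atom ignore and is effectively a no-op. A
minimal sketch of the same conditional-compilation idiom, kept separate from
the patch (the module name demo_compact and do_work/0 are illustrative only):

    -module(demo_compact).
    -export([do_work/0]).

    -ifdef(TEST).
    -define(COMP_EVENT(Name), couch_db_updater_ev:event(Name)).
    -else.
    -define(COMP_EVENT(Name), ignore).
    -endif.

    do_work() ->
        ?COMP_EVENT(init),               % test builds: fire the hook
        Result = lists:sum([1, 2, 3]),   % stand-in for the real work
        ?COMP_EVENT(before_notify),
        {ok, Result}.
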
diff --git a/src/couch/test/couch_db_updater_ev.erl b/src/couch/test/couch_db_updater_ev.erl
new file mode 100644
index 0000000..3a57141
--- /dev/null
+++ b/src/couch/test/couch_db_updater_ev.erl
@@ -0,0 +1,106 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_db_updater_ev).
+
+
+-export([
+    init/0,
+    terminate/0,
+    clear/0,
+
+    set_wait/1,
+    set_crash/1,
+
+    event/1
+]).
+
+
+-define(TAB, couch_db_updater_ev_tab).
+
+
+init() ->
+    ets:new(?TAB, [set, public, named_table]).
+
+
+terminate() ->
+    ets:delete(?TAB).
+
+
+clear() ->
+    ets:delete_all_objects(?TAB).
+
+
+set_wait(Event) ->
+    Self = self(),
+    WaitFun = fun(_) ->
+        receive
+            {Self, go} ->
+                Self ! {self(), ok}
+        end,
+        ets:delete(?TAB, Event)
+    end,
+    ContinueFun = fun(Pid) ->
+        Pid ! {Self, go},
+        receive {Pid, ok} -> ok end
+    end,
+    ets:insert(?TAB, {Event, WaitFun}),
+    {ok, ContinueFun}.
+
+
+set_crash(Event) ->
+    Reason = {couch_db_updater_ev_crash, Event},
+    CrashFun = fun(_) -> exit(Reason) end,
+    ets:insert(?TAB, {Event, CrashFun}),
+    {ok, Reason}.
+
+
+event(Event) ->
+    NewEvent = case Event of
+        seq_init ->
+            put(?MODULE, 0),
+            Event;
+        seq_copy ->
+            Count = get(?MODULE),
+            put(?MODULE, Count + 1),
+            {seq_copy, Count};
+        id_init ->
+            put(?MODULE, 0),
+            Event;
+        id_copy ->
+            Count = get(?MODULE),
+            put(?MODULE, Count + 1),
+            {id_copy, Count};
+        md_copy_init ->
+            put(?MODULE, 0),
+            Event;
+        md_copy_row ->
+            Count = get(?MODULE),
+            put(?MODULE, Count + 1),
+            {md_copy_row, Count};
+        _ ->
+            Event
+    end,
+    handle_event(NewEvent).
+
+
+handle_event(Event) ->
+    try
+        case ets:lookup(?TAB, Event) of
+            [{Event, ActionFun}] ->
+                ActionFun(Event);
+            [] ->
+                ok
+        end
+    catch error:badarg ->
+        ok
+    end.
\ No newline at end of file
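
A rough usage sketch of the module above, assuming couch_db_updater_ev is
compiled and loaded (the ev_demo module and the spawned fun stand in for the
real compaction process, purely for illustration): set_wait/1 installs a
handshake that blocks a worker at an event until the test calls the returned
continue fun, while set_crash/1 makes the next matching event exit with a
known reason.

    -module(ev_demo).
    -export([demo/0]).

    demo() ->
        couch_db_updater_ev:init(),
        {ok, Continue} = couch_db_updater_ev:set_wait(init),
        {ok, Reason} = couch_db_updater_ev:set_crash(files_opened),
        {Worker, Ref} = spawn_monitor(fun() ->
            couch_db_updater_ev:event(init),         % blocks until Continue/1
            couch_db_updater_ev:event(files_opened)  % exits with Reason
        end),
        Continue(Worker),
        receive
            {'DOWN', Ref, process, Worker, Reason} -> ok
        end,
        couch_db_updater_ev:terminate().
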
diff --git a/src/couch/test/couch_db_updater_tests.erl b/src/couch/test/couch_db_updater_tests.erl
new file mode 100644
index 0000000..9ad492e
--- /dev/null
+++ b/src/couch/test/couch_db_updater_tests.erl
@@ -0,0 +1,317 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_db_updater_tests).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+
+-define(TIMEOUT_EUNIT, 60).
+-define(EV_MOD, couch_db_updater_ev).
+-define(INIT_DOCS, 2500).
+-define(WRITE_DOCS, 20).
+
+% The idea behind the tests in this module is to attempt to
+% cover the various restart/recopy events during compaction
+% so that we can be as sure as possible that the compactor
+% is resilient to errors in the face of external conditions
+% (e.g., the VM rebooting). The single linear pass is easy enough
+% to prove; however, restarting is important enough that we don't
+% want to waste work if a VM happens to bounce a lot.
+%
+% To try to cover as many restart situations as possible, we have
+% created a number of events in the compactor code that are present
+% in a test-compiled version of the module. These events can then
+% be used (via meck) to introduce errors and coordinate writes
+% to the database while compaction is in progress.
+
+% This list of events is where we'll insert our errors.
+
+events() ->
+    [
+        init,               % The compactor process is spawned
+        files_opened,       % After compaction files have opened
+
+        purge_init,         % Just before applying purge changes
+        purge_done,         % Just after finishing purge updates
+
+        % The first phase is when we copy all document body and attachment
+        % data to the new database file in order of update sequence so
+        % that we can resume on crash.
+
+        seq_init,           % Before the first change is copied
+        {seq_copy, 0},      % After change N is copied
+        {seq_copy, ?INIT_DOCS div 2},
+        {seq_copy, ?INIT_DOCS - 2},
+        seq_done,           % After last change is copied
+
+        % The id copy phases come in two flavors. Before a compaction
+        % swap is attempted they're copied from the id_tree in the
+        % database being compacted. After a swap attempt they are
+        % stored in an emsort file on disk. Thus the two sets of
+        % related events here.
+
+        md_sort_init,       % Just before metadata sort starts
+        md_sort_done,       % Just after metadata sort finishes
+        md_copy_init,       % Just before metadata copy starts
+        {md_copy_row, 0},   % After docid N is copied
+        {md_copy_row, ?INIT_DOCS div 2},
+        {md_copy_row, ?INIT_DOCS - 2},
+        md_copy_done,       % Just after the last docid is copied
+
+        % And then the final steps before we finish
+
+        before_final_sync,  % Just before final sync
+        after_final_sync,   % Just after the final sync
+        before_notify       % Just before the final notification
+    ].
+
+% Mark which events only happen when documents are present
+
+requires_docs({seq_copy, _}) -> true;
+requires_docs(md_sort_init) -> true;
+requires_docs(md_sort_done) -> true;
+requires_docs(md_copy_init) -> true;
+requires_docs({md_copy_row, _}) -> true;
+requires_docs(md_copy_done) -> true;
+requires_docs(_) -> false.
+
+
+% Mark which events only happen when there's write activity during
+% a compaction.
+
+requires_write(md_sort_init) -> true;
+requires_write(md_sort_done) -> true;
+requires_write(md_copy_init) -> true;
+requires_write({md_copy_row, _}) -> true;
+requires_write(md_copy_done) -> true;
+requires_write(_) -> false.
+
+
+setup() ->
+    purge_module(),
+    ?EV_MOD:init(),
+    test_util:start_couch().
+
+
+teardown(Ctx) ->
+    test_util:stop_couch(Ctx),
+    ?EV_MOD:terminate().
+
+
+start_empty_db_test(_Event) ->
+    ?EV_MOD:clear(),
+    DbName = ?tempdb(),
+    {ok, _} = couch_db:create(DbName, [?ADMIN_CTX]),
+    DbName.
+
+
+start_populated_db_test(Event) ->
+    DbName = start_empty_db_test(Event),
+    {ok, Db} = couch_db:open_int(DbName, []),
+    try
+        populate_db(Db, ?INIT_DOCS)
+    after
+        couch_db:close(Db)
+    end,
+    DbName.
+
+
+stop_test(_Event, DbName) ->
+    couch_server:delete(DbName, [?ADMIN_CTX]).
+
+
+static_empty_db_test_() ->
+    FiltFun = fun(E) ->
+        not (requires_docs(E) or requires_write(E))
+    end,
+    Events = lists:filter(FiltFun, events()) -- [init],
+    {
+        "Idle empty database",
+        {
+            setup,
+            fun setup/0,
+            fun teardown/1,
+            [
+                {
+                    foreachx,
+                    fun start_empty_db_test/1,
+                    fun stop_test/2,
+                    [{Event, fun run_static_init/2} || Event <- Events]
+                }
+            ]
+        }
+    }.
+
+
+static_populated_db_test_() ->
+    FiltFun = fun(E) -> not requires_write(E) end,
+    Events = lists:filter(FiltFun, events()) -- [init],
+    {
+        "Idle populated database",
+        {
+            setup,
+            fun setup/0,
+            fun teardown/1,
+            [
+                {
+                    foreachx,
+                    fun start_populated_db_test/1,
+                    fun stop_test/2,
+                    [{Event, fun run_static_init/2} || Event <- Events]
+                }
+            ]
+        }
+    }.
+
+
+dynamic_empty_db_test_() ->
+    FiltFun = fun(E) -> not requires_docs(E) end,
+    Events = lists:filter(FiltFun, events()) -- [init],
+    {
+        "Writes to empty database",
+        {
+            setup,
+            fun setup/0,
+            fun teardown/1,
+            [
+                {
+                    foreachx,
+                    fun start_empty_db_test/1,
+                    fun stop_test/2,
+                    [{Event, fun run_dynamic_init/2} || Event <- Events]
+                }
+            ]
+        }
+    }.
+
+
+dynamic_populated_db_test_() ->
+    Events = events() -- [init],
+    {
+        "Writes to populated database",
+        {
+            setup,
+            fun setup/0,
+            fun teardown/1,
+            [
+                {
+                    foreachx,
+                    fun start_populated_db_test/1,
+                    fun stop_test/2,
+                    [{Event, fun run_dynamic_init/2} || Event <- Events]
+                }
+            ]
+        }
+    }.
+
+
+run_static_init(Event, DbName) ->
+    Name = lists:flatten(io_lib:format("~p", [Event])),
+    Test = {timeout, ?TIMEOUT_EUNIT, ?_test(run_static(Event, DbName))},
+    {Name, Test}.
+
+
+run_static(Event, DbName) ->
+    {ok, ContinueFun} = ?EV_MOD:set_wait(init),
+    {ok, Reason} = ?EV_MOD:set_crash(Event),
+    {ok, Db} = couch_db:open_int(DbName, []),
+    Ref = couch_db:monitor(Db),
+    {ok, CPid} = couch_db:start_compact(Db),
+    ContinueFun(CPid),
+    receive
+        {'DOWN', Ref, _, _, Reason} ->
+            wait_db_cleared(Db)
+    end,
+    run_successful_compaction(DbName),
+    couch_db:close(Db).
+
+
+run_dynamic_init(Event, DbName) ->
+    Name = lists:flatten(io_lib:format("~p", [Event])),
+    Test = {timeout, ?TIMEOUT_EUNIT, ?_test(run_dynamic(Event, DbName))},
+    {Name, Test}.
+
+
+run_dynamic(Event, DbName) ->
+    {ok, ContinueFun} = ?EV_MOD:set_wait(init),
+    {ok, Reason} = ?EV_MOD:set_crash(Event),
+    {ok, Db} = couch_db:open_int(DbName, []),
+    Ref = couch_db:monitor(Db),
+    {ok, CPid} = couch_db:start_compact(Db),
+    ok = populate_db(Db, 10),
+    ContinueFun(CPid),
+    receive
+        {'DOWN', Ref, _, _, Reason} ->
+            wait_db_cleared(Db)
+    end,
+    run_successful_compaction(DbName),
+    couch_db:close(Db).
+
+
+run_successful_compaction(DbName) ->
+    ?EV_MOD:clear(),
+    {ok, ContinueFun} = ?EV_MOD:set_wait(init),
+    {ok, Db} = couch_db:open_int(DbName, []),
+    {ok, CPid} = couch_db:start_compact(Db),
+    Ref = erlang:monitor(process, CPid),
+    ContinueFun(CPid),
+    receive
+        {'DOWN', Ref, _, _, normal} -> ok
+    end,
+    ?assertMatch({ok, _}, gen_server:call(Db#db.main_pid, get_db)),
+    couch_db:close(Db).
+
+
+wait_db_cleared(Db) ->
+    wait_db_cleared(Db, 5).
+
+
+wait_db_cleared(Db, N) when N < 0 ->
+    erlang:error({db_clear_timeout, couch_db:name(Db)});
+
+wait_db_cleared(Db, N) ->
+    case ets:lookup(couch_dbs, couch_db:name(Db)) of
+        [] ->
+            ok;
+        [NewDb] when NewDb#db.main_pid /= Db#db.main_pid ->
+            ok;
+        _ ->
+            timer:sleep(100),
+            wait_db_cleared(Db, N - 1)
+    end.
+
+
+populate_db(_Db, NumDocs) when NumDocs =< 0 ->
+    ok;
+populate_db(Db, NumDocs) ->
+    String = [$a || _ <- lists:seq(1, erlang:min(NumDocs, 500))],
+    Docs = lists:map(
+        fun(_) ->
+            couch_doc:from_json_obj({[
+                {<<"_id">>, couch_uuids:random()},
+                {<<"string">>, list_to_binary(String)}
+            ]})
+        end,
+        lists:seq(1, 500)),
+    {ok, _} = couch_db:update_docs(Db, Docs, []),
+    populate_db(Db, NumDocs - 500).
+
+
+purge_module() ->
+    case code:which(couch_db_updater) of
+        cover_compiled ->
+            ok;
+        _ ->
+            code:delete(couch_db_updater),
+            code:purge(couch_db_updater)
+    end.
\ No newline at end of file
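
One detail that ties the two test files together: event/1 rewrites the
per-row events (seq_copy, id_copy, md_copy_row) into numbered tuples using
the process dictionary, which is what lets events/0 above target a specific
row such as {seq_copy, ?INIT_DOCS div 2}. A small standalone sketch of that
counting behavior (the ev_counter_demo module and the four-element list are
illustrative only):

    -module(ev_counter_demo).
    -export([demo/0]).

    demo() ->
        couch_db_updater_ev:init(),
        %% Crash on the third copied change (counter values 0, 1, 2).
        {ok, Reason} = couch_db_updater_ev:set_crash({seq_copy, 2}),
        {Worker, Ref} = spawn_monitor(fun() ->
            couch_db_updater_ev:event(seq_init),  % resets the counter to 0
            lists:foreach(fun(_) ->
                %% seen by handle_event/1 as {seq_copy, 0}, {seq_copy, 1}, ...
                couch_db_updater_ev:event(seq_copy)
            end, [a, b, c, d])
        end),
        receive
            {'DOWN', Ref, process, Worker, Reason} -> ok
        end,
        couch_db_updater_ev:terminate().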

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.