Posted to commits@couchdb.apache.org by da...@apache.org on 2017/09/06 21:48:06 UTC

[couchdb] branch compactor-optimize-emsort updated (356de96 -> 04f1adf)

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a change to branch compactor-optimize-emsort
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


 discard 356de96  Implement compactor test suite
 discard 1da8ee4  ss - couch_emsort fixups
 discard 65984c3  Optimize btree node writes
 discard b102201  splity splitty - couch_emsort
 discard e3d9249  ss - couch_file append_bins
 discard 1c146dc  TMP - append_bins for couch_file
     new ff23e13  Implement compactor test suite
     new a30b950  Add multi-append functions to couch_file
     new ac940a9  Optimize btree node writes
     new 04f1adf  Optimize couch_emsort writes

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (356de96)
            \
             N -- N -- N   refs/heads/compactor-optimize-emsort (04f1adf)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/couch/src/couch_file.erl | 21 ---------------------
 1 file changed, 21 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
['"commits@couchdb.apache.org" <co...@couchdb.apache.org>'].

[couchdb] 04/04: Optimize couch_emsort writes

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch compactor-optimize-emsort
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 04f1adf233d85ae77a6c74cdaaa2f741bafc0bea
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Sep 6 16:46:04 2017 -0500

    Optimize couch_emsort writes
    
    This makes two related changes to couch_emsort. First, it uses the new
    couch_file:append_terms/2 function to write all values to disk once so
    that they are not re-written during the merge phases; only key/pointer
    pairs flow through the merge, and values are streamed back from disk
    at iteration time.

    Second, it buffers KVs added to the emsort structure so that our
    chains are a consistent size. This helps minimize the number of
    merge phases executed during decimation.
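
    As a sketch of the resulting contract (Fd and KVs here are
    placeholders for the caller's file handle and key/value list, not
    names from this patch), callers that read the emsort state must now
    flush any buffered key/pointer pairs first, as
    commit_compaction_data/2 does in the diff below:

        {ok, Ems0} = couch_emsort:open(Fd),
        {ok, Ems1} = couch_emsort:add(Ems0, KVs),
        %% flush/1 writes any buffered key/pointer pairs; get_state/1
        %% now only matches a flushed emsort (curr_batch = [])
        {ok, Ems2} = couch_emsort:flush(Ems1),
        Root = couch_emsort:get_state(Ems2)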
---
 src/couch/src/couch_db_updater.erl |  5 ++--
 src/couch/src/couch_emsort.erl     | 57 ++++++++++++++++++++++++++++----------
 2 files changed, 46 insertions(+), 16 deletions(-)

diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 33c17bb..0a04c65 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -1401,8 +1401,9 @@ commit_compaction_data(#db{header=OldHeader}=Db0, Fd) ->
     % fd instead of the Filepath stuff that commit_data/2
     % does.
     DataState = couch_db_header:id_tree_state(OldHeader),
-    MetaFd = couch_emsort:get_fd(Db0#db.id_tree),
-    MetaState = couch_emsort:get_state(Db0#db.id_tree),
+    {ok, Ems} = couch_emsort:flush(Db0#db.id_tree),
+    MetaFd = couch_emsort:get_fd(Ems),
+    MetaState = couch_emsort:get_state(Ems),
     Db1 = bind_id_tree(Db0, Db0#db.fd, DataState),
     Header = db_to_header(Db1, OldHeader),
     CompHeader = #comp_header{
diff --git a/src/couch/src/couch_emsort.erl b/src/couch/src/couch_emsort.erl
index 2a25a23..b70f8d3 100644
--- a/src/couch/src/couch_emsort.erl
+++ b/src/couch/src/couch_emsort.erl
@@ -129,13 +129,14 @@
 %     CA3                  CD3
 %
 
--export([open/1, open/2, get_fd/1, get_state/1]).
+-export([open/1, open/2, get_fd/1, get_state/1, flush/1]).
 -export([add/2, merge/1, sort/1, iter/1, next/1]).
 
 
 -record(ems, {
     fd,
     root,
+    curr_batch = [],
     bb_chunk = 10,
     chain_chunk = 100
 }).
@@ -163,15 +164,35 @@ get_fd(#ems{fd=Fd}) ->
     Fd.
 
 
-get_state(#ems{root=Root}) ->
+get_state(#ems{root=Root, curr_batch=[]}) ->
     Root.
 
 
 add(Ems, []) ->
     {ok, Ems};
 add(Ems, KVs) ->
-    Pos = write_kvs(Ems, KVs),
-    {ok, add_bb_pos(Ems, Pos)}.
+    #ems{
+        fd = Fd,
+        curr_batch = CurrBatch,
+        bb_chunk = BBChunk,
+        chain_chunk = ChainChunk
+    } = Ems,
+    KPs = write_values(Fd, KVs),
+    Limit = BBChunk * ChainChunk,
+    case length(KPs) + length(CurrBatch) > Limit of
+        true ->
+            flush_kps(Ems, KPs ++ CurrBatch);
+        false ->
+            {ok, Ems#ems{
+                curr_batch = KPs ++ CurrBatch
+            }}
+    end.
+
+
+flush(#ems{curr_batch=[]}=Ems) ->
+    {ok, Ems};
+flush(Ems) ->
+    flush_kps(Ems, Ems#ems.curr_batch).
 
 
 sort(#ems{}=Ems) ->
@@ -182,7 +203,8 @@ sort(#ems{}=Ems) ->
 merge(#ems{root=undefined}=Ems) ->
     {ok, Ems};
 merge(#ems{}=Ems) ->
-    {ok, decimate(Ems)}.
+    {ok, FlushedEms} = flush(Ems),
+    {ok, decimate(FlushedEms)}.
 
 
 iter(#ems{root=undefined}=Ems) ->
@@ -197,8 +219,9 @@ iter(#ems{root={_, _}}) ->
 next({_Ems, []}) ->
     finished;
 next({Ems, Chains}) ->
-    {KV, RestChains} = choose_kv(small, Ems, Chains),
-    {ok, KV, {Ems, RestChains}}.
+    {{Key, Pos}, RestChains} = choose_kv(small, Ems, Chains),
+    {ok, Val} = couch_file:pread_term(Ems#ems.fd, Pos),
+    {ok, {Key, Val}, {Ems, RestChains}}.
 
 
 add_bb_pos(#ems{root=undefined}=Ems, Pos) ->
@@ -208,16 +231,22 @@ add_bb_pos(#ems{root={BB, Prev}}=Ems, Pos) ->
     Ems#ems{root={NewBB, NewPrev}}.
 
 
-write_kvs(Ems, KVs) ->
+write_values(Fd, KVs) ->
+    {Keys, Vals} = lists:unzip(KVs),
+    {ok, PosSizes} = couch_file:append_terms(Fd, Vals),
+    {Pos, _} = lists:unzip(PosSizes),
+    lists:zip(Keys, Pos).
+
+
+flush_kps(#ems{fd=Fd}=Ems, KPs) ->
     % Write the list of KPs to disk in sorted order in chunks
     % of 100. Also make sure that the order is such that they
     % can be streamed in ascending order.
-    {LastKVs, LastPos} =
-    lists:foldr(fun(KV, Acc) ->
-        append_item(Ems, Acc, KV, Ems#ems.chain_chunk)
-    end, {[], nil}, lists:sort(KVs)),
-    {ok, Final, _} = couch_file:append_term(Ems#ems.fd, {LastKVs, LastPos}),
-    Final.
+    {LastKPs, LastPos} = lists:foldr(fun(KP, Acc) ->
+        append_item(Ems, Acc, KP, Ems#ems.chain_chunk)
+    end, {[], nil}, lists:sort(KPs)),
+    {ok, Final, _} = couch_file:append_term(Fd, {LastKPs, LastPos}),
+    {ok, add_bb_pos(Ems#ems{curr_batch = []}, Final)}.
 
 
 decimate(#ems{root={_BB, nil}}=Ems) ->

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.

[couchdb] 02/04: Add multi-append functions to couch_file

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch compactor-optimize-emsort
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit a30b95015d26029a15e209ee02e388171dac4d3a
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Sep 6 13:36:16 2017 -0500

    Add multi-append functions to couch_file
    
    These functions allow the caller to append multiple terms or binaries to
    a file and receive the file position and size for each individual
    element. This is to optimize throughput in situations where we want to
    write multiple pieces of independent data.
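
    For illustration, a minimal sketch of the calling convention (Fd is
    an open couch_file handle and T1/T2 are placeholder terms; this is
    not code from the patch):

        {ok, [{Pos1, _Size1}, {Pos2, _Size2}]} =
            couch_file:append_terms(Fd, [T1, T2]),
        %% one {Pos, Size} per appended term, in input order; each
        %% position can be read back with the existing pread_term/2
        {ok, T1} = couch_file:pread_term(Fd, Pos1)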
---
 src/couch/src/couch_file.erl | 31 ++++++++++++++++++++++++++++++-
 1 file changed, 30 insertions(+), 1 deletion(-)

diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index acd4fda..4068872 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -42,6 +42,7 @@
 -export([append_binary/2, append_binary_md5/2]).
 -export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]).
 -export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]).
+-export([append_terms/2, append_terms/3, append_binaries/2]).
 -export([write_header/2, read_header/1]).
 -export([delete/2, delete/3, nuke_dir/2, init_delete_dir/1]).
 -export([msec_since_last_read/1]).
@@ -119,6 +120,16 @@ append_term_md5(Fd, Term, Options) ->
     Comp = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION),
     append_binary_md5(Fd, couch_compress:compress(Term, Comp)).
 
+append_terms(Fd, Terms) ->
+    append_terms(Fd, Terms, []).
+
+append_terms(Fd, Terms, Options) ->
+    Comp = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION),
+    Bins = lists:map(fun(Term) ->
+        couch_compress:compress(Term, Comp)
+    end, Terms),
+    append_binaries(Fd, Bins).
+
 %%----------------------------------------------------------------------
 %% Purpose: To append an Erlang binary to the end of the file.
 %% Args:    Erlang term to serialize and append to the file.
@@ -129,12 +140,16 @@ append_term_md5(Fd, Term, Options) ->
 
 append_binary(Fd, Bin) ->
     ioq:call(Fd, {append_bin, assemble_file_chunk(Bin)}, erlang:get(io_priority)).
-    
+
 append_binary_md5(Fd, Bin) ->
     ioq:call(Fd,
         {append_bin, assemble_file_chunk(Bin, crypto:hash(md5, Bin))},
         erlang:get(io_priority)).
 
+append_binaries(Fd, Bins) ->
+    WriteBins = lists:map(fun assemble_file_chunk/1, Bins),
+    ioq:call(Fd, {append_bins, WriteBins}, erlang:get(io_priority)).
+
 append_raw_chunk(Fd, Chunk) ->
     ioq:call(Fd, {append_bin, Chunk}, erlang:get(io_priority)).
 
@@ -481,6 +496,20 @@ handle_call({append_bin, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
         {reply, Error, reset_eof(File)}
     end;
 
+handle_call({append_bins, Bins}, _From, #file{fd = Fd, eof = Pos} = File) ->
+    {BlockResps, FinalPos} = lists:mapfoldl(fun(Bin, PosAcc) ->
+        Blocks = make_blocks(PosAcc rem ?SIZE_BLOCK, Bin),
+        Size = iolist_size(Blocks),
+        {{Blocks, {PosAcc, Size}}, PosAcc + Size}
+    end, Pos, Bins),
+    {AllBlocks, Resps} = lists:unzip(BlockResps),
+    case file:write(Fd, AllBlocks) of
+    ok ->
+        {reply, {ok, Resps}, File#file{eof = FinalPos}};
+    Error ->
+        {reply, Error, reset_eof(File)}
+    end;
+
 handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
     BinSize = byte_size(Bin),
     case Pos rem ?SIZE_BLOCK of

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.

[couchdb] 01/04: Implement compactor test suite

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch compactor-optimize-emsort
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit ff23e13cf61eabec03a49f1287e826ede3344f08
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Sep 6 16:26:29 2017 -0500

    Implement compactor test suite
---
 src/couch/src/couch_db_updater.erl        |  58 +++++-
 src/couch/test/couch_db_updater_ev.erl    | 106 ++++++++++
 src/couch/test/couch_db_updater_tests.erl | 317 ++++++++++++++++++++++++++++++
 3 files changed, 480 insertions(+), 1 deletion(-)

diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index a83e3b4..33c17bb 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -1039,11 +1039,21 @@ check_md5(Md5, Md5) -> ok;
 check_md5(_, _) -> throw(md5_mismatch).
 
 
+-ifdef(TEST).
+-define(COMP_EVENT(Name),
+        couch_db_updater_ev:event(Name)).
+-else.
+-define(COMP_EVENT(Name), ignore).
+-endif.
+
+
 start_copy_compact(#db{}=Db) ->
     erlang:put(io_priority, {db_compact, Db#db.name}),
     couch_log:debug("Compaction process spawned for db \"~s\"", [Db#db.name]),
 
+    ?COMP_EVENT(init),
     {ok, InitCompSt} = open_compaction_files(Db),
+    ?COMP_EVENT(files_opened),
 
     Stages = [
         fun copy_purge_info/1,
@@ -1052,7 +1062,8 @@ start_copy_compact(#db{}=Db) ->
         fun sort_meta_data/1,
         fun commit_compaction_data/1,
         fun copy_meta_data/1,
-        fun compact_final_sync/1
+        fun compact_final_sync/1,
+        fun verify_compaction/1
     ],
 
     FinalCompSt = lists:foldl(fun(Stage, CompSt) ->
@@ -1067,6 +1078,7 @@ start_copy_compact(#db{}=Db) ->
     close_db(FinalNewDb),
     ok = couch_file:close(MetaFd),
 
+    ?COMP_EVENT(before_notify),
     gen_server:cast(Db#db.main_pid, {compact_done, FinalNewDb#db.filepath}).
 
 
@@ -1146,6 +1158,7 @@ reset_compaction_file(Fd, Header) ->
 
 
 copy_purge_info(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
+    ?COMP_EVENT(purge_init),
     OldHdr = OldDb#db.header,
     NewHdr = NewDb#db.header,
     OldPurgeSeq = couch_db_header:purge_seq(OldHdr),
@@ -1154,6 +1167,7 @@ copy_purge_info(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
         {ok, PurgedIdsRevs} = couch_db:get_last_purged(OldDb),
         Opts = [{compression, NewDb#db.compression}],
         {ok, Ptr, _} = couch_file:append_term(NewDb#db.fd, PurgedIdsRevs, Opts),
+        ?COMP_EVENT(purge_done),
         CompSt#comp_st{
             new_db = NewDb#db{
                 header = couch_db_header:set(NewHdr, [
@@ -1163,6 +1177,7 @@ copy_purge_info(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
             }
         };
     true ->
+        ?COMP_EVENT(purge_done),
         CompSt
     end.
 
@@ -1229,6 +1244,7 @@ copy_compact(#comp_st{} = CompSt) ->
         couch_task_status:set_update_frequency(500)
     end,
 
+    ?COMP_EVENT(seq_init),
     {ok, _, {NewDb2, Uncopied, _, _}} =
         couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
             {NewDb, [], 0, 0},
@@ -1236,6 +1252,8 @@ copy_compact(#comp_st{} = CompSt) ->
 
     NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
 
+    ?COMP_EVENT(seq_done),
+
     % copy misc header values
     if NewDb3#db.security /= Db#db.security ->
         {ok, Ptr, _} = couch_file:append_term(
@@ -1294,6 +1312,7 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
         TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts),
         NewActiveSize = FinalAS + TotalAttSize,
         NewExternalSize = FinalES + TotalAttSize,
+        ?COMP_EVENT(seq_copy),
         Info#full_doc_info{
             rev_tree = NewRevTree,
             sizes = #size_info{
@@ -1418,7 +1437,9 @@ bind_id_tree(Db, Fd, State) ->
 
 
 sort_meta_data(#comp_st{new_db = Db0} = CompSt) ->
+    ?COMP_EVENT(md_sort_init),
     {ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
+    ?COMP_EVENT(md_sort_done),
     CompSt#comp_st{
         new_db = Db0#db{
             id_tree = Ems
@@ -1445,11 +1466,13 @@ copy_meta_data(#comp_st{new_db = Db} = CompSt) ->
         rem_seqs=[],
         infos=[]
     },
+    ?COMP_EVENT(md_copy_init),
     Acc = merge_docids(Iter, Acc0),
     {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos),
     {ok, SeqTree} = couch_btree:add_remove(
         Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
     ),
+    ?COMP_EVENT(md_copy_done),
     CompSt#comp_st{
         new_db = Db#db{
             id_tree = IdTree,
@@ -1459,13 +1482,45 @@ copy_meta_data(#comp_st{new_db = Db} = CompSt) ->
 
 
 compact_final_sync(#comp_st{new_db = NewDb0} = CompSt) ->
+    ?COMP_EVENT(before_final_sync),
     NewHdr = db_to_header(NewDb0, NewDb0#db.header),
     NewDb1 = sync_header(NewDb0, NewHdr),
+    ?COMP_EVENT(after_final_sync),
     CompSt#comp_st{
         new_db = NewDb1
     }.
 
 
+verify_compaction(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
+    {ok, OldIdReds0} = couch_btree:full_reduce(OldDb#db.id_tree),
+    {ok, OldSeqReds} = couch_btree:full_reduce(OldDb#db.seq_tree),
+    {ok, NewIdReds0} = couch_btree:full_reduce(NewDb#db.id_tree),
+    {ok, NewSeqReds} = couch_btree:full_reduce(NewDb#db.seq_tree),
+    {
+        OldDocCount,
+        OldDelDocCount,
+        #size_info{external = OldExternalSize}
+    } = OldIdReds0,
+    OldIdReds = {OldDocCount, OldDelDocCount, OldExternalSize},
+    {
+        NewDocCount,
+        NewDelDocCount,
+        #size_info{external = NewExternalSize}
+    } = NewIdReds0,
+    NewIdReds = {NewDocCount, NewDelDocCount, NewExternalSize},
+    if NewIdReds == OldIdReds -> ok; true ->
+        Fmt1 = "Compacted id tree for ~s differs from source: ~p /= ~p",
+        couch_log:error(Fmt1, [couch_db:name(OldDb), NewIdReds, OldIdReds]),
+        exit({compaction_error, id_tree})
+    end,
+    if NewSeqReds == OldSeqReds -> ok; true ->
+        Fmt2 = "Compacted seq tree for ~s differs from source: ~p /= ~p",
+        couch_log:error(Fmt2, [couch_db:name(OldDb), NewSeqReds, OldSeqReds]),
+        exit({compaction_error, seq_tree})
+    end,
+    CompSt.
+
+
 merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
     #merge_st{
         id_tree=IdTree0,
@@ -1489,6 +1544,7 @@ merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
                 rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
                 curr = NewCurr
             },
+            ?COMP_EVENT(md_copy_row),
             merge_docids(NextIter, Acc1);
         {finished, FDI, Seqs} ->
             Acc#merge_st{
diff --git a/src/couch/test/couch_db_updater_ev.erl b/src/couch/test/couch_db_updater_ev.erl
new file mode 100644
index 0000000..3a57141
--- /dev/null
+++ b/src/couch/test/couch_db_updater_ev.erl
@@ -0,0 +1,106 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_db_updater_ev).
+
+
+-export([
+    init/0,
+    terminate/0,
+    clear/0,
+
+    set_wait/1,
+    set_crash/1,
+
+    event/1
+]).
+
+
+-define(TAB, couch_db_updater_ev_tab).
+
+
+init() ->
+    ets:new(?TAB, [set, public, named_table]).
+
+
+terminate() ->
+    ets:delete(?TAB).
+
+
+clear() ->
+    ets:delete_all_objects(?TAB).
+
+
+set_wait(Event) ->
+    Self = self(),
+    WaitFun = fun(_) ->
+        receive
+            {Self, go} ->
+                Self ! {self(), ok}
+        end,
+        ets:delete(?TAB, Event)
+    end,
+    ContinueFun = fun(Pid) ->
+        Pid ! {Self, go},
+        receive {Pid, ok} -> ok end
+    end,
+    ets:insert(?TAB, {Event, WaitFun}),
+    {ok, ContinueFun}.
+
+
+set_crash(Event) ->
+    Reason = {couch_db_updater_ev_crash, Event},
+    CrashFun = fun(_) -> exit(Reason) end,
+    ets:insert(?TAB, {Event, CrashFun}),
+    {ok, Reason}.
+
+
+event(Event) ->
+    NewEvent = case Event of
+        seq_init ->
+            put(?MODULE, 0),
+            Event;
+        seq_copy ->
+            Count = get(?MODULE),
+            put(?MODULE, Count + 1),
+            {seq_copy, Count};
+        id_init ->
+            put(?MODULE, 0),
+            Event;
+        id_copy ->
+            Count = get(?MODULE),
+            put(?MODULE, Count + 1),
+            {id_copy, Count};
+        md_copy_init ->
+            put(?MODULE, 0),
+            Event;
+        md_copy_row ->
+            Count = get(?MODULE),
+            put(?MODULE, Count + 1),
+            {md_copy_row, Count};
+        _ ->
+            Event
+    end,
+    handle_event(NewEvent).
+
+
+handle_event(Event) ->
+    try
+        case ets:lookup(?TAB, Event) of
+            [{Event, ActionFun}] ->
+                ActionFun(Event);
+            [] ->
+                ok
+        end
+    catch error:badarg ->
+        ok
+    end.
\ No newline at end of file
diff --git a/src/couch/test/couch_db_updater_tests.erl b/src/couch/test/couch_db_updater_tests.erl
new file mode 100644
index 0000000..9ad492e
--- /dev/null
+++ b/src/couch/test/couch_db_updater_tests.erl
@@ -0,0 +1,317 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_db_updater_tests).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+
+-define(TIMEOUT_EUNIT, 60).
+-define(EV_MOD, couch_db_updater_ev).
+-define(INIT_DOCS, 2500).
+-define(WRITE_DOCS, 20).
+
+% The idea behind the tests in this module is to attempt to
+% cover the range of restart/recopy events during compaction
+% so that we can be as sure as possible that the compactor
+% is resilient to errors in the face of external conditions
+% (e.g., the VM rebooted). The single linear pass is easy enough
+% to prove; however, restarting is important enough that we don't
+% want to waste work if a VM happens to bounce a lot.
+%
+% To try and cover as many restart situations as possible, we have
+% created a number of events in the compactor code that are only
+% present in a test-compiled version of the module. These events can
+% then be used (via the couch_db_updater_ev module) to introduce
+% errors and coordinate writes to the database while compaction is in
+% progress.
+
+% This list of events is where we'll insert our errors.
+
+events() ->
+    [
+        init,               % The compactor process is spawned
+        files_opened,       % After compaction files have opened
+
+        purge_init,         % Just before applying purge changes
+        purge_done,         % Just after finishing purge updates
+
+        % The first phase is when we copy all document body and attachment
+        % data to the new database file in order of update sequence so
+        % that we can resume on crash.
+
+        seq_init,           % Before the first change is copied
+        {seq_copy, 0},      % After change N is copied
+        {seq_copy, ?INIT_DOCS div 2},
+        {seq_copy, ?INIT_DOCS - 2},
+        seq_done,           % After last change is copied
+
+        % The id copy phases come in two flavors. Before a compaction
+        % swap is attempted they're copied from the id_tree in the
+        % database being compacted. After a swap attempt they are
+        % stored in an emsort file on disk. Thus the two sets of
+        % related events here.
+
+        md_sort_init,       % Just before metadata sort starts
+        md_sort_done,       % Just after metadata sort finishes
+        md_copy_init,       % Just before metadata copy starts
+        {md_copy_row, 0},   % After docid N is copied
+        {md_copy_row, ?INIT_DOCS div 2},
+        {md_copy_row, ?INIT_DOCS - 2},
+        md_copy_done,       % Just after the last docid is copied
+
+        % And then the final steps before we finish
+
+        before_final_sync,  % Just before final sync
+        after_final_sync,   % Just after the final sync
+        before_notify       % Just before the final notification
+    ].
+
+% Mark which events only happen when documents are present
+
+requires_docs({seq_copy, _}) -> true;
+requires_docs(md_sort_init) -> true;
+requires_docs(md_sort_done) -> true;
+requires_docs(md_copy_init) -> true;
+requires_docs({md_copy_row, _}) -> true;
+requires_docs(md_copy_done) -> true;
+requires_docs(_) -> false.
+
+
+% Mark which events only happen when there's write activity during
+% a compaction.
+
+requires_write(md_sort_init) -> true;
+requires_write(md_sort_done) -> true;
+requires_write(md_copy_init) -> true;
+requires_write({md_copy_row, _}) -> true;
+requires_write(md_copy_done) -> true;
+requires_write(_) -> false.
+
+
+setup() ->
+    purge_module(),
+    ?EV_MOD:init(),
+    test_util:start_couch().
+
+
+teardown(Ctx) ->
+    test_util:stop_couch(Ctx),
+    ?EV_MOD:terminate().
+
+
+start_empty_db_test(_Event) ->
+    ?EV_MOD:clear(),
+    DbName = ?tempdb(),
+    {ok, _} = couch_db:create(DbName, [?ADMIN_CTX]),
+    DbName.
+
+
+start_populated_db_test(Event) ->
+    DbName = start_empty_db_test(Event),
+    {ok, Db} = couch_db:open_int(DbName, []),
+    try
+        populate_db(Db, ?INIT_DOCS)
+    after
+        couch_db:close(Db)
+    end,
+    DbName.
+
+
+stop_test(_Event, DbName) ->
+    couch_server:delete(DbName, [?ADMIN_CTX]).
+
+
+static_empty_db_test_() ->
+    FiltFun = fun(E) ->
+        not (requires_docs(E) or requires_write(E))
+    end,
+    Events = lists:filter(FiltFun, events()) -- [init],
+    {
+        "Idle empty database",
+        {
+            setup,
+            fun setup/0,
+            fun teardown/1,
+            [
+                {
+                    foreachx,
+                    fun start_empty_db_test/1,
+                    fun stop_test/2,
+                    [{Event, fun run_static_init/2} || Event <- Events]
+                }
+            ]
+        }
+    }.
+
+
+static_populated_db_test_() ->
+    FiltFun = fun(E) -> not requires_write(E) end,
+    Events = lists:filter(FiltFun, events()) -- [init],
+    {
+        "Idle populated database",
+        {
+            setup,
+            fun setup/0,
+            fun teardown/1,
+            [
+                {
+                    foreachx,
+                    fun start_populated_db_test/1,
+                    fun stop_test/2,
+                    [{Event, fun run_static_init/2} || Event <- Events]
+                }
+            ]
+        }
+    }.
+
+
+dynamic_empty_db_test_() ->
+    FiltFun = fun(E) -> not requires_docs(E) end,
+    Events = lists:filter(FiltFun, events()) -- [init],
+    {
+        "Writes to empty database",
+        {
+            setup,
+            fun setup/0,
+            fun teardown/1,
+            [
+                {
+                    foreachx,
+                    fun start_empty_db_test/1,
+                    fun stop_test/2,
+                    [{Event, fun run_dynamic_init/2} || Event <- Events]
+                }
+            ]
+        }
+    }.
+
+
+dynamic_populated_db_test_() ->
+    Events = events() -- [init],
+    {
+        "Writes to populated database",
+        {
+            setup,
+            fun setup/0,
+            fun teardown/1,
+            [
+                {
+                    foreachx,
+                    fun start_populated_db_test/1,
+                    fun stop_test/2,
+                    [{Event, fun run_dynamic_init/2} || Event <- Events]
+                }
+            ]
+        }
+    }.
+
+
+run_static_init(Event, DbName) ->
+    Name = lists:flatten(io_lib:format("~p", [Event])),
+    Test = {timeout, ?TIMEOUT_EUNIT, ?_test(run_static(Event, DbName))},
+    {Name, Test}.
+
+
+run_static(Event, DbName) ->
+    {ok, ContinueFun} = ?EV_MOD:set_wait(init),
+    {ok, Reason} = ?EV_MOD:set_crash(Event),
+    {ok, Db} = couch_db:open_int(DbName, []),
+    Ref = couch_db:monitor(Db),
+    {ok, CPid} = couch_db:start_compact(Db),
+    ContinueFun(CPid),
+    receive
+        {'DOWN', Ref, _, _, Reason} ->
+            wait_db_cleared(Db)
+    end,
+    run_successful_compaction(DbName),
+    couch_db:close(Db).
+
+
+run_dynamic_init(Event, DbName) ->
+    Name = lists:flatten(io_lib:format("~p", [Event])),
+    Test = {timeout, ?TIMEOUT_EUNIT, ?_test(run_dynamic(Event, DbName))},
+    {Name, Test}.
+
+
+run_dynamic(Event, DbName) ->
+    {ok, ContinueFun} = ?EV_MOD:set_wait(init),
+    {ok, Reason} = ?EV_MOD:set_crash(Event),
+    {ok, Db} = couch_db:open_int(DbName, []),
+    Ref = couch_db:monitor(Db),
+    {ok, CPid} = couch_db:start_compact(Db),
+    ok = populate_db(Db, 10),
+    ContinueFun(CPid),
+    receive
+        {'DOWN', Ref, _, _, Reason} ->
+            wait_db_cleared(Db)
+    end,
+    run_successful_compaction(DbName),
+    couch_db:close(Db).
+
+
+run_successful_compaction(DbName) ->
+    ?EV_MOD:clear(),
+    {ok, ContinueFun} = ?EV_MOD:set_wait(init),
+    {ok, Db} = couch_db:open_int(DbName, []),
+    {ok, CPid} = couch_db:start_compact(Db),
+    Ref = erlang:monitor(process, CPid),
+    ContinueFun(CPid),
+    receive
+        {'DOWN', Ref, _, _, normal} -> ok
+    end,
+    ?assertMatch({ok, _}, gen_server:call(Db#db.main_pid, get_db)),
+    couch_db:close(Db).
+
+
+wait_db_cleared(Db) ->
+    wait_db_cleared(Db, 5).
+
+
+wait_db_cleared(Db, N) when N < 0 ->
+    erlang:error({db_clear_timeout, couch_db:name(Db)});
+
+wait_db_cleared(Db, N) ->
+    case ets:lookup(couch_dbs, couch_db:name(Db)) of
+        [] ->
+            ok;
+        [NewDb] when NewDb#db.main_pid /= Db#db.main_pid ->
+            ok;
+        _ ->
+            timer:sleep(100),
+            wait_db_cleared(Db, N - 1)
+    end.
+
+
+populate_db(_Db, NumDocs) when NumDocs =< 0 ->
+    ok;
+populate_db(Db, NumDocs) ->
+    String = [$a || _ <- lists:seq(1, erlang:min(NumDocs, 500))],
+    Docs = lists:map(
+        fun(_) ->
+            couch_doc:from_json_obj({[
+                {<<"_id">>, couch_uuids:random()},
+                {<<"string">>, list_to_binary(String)}
+            ]})
+        end,
+        lists:seq(1, erlang:min(NumDocs, 500))),
+    {ok, _} = couch_db:update_docs(Db, Docs, []),
+    populate_db(Db, NumDocs - 500).
+
+
+purge_module() ->
+    case code:which(couch_db_updater) of
+        cover_compiled ->
+            ok;
+        _ ->
+            code:delete(couch_db_updater),
+            code:purge(couch_db_updater)
+    end.
\ No newline at end of file
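
To illustrate how the pieces above fit together, here is a condensed
sketch of the crash-injection pattern from run_static/2 (Db is an open
#db{} handle and Event is any entry from events()):

    {ok, Continue} = couch_db_updater_ev:set_wait(init),
    {ok, Reason} = couch_db_updater_ev:set_crash(Event),
    Ref = couch_db:monitor(Db),
    {ok, CPid} = couch_db:start_compact(Db),
    %% the compactor blocks at the 'init' event until released
    Continue(CPid),
    %% it then runs until Event fires; the registered exit takes the
    %% db process down with it
    receive {'DOWN', Ref, _, _, Reason} -> ok end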

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.

[couchdb] 03/04: Optimize btree node writes

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch compactor-optimize-emsort
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit ac940a950cf5791d2637c68ed993d83438e52ccb
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Sep 6 16:26:03 2017 -0500

    Optimize btree node writes
    
    This uses the new couch_file:append_terms/3 function to write all chunks
    in a single write call.
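
    Sketched roughly (Fd, Chunks and Comp as bound in write_node/3 in
    the diff below), the batched call returns one {Ptr, Size} pair per
    chunk, replacing a separate append_term round trip through ioq for
    each chunk:

        ToWrite = [{NodeType, Chunk} || Chunk <- Chunks],
        {ok, PtrSizes} =
            couch_file:append_terms(Fd, ToWrite, [{compression, Comp}]),
        %% PtrSizes pairs off 1:1 with Chunks for group_kps/4
        true = length(PtrSizes) == length(Chunks)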
---
 src/couch/src/couch_btree.erl | 28 +++++++++++++++-------------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/src/couch/src/couch_btree.erl b/src/couch/src/couch_btree.erl
index ea224b1..0ae6e7a 100644
--- a/src/couch/src/couch_btree.erl
+++ b/src/couch/src/couch_btree.erl
@@ -436,20 +436,22 @@ get_node(#btree{fd = Fd}, NodePos) ->
 
 write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) ->
     % split up nodes into smaller sizes
-    NodeListList = chunkify(NodeList),
+    Chunks = chunkify(NodeList),
     % now write out each chunk and return the KeyPointer pairs for those nodes
-    ResultList = [
-        begin
-            {ok, Pointer, Size} = couch_file:append_term(
-                Fd, {NodeType, ANodeList}, [{compression, Comp}]),
-            {LastKey, _} = lists:last(ANodeList),
-            SubTreeSize = reduce_tree_size(NodeType, Size, ANodeList),
-            {LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList), SubTreeSize}}
-        end
-    ||
-        ANodeList <- NodeListList
-    ],
-    {ok, ResultList}.
+    ToWrite = [{NodeType, Chunk} || Chunk <- Chunks],
+    WriteOpts = [{compression, Comp}],
+    {ok, PtrSizes} = couch_file:append_terms(Fd, ToWrite, WriteOpts),
+    {ok, group_kps(Bt, NodeType, Chunks, PtrSizes)}.
+
+
+group_kps(_Bt, _NodeType, [], []) ->
+    [];
+
+group_kps(Bt, NodeType, [Chunk | RestChunks], [{Ptr, Size} | RestPtrSizes]) ->
+    {LastKey, _} = lists:last(Chunk),
+    SubTreeSize = reduce_tree_size(NodeType, Size, Chunk),
+    KP = {LastKey, {Ptr, reduce_node(Bt, NodeType, Chunk), SubTreeSize}},
+    [KP | group_kps(Bt, NodeType, RestChunks, RestPtrSizes)].
 
 
 write_node(Bt, _OldNode, NodeType, [], NewList) ->

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.