Posted to commits@couchdb.apache.org by da...@apache.org on 2020/04/14 16:26:24 UTC

[couchdb] branch 3.x updated (a4f240c -> 123bf82)

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a change to branch 3.x
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


    from a4f240c  Port reduce_false.js and reduce_builtin.js to Elixir (#2541)
     new fbe5ba5  Simplify compaction state management
     new 1caf374  Implement compactor test suite
     new dabd4e2  Add multi-append functions to couch_file
     new 2c9477e  Optimize btree node writes
     new 1aef628  Optimize compactor to use multi-IO API
     new 123bf82  Update compaction progress during docid phases

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/couch/src/couch_bt_engine_compactor.erl        | 268 +++++++++++++----
 src/couch/src/couch_btree.erl                      |  28 +-
 src/couch/src/couch_emsort.erl                     | 102 +++++--
 src/couch/src/couch_file.erl                       | 162 ++++++++--
 .../test/eunit/couch_bt_engine_compactor_ev.erl    | 106 +++++++
 .../eunit/couch_bt_engine_compactor_ev_tests.erl   | 335 +++++++++++++++++++++
 6 files changed, 869 insertions(+), 132 deletions(-)
 create mode 100644 src/couch/test/eunit/couch_bt_engine_compactor_ev.erl
 create mode 100644 src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl


[couchdb] 02/06: Implement compactor test suite

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch 3.x
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 1caf374bdc89fbe41ecb0908ccb7c170640612af
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Sep 6 16:26:29 2017 -0500

    Implement compactor test suite
---
 src/couch/src/couch_bt_engine_compactor.erl        |  23 ++
 .../test/eunit/couch_bt_engine_compactor_ev.erl    | 106 +++++++
 .../eunit/couch_bt_engine_compactor_ev_tests.erl   | 335 +++++++++++++++++++++
 3 files changed, 464 insertions(+)
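
The events added to the compactor in the first hunk (via the ?COMP_EVENT
macro) are driven from tests through the new couch_bt_engine_compactor_ev
module: a test pauses the compactor at one event, arms a crash at another,
and then verifies that a later compaction still completes. A minimal
test-side sketch of that flow (mirroring run_static/2 below; MyDbName is a
hypothetical database created by the test setup, not part of this commit):

    couch_bt_engine_compactor_ev:init(),
    {ok, ContinueFun} = couch_bt_engine_compactor_ev:set_wait(init),
    {ok, Reason} = couch_bt_engine_compactor_ev:set_crash(seq_done),
    {ok, Db} = couch_db:open_int(MyDbName, []),
    Ref = couch_db:monitor(Db),
    {ok, CPid} = couch_db:start_compact(Db),
    %% The compactor is paused at `init`; release it and wait for it to
    %% crash with `Reason` when it reaches the `seq_done` event.
    ContinueFun(CPid),
    receive {'DOWN', Ref, _, _, Reason} -> ok end,
    %% A follow-up compaction (with the hooks cleared) must still succeed.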

diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl
index 3979fcb..50b3981 100644
--- a/src/couch/src/couch_bt_engine_compactor.erl
+++ b/src/couch/src/couch_bt_engine_compactor.erl
@@ -44,13 +44,22 @@
 }).
 
 
+-ifdef(TEST).
+-define(COMP_EVENT(Name), couch_bt_engine_compactor_ev:event(Name)).
+-else.
+-define(COMP_EVENT(Name), ignore).
+-endif.
+
+
 start(#st{} = St, DbName, Options, Parent) ->
     erlang:put(io_priority, {db_compact, DbName}),
     couch_log:debug("Compaction process spawned for db \"~s\"", [DbName]),
 
     couch_db_engine:trigger_on_compact(DbName),
 
+    ?COMP_EVENT(init),
     {ok, InitCompSt} = open_compaction_files(DbName, St, Options),
+    ?COMP_EVENT(files_opened),
 
     Stages = [
         fun copy_purge_info/1,
@@ -74,6 +83,7 @@ start(#st{} = St, DbName, Options, Parent) ->
     ok = couch_bt_engine:decref(FinalNewSt),
     ok = couch_file:close(MetaFd),
 
+    ?COMP_EVENT(before_notify),
     Msg = {compact_done, couch_bt_engine, FinalNewSt#st.filepath},
     gen_server:cast(Parent, Msg).
 
@@ -146,6 +156,7 @@ copy_purge_info(#comp_st{} = CompSt) ->
         new_st = NewSt,
         retry = Retry
     } = CompSt,
+    ?COMP_EVENT(purge_init),
     MinPurgeSeq = couch_util:with_db(DbName, fun(Db) ->
         couch_db:get_minimum_purge_seq(Db)
     end),
@@ -180,6 +191,7 @@ copy_purge_info(#comp_st{} = CompSt) ->
     {NewStAcc, Infos, _, _} = FinalAcc,
     FinalNewSt = copy_purge_infos(OldSt, NewStAcc, Infos, MinPurgeSeq, Retry),
 
+    ?COMP_EVENT(purge_done),
     CompSt#comp_st{
         new_st = FinalNewSt
     }.
@@ -322,6 +334,7 @@ copy_compact(#comp_st{} = CompSt) ->
         couch_task_status:set_update_frequency(500)
     end,
 
+    ?COMP_EVENT(seq_init),
     {ok, _, {NewSt2, Uncopied, _, _}} =
         couch_btree:foldl(St#st.seq_tree, EnumBySeqFun,
             {NewSt, [], 0, 0},
@@ -329,6 +342,8 @@ copy_compact(#comp_st{} = CompSt) ->
 
     NewSt3 = copy_docs(St, NewSt2, lists:reverse(Uncopied), Retry),
 
+    ?COMP_EVENT(seq_done),
+
     % Copy the security information over
     SecProps = couch_bt_engine:get_security(St),
     {ok, NewSt4} = couch_bt_engine:copy_security(NewSt3, SecProps),
@@ -392,6 +407,7 @@ copy_docs(St, #st{} = NewSt, MixedInfos, Retry) ->
         TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts),
         NewActiveSize = FinalAS + TotalAttSize,
         NewExternalSize = FinalES + TotalAttSize,
+        ?COMP_EVENT(seq_copy),
         Info#full_doc_info{
             rev_tree = NewRevTree,
             sizes = #size_info{
@@ -478,7 +494,9 @@ copy_doc_attachments(#st{} = SrcSt, SrcSp, DstSt) ->
 
 
 sort_meta_data(#comp_st{new_st = St0} = CompSt) ->
+    ?COMP_EVENT(md_sort_init),
     {ok, Ems} = couch_emsort:merge(St0#st.id_tree),
+    ?COMP_EVENT(md_sort_done),
     CompSt#comp_st{
         new_st = St0#st{
             id_tree = Ems
@@ -505,11 +523,13 @@ copy_meta_data(#comp_st{new_st = St} = CompSt) ->
         rem_seqs=[],
         infos=[]
     },
+    ?COMP_EVENT(md_copy_init),
     Acc = merge_docids(Iter, Acc0),
     {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos),
     {ok, SeqTree} = couch_btree:add_remove(
         Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
     ),
+    ?COMP_EVENT(md_copy_done),
     CompSt#comp_st{
         new_st = St#st{
             id_tree = IdTree,
@@ -519,7 +539,9 @@ copy_meta_data(#comp_st{new_st = St} = CompSt) ->
 
 
 compact_final_sync(#comp_st{new_st = St0} = CompSt) ->
+    ?COMP_EVENT(before_final_sync),
     {ok, St1} = couch_bt_engine:commit_data(St0),
+    ?COMP_EVENT(after_final_sync),
     CompSt#comp_st{
         new_st = St1
     }.
@@ -628,6 +650,7 @@ merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
                 rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
                 curr = NewCurr
             },
+            ?COMP_EVENT(md_copy_row),
             merge_docids(NextIter, Acc1);
         {finished, FDI, Seqs} ->
             Acc#merge_st{
diff --git a/src/couch/test/eunit/couch_bt_engine_compactor_ev.erl b/src/couch/test/eunit/couch_bt_engine_compactor_ev.erl
new file mode 100644
index 0000000..f50be84
--- /dev/null
+++ b/src/couch/test/eunit/couch_bt_engine_compactor_ev.erl
@@ -0,0 +1,106 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_bt_engine_compactor_ev).
+
+
+-export([
+    init/0,
+    terminate/0,
+    clear/0,
+
+    set_wait/1,
+    set_crash/1,
+
+    event/1
+]).
+
+
+-define(TAB, couch_db_updater_ev_tab).
+
+
+init() ->
+    ets:new(?TAB, [set, public, named_table]).
+
+
+terminate() ->
+    ets:delete(?TAB).
+
+
+clear() ->
+    ets:delete_all_objects(?TAB).
+
+
+set_wait(Event) ->
+    Self = self(),
+    WaitFun = fun(_) ->
+        receive
+            {Self, go} ->
+                Self ! {self(), ok}
+        end,
+        ets:delete(?TAB, Event)
+    end,
+    ContinueFun = fun(Pid) ->
+        Pid ! {Self, go},
+        receive {Pid, ok} -> ok end
+    end,
+    ets:insert(?TAB, {Event, WaitFun}),
+    {ok, ContinueFun}.
+
+
+set_crash(Event) ->
+    Reason = {couch_db_updater_ev_crash, Event},
+    CrashFun = fun(_) -> exit(Reason) end,
+    ets:insert(?TAB, {Event, CrashFun}),
+    {ok, Reason}.
+
+
+event(Event) ->
+    NewEvent = case Event of
+        seq_init ->
+            put(?MODULE, 0),
+            Event;
+        seq_copy ->
+            Count = get(?MODULE),
+            put(?MODULE, Count + 1),
+            {seq_copy, Count};
+        id_init ->
+            put(?MODULE, 0),
+            Event;
+        id_copy ->
+            Count = get(?MODULE),
+            put(?MODULE, Count + 1),
+            {id_copy, Count};
+        md_copy_init ->
+            put(?MODULE, 0),
+            Event;
+        md_copy_row ->
+            Count = get(?MODULE),
+            put(?MODULE, Count + 1),
+            {md_copy_row, Count};
+        _ ->
+            Event
+    end,
+    handle_event(NewEvent).
+
+
+handle_event(Event) ->
+    try
+        case ets:lookup(?TAB, Event) of
+            [{Event, ActionFun}] ->
+                ActionFun(Event);
+            [] ->
+                ok
+        end
+    catch error:badarg ->
+        ok
+    end.
\ No newline at end of file
diff --git a/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl b/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl
new file mode 100644
index 0000000..188078c
--- /dev/null
+++ b/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl
@@ -0,0 +1,335 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_bt_engine_compactor_ev_tests).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/src/couch_server_int.hrl").
+
+-define(TIMEOUT_EUNIT, 60).
+-define(EV_MOD, couch_bt_engine_compactor_ev).
+-define(INIT_DOCS, 2500).
+-define(WRITE_DOCS, 20).
+
+% The idea behind the tests in this module is to attempt to
+% cover the various restart/recopy events during compaction
+% so that we can be as sure as possible that the compactor
+% is resilient to errors in the face of external conditions
+% (e.g., the VM rebooting). The single linear pass is easy enough
+% to prove; however, restarting is important enough that we don't
+% want to waste work if a VM happens to bounce a lot.
+%
+% To cover as many restart situations as possible, we have created a
+% number of events in the compactor code that are only present in a
+% test-compiled version of the module. These events can then be used
+% (via meck) to introduce errors and coordinate writes to the database
+% while compaction is in progress.
+
+% This list of events is where we'll insert our errors.
+
+events() ->
+    [
+        init,               % The compactor process is spawned
+        files_opened,       % After compaction files have opened
+
+        purge_init,         % Just before applying purge changes
+        purge_done,         % Just after finishing purge updates
+
+        % The first phase is when we copy all document body and attachment
+        % data to the new database file in order of update sequence so
+        % that we can resume on crash.
+
+        seq_init,           % Before the first change is copied
+        {seq_copy, 0},      % After change N is copied
+        {seq_copy, ?INIT_DOCS div 2},
+        {seq_copy, ?INIT_DOCS - 2},
+        seq_done,           % After last change is copied
+
+        % The id copy phases come in two flavors. Before a compaction
+        % swap is attempted they're copied from the id_tree in the
+        % database being compacted. After a swap attempt they are
+        % stored in an emsort file on disk. Thus the two sets of
+        % related events here.
+
+        md_sort_init,       % Just before metadata sort starts
+        md_sort_done,       % Just after metadata sort finished
+        md_copy_init,       % Just before metadata copy starts
+        {md_copy_row, 0},   % After docid N is copied
+        {md_copy_row, ?INIT_DOCS div 2},
+        {md_copy_row, ?INIT_DOCS - 2},
+        md_copy_done,       % Just after the last docid is copied
+
+        % And then the final steps before we finish
+
+        before_final_sync,  % Just before final sync
+        after_final_sync,   % Just after the final sync
+        before_notify       % Just before the final notification
+    ].
+
+% Mark which events only happen when documents are present
+
+requires_docs({seq_copy, _}) -> true;
+requires_docs(md_sort_init) -> true;
+requires_docs(md_sort_done) -> true;
+requires_docs(md_copy_init) -> true;
+requires_docs({md_copy_row, _}) -> true;
+requires_docs(md_copy_done) -> true;
+requires_docs(_) -> false.
+
+
+% Mark which events only happen when there's write activity during
+% a compaction.
+
+requires_write(md_sort_init) -> true;
+requires_write(md_sort_done) -> true;
+requires_write(md_copy_init) -> true;
+requires_write({md_copy_row, _}) -> true;
+requires_write(md_copy_done) -> true;
+requires_write(_) -> false.
+
+
+setup() ->
+    purge_module(),
+    ?EV_MOD:init(),
+    test_util:start_couch().
+
+
+teardown(Ctx) ->
+    test_util:stop_couch(Ctx),
+    ?EV_MOD:terminate().
+
+
+start_empty_db_test(_Event) ->
+    ?EV_MOD:clear(),
+    DbName = ?tempdb(),
+    {ok, _} = couch_db:create(DbName, [?ADMIN_CTX]),
+    DbName.
+
+
+start_populated_db_test(Event) ->
+    DbName = start_empty_db_test(Event),
+    {ok, Db} = couch_db:open_int(DbName, []),
+    try
+        populate_db(Db, ?INIT_DOCS)
+    after
+        couch_db:close(Db)
+    end,
+    DbName.
+
+
+stop_test(_Event, DbName) ->
+    couch_server:delete(DbName, [?ADMIN_CTX]).
+
+
+static_empty_db_test_() ->
+    FiltFun = fun(E) ->
+        not (requires_docs(E) or requires_write(E))
+    end,
+    Events = lists:filter(FiltFun, events()) -- [init],
+    {
+        "Idle empty database",
+        {
+            setup,
+            fun setup/0,
+            fun teardown/1,
+            [
+                {
+                    foreachx,
+                    fun start_empty_db_test/1,
+                    fun stop_test/2,
+                    [{Event, fun run_static_init/2} || Event <- Events]
+                }
+            ]
+        }
+    }.
+
+
+static_populated_db_test_() ->
+    FiltFun = fun(E) -> not requires_write(E) end,
+    Events = lists:filter(FiltFun, events()) -- [init],
+    {
+        "Idle populated database",
+        {
+            setup,
+            fun setup/0,
+            fun teardown/1,
+            [
+                {
+                    foreachx,
+                    fun start_populated_db_test/1,
+                    fun stop_test/2,
+                    [{Event, fun run_static_init/2} || Event <- Events]
+                }
+            ]
+        }
+    }.
+
+
+dynamic_empty_db_test_() ->
+    FiltFun = fun(E) -> not requires_docs(E) end,
+    Events = lists:filter(FiltFun, events()) -- [init],
+    {
+        "Writes to empty database",
+        {
+            setup,
+            fun setup/0,
+            fun teardown/1,
+            [
+                {
+                    foreachx,
+                    fun start_empty_db_test/1,
+                    fun stop_test/2,
+                    [{Event, fun run_dynamic_init/2} || Event <- Events]
+                }
+            ]
+        }
+    }.
+
+
+dynamic_populated_db_test_() ->
+    Events = events() -- [init],
+    {
+        "Writes to populated database",
+        {
+            setup,
+            fun setup/0,
+            fun teardown/1,
+            [
+                {
+                    foreachx,
+                    fun start_populated_db_test/1,
+                    fun stop_test/2,
+                    [{Event, fun run_dynamic_init/2} || Event <- Events]
+                }
+            ]
+        }
+    }.
+
+
+run_static_init(Event, DbName) ->
+    Name = lists:flatten(io_lib:format("~p", [Event])),
+    Test = {timeout, ?TIMEOUT_EUNIT, ?_test(run_static(Event, DbName))},
+    {Name, Test}.
+
+
+run_static(Event, DbName) ->
+    {ok, ContinueFun} = ?EV_MOD:set_wait(init),
+    {ok, Reason} = ?EV_MOD:set_crash(Event),
+    {ok, Db} = couch_db:open_int(DbName, []),
+    Ref = couch_db:monitor(Db),
+    {ok, CPid} = couch_db:start_compact(Db),
+    ContinueFun(CPid),
+    receive
+        {'DOWN', Ref, _, _, Reason} ->
+            wait_db_cleared(Db)
+    end,
+    run_successful_compaction(DbName),
+    couch_db:close(Db).
+
+
+run_dynamic_init(Event, DbName) ->
+    Name = lists:flatten(io_lib:format("~p", [Event])),
+    Test = {timeout, ?TIMEOUT_EUNIT, ?_test(run_dynamic(Event, DbName))},
+    {Name, Test}.
+
+
+run_dynamic(Event, DbName) ->
+    {ok, ContinueFun} = ?EV_MOD:set_wait(init),
+    {ok, Reason} = ?EV_MOD:set_crash(Event),
+    {ok, Db} = couch_db:open_int(DbName, []),
+    Ref = couch_db:monitor(Db),
+    {ok, CPid} = couch_db:start_compact(Db),
+    ok = populate_db(Db, 10),
+    ContinueFun(CPid),
+    receive
+        {'DOWN', Ref, _, _, Reason} ->
+            wait_db_cleared(Db)
+    end,
+    run_successful_compaction(DbName),
+    couch_db:close(Db).
+
+
+run_successful_compaction(DbName) ->
+    ?EV_MOD:clear(),
+    {ok, ContinueFun} = ?EV_MOD:set_wait(init),
+    {ok, Db} = couch_db:open_int(DbName, []),
+    {ok, CPid} = couch_db:start_compact(Db),
+    Ref = erlang:monitor(process, CPid),
+    ContinueFun(CPid),
+    receive
+        {'DOWN', Ref, _, _, normal} -> ok
+    end,
+    Pid = couch_db:get_pid(Db),
+    {ok, NewDb} = gen_server:call(Pid, get_db),
+    validate_compaction(NewDb),
+    couch_db:close(Db).
+
+
+wait_db_cleared(Db) ->
+    wait_db_cleared(Db, 5).
+
+
+wait_db_cleared(Db, N) when N < 0 ->
+    erlang:error({db_clear_timeout, couch_db:name(Db)});
+
+wait_db_cleared(Db, N) ->
+    case ets:lookup(couch_dbs, couch_db:name(Db)) of
+        [] ->
+            ok;
+        [#entry{db = NewDb}] ->
+            OldPid = couch_db:get_pid(Db),
+            NewPid = couch_db:get_pid(NewDb),
+            if NewPid /= OldPid -> ok; true ->
+                timer:sleep(100),
+                wait_db_cleared(Db, N - 1)
+            end
+    end.
+
+
+populate_db(_Db, NumDocs) when NumDocs =< 0 ->
+    ok;
+populate_db(Db, NumDocs) ->
+    String = [$a || _ <- lists:seq(1, erlang:min(NumDocs, 500))],
+    Docs = lists:map(
+        fun(_) ->
+            couch_doc:from_json_obj({[
+                {<<"_id">>, couch_uuids:random()},
+                {<<"string">>, list_to_binary(String)}
+            ]})
+        end,
+        lists:seq(1, 500)),
+    {ok, _} = couch_db:update_docs(Db, Docs, []),
+    populate_db(Db, NumDocs - 500).
+
+
+validate_compaction(Db) ->
+    {ok, DocCount} = couch_db:get_doc_count(Db),
+    {ok, DelDocCount} = couch_db:get_del_doc_count(Db),
+    NumChanges = couch_db:count_changes_since(Db, 0),
+    FoldFun = fun(FDI, {PrevId, CountAcc}) ->
+        ?assert(FDI#full_doc_info.id > PrevId),
+        {ok, {FDI#full_doc_info.id, CountAcc + 1}}
+    end,
+    {ok, {_, LastCount}} = couch_db:fold_docs(Db, FoldFun, {<<>>, 0}),
+    ?assertEqual(DocCount + DelDocCount, LastCount),
+    ?assertEqual(NumChanges, LastCount).
+
+
+purge_module() ->
+    case code:which(couch_db_updater) of
+        cover_compiled ->
+            ok;
+        _ ->
+            code:delete(couch_db_updater),
+            code:purge(couch_db_updater)
+    end.
\ No newline at end of file


[couchdb] 06/06: Update compaction progress during docid phases

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch 3.x
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 123bf82370c21a8b5458299a7e36c477a0fedca4
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Fri Apr 3 10:08:07 2020 -0500

    Update compaction progress during docid phases
    
    Previously the sort and copy phases that handle document IDs were not
    measured in _active_tasks. This adds size tracking to give operators a
    way to measure progress during those phases.
    
    I'd like to thank Vitaly for the example in #1006 that showed a clean
    way for tracking the size info in `couch_emsort`.
    
    Co-Authored-By: Vitaly Goot <vi...@gmail.com>
---
 src/couch/src/couch_bt_engine_compactor.erl |  26 ++++++-
 src/couch/src/couch_emsort.erl              | 102 ++++++++++++++++++++--------
 2 files changed, 99 insertions(+), 29 deletions(-)
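
For the docid_sort phase the new total_changes figure is estimated as
num_merges * num_kvs, where couch_emsort:num_merges/1 (added below) counts
how many merge passes the backbone decimation makes over the key-value
set. For example, with the default backbone chunk size of 10, 2500
backbone chunks reduce as 2500 -> 250 -> 25 in the first small/big pass
pair and 25 -> 3 -> 1 in the second, i.e. four passes over the keys. While
these phases run, an operator polling _active_tasks should see entries
along these lines (values are hypothetical, for illustration only):

    %% Hypothetical couch_task_status entry during the docid sort phase;
    %% the phase field is new, and progress is the percentage of
    %% changes_done over total_changes.
    [{type, database_compaction},
     {database, <<"mydb">>},
     {phase, docid_sort},
     {changes_done, 120000},
     {total_changes, 400000},
     {progress, 30}]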

diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl
index 92ccea5..4bed49c 100644
--- a/src/couch/src/couch_bt_engine_compactor.erl
+++ b/src/couch/src/couch_bt_engine_compactor.erl
@@ -318,6 +318,7 @@ copy_compact(#comp_st{} = CompSt) ->
     TaskProps0 = [
         {type, database_compaction},
         {database, DbName},
+        {phase, document_copy},
         {progress, 0},
         {changes_done, 0},
         {total_changes, TotalChanges}
@@ -326,6 +327,7 @@ copy_compact(#comp_st{} = CompSt) ->
     true ->
         couch_task_status:update([
             {retry, true},
+            {phase, document_copy},
             {progress, 0},
             {changes_done, 0},
             {total_changes, TotalChanges}
@@ -502,7 +504,16 @@ copy_doc_attachments(#st{} = SrcSt, SrcSp, DstSt) ->
 
 sort_meta_data(#comp_st{new_st = St0} = CompSt) ->
     ?COMP_EVENT(md_sort_init),
-    {ok, Ems} = couch_emsort:merge(St0#st.id_tree),
+    NumKVs = couch_emsort:num_kvs(St0#st.id_tree),
+    NumMerges = couch_emsort:num_merges(St0#st.id_tree),
+    couch_task_status:update([
+        {phase, docid_sort},
+        {progress, 0},
+        {changes_done, 0},
+        {total_changes, NumMerges * NumKVs}
+    ]),
+    Reporter = fun update_compact_task/1,
+    {ok, Ems} = couch_emsort:merge(St0#st.id_tree, Reporter),
     ?COMP_EVENT(md_sort_done),
     CompSt#comp_st{
         new_st = St0#st{
@@ -533,12 +544,20 @@ copy_meta_data(#comp_st{new_st = St} = CompSt) ->
         locs=[]
     },
     ?COMP_EVENT(md_copy_init),
+    NumKVs = couch_emsort:num_kvs(Src),
+    couch_task_status:update([
+        {phase, docid_copy},
+        {progress, 0},
+        {changes_done, 0},
+        {total_changes, NumKVs}
+    ]),
     Acc = merge_docids(Iter, Acc0),
     {ok, Infos} = couch_file:pread_terms(SrcFd, Acc#merge_st.locs),
     {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Infos),
     {ok, SeqTree} = couch_btree:add_remove(
         Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
     ),
+    update_compact_task(NumKVs),
     ?COMP_EVENT(md_copy_done),
     CompSt#comp_st{
         new_st = St#st{
@@ -609,8 +628,10 @@ commit_compaction_data(#st{header = OldHeader} = St0, Fd) ->
 bind_emsort(St, Fd, nil) ->
     {ok, Ems} = couch_emsort:open(Fd),
     St#st{id_tree=Ems};
+bind_emsort(St, Fd, State) when is_integer(State) ->
+    bind_emsort(St, Fd, [{root, State}]);
 bind_emsort(St, Fd, State) ->
-    {ok, Ems} = couch_emsort:open(Fd, [{root, State}]),
+    {ok, Ems} = couch_emsort:open(Fd, State),
     St#st{id_tree=Ems}.
 
 
@@ -653,6 +674,7 @@ merge_docids(Iter, #merge_st{locs=Locs}=Acc) when length(Locs) > 1000 ->
         rem_seqs=[],
         locs=[]
     },
+    update_compact_task(length(Locs)),
     merge_docids(Iter, Acc1);
 merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
     case next_info(Iter, Curr, []) of
diff --git a/src/couch/src/couch_emsort.erl b/src/couch/src/couch_emsort.erl
index 2a25a23..430d94e 100644
--- a/src/couch/src/couch_emsort.erl
+++ b/src/couch/src/couch_emsort.erl
@@ -130,17 +130,22 @@
 %
 
 -export([open/1, open/2, get_fd/1, get_state/1]).
--export([add/2, merge/1, sort/1, iter/1, next/1]).
-
+-export([add/2, merge/1, merge/2, sort/1, iter/1, next/1]).
+-export([num_kvs/1, num_merges/1]).
 
 -record(ems, {
     fd,
     root,
     bb_chunk = 10,
-    chain_chunk = 100
+    chain_chunk = 100,
+    num_kvs = 0,
+    num_bb = 0
 }).
 
 
+-define(REPORT_INTERVAL, 1000).
+
+
 open(Fd) ->
     {ok, #ems{fd=Fd}}.
 
@@ -156,22 +161,39 @@ set_options(Ems, [{root, Root} | Rest]) ->
 set_options(Ems, [{chain_chunk, Count} | Rest]) when is_integer(Count) ->
     set_options(Ems#ems{chain_chunk=Count}, Rest);
 set_options(Ems, [{back_bone_chunk, Count} | Rest]) when is_integer(Count) ->
-    set_options(Ems#ems{bb_chunk=Count}, Rest).
+    set_options(Ems#ems{bb_chunk=Count}, Rest);
+set_options(Ems, [{num_kvs, NumKVs} | Rest]) when is_integer(NumKVs) ->
+    set_options(Ems#ems{num_kvs=NumKVs}, Rest);
+set_options(Ems, [{num_bb, NumBB} | Rest]) when is_integer(NumBB) ->
+    set_options(Ems#ems{num_bb=NumBB}, Rest).
 
 
 get_fd(#ems{fd=Fd}) ->
     Fd.
 
 
-get_state(#ems{root=Root}) ->
-    Root.
+get_state(#ems{} = Ems) ->
+    #ems{
+        root = Root,
+        num_kvs = NumKVs,
+        num_bb = NumBB
+    } = Ems,
+    [
+        {root, Root},
+        {num_kvs, NumKVs},
+        {num_bb, NumBB}
+    ].
 
 
 add(Ems, []) ->
     {ok, Ems};
 add(Ems, KVs) ->
     Pos = write_kvs(Ems, KVs),
-    {ok, add_bb_pos(Ems, Pos)}.
+    NewEms = add_bb_pos(Ems, Pos),
+    {ok, NewEms#ems{
+        num_kvs = Ems#ems.num_kvs + length(KVs),
+        num_bb = Ems#ems.num_bb + 1
+    }}.
 
 
 sort(#ems{}=Ems) ->
@@ -179,10 +201,14 @@ sort(#ems{}=Ems) ->
     iter(Ems1).
 
 
-merge(#ems{root=undefined}=Ems) ->
+merge(Ems) ->
+    merge(Ems, fun(_) -> ok end).
+
+
+merge(#ems{root=undefined}=Ems, _Reporter) ->
     {ok, Ems};
-merge(#ems{}=Ems) ->
-    {ok, decimate(Ems)}.
+merge(#ems{}=Ems, Reporter) ->
+    {ok, decimate(Ems, Reporter)}.
 
 
 iter(#ems{root=undefined}=Ems) ->
@@ -201,6 +227,13 @@ next({Ems, Chains}) ->
     {ok, KV, {Ems, RestChains}}.
 
 
+num_kvs(#ems{num_kvs=NumKVs}) ->
+    NumKVs.
+
+num_merges(#ems{bb_chunk=BBChunk, num_bb=NumBB}) ->
+    num_merges(BBChunk, NumBB).
+
+
 add_bb_pos(#ems{root=undefined}=Ems, Pos) ->
     Ems#ems{root={[Pos], nil}};
 add_bb_pos(#ems{root={BB, Prev}}=Ems, Pos) ->
@@ -220,11 +253,11 @@ write_kvs(Ems, KVs) ->
     Final.
 
 
-decimate(#ems{root={_BB, nil}}=Ems) ->
+decimate(#ems{root={_BB, nil}}=Ems, _Reporter) ->
     % We have less than bb_chunk backbone pointers so we're
     % good to start streaming KV's back to the client.
     Ems;
-decimate(#ems{root={BB, NextBB}}=Ems) ->
+decimate(#ems{root={BB, NextBB}}=Ems, Reporter) ->
     % To make sure we have a bounded amount of data in RAM
     % at any given point we first need to decimate the data
     % by performing the first couple iterations of a merge
@@ -232,43 +265,51 @@ decimate(#ems{root={BB, NextBB}}=Ems) ->
 
     % The first pass gives us a sort with pointers linked from
     % largest to smallest.
-    {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB),
+    {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB, Reporter),
 
     % We have to run a second pass so that links are pointed
     % back from smallest to largest.
-    {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB),
+    {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB, Reporter),
 
     % Continue decimating until we have an acceptable bound on
     % the number of keys to use.
-    decimate(Ems#ems{root={FwdBB, FwdNextBB}}).
+    decimate(Ems#ems{root={FwdBB, FwdNextBB}}, Reporter).
 
 
-merge_back_bone(Ems, Choose, BB, NextBB) ->
-    BBPos = merge_chains(Ems, Choose, BB),
-    merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}).
+merge_back_bone(Ems, Choose, BB, NextBB, Reporter) ->
+    BBPos = merge_chains(Ems, Choose, BB, Reporter),
+    Reporter(length(BB)),
+    merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}, Reporter).
 
 
-merge_rest_back_bone(_Ems, _Choose, nil, Acc) ->
+merge_rest_back_bone(_Ems, _Choose, nil, Acc, _Reporter) ->
     Acc;
-merge_rest_back_bone(Ems, Choose, BBPos, Acc) ->
+merge_rest_back_bone(Ems, Choose, BBPos, Acc, Reporter) ->
     {ok, {BB, NextBB}} = couch_file:pread_term(Ems#ems.fd, BBPos),
-    NewPos = merge_chains(Ems, Choose, BB),
+    NewPos = merge_chains(Ems, Choose, BB, Reporter),
     {NewBB, NewPrev} = append_item(Ems, Acc, NewPos, Ems#ems.bb_chunk),
-    merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}).
+    merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}, Reporter).
 
 
-merge_chains(Ems, Choose, BB) ->
+merge_chains(Ems, Choose, BB, Reporter) ->
     Chains = init_chains(Ems, Choose, BB),
-    merge_chains(Ems, Choose, Chains, {[], nil}).
+    merge_chains(Ems, Choose, Chains, {[], nil}, Reporter, 0).
 
 
-merge_chains(Ems, _Choose, [], ChainAcc) ->
+merge_chains(Ems, _Choose, [], ChainAcc, _Reporter, _Count) ->
     {ok, CPos, _} = couch_file:append_term(Ems#ems.fd, ChainAcc),
     CPos;
-merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc) ->
+merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc, Reporter, Count0) ->
     {KV, RestChains} = choose_kv(Choose, Ems, Chains),
     {NewKVs, NewPrev} = append_item(Ems, Acc, KV, CC),
-    merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}).
+    Count1 = case (Count0 + 1) rem ?REPORT_INTERVAL of
+        0 ->
+            Reporter(Count0),
+            0;
+        _ ->
+            Count0 + 1
+    end,
+    merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}, Reporter, Count1).
 
 
 init_chains(Ems, Choose, BB) ->
@@ -316,3 +357,10 @@ append_item(Ems, {List, Prev}, Pos, Size) when length(List) >= Size ->
 append_item(_Ems, {List, Prev}, Pos, _Size) ->
     {[Pos | List], Prev}.
 
+
+num_merges(BBChunk, NumBB) when NumBB =< BBChunk ->
+    0;
+num_merges(BBChunk, NumBB) when NumBB > BBChunk ->
+    RevNumBB = ceil(NumBB / BBChunk),
+    FwdNumBB = ceil(RevNumBB / BBChunk),
+    2 + num_merges(BBChunk, FwdNumBB).


[couchdb] 03/06: Add multi-append functions to couch_file

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch 3.x
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit dabd4e244b7807ce71e9f6905d184d4a8e135410
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Sep 6 13:36:16 2017 -0500

    Add multi-append functions to couch_file
    
    These functions allow the caller to append multiple terms or binaries to
    a file and receive the file position and size for each individual
    element. This is to optimize throughput in situations where we want to
    write multiple pieces of independent data.
---
 src/couch/src/couch_file.erl | 162 +++++++++++++++++++++++++++++++++++--------
 1 file changed, 135 insertions(+), 27 deletions(-)
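
A minimal usage sketch of the new API (not part of the commit; Fd stands
for a couch_file handle opened elsewhere):

    %% Append several terms in one call, getting back one {Pos, Size} pair
    %% per term, then read them all back with a single pread_terms call.
    Terms = [{doc, 1}, {doc, 2}, {doc, 3}],
    {ok, PosSizes} = couch_file:append_terms(Fd, Terms),
    Positions = [Pos || {Pos, _Size} <- PosSizes],
    {ok, Terms} = couch_file:pread_terms(Fd, Positions)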

diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index 6db23ea..b1e3555 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -42,6 +42,8 @@
 -export([append_binary/2, append_binary_md5/2]).
 -export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]).
 -export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]).
+-export([pread_terms/2, pread_binaries/2, pread_iolists/2]).
+-export([append_terms/2, append_terms/3, append_binaries/2]).
 -export([write_header/2, read_header/1]).
 -export([delete/2, delete/3, nuke_dir/2, init_delete_dir/1]).
 -export([last_read/1]).
@@ -119,6 +121,7 @@ append_term_md5(Fd, Term, Options) ->
     Comp = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION),
     append_binary_md5(Fd, couch_compress:compress(Term, Comp)).
 
+
 %%----------------------------------------------------------------------
 %% Purpose: To append an Erlang binary to the end of the file.
 %% Args:    Erlang term to serialize and append to the file.
@@ -129,7 +132,7 @@ append_term_md5(Fd, Term, Options) ->
 
 append_binary(Fd, Bin) ->
     ioq:call(Fd, {append_bin, assemble_file_chunk(Bin)}, erlang:get(io_priority)).
-    
+
 append_binary_md5(Fd, Bin) ->
     ioq:call(Fd,
         {append_bin, assemble_file_chunk(Bin, couch_hash:md5_hash(Bin))},
@@ -172,21 +175,55 @@ pread_binary(Fd, Pos) ->
 
 pread_iolist(Fd, Pos) ->
     case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of
-    {ok, IoList, <<>>} ->
-        {ok, IoList};
-    {ok, IoList, Md5} ->
-        case couch_hash:md5_hash(IoList) of
-        Md5 ->
-            {ok, IoList};
-        _ ->
-            couch_log:emergency("File corruption in ~p at position ~B",
-                     [Fd, Pos]),
-            exit({file_corruption, <<"file corruption">>})
-        end;
-    Error ->
-        Error
+        {ok, IoList, Md5} ->
+            {ok, verify_md5(Fd, Pos, IoList, Md5)};
+        Error ->
+            Error
+    end.
+
+
+pread_terms(Fd, PosList) ->
+    {ok, Bins} = pread_binaries(Fd, PosList),
+    Terms = lists:map(fun(Bin) ->
+        couch_compress:decompress(Bin)
+    end, Bins),
+    {ok, Terms}.
+
+
+pread_binaries(Fd, PosList) ->
+    {ok, Data} = pread_iolists(Fd, PosList),
+    {ok, lists:map(fun erlang:iolist_to_binary/1, Data)}.
+
+
+pread_iolists(Fd, PosList) ->
+    case ioq:call(Fd, {pread_iolists, PosList}, erlang:get(io_priority)) of
+        {ok, DataMd5s} ->
+            Data = lists:zipwith(fun(Pos, {IoList, Md5}) ->
+                verify_md5(Fd, Pos, IoList, Md5)
+            end, PosList, DataMd5s),
+            {ok, Data};
+        Error ->
+            Error
     end.
 
+
+append_terms(Fd, Terms) ->
+    append_terms(Fd, Terms, []).
+
+
+append_terms(Fd, Terms, Options) ->
+    Comp = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION),
+    Bins = lists:map(fun(Term) ->
+        couch_compress:compress(Term, Comp)
+    end, Terms),
+    append_binaries(Fd, Bins).
+
+
+append_binaries(Fd, Bins) ->
+    WriteBins = lists:map(fun assemble_file_chunk/1, Bins),
+    ioq:call(Fd, {append_bins, WriteBins}, erlang:get(io_priority)).
+
+
 %%----------------------------------------------------------------------
 %% Purpose: The length of a file, in bytes.
 %% Returns: {ok, Bytes}
@@ -464,6 +501,30 @@ handle_call({pread_iolist, Pos}, _From, File) ->
         {reply, {ok, Iolist, <<>>}, File}
     end;
 
+handle_call({pread_iolists, PosL}, _From, File) ->
+    update_read_timestamp(),
+    LocNums1 = [{Pos, 4} || Pos <- PosL],
+    DataSizes = read_multi_raw_iolists_int(File, LocNums1),
+    LocNums2 = lists:map(fun({LenIoList, NextPos}) ->
+        case iolist_to_binary(LenIoList) of
+            <<1:1/integer, Len:31/integer>> -> % an MD5-prefixed term
+                {NextPos, Len + 16};
+            <<0:1/integer, Len:31/integer>> ->
+                {NextPos, Len}
+        end
+    end, DataSizes),
+    Resps = read_multi_raw_iolists_int(File, LocNums2),
+    Extracted = lists:zipwith(fun({LenIoList, _}, {IoList, _}) ->
+        case iolist_to_binary(LenIoList) of
+            <<1:1/integer, _:31/integer>> ->
+                {Md5, IoList} = extract_md5(IoList),
+                {IoList, Md5};
+            <<0:1/integer, _:31/integer>> ->
+                {IoList, <<>>}
+        end
+    end, DataSizes, Resps),
+    {reply, {ok, Extracted}, File};
+
 handle_call(bytes, _From, #file{fd = Fd} = File) ->
     {reply, file:position(Fd, eof), File};
 
@@ -506,6 +567,20 @@ handle_call({append_bin, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
         {reply, Error, reset_eof(File)}
     end;
 
+handle_call({append_bins, Bins}, _From, #file{fd = Fd, eof = Pos} = File) ->
+    {BlockResps, FinalPos} = lists:mapfoldl(fun(Bin, PosAcc) ->
+        Blocks = make_blocks(PosAcc rem ?SIZE_BLOCK, Bin),
+        Size = iolist_size(Blocks),
+        {{Blocks, {PosAcc, Size}}, PosAcc + Size}
+    end, Pos, Bins),
+    {AllBlocks, Resps} = lists:unzip(BlockResps),
+    case file:write(Fd, AllBlocks) of
+    ok ->
+        {reply, {ok, Resps}, File#file{eof = FinalPos}};
+    Error ->
+        {reply, Error, reset_eof(File)}
+    end;
+
 handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
     BinSize = byte_size(Bin),
     case Pos rem ?SIZE_BLOCK of
@@ -634,23 +709,40 @@ find_newest_header(Fd, [{Location, Size} | LocationSizes]) ->
     {Data::iolist(), CurPos::non_neg_integer()}.
 read_raw_iolist_int(Fd, {Pos, _Size}, Len) -> % 0110 UPGRADE CODE
     read_raw_iolist_int(Fd, Pos, Len);
-read_raw_iolist_int(#file{fd = Fd, pread_limit = Limit} = F, Pos, Len) ->
+read_raw_iolist_int(#file{fd = Fd} = File, Pos, Len) ->
+    {Pos, TotalBytes} = get_pread_locnum(File, Pos, Len),
+    {ok, <<RawBin:TotalBytes/binary>>} = file:pread(Fd, Pos, TotalBytes),
+    {remove_block_prefixes(Pos rem ?SIZE_BLOCK, RawBin), Pos + TotalBytes}.
+
+
+read_multi_raw_iolists_int(#file{fd = Fd} = File, PosLens) ->
+    LocNums = lists:map(fun({Pos, Len}) ->
+        get_pread_locnum(File, Pos, Len)
+    end, PosLens),
+    {ok, Bins} = file:pread(Fd, LocNums),
+    lists:zipwith(fun({Pos, TotalBytes}, Bin) ->
+        <<RawBin:TotalBytes/binary>> = Bin,
+        {remove_block_prefixes(Pos rem ?SIZE_BLOCK, RawBin), Pos + TotalBytes}
+    end, LocNums, Bins).
+
+
+get_pread_locnum(File, Pos, Len) ->
     BlockOffset = Pos rem ?SIZE_BLOCK,
     TotalBytes = calculate_total_read_len(BlockOffset, Len),
-    if
-        (Pos + TotalBytes) > F#file.eof ->
-            couch_stats:increment_counter([pread, exceed_eof]),
-            {_Fd, Filepath} = get(couch_file_fd),
-            throw({read_beyond_eof, Filepath});
-        TotalBytes > Limit ->
-            couch_stats:increment_counter([pread, exceed_limit]),
-            {_Fd, Filepath} = get(couch_file_fd),
-            throw({exceed_pread_limit, Filepath, Limit});
-        true ->
-            {ok, <<RawBin:TotalBytes/binary>>} = file:pread(Fd, Pos, TotalBytes),
-            {remove_block_prefixes(BlockOffset, RawBin), Pos + TotalBytes}
+    case Pos + TotalBytes of
+    Size when Size > File#file.eof ->
+        couch_stats:increment_counter([pread, exceed_eof]),
+        {_Fd, Filepath} = get(couch_file_fd),
+        throw({read_beyond_eof, Filepath});
+    Size when Size > File#file.pread_limit ->
+        couch_stats:increment_counter([pread, exceed_limit]),
+        {_Fd, Filepath} = get(couch_file_fd),
+        throw({exceed_pread_limit, Filepath, File#file.pread_limit});
+    _ ->
+        {Pos, TotalBytes}
     end.
 
+
 -spec extract_md5(iolist()) -> {binary(), iolist()}.
 extract_md5(FullIoList) ->
     {Md5List, IoList} = split_iolist(FullIoList, 16, []),
@@ -722,6 +814,22 @@ monitored_by_pids() ->
     {monitored_by, PidsAndRefs} = process_info(self(), monitored_by),
     lists:filter(fun is_pid/1, PidsAndRefs).
 
+
+verify_md5(_Fd, _Pos, IoList, <<>>) ->
+    IoList;
+
+verify_md5(Fd, Pos, IoList, Md5) ->
+    case couch_hash:md5_hash(IoList) of
+        Md5 -> IoList;
+        _ -> report_md5_error(Fd, Pos)
+    end.
+
+
+report_md5_error(Fd, Pos) ->
+    couch_log:emergency("File corruption in ~p at position ~B", [Fd, Pos]),
+    exit({file_corruption, <<"file corruption">>}).
+
+
 % System dbs aren't monitored by couch_stats_process_tracker
 is_idle(#file{is_sys=true}) ->
     case monitored_by_pids() of


[couchdb] 05/06: Optimize compactor to use multi-IO API

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch 3.x
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 1aef628628f52f1fc12729fcdb326b7897147257
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Fri Sep 8 09:40:52 2017 -0500

    Optimize compactor to use multi-IO API
    
    This updates couch_db_updater to use the new multi-IO API functions
    (append_terms/pread_terms) in couch_file. The optimization lets the
    `couch_emsort:merge/1` step avoid copying `#full_doc_info{}` records
    multiple times, without paying for it with a significantly larger
    number of calls through the couch_file APIs.
---
 src/couch/src/couch_bt_engine_compactor.erl | 54 ++++++++++++++++++-----------
 1 file changed, 33 insertions(+), 21 deletions(-)
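
The shape of what flows through couch_emsort changes here: instead of
sorting {{Id, Seq}, #full_doc_info{}} pairs, and therefore copying the
full records on every merge pass, the compactor now appends the records to
the emsort file once, sorts only their file locations, and batch-reads the
records back when rebuilding the id tree. A condensed sketch of that
pattern (variable names as in the diff below):

    %% During the seq copy: write the infos once, sort only the locations.
    {ok, LocSizes} = couch_file:append_terms(EMSortFd, NewInfos),
    Entries = [{{FDI#full_doc_info.id, FDI#full_doc_info.update_seq}, Loc}
               || {FDI, {Loc, _Size}} <- lists:zip(NewInfos, LocSizes)],
    {ok, IdEms} = couch_emsort:add(NewSt#st.id_tree, Entries),

    %% During the docid copy: batch-read the records back by location.
    {ok, Infos} = couch_file:pread_terms(SrcFd, Acc#merge_st.locs)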

diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl
index 50b3981..92ccea5 100644
--- a/src/couch/src/couch_bt_engine_compactor.erl
+++ b/src/couch/src/couch_bt_engine_compactor.erl
@@ -36,11 +36,12 @@
 }).
 
 -record(merge_st, {
+    src_fd,
     id_tree,
     seq_tree,
     curr,
     rem_seqs,
-    infos
+    locs
 }).
 
 
@@ -441,10 +442,16 @@ copy_docs(St, #st{} = NewSt, MixedInfos, Retry) ->
     {ok, SeqTree} = couch_btree:add_remove(
             NewSt#st.seq_tree, NewInfos, RemoveSeqs),
 
-    FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
-        {{Id, Seq}, FDI}
-    end, NewInfos),
-    {ok, IdEms} = couch_emsort:add(NewSt#st.id_tree, FDIKVs),
+    EMSortFd = couch_emsort:get_fd(NewSt#st.id_tree),
+    {ok, LocSizes} = couch_file:append_terms(EMSortFd, NewInfos),
+    EMSortEntries = lists:zipwith(fun(FDI, {Loc, _}) ->
+        #full_doc_info{
+            id = Id,
+            update_seq = Seq
+        } = FDI,
+        {{Id, Seq}, Loc}
+    end, NewInfos, LocSizes),
+    {ok, IdEms} = couch_emsort:add(NewSt#st.id_tree, EMSortEntries),
     update_compact_task(length(NewInfos)),
     NewSt#st{id_tree=IdEms, seq_tree=SeqTree}.
 
@@ -510,6 +517,7 @@ copy_meta_data(#comp_st{new_st = St} = CompSt) ->
         header = Header,
         id_tree = Src
     } = St,
+    SrcFd = couch_emsort:get_fd(Src),
     DstState = couch_bt_engine_header:id_tree_state(Header),
     {ok, IdTree0} = couch_btree:open(DstState, Fd, [
         {split, fun couch_bt_engine:id_tree_split/1},
@@ -518,14 +526,16 @@ copy_meta_data(#comp_st{new_st = St} = CompSt) ->
     ]),
     {ok, Iter} = couch_emsort:iter(Src),
     Acc0 = #merge_st{
+        src_fd=SrcFd,
         id_tree=IdTree0,
         seq_tree=St#st.seq_tree,
         rem_seqs=[],
-        infos=[]
+        locs=[]
     },
     ?COMP_EVENT(md_copy_init),
     Acc = merge_docids(Iter, Acc0),
-    {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos),
+    {ok, Infos} = couch_file:pread_terms(SrcFd, Acc#merge_st.locs),
+    {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Infos),
     {ok, SeqTree} = couch_btree:add_remove(
         Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
     ),
@@ -627,34 +637,36 @@ merge_lookups([FDI | RestInfos], Lookups) ->
     [FDI | merge_lookups(RestInfos, Lookups)].
 
 
-merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
+merge_docids(Iter, #merge_st{locs=Locs}=Acc) when length(Locs) > 1000 ->
     #merge_st{
+        src_fd=SrcFd,
         id_tree=IdTree0,
         seq_tree=SeqTree0,
         rem_seqs=RemSeqs
     } = Acc,
+    {ok, Infos} = couch_file:pread_terms(SrcFd, Locs),
     {ok, IdTree1} = couch_btree:add(IdTree0, Infos),
     {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
     Acc1 = Acc#merge_st{
         id_tree=IdTree1,
         seq_tree=SeqTree1,
         rem_seqs=[],
-        infos=[]
+        locs=[]
     },
     merge_docids(Iter, Acc1);
 merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
     case next_info(Iter, Curr, []) of
-        {NextIter, NewCurr, FDI, Seqs} ->
+        {NextIter, NewCurr, Loc, Seqs} ->
             Acc1 = Acc#merge_st{
-                infos = [FDI | Acc#merge_st.infos],
+                locs = [Loc | Acc#merge_st.locs],
                 rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
                 curr = NewCurr
             },
             ?COMP_EVENT(md_copy_row),
             merge_docids(NextIter, Acc1);
-        {finished, FDI, Seqs} ->
+        {finished, Loc, Seqs} ->
             Acc#merge_st{
-                infos = [FDI | Acc#merge_st.infos],
+                locs = [Loc | Acc#merge_st.locs],
                 rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
                 curr = undefined
             };
@@ -665,19 +677,19 @@ merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
 
 next_info(Iter, undefined, []) ->
     case couch_emsort:next(Iter) of
-        {ok, {{Id, Seq}, FDI}, NextIter} ->
-            next_info(NextIter, {Id, Seq, FDI}, []);
+        {ok, {{Id, Seq}, Loc}, NextIter} ->
+            next_info(NextIter, {Id, Seq, Loc}, []);
         finished ->
             empty
     end;
-next_info(Iter, {Id, Seq, FDI}, Seqs) ->
+next_info(Iter, {Id, Seq, Loc}, Seqs) ->
     case couch_emsort:next(Iter) of
-        {ok, {{Id, NSeq}, NFDI}, NextIter} ->
-            next_info(NextIter, {Id, NSeq, NFDI}, [Seq | Seqs]);
-        {ok, {{NId, NSeq}, NFDI}, NextIter} ->
-            {NextIter, {NId, NSeq, NFDI}, FDI, Seqs};
+        {ok, {{Id, NSeq}, NLoc}, NextIter} ->
+            next_info(NextIter, {Id, NSeq, NLoc}, [Seq | Seqs]);
+        {ok, {{NId, NSeq}, NLoc}, NextIter} ->
+            {NextIter, {NId, NSeq, NLoc}, Loc, Seqs};
         finished ->
-            {finished, FDI, Seqs}
+            {finished, Loc, Seqs}
     end.
 
 


[couchdb] 04/06: Optimize btree node writes

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch 3.x
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 2c9477ec87b9fff2672d1891797a994a29fe17f5
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Sep 6 16:26:03 2017 -0500

    Optimize btree node writes
    
    This uses the new couch_file:append_terms/2 function to write all chunks
    in a single write call.
---
 src/couch/src/couch_btree.erl | 28 +++++++++++++++-------------
 1 file changed, 15 insertions(+), 13 deletions(-)
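
In effect the per-chunk round trips through the couch_file server are
collapsed into a single call (a condensed sketch; the real code is in the
diff below):

    %% Before: one append_term call, and one file write, per chunk:
    %%     [couch_file:append_term(Fd, {NodeType, C}, WriteOpts)
    %%         || C <- Chunks]
    %% After: every chunk is written through one call and one file:write.
    ToWrite = [{NodeType, Chunk} || Chunk <- Chunks],
    {ok, PtrSizes} = couch_file:append_terms(Fd, ToWrite,
                                             [{compression, Comp}])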

diff --git a/src/couch/src/couch_btree.erl b/src/couch/src/couch_btree.erl
index ea0cf69..858ae2b 100644
--- a/src/couch/src/couch_btree.erl
+++ b/src/couch/src/couch_btree.erl
@@ -437,20 +437,22 @@ get_node(#btree{fd = Fd}, NodePos) ->
 
 write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) ->
     % split up nodes into smaller sizes
-    NodeListList = chunkify(NodeList),
+    Chunks = chunkify(NodeList),
     % now write out each chunk and return the KeyPointer pairs for those nodes
-    ResultList = [
-        begin
-            {ok, Pointer, Size} = couch_file:append_term(
-                Fd, {NodeType, ANodeList}, [{compression, Comp}]),
-            {LastKey, _} = lists:last(ANodeList),
-            SubTreeSize = reduce_tree_size(NodeType, Size, ANodeList),
-            {LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList), SubTreeSize}}
-        end
-    ||
-        ANodeList <- NodeListList
-    ],
-    {ok, ResultList}.
+    ToWrite = [{NodeType, Chunk} || Chunk <- Chunks],
+    WriteOpts = [{compression, Comp}],
+    {ok, PtrSizes} = couch_file:append_terms(Fd, ToWrite, WriteOpts),
+    {ok, group_kps(Bt, NodeType, Chunks, PtrSizes)}.
+
+
+group_kps(_Bt, _NodeType, [], []) ->
+    [];
+
+group_kps(Bt, NodeType, [Chunk | RestChunks], [{Ptr, Size} | RestPtrSizes]) ->
+    {LastKey, _} = lists:last(Chunk),
+    SubTreeSize = reduce_tree_size(NodeType, Size, Chunk),
+    KP = {LastKey, {Ptr, reduce_node(Bt, NodeType, Chunk), SubTreeSize}},
+    [KP | group_kps(Bt, NodeType, RestChunks, RestPtrSizes)].
 
 
 write_node(Bt, _OldNode, NodeType, [], NewList) ->


[couchdb] 01/06: Simplify compaction state management

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch 3.x
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit fbe5ba5b9cb6df984ce12e25d92eafe1af370fff
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Aug 16 12:17:40 2017 -0500

    Simplify compaction state management
    
    This change adds a new `#comp_st{}` record that is used to pass
    compaction state through the various compaction steps. There are zero
    changes to the existing compaction logic. This merely sets the stage for
    adding our docid copy optimization.
---
 src/couch/src/couch_bt_engine_compactor.erl | 165 +++++++++++++++++++++-------
 1 file changed, 123 insertions(+), 42 deletions(-)
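
With the #comp_st{} record threading state between steps, start/4 becomes
a fold over a list of stage functions, each taking and returning the
compaction state (condensed from the diff below):

    Stages = [
        fun copy_purge_info/1,
        fun copy_compact/1,
        fun commit_compaction_data/1,
        fun sort_meta_data/1,
        fun commit_compaction_data/1,
        fun copy_meta_data/1,
        fun compact_final_sync/1
    ],
    FinalCompSt = lists:foldl(fun(Stage, CompSt) ->
        Stage(CompSt)
    end, InitCompSt, Stages)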

diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl
index 0b3fb22..3979fcb 100644
--- a/src/couch/src/couch_bt_engine_compactor.erl
+++ b/src/couch/src/couch_bt_engine_compactor.erl
@@ -22,9 +22,17 @@
 -include("couch_bt_engine.hrl").
 
 
+-record(comp_st, {
+    db_name,
+    old_st,
+    new_st,
+    meta_fd,
+    retry
+}).
+
 -record(comp_header, {
     db_header,
-    meta_state
+    meta_st
 }).
 
 -record(merge_st, {
@@ -38,68 +46,106 @@
 
 start(#st{} = St, DbName, Options, Parent) ->
     erlang:put(io_priority, {db_compact, DbName}),
-    #st{
-        filepath = FilePath,
-        header = Header
-    } = St,
     couch_log:debug("Compaction process spawned for db \"~s\"", [DbName]),
 
     couch_db_engine:trigger_on_compact(DbName),
 
-    {ok, NewSt, DName, DFd, MFd, Retry} =
-            open_compaction_files(Header, FilePath, Options),
-    erlang:monitor(process, MFd),
+    {ok, InitCompSt} = open_compaction_files(DbName, St, Options),
+
+    Stages = [
+        fun copy_purge_info/1,
+        fun copy_compact/1,
+        fun commit_compaction_data/1,
+        fun sort_meta_data/1,
+        fun commit_compaction_data/1,
+        fun copy_meta_data/1,
+        fun compact_final_sync/1
+    ],
 
-    % This is a bit worrisome. init_db/4 will monitor the data fd
-    % but it doesn't know about the meta fd. For now I'll maintain
-    % that the data fd is the old normal fd and meta fd is special
-    % and hope everything works out for the best.
-    unlink(DFd),
+    FinalCompSt = lists:foldl(fun(Stage, CompSt) ->
+        Stage(CompSt)
+    end, InitCompSt, Stages),
 
-    NewSt1 = copy_purge_info(DbName, St, NewSt, Retry),
-    NewSt2 = copy_compact(DbName, St, NewSt1, Retry),
-    NewSt3 = sort_meta_data(NewSt2),
-    NewSt4 = commit_compaction_data(NewSt3),
-    NewSt5 = copy_meta_data(NewSt4),
-    {ok, NewSt6} = couch_bt_engine:commit_data(NewSt5),
-    ok = couch_bt_engine:decref(NewSt6),
-    ok = couch_file:close(MFd),
+    #comp_st{
+        new_st = FinalNewSt,
+        meta_fd = MetaFd
+    } = FinalCompSt,
 
-    % Done
-    gen_server:cast(Parent, {compact_done, couch_bt_engine, DName}).
+    ok = couch_bt_engine:decref(FinalNewSt),
+    ok = couch_file:close(MetaFd),
 
+    Msg = {compact_done, couch_bt_engine, FinalNewSt#st.filepath},
+    gen_server:cast(Parent, Msg).
 
-open_compaction_files(SrcHdr, DbFilePath, Options) ->
+
+open_compaction_files(DbName, OldSt, Options) ->
+    #st{
+        filepath = DbFilePath,
+        header = SrcHdr
+    } = OldSt,
     DataFile = DbFilePath ++ ".compact.data",
     MetaFile = DbFilePath ++ ".compact.meta",
     {ok, DataFd, DataHdr} = open_compaction_file(DataFile),
     {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile),
     DataHdrIsDbHdr = couch_bt_engine_header:is_header(DataHdr),
-    case {DataHdr, MetaHdr} of
+    CompSt = case {DataHdr, MetaHdr} of
         {#comp_header{}=A, #comp_header{}=A} ->
+            % We're restarting a compaction that did not finish
+            % before trying to swap out with the original db
             DbHeader = A#comp_header.db_header,
             St0 = couch_bt_engine:init_state(
                     DataFile, DataFd, DbHeader, Options),
-            St1 = bind_emsort(St0, MetaFd, A#comp_header.meta_state),
-            {ok, St1, DataFile, DataFd, MetaFd, St0#st.id_tree};
+            St1 = bind_emsort(St0, MetaFd, A#comp_header.meta_st),
+            #comp_st{
+                db_name = DbName,
+                old_st = OldSt,
+                new_st = St1,
+                meta_fd = MetaFd,
+                retry = St0#st.id_tree
+            };
         _ when DataHdrIsDbHdr ->
+            % We tried to swap out the compaction but there were
+            % writes to the database during compaction. Start
+            % a compaction retry.
             Header = couch_bt_engine_header:from(SrcHdr),
             ok = reset_compaction_file(MetaFd, Header),
             St0 = couch_bt_engine:init_state(
                     DataFile, DataFd, DataHdr, Options),
             St1 = bind_emsort(St0, MetaFd, nil),
-            {ok, St1, DataFile, DataFd, MetaFd, St0#st.id_tree};
+            #comp_st{
+                db_name = DbName,
+                old_st = OldSt,
+                new_st = St1,
+                meta_fd = MetaFd,
+                retry = St0#st.id_tree
+            };
         _ ->
+            % We're starting a compaction from scratch
             Header = couch_bt_engine_header:from(SrcHdr),
             ok = reset_compaction_file(DataFd, Header),
             ok = reset_compaction_file(MetaFd, Header),
             St0 = couch_bt_engine:init_state(DataFile, DataFd, Header, Options),
             St1 = bind_emsort(St0, MetaFd, nil),
-            {ok, St1, DataFile, DataFd, MetaFd, nil}
-    end.
-
-
-copy_purge_info(DbName, OldSt, NewSt, Retry) ->
+            #comp_st{
+                db_name = DbName,
+                old_st = OldSt,
+                new_st = St1,
+                meta_fd = MetaFd,
+                retry = nil
+            }
+    end,
+    unlink(DataFd),
+    erlang:monitor(process, MetaFd),
+    {ok, CompSt}.
+
+
+copy_purge_info(#comp_st{} = CompSt) ->
+    #comp_st{
+        db_name = DbName,
+        old_st = OldSt,
+        new_st = NewSt,
+        retry = Retry
+    } = CompSt,
     MinPurgeSeq = couch_util:with_db(DbName, fun(Db) ->
         couch_db:get_minimum_purge_seq(Db)
     end),
@@ -132,7 +178,11 @@ copy_purge_info(DbName, OldSt, NewSt, Retry) ->
     Opts = [{start_key, StartSeq}],
     {ok, _, FinalAcc} = couch_btree:fold(OldPSTree, EnumFun, InitAcc, Opts),
     {NewStAcc, Infos, _, _} = FinalAcc,
-    copy_purge_infos(OldSt, NewStAcc, Infos, MinPurgeSeq, Retry).
+    FinalNewSt = copy_purge_infos(OldSt, NewStAcc, Infos, MinPurgeSeq, Retry),
+
+    CompSt#comp_st{
+        new_st = FinalNewSt
+    }.
 
 
 copy_purge_infos(OldSt, NewSt0, Infos, MinPurgeSeq, Retry) ->
@@ -206,7 +256,14 @@ copy_purge_infos(OldSt, NewSt0, Infos, MinPurgeSeq, Retry) ->
     bind_emsort(NewSt4, MetaFd, MetaState).
 
 
-copy_compact(DbName, St, NewSt0, Retry) ->
+copy_compact(#comp_st{} = CompSt) ->
+    #comp_st{
+        db_name = DbName,
+        old_st = St,
+        new_st = NewSt0,
+        retry = Retry
+    } = CompSt,
+
     Compression = couch_compress:get_compression_method(),
     NewSt = NewSt0#st{compression = Compression},
     NewUpdateSeq = couch_bt_engine:get_update_seq(NewSt0),
@@ -282,7 +339,10 @@ copy_compact(DbName, St, NewSt0, Retry) ->
 
     FinalUpdateSeq = couch_bt_engine:get_update_seq(St),
     {ok, NewSt6} = couch_bt_engine:set_update_seq(NewSt5, FinalUpdateSeq),
-    commit_compaction_data(NewSt6).
+
+    CompSt#comp_st{
+        new_st = NewSt6
+    }.
 
 
 copy_docs(St, #st{} = NewSt, MixedInfos, Retry) ->
@@ -417,12 +477,16 @@ copy_doc_attachments(#st{} = SrcSt, SrcSp, DstSt) ->
     {BodyData, NewBinInfos}.
 
 
-sort_meta_data(St0) ->
+sort_meta_data(#comp_st{new_st = St0} = CompSt) ->
     {ok, Ems} = couch_emsort:merge(St0#st.id_tree),
-    St0#st{id_tree=Ems}.
+    CompSt#comp_st{
+        new_st = St0#st{
+            id_tree = Ems
+        }
+    }.
 
 
-copy_meta_data(#st{} = St) ->
+copy_meta_data(#comp_st{new_st = St} = CompSt) ->
     #st{
         fd = Fd,
         header = Header,
@@ -446,7 +510,19 @@ copy_meta_data(#st{} = St) ->
     {ok, SeqTree} = couch_btree:add_remove(
         Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
     ),
-    St#st{id_tree=IdTree, seq_tree=SeqTree}.
+    CompSt#comp_st{
+        new_st = St#st{
+            id_tree = IdTree,
+            seq_tree = SeqTree
+        }
+    }.
+
+
+compact_final_sync(#comp_st{new_st = St0} = CompSt) ->
+    {ok, St1} = couch_bt_engine:commit_data(St0),
+    CompSt#comp_st{
+        new_st = St1
+    }.
 
 
 open_compaction_file(FilePath) ->
@@ -467,10 +543,15 @@ reset_compaction_file(Fd, Header) ->
     ok = couch_file:write_header(Fd, Header).
 
 
-commit_compaction_data(#st{}=St) ->
+commit_compaction_data(#comp_st{new_st = St} = CompSt) ->
     % Compaction needs to write headers to both the data file
     % and the meta file so if we need to restart we can pick
     % back up from where we left off.
+    CompSt#comp_st{
+        new_st = commit_compaction_data(St)
+    };
+
+commit_compaction_data(#st{} = St) ->
     commit_compaction_data(St, couch_emsort:get_fd(St#st.id_tree)),
     commit_compaction_data(St, St#st.fd).
 
@@ -483,7 +564,7 @@ commit_compaction_data(#st{header = OldHeader} = St0, Fd) ->
     Header = couch_bt_engine:update_header(St1, St1#st.header),
     CompHeader = #comp_header{
         db_header = Header,
-        meta_state = MetaState
+        meta_st = MetaState
     },
     ok = couch_file:sync(Fd),
     ok = couch_file:write_header(Fd, CompHeader),