Posted to commits@couchdb.apache.org by da...@apache.org on 2020/04/14 16:26:30 UTC

[couchdb] 06/06: Update compaction progress during docid phases

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch 3.x
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 123bf82370c21a8b5458299a7e36c477a0fedca4
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Fri Apr 3 10:08:07 2020 -0500

    Update compaction progress during docid phases
    
    Previously, the sort and copy phases for document IDs were not
    measured in _active_tasks. This adds size tracking so that operators
    have a way to measure progress during those phases (a usage sketch of
    the new couch_emsort hooks follows the diffstat below).
    
    I'd like to thank Vitaly for the example in #1006, which showed a
    clean way to track the size info in `couch_emsort`.
    
    Co-Authored-By: Vitaly Goot <vi...@gmail.com>
---
 src/couch/src/couch_bt_engine_compactor.erl |  26 ++++++-
 src/couch/src/couch_emsort.erl              | 102 ++++++++++++++++++++--------
 2 files changed, 99 insertions(+), 29 deletions(-)
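
For reference, a minimal sketch of how the couch_emsort hooks introduced
below are meant to be driven. The function name and reporter body here are
illustrative, not taken from the patch; only merge/2, num_kvs/1, num_merges/1,
the docid_sort phase name and the couch_task_status calls mirror the real
code (the reporter body follows the same pattern as the module's existing
update_compact_task/1 helper, which reads changes_done/total_changes back
from couch_task_status):

    %% Sketch only: wire a progress reporter into an emsort merge. The
    %% reporter fun is called with the number of KVs processed since the
    %% previous report, so feeding it into couch_task_status keeps
    %% _active_tasks moving during the docid_sort phase.
    sort_ids_with_progress(Ems) ->
        Total = couch_emsort:num_merges(Ems) * couch_emsort:num_kvs(Ems),
        couch_task_status:update([
            {phase, docid_sort},
            {progress, 0},
            {changes_done, 0},
            {total_changes, Total}
        ]),
        Reporter = fun(NumProcessed) ->
            [Done, T] = couch_task_status:get([changes_done, total_changes]),
            Done1 = Done + NumProcessed,
            Progress = if T > 0 -> (Done1 * 100) div T; true -> 0 end,
            couch_task_status:update([
                {changes_done, Done1},
                {progress, Progress}
            ])
        end,
        {ok, _MergedEms} = couch_emsort:merge(Ems, Reporter).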

diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl
index 92ccea5..4bed49c 100644
--- a/src/couch/src/couch_bt_engine_compactor.erl
+++ b/src/couch/src/couch_bt_engine_compactor.erl
@@ -318,6 +318,7 @@ copy_compact(#comp_st{} = CompSt) ->
     TaskProps0 = [
         {type, database_compaction},
         {database, DbName},
+        {phase, document_copy},
         {progress, 0},
         {changes_done, 0},
         {total_changes, TotalChanges}
@@ -326,6 +327,7 @@ copy_compact(#comp_st{} = CompSt) ->
     true ->
         couch_task_status:update([
             {retry, true},
+            {phase, document_copy},
             {progress, 0},
             {changes_done, 0},
             {total_changes, TotalChanges}
@@ -502,7 +504,16 @@ copy_doc_attachments(#st{} = SrcSt, SrcSp, DstSt) ->
 
 sort_meta_data(#comp_st{new_st = St0} = CompSt) ->
     ?COMP_EVENT(md_sort_init),
-    {ok, Ems} = couch_emsort:merge(St0#st.id_tree),
+    NumKVs = couch_emsort:num_kvs(St0#st.id_tree),
+    NumMerges = couch_emsort:num_merges(St0#st.id_tree),
+    couch_task_status:update([
+        {phase, docid_sort},
+        {progress, 0},
+        {changes_done, 0},
+        {total_changes, NumMerges * NumKVs}
+    ]),
+    Reporter = fun update_compact_task/1,
+    {ok, Ems} = couch_emsort:merge(St0#st.id_tree, Reporter),
     ?COMP_EVENT(md_sort_done),
     CompSt#comp_st{
         new_st = St0#st{
@@ -533,12 +544,20 @@ copy_meta_data(#comp_st{new_st = St} = CompSt) ->
         locs=[]
     },
     ?COMP_EVENT(md_copy_init),
+    NumKVs = couch_emsort:num_kvs(Src),
+    couch_task_status:update([
+        {phase, docid_copy},
+        {progress, 0},
+        {changes_done, 0},
+        {total_changes, NumKVs}
+    ]),
     Acc = merge_docids(Iter, Acc0),
     {ok, Infos} = couch_file:pread_terms(SrcFd, Acc#merge_st.locs),
     {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Infos),
     {ok, SeqTree} = couch_btree:add_remove(
         Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
     ),
+    update_compact_task(NumKVs),
     ?COMP_EVENT(md_copy_done),
     CompSt#comp_st{
         new_st = St#st{
@@ -609,8 +628,10 @@ commit_compaction_data(#st{header = OldHeader} = St0, Fd) ->
 bind_emsort(St, Fd, nil) ->
     {ok, Ems} = couch_emsort:open(Fd),
     St#st{id_tree=Ems};
+bind_emsort(St, Fd, State) when is_integer(State) ->
+    bind_emsort(St, Fd, [{root, State}]);
 bind_emsort(St, Fd, State) ->
-    {ok, Ems} = couch_emsort:open(Fd, [{root, State}]),
+    {ok, Ems} = couch_emsort:open(Fd, State),
     St#st{id_tree=Ems}.
 
 
@@ -653,6 +674,7 @@ merge_docids(Iter, #merge_st{locs=Locs}=Acc) when length(Locs) > 1000 ->
         rem_seqs=[],
         locs=[]
     },
+    update_compact_task(length(Locs)),
     merge_docids(Iter, Acc1);
 merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
     case next_info(Iter, Curr, []) of
diff --git a/src/couch/src/couch_emsort.erl b/src/couch/src/couch_emsort.erl
index 2a25a23..430d94e 100644
--- a/src/couch/src/couch_emsort.erl
+++ b/src/couch/src/couch_emsort.erl
@@ -130,17 +130,22 @@
 %
 
 -export([open/1, open/2, get_fd/1, get_state/1]).
--export([add/2, merge/1, sort/1, iter/1, next/1]).
-
+-export([add/2, merge/1, merge/2, sort/1, iter/1, next/1]).
+-export([num_kvs/1, num_merges/1]).
 
 -record(ems, {
     fd,
     root,
     bb_chunk = 10,
-    chain_chunk = 100
+    chain_chunk = 100,
+    num_kvs = 0,
+    num_bb = 0
 }).
 
 
+-define(REPORT_INTERVAL, 1000).
+
+
 open(Fd) ->
     {ok, #ems{fd=Fd}}.
 
@@ -156,22 +161,39 @@ set_options(Ems, [{root, Root} | Rest]) ->
 set_options(Ems, [{chain_chunk, Count} | Rest]) when is_integer(Count) ->
     set_options(Ems#ems{chain_chunk=Count}, Rest);
 set_options(Ems, [{back_bone_chunk, Count} | Rest]) when is_integer(Count) ->
-    set_options(Ems#ems{bb_chunk=Count}, Rest).
+    set_options(Ems#ems{bb_chunk=Count}, Rest);
+set_options(Ems, [{num_kvs, NumKVs} | Rest]) when is_integer(NumKVs) ->
+    set_options(Ems#ems{num_kvs=NumKVs}, Rest);
+set_options(Ems, [{num_bb, NumBB} | Rest]) when is_integer(NumBB) ->
+    set_options(Ems#ems{num_bb=NumBB}, Rest).
 
 
 get_fd(#ems{fd=Fd}) ->
     Fd.
 
 
-get_state(#ems{root=Root}) ->
-    Root.
+get_state(#ems{} = Ems) ->
+    #ems{
+        root = Root,
+        num_kvs = NumKVs,
+        num_bb = NumBB
+    } = Ems,
+    [
+        {root, Root},
+        {num_kvs, NumKVs},
+        {num_bb, NumBB}
+    ].
 
 
 add(Ems, []) ->
     {ok, Ems};
 add(Ems, KVs) ->
     Pos = write_kvs(Ems, KVs),
-    {ok, add_bb_pos(Ems, Pos)}.
+    NewEms = add_bb_pos(Ems, Pos),
+    {ok, NewEms#ems{
+        num_kvs = Ems#ems.num_kvs + length(KVs),
+        num_bb = Ems#ems.num_bb + 1
+    }}.
 
 
 sort(#ems{}=Ems) ->
@@ -179,10 +201,14 @@ sort(#ems{}=Ems) ->
     iter(Ems1).
 
 
-merge(#ems{root=undefined}=Ems) ->
+merge(Ems) ->
+    merge(Ems, fun(_) -> ok end).
+
+
+merge(#ems{root=undefined}=Ems, _Reporter) ->
     {ok, Ems};
-merge(#ems{}=Ems) ->
-    {ok, decimate(Ems)}.
+merge(#ems{}=Ems, Reporter) ->
+    {ok, decimate(Ems, Reporter)}.
 
 
 iter(#ems{root=undefined}=Ems) ->
@@ -201,6 +227,13 @@ next({Ems, Chains}) ->
     {ok, KV, {Ems, RestChains}}.
 
 
+num_kvs(#ems{num_kvs=NumKVs}) ->
+    NumKVs.
+
+num_merges(#ems{bb_chunk=BBChunk, num_bb=NumBB}) ->
+    num_merges(BBChunk, NumBB).
+
+
 add_bb_pos(#ems{root=undefined}=Ems, Pos) ->
     Ems#ems{root={[Pos], nil}};
 add_bb_pos(#ems{root={BB, Prev}}=Ems, Pos) ->
@@ -220,11 +253,11 @@ write_kvs(Ems, KVs) ->
     Final.
 
 
-decimate(#ems{root={_BB, nil}}=Ems) ->
+decimate(#ems{root={_BB, nil}}=Ems, _Reporter) ->
     % We have less than bb_chunk backbone pointers so we're
     % good to start streaming KV's back to the client.
     Ems;
-decimate(#ems{root={BB, NextBB}}=Ems) ->
+decimate(#ems{root={BB, NextBB}}=Ems, Reporter) ->
     % To make sure we have a bounded amount of data in RAM
     % at any given point we first need to decimate the data
     % by performing the first couple iterations of a merge
@@ -232,43 +265,51 @@ decimate(#ems{root={BB, NextBB}}=Ems) ->
 
     % The first pass gives us a sort with pointers linked from
     % largest to smallest.
-    {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB),
+    {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB, Reporter),
 
     % We have to run a second pass so that links are pointed
     % back from smallest to largest.
-    {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB),
+    {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB, Reporter),
 
     % Continue decimating until we have an acceptable bound on
     % the number of keys to use.
-    decimate(Ems#ems{root={FwdBB, FwdNextBB}}).
+    decimate(Ems#ems{root={FwdBB, FwdNextBB}}, Reporter).
 
 
-merge_back_bone(Ems, Choose, BB, NextBB) ->
-    BBPos = merge_chains(Ems, Choose, BB),
-    merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}).
+merge_back_bone(Ems, Choose, BB, NextBB, Reporter) ->
+    BBPos = merge_chains(Ems, Choose, BB, Reporter),
+    Reporter(length(BB)),
+    merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}, Reporter).
 
 
-merge_rest_back_bone(_Ems, _Choose, nil, Acc) ->
+merge_rest_back_bone(_Ems, _Choose, nil, Acc, _Reporter) ->
     Acc;
-merge_rest_back_bone(Ems, Choose, BBPos, Acc) ->
+merge_rest_back_bone(Ems, Choose, BBPos, Acc, Reporter) ->
     {ok, {BB, NextBB}} = couch_file:pread_term(Ems#ems.fd, BBPos),
-    NewPos = merge_chains(Ems, Choose, BB),
+    NewPos = merge_chains(Ems, Choose, BB, Reporter),
     {NewBB, NewPrev} = append_item(Ems, Acc, NewPos, Ems#ems.bb_chunk),
-    merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}).
+    merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}, Reporter).
 
 
-merge_chains(Ems, Choose, BB) ->
+merge_chains(Ems, Choose, BB, Reporter) ->
     Chains = init_chains(Ems, Choose, BB),
-    merge_chains(Ems, Choose, Chains, {[], nil}).
+    merge_chains(Ems, Choose, Chains, {[], nil}, Reporter, 0).
 
 
-merge_chains(Ems, _Choose, [], ChainAcc) ->
+merge_chains(Ems, _Choose, [], ChainAcc, _Reporter, _Count) ->
     {ok, CPos, _} = couch_file:append_term(Ems#ems.fd, ChainAcc),
     CPos;
-merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc) ->
+merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc, Reporter, Count0) ->
     {KV, RestChains} = choose_kv(Choose, Ems, Chains),
     {NewKVs, NewPrev} = append_item(Ems, Acc, KV, CC),
-    merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}).
+    Count1 = case (Count0 + 1) rem ?REPORT_INTERVAL of
+        0 ->
+            Reporter(Count0),
+            0;
+        _ ->
+            Count0 + 1
+    end,
+    merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}, Reporter, Count1).
 
 
 init_chains(Ems, Choose, BB) ->
@@ -316,3 +357,10 @@ append_item(Ems, {List, Prev}, Pos, Size) when length(List) >= Size ->
 append_item(_Ems, {List, Prev}, Pos, _Size) ->
     {[Pos | List], Prev}.
 
+
+num_merges(BBChunk, NumBB) when NumBB =< BBChunk ->
+    0;
+num_merges(BBChunk, NumBB) when NumBB > BBChunk ->
+    RevNumBB = ceil(NumBB / BBChunk),
+    FwdNumBB = ceil(RevNumBB / BBChunk),
+    2 + num_merges(BBChunk, FwdNumBB).
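
To make the merge-pass arithmetic above concrete, a worked example with
made-up sizes (bb_chunk defaults to 10; the KV and backbone counts are
illustrative). Each decimation round runs two backbone passes, small-to-large
and then large-to-small, and every pass touches each KV roughly once, which
is why sort_meta_data/1 sets total_changes to NumMerges * NumKVs:

    %% Local copy of num_merges/2, since couch_emsort only exports the
    %% arity-1 wrapper.
    num_merge_passes(BBChunk, NumBB) when NumBB =< BBChunk ->
        0;
    num_merge_passes(BBChunk, NumBB) ->
        Rev = ceil(NumBB / BBChunk),
        Fwd = ceil(Rev / BBChunk),
        2 + num_merge_passes(BBChunk, Fwd).

    %% 250 backbone chunks: ceil(250/10) = 25, then ceil(25/10) = 3 =< 10,
    %% so two passes in total. With 1,000,000 doc ids the docid_sort phase
    %% would report total_changes = 2 * 1000000 = 2000000.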