Posted to commits@couchdb.apache.org by da...@apache.org on 2017/03/29 17:52:24 UTC

couch commit: updated refs/heads/feat-improve-compaction-task-status to 1db1337 [Forced Update!]

Repository: couchdb-couch
Updated Branches:
  refs/heads/feat-improve-compaction-task-status 552446d80 -> 1db133730 (forced update)


Improve compaction task status updates

Previously, the emsort-related operations did not update the compaction
task status. For large databases this led to some very long waits while
the compaction task sat at 100%. This change adds progress reporting to
the steps that sort and copy document ids back into the database file.
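
As an illustration (not part of the patch below), a caller can hook the
new couch_emsort event callback to drive its own progress reporting. The
{event_cb, Fun} and {event_st, St} options, the 3-arity callback
contract, and the row_copy / {merge, chain} / {merge_start, Direction}
event names all come from this commit; the wrapper function itself is
hypothetical:

    %% Hypothetical wrapper: count rows copied during the merge and
    %% print a progress line every 1000 rows.
    merge_with_progress(Ems0) ->
        EventCB = fun
            (_Ems, row_copy, Copied) when (Copied + 1) rem 1000 == 0 ->
                io:format("copied ~b rows~n", [Copied + 1]),
                Copied + 1;
            (_Ems, row_copy, Copied) ->
                Copied + 1;
            (_Ems, _Event, St) ->
                %% Ignore {merge, chain} and {merge_start, _} here.
                St
        end,
        Ems1 = couch_emsort:set_options(Ems0, [
            {event_cb, EventCB},
            {event_st, 0}
        ]),
        couch_emsort:merge(Ems1).

The callback's return value becomes the new event_st, which is how
emsort_cb/3 below threads its {init, Copied, Nodes} state through the
merge. With this change a compaction job's task status moves through
four phases: seq_tree, sort_ids_init, sort_ids and copy_ids.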


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/1db13373
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/1db13373
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/1db13373

Branch: refs/heads/feat-improve-compaction-task-status
Commit: 1db1337301a7c897be41e13cd328270509008478
Parents: 21c8d37
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Mar 28 16:21:38 2017 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Wed Mar 29 12:51:24 2017 -0500

----------------------------------------------------------------------
 src/couch_db_updater.erl | 87 ++++++++++++++++++++++++++++++++++++++++---
 src/couch_emsort.erl     | 71 ++++++++++++++++++++++-------------
 2 files changed, 127 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/1db13373/src/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl
index 270fffe..db5397e 100644
--- a/src/couch_db_updater.erl
+++ b/src/couch_db_updater.erl
@@ -1147,6 +1147,7 @@ copy_compact(Db, NewDb0, Retry) ->
     TaskProps0 = [
         {type, database_compaction},
         {database, Db#db.name},
+        {phase, seq_tree},
         {progress, 0},
         {changes_done, 0},
         {total_changes, TotalChanges}
@@ -1193,6 +1194,8 @@ start_copy_compact(#db{}=Db) ->
         open_compaction_files(Name, Header, Filepath, Options),
     erlang:monitor(process, MFd),
 
+    {ok, DocCount} = couch_db:get_doc_count(Db),
+
     % This is a bit worrisome. init_db/4 will monitor the data fd
     % but it doesn't know about the meta fd. For now I'll maintain
     % that the data fd is the old normal fd and meta fd is special
@@ -1201,9 +1204,9 @@ start_copy_compact(#db{}=Db) ->
 
     NewDb1 = copy_purge_info(Db, NewDb),
     NewDb2 = copy_compact(Db, NewDb1, Retry),
-    NewDb3 = sort_meta_data(NewDb2),
+    NewDb3 = sort_meta_data(NewDb2, DocCount),
     NewDb4 = commit_compaction_data(NewDb3),
-    NewDb5 = copy_meta_data(NewDb4),
+    NewDb5 = copy_meta_data(NewDb4, DocCount),
     NewDb6 = sync_header(NewDb5, db_to_header(NewDb5, NewDb5#db.header)),
     close_db(NewDb6),
 
@@ -1323,12 +1326,82 @@ bind_id_tree(Db, Fd, State) ->
     Db#db{id_tree=IdBtree}.
 
 
-sort_meta_data(Db0) ->
-    {ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
-    Db0#db{id_tree=Ems}.
+sort_meta_data(Db0, DocCount) ->
+    couch_task_status:update([
+        {phase, sort_ids_init},
+        {total_changes, DocCount},
+        {changes_done, 0},
+        {progress, 0}
+    ]),
+    Ems0 = Db0#db.id_tree,
+    Options = [
+        {event_cb, fun emsort_cb/3},
+        {event_st, {init, 0, 0}}
+    ],
+    Ems1 = couch_emsort:set_options(Ems0, Options),
+    {ok, Ems2} = couch_emsort:merge(Ems1),
+    Db0#db{id_tree=Ems2}.
+
+
+emsort_cb(_Ems, {merge, chain}, {init, Copied, Nodes}) ->
+    {init, Copied, Nodes + 1};
+emsort_cb(_Ems, row_copy, {init, Copied, Nodes}) when Copied >= 1000 ->
+    update_compact_task(Copied + 1),
+    {init, 0, Nodes};
+emsort_cb(_Ems, row_copy, {init, Copied, Nodes}) ->
+    {init, Copied + 1, Nodes};
+emsort_cb(Ems, {merge_start, reverse}, {init, Copied, Nodes}) ->
+    BBChunkSize = couch_emsort:get_bb_chunk_size(Ems),
+
+    % Subtract one because we already finished the first
+    % iteration when we were counting the number of nodes
+    % in the backbone.
+    Iters = calculate_sort_iters(Nodes, BBChunkSize, 0) - 1,
+
+    % Compaction retries mean we may have copied more than
+    % doc count rows. This accounts for that by using the
+    % number we've actually copied.
+    [PrevCopied] = couch_task_status:get([changes_done]),
+    TotalCopied = PrevCopied + Copied,
+
+    couch_task_status:update([
+        {phase, sort_ids},
+        {total_changes, Iters * TotalCopied},
+        {changes_done, 0},
+        {progress, 0}
+    ]),
+    0;
+
+emsort_cb(_Ems, row_copy, Copied) when is_integer(Copied), Copied > 1000 ->
+    update_compact_task(Copied + 1),
+    0;
+
+emsort_cb(_Ems, row_copy, Copied) when is_integer(Copied) ->
+    Copied + 1;
+
+emsort_cb(_Ems, _Event, St) ->
+    St.
 
 
-copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
+calculate_sort_iters(Nodes, BBChunk, Count) when Nodes < BBChunk ->
+    Count;
+calculate_sort_iters(Nodes0, BBChunk, Count) when BBChunk > 1 ->
+    Calc = fun(N0) ->
+        N1 = N0 div BBChunk,
+        N1 + if N1 rem BBChunk == 0 -> 0; true -> 1 end
+    end,
+    Nodes1 = Calc(Nodes0),
+    Nodes2 = Calc(Nodes1),
+    calculate_sort_iters(Nodes2, BBChunk, Count + 2).
+
+
+copy_meta_data(#db{fd=Fd, header=Header}=Db, DocCount) ->
+    couch_task_status:update([
+        {phase, copy_ids},
+        {changes_done, 0},
+        {total_changes, DocCount},
+        {progress, 0}
+    ]),
     Src = Db#db.id_tree,
     DstState = couch_db_header:id_tree_state(Header),
     {ok, IdTree0} = couch_btree:open(DstState, Fd, [
@@ -1348,6 +1421,7 @@ copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
     {ok, SeqTree} = couch_btree:add_remove(
         Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
     ),
+    update_compact_task(length(Acc#merge_st.infos)),
     Db#db{id_tree=IdTree, seq_tree=SeqTree}.
 
 
@@ -1359,6 +1433,7 @@ merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
     } = Acc,
     {ok, IdTree1} = couch_btree:add(IdTree0, Infos),
     {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
+    update_compact_task(length(Infos)),
     Acc1 = Acc#merge_st{
         id_tree=IdTree1,
         seq_tree=SeqTree1,

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/1db13373/src/couch_emsort.erl
----------------------------------------------------------------------
diff --git a/src/couch_emsort.erl b/src/couch_emsort.erl
index 2a25a23..d7f1b2b 100644
--- a/src/couch_emsort.erl
+++ b/src/couch_emsort.erl
@@ -129,7 +129,8 @@
 %     CA3                  CD3
 %
 
--export([open/1, open/2, get_fd/1, get_state/1]).
+-export([open/1, open/2, set_options/2, get_fd/1, get_state/1]).
+-export([get_bb_chunk_size/1]).
 -export([add/2, merge/1, sort/1, iter/1, next/1]).
 
 
@@ -137,7 +138,9 @@
     fd,
     root,
     bb_chunk = 10,
-    chain_chunk = 100
+    chain_chunk = 100,
+    event_cb,
+    event_st
 }).
 
 
@@ -156,7 +159,11 @@ set_options(Ems, [{root, Root} | Rest]) ->
 set_options(Ems, [{chain_chunk, Count} | Rest]) when is_integer(Count) ->
     set_options(Ems#ems{chain_chunk=Count}, Rest);
 set_options(Ems, [{back_bone_chunk, Count} | Rest]) when is_integer(Count) ->
-    set_options(Ems#ems{bb_chunk=Count}, Rest).
+    set_options(Ems#ems{bb_chunk=Count}, Rest);
+set_options(Ems, [{event_cb, EventCB} | Rest]) when is_function(EventCB, 3) ->
+    set_options(Ems#ems{event_cb=EventCB}, Rest);
+set_options(Ems, [{event_st, EventSt} | Rest]) ->
+    set_options(Ems#ems{event_st=EventSt}, Rest).
 
 
 get_fd(#ems{fd=Fd}) ->
@@ -167,6 +174,10 @@ get_state(#ems{root=Root}) ->
     Root.
 
 
+get_bb_chunk_size(#ems{bb_chunk = Size}) ->
+    Size.
+
+
 add(Ems, []) ->
     {ok, Ems};
 add(Ems, KVs) ->
@@ -224,7 +235,7 @@ decimate(#ems{root={_BB, nil}}=Ems) ->
     % We have less than bb_chunk backbone pointers so we're
     % good to start streaming KV's back to the client.
     Ems;
-decimate(#ems{root={BB, NextBB}}=Ems) ->
+decimate(#ems{}=Ems0) ->
     % To make sure we have a bounded amount of data in RAM
     % at any given point we first need to decimate the data
     % by performing the first couple iterations of a merge
@@ -232,43 +243,47 @@ decimate(#ems{root={BB, NextBB}}=Ems) ->
 
     % The first pass gives us a sort with pointers linked from
     % largest to smallest.
-    {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB),
+    {ok, Ems1} = event_notify(Ems0, {merge_start, forward}),
+    {ok, Ems2} = merge_back_bone(Ems1, small),
 
     % We have to run a second pass so that links are pointed
     % back from smallest to largest.
-    {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB),
+    {ok, Ems3} = event_notify(Ems2, {merge_start, reverse}),
+    {ok, Ems4} = merge_back_bone(Ems3, big),
 
     % Continue decimating until we have an acceptable bound on
     % the number of keys to use.
-    decimate(Ems#ems{root={FwdBB, FwdNextBB}}).
+    decimate(Ems4).
 
 
-merge_back_bone(Ems, Choose, BB, NextBB) ->
-    BBPos = merge_chains(Ems, Choose, BB),
-    merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}).
+merge_back_bone(#ems{root={BB, NextBB}}=Ems0, Choose) ->
+    {ok, Ems1, BBPos} = merge_chains(Ems0, Choose, BB),
+    merge_rest_back_bone(Ems1, Choose, NextBB, {[BBPos], nil}).
 
 
-merge_rest_back_bone(_Ems, _Choose, nil, Acc) ->
-    Acc;
-merge_rest_back_bone(Ems, Choose, BBPos, Acc) ->
-    {ok, {BB, NextBB}} = couch_file:pread_term(Ems#ems.fd, BBPos),
-    NewPos = merge_chains(Ems, Choose, BB),
-    {NewBB, NewPrev} = append_item(Ems, Acc, NewPos, Ems#ems.bb_chunk),
-    merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}).
+merge_rest_back_bone(Ems, _Choose, nil, Acc) ->
+    {ok, Ems#ems{root=Acc}};
+merge_rest_back_bone(Ems0, Choose, BBPos, Acc) ->
+    {ok, {BB, NextBB}} = couch_file:pread_term(Ems0#ems.fd, BBPos),
+    {ok, Ems1, NewPos} = merge_chains(Ems0, Choose, BB),
+    {NewBB, NewPrev} = append_item(Ems1, Acc, NewPos, Ems1#ems.bb_chunk),
+    merge_rest_back_bone(Ems1, Choose, NextBB, {NewBB, NewPrev}).
 
 
-merge_chains(Ems, Choose, BB) ->
-    Chains = init_chains(Ems, Choose, BB),
-    merge_chains(Ems, Choose, Chains, {[], nil}).
+merge_chains(Ems0, Choose, BB) ->
+    {ok, Ems1} = event_notify(Ems0, {merge, chain}),
+    Chains = init_chains(Ems1, Choose, BB),
+    merge_chains(Ems1, Choose, Chains, {[], nil}).
 
 
 merge_chains(Ems, _Choose, [], ChainAcc) ->
     {ok, CPos, _} = couch_file:append_term(Ems#ems.fd, ChainAcc),
-    CPos;
-merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc) ->
-    {KV, RestChains} = choose_kv(Choose, Ems, Chains),
-    {NewKVs, NewPrev} = append_item(Ems, Acc, KV, CC),
-    merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}).
+    {ok, Ems, CPos};
+merge_chains(#ems{chain_chunk=CC}=Ems0, Choose, Chains, Acc) ->
+    {KV, RestChains} = choose_kv(Choose, Ems0, Chains),
+    {NewKVs, NewPrev} = append_item(Ems0, Acc, KV, CC),
+    {ok, Ems1} = event_notify(Ems0, row_copy),
+    merge_chains(Ems1, Choose, RestChains, {NewKVs, NewPrev}).
 
 
 init_chains(Ems, Choose, BB) ->
@@ -316,3 +331,9 @@ append_item(Ems, {List, Prev}, Pos, Size) when length(List) >= Size ->
 append_item(_Ems, {List, Prev}, Pos, _Size) ->
     {[Pos | List], Prev}.
 
+
+event_notify(#ems{event_cb = undefined} = Ems, _) ->
+    {ok, Ems};
+event_notify(#ems{event_cb=EventCB, event_st=EventSt}=Ems, Event) ->
+    NewSt = EventCB(Ems, Event, EventSt),
+    {ok, Ems#ems{event_st=NewSt}}.
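
For reference, here is a hand-worked trace of the new
calculate_sort_iters/3 helper from the couch_db_updater.erl hunk above,
assuming the default bb_chunk size of 10 and a backbone of 1000 nodes
(the figures are computed by hand from the function as written, not
taken from the patch):

    %% calculate_sort_iters(1000, 10, 0)
    %%   Calc(1000): 1000 div 10 = 100, 100 rem 10 == 0 -> 100
    %%   Calc(100):  100 div 10  = 10,  10 rem 10 == 0  -> 10
    %% calculate_sort_iters(10, 10, 2)
    %%   Calc(10):   10 div 10 = 1, 1 rem 10 /= 0 -> 1 + 1 = 2
    %%   Calc(2):    2 div 10  = 0, 0 rem 10 == 0 -> 0
    %% calculate_sort_iters(0, 10, 4)
    %%   0 < 10, so the result is 4

Each decimate/1 call performs two backbone merge passes, which is why
the counter increments by two. emsort_cb/3 then subtracts one for the
pass that already ran while the backbone nodes were being counted, so
in this example the sort_ids phase would report
total_changes = 3 * TotalCopied.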