Posted to commits@couchdb.apache.org by da...@apache.org on 2017/03/31 15:57:22 UTC

couch commit: updated refs/heads/feat-improve-compaction-task-status to ae00a5a [Forced Update!]

Repository: couchdb-couch
Updated Branches:
  refs/heads/feat-improve-compaction-task-status 1db133730 -> ae00a5a62 (forced update)


Improve compaction task status updates

Previously, the emsort-related operations did not update the compaction
task status. For large databases this led to very long waits while the
compaction task stayed at 100%. This change adds progress reports to the
steps that sort and copy document ids back into the database file.
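
As a rough sketch of the pattern this change introduces (simplified and
hedged: the committed code below batches status updates and tracks
several phases), the new emsort event hook can be wired straight into
couch_task_status like this:

    %% Minimal sketch, assuming Ems0 is an existing couch_emsort handle.
    %% The real change batches updates; this version reports every row.
    Ems1 = couch_emsort:set_options(Ems0, [
        {event_cb, fun
            (_Ems, row_copy, Copied) ->
                couch_task_status:update([{changes_done, Copied + 1}]),
                Copied + 1;
            (_Ems, _Event, St) ->
                St
        end},
        {event_st, 0}
    ]),
    {ok, Ems2} = couch_emsort:merge(Ems1).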


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/ae00a5a6
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/ae00a5a6
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/ae00a5a6

Branch: refs/heads/feat-improve-compaction-task-status
Commit: ae00a5a62e83e585c46ae46a5b110b1474f4ea40
Parents: 21c8d37
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Mar 28 16:21:38 2017 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Fri Mar 31 10:57:10 2017 -0500

----------------------------------------------------------------------
 src/couch_db_updater.erl | 97 +++++++++++++++++++++++++++++++++++++++----
 src/couch_emsort.erl     | 71 ++++++++++++++++++++-----------
 2 files changed, 134 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/ae00a5a6/src/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl
index 270fffe..3f3c919 100644
--- a/src/couch_db_updater.erl
+++ b/src/couch_db_updater.erl
@@ -34,6 +34,8 @@
     infos
 }).
 
+-define(COMP_DOCID_BATCH_SIZE, 1000).
+
 init({DbName, Filepath, Fd, Options}) ->
     erlang:put(io_priority, {db_update, DbName}),
     case lists:member(create, Options) of
@@ -1108,10 +1110,10 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
     NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
 
 
-copy_compact(Db, NewDb0, Retry) ->
+copy_compact(Db, NewDb0, Retry, TotalChanges) ->
     Compression = couch_compress:get_compression_method(),
     NewDb = NewDb0#db{compression=Compression},
-    TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
+
     BufferSize = list_to_integer(
         config:get("database_compaction", "doc_buffer_size", "524288")),
     CheckpointAfter = couch_util:to_integer(
@@ -1147,6 +1149,7 @@ copy_compact(Db, NewDb0, Retry) ->
     TaskProps0 = [
         {type, database_compaction},
         {database, Db#db.name},
+        {phase, seq_tree},
         {progress, 0},
         {changes_done, 0},
         {total_changes, TotalChanges}
@@ -1193,6 +1196,8 @@ start_copy_compact(#db{}=Db) ->
         open_compaction_files(Name, Header, Filepath, Options),
     erlang:monitor(process, MFd),
 
+    TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
+
     % This is a bit worrisome. init_db/4 will monitor the data fd
     % but it doesn't know about the meta fd. For now I'll maintain
     % that the data fd is the old normal fd and meta fd is special
@@ -1200,10 +1205,10 @@ start_copy_compact(#db{}=Db) ->
     unlink(DFd),
 
     NewDb1 = copy_purge_info(Db, NewDb),
-    NewDb2 = copy_compact(Db, NewDb1, Retry),
-    NewDb3 = sort_meta_data(NewDb2),
+    NewDb2 = copy_compact(Db, NewDb1, Retry, TotalChanges),
+    NewDb3 = sort_meta_data(NewDb2, TotalChanges),
     NewDb4 = commit_compaction_data(NewDb3),
-    NewDb5 = copy_meta_data(NewDb4),
+    NewDb5 = copy_meta_data(NewDb4, TotalChanges),
     NewDb6 = sync_header(NewDb5, db_to_header(NewDb5, NewDb5#db.header)),
     close_db(NewDb6),
 
@@ -1323,12 +1328,84 @@ bind_id_tree(Db, Fd, State) ->
     Db#db{id_tree=IdBtree}.
 
 
-sort_meta_data(Db0) ->
-    {ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
-    Db0#db{id_tree=Ems}.
+sort_meta_data(Db0, TotalChanges) ->
+    couch_task_status:update([
+        {phase, sort_ids_init},
+        {total_changes, TotalChanges},
+        {changes_done, 0},
+        {progress, 0}
+    ]),
+    Ems0 = Db0#db.id_tree,
+    Options = [
+        {event_cb, fun emsort_cb/3},
+        {event_st, {init, 0, 0}}
+    ],
+    Ems1 = couch_emsort:set_options(Ems0, Options),
+    {ok, Ems2} = couch_emsort:merge(Ems1),
+    Db0#db{id_tree=Ems2}.
+
+
+emsort_cb(_Ems, {merge, chain}, {init, Copied, Nodes}) ->
+    {init, Copied, Nodes + 1};
+emsort_cb(_Ems, row_copy, {init, Copied, Nodes})
+        when Copied >= ?COMP_DOCID_BATCH_SIZE ->
+    update_compact_task(Copied + 1),
+    {init, 0, Nodes};
+emsort_cb(_Ems, row_copy, {init, Copied, Nodes}) ->
+    {init, Copied + 1, Nodes};
+emsort_cb(Ems, {merge_start, reverse}, {init, Copied, Nodes}) ->
+    BBChunkSize = couch_emsort:get_bb_chunk_size(Ems),
+
+    % Subtract one because we already finished the first
+    % iteration when we were counting the number of nodes
+    % in the backbone.
+    Iters = calculate_sort_iters(Nodes, BBChunkSize, 0) - 1,
+
+    % Compaction retries mean we may have copied more rows
+    % than the doc count. This accounts for that by using the
+    % number we've actually copied.
+    [PrevCopied] = couch_task_status:get([changes_done]),
+    TotalCopied = PrevCopied + Copied,
+
+    couch_task_status:update([
+        {phase, sort_ids},
+        {total_changes, Iters * TotalCopied},
+        {changes_done, 0},
+        {progress, 0}
+    ]),
+    0;
+
+emsort_cb(_Ems, row_copy, Copied)
+        when is_integer(Copied), Copied > ?COMP_DOCID_BATCH_SIZE ->
+    update_compact_task(Copied + 1),
+    0;
+
+emsort_cb(_Ems, row_copy, Copied) when is_integer(Copied) ->
+    Copied + 1;
 
+emsort_cb(_Ems, _Event, St) ->
+    St.
 
-copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
+
+calculate_sort_iters(Nodes, BBChunk, Count) when Nodes < BBChunk ->
+    Count;
+calculate_sort_iters(Nodes0, BBChunk, Count) when BBChunk > 1 ->
+    Calc = fun(N0) ->
+        N1 = N0 div BBChunk,
+        N1 + if N1 rem BBChunk == 0 -> 0; true -> 1 end
+    end,
+    Nodes1 = Calc(Nodes0),
+    Nodes2 = Calc(Nodes1),
+    calculate_sort_iters(Nodes2, BBChunk, Count + 2).
+
+
+copy_meta_data(#db{fd=Fd, header=Header}=Db, TotalChanges) ->
+    couch_task_status:update([
+        {phase, copy_ids},
+        {changes_done, 0},
+        {total_changes, TotalChanges},
+        {progress, 0}
+    ]),
     Src = Db#db.id_tree,
     DstState = couch_db_header:id_tree_state(Header),
     {ok, IdTree0} = couch_btree:open(DstState, Fd, [
@@ -1348,6 +1425,7 @@ copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
     {ok, SeqTree} = couch_btree:add_remove(
         Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
     ),
+    update_compact_task(length(Acc#merge_st.infos)),
     Db#db{id_tree=IdTree, seq_tree=SeqTree}.
 
 
@@ -1359,6 +1437,7 @@ merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
     } = Acc,
     {ok, IdTree1} = couch_btree:add(IdTree0, Infos),
     {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
+    update_compact_task(length(Infos)),
     Acc1 = Acc#merge_st{
         id_tree=IdTree1,
         seq_tree=SeqTree1,

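To see where the sort_ids total comes from, here is a hand trace of
calculate_sort_iters/3 (illustrative numbers, not from the commit) with
10,000 backbone nodes and the default bb_chunk of 10:

    %% calculate_sort_iters(10000, 10, 0)
    %%   Calc/1 divides by 10 twice: 10000 -> 1000 -> 100
    %%   -> calculate_sort_iters(100, 10, 2)
    %%   100 -> 10 -> 1
    %%   -> calculate_sort_iters(1, 10, 4)
    %%   1 < 10, so the result is 4
    %%
    %% Each decimate/1 round runs a forward and a reverse merge_back_bone
    %% pass, hence Count + 2 per round. emsort_cb/3 then subtracts one for
    %% the pass that was used to count the nodes: Iters = 4 - 1 = 3.
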
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/ae00a5a6/src/couch_emsort.erl
----------------------------------------------------------------------
diff --git a/src/couch_emsort.erl b/src/couch_emsort.erl
index 2a25a23..d7f1b2b 100644
--- a/src/couch_emsort.erl
+++ b/src/couch_emsort.erl
@@ -129,7 +129,8 @@
 %     CA3                  CD3
 %
 
--export([open/1, open/2, get_fd/1, get_state/1]).
+-export([open/1, open/2, set_options/2, get_fd/1, get_state/1]).
+-export([get_bb_chunk_size/1]).
 -export([add/2, merge/1, sort/1, iter/1, next/1]).
 
 
@@ -137,7 +138,9 @@
     fd,
     root,
     bb_chunk = 10,
-    chain_chunk = 100
+    chain_chunk = 100,
+    event_cb,
+    event_st
 }).
 
 
@@ -156,7 +159,11 @@ set_options(Ems, [{root, Root} | Rest]) ->
 set_options(Ems, [{chain_chunk, Count} | Rest]) when is_integer(Count) ->
     set_options(Ems#ems{chain_chunk=Count}, Rest);
 set_options(Ems, [{back_bone_chunk, Count} | Rest]) when is_integer(Count) ->
-    set_options(Ems#ems{bb_chunk=Count}, Rest).
+    set_options(Ems#ems{bb_chunk=Count}, Rest);
+set_options(Ems, [{event_cb, EventCB} | Rest]) when is_function(EventCB, 3) ->
+    set_options(Ems#ems{event_cb=EventCB}, Rest);
+set_options(Ems, [{event_st, EventSt} | Rest]) ->
+    set_options(Ems#ems{event_st=EventSt}, Rest).
 
 
 get_fd(#ems{fd=Fd}) ->
@@ -167,6 +174,10 @@ get_state(#ems{root=Root}) ->
     Root.
 
 
+get_bb_chunk_size(#ems{bb_chunk = Size}) ->
+    Size.
+
+
 add(Ems, []) ->
     {ok, Ems};
 add(Ems, KVs) ->
@@ -224,7 +235,7 @@ decimate(#ems{root={_BB, nil}}=Ems) ->
     % We have fewer than bb_chunk backbone pointers so we're
     % good to start streaming KVs back to the client.
     Ems;
-decimate(#ems{root={BB, NextBB}}=Ems) ->
+decimate(#ems{}=Ems0) ->
     % To make sure we have a bounded amount of data in RAM
     % at any given point we first need to decimate the data
     % by performing the first couple iterations of a merge
@@ -232,43 +243,47 @@ decimate(#ems{root={BB, NextBB}}=Ems) ->
 
     % The first pass gives us a sort with pointers linked from
     % largest to smallest.
-    {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB),
+    {ok, Ems1} = event_notify(Ems0, {merge_start, forward}),
+    {ok, Ems2} = merge_back_bone(Ems1, small),
 
     % We have to run a second pass so that links are pointed
     % back from smallest to largest.
-    {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB),
+    {ok, Ems3} = event_notify(Ems2, {merge_start, reverse}),
+    {ok, Ems4} = merge_back_bone(Ems3, big),
 
     % Continue decimating until we have an acceptable bound on
     % the number of keys to use.
-    decimate(Ems#ems{root={FwdBB, FwdNextBB}}).
+    decimate(Ems4).
 
 
-merge_back_bone(Ems, Choose, BB, NextBB) ->
-    BBPos = merge_chains(Ems, Choose, BB),
-    merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}).
+merge_back_bone(#ems{root={BB, NextBB}}=Ems0, Choose) ->
+    {ok, Ems1, BBPos} = merge_chains(Ems0, Choose, BB),
+    merge_rest_back_bone(Ems1, Choose, NextBB, {[BBPos], nil}).
 
 
-merge_rest_back_bone(_Ems, _Choose, nil, Acc) ->
-    Acc;
-merge_rest_back_bone(Ems, Choose, BBPos, Acc) ->
-    {ok, {BB, NextBB}} = couch_file:pread_term(Ems#ems.fd, BBPos),
-    NewPos = merge_chains(Ems, Choose, BB),
-    {NewBB, NewPrev} = append_item(Ems, Acc, NewPos, Ems#ems.bb_chunk),
-    merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}).
+merge_rest_back_bone(Ems, _Choose, nil, Acc) ->
+    {ok, Ems#ems{root=Acc}};
+merge_rest_back_bone(Ems0, Choose, BBPos, Acc) ->
+    {ok, {BB, NextBB}} = couch_file:pread_term(Ems0#ems.fd, BBPos),
+    {ok, Ems1, NewPos} = merge_chains(Ems0, Choose, BB),
+    {NewBB, NewPrev} = append_item(Ems1, Acc, NewPos, Ems1#ems.bb_chunk),
+    merge_rest_back_bone(Ems1, Choose, NextBB, {NewBB, NewPrev}).
 
 
-merge_chains(Ems, Choose, BB) ->
-    Chains = init_chains(Ems, Choose, BB),
-    merge_chains(Ems, Choose, Chains, {[], nil}).
+merge_chains(Ems0, Choose, BB) ->
+    {ok, Ems1} = event_notify(Ems0, {merge, chain}),
+    Chains = init_chains(Ems1, Choose, BB),
+    merge_chains(Ems1, Choose, Chains, {[], nil}).
 
 
 merge_chains(Ems, _Choose, [], ChainAcc) ->
     {ok, CPos, _} = couch_file:append_term(Ems#ems.fd, ChainAcc),
-    CPos;
-merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc) ->
-    {KV, RestChains} = choose_kv(Choose, Ems, Chains),
-    {NewKVs, NewPrev} = append_item(Ems, Acc, KV, CC),
-    merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}).
+    {ok, Ems, CPos};
+merge_chains(#ems{chain_chunk=CC}=Ems0, Choose, Chains, Acc) ->
+    {KV, RestChains} = choose_kv(Choose, Ems0, Chains),
+    {NewKVs, NewPrev} = append_item(Ems0, Acc, KV, CC),
+    {ok, Ems1} = event_notify(Ems0, row_copy),
+    merge_chains(Ems1, Choose, RestChains, {NewKVs, NewPrev}).
 
 
 init_chains(Ems, Choose, BB) ->
@@ -316,3 +331,9 @@ append_item(Ems, {List, Prev}, Pos, Size) when length(List) >= Size ->
 append_item(_Ems, {List, Prev}, Pos, _Size) ->
     {[Pos | List], Prev}.
 
+
+event_notify(#ems{event_cb = undefined} = Ems, _) ->
+    {ok, Ems};
+event_notify(#ems{event_cb=EventCB, event_st=EventSt}=Ems, Event) ->
+    NewSt = EventCB(Ems, Event, EventSt),
+    {ok, Ems#ems{event_st=NewSt}}.
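
For reference, a hypothetical standalone use of the new options outside
the compactor (Fd is assumed to be an open couch_file handle and KVs a
list of {Key, Value} pairs; neither comes from this commit):

    {ok, Ems0} = couch_emsort:open(Fd, [{back_bone_chunk, 10}]),
    Ems1 = couch_emsort:set_options(Ems0, [
        {event_cb, fun(Ems, Event, Count) ->
            %% get_bb_chunk_size/1 is the other accessor added here
            io:format("emsort event ~p (bb_chunk ~p)~n",
                [Event, couch_emsort:get_bb_chunk_size(Ems)]),
            Count + 1
        end},
        {event_st, 0}
    ]),
    {ok, Ems2} = couch_emsort:add(Ems1, KVs),
    {ok, Ems3} = couch_emsort:merge(Ems2).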