You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/03/28 21:57:30 UTC

[1/2] couch commit: updated refs/heads/feat-improve-compaction-task-status to 552446d

Repository: couchdb-couch
Updated Branches:
  refs/heads/feat-improve-compaction-task-status [created] 552446d80


Redo mix


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/552446d8
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/552446d8
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/552446d8

Branch: refs/heads/feat-improve-compaction-task-status
Commit: 552446d80d7bdc873719b2d24a37ea85343fc5ec
Parents: 6e8fdcf
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Mar 28 16:24:58 2017 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Tue Mar 28 16:57:22 2017 -0500

----------------------------------------------------------------------
 src/couch_db_updater.erl | 50 ++++++++++++++++++++++++-------------------
 1 file changed, 28 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/552446d8/src/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl
index be441b9..39ed436 100644
--- a/src/couch_db_updater.erl
+++ b/src/couch_db_updater.erl
@@ -1407,31 +1407,18 @@ copy_meta_data(#db{fd=Fd, header=Header}=Db, DocCount) ->
         rem_seqs=[],
         infos=[]
     },
-    Acc = merge_docids(Iter, Acc0),
-    {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos),
-    {ok, Seqtree} = couch_btree:add_remove(
-        Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
-    ),
-    update_compact_task(Acc#merge_st.infos),
+    Acc1 = merge_docids(Iter, Acc0),
+    Acc2 = flush_merge_st(Acc1),
+    #merge_st{
+        id_tree = IdTree,
+        seq_tree = SeqTree
+    } = Acc2,
     Db#db{id_tree=IdTree, seq_tree=SeqTree}.
 
 
-merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
-    #merge_st{
-        id_tree=IdTree0,
-        seq_tree=SeqTree0,
-        rem_seqs=RemSeqs
-    } = Acc,
-    {ok, IdTree1} = couch_btree:add(IdTree0, Infos),
-    {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
-    update_compact_task(length(Infos)),
-    Acc1 = Acc#merge_st{
-        id_tree=IdTree1,
-        seq_tree=SeqTree1,
-        rem_seqs=[],
-        infos=[]
-    },
-    merge_docids(Iter, Acc1);
+merge_docids(Iter, #merge_st{docid_seqs = DocIdSeqs} = Acc)
+        when length(DocIdSeqs) > 1000 ->
+    merge_docids(Iter, flush_merge_st(Acc));
 merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
     case next_info(Iter, Curr, []) of
         {NextIter, NewCurr, FDI, Seqs} ->
@@ -1470,6 +1457,25 @@ next_info(Iter, {Id, Seq, FDI}, Seqs) ->
     end.
 
 
+flush_merge_st(Acc) ->
+    #merge_st{
+        id_tree=IdTree0,
+        seq_tree=SeqTree0,
+        rem_seqs=RemSeqs,
+        docid_seqs=DocIdSeqs
+    } = Acc,
+    {ok, FDIKVs, SeqTree1} = couch_btree:query_modify(
+            SeqTree0, DocIdSeqs, [], RemSeqs),
+    FDIs = lists:map(fun({ok, #full_doc_info{} = FDI}) -> FDI end, FDIKVs),
+    {ok, IdTree1} = couch_btree:add(IdTree0, FDIs),
+    Acc#merge_st{
+        id_tree=IdTree1,
+        seq_tree=SeqTree1,
+        rem_seqs=[],
+        docid_seqs=[]
+    }.
+
+
 update_compact_task(NumChanges) ->
     [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
     Changes2 = Changes + NumChanges,


[2/2] couch commit: updated refs/heads/feat-improve-compaction-task-status to 552446d

Posted by da...@apache.org.
Improve compaction task status updates

Previously, the emsort-related operations did not update the compaction
task status. For large databases this leads to some very long waits
while the compaction task stays at 100%. This change adds progress
reports to the steps for sorting and copying document ids back into the
database file.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/6e8fdcf1
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/6e8fdcf1
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/6e8fdcf1

Branch: refs/heads/feat-improve-compaction-task-status
Commit: 6e8fdcf1fd16b0fcada1cde0d361af3d13f1a2c7
Parents: 21c8d37
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Mar 28 16:21:38 2017 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Tue Mar 28 16:57:22 2017 -0500

----------------------------------------------------------------------
 src/couch_db_updater.erl | 80 +++++++++++++++++++++++++++++++++++++++----
 src/couch_emsort.erl     | 44 ++++++++++++++++++------
 2 files changed, 107 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/6e8fdcf1/src/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl
index 270fffe..be441b9 100644
--- a/src/couch_db_updater.erl
+++ b/src/couch_db_updater.erl
@@ -1147,6 +1147,7 @@ copy_compact(Db, NewDb0, Retry) ->
     TaskProps0 = [
         {type, database_compaction},
         {database, Db#db.name},
+        {phase, seq_tree},
         {progress, 0},
         {changes_done, 0},
         {total_changes, TotalChanges}
@@ -1193,6 +1194,8 @@ start_copy_compact(#db{}=Db) ->
         open_compaction_files(Name, Header, Filepath, Options),
     erlang:monitor(process, MFd),
 
+    {ok, DocCount} = couch_db:get_doc_count(Db),
+
     % This is a bit worrisome. init_db/4 will monitor the data fd
     % but it doesn't know about the meta fd. For now I'll maintain
     % that the data fd is the old normal fd and meta fd is special
@@ -1201,9 +1204,9 @@ start_copy_compact(#db{}=Db) ->
 
     NewDb1 = copy_purge_info(Db, NewDb),
     NewDb2 = copy_compact(Db, NewDb1, Retry),
-    NewDb3 = sort_meta_data(NewDb2),
+    NewDb3 = sort_meta_data(NewDb2, DocCount),
     NewDb4 = commit_compaction_data(NewDb3),
-    NewDb5 = copy_meta_data(NewDb4),
+    NewDb5 = copy_meta_data(NewDb4, DocCount),
     NewDb6 = sync_header(NewDb5, db_to_header(NewDb5, NewDb5#db.header)),
     close_db(NewDb6),
 
@@ -1323,12 +1326,73 @@ bind_id_tree(Db, Fd, State) ->
     Db#db{id_tree=IdBtree}.
 
 
-sort_meta_data(Db0) ->
-    {ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
-    Db0#db{id_tree=Ems}.
+sort_meta_data(Db0, DocCount) ->
+    couch_task_status:update([
+        {phase, sort_ids_init},
+        {total_changes, DocCount},
+        {changes_done, 0},
+        {progress, 0}
+    ]),
+    Ems0 = Db0#db.id_tree,
+    Options = [
+        {event_cb, fun emsort_cb/2},
+        {event_st, {init, 0, 0, couch_emsort:get_bb_chunk(Ems0)}}
+    ],
+    Ems1 = couch_emsort:set_options(Ems0, Options),
+    {ok, Ems2} = couch_emsort:merge(Ems1),
+    Db0#db{id_tree=Ems2}.
+
+
+emsort_cb({merge, chain}, {init, Copied, Nodes, BBChunk}) ->
+    {init, Copied, Nodes + 1, BBChunk};
+emsort_cb(row_copy, {init, Copied, Nodes, BBChunk}) when Copied > 1000 ->
+    update_compact_task(Copied + 1),
+    {init, 0, Nodes, BBChunk};
+emsort_cb(row_copy, {init, Copied, Nodes, BBChunk}) ->
+    {init, Copied + 1, Nodes, BBChunk};
+emsort_cb({merge_start, reverse}, {init, Copied, Nodes, BBChunk}) ->
+    % Subtract one because we already finished the first
+    % iteration when we were counting the number of nodes
+    % in the backbone.
+    Iters = calculate_sort_iters(Nodes, BBChunk, 0) - 1,
+    couch_task_status:update([
+        {phase, sort_ids},
+        {total_changes, Iters * Copied},
+        {changes_done, 0},
+        {progress, 0}
+    ]),
+    0;
+
+emsort_cb(row_copy, Copied) when is_integer(Copied), Copied > 1000 ->
+    update_compact_task(Copied + 1),
+    0;
+
+emsort_cb(row_copy, Copied) when is_integer(Copied) ->
+    Copied + 1;
 
+emsort_cb(_Event, St) ->
+    St.
 
-copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
+
+calculate_sort_iters(Nodes, BBChunk, Count) when Nodes < BBChunk ->
+    Count;
+calculate_sort_iters(Nodes0, BBChunk, Count) when BBChunk > 1 ->
+    Calc = fun(N0) ->
+        N1 = N0 div BBChunk,
+        N1 + if N0 rem BBChunk /= 0 -> 1; true -> 0 end
+    end,
+    Nodes1 = Calc(Nodes0),
+    Nodes2 = Calc(Nodes1),
+    calculate_sort_iters(Nodes2, BBChunk, Count + 2).
+
+
+copy_meta_data(#db{fd=Fd, header=Header}=Db, DocCount) ->
+    couch_task_status:update([
+        {phase, copy_ids},
+        {changes_done, 0},
+        {total_changes, DocCount},
+        {progress, 0}
+    ]),
     Src = Db#db.id_tree,
     DstState = couch_db_header:id_tree_state(Header),
     {ok, IdTree0} = couch_btree:open(DstState, Fd, [
@@ -1345,9 +1409,10 @@ copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
     },
     Acc = merge_docids(Iter, Acc0),
     {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos),
-    {ok, SeqTree} = couch_btree:add_remove(
+    {ok, Seqtree} = couch_btree:add_remove(
         Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
     ),
+    update_compact_task(Acc#merge_st.infos),
     Db#db{id_tree=IdTree, seq_tree=SeqTree}.
 
 
@@ -1359,6 +1424,7 @@ merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
     } = Acc,
     {ok, IdTree1} = couch_btree:add(IdTree0, Infos),
     {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
+    update_compact_task(length(Infos)),
     Acc1 = Acc#merge_st{
         id_tree=IdTree1,
         seq_tree=SeqTree1,

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/6e8fdcf1/src/couch_emsort.erl
----------------------------------------------------------------------
diff --git a/src/couch_emsort.erl b/src/couch_emsort.erl
index 2a25a23..ba2c2a3 100644
--- a/src/couch_emsort.erl
+++ b/src/couch_emsort.erl
@@ -129,7 +129,7 @@
 %     CA3                  CD3
 %
 
--export([open/1, open/2, get_fd/1, get_state/1]).
+-export([open/1, open/2, set_options/2, get_fd/1, get_state/1, get_bb_chunk/1]).
 -export([add/2, merge/1, sort/1, iter/1, next/1]).
 
 
@@ -137,7 +137,9 @@
     fd,
     root,
     bb_chunk = 10,
-    chain_chunk = 100
+    chain_chunk = 100,
+    event_cb,
+    event_st
 }).
 
 
@@ -156,7 +158,11 @@ set_options(Ems, [{root, Root} | Rest]) ->
 set_options(Ems, [{chain_chunk, Count} | Rest]) when is_integer(Count) ->
     set_options(Ems#ems{chain_chunk=Count}, Rest);
 set_options(Ems, [{back_bone_chunk, Count} | Rest]) when is_integer(Count) ->
-    set_options(Ems#ems{bb_chunk=Count}, Rest).
+    set_options(Ems#ems{bb_chunk=Count}, Rest);
+set_options(Ems, [{event_cb, EventCB} | Rest]) when is_function(EventCB, 2) ->
+    set_options(Ems#ems{event_cb=EventCB}, Rest);
+set_options(Ems, [{event_st, EventSt} | Rest]) ->
+    set_options(Ems#ems{event_st=EventSt}, Rest).
 
 
 get_fd(#ems{fd=Fd}) ->
@@ -167,6 +173,10 @@ get_state(#ems{root=Root}) ->
     Root.
 
 
+get_bb_chunk(#ems{bb_chunk = Size}) ->
+    Size.
+
+
 add(Ems, []) ->
     {ok, Ems};
 add(Ems, KVs) ->
@@ -224,7 +234,7 @@ decimate(#ems{root={_BB, nil}}=Ems) ->
     % We have less than bb_chunk backbone pointers so we're
     % good to start streaming KV's back to the client.
     Ems;
-decimate(#ems{root={BB, NextBB}}=Ems) ->
+decimate(#ems{root={BB, NextBB}}=Ems0) ->
     % To make sure we have a bounded amount of data in RAM
     % at any given point we first need to decimate the data
     % by performing the first couple iterations of a merge
@@ -232,15 +242,17 @@ decimate(#ems{root={BB, NextBB}}=Ems) ->
 
     % The first pass gives us a sort with pointers linked from
     % largest to smallest.
-    {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB),
+    Ems1 = event_notify(Ems0, {merge_start, forward}),
+    {RevBB, RevNextBB} = merge_back_bone(Ems1, small, BB, NextBB),
 
     % We have to run a second pass so that links are pointed
     % back from smallest to largest.
-    {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB),
+    Ems2 = event_notify(Ems1, {merge_start, reverse}),
+    {FwdBB, FwdNextBB} = merge_back_bone(Ems2, big, RevBB, RevNextBB),
 
     % Continue decimating until we have an acceptable bound on
     % the number of keys to use.
-    decimate(Ems#ems{root={FwdBB, FwdNextBB}}).
+    decimate(Ems2#ems{root={FwdBB, FwdNextBB}}).
 
 
 merge_back_bone(Ems, Choose, BB, NextBB) ->
@@ -258,8 +270,9 @@ merge_rest_back_bone(Ems, Choose, BBPos, Acc) ->
 
 
 merge_chains(Ems, Choose, BB) ->
-    Chains = init_chains(Ems, Choose, BB),
-    merge_chains(Ems, Choose, Chains, {[], nil}).
+    NewEms = event_notify(Ems, {merge, chain}),
+    Chains = init_chains(NewEms, Choose, BB),
+    merge_chains(NewEms, Choose, Chains, {[], nil}).
 
 
 merge_chains(Ems, _Choose, [], ChainAcc) ->
@@ -268,7 +281,8 @@ merge_chains(Ems, _Choose, [], ChainAcc) ->
 merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc) ->
     {KV, RestChains} = choose_kv(Choose, Ems, Chains),
     {NewKVs, NewPrev} = append_item(Ems, Acc, KV, CC),
-    merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}).
+    NewEms = event_notify(Ems, row_copy),
+    merge_chains(NewEms, Choose, RestChains, {NewKVs, NewPrev}).
 
 
 init_chains(Ems, Choose, BB) ->
@@ -316,3 +330,13 @@ append_item(Ems, {List, Prev}, Pos, Size) when length(List) >= Size ->
 append_item(_Ems, {List, Prev}, Pos, _Size) ->
     {[Pos | List], Prev}.
 
+
+event_notify(#ems{event_cb = undefined} = Ems, _) ->
+    Ems;
+event_notify(#ems{event_cb=EventCB, event_st=EventSt}=Ems, Event) ->
+    try
+        NewSt = EventCB(Event, EventSt),
+        Ems#ems{event_st=NewSt}
+    catch _:_ ->
+        Ems
+    end.