You are viewing a plain-text version of this content. The canonical link for it was a "here" hyperlink that did not survive the conversion to plain text.
Posted to notifications@couchdb.apache.org by GitBox <gi...@apache.org> on 2018/07/14 14:58:32 UTC

[GitHub] janl closed pull request #1006: Compaction status update

janl closed pull request #1006: Compaction status update
URL: https://github.com/apache/couchdb/pull/1006
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/src/couch/src/couch_compact_status.erl b/src/couch/src/couch_compact_status.erl
new file mode 100644
index 0000000000..feef226643
--- /dev/null
+++ b/src/couch/src/couch_compact_status.erl
@@ -0,0 +1,105 @@
+
+-module(couch_compact_status).
+-behaviour(gen_server).
+-vsn(1).
+
+-export([start_link/2]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+
+-record(state, {
+    dbname,
+    filepath,
+    stop=false
+    }).
+
+-define(TIMEOUT,  500).
+
+start_link(DbName, Filepath) ->
+    gen_server:start_link(?MODULE, [DbName, Filepath], []).
+
+init([DbName, Filepath]) ->
+    process_flag(trap_exit, true),
+    {ok, #state{dbname=DbName, filepath=Filepath}}.
+
+handle_call(_Request, _From, State) ->
+    {reply, ignored, State}.
+
+handle_cast({add_task, [Retry, TotalChanges]}, State) ->
+    TaskProps0 = [
+        {type, database_compaction},
+        {database, State#state.dbname},
+        {progress, 0},
+        {changes_done, 0},
+        {compact_meta_size, 0},
+        {compact_data_size, 0},
+        {sort_start, 0},
+        {merge_start, 0},
+        {total_changes, TotalChanges}
+    ],
+    case (Retry =/= nil) and couch_task_status:is_task_added() of
+    true ->
+        couch_task_status:update([
+            {retry, true},
+            {progress, 0},
+            {changes_done, 0},
+            {total_changes, TotalChanges}
+        ]);
+    false ->
+        couch_task_status:add_task(TaskProps0),
+        couch_task_status:set_update_frequency(?TIMEOUT)
+    end,
+    erlang:send_after(?TIMEOUT + 300, self(), track_fsize),
+    {noreply, State};
+handle_cast({update_task, NumChanges}, State) ->
+    [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
+    Changes2 = Changes + NumChanges,
+    Progress = case Total of
+    0 ->
+        0;
+    _ ->
+        (Changes2 * 100) div Total
+    end,
+    couch_task_status:update([{changes_done, Changes2}, {progress, Progress}]),
+    {noreply, State};
+handle_cast({update_progress, Progress}, State) ->
+    couch_task_status:update([{progress, Progress}]),
+    {noreply, State};
+handle_cast(sort_start, State) ->
+    couch_task_status:update([{sort_start, timestamp()}]),
+    {noreply, State};
+handle_cast(merge_start, State) ->
+    couch_task_status:update([{merge_start, timestamp()}]),
+    {noreply, State};
+handle_cast(done, State) ->
+    {noreply, State#state{stop=true}};
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info(track_fsize, #state{filepath=Filepath, stop=Stop}=State) ->
+    if Stop == true -> ok; true ->
+        erlang:send_after(?TIMEOUT, self(), track_fsize)
+    end,
+    Meta = Filepath ++ ".compact.meta",
+    Data = Filepath ++ ".compact.data",
+    MetaSize = filelib:file_size(Meta),
+    DataSize = filelib:file_size(Data),
+    couch_task_status:update([{compact_meta_size, MetaSize}, {compact_data_size, DataSize}]),
+    {noreply, State};
+handle_info({'EXIT', _From, Reason}, State) ->
+    {stop, Reason, State};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+timestamp() ->
+    timestamp(now()).
+
+timestamp({Mega, Secs, _}) ->
+    Mega * 1000000 + Secs.
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index ca61e04c62..125789636e 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -1137,7 +1137,7 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
         {{Id, Seq}, FDI}
     end, NewInfos),
     {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
-    update_compact_task(length(NewInfos)),
+    update_compact_task(length(NewInfos), NewDb#db.name, NewDb#db.filepath),
     NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
 
 
@@ -1177,25 +1177,8 @@ copy_compact(Db, NewDb0, Retry) ->
         end
     end,
 
-    TaskProps0 = [
-        {type, database_compaction},
-        {database, Db#db.name},
-        {progress, 0},
-        {changes_done, 0},
-        {total_changes, TotalChanges}
-    ],
-    case (Retry =/= nil) and couch_task_status:is_task_added() of
-    true ->
-        couch_task_status:update([
-            {retry, true},
-            {progress, 0},
-            {changes_done, 0},
-            {total_changes, TotalChanges}
-        ]);
-    false ->
-        couch_task_status:add_task(TaskProps0),
-        couch_task_status:set_update_frequency(500)
-    end,
+    SU_pid = get_status_updater(Db#db.name, Db#db.filepath),
+    gen_server:cast(SU_pid, {add_task, [Retry, TotalChanges]}),
 
     {ok, _, {NewDb2, Uncopied, _, _}} =
         couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
@@ -1343,7 +1326,11 @@ bind_emsort(Db, Fd, nil) ->
     {ok, Ems} = couch_emsort:open(Fd),
     Db#db{id_tree=Ems};
 bind_emsort(Db, Fd, State) ->
-    {ok, Ems} = couch_emsort:open(Fd, [{root, State}]),
+    {ok, Ems} =
+    case State of
+        [{root, _},{added,_}] -> couch_emsort:open(Fd, State);
+        _ ->  couch_emsort:open(Fd, [{root, State}])
+    end,
     Db#db{id_tree=Ems}.
 
 
@@ -1357,11 +1344,15 @@ bind_id_tree(Db, Fd, State) ->
 
 
 sort_meta_data(Db0) ->
+    SU_pid = get_status_updater(Db0#db.name, Db0#db.filepath),
+    gen_server:cast(SU_pid, sort_start),
     {ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
     Db0#db{id_tree=Ems}.
 
 
 copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
+    SU_pid = get_status_updater(Db#db.name, Db#db.filepath),
+    gen_server:cast(SU_pid, merge_start),
     Src = Db#db.id_tree,
     DstState = couch_db_header:id_tree_state(Header),
     {ok, IdTree0} = couch_btree:open(DstState, Fd, [
@@ -1376,20 +1367,29 @@ copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
         rem_seqs=[],
         infos=[]
     },
-    Acc = merge_docids(Iter, Acc0),
+    Acc = merge_docids(Iter, Acc0, SU_pid),
     {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos),
     {ok, SeqTree} = couch_btree:add_remove(
         Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
     ),
+    gen_server:cast(SU_pid, done),
     Db#db{id_tree=IdTree, seq_tree=SeqTree}.
 
 
-merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
+merge_docids(Iter, #merge_st{infos=Infos}=Acc, SU_pid) when length(Infos) > 1000 ->
     #merge_st{
         id_tree=IdTree0,
         seq_tree=SeqTree0,
         rem_seqs=RemSeqs
     } = Acc,
+    Merged = couch_emsort:get_merged(Iter),
+    {ok, Total} = couch_btree:full_reduce(SeqTree0),
+    Progress =
+    case ok of
+        _ when Merged < Total -> (Merged * 100) div Total;
+        _ -> 100
+    end,
+    gen_server:cast(SU_pid, {update_progress, Progress}),
     {ok, IdTree1} = couch_btree:add(IdTree0, Infos),
     {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
     Acc1 = Acc#merge_st{
@@ -1398,8 +1398,8 @@ merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
         rem_seqs=[],
         infos=[]
     },
-    merge_docids(Iter, Acc1);
-merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
+    merge_docids(Iter, Acc1, SU_pid);
+merge_docids(Iter, #merge_st{curr=Curr}=Acc, SU_pid) ->
     case next_info(Iter, Curr, []) of
         {NextIter, NewCurr, FDI, Seqs} ->
             Acc1 = Acc#merge_st{
@@ -1407,7 +1407,7 @@ merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
                 rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
                 curr = NewCurr
             },
-            merge_docids(NextIter, Acc1);
+            merge_docids(NextIter, Acc1, SU_pid);
         {finished, FDI, Seqs} ->
             Acc#merge_st{
                 infos = [FDI | Acc#merge_st.infos],
@@ -1437,16 +1437,9 @@ next_info(Iter, {Id, Seq, FDI}, Seqs) ->
     end.
 
 
-update_compact_task(NumChanges) ->
-    [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
-    Changes2 = Changes + NumChanges,
-    Progress = case Total of
-    0 ->
-        0;
-    _ ->
-        (Changes2 * 100) div Total
-    end,
-    couch_task_status:update([{changes_done, Changes2}, {progress, Progress}]).
+update_compact_task(NumChanges, DbName, Filepath) ->
+    SU_pid = get_status_updater(DbName, Filepath),
+    gen_server:cast(SU_pid, {update_task, NumChanges}).
 
 
 make_doc_summary(#db{compression = Comp}, {Body0, Atts0}) ->
@@ -1518,3 +1511,15 @@ hibernate_if_no_idle_limit() ->
         Timeout when is_integer(Timeout) ->
             Timeout
     end.
+
+get_status_updater(DbName, Filepath) ->
+    case erlang:get(status_updater) of
+    undefined ->
+        {ok, SU_pid} = couch_compact_status:start_link(DbName, Filepath),
+        erlang:put(status_updater, SU_pid),
+        SU_pid;
+    Pid ->
+        Pid
+    end.
+
+
diff --git a/src/couch/src/couch_emsort.erl b/src/couch/src/couch_emsort.erl
index 2a25a23223..079bc181b7 100644
--- a/src/couch/src/couch_emsort.erl
+++ b/src/couch/src/couch_emsort.erl
@@ -131,11 +131,15 @@
 
 -export([open/1, open/2, get_fd/1, get_state/1]).
 -export([add/2, merge/1, sort/1, iter/1, next/1]).
+-export([get_merged/1]).
 
 
 -record(ems, {
     fd,
     root,
+    added = 0,
+    sorted = 0,
+    merged = 0,
     bb_chunk = 10,
     chain_chunk = 100
 }).
@@ -151,6 +155,8 @@ open(Fd, Options) ->
 
 set_options(Ems, []) ->
     Ems;
+set_options(Ems, [{added, Added} | Rest]) ->
+    set_options(Ems#ems{added=Added}, Rest);
 set_options(Ems, [{root, Root} | Rest]) ->
     set_options(Ems#ems{root=Root}, Rest);
 set_options(Ems, [{chain_chunk, Count} | Rest]) when is_integer(Count) ->
@@ -163,8 +169,10 @@ get_fd(#ems{fd=Fd}) ->
     Fd.
 
 
-get_state(#ems{root=Root}) ->
-    Root.
+get_state(#ems{root=Root, added=Added}) ->
+    [{root, Root},{added,Added}].
+
+get_merged({#ems{merged=Merged}=Ems, _}) -> Merged.
 
 
 add(Ems, []) ->
@@ -196,16 +204,16 @@ iter(#ems{root={_, _}}) ->
 
 next({_Ems, []}) ->
     finished;
-next({Ems, Chains}) ->
+next({#ems{merged=MCount}=Ems, Chains}) ->
     {KV, RestChains} = choose_kv(small, Ems, Chains),
-    {ok, KV, {Ems, RestChains}}.
+    {ok, KV, {Ems#ems{merged=MCount+1}, RestChains}}.
 
 
-add_bb_pos(#ems{root=undefined}=Ems, Pos) ->
-    Ems#ems{root={[Pos], nil}};
-add_bb_pos(#ems{root={BB, Prev}}=Ems, Pos) ->
+add_bb_pos(#ems{root=undefined, added=ACount}=Ems, Pos) ->
+    Ems#ems{root={[Pos], nil}, added=ACount+1};
+add_bb_pos(#ems{root={BB, Prev}, added=ACount}=Ems, Pos) ->
     {NewBB, NewPrev} = append_item(Ems, {BB, Prev}, Pos, Ems#ems.bb_chunk),
-    Ems#ems{root={NewBB, NewPrev}}.
+    Ems#ems{root={NewBB, NewPrev}, added=ACount+1}.
 
 
 write_kvs(Ems, KVs) ->
@@ -232,15 +240,15 @@ decimate(#ems{root={BB, NextBB}}=Ems) ->
 
     % The first pass gives us a sort with pointers linked from
     % largest to smallest.
-    {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB),
+    {Ems1, {RevBB, RevNextBB}} = merge_back_bone(Ems, small, BB, NextBB),
 
     % We have to run a second pass so that links are pointed
     % back from smallest to largest.
-    {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB),
+    {Ems2, {FwdBB, FwdNextBB}} = merge_back_bone(Ems1, big, RevBB, RevNextBB),
 
     % Continue deicmating until we have an acceptable bound on
     % the number of keys to use.
-    decimate(Ems#ems{root={FwdBB, FwdNextBB}}).
+    decimate(Ems2#ems{root={FwdBB, FwdNextBB}}).
 
 
 merge_back_bone(Ems, Choose, BB, NextBB) ->
@@ -248,13 +256,36 @@ merge_back_bone(Ems, Choose, BB, NextBB) ->
     merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}).
 
 
-merge_rest_back_bone(_Ems, _Choose, nil, Acc) ->
-    Acc;
+merge_rest_back_bone(Ems, _Choose, nil, Acc) ->
+    {Ems, Acc};
 merge_rest_back_bone(Ems, Choose, BBPos, Acc) ->
     {ok, {BB, NextBB}} = couch_file:pread_term(Ems#ems.fd, BBPos),
     NewPos = merge_chains(Ems, Choose, BB),
     {NewBB, NewPrev} = append_item(Ems, Acc, NewPos, Ems#ems.bb_chunk),
-    merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}).
+
+    Total = Ems#ems.added * 2, % two passes of sort
+
+    SCount =
+    case Choose of
+    small -> 
+        Ems#ems.sorted + Ems#ems.bb_chunk;
+    _Else -> 
+        Ems#ems.sorted + Ems#ems.chain_chunk
+    end,
+
+    case erlang:get(status_updater) of
+    undefined ->
+        ok;
+    Pid ->
+        Progress =
+        case ok of
+            _ when SCount < Total -> (SCount * 100) div Total;
+            _ -> 100
+        end,
+        gen_server:cast(Pid, {update_progress, Progress})
+    end,
+
+    merge_rest_back_bone(Ems#ems{sorted=SCount}, Choose, NextBB, {NewBB, NewPrev}).
 
 
 merge_chains(Ems, Choose, BB) ->


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services