Posted to commits@couchdb.apache.org by da...@apache.org on 2017/09/06 22:57:51 UTC

[couchdb] 05/05: Optimize couch_emsort writes

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch compactor-optimize-emsort
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit d1a8fc58fabefc0ce75b7ca14df3ab27197cdf98
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Sep 6 16:46:04 2017 -0500

    Optimize couch_emsort writes
    
    This makes two related changes to couch_emsort. First, it uses the new
    couch_file:append_terms/2 function to write all values to disk up front
    so that they are not re-written during the merge phase that runs before
    key/values are streamed back from disk.

    Second, it buffers KVs added to the emsort structure so that our chains
    are a consistent size. This helps minimize the number of merge phases
    executed during decimation.
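
For illustration, here is a minimal, self-contained sketch of the batching
behavior described above. The module and record names are hypothetical and it
keeps everything in memory rather than writing through couch_file, but the
control flow mirrors the patch: add/2 buffers key/values until the batch
exceeds bb_chunk * chain_chunk entries, and flush/1 then emits the whole
batch as one sorted chain.

    -module(emsort_batch_sketch).
    -export([new/2, add/2, flush/1]).

    %% Buffer KVs in memory and emit one sorted "chain" per full batch.
    %% In couch_emsort the sorted batch is appended to the file and the
    %% resulting position is recorded via add_bb_pos/2; here we keep the
    %% sorted batches in memory so the sketch runs standalone.
    -record(batch, {limit, buf = [], chains = []}).

    new(BBChunk, ChainChunk) ->
        #batch{limit = BBChunk * ChainChunk}.

    add(Batch, []) ->
        Batch;
    add(#batch{limit = Limit, buf = Buf} = Batch, KVs) ->
        NewBuf = KVs ++ Buf,
        case length(NewBuf) > Limit of
            true -> flush(Batch#batch{buf = NewBuf});
            false -> Batch#batch{buf = NewBuf}
        end.

    flush(#batch{buf = []} = Batch) ->
        Batch;
    flush(#batch{buf = Buf, chains = Chains} = Batch) ->
        Batch#batch{buf = [], chains = [lists:sort(Buf) | Chains]}.
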
---
 src/couch/src/couch_db_updater.erl |  5 ++--
 src/couch/src/couch_emsort.erl     | 57 ++++++++++++++++++++++++++++----------
 2 files changed, 46 insertions(+), 16 deletions(-)

diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 226acb2..f3c79ce 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -1403,8 +1403,9 @@ commit_compaction_data(#db{header=OldHeader}=Db0, Fd) ->
     % fd instead of the Filepath stuff that commit_data/2
     % does.
     DataState = couch_db_header:id_tree_state(OldHeader),
-    MetaFd = couch_emsort:get_fd(Db0#db.id_tree),
-    MetaState = couch_emsort:get_state(Db0#db.id_tree),
+    {ok, Ems} = couch_emsort:flush(Db0#db.id_tree),
+    MetaFd = couch_emsort:get_fd(Ems),
+    MetaState = couch_emsort:get_state(Ems),
     Db1 = bind_id_tree(Db0, Db0#db.fd, DataState),
     Header = db_to_header(Db1, OldHeader),
     CompHeader = #comp_header{
diff --git a/src/couch/src/couch_emsort.erl b/src/couch/src/couch_emsort.erl
index 2a25a23..b70f8d3 100644
--- a/src/couch/src/couch_emsort.erl
+++ b/src/couch/src/couch_emsort.erl
@@ -129,13 +129,14 @@
 %     CA3                  CD3
 %
 
--export([open/1, open/2, get_fd/1, get_state/1]).
+-export([open/1, open/2, get_fd/1, get_state/1, flush/1]).
 -export([add/2, merge/1, sort/1, iter/1, next/1]).
 
 
 -record(ems, {
     fd,
     root,
+    curr_batch = [],
     bb_chunk = 10,
     chain_chunk = 100
 }).
@@ -163,15 +164,35 @@ get_fd(#ems{fd=Fd}) ->
     Fd.
 
 
-get_state(#ems{root=Root}) ->
+get_state(#ems{root=Root, curr_batch=[]}) ->
     Root.
 
 
 add(Ems, []) ->
     {ok, Ems};
 add(Ems, KVs) ->
-    Pos = write_kvs(Ems, KVs),
-    {ok, add_bb_pos(Ems, Pos)}.
+    #ems{
+        fd = Fd,
+        curr_batch = CurrBatch,
+        bb_chunk = BBChunk,
+        chain_chunk = ChainChunk
+    } = Ems,
+    KPs = write_values(Fd, KVs),
+    Limit = BBChunk * ChainChunk,
+    case length(KPs) + length(CurrBatch) > Limit of
+        true ->
+            flush_kps(Ems, KPs ++ CurrBatch);
+        false ->
+            {ok, Ems#ems{
+                curr_batch = KPs ++ CurrBatch
+            }}
+    end.
+
+
+flush(#ems{curr_batch=[]}=Ems) ->
+    {ok, Ems};
+flush(Ems) ->
+    flush_kps(Ems, Ems#ems.curr_batch).
 
 
 sort(#ems{}=Ems) ->
@@ -182,7 +203,8 @@ sort(#ems{}=Ems) ->
 merge(#ems{root=undefined}=Ems) ->
     {ok, Ems};
 merge(#ems{}=Ems) ->
-    {ok, decimate(Ems)}.
+    {ok, FlushedEms} = flush(Ems),
+    {ok, decimate(FlushedEms)}.
 
 
 iter(#ems{root=undefined}=Ems) ->
@@ -197,8 +219,9 @@ iter(#ems{root={_, _}}) ->
 next({_Ems, []}) ->
     finished;
 next({Ems, Chains}) ->
-    {KV, RestChains} = choose_kv(small, Ems, Chains),
-    {ok, KV, {Ems, RestChains}}.
+    {{Key, Pos}, RestChains} = choose_kv(small, Ems, Chains),
+    {ok, Val} = couch_file:pread_term(Ems#ems.fd, Pos),
+    {ok, {Key, Val}, {Ems, RestChains}}.
 
 
 add_bb_pos(#ems{root=undefined}=Ems, Pos) ->
@@ -208,16 +231,22 @@ add_bb_pos(#ems{root={BB, Prev}}=Ems, Pos) ->
     Ems#ems{root={NewBB, NewPrev}}.
 
 
-write_kvs(Ems, KVs) ->
+write_values(Fd, KVs) ->
+    {Keys, Vals} = lists:unzip(KVs),
+    {ok, PosSizes} = couch_file:append_terms(Fd, Vals),
+    {Pos, _} = lists:unzip(PosSizes),
+    lists:zip(Keys, Pos).
+
+
+flush_kps(#ems{fd=Fd}=Ems, KPs) ->
     % Write the list of KVs to disk in sorted order in chunks
     % of 100. Also make sure that the order is such that they
     % can be streamed in ascending order.
-    {LastKVs, LastPos} =
-    lists:foldr(fun(KV, Acc) ->
-        append_item(Ems, Acc, KV, Ems#ems.chain_chunk)
-    end, {[], nil}, lists:sort(KVs)),
-    {ok, Final, _} = couch_file:append_term(Ems#ems.fd, {LastKVs, LastPos}),
-    Final.
+    {LastKPs, LastPos} = lists:foldr(fun(KP, Acc) ->
+        append_item(Ems, Acc, KP, Ems#ems.chain_chunk)
+    end, {[], nil}, lists:sort(KPs)),
+    {ok, Final, _} = couch_file:append_term(Fd, {LastKPs, LastPos}),
+    {ok, add_bb_pos(Ems#ems{curr_batch = []}, Final)}.
 
 
 decimate(#ems{root={_BB, nil}}=Ems) ->
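
One usage consequence of this change, mirrored by the couch_db_updater hunk
above: get_state/1 now only matches when the internal batch is empty, so
callers must flush buffered KVs before reading the state. Roughly (a sketch
assuming Ems0 is a handle obtained from couch_emsort:open/1,2):

    {ok, Ems1} = couch_emsort:flush(Ems0),
    MetaFd = couch_emsort:get_fd(Ems1),
    MetaState = couch_emsort:get_state(Ems1),

Skipping the flush/1 call while the batch is non-empty would fail with a
function_clause error on get_state/1.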
