You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/09/06 22:57:51 UTC
[couchdb] 05/05: Optimize couch_emsort writes
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch compactor-optimize-emsort
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit d1a8fc58fabefc0ce75b7ca14df3ab27197cdf98
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Sep 6 16:46:04 2017 -0500
Optimize couch_emsort writes
This makes two related changes to couch_emsort. First, it uses the new
couch_file:append_terms/2 function to write all values to disk up front,
so that only keys and value positions are sorted and the values are not
re-written during the merge phase. Each value is read back from disk
when key/values are streamed out of the emsort.
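Second, it buffers the KVs added to the emsort structure and only writes
a chain once more than bb_chunk * chain_chunk entries have accumulated
(or flush/1 is called), so that our chains are a consistent size. This
helps to minimize the number of merge phases executed during decimation.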
---
src/couch/src/couch_db_updater.erl | 5 ++--
src/couch/src/couch_emsort.erl | 57 ++++++++++++++++++++++++++++----------
2 files changed, 46 insertions(+), 16 deletions(-)
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 226acb2..f3c79ce 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -1403,8 +1403,9 @@ commit_compaction_data(#db{header=OldHeader}=Db0, Fd) ->
% fd instead of the Filepath stuff that commit_data/2
% does.
DataState = couch_db_header:id_tree_state(OldHeader),
- MetaFd = couch_emsort:get_fd(Db0#db.id_tree),
- MetaState = couch_emsort:get_state(Db0#db.id_tree),
+ {ok, Ems} = couch_emsort:flush(Db0#db.id_tree),
+ MetaFd = couch_emsort:get_fd(Ems),
+ MetaState = couch_emsort:get_state(Ems),
Db1 = bind_id_tree(Db0, Db0#db.fd, DataState),
Header = db_to_header(Db1, OldHeader),
CompHeader = #comp_header{
diff --git a/src/couch/src/couch_emsort.erl b/src/couch/src/couch_emsort.erl
index 2a25a23..b70f8d3 100644
--- a/src/couch/src/couch_emsort.erl
+++ b/src/couch/src/couch_emsort.erl
@@ -129,13 +129,14 @@
% CA3 CD3
%
--export([open/1, open/2, get_fd/1, get_state/1]).
+-export([open/1, open/2, get_fd/1, get_state/1, flush/1]).
-export([add/2, merge/1, sort/1, iter/1, next/1]).
-record(ems, {
fd,
root,
+ curr_batch = [],
bb_chunk = 10,
chain_chunk = 100
}).
@@ -163,15 +164,35 @@ get_fd(#ems{fd=Fd}) ->
Fd.
-get_state(#ems{root=Root}) ->
+get_state(#ems{root=Root, curr_batch=[]}) ->
Root.
add(Ems, []) ->
{ok, Ems};
add(Ems, KVs) ->
- Pos = write_kvs(Ems, KVs),
- {ok, add_bb_pos(Ems, Pos)}.
+ #ems{
+ fd = Fd,
+ curr_batch = CurrBatch,
+ bb_chunk = BBChunk,
+ chain_chunk = ChainChunk
+ } = Ems,
+ KPs = write_values(Fd, KVs),
+ Limit = BBChunk * ChainChunk,
+ case length(KPs) + length(CurrBatch) > Limit of
+ true ->
+ flush_kps(Ems, KPs ++ CurrBatch);
+ false ->
+ {ok, Ems#ems{
+ curr_batch = KPs ++ CurrBatch
+ }}
+ end.
+
+
+flush(#ems{curr_batch=[]}=Ems) ->
+ {ok, Ems};
+flush(Ems) ->
+ flush_kps(Ems, Ems#ems.curr_batch).
sort(#ems{}=Ems) ->
@@ -182,7 +203,8 @@ sort(#ems{}=Ems) ->
merge(#ems{root=undefined}=Ems) ->
{ok, Ems};
merge(#ems{}=Ems) ->
- {ok, decimate(Ems)}.
+ {ok, FlushedEms} = flush(Ems),
+ {ok, decimate(FlushedEms)}.
iter(#ems{root=undefined}=Ems) ->
@@ -197,8 +219,9 @@ iter(#ems{root={_, _}}) ->
next({_Ems, []}) ->
finished;
next({Ems, Chains}) ->
- {KV, RestChains} = choose_kv(small, Ems, Chains),
- {ok, KV, {Ems, RestChains}}.
+ {{Key, Pos}, RestChains} = choose_kv(small, Ems, Chains),
+ {ok, Val} = couch_file:pread_term(Ems#ems.fd, Pos),
+ {ok, {Key, Val}, {Ems, RestChains}}.
add_bb_pos(#ems{root=undefined}=Ems, Pos) ->
@@ -208,16 +231,22 @@ add_bb_pos(#ems{root={BB, Prev}}=Ems, Pos) ->
Ems#ems{root={NewBB, NewPrev}}.
-write_kvs(Ems, KVs) ->
+write_values(Fd, KVs) ->
+ {Keys, Vals} = lists:unzip(KVs),
+ {ok, PosSizes} = couch_file:append_terms(Fd, Vals),
+ {Pos, _} = lists:unzip(PosSizes),
+ lists:zip(Keys, Pos).
+
+
+flush_kps(#ems{fd=Fd}=Ems, KPs) ->
% Write the list of KV's to disk in sorted order in chunks
% of 100. Also make sure that the order is so that they
% can be streamed in asscending order.
- {LastKVs, LastPos} =
- lists:foldr(fun(KV, Acc) ->
- append_item(Ems, Acc, KV, Ems#ems.chain_chunk)
- end, {[], nil}, lists:sort(KVs)),
- {ok, Final, _} = couch_file:append_term(Ems#ems.fd, {LastKVs, LastPos}),
- Final.
+ {LastKPs, LastPos} = lists:foldr(fun(KP, Acc) ->
+ append_item(Ems, Acc, KP, Ems#ems.chain_chunk)
+ end, {[], nil}, lists:sort(KPs)),
+ {ok, Final, _} = couch_file:append_term(Fd, {LastKPs, LastPos}),
+ {ok, add_bb_pos(Ems#ems{curr_batch = []}, Final)}.
decimate(#ems{root={_BB, nil}}=Ems) ->
--
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.