You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/03/30 16:44:34 UTC
[1/4] couch commit: updated
refs/heads/feat-optimize-compaction-docid-phase to 14766b3
Repository: couchdb-couch
Updated Branches:
refs/heads/feat-optimize-compaction-docid-phase [created] 14766b3f7
Redo mix
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/bb201827
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/bb201827
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/bb201827
Branch: refs/heads/feat-optimize-compaction-docid-phase
Commit: bb20182746a474f97f5f39c6ba027681d9f400b4
Parents: 1db1337
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Mar 28 16:24:58 2017 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Wed Mar 29 13:39:06 2017 -0500
----------------------------------------------------------------------
src/couch_db_updater.erl | 45 +++++++++++++++++++++++--------------------
1 file changed, 24 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/bb201827/src/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl
index db5397e..1afad34 100644
--- a/src/couch_db_updater.erl
+++ b/src/couch_db_updater.erl
@@ -1416,31 +1416,16 @@ copy_meta_data(#db{fd=Fd, header=Header}=Db, DocCount) ->
rem_seqs=[],
infos=[]
},
- Acc = merge_docids(Iter, Acc0),
- {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos),
- {ok, SeqTree} = couch_btree:add_remove(
- Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
- ),
- update_compact_task(length(Acc#merge_st.infos)),
+ Acc1 = merge_docids(Iter, Acc0),
+ #merge_st{
+ id_tree = IdTree,
+ seq_tree = SeqTree
+ } = flush_merge_st(Acc1),
Db#db{id_tree=IdTree, seq_tree=SeqTree}.
merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
- #merge_st{
- id_tree=IdTree0,
- seq_tree=SeqTree0,
- rem_seqs=RemSeqs
- } = Acc,
- {ok, IdTree1} = couch_btree:add(IdTree0, Infos),
- {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
- update_compact_task(length(Infos)),
- Acc1 = Acc#merge_st{
- id_tree=IdTree1,
- seq_tree=SeqTree1,
- rem_seqs=[],
- infos=[]
- },
- merge_docids(Iter, Acc1);
+ merge_docids(Iter, flush_merge_st(Acc));
merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
case next_info(Iter, Curr, []) of
{NextIter, NewCurr, FDI, Seqs} ->
@@ -1479,6 +1464,24 @@ next_info(Iter, {Id, Seq, FDI}, Seqs) ->
end.
+flush_merge_st(MergeSt) -> % Apply the buffered infos (to id_tree) and rem_seqs (removed from seq_tree), then return the state with both buffers emptied.
+ #merge_st{
+ id_tree=IdTree0,
+ seq_tree=SeqTree0,
+ infos=Infos,
+ rem_seqs=RemSeqs
+ } = MergeSt,
+ {ok, IdTree1} = couch_btree:add(IdTree0, Infos),
+ {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs), % removals only; nothing is added to seq_tree here
+ update_compact_task(length(Infos)), % progress is counted in flushed infos
+ MergeSt#merge_st{
+ id_tree=IdTree1,
+ seq_tree=SeqTree1,
+ infos=[], % cleared so merge_docids can keep accumulating after a flush
+ rem_seqs=[]
+ }.
+
+
update_compact_task(NumChanges) ->
[Changes, Total] = couch_task_status:get([changes_done, total_changes]),
Changes2 = Changes + NumChanges,
[3/4] couch commit: updated
refs/heads/feat-optimize-compaction-docid-phase to 14766b3
Posted by da...@apache.org.
Use the parallel write API when updating btrees.
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/880b6b7e
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/880b6b7e
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/880b6b7e
Branch: refs/heads/feat-optimize-compaction-docid-phase
Commit: 880b6b7e34c47bc1bcccc5eaba8f7b2660ab544b
Parents: ce6778f
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Mar 30 11:43:43 2017 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Thu Mar 30 11:43:43 2017 -0500
----------------------------------------------------------------------
src/couch_btree.erl | 19 ++++++++-----------
1 file changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/880b6b7e/src/couch_btree.erl
----------------------------------------------------------------------
diff --git a/src/couch_btree.erl b/src/couch_btree.erl
index adbc92b..e4b85ad 100644
--- a/src/couch_btree.erl
+++ b/src/couch_btree.erl
@@ -427,17 +427,14 @@ write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) ->
% split up nodes into smaller sizes
NodeListList = chunkify(NodeList),
% now write out each chunk and return the KeyPointer pairs for those nodes
- ResultList = [
- begin
- {ok, Pointer, Size} = couch_file:append_term(
- Fd, {NodeType, ANodeList}, [{compression, Comp}]),
- {LastKey, _} = lists:last(ANodeList),
- SubTreeSize = reduce_tree_size(NodeType, Size, ANodeList),
- {LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList), SubTreeSize}}
- end
- ||
- ANodeList <- NodeListList
- ],
+ ToWrite = [{NodeType, ANodeList} || ANodeList <- NodeListList],
+ WriteOpts = [{compression, Comp}],
+ {ok, PointerSizePairs} = couch_file:append_terms(Fd, ToWrite, WriteOpts),
+ ResultList = lists:zipwith(fun(ANodeList, {Pointer, Size}) ->
+ {LastKey, _} = lists:last(ANodeList),
+ SubTreeSize = reduce_tree_size(NodeType, Size, ANodeList),
+ {LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList), SubTreeSize}}
+ end, NodeListList, PointerSizePairs),
{ok, ResultList}.
modify_kpnode(Bt, {}, _LowerBound, Actions, [], QueryOutput) ->
[4/4] couch commit: updated
refs/heads/feat-optimize-compaction-docid-phase to 14766b3
Posted by da...@apache.org.
Use parallel file API to avoid copying revision trees multiple times
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/14766b3f
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/14766b3f
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/14766b3f
Branch: refs/heads/feat-optimize-compaction-docid-phase
Commit: 14766b3f7bb07a23df8ae4766e0544ae57ea221f
Parents: 880b6b7
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Mar 30 11:44:15 2017 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Thu Mar 30 11:44:15 2017 -0500
----------------------------------------------------------------------
src/couch_db_updater.erl | 20 +++++++++++++++-----
1 file changed, 15 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/14766b3f/src/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl
index 1afad34..875cc4b 100644
--- a/src/couch_db_updater.erl
+++ b/src/couch_db_updater.erl
@@ -27,6 +27,7 @@
}).
-record(merge_st, {
+ fd,
id_tree,
seq_tree,
curr,
@@ -1097,17 +1098,23 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
[Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
end,
+ {ok, IdEms} = update_emsort(NewDb, NewInfos),
{ok, SeqTree} = couch_btree:add_remove(
NewDb#db.seq_tree, NewInfos, RemoveSeqs),
- FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
- {{Id, Seq}, FDI}
- end, NewInfos),
- {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
update_compact_task(length(NewInfos)),
NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
+update_emsort(#db{id_tree=IdTree}, FDIs) -> % Write all FDIs to the emsort's file in one append_terms call and add {{Id, Seq}, Pos} entries to the emsort.
+ Fd = couch_emsort:get_fd(IdTree),
+ {ok, PosSizePairs} = couch_file:append_terms(Fd, FDIs), % NOTE(review): append_terms/2 compresses with ?DEFAULT_COMPRESSION, not the target db's compression - confirm intended
+ KVs = lists:zipwith(fun(#full_doc_info{id=Id, update_seq=Seq}, {Pos, _}) -> % append_bins replies in input order, so each FDI is paired with its own position
+ {{Id, Seq}, Pos} % the emsort value is the file position, not the FDI itself
+ end, FDIs, PosSizePairs),
+ couch_emsort:add(IdTree, KVs).
+
+
copy_compact(Db, NewDb0, Retry) ->
Compression = couch_compress:get_compression_method(),
NewDb = NewDb0#db{compression=Compression},
@@ -1411,6 +1418,7 @@ copy_meta_data(#db{fd=Fd, header=Header}=Db, DocCount) ->
]),
{ok, Iter} = couch_emsort:iter(Src),
Acc0 = #merge_st{
+ fd=couch_emsort:get_fd(Src),
id_tree=IdTree0,
seq_tree=Db#db.seq_tree,
rem_seqs=[],
@@ -1466,11 +1474,13 @@ next_info(Iter, {Id, Seq, FDI}, Seqs) ->
flush_merge_st(MergeSt) ->
#merge_st{
+ fd=Fd,
id_tree=IdTree0,
seq_tree=SeqTree0,
- infos=Infos,
+ infos=PosList,
rem_seqs=RemSeqs
} = MergeSt,
+ {ok, Infos} = couch_file:mpread_terms(Fd, PosList),
{ok, IdTree1} = couch_btree:add(IdTree0, Infos),
{ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
update_compact_task(length(Infos)),
[2/4] couch commit: updated
refs/heads/feat-optimize-compaction-docid-phase to 14766b3
Posted by da...@apache.org.
Add parallel read and write APIs to couch_file
This allows for terms and binaries to be read and written in a single
call to couch_file.
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/ce6778f4
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/ce6778f4
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/ce6778f4
Branch: refs/heads/feat-optimize-compaction-docid-phase
Commit: ce6778f40b00c312217133990b89342b35afdd56
Parents: bb20182
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Mar 29 13:39:21 2017 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Thu Mar 30 11:43:21 2017 -0500
----------------------------------------------------------------------
src/couch_file.erl | 141 +++++++++++++++++++++++++++++++++++++++++-------
1 file changed, 123 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/ce6778f4/src/couch_file.erl
----------------------------------------------------------------------
diff --git a/src/couch_file.erl b/src/couch_file.erl
index d7b176e..69dbaa4 100644
--- a/src/couch_file.erl
+++ b/src/couch_file.erl
@@ -39,9 +39,11 @@
% public API
-export([open/1, open/2, close/1, bytes/1, sync/1, truncate/2, set_db_pid/2]).
-export([pread_term/2, pread_iolist/2, pread_binary/2]).
+-export([mpread_terms/2, mpread_iolists/2, mpread_binaries/2]).
-export([append_binary/2, append_binary_md5/2]).
-export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]).
-export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]).
+-export([append_terms/2, append_terms/3, append_binaries/2]).
-export([write_header/2, read_header/1]).
-export([delete/2, delete/3, nuke_dir/2, init_delete_dir/1]).
@@ -128,7 +130,7 @@ append_term_md5(Fd, Term, Options) ->
append_binary(Fd, Bin) ->
ioq:call(Fd, {append_bin, assemble_file_chunk(Bin)}, erlang:get(io_priority)).
-
+
append_binary_md5(Fd, Bin) ->
ioq:call(Fd,
{append_bin, assemble_file_chunk(Bin, couch_crypto:hash(md5, Bin))},
@@ -144,6 +146,21 @@ assemble_file_chunk(Bin) ->
assemble_file_chunk(Bin, Md5) ->
[<<1:1/integer, (iolist_size(Bin)):31/integer>>, Md5, Bin].
+
+append_terms(Fd, Terms) when is_list(Terms) -> % Append several terms in one couch_file call, using default options.
+ append_terms(Fd, Terms, []).
+
+
+append_terms(Fd, Terms, Options) -> % Compress each term (Options' compression, else ?DEFAULT_COMPRESSION) and append them via append_binaries/2.
+ Comp = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION),
+ Compressed = [couch_compress:compress(Term, Comp) || Term <- Terms],
+ append_binaries(Fd, Compressed).
+
+
+append_binaries(Fd, Bins0) when is_list(Bins0) -> % Frame each binary as a file chunk and write all of them with a single {append_bins, _} request; reply is {ok, [{Pos, Size}]} in input order or the write error.
+ Bins1 = [assemble_file_chunk(B) || B <- Bins0], % assemble_file_chunk/1 presumably frames without an MD5 - confirm
+ ioq:call(Fd, {append_bins, Bins1}, erlang:get(io_priority)).
+
%%----------------------------------------------------------------------
%% Purpose: Reads a term from a file that was written with append_term
%% Args: Pos, the offset into the file where the term is serialized.
@@ -171,17 +188,29 @@ pread_binary(Fd, Pos) ->
pread_iolist(Fd, Pos) ->
case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of
- {ok, IoList, <<>>} ->
- {ok, IoList};
- {ok, IoList, Md5} ->
- case couch_crypto:hash(md5, IoList) of
- Md5 ->
- {ok, IoList};
- _ ->
- couch_log:emergency("File corruption in ~p at position ~B",
- [Fd, Pos]),
- exit({file_corruption, <<"file corruption">>})
- end;
+ {ok, Bin, MaybeMd5} -> % MaybeMd5 is <<>> when the chunk has no checksum; verify_md5/4 then skips the check
+ {ok, verify_md5(Fd, Pos, Bin, MaybeMd5)}; % verify_md5/4 exits with file_corruption on mismatch
+ Error ->
+ Error
+ end.
+
+
+mpread_terms(Fd, PosList) -> % Read and decompress the term stored at each position in PosList, returning {ok, Terms} in PosList order.
+ {ok, Bins} = mpread_binaries(Fd, PosList), % NOTE(review): any error reply surfaces here as a badmatch
+ {ok, lists:map(fun couch_compress:decompress/1, Bins)}.
+
+
+mpread_binaries(Fd, PosList) -> % Like mpread_iolists/2, but flattens each result into a binary.
+ {ok, Results} = mpread_iolists(Fd, PosList),
+ {ok, lists:map(fun iolist_to_binary/1, Results)}.
+
+
+mpread_iolists(Fd, PosList) -> % Read the chunks at every position with one {mpread_iolists, _} request; returns {ok, IoLists} in PosList order or the error reply.
+ case ioq:call(Fd, {mpread_iolists, PosList}, erlang:get(io_priority)) of
+ {ok, Results} ->
+ {ok, lists:zipwith(fun(Pos, {DataBin, MaybeMd5}) -> % Results align 1:1 with PosList, so a checksum failure is logged with its own position
+ verify_md5(Fd, Pos, DataBin, MaybeMd5)
+ end, PosList, Results)};
Error ->
Error
end.
@@ -425,6 +454,30 @@ handle_call({pread_iolist, Pos}, _From, File) ->
{reply, {ok, Iolist, <<>>}, File}
end;
+handle_call({mpread_iolists, PosList}, _From, File) -> % Two batched reads: every 4-byte length prefix first, then every data region; replies {ok, [{Data, Md5 | <<>>}]}.
+ PosLenPairs = lists:map(fun(Pos) -> {Pos, 4} end, PosList),
+ LengthBins = read_raw_iolist_multi(File, PosLenPairs), % each entry is {LenIoList, PosAfterPrefix}; NOTE(review): throws from calculate_read_info/3 are not caught here
+ {HasMd5List, DataPosLenPairs} = lists:foldr(fun % foldr keeps both accumulators in PosList order
+ ({LenIoList, NextPos}, {HasMd5Acc, PairAcc}) ->
+ case iolist_to_binary(LenIoList) of
+ <<1:1/integer, Len:31/integer>> -> % An MD5-prefixed term
+ {[true | HasMd5Acc], [{NextPos, Len + 16} | PairAcc]}; % also read the 16-byte MD5 stored before the payload
+ <<0:1/integer, Len:31/integer>> -> % no MD5: the payload follows the prefix directly
+ {[false | HasMd5Acc], [{NextPos, Len} | PairAcc]}
+ end
+ end, {[], []}, LengthBins),
+ DataBins = read_raw_iolist_multi(File, DataPosLenPairs),
+ Results = lists:zipwith(fun(HasMd5, {DataBin, _}) ->
+ case HasMd5 of
+ true ->
+ {Md5, Rest} = extract_md5(DataBin), % split the leading 16 MD5 bytes from the payload
+ {Rest, Md5};
+ false ->
+ {DataBin, <<>>} % <<>> tells verify_md5/4 there is nothing to check
+ end
+ end, HasMd5List, DataBins),
+ {reply, {ok, Results}, File};
+
handle_call(bytes, _From, #file{fd = Fd} = File) ->
{reply, file:position(Fd, eof), File};
@@ -458,6 +511,20 @@ handle_call({append_bin, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
{reply, Error, reset_eof(File)}
end;
+handle_call({append_bins, Bins}, _From, #file{fd = Fd, eof = Pos} = File) -> % Lay out every chunk starting at eof, write them all with one file:write/2, and reply {ok, [{Pos, Size}]} in input order.
+ {NewPos, BlocksOut, RespOut} =
+ lists:foldl(fun(Bin, {PosAcc, BlocksAcc, RespAcc}) ->
+ Blocks = make_blocks(PosAcc rem ?SIZE_BLOCK, Bin), % the block offset depends on where the previous chunk ended
+ Size = iolist_size(Blocks), % on-disk size as laid out by make_blocks/2
+ {PosAcc + Size, [Blocks | BlocksAcc], [{PosAcc, Size} | RespAcc]}
+ end, {Pos, [], []}, Bins),
+ case file:write(Fd, lists:reverse(BlocksOut)) of % the accumulators were built in reverse; restore input order
+ ok ->
+ {reply, {ok, lists:reverse(RespOut)}, File#file{eof = NewPos}};
+ Error ->
+ {reply, Error, reset_eof(File)}
+ end;
+
handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
BinSize = byte_size(Bin),
case Pos rem ?SIZE_BLOCK of
@@ -502,6 +569,19 @@ handle_info({'DOWN', Ref, process, _Pid, _Info}, #file{db_monitor=Ref}=File) ->
end.
+verify_md5(_Fd, _Pos, Bin, <<>>) -> % Return Bin unchanged if no MD5 was stored or it matches; otherwise log an emergency and exit with file_corruption.
+ Bin;
+verify_md5(Fd, Pos, Bin, Md5) ->
+ case couch_crypto:hash(md5, Bin) of
+ Md5 -> % matches against the already-bound Md5 argument
+ Bin;
+ _ ->
+ Msg = "File corruption in ~p at position ~B",
+ couch_log:emergency(Msg, [Fd, Pos]),
+ exit({file_corruption, <<"file corruption">>})
+ end.
+
+
find_header(Fd, Block) ->
case (catch load_header(Fd, Block)) of
{ok, Bin} ->
@@ -583,23 +663,48 @@ find_newest_header(Fd, [{Location, Size} | LocationSizes]) ->
{Data::iolist(), CurPos::non_neg_integer()}.
read_raw_iolist_int(Fd, {Pos, _Size}, Len) -> % 0110 UPGRADE CODE
read_raw_iolist_int(Fd, Pos, Len);
-read_raw_iolist_int(#file{fd = Fd, pread_limit = Limit} = F, Pos, Len) ->
+read_raw_iolist_int(#file{fd = Fd} = File, Pos, Len) -> % eof and pread_limit checks now happen in calculate_read_info/3
+ {TotalBytes, BlockOffset, FinalPos} = calculate_read_info(File, Pos, Len),
+ {ok, <<RawBin:TotalBytes/binary>>} = file:pread(Fd, Pos, TotalBytes), % asserts the full span was read
+ {remove_block_prefixes(BlockOffset, RawBin), FinalPos}.
+
+
+-spec read_raw_iolist_multi(#file{}, [{Pos::non_neg_integer(), Len::non_neg_integer()}]) ->
+ [{Data::iolist(), CurrPos::non_neg_integer()}].
+read_raw_iolist_multi(#file{fd = Fd} = File, PosLenPairs) -> % Batched form of read_raw_iolist_int/3: one file:pread/2 for all regions, results in input order.
+ ReadInfo = lists:map(fun({Pos, Len}) ->
+ calculate_read_info(File, Pos, Len) % validates every region (may throw) before any I/O is issued
+ end, PosLenPairs),
+ LocNums = lists:zipwith(fun({Pos, _}, {TotalBytes, _, _}) ->
+ {Pos, TotalBytes}
+ end, PosLenPairs, ReadInfo),
+ {ok, Bins} = file:pread(Fd, LocNums),
+ lists:zipwith(fun
+ ({TotalBytes, BlockOffset, FinalPos}, Bin) ->
+ <<RawBin:TotalBytes/binary>> = Bin, % badmatch on a short read or an eof entry
+ {remove_block_prefixes(BlockOffset, RawBin), FinalPos}
+ end, ReadInfo, Bins).
+
+
+%% Validate a read of Len payload bytes starting at Pos and compute its
+%% on-disk layout, without performing any I/O.
+%% Returns {TotalBytes, BlockOffset, FinalPos}:
+%%   TotalBytes  - bytes to pread, from calculate_total_read_len/2
+%%                 (presumably payload plus block prefixes);
+%%   BlockOffset - Pos's offset within its ?SIZE_BLOCK block;
+%%   FinalPos    - Pos + TotalBytes, the position just past the read.
+%% Throws {read_beyond_eof, Filepath} when FinalPos exceeds the file's eof,
+%% or {exceed_pread_limit, Filepath, Limit} when it exceeds pread_limit;
+%% each case also bumps its couch_stats counter first.
+-spec calculate_read_info(#file{}, Pos::non_neg_integer(), Len::non_neg_integer()) ->
+ {TotalBytes::non_neg_integer(), BlockOffset::non_neg_integer(),
+ FinalPos::non_neg_integer()}.
+calculate_read_info(#file{eof = Eof, pread_limit = Limit}, Pos, Len) ->
BlockOffset = Pos rem ?SIZE_BLOCK,
TotalBytes = calculate_total_read_len(BlockOffset, Len),
case Pos + TotalBytes of
- Size when Size > F#file.eof ->
+ FinalPos when FinalPos > Eof ->
couch_stats:increment_counter([pread, exceed_eof]),
{_Fd, Filepath} = get(couch_file_fd),
throw({read_beyond_eof, Filepath});
- Size when Size > Limit ->
+ FinalPos when FinalPos > Limit ->
couch_stats:increment_counter([pread, exceed_limit]),
{_Fd, Filepath} = get(couch_file_fd),
throw({exceed_pread_limit, Filepath, Limit});
- Size ->
- {ok, <<RawBin:TotalBytes/binary>>} = file:pread(Fd, Pos, TotalBytes),
- {remove_block_prefixes(BlockOffset, RawBin), Size}
+ FinalPos ->
+ {TotalBytes, BlockOffset, FinalPos}
end.
+
-spec extract_md5(iolist()) -> {binary(), iolist()}.
extract_md5(FullIoList) ->
{Md5List, IoList} = split_iolist(FullIoList, 16, []),