You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was lost in the plain-text conversion).
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/03/30 16:44:34 UTC

[1/4] couch commit: updated refs/heads/feat-optimize-compaction-docid-phase to 14766b3

Repository: couchdb-couch
Updated Branches:
  refs/heads/feat-optimize-compaction-docid-phase [created] 14766b3f7


Redo mix


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/bb201827
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/bb201827
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/bb201827

Branch: refs/heads/feat-optimize-compaction-docid-phase
Commit: bb20182746a474f97f5f39c6ba027681d9f400b4
Parents: 1db1337
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Mar 28 16:24:58 2017 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Wed Mar 29 13:39:06 2017 -0500

----------------------------------------------------------------------
 src/couch_db_updater.erl | 45 +++++++++++++++++++++++--------------------
 1 file changed, 24 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/bb201827/src/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl
index db5397e..1afad34 100644
--- a/src/couch_db_updater.erl
+++ b/src/couch_db_updater.erl
@@ -1416,31 +1416,16 @@ copy_meta_data(#db{fd=Fd, header=Header}=Db, DocCount) ->
         rem_seqs=[],
         infos=[]
     },
-    Acc = merge_docids(Iter, Acc0),
-    {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos),
-    {ok, SeqTree} = couch_btree:add_remove(
-        Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
-    ),
-    update_compact_task(length(Acc#merge_st.infos)),
+    Acc1 = merge_docids(Iter, Acc0),
+    #merge_st{
+        id_tree = IdTree,
+        seq_tree = SeqTree
+    } = flush_merge_st(Acc1),
     Db#db{id_tree=IdTree, seq_tree=SeqTree}.
 
 
 merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
-    #merge_st{
-        id_tree=IdTree0,
-        seq_tree=SeqTree0,
-        rem_seqs=RemSeqs
-    } = Acc,
-    {ok, IdTree1} = couch_btree:add(IdTree0, Infos),
-    {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
-    update_compact_task(length(Infos)),
-    Acc1 = Acc#merge_st{
-        id_tree=IdTree1,
-        seq_tree=SeqTree1,
-        rem_seqs=[],
-        infos=[]
-    },
-    merge_docids(Iter, Acc1);
+    merge_docids(Iter, flush_merge_st(Acc));
 merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
     case next_info(Iter, Curr, []) of
         {NextIter, NewCurr, FDI, Seqs} ->
@@ -1479,6 +1464,24 @@ next_info(Iter, {Id, Seq, FDI}, Seqs) ->
     end.
 
 
+flush_merge_st(MergeSt) ->
+    #merge_st{
+        id_tree=IdTree0,
+        seq_tree=SeqTree0,
+        infos=Infos,
+        rem_seqs=RemSeqs
+    } = MergeSt,
+    {ok, IdTree1} = couch_btree:add(IdTree0, Infos),
+    {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
+    update_compact_task(length(Infos)),
+    MergeSt#merge_st{
+        id_tree=IdTree1,
+        seq_tree=SeqTree1,
+        infos=[],
+        rem_seqs=[]
+    }.
+
+
 update_compact_task(NumChanges) ->
     [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
     Changes2 = Changes + NumChanges,


[3/4] couch commit: updated refs/heads/feat-optimize-compaction-docid-phase to 14766b3

Posted by da...@apache.org.
Use the parallel write API when updating btrees.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/880b6b7e
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/880b6b7e
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/880b6b7e

Branch: refs/heads/feat-optimize-compaction-docid-phase
Commit: 880b6b7e34c47bc1bcccc5eaba8f7b2660ab544b
Parents: ce6778f
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Mar 30 11:43:43 2017 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Thu Mar 30 11:43:43 2017 -0500

----------------------------------------------------------------------
 src/couch_btree.erl | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/880b6b7e/src/couch_btree.erl
----------------------------------------------------------------------
diff --git a/src/couch_btree.erl b/src/couch_btree.erl
index adbc92b..e4b85ad 100644
--- a/src/couch_btree.erl
+++ b/src/couch_btree.erl
@@ -427,17 +427,14 @@ write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) ->
     % split up nodes into smaller sizes
     NodeListList = chunkify(NodeList),
     % now write out each chunk and return the KeyPointer pairs for those nodes
-    ResultList = [
-        begin
-            {ok, Pointer, Size} = couch_file:append_term(
-                Fd, {NodeType, ANodeList}, [{compression, Comp}]),
-            {LastKey, _} = lists:last(ANodeList),
-            SubTreeSize = reduce_tree_size(NodeType, Size, ANodeList),
-            {LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList), SubTreeSize}}
-        end
-    ||
-        ANodeList <- NodeListList
-    ],
+    ToWrite = [{NodeType, ANodeList} || ANodeList <- NodeListList],
+    WriteOpts = [{compression, Comp}],
+    {ok, PointerSizePairs} = couch_file:append_terms(Fd, ToWrite, WriteOpts),
+    ResultList = lists:zipwith(fun(ANodeList, {Pointer, Size}) ->
+        {LastKey, _} = lists:last(ANodeList),
+        SubTreeSize = reduce_tree_size(NodeType, Size, ANodeList),
+        {LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList), SubTreeSize}}
+    end, NodeListList, PointerSizePairs),
     {ok, ResultList}.
 
 modify_kpnode(Bt, {}, _LowerBound, Actions, [], QueryOutput) ->


[4/4] couch commit: updated refs/heads/feat-optimize-compaction-docid-phase to 14766b3

Posted by da...@apache.org.
Use parallel file API to avoid copying revision trees multiple times


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/14766b3f
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/14766b3f
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/14766b3f

Branch: refs/heads/feat-optimize-compaction-docid-phase
Commit: 14766b3f7bb07a23df8ae4766e0544ae57ea221f
Parents: 880b6b7
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Mar 30 11:44:15 2017 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Thu Mar 30 11:44:15 2017 -0500

----------------------------------------------------------------------
 src/couch_db_updater.erl | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/14766b3f/src/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl
index 1afad34..875cc4b 100644
--- a/src/couch_db_updater.erl
+++ b/src/couch_db_updater.erl
@@ -27,6 +27,7 @@
 }).
 
 -record(merge_st, {
+    fd,
     id_tree,
     seq_tree,
     curr,
@@ -1097,17 +1098,23 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
         [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
     end,
 
+    {ok, IdEms} = update_emsort(NewDb, NewInfos),
     {ok, SeqTree} = couch_btree:add_remove(
             NewDb#db.seq_tree, NewInfos, RemoveSeqs),
 
-    FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
-        {{Id, Seq}, FDI}
-    end, NewInfos),
-    {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
     update_compact_task(length(NewInfos)),
     NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
 
 
+update_emsort(#db{id_tree=IdTree}, FDIs) ->
+    Fd = couch_emsort:get_fd(IdTree),
+    {ok, PosSizePairs} = couch_file:append_terms(Fd, FDIs),
+    KVs = lists:zipwith(fun(#full_doc_info{id=Id, update_seq=Seq}, {Pos, _}) ->
+        {{Id, Seq}, Pos}
+    end, FDIs, PosSizePairs),
+    couch_emsort:add(IdTree, KVs).
+
+
 copy_compact(Db, NewDb0, Retry) ->
     Compression = couch_compress:get_compression_method(),
     NewDb = NewDb0#db{compression=Compression},
@@ -1411,6 +1418,7 @@ copy_meta_data(#db{fd=Fd, header=Header}=Db, DocCount) ->
     ]),
     {ok, Iter} = couch_emsort:iter(Src),
     Acc0 = #merge_st{
+        fd=couch_emsort:get_fd(Src),
         id_tree=IdTree0,
         seq_tree=Db#db.seq_tree,
         rem_seqs=[],
@@ -1466,11 +1474,13 @@ next_info(Iter, {Id, Seq, FDI}, Seqs) ->
 
 flush_merge_st(MergeSt) ->
     #merge_st{
+        fd=Fd,
         id_tree=IdTree0,
         seq_tree=SeqTree0,
-        infos=Infos,
+        infos=PosList,
         rem_seqs=RemSeqs
     } = MergeSt,
+    {ok, Infos} = couch_file:mpread_terms(Fd, PosList),
     {ok, IdTree1} = couch_btree:add(IdTree0, Infos),
     {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
     update_compact_task(length(Infos)),


[2/4] couch commit: updated refs/heads/feat-optimize-compaction-docid-phase to 14766b3

Posted by da...@apache.org.
Add parallel read and write APIs to couch_file

This allows for terms and binaries to be read and written in a single
call to couch_file.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/ce6778f4
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/ce6778f4
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/ce6778f4

Branch: refs/heads/feat-optimize-compaction-docid-phase
Commit: ce6778f40b00c312217133990b89342b35afdd56
Parents: bb20182
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Mar 29 13:39:21 2017 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Thu Mar 30 11:43:21 2017 -0500

----------------------------------------------------------------------
 src/couch_file.erl | 141 +++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 123 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/ce6778f4/src/couch_file.erl
----------------------------------------------------------------------
diff --git a/src/couch_file.erl b/src/couch_file.erl
index d7b176e..69dbaa4 100644
--- a/src/couch_file.erl
+++ b/src/couch_file.erl
@@ -39,9 +39,11 @@
 % public API
 -export([open/1, open/2, close/1, bytes/1, sync/1, truncate/2, set_db_pid/2]).
 -export([pread_term/2, pread_iolist/2, pread_binary/2]).
+-export([mpread_terms/2, mpread_iolists/2, mpread_binaries/2]).
 -export([append_binary/2, append_binary_md5/2]).
 -export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]).
 -export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]).
+-export([append_terms/2, append_terms/3, append_binaries/2]).
 -export([write_header/2, read_header/1]).
 -export([delete/2, delete/3, nuke_dir/2, init_delete_dir/1]).
 
@@ -128,7 +130,7 @@ append_term_md5(Fd, Term, Options) ->
 
 append_binary(Fd, Bin) ->
     ioq:call(Fd, {append_bin, assemble_file_chunk(Bin)}, erlang:get(io_priority)).
-    
+
 append_binary_md5(Fd, Bin) ->
     ioq:call(Fd,
         {append_bin, assemble_file_chunk(Bin, couch_crypto:hash(md5, Bin))},
@@ -144,6 +146,21 @@ assemble_file_chunk(Bin) ->
 assemble_file_chunk(Bin, Md5) ->
     [<<1:1/integer, (iolist_size(Bin)):31/integer>>, Md5, Bin].
 
+
+append_terms(Fd, Terms) when is_list(Terms) ->
+    append_terms(Fd, Terms, []).
+
+
+append_terms(Fd, Terms, Options) ->
+    Comp = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION),
+    Compressed = [couch_compress:compress(Term, Comp) || Term <- Terms],
+    append_binaries(Fd, Compressed).
+
+
+append_binaries(Fd, Bins0) when is_list(Bins0) ->
+    Bins1 = [assemble_file_chunk(B) || B <- Bins0],
+    ioq:call(Fd, {append_bins, Bins1}, erlang:get(io_priority)).
+
 %%----------------------------------------------------------------------
 %% Purpose: Reads a term from a file that was written with append_term
 %% Args:    Pos, the offset into the file where the term is serialized.
@@ -171,17 +188,29 @@ pread_binary(Fd, Pos) ->
 
 pread_iolist(Fd, Pos) ->
     case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of
-    {ok, IoList, <<>>} ->
-        {ok, IoList};
-    {ok, IoList, Md5} ->
-        case couch_crypto:hash(md5, IoList) of
-        Md5 ->
-            {ok, IoList};
-        _ ->
-            couch_log:emergency("File corruption in ~p at position ~B",
-                     [Fd, Pos]),
-            exit({file_corruption, <<"file corruption">>})
-        end;
+    {ok, Bin, MaybeMd5} ->
+        {ok, verify_md5(Fd, Pos, Bin, MaybeMd5)};
+    Error ->
+        Error
+    end.
+
+
+mpread_terms(Fd, PosList) ->
+    {ok, Bins} = mpread_binaries(Fd, PosList),
+    {ok, lists:map(fun couch_compress:decompress/1, Bins)}.
+
+
+mpread_binaries(Fd, PosList) ->
+    {ok, Results} = mpread_iolists(Fd, PosList),
+    {ok, lists:map(fun iolist_to_binary/1, Results)}.
+
+
+mpread_iolists(Fd, PosList) ->
+    case ioq:call(Fd, {mpread_iolists, PosList}, erlang:get(io_priority)) of
+    {ok, Results} ->
+        {ok, lists:zipwith(fun(Pos, {DataBin, MaybeMd5}) ->
+            verify_md5(Fd, Pos, DataBin, MaybeMd5)
+        end, PosList, Results)};
     Error ->
         Error
     end.
@@ -425,6 +454,30 @@ handle_call({pread_iolist, Pos}, _From, File) ->
         {reply, {ok, Iolist, <<>>}, File}
     end;
 
+handle_call({mpread_iolists, PosList}, _From, File) ->
+    PosLenPairs = lists:map(fun(Pos) -> {Pos, 4} end, PosList),
+    LengthBins = read_raw_iolist_multi(File, PosLenPairs),
+    {HasMd5List, DataPosLenPairs} = lists:foldr(fun
+        ({LenIoList, NextPos}, {HasMd5Acc, PairAcc}) ->
+            case iolist_to_binary(LenIoList) of
+                <<1:1/integer, Len:31/integer>> -> % An MD5-prefixed term
+                    {[true | HasMd5Acc], [{NextPos, Len + 16} | PairAcc]};
+                <<0:1/integer, Len:31/integer>> ->
+                    {[false | HasMd5Acc], [{NextPos, Len} | PairAcc]}
+            end
+    end, {[], []}, LengthBins),
+    DataBins = read_raw_iolist_multi(File, DataPosLenPairs),
+    Results = lists:zipwith(fun(HasMd5, {DataBin, _}) ->
+        case HasMd5 of
+            true ->
+                {Md5, Rest} = extract_md5(DataBin),
+                {Rest, Md5};
+            false ->
+                {DataBin, <<>>}
+        end
+    end, HasMd5List, DataBins),
+    {reply, {ok, Results}, File};
+
 handle_call(bytes, _From, #file{fd = Fd} = File) ->
     {reply, file:position(Fd, eof), File};
 
@@ -458,6 +511,20 @@ handle_call({append_bin, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
         {reply, Error, reset_eof(File)}
     end;
 
+handle_call({append_bins, Bins}, _From, #file{fd = Fd, eof = Pos} = File) ->
+    {NewPos, BlocksOut, RespOut} =
+    lists:foldl(fun(Bin, {PosAcc, BlocksAcc, RespAcc}) ->
+        Blocks = make_blocks(PosAcc rem ?SIZE_BLOCK, Bin),
+        Size = iolist_size(Blocks),
+        {PosAcc + Size, [Blocks | BlocksAcc], [{PosAcc, Size} | RespAcc]}
+    end, {Pos, [], []}, Bins),
+    case file:write(Fd, lists:reverse(BlocksOut)) of
+    ok ->
+        {reply, {ok, lists:reverse(RespOut)}, File#file{eof = NewPos}};
+    Error ->
+        {reply, Error, reset_eof(File)}
+    end;
+
 handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
     BinSize = byte_size(Bin),
     case Pos rem ?SIZE_BLOCK of
@@ -502,6 +569,19 @@ handle_info({'DOWN', Ref, process, _Pid, _Info}, #file{db_monitor=Ref}=File) ->
     end.
 
 
+verify_md5(_Fd, _Pos, Bin, <<>>) ->
+    Bin;
+verify_md5(Fd, Pos, Bin, Md5) ->
+    case couch_crypto:hash(md5, Bin) of
+        Md5 ->
+            Bin;
+        _ ->
+            Msg = "File corruption in ~p at position ~B",
+            couch_log:emergency(Msg, [Fd, Pos]),
+            exit({file_corruption, <<"file corruption">>})
+    end.
+
+
 find_header(Fd, Block) ->
     case (catch load_header(Fd, Block)) of
     {ok, Bin} ->
@@ -583,23 +663,48 @@ find_newest_header(Fd, [{Location, Size} | LocationSizes]) ->
     {Data::iolist(), CurPos::non_neg_integer()}.
 read_raw_iolist_int(Fd, {Pos, _Size}, Len) -> % 0110 UPGRADE CODE
     read_raw_iolist_int(Fd, Pos, Len);
-read_raw_iolist_int(#file{fd = Fd, pread_limit = Limit} = F, Pos, Len) ->
+read_raw_iolist_int(#file{fd = Fd} = File, Pos, Len) ->
+    {TotalBytes, BlockOffset, FinalPos} = calculate_read_info(File, Pos, Len),
+    {ok, <<RawBin:TotalBytes/binary>>} = file:pread(Fd, Pos, TotalBytes),
+    {remove_block_prefixes(BlockOffset, RawBin), FinalPos}.
+
+
+-spec read_raw_iolist_multi(#file{}, [{Pos::non_neg_integer(), Len::non_neg_integer()}]) ->
+    [{Data::iolist(), CurrPos::non_neg_integer()}].
+read_raw_iolist_multi(#file{fd = Fd} = File, PosLenPairs) ->
+    ReadInfo = lists:map(fun({Pos, Len}) ->
+        calculate_read_info(File, Pos, Len)
+    end, PosLenPairs),
+    LocNums = lists:zipwith(fun({Pos, _}, {TotalBytes, _, _}) ->
+        {Pos, TotalBytes}
+    end, PosLenPairs, ReadInfo),
+    {ok, Bins} = file:pread(Fd, LocNums),
+    lists:zipwith(fun
+        ({TotalBytes, BlockOffset, FinalPos}, Bin) ->
+            <<RawBin:TotalBytes/binary>> = Bin,
+            {remove_block_prefixes(BlockOffset, RawBin), FinalPos}
+    end, ReadInfo, Bins).
+
+
+-spec calculate_read_info(#file{}, Pos::non_neg_integer(), Len::non_neg_integer()) ->
+    {TotalBytes::non_neg_integer(), BlockOffset::non_neg_integer(), FinalPos::non_neg_integer()}.
+calculate_read_info(#file{eof = Eof, pread_limit = Limit}, Pos, Len) ->
     BlockOffset = Pos rem ?SIZE_BLOCK,
     TotalBytes = calculate_total_read_len(BlockOffset, Len),
     case Pos + TotalBytes of
-    Size when Size > F#file.eof ->
+    FinalPos when FinalPos > Eof ->
         couch_stats:increment_counter([pread, exceed_eof]),
         {_Fd, Filepath} = get(couch_file_fd),
         throw({read_beyond_eof, Filepath});
-    Size when Size > Limit ->
+    FinalPos when FinalPos > Limit ->
         couch_stats:increment_counter([pread, exceed_limit]),
         {_Fd, Filepath} = get(couch_file_fd),
         throw({exceed_pread_limit, Filepath, Limit});
-    Size ->
-        {ok, <<RawBin:TotalBytes/binary>>} = file:pread(Fd, Pos, TotalBytes),
-        {remove_block_prefixes(BlockOffset, RawBin), Size}
+    FinalPos ->
+        {TotalBytes, BlockOffset, FinalPos}
     end.
 
+
 -spec extract_md5(iolist()) -> {binary(), iolist()}.
 extract_md5(FullIoList) ->
     {Md5List, IoList} = split_iolist(FullIoList, 16, []),