You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/03/30 16:44:35 UTC

[2/4] couch commit: updated refs/heads/feat-optimize-compaction-docid-phase to 14766b3

Add parallel read and write APIs to couch_file

This allows multiple terms or binaries to be read, or written, with a
single call to couch_file.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/ce6778f4
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/ce6778f4
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/ce6778f4

Branch: refs/heads/feat-optimize-compaction-docid-phase
Commit: ce6778f40b00c312217133990b89342b35afdd56
Parents: bb20182
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Mar 29 13:39:21 2017 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Thu Mar 30 11:43:21 2017 -0500

----------------------------------------------------------------------
 src/couch_file.erl | 141 +++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 123 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/ce6778f4/src/couch_file.erl
----------------------------------------------------------------------
diff --git a/src/couch_file.erl b/src/couch_file.erl
index d7b176e..69dbaa4 100644
--- a/src/couch_file.erl
+++ b/src/couch_file.erl
@@ -39,9 +39,11 @@
 % public API
 -export([open/1, open/2, close/1, bytes/1, sync/1, truncate/2, set_db_pid/2]).
 -export([pread_term/2, pread_iolist/2, pread_binary/2]).
+-export([mpread_terms/2, mpread_iolists/2, mpread_binaries/2]). % batched reads, one call
 -export([append_binary/2, append_binary_md5/2]).
 -export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]).
 -export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]).
+-export([append_terms/2, append_terms/3, append_binaries/2]). % batched appends, one call
 -export([write_header/2, read_header/1]).
 -export([delete/2, delete/3, nuke_dir/2, init_delete_dir/1]).
 
@@ -128,7 +130,7 @@ append_term_md5(Fd, Term, Options) ->
 
 append_binary(Fd, Bin) ->
     ioq:call(Fd, {append_bin, assemble_file_chunk(Bin)}, erlang:get(io_priority)).
-    
+%% Like append_binary/2, but the chunk header also carries the MD5 of Bin.
 append_binary_md5(Fd, Bin) ->
     ioq:call(Fd,
         {append_bin, assemble_file_chunk(Bin, couch_crypto:hash(md5, Bin))},
@@ -144,6 +146,21 @@ assemble_file_chunk(Bin) ->
 assemble_file_chunk(Bin, Md5) ->
     [<<1:1/integer, (iolist_size(Bin)):31/integer>>, Md5, Bin].
 
+%% Compresses every term in Terms (default compression) and appends them all.
+append_terms(Fd, Terms) when is_list(Terms) ->
+    append_terms(Fd, Terms, []).
+
+%% Like append_terms/2; a `compression` option overrides ?DEFAULT_COMPRESSION.
+append_terms(Fd, Terms, Options) ->
+    Comp = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION),
+    Compressed = [couch_compress:compress(Term, Comp) || Term <- Terms],
+    append_binaries(Fd, Compressed).
+
+%% Appends every binary as a chunk in one call; on success {ok, [{Pos, Size}]}, in order.
+append_binaries(Fd, Bins0) when is_list(Bins0) ->
+    Bins1 = [assemble_file_chunk(B) || B <- Bins0],
+    ioq:call(Fd, {append_bins, Bins1}, erlang:get(io_priority)).
+
 %%----------------------------------------------------------------------
 %% Purpose: Reads a term from a file that was written with append_term
 %% Args:    Pos, the offset into the file where the term is serialized.
@@ -171,17 +188,29 @@ pread_binary(Fd, Pos) ->
 
 pread_iolist(Fd, Pos) ->
     case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of
-    {ok, IoList, <<>>} ->
-        {ok, IoList};
-    {ok, IoList, Md5} ->
-        case couch_crypto:hash(md5, IoList) of
-        Md5 ->
-            {ok, IoList};
-        _ ->
-            couch_log:emergency("File corruption in ~p at position ~B",
-                     [Fd, Pos]),
-            exit({file_corruption, <<"file corruption">>})
-        end;
+    {ok, Bin, MaybeMd5} -> % MaybeMd5 is <<>> when the chunk has no stored MD5
+        {ok, verify_md5(Fd, Pos, Bin, MaybeMd5)};
+    Error ->
+        Error
+    end.
+
+%% Reads and decompresses the term at each position in PosList, in order.
+mpread_terms(Fd, PosList) ->
+    {ok, Bins} = mpread_binaries(Fd, PosList),
+    {ok, lists:map(fun couch_compress:decompress/1, Bins)}.
+
+%% Like mpread_iolists/2 but yields binaries; crashes (badmatch) if the read fails.
+mpread_binaries(Fd, PosList) ->
+    {ok, Results} = mpread_iolists(Fd, PosList),
+    {ok, lists:map(fun iolist_to_binary/1, Results)}.
+
+%% Batched pread_iolist/2: reads all of PosList in one call, verifying stored MD5s.
+mpread_iolists(Fd, PosList) ->
+    case ioq:call(Fd, {mpread_iolists, PosList}, erlang:get(io_priority)) of
+    {ok, Results} ->
+        {ok, lists:zipwith(fun(Pos, {DataBin, MaybeMd5}) ->
+            verify_md5(Fd, Pos, DataBin, MaybeMd5)
+        end, PosList, Results)};
     Error ->
         Error
     end.
@@ -425,6 +454,30 @@ handle_call({pread_iolist, Pos}, _From, File) ->
         {reply, {ok, Iolist, <<>>}, File}
     end;
 
+handle_call({mpread_iolists, PosList}, _From, File) -> % replies {ok, [{Data, Md5}]}
+    PosLenPairs = lists:map(fun(Pos) -> {Pos, 4} end, PosList), % 4-byte flag+length headers
+    LengthBins = read_raw_iolist_multi(File, PosLenPairs),
+    {HasMd5List, DataPosLenPairs} = lists:foldr(fun
+        ({LenIoList, NextPos}, {HasMd5Acc, PairAcc}) ->
+            case iolist_to_binary(LenIoList) of
+                <<1:1/integer, Len:31/integer>> -> % An MD5-prefixed term
+                    {[true | HasMd5Acc], [{NextPos, Len + 16} | PairAcc]};
+                <<0:1/integer, Len:31/integer>> -> % no MD5: payload only
+                    {[false | HasMd5Acc], [{NextPos, Len} | PairAcc]}
+            end
+    end, {[], []}, LengthBins),
+    DataBins = read_raw_iolist_multi(File, DataPosLenPairs), % payloads (+16-byte MD5 if flagged)
+    Results = lists:zipwith(fun(HasMd5, {DataBin, _}) ->
+        case HasMd5 of
+            true ->
+                {Md5, Rest} = extract_md5(DataBin),
+                {Rest, Md5};
+            false ->
+                {DataBin, <<>>}
+        end
+    end, HasMd5List, DataBins),
+    {reply, {ok, Results}, File};
+
 handle_call(bytes, _From, #file{fd = Fd} = File) ->
     {reply, file:position(Fd, eof), File};
 
@@ -458,6 +511,20 @@ handle_call({append_bin, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
         {reply, Error, reset_eof(File)}
     end;
 
+handle_call({append_bins, Bins}, _From, #file{fd = Fd, eof = Pos} = File) ->
+    {NewPos, BlocksOut, RespOut} = % accumulators are built in reverse order
+    lists:foldl(fun(Bin, {PosAcc, BlocksAcc, RespAcc}) ->
+        Blocks = make_blocks(PosAcc rem ?SIZE_BLOCK, Bin), % offset of PosAcc in its block
+        Size = iolist_size(Blocks),
+        {PosAcc + Size, [Blocks | BlocksAcc], [{PosAcc, Size} | RespAcc]}
+    end, {Pos, [], []}, Bins),
+    case file:write(Fd, lists:reverse(BlocksOut)) of % one write for all chunks
+    ok ->
+        {reply, {ok, lists:reverse(RespOut)}, File#file{eof = NewPos}};
+    Error ->
+        {reply, Error, reset_eof(File)}
+    end;
+
 handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
     BinSize = byte_size(Bin),
     case Pos rem ?SIZE_BLOCK of
@@ -502,6 +569,19 @@ handle_info({'DOWN', Ref, process, _Pid, _Info}, #file{db_monitor=Ref}=File) ->
     end.
 
 
+verify_md5(_Fd, _Pos, Bin, <<>>) -> % no MD5 stored for this chunk: nothing to check
+    Bin;
+verify_md5(Fd, Pos, Bin, Md5) -> % returns Bin on a match, else logs and exits
+    case couch_crypto:hash(md5, Bin) of
+        Md5 ->
+            Bin;
+        _ ->
+            Msg = "File corruption in ~p at position ~B",
+            couch_log:emergency(Msg, [Fd, Pos]),
+            exit({file_corruption, <<"file corruption">>})
+    end.
+
+
 find_header(Fd, Block) ->
     case (catch load_header(Fd, Block)) of
     {ok, Bin} ->
@@ -583,23 +663,48 @@ find_newest_header(Fd, [{Location, Size} | LocationSizes]) ->
     {Data::iolist(), CurPos::non_neg_integer()}.
 read_raw_iolist_int(Fd, {Pos, _Size}, Len) -> % 0110 UPGRADE CODE
     read_raw_iolist_int(Fd, Pos, Len);
-read_raw_iolist_int(#file{fd = Fd, pread_limit = Limit} = F, Pos, Len) ->
+read_raw_iolist_int(#file{fd = Fd} = File, Pos, Len) ->
+    {TotalBytes, BlockOffset, FinalPos} = calculate_read_info(File, Pos, Len),
+    {ok, <<RawBin:TotalBytes/binary>>} = file:pread(Fd, Pos, TotalBytes),
+    {remove_block_prefixes(BlockOffset, RawBin), FinalPos}.
+
+%% Reads several {Pos, Len} ranges with a single file:pread/2; results keep input order.
+-spec read_raw_iolist_multi(#file{}, [{Pos::non_neg_integer(), Len::non_neg_integer()}]) ->
+    [{Data::iolist(), CurPos::non_neg_integer()}].
+read_raw_iolist_multi(#file{fd = Fd} = File, PosLenPairs) ->
+    ReadInfo = lists:map(fun({Pos, Len}) ->
+        calculate_read_info(File, Pos, Len)
+    end, PosLenPairs),
+    LocNums = lists:zipwith(fun({Pos, _}, {TotalBytes, _, _}) ->
+        {Pos, TotalBytes}
+    end, PosLenPairs, ReadInfo),
+    {ok, Bins} = file:pread(Fd, LocNums),
+    lists:zipwith(fun
+        ({TotalBytes, BlockOffset, FinalPos}, Bin) ->
+            <<RawBin:TotalBytes/binary>> = Bin,
+            {remove_block_prefixes(BlockOffset, RawBin), FinalPos}
+    end, ReadInfo, Bins).
+
+%% Computes {TotalBytes, BlockOffset, FinalPos} for a read at Pos; throws past eof/limit.
+-spec calculate_read_info(#file{}, Pos::non_neg_integer(), Len::non_neg_integer()) ->
+    {TotalBytes::non_neg_integer(), BlockOffset::non_neg_integer(), FinalPos::non_neg_integer()}.
+calculate_read_info(#file{eof = Eof, pread_limit = Limit}, Pos, Len) ->
     BlockOffset = Pos rem ?SIZE_BLOCK,
     TotalBytes = calculate_total_read_len(BlockOffset, Len),
     case Pos + TotalBytes of
-    Size when Size > F#file.eof ->
+    FinalPos when FinalPos > Eof ->
         couch_stats:increment_counter([pread, exceed_eof]),
         {_Fd, Filepath} = get(couch_file_fd),
         throw({read_beyond_eof, Filepath});
-    Size when Size > Limit ->
+    FinalPos when FinalPos > Limit ->
         couch_stats:increment_counter([pread, exceed_limit]),
         {_Fd, Filepath} = get(couch_file_fd),
         throw({exceed_pread_limit, Filepath, Limit});
-    Size ->
-        {ok, <<RawBin:TotalBytes/binary>>} = file:pread(Fd, Pos, TotalBytes),
-        {remove_block_prefixes(BlockOffset, RawBin), Size}
+    FinalPos ->
+        {TotalBytes, BlockOffset, FinalPos}
     end.
 
+
 -spec extract_md5(iolist()) -> {binary(), iolist()}.
 extract_md5(FullIoList) ->
     {Md5List, IoList} = split_iolist(FullIoList, 16, []),