You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/09/11 20:56:52 UTC

[couchdb] 04/06: Add multi-append functions to couch_file

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch compactor-optimize-emsort
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 55f8718a5e124e963541971078c6d431a5a9f088
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Sep 6 13:36:16 2017 -0500

    Add multi-append functions to couch_file
    
    These functions allow the caller to append multiple terms or binaries to
    a file and receive the file position and size for each individual
    element. This is to optimize throughput in situations where we want to
    write multiple pieces of independent data.
---
 src/couch/src/couch_file.erl | 144 ++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 130 insertions(+), 14 deletions(-)

diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index acd4fda..a8fcc6c 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -42,6 +42,8 @@
 -export([append_binary/2, append_binary_md5/2]).
 -export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]).
 -export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]).
+-export([pread_terms/2, pread_binaries/2, pread_iolists/2]).
+-export([append_terms/2, append_terms/3, append_binaries/2]).
 -export([write_header/2, read_header/1]).
 -export([delete/2, delete/3, nuke_dir/2, init_delete_dir/1]).
 -export([msec_since_last_read/1]).
@@ -119,6 +121,7 @@ append_term_md5(Fd, Term, Options) ->
     Comp = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION),
     append_binary_md5(Fd, couch_compress:compress(Term, Comp)).
 
+
 %%----------------------------------------------------------------------
 %% Purpose: To append an Erlang binary to the end of the file.
 %% Args:    Erlang term to serialize and append to the file.
@@ -129,7 +132,7 @@ append_term_md5(Fd, Term, Options) ->
 
 append_binary(Fd, Bin) ->
     ioq:call(Fd, {append_bin, assemble_file_chunk(Bin)}, erlang:get(io_priority)).
-    
+
 append_binary_md5(Fd, Bin) ->
     ioq:call(Fd,
         {append_bin, assemble_file_chunk(Bin, crypto:hash(md5, Bin))},
@@ -172,21 +175,55 @@ pread_binary(Fd, Pos) ->
 
 pread_iolist(Fd, Pos) ->
     case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of
-    {ok, IoList, <<>>} ->
-        {ok, IoList};
-    {ok, IoList, Md5} ->
-        case crypto:hash(md5, IoList) of
-        Md5 ->
-            {ok, IoList};
-        _ ->
-            couch_log:emergency("File corruption in ~p at position ~B",
-                     [Fd, Pos]),
-            exit({file_corruption, <<"file corruption">>})
-        end;
-    Error ->
-        Error
+        {ok, IoList, Md5} ->
+            {ok, verify_md5(Fd, Pos, IoList, Md5)};
+        Error ->
+            Error
     end.
 
+
+pread_terms(Fd, PosList) -> % Read the item at each position in PosList; returns {ok, Terms}
+    {ok, Bins} = pread_binaries(Fd, PosList), % assertive match: anything but {ok, _} is a badmatch
+    Terms = lists:map(fun(Bin) ->
+        couch_compress:decompress(Bin) % turn each stored binary back into a term
+    end, Bins),
+    {ok, Terms}.
+
+
+pread_binaries(Fd, PosList) -> % Like pread_iolists/2, but each result is converted to a binary
+    {ok, Data} = pread_iolists(Fd, PosList), % a non-{ok, _} reply fails this match (badmatch)
+    {ok, lists:map(fun erlang:iolist_to_binary/1, Data)}.
+
+
+pread_iolists(Fd, PosList) -> % Batch read: sends one {pread_iolists, PosList} request through ioq
+    case ioq:call(Fd, {pread_iolists, PosList}, erlang:get(io_priority)) of
+        {ok, DataMd5s} ->
+            Data = lists:zipwith(fun(Pos, {IoList, Md5}) -> % replies are paired with PosList in order
+                verify_md5(Fd, Pos, IoList, Md5) % yields IoList, or exits on a checksum mismatch
+            end, PosList, DataMd5s),
+            {ok, Data};
+        Error ->
+            Error % any other reply is handed back to the caller unchanged
+    end.
+
+
+append_terms(Fd, Terms) -> % Same as append_terms/3 with an empty option list
+    append_terms(Fd, Terms, []).
+
+
+append_terms(Fd, Terms, Options) -> % Compress every term, then append them via append_binaries/2
+    Comp = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION), % option with fallback
+    Bins = lists:map(fun(Term) ->
+        couch_compress:compress(Term, Comp)
+    end, Terms),
+    append_binaries(Fd, Bins). % result is whatever append_binaries/2 returns
+
+
+append_binaries(Fd, Bins) -> % Append all Bins with a single {append_bins, _} request through ioq
+    WriteBins = lists:map(fun assemble_file_chunk/1, Bins), % assemble_file_chunk/1 applied per binary
+    ioq:call(Fd, {append_bins, WriteBins}, erlang:get(io_priority)).
+
+
 %%----------------------------------------------------------------------
 %% Purpose: The length of a file, in bytes.
 %% Returns: {ok, Bytes}
@@ -448,6 +485,30 @@ handle_call({pread_iolist, Pos}, _From, File) ->
         {reply, {ok, Iolist, <<>>}, File}
     end;
 
+handle_call({pread_iolists, PosL}, _From, File) -> % Batch read; replies {ok, [{IoList, Md5}]}
+    update_read_timestamp(),
+    LocNums1 = [{Pos, 4} || Pos <- PosL], % pass 1: the 4-byte header (1 flag bit + 31-bit length)
+    DataSizes = read_multi_raw_iolists_int(File, LocNums1),
+    LocNums2 = lists:map(fun({LenIoList, NextPos}) ->
+        case iolist_to_binary(LenIoList) of
+            <<1:1/integer, Len:31/integer>> -> % an MD5-prefixed term
+                {NextPos, Len + 16}; % also read the 16 bytes that extract_md5/1 splits off
+            <<0:1/integer, Len:31/integer>> ->
+                {NextPos, Len}
+        end
+    end, DataSizes),
+    Resps = read_multi_raw_iolists_int(File, LocNums2), % pass 2: the payloads themselves
+    Extracted = lists:zipwith(fun({LenIoList, _}, {FullIoList, _}) ->
+        case iolist_to_binary(LenIoList) of
+            <<1:1/integer, _:31/integer>> ->
+                {Md5, IoList} = extract_md5(FullIoList), % fresh name: a bound IoList would badmatch
+                {IoList, Md5};
+            <<0:1/integer, _:31/integer>> ->
+                {FullIoList, <<>>} % no checksum stored: report an empty Md5
+        end
+    end, DataSizes, Resps),
+    {reply, {ok, Extracted}, File};
+
 handle_call(bytes, _From, #file{fd = Fd} = File) ->
     {reply, file:position(Fd, eof), File};
 
@@ -481,6 +542,20 @@ handle_call({append_bin, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
         {reply, Error, reset_eof(File)}
     end;
 
+handle_call({append_bins, Bins}, _From, #file{fd = Fd, eof = Pos} = File) -> % Append Bins at eof
+    {BlockResps, FinalPos} = lists:mapfoldl(fun(Bin, PosAcc) -> % PosAcc: offset where Bin starts
+        Blocks = make_blocks(PosAcc rem ?SIZE_BLOCK, Bin),
+        Size = iolist_size(Blocks), % byte size of the make_blocks output for this Bin
+        {{Blocks, {PosAcc, Size}}, PosAcc + Size}
+    end, Pos, Bins),
+    {AllBlocks, Resps} = lists:unzip(BlockResps), % Resps: one {Pos, Size} per input, in order
+    case file:write(Fd, AllBlocks) of % a single write call covers every element
+    ok ->
+        {reply, {ok, Resps}, File#file{eof = FinalPos}};
+    Error ->
+        {reply, Error, reset_eof(File)} % write failed: reply the error, state comes from reset_eof/1
+    end;
+
 handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
     BinSize = byte_size(Bin),
     case Pos rem ?SIZE_BLOCK of
@@ -623,6 +698,31 @@ read_raw_iolist_int(#file{fd = Fd, pread_limit = Limit} = F, Pos, Len) ->
         {remove_block_prefixes(BlockOffset, RawBin), Size}
     end.
 
+
+read_multi_raw_iolists_int(#file{fd = Fd, pread_limit = Limit} = F, PosLens) -> % PosLens: [{Pos, Len}]
+    LocNums = lists:map(fun({Pos, Len}) ->
+        BlockOffset = Pos rem ?SIZE_BLOCK,
+        TotalBytes = calculate_total_read_len(BlockOffset, Len), % raw byte count to read for Len
+        case Pos + TotalBytes of
+        Size when Size > F#file.eof -> % the read would end past the eof tracked in the state
+            couch_stats:increment_counter([pread, exceed_eof]),
+            {_Fd, Filepath} = get(couch_file_fd),
+            throw({read_beyond_eof, Filepath});
+        Size when Size > Limit -> % end offset of the read is above pread_limit
+            couch_stats:increment_counter([pread, exceed_limit]),
+            {_Fd, Filepath} = get(couch_file_fd),
+            throw({exceed_pread_limit, Filepath, Limit});
+        _ ->
+            {Pos, TotalBytes}
+        end
+    end, PosLens),
+    {ok, Bins} = file:pread(Fd, LocNums), % one pread call for all validated locations
+    lists:zipwith(fun({Pos, TotalBytes}, Bin) ->
+        <<RawBin:TotalBytes/binary>> = Bin, % asserts exactly TotalBytes bytes came back
+        {remove_block_prefixes(Pos rem ?SIZE_BLOCK, RawBin), Pos + TotalBytes} % {IoList, NextPos}
+    end, LocNums, Bins).
+
+
 -spec extract_md5(iolist()) -> {binary(), iolist()}.
 extract_md5(FullIoList) ->
     {Md5List, IoList} = split_iolist(FullIoList, 16, []),
@@ -691,6 +791,22 @@ split_iolist([Byte | Rest], SplitAt, BeginAcc) when is_integer(Byte) ->
     split_iolist(Rest, SplitAt - 1, [Byte | BeginAcc]).
 
 
+
+verify_md5(_Fd, _Pos, IoList, <<>>) -> % empty Md5: nothing to verify, return the data as is
+    IoList;
+
+verify_md5(Fd, Pos, IoList, Md5) -> % Return IoList when its MD5 equals Md5, else report corruption
+    case crypto:hash(md5, IoList) of
+        Md5 -> IoList;
+        _ -> report_md5_error(Fd, Pos) % does not return: report_md5_error/2 ends in exit/1
+    end.
+
+
+report_md5_error(Fd, Pos) -> % Log the checksum failure, then exit the calling process
+    couch_log:emergency("File corruption in ~p at position ~B", [Fd, Pos]),
+    exit({file_corruption, <<"file corruption">>}). % exit reason carries no Fd/Pos; those are only logged
+
+
 % System dbs aren't monitored by couch_stats_process_tracker
 is_idle(#file{is_sys=true}) ->
     case process_info(self(), monitored_by) of

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.