You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/03/30 16:44:35 UTC
[2/4] couch commit: updated
refs/heads/feat-optimize-compaction-docid-phase to 14766b3
Add parallel read and write APIs to couch_file
This allows for terms and binaries to be read and written in a single
call to couch_file.
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/ce6778f4
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/ce6778f4
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/ce6778f4
Branch: refs/heads/feat-optimize-compaction-docid-phase
Commit: ce6778f40b00c312217133990b89342b35afdd56
Parents: bb20182
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Mar 29 13:39:21 2017 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Thu Mar 30 11:43:21 2017 -0500
----------------------------------------------------------------------
src/couch_file.erl | 141 +++++++++++++++++++++++++++++++++++++++++-------
1 file changed, 123 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/ce6778f4/src/couch_file.erl
----------------------------------------------------------------------
diff --git a/src/couch_file.erl b/src/couch_file.erl
index d7b176e..69dbaa4 100644
--- a/src/couch_file.erl
+++ b/src/couch_file.erl
@@ -39,9 +39,11 @@
% public API
-export([open/1, open/2, close/1, bytes/1, sync/1, truncate/2, set_db_pid/2]).
-export([pread_term/2, pread_iolist/2, pread_binary/2]).
+-export([mpread_terms/2, mpread_iolists/2, mpread_binaries/2]).
-export([append_binary/2, append_binary_md5/2]).
-export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]).
-export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]).
+-export([append_terms/2, append_terms/3, append_binaries/2]).
-export([write_header/2, read_header/1]).
-export([delete/2, delete/3, nuke_dir/2, init_delete_dir/1]).
@@ -128,7 +130,7 @@ append_term_md5(Fd, Term, Options) ->
append_binary(Fd, Bin) -> % Appends Bin to the file as one chunk framed by assemble_file_chunk/1
ioq:call(Fd, {append_bin, assemble_file_chunk(Bin)}, erlang:get(io_priority)). % io_priority is read from the caller's process dictionary
-
+
append_binary_md5(Fd, Bin) ->
ioq:call(Fd,
{append_bin, assemble_file_chunk(Bin, couch_crypto:hash(md5, Bin))},
@@ -144,6 +146,21 @@ assemble_file_chunk(Bin) ->
assemble_file_chunk(Bin, Md5) -> % Frames Bin as a digest-carrying chunk: 4-byte header, then Md5, then the data
[<<1:1/integer, (iolist_size(Bin)):31/integer>>, Md5, Bin]. % header high bit = 1 flags the digest; the low 31 bits hold the data size
+
+%% Appends several terms to the file with a single request.
+%% append_terms/2 uses no options; append_terms/3 reads the `compression`
+%% option (default ?DEFAULT_COMPRESSION), compresses each term with it and
+%% hands the resulting binaries to append_binaries/2, whose result is returned.
+append_terms(Fd, Terms) when is_list(Terms) ->
+    append_terms(Fd, Terms, []).
+
+
+append_terms(Fd, Terms, Options) ->
+    Method = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION),
+    append_binaries(Fd, [couch_compress:compress(T, Method) || T <- Terms]).
+
+
+%% Frames every binary with assemble_file_chunk/1 and submits them together
+%% as one append_bins request through ioq, at the caller's io_priority.
+%% Returns whatever the file process replies.
+append_binaries(Fd, Bins0) when is_list(Bins0) ->
+    Chunks = [assemble_file_chunk(Bin) || Bin <- Bins0],
+    Priority = erlang:get(io_priority),
+    ioq:call(Fd, {append_bins, Chunks}, Priority).
+
%%----------------------------------------------------------------------
%% Purpose: Reads a term from a file that was written with append_term
%% Args: Pos, the offset into the file where the term is serialized.
@@ -171,17 +188,29 @@ pread_binary(Fd, Pos) ->
pread_iolist(Fd, Pos) -> % Reads the chunk at Pos through ioq and checks its MD5 when one was stored
case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of
- {ok, IoList, <<>>} ->
- {ok, IoList};
- {ok, IoList, Md5} ->
- case couch_crypto:hash(md5, IoList) of
- Md5 ->
- {ok, IoList};
- _ ->
- couch_log:emergency("File corruption in ~p at position ~B",
- [Fd, Pos]),
- exit({file_corruption, <<"file corruption">>})
- end;
+ {ok, Bin, MaybeMd5} -> % MaybeMd5 is <<>> when the chunk carries no digest
+ {ok, verify_md5(Fd, Pos, Bin, MaybeMd5)}; % verify_md5/4 returns Bin or exits on a digest mismatch
+ Error -> % any other reply is handed back to the caller untouched
+ Error
+ end.
+
+
+%% Reads the terms stored at every position in PosList.
+%% Each binary returned by mpread_binaries/2 is decompressed; the result is
+%% {ok, Terms}. A non-ok reply from mpread_binaries/2 fails the match below.
+mpread_terms(Fd, PosList) ->
+    {ok, Binaries} = mpread_binaries(Fd, PosList),
+    Terms = [couch_compress:decompress(Bin) || Bin <- Binaries],
+    {ok, Terms}.
+
+
+%% Same as mpread_iolists/2, but every iolist is flattened into one binary.
+%% Returns {ok, Binaries}; a non-ok reply from mpread_iolists/2 fails the match.
+mpread_binaries(Fd, PosList) ->
+    {ok, IoLists} = mpread_iolists(Fd, PosList),
+    {ok, [iolist_to_binary(IoList) || IoList <- IoLists]}.
+
+
+%% Reads the chunks at every position in PosList with one mpread_iolists
+%% request. Each {Data, MaybeMd5} result is paired with its position and run
+%% through verify_md5/4, so a digest mismatch exits the caller. Returns
+%% {ok, DataList} on success, or the file process's reply unchanged otherwise.
+mpread_iolists(Fd, PosList) ->
+    Verify = fun(Pos, {DataBin, MaybeMd5}) ->
+        verify_md5(Fd, Pos, DataBin, MaybeMd5)
+    end,
+    case ioq:call(Fd, {mpread_iolists, PosList}, erlang:get(io_priority)) of
+        {ok, Results} ->
+            {ok, lists:zipwith(Verify, PosList, Results)};
         Error ->
             Error
     end.
@@ -425,6 +454,30 @@ handle_call({pread_iolist, Pos}, _From, File) ->
{reply, {ok, Iolist, <<>>}, File}
end;
+handle_call({mpread_iolists, PosList}, _From, File) -> % Batched pread_iolist: replies {ok, [{Data, Md5 or <<>>}]} in PosList order
+ PosLenPairs = lists:map(fun(Pos) -> {Pos, 4} end, PosList), % pass 1: read the 4-byte header at each position
+ LengthBins = read_raw_iolist_multi(File, PosLenPairs),
+ {HasMd5List, DataPosLenPairs} = lists:foldr(fun % foldr so both output lists keep PosList order
+ ({LenIoList, NextPos}, {HasMd5Acc, PairAcc}) ->
+ case iolist_to_binary(LenIoList) of
+ <<1:1/integer, Len:31/integer>> -> % An MD5-prefixed term
+ {[true | HasMd5Acc], [{NextPos, Len + 16} | PairAcc]}; % 16 extra bytes: the digest sits in front of the data
+ <<0:1/integer, Len:31/integer>> -> % flag bit clear: data only, no digest
+ {[false | HasMd5Acc], [{NextPos, Len} | PairAcc]}
+ end
+ end, {[], []}, LengthBins),
+ DataBins = read_raw_iolist_multi(File, DataPosLenPairs), % pass 2: fetch every payload with one multi-read
+ Results = lists:zipwith(fun(HasMd5, {DataBin, _}) ->
+ case HasMd5 of
+ true ->
+ {Md5, Rest} = extract_md5(DataBin), % split off the leading 16-byte digest
+ {Rest, Md5};
+ false ->
+ {DataBin, <<>>} % <<>> tells the caller there is nothing to verify
+ end
+ end, HasMd5List, DataBins),
+ {reply, {ok, Results}, File};
+
handle_call(bytes, _From, #file{fd = Fd} = File) -> % Replies with the result of file:position(Fd, eof), not the cached #file.eof
{reply, file:position(Fd, eof), File};
@@ -458,6 +511,20 @@ handle_call({append_bin, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
{reply, Error, reset_eof(File)}
end;
+handle_call({append_bins, Bins}, _From, #file{fd = Fd, eof = Pos} = File) -> % Appends all Bins with one file:write/2; replies {ok, [{Pos, Size}]} in input order
+ {NewPos, BlocksOut, RespOut} =
+ lists:foldl(fun(Bin, {PosAcc, BlocksAcc, RespAcc}) ->
+ Blocks = make_blocks(PosAcc rem ?SIZE_BLOCK, Bin), % block layout depends on the offset where this chunk lands
+ Size = iolist_size(Blocks), % bytes this chunk occupies once laid out in blocks
+ {PosAcc + Size, [Blocks | BlocksAcc], [{PosAcc, Size} | RespAcc]} % both accumulators are built in reverse
+ end, {Pos, [], []}, Bins),
+ case file:write(Fd, lists:reverse(BlocksOut)) of
+ ok ->
+ {reply, {ok, lists:reverse(RespOut)}, File#file{eof = NewPos}}; % cached eof advances only after a successful write
+ Error ->
+ {reply, Error, reset_eof(File)} % reset_eof/1 is defined elsewhere; presumably re-syncs the cached eof (confirm)
+ end;
+
handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
BinSize = byte_size(Bin),
case Pos rem ?SIZE_BLOCK of
@@ -502,6 +569,19 @@ handle_info({'DOWN', Ref, process, _Pid, _Info}, #file{db_monitor=Ref}=File) ->
end.
+%% Checks a chunk read at Pos against its stored MD5 digest.
+%% An empty digest (<<>>) means nothing was stored to check against, so Bin is
+%% returned as is. Otherwise Bin is returned only when its MD5 equals Md5; on
+%% a mismatch the corruption is logged at emergency level and the calling
+%% process exits with {file_corruption, <<"file corruption">>}.
+verify_md5(_Fd, _Pos, Bin, <<>>) ->
+    Bin;
+verify_md5(Fd, Pos, Bin, Md5) ->
+    case couch_crypto:hash(md5, Bin) =:= Md5 of
+        true ->
+            Bin;
+        false ->
+            couch_log:emergency("File corruption in ~p at position ~B", [Fd, Pos]),
+            exit({file_corruption, <<"file corruption">>})
+    end.
+
+
find_header(Fd, Block) ->
case (catch load_header(Fd, Block)) of
{ok, Bin} ->
@@ -583,23 +663,48 @@ find_newest_header(Fd, [{Location, Size} | LocationSizes]) ->
{Data::iolist(), CurPos::non_neg_integer()}.
read_raw_iolist_int(Fd, {Pos, _Size}, Len) -> % 0110 UPGRADE CODE
read_raw_iolist_int(Fd, Pos, Len);
-read_raw_iolist_int(#file{fd = Fd, pread_limit = Limit} = F, Pos, Len) ->
+read_raw_iolist_int(#file{fd = Fd} = File, Pos, Len) -> % Single-region read; the bounds checks live in calculate_read_info/3
+ {TotalBytes, BlockOffset, FinalPos} = calculate_read_info(File, Pos, Len), % throws if the read would pass eof or the pread limit
+ {ok, <<RawBin:TotalBytes/binary>>} = file:pread(Fd, Pos, TotalBytes), % anything but exactly TotalBytes bytes fails this match
+ {remove_block_prefixes(BlockOffset, RawBin), FinalPos}. % FinalPos = Pos + TotalBytes, the offset just past the bytes read
+
+
+%% Reads every {Pos, Len} region with a single file:pread/2 call.
+%% Each region is first sized and bounds-checked by calculate_read_info/3
+%% (which throws on a read past eof or past the pread limit). The raw bytes
+%% are then passed through remove_block_prefixes/2. Returns one
+%% {Data, FinalPos} pair per input pair, in input order.
+-spec read_raw_iolist_multi(#file{}, [{Pos::non_neg_integer(), Len::non_neg_integer()}]) ->
+    [{Data::iolist(), CurrPos::non_neg_integer()}].
+read_raw_iolist_multi(#file{fd = Fd} = File, PosLenPairs) ->
+    ToReadInfo = fun({Pos, Len}) -> calculate_read_info(File, Pos, Len) end,
+    ReadInfo = lists:map(ToReadInfo, PosLenPairs),
+    ToLocNum = fun({Pos, _}, {TotalBytes, _, _}) -> {Pos, TotalBytes} end,
+    LocNums = lists:zipwith(ToLocNum, PosLenPairs, ReadInfo),
+    {ok, Bins} = file:pread(Fd, LocNums),
+    StripPrefixes = fun({TotalBytes, BlockOffset, FinalPos}, Bin) ->
+        % Anything other than exactly TotalBytes bytes fails this match.
+        <<RawBin:TotalBytes/binary>> = Bin,
+        {remove_block_prefixes(BlockOffset, RawBin), FinalPos}
+    end,
+    lists:zipwith(StripPrefixes, ReadInfo, Bins).
+
+
+%% Works out how a read of Len bytes starting at Pos must be performed.
+%% Returns {TotalBytes, BlockOffset, FinalPos}, where BlockOffset is Pos's
+%% offset within its ?SIZE_BLOCK block, TotalBytes is the raw byte count from
+%% calculate_total_read_len/2, and FinalPos is Pos + TotalBytes.
+%% Throws {read_beyond_eof, Filepath} when FinalPos is past the cached eof and
+%% {exceed_pread_limit, Filepath, Limit} when it is past the pread limit; the
+%% matching couch_stats counter is incremented before either throw.
+-spec calculate_read_info(#file{}, Pos::non_neg_integer(), Len::non_neg_integer()) ->
+    {TotalBytes::non_neg_integer(), BlockOffset::non_neg_integer(),
+        FinalPos::non_neg_integer()}.
+calculate_read_info(#file{eof = Eof, pread_limit = Limit}, Pos, Len) ->
BlockOffset = Pos rem ?SIZE_BLOCK,
TotalBytes = calculate_total_read_len(BlockOffset, Len),
case Pos + TotalBytes of
- Size when Size > F#file.eof ->
+ FinalPos when FinalPos > Eof ->
couch_stats:increment_counter([pread, exceed_eof]),
{_Fd, Filepath} = get(couch_file_fd),
throw({read_beyond_eof, Filepath});
- Size when Size > Limit ->
+ FinalPos when FinalPos > Limit ->
couch_stats:increment_counter([pread, exceed_limit]),
{_Fd, Filepath} = get(couch_file_fd),
throw({exceed_pread_limit, Filepath, Limit});
- Size ->
- {ok, <<RawBin:TotalBytes/binary>>} = file:pread(Fd, Pos, TotalBytes),
- {remove_block_prefixes(BlockOffset, RawBin), Size}
+ FinalPos ->
+ {TotalBytes, BlockOffset, FinalPos}
end.
+
-spec extract_md5(iolist()) -> {binary(), iolist()}.
extract_md5(FullIoList) ->
{Md5List, IoList} = split_iolist(FullIoList, 16, []),