You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/09/08 14:40:59 UTC
[couchdb] 01/04: ss - add parallel file ops
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch compactor-optimize-emsort
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 3890387e3c315ea388a698ada9fc1042f05cd24e
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Fri Sep 8 09:21:13 2017 -0500
ss - add parallel file ops
---
src/couch/src/couch_file.erl | 139 +++++++++++++++++++++++++++++++++++--------
1 file changed, 113 insertions(+), 26 deletions(-)
diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index 4068872..e96e5c3 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -42,6 +42,7 @@
-export([append_binary/2, append_binary_md5/2]).
-export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]).
-export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]).
+-export([pread_terms/2, pread_binaries/2, pread_iolists/2]).
-export([append_terms/2, append_terms/3, append_binaries/2]).
-export([write_header/2, read_header/1]).
-export([delete/2, delete/3, nuke_dir/2, init_delete_dir/1]).
@@ -120,15 +121,6 @@ append_term_md5(Fd, Term, Options) ->
Comp = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION),
append_binary_md5(Fd, couch_compress:compress(Term, Comp)).
-append_terms(Fd, Terms) ->
- append_terms(Fd, Terms, []).
-
-append_terms(Fd, Terms, Options) ->
- Comp = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION),
- Bins = lists:map(fun(Term) ->
- couch_compress:compress(Term, Comp)
- end, Terms),
- append_binaries(Fd, Bins).
%%----------------------------------------------------------------------
%% Purpose: To append an Erlang binary to the end of the file.
@@ -146,10 +138,6 @@ append_binary_md5(Fd, Bin) ->
{append_bin, assemble_file_chunk(Bin, crypto:hash(md5, Bin))},
erlang:get(io_priority)).
-append_binaries(Fd, Bins) ->
- WriteBins = lists:map(fun assemble_file_chunk/1, Bins),
- ioq:call(Fd, {append_bins, WriteBins}, erlang:get(io_priority)).
-
append_raw_chunk(Fd, Chunk) ->
ioq:call(Fd, {append_bin, Chunk}, erlang:get(io_priority)).
@@ -187,21 +175,55 @@ pread_binary(Fd, Pos) ->
pread_iolist(Fd, Pos) ->
case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of
- {ok, IoList, <<>>} ->
- {ok, IoList};
- {ok, IoList, Md5} ->
- case crypto:hash(md5, IoList) of
- Md5 ->
- {ok, IoList};
- _ ->
- couch_log:emergency("File corruption in ~p at position ~B",
- [Fd, Pos]),
- exit({file_corruption, <<"file corruption">>})
- end;
- Error ->
- Error
+ {ok, IoList, Md5} -> % Md5 may be <<>>; verify_md5/4 returns IoList as-is in that case
+ {ok, verify_md5(Fd, Pos, IoList, Md5)};
+ Error ->
+ Error
end.
+
+%%----------------------------------------------------------------------
+%% Purpose: Reads several terms in one call, given a list of positions.
+%% Args:    PosList is handed unchanged to pread_binaries/2.
+%% Returns: {ok, Terms}, one decompressed term per binary returned by
+%%          pread_binaries/2, in the same order. Any other result from
+%%          pread_binaries/2 fails the {ok, _} match.
+%%----------------------------------------------------------------------
+pread_terms(Fd, PosList) ->
+    {ok, Compressed} = pread_binaries(Fd, PosList),
+    {ok, [couch_compress:decompress(Bin) || Bin <- Compressed]}.
+
+
+%%----------------------------------------------------------------------
+%% Purpose: Like pread_iolists/2, but flattens every result to a binary.
+%% Returns: {ok, Binaries} in the order produced by pread_iolists/2.
+%%          Any other result from pread_iolists/2 fails the {ok, _} match.
+%%----------------------------------------------------------------------
+pread_binaries(Fd, PosList) ->
+    {ok, IoLists} = pread_iolists(Fd, PosList),
+    Bins = [erlang:iolist_to_binary(IoList) || IoList <- IoLists],
+    {ok, Bins}.
+
+
+%%----------------------------------------------------------------------
+%% Purpose: Reads the raw iolists stored at each position in PosList
+%%          with a single {pread_iolists, PosList} request to Fd.
+%% Args:    PosList is a list of integer file positions (the server
+%%          side pairs each one with a length and does arithmetic
+%%          on it).
+%% Returns: {ok, IoLists} in the order of PosList; any other reply
+%%          from the call is returned unchanged.
+%%          Exits with {file_corruption, _} (via verify_md5/4) when a
+%%          stored MD5 does not match the data that was read.
+%%----------------------------------------------------------------------
+pread_iolists(Fd, PosList) ->
+    case ioq:call(Fd, {pread_iolists, PosList}, erlang:get(io_priority)) of
+        {ok, DataMd5s} ->
+            % PosList holds bare positions, not tuples, so Pos is
+            % matched directly against each element.
+            Data = lists:zipwith(fun(Pos, {IoList, Md5}) ->
+                verify_md5(Fd, Pos, IoList, Md5)
+            end, PosList, DataMd5s),
+            {ok, Data};
+        Error ->
+            Error
+    end.
+
+
+%%----------------------------------------------------------------------
+%% Purpose: Compresses each term and appends them all with a single
+%%          append_binaries/2 call.
+%% Args:    Options may carry {compression, Method}; when absent,
+%%          ?DEFAULT_COMPRESSION is used. append_terms/2 passes no
+%%          options.
+%% Returns: Whatever append_binaries/2 returns.
+%%----------------------------------------------------------------------
+append_terms(Fd, Terms) ->
+    append_terms(Fd, Terms, []).
+
+
+append_terms(Fd, Terms, Options) ->
+    Method = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION),
+    Compressed = [couch_compress:compress(Term, Method) || Term <- Terms],
+    append_binaries(Fd, Compressed).
+
+
+%%----------------------------------------------------------------------
+%% Purpose: Wraps every binary with assemble_file_chunk/1 and sends the
+%%          resulting chunks to Fd in one {append_bins, Chunks} request,
+%%          using the io_priority stored in the caller's process
+%%          dictionary.
+%% Returns: The reply to that request.
+%%----------------------------------------------------------------------
+append_binaries(Fd, Bins) ->
+    Chunks = [assemble_file_chunk(Bin) || Bin <- Bins],
+    Priority = erlang:get(io_priority),
+    ioq:call(Fd, {append_bins, Chunks}, Priority).
+
+
%%----------------------------------------------------------------------
%% Purpose: The length of a file, in bytes.
%% Returns: {ok, Bytes}
@@ -463,6 +485,30 @@ handle_call({pread_iolist, Pos}, _From, File) ->
{reply, {ok, Iolist, <<>>}, File}
end;
+%% Reads several chunks with two batched reads: first the 4-byte length
+%% prefix at every position in PosL, then the body of every chunk.
+%% Replies {ok, [{IoList, Md5}]} in the order of PosL; Md5 is <<>> for a
+%% chunk whose length prefix does not have the MD5 flag bit set.
+handle_call({pread_iolists, PosL}, _From, File) ->
+    update_read_timestamp(),
+    PrefixLocs = [{Pos, 4} || Pos <- PosL],
+    DataSizes = read_multi_raw_iolists_int(File, PrefixLocs),
+    % Named apart from PrefixLocs: variables are single assignment, so
+    % reusing one name for both lists would turn this into a match
+    % that fails with badmatch.
+    BodyLocs = lists:map(fun({LenIoList, NextPos}) ->
+        case iolist_to_binary(LenIoList) of
+            <<1:1/integer, Len:31/integer>> -> % an MD5-prefixed term
+                {NextPos, Len + 16};
+            <<0:1/integer, Len:31/integer>> ->
+                {NextPos, Len}
+        end
+    end, DataSizes),
+    % read_multi_raw_iolists_int/2 returns a bare list, not {ok, List}.
+    Resps = read_multi_raw_iolists_int(File, BodyLocs),
+    Extracted = lists:zipwith(fun({LenIoList, _}, {FullIoList, _}) ->
+        % After the flag bit 31 bits remain; they are matched as an
+        % integer because a /binary segment must be byte aligned.
+        case iolist_to_binary(LenIoList) of
+            <<1:1/integer, _:31/integer>> ->
+                % FullIoList still starts with the 16 MD5 bytes.
+                {Md5, IoList} = extract_md5(FullIoList),
+                {IoList, Md5};
+            <<0:1/integer, _:31/integer>> ->
+                {FullIoList, <<>>}
+        end
+    end, DataSizes, Resps),
+    {reply, {ok, Extracted}, File};
+
handle_call(bytes, _From, #file{fd = Fd} = File) ->
{reply, file:position(Fd, eof), File};
@@ -652,6 +698,31 @@ read_raw_iolist_int(#file{fd = Fd, pread_limit = Limit} = F, Pos, Len) ->
{remove_block_prefixes(BlockOffset, RawBin), Size}
end.
+
+%% Reads every {Pos, Len} range in PosLens with one file:pread/2 call.
+%% The number of bytes actually read per range comes from
+%% calculate_total_read_len/2.
+%% Returns a list of {IoList, NextPos} in the order of PosLens, where
+%% NextPos is Pos plus the number of bytes read for that range.
+%% Throws {read_beyond_eof, Filepath} or
+%% {exceed_pread_limit, Filepath, Limit} before any read is issued when
+%% a range fails the corresponding check.
+read_multi_raw_iolists_int(#file{fd = Fd, pread_limit = Limit} = F, PosLens) ->
+    ToLocNum = fun({Pos, Len}) ->
+        TotalBytes = calculate_total_read_len(Pos rem ?SIZE_BLOCK, Len),
+        End = Pos + TotalBytes,
+        if
+            End > F#file.eof ->
+                couch_stats:increment_counter([pread, exceed_eof]),
+                {_, Filepath} = get(couch_file_fd),
+                throw({read_beyond_eof, Filepath});
+            End > Limit ->
+                couch_stats:increment_counter([pread, exceed_limit]),
+                {_, Filepath} = get(couch_file_fd),
+                throw({exceed_pread_limit, Filepath, Limit});
+            true ->
+                {Pos, TotalBytes}
+        end
+    end,
+    LocNums = lists:map(ToLocNum, PosLens),
+    {ok, Bins} = file:pread(Fd, LocNums),
+    Strip = fun({Start, NumBytes}, Bin) ->
+        % Asserts that the read returned exactly NumBytes bytes.
+        <<RawBin:NumBytes/binary>> = Bin,
+        IoList = remove_block_prefixes(Start rem ?SIZE_BLOCK, RawBin),
+        {IoList, Start + NumBytes}
+    end,
+    lists:zipwith(Strip, LocNums, Bins).
+
+
-spec extract_md5(iolist()) -> {binary(), iolist()}.
extract_md5(FullIoList) ->
{Md5List, IoList} = split_iolist(FullIoList, 16, []),
@@ -720,6 +791,22 @@ split_iolist([Byte | Rest], SplitAt, BeginAcc) when is_integer(Byte) ->
split_iolist(Rest, SplitAt - 1, [Byte | BeginAcc]).
+
+%% Returns IoList unchanged when no MD5 was stored (<<>>) or when the
+%% stored MD5 equals the MD5 of IoList; otherwise calls
+%% report_md5_error/2 with Fd and Pos.
+verify_md5(_Fd, _Pos, IoList, <<>>) ->
+    IoList;
+
+verify_md5(Fd, Pos, IoList, Md5) ->
+    case crypto:hash(md5, IoList) =:= Md5 of
+        true -> IoList;
+        false -> report_md5_error(Fd, Pos)
+    end.
+
+
+%% Logs an emergency-level corruption message containing Fd and Pos,
+%% then exits the calling process with
+%% {file_corruption, <<"file corruption">>}. Never returns.
+report_md5_error(Fd, Pos) ->
+    Args = [Fd, Pos],
+    couch_log:emergency("File corruption in ~p at position ~B", Args),
+    Reason = {file_corruption, <<"file corruption">>},
+    exit(Reason).
+
+
% System dbs aren't monitored by couch_stats_process_tracker
is_idle(#file{is_sys=true}) ->
case process_info(self(), monitored_by) of
--
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.