You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/09/11 20:56:48 UTC
[couchdb] branch compactor-optimize-emsort updated (23167fa -> a333595)
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a change to branch compactor-optimize-emsort
in repository https://gitbox.apache.org/repos/asf/couchdb.git.
omit 23167fa Optimize compactor to use multi-IO API
omit 8cd0eb8 Optimize btree node writes
omit 99cad8e Add multi-append functions to couch_file
omit e6d9c57 Implement compactor test suite
omit 54f7c7a Simplify compaction state management
omit 41cb42f Reorder compaction functions
add f8aee3a Add a debugging utilities for listing processes
add b710eca Add help function
add 60a8560 Rename functions to clarify they work on tree
add 9ac4aca Remove ps function
add 9b56fef Fix double mention of link_tree in help
add db0be08 Add few tests
add f7dac72 Remove dependency on proper
add 97b3626 ps is gone
add 5982428 Fix typos in help
add c47f051 Fix copy/paste error spawn -> spawn_link
add f3f0899 Add comment about structure of TableSpec
add 87c40b9 Fix tree generation in tests
add 64a48ba Merge pull request #761 from cloudant/print-linked-processes2
add af839e1 basic execution statistics for _find (#768)
add f6f5ff7 Fix Jenkins build - always pull latest pkg img
add 1bbb244 Remove copy/paste error in jenkinsfile
add ad29220 include mrview options in _explain result
add 3d0e4f4 Merge pull request #765 from willholley/mango_explain_view_parameters
add d3c8d41 Pass user ctx when opening a doc in show handler
add 7284b68 Improve Mango operator tests (#792)
add ff6e576 Allow library object in other design doc sections besides views
add 1091b5a Implement attachment size limits
add aa14e5d Query operator tests for multiple index types (#800)
new a7a66c3 Reorder compaction functions
new 7c28a76 Simplify compaction state management
new 98d35f2 Implement compactor test suite
new 55f8718 Add multi-append functions to couch_file
new 084f72a Optimize btree node writes
new a333595 Optimize compactor to use multi-IO API
This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:
* -- * -- B -- O -- O -- O   (23167fa)
           \
            N -- N -- N   refs/heads/compactor-optimize-emsort (a333595)
You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.
Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.
The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
Jenkinsfile | 92 ++--
rel/overlay/etc/default.ini | 2 +
src/chttpd/src/chttpd.erl | 2 +
src/chttpd/src/chttpd_show.erl | 13 +-
.../test/chttpd_db_attachment_size_tests.erl | 206 ++++++++
src/couch/src/couch_att.erl | 35 +-
src/couch/src/couch_debug.erl | 517 +++++++++++++++++++++
src/couch/src/couch_doc.erl | 24 +-
src/couch/src/couch_httpd.erl | 4 +
src/couch/test/couch_doc_tests.erl | 1 +
src/couch/test/couchdb_attachments_tests.erl | 132 ++++++
src/couch_mrview/src/couch_mrview.erl | 8 +-
.../test/couch_mrview_ddoc_validation_tests.erl | 59 ++-
.../src/couch_replicator_api_wrap.erl | 4 +-
... => couch_replicator_attachments_too_large.erl} | 36 +-
src/fabric/src/fabric_doc_attachments.erl | 28 +-
src/fabric/src/fabric_doc_update.erl | 4 +-
src/mango/src/mango_cursor.hrl | 4 +
src/mango/src/mango_cursor_text.erl | 39 +-
src/mango/src/mango_cursor_view.erl | 84 ++--
src/mango/src/mango_execution_stats.erl | 89 ++++
.../{mango_idx.hrl => mango_execution_stats.hrl} | 14 +-
src/mango/src/mango_httpd.erl | 1 +
src/mango/src/mango_opts.erl | 6 +
src/mango/test/02-basic-find-test.py | 13 +-
src/mango/test/03-operator-test.py | 159 ++++++-
src/mango/test/15-execution-stats-test.py | 58 +++
src/mango/test/mango.py | 15 +-
src/mango/test/user_docs.py | 29 +-
test/javascript/tests/users_db_security.js | 46 ++
30 files changed, 1534 insertions(+), 190 deletions(-)
create mode 100644 src/chttpd/test/chttpd_db_attachment_size_tests.erl
copy src/couch_replicator/test/{couch_replicator_id_too_long_tests.erl => couch_replicator_attachments_too_large.erl} (73%)
create mode 100644 src/mango/src/mango_execution_stats.erl
copy src/mango/src/{mango_idx.hrl => mango_execution_stats.hrl} (74%)
create mode 100644 src/mango/test/15-execution-stats-test.py
--
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.
[couchdb] 06/06: Optimize compactor to use multi-IO API
Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch compactor-optimize-emsort
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit a333595ee66e122ea6fa76bfb1f72d76d609ac60
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Fri Sep 8 09:40:52 2017 -0500
Optimize compactor to use multi-IO API
This updates couch_db_updater to use the new multi-IO API functions
(append_terms/pread_terms) in couch_file. This optimization benefits us
by no longer requiring the `couch_emsort:merge/1` step to copy
`#full_doc_info{}` records multiple times while also not being penalized
by significantly increasing the number of calls through couch_file APIs.
---
src/couch/src/couch_db_updater.erl | 54 +++++++++++++++++++++++---------------
1 file changed, 33 insertions(+), 21 deletions(-)
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 6bcf727..4786ee7 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -36,11 +36,12 @@
}).
-record(merge_st, {
+ src_fd,
id_tree,
seq_tree,
curr,
rem_seqs,
- infos
+ locs
}).
init({DbName, Filepath, Fd, Options}) ->
@@ -1340,10 +1341,16 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
{ok, SeqTree} = couch_btree:add_remove(
NewDb#db.seq_tree, NewInfos, RemoveSeqs),
- FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
- {{Id, Seq}, FDI}
- end, NewInfos),
- {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
+ EMSortFd = couch_emsort:get_fd(NewDb#db.id_tree),
+ {ok, LocSizes} = couch_file:append_terms(EMSortFd, NewInfos),
+ EMSortEntries = lists:zipwith(fun(FDI, {Loc, _}) ->
+ #full_doc_info{
+ id = Id,
+ update_seq = Seq
+ } = FDI,
+ {{Id, Seq}, Loc}
+ end, NewInfos, LocSizes),
+ {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, EMSortEntries),
update_compact_task(length(NewInfos)),
NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
@@ -1455,6 +1462,7 @@ copy_meta_data(#comp_st{new_db = Db} = CompSt) ->
header = Header
} = Db,
Src = Db#db.id_tree,
+ SrcFd = couch_emsort:get_fd(Src),
DstState = couch_db_header:id_tree_state(Header),
{ok, IdTree0} = couch_btree:open(DstState, Fd, [
{split, fun ?MODULE:btree_by_id_split/1},
@@ -1463,14 +1471,16 @@ copy_meta_data(#comp_st{new_db = Db} = CompSt) ->
]),
{ok, Iter} = couch_emsort:iter(Src),
Acc0 = #merge_st{
+ src_fd=SrcFd,
id_tree=IdTree0,
seq_tree=Db#db.seq_tree,
rem_seqs=[],
- infos=[]
+ locs=[]
},
?COMP_EVENT(md_copy_init),
Acc = merge_docids(Iter, Acc0),
- {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos),
+ {ok, Infos} = couch_file:pread_terms(SrcFd, Acc#merge_st.locs),
+ {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Infos),
{ok, SeqTree} = couch_btree:add_remove(
Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
),
@@ -1526,34 +1536,36 @@ verify_compaction(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
CompSt.
-merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
+merge_docids(Iter, #merge_st{locs=Locs}=Acc) when length(Locs) > 1000 ->
#merge_st{
+ src_fd=SrcFd,
id_tree=IdTree0,
seq_tree=SeqTree0,
rem_seqs=RemSeqs
} = Acc,
+ {ok, Infos} = couch_file:pread_terms(SrcFd, Locs),
{ok, IdTree1} = couch_btree:add(IdTree0, Infos),
{ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
Acc1 = Acc#merge_st{
id_tree=IdTree1,
seq_tree=SeqTree1,
rem_seqs=[],
- infos=[]
+ locs=[]
},
merge_docids(Iter, Acc1);
merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
case next_info(Iter, Curr, []) of
- {NextIter, NewCurr, FDI, Seqs} ->
+ {NextIter, NewCurr, Loc, Seqs} ->
Acc1 = Acc#merge_st{
- infos = [FDI | Acc#merge_st.infos],
+ locs = [Loc | Acc#merge_st.locs],
rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
curr = NewCurr
},
?COMP_EVENT(md_copy_row),
merge_docids(NextIter, Acc1);
- {finished, FDI, Seqs} ->
+ {finished, Loc, Seqs} ->
Acc#merge_st{
- infos = [FDI | Acc#merge_st.infos],
+ locs = [Loc | Acc#merge_st.locs],
rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
curr = undefined
};
@@ -1564,19 +1576,19 @@ merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
next_info(Iter, undefined, []) ->
case couch_emsort:next(Iter) of
- {ok, {{Id, Seq}, FDI}, NextIter} ->
- next_info(NextIter, {Id, Seq, FDI}, []);
+ {ok, {{Id, Seq}, Loc}, NextIter} ->
+ next_info(NextIter, {Id, Seq, Loc}, []);
finished ->
empty
end;
-next_info(Iter, {Id, Seq, FDI}, Seqs) ->
+next_info(Iter, {Id, Seq, Loc}, Seqs) ->
case couch_emsort:next(Iter) of
- {ok, {{Id, NSeq}, NFDI}, NextIter} ->
- next_info(NextIter, {Id, NSeq, NFDI}, [Seq | Seqs]);
- {ok, {{NId, NSeq}, NFDI}, NextIter} ->
- {NextIter, {NId, NSeq, NFDI}, FDI, Seqs};
+ {ok, {{Id, NSeq}, NLoc}, NextIter} ->
+ next_info(NextIter, {Id, NSeq, NLoc}, [Seq | Seqs]);
+ {ok, {{NId, NSeq}, NLoc}, NextIter} ->
+ {NextIter, {NId, NSeq, NLoc}, Loc, Seqs};
finished ->
- {finished, FDI, Seqs}
+ {finished, Loc, Seqs}
end.
--
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.
[couchdb] 03/06: Implement compactor test suite
Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch compactor-optimize-emsort
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 98d35f2615c8170ba24d1bc7408be2f07cebd0dc
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Sep 6 16:26:29 2017 -0500
Implement compactor test suite
---
src/couch/src/couch_db_updater.erl | 61 +++++-
src/couch/test/couch_db_updater_ev.erl | 106 ++++++++++
src/couch/test/couch_db_updater_tests.erl | 317 ++++++++++++++++++++++++++++++
3 files changed, 483 insertions(+), 1 deletion(-)
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 3142516..6bcf727 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -1039,11 +1039,21 @@ check_md5(Md5, Md5) -> ok;
check_md5(_, _) -> throw(md5_mismatch).
+-ifdef(TEST).
+-define(COMP_EVENT(Name),
+ couch_db_updater_ev:event(Name)).
+-else.
+-define(COMP_EVENT(Name), ignore).
+-endif.
+
+
start_copy_compact(#db{}=Db) ->
erlang:put(io_priority, {db_compact, Db#db.name}),
couch_log:debug("Compaction process spawned for db \"~s\"", [Db#db.name]),
+ ?COMP_EVENT(init),
{ok, InitCompSt} = open_compaction_files(Db),
+ ?COMP_EVENT(files_opened),
Stages = [
fun copy_purge_info/1,
@@ -1052,7 +1062,8 @@ start_copy_compact(#db{}=Db) ->
fun sort_meta_data/1,
fun commit_compaction_data/1,
fun copy_meta_data/1,
- fun compact_final_sync/1
+ fun compact_final_sync/1,
+ fun verify_compaction/1
],
FinalCompSt = lists:foldl(fun(Stage, CompSt) ->
@@ -1067,6 +1078,7 @@ start_copy_compact(#db{}=Db) ->
close_db(FinalNewDb),
ok = couch_file:close(MetaFd),
+ ?COMP_EVENT(before_notify),
gen_server:cast(Db#db.main_pid, {compact_done, FinalNewDb#db.filepath}).
@@ -1146,6 +1158,7 @@ reset_compaction_file(Fd, Header) ->
copy_purge_info(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
+ ?COMP_EVENT(purge_init),
OldHdr = OldDb#db.header,
NewHdr = NewDb#db.header,
OldPurgeSeq = couch_db_header:purge_seq(OldHdr),
@@ -1154,6 +1167,7 @@ copy_purge_info(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
{ok, PurgedIdsRevs} = couch_db:get_last_purged(OldDb),
Opts = [{compression, NewDb#db.compression}],
{ok, Ptr, _} = couch_file:append_term(NewDb#db.fd, PurgedIdsRevs, Opts),
+ ?COMP_EVENT(purge_done),
CompSt#comp_st{
new_db = NewDb#db{
header = couch_db_header:set(NewHdr, [
@@ -1163,6 +1177,7 @@ copy_purge_info(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
}
};
true ->
+ ?COMP_EVENT(purge_done),
CompSt
end.
@@ -1229,6 +1244,7 @@ copy_compact(#comp_st{} = CompSt) ->
couch_task_status:set_update_frequency(500)
end,
+ ?COMP_EVENT(seq_init),
{ok, _, {NewDb2, Uncopied, _, _}} =
couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
{NewDb, [], 0, 0},
@@ -1236,6 +1252,8 @@ copy_compact(#comp_st{} = CompSt) ->
NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
+ ?COMP_EVENT(seq_done),
+
% copy misc header values
if NewDb3#db.security /= Db#db.security ->
{ok, Ptr, _} = couch_file:append_term(
@@ -1294,6 +1312,7 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts),
NewActiveSize = FinalAS + TotalAttSize,
NewExternalSize = FinalES + TotalAttSize,
+ ?COMP_EVENT(seq_copy),
Info#full_doc_info{
rev_tree = NewRevTree,
sizes = #size_info{
@@ -1420,7 +1439,9 @@ bind_id_tree(Db, Fd, State) ->
sort_meta_data(#comp_st{new_db = Db0} = CompSt) ->
+ ?COMP_EVENT(md_sort_init),
{ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
+ ?COMP_EVENT(md_sort_done),
CompSt#comp_st{
new_db = Db0#db{
id_tree = Ems
@@ -1447,11 +1468,13 @@ copy_meta_data(#comp_st{new_db = Db} = CompSt) ->
rem_seqs=[],
infos=[]
},
+ ?COMP_EVENT(md_copy_init),
Acc = merge_docids(Iter, Acc0),
{ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos),
{ok, SeqTree} = couch_btree:add_remove(
Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
),
+ ?COMP_EVENT(md_copy_done),
CompSt#comp_st{
new_db = Db#db{
id_tree = IdTree,
@@ -1461,13 +1484,48 @@ copy_meta_data(#comp_st{new_db = Db} = CompSt) ->
compact_final_sync(#comp_st{new_db = NewDb0} = CompSt) ->
+ ?COMP_EVENT(before_final_sync),
NewHdr = db_to_header(NewDb0, NewDb0#db.header),
NewDb1 = sync_header(NewDb0, NewHdr),
+ ?COMP_EVENT(after_final_sync),
CompSt#comp_st{
new_db = NewDb1
}.
+verify_compaction(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
+ {ok, OldIdReds0} = couch_btree:full_reduce(OldDb#db.id_tree),
+ {ok, OldSeqReds} = couch_btree:full_reduce(OldDb#db.seq_tree),
+ {ok, NewIdReds0} = couch_btree:full_reduce(NewDb#db.id_tree),
+ {ok, NewSeqReds} = couch_btree:full_reduce(NewDb#db.seq_tree),
+ {
+ OldDocCount,
+ OldDelDocCount,
+ OldSizes
+ } = OldIdReds0,
+ #size_info{
+ external = OldExternalSize
+ } = upgrade_sizes(OldSizes),
+ OldIdReds = {OldDocCount, OldDelDocCount, OldExternalSize},
+ {
+ NewDocCount,
+ NewDelDocCount,
+ #size_info{external = NewExternalSize}
+ } = NewIdReds0,
+ NewIdReds = {NewDocCount, NewDelDocCount, NewExternalSize},
+ if NewIdReds == OldIdReds -> ok; true ->
+ Fmt1 = "Compacted id tree for ~s differs from source: ~p /= ~p",
+ couch_log:error(Fmt1, [couch_db:name(OldDb), NewIdReds, OldIdReds]),
+ exit({compaction_error, id_tree})
+ end,
+ if NewSeqReds == OldSeqReds -> ok; true ->
+ Fmt2 = "Compacted seq tree for ~s differs from source: ~p /= ~p",
+ couch_log:error(Fmt2, [couch_db:name(OldDb), NewSeqReds, OldSeqReds]),
+ exit({compaction_error, seq_tree})
+ end,
+ CompSt.
+
+
merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
#merge_st{
id_tree=IdTree0,
@@ -1491,6 +1549,7 @@ merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
curr = NewCurr
},
+ ?COMP_EVENT(md_copy_row),
merge_docids(NextIter, Acc1);
{finished, FDI, Seqs} ->
Acc#merge_st{
diff --git a/src/couch/test/couch_db_updater_ev.erl b/src/couch/test/couch_db_updater_ev.erl
new file mode 100644
index 0000000..3a57141
--- /dev/null
+++ b/src/couch/test/couch_db_updater_ev.erl
@@ -0,0 +1,106 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_db_updater_ev).
+
+
+-export([
+ init/0,
+ terminate/0,
+ clear/0,
+
+ set_wait/1,
+ set_crash/1,
+
+ event/1
+]).
+
+
+-define(TAB, couch_db_updater_ev_tab).
+
+
+init() ->
+ ets:new(?TAB, [set, public, named_table]).
+
+
+terminate() ->
+ ets:delete(?TAB).
+
+
+clear() ->
+ ets:delete_all_objects(?TAB).
+
+
+set_wait(Event) ->
+ Self = self(),
+ WaitFun = fun(_) ->
+ receive
+ {Self, go} ->
+ Self ! {self(), ok}
+ end,
+ ets:delete(?TAB, Event)
+ end,
+ ContinueFun = fun(Pid) ->
+ Pid ! {Self, go},
+ receive {Pid, ok} -> ok end
+ end,
+ ets:insert(?TAB, {Event, WaitFun}),
+ {ok, ContinueFun}.
+
+
+set_crash(Event) ->
+ Reason = {couch_db_updater_ev_crash, Event},
+ CrashFun = fun(_) -> exit(Reason) end,
+ ets:insert(?TAB, {Event, CrashFun}),
+ {ok, Reason}.
+
+
+event(Event) ->
+ NewEvent = case Event of
+ seq_init ->
+ put(?MODULE, 0),
+ Event;
+ seq_copy ->
+ Count = get(?MODULE),
+ put(?MODULE, Count + 1),
+ {seq_copy, Count};
+ id_init ->
+ put(?MODULE, 0),
+ Event;
+ id_copy ->
+ Count = get(?MODULE),
+ put(?MODULE, Count + 1),
+ {id_copy, Count};
+ md_copy_init ->
+ put(?MODULE, 0),
+ Event;
+ md_copy_row ->
+ Count = get(?MODULE),
+ put(?MODULE, Count + 1),
+ {md_copy_row, Count};
+ _ ->
+ Event
+ end,
+ handle_event(NewEvent).
+
+
+handle_event(Event) ->
+ try
+ case ets:lookup(?TAB, Event) of
+ [{Event, ActionFun}] ->
+ ActionFun(Event);
+ [] ->
+ ok
+ end
+ catch error:badarg ->
+ ok
+ end.
\ No newline at end of file
diff --git a/src/couch/test/couch_db_updater_tests.erl b/src/couch/test/couch_db_updater_tests.erl
new file mode 100644
index 0000000..9ad492e
--- /dev/null
+++ b/src/couch/test/couch_db_updater_tests.erl
@@ -0,0 +1,317 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_db_updater_tests).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+
+-define(TIMEOUT_EUNIT, 60).
+-define(EV_MOD, couch_db_updater_ev).
+-define(INIT_DOCS, 2500).
+-define(WRITE_DOCS, 20).
+
+% The idea behind the tests in this module is to attempt to
+% cover the possible restart/recopy points during compaction
+% so that we can be as sure as possible that the compactor
+% is resilient to errors in the face of external conditions
+% (e.g., the VM rebooting). The single linear pass is easy enough
+% to prove; however, restarting is important enough that we don't
+% want to waste work if a VM happens to bounce a lot.
+%
+% To cover as many restart situations as possible, we have created a
+% number of events in the compactor code that are present only in
+% a TEST-compiled version of the module. These events can then
+% be used (via couch_db_updater_ev) to introduce errors and coordinate
+% writes to the database while compaction is in progress.
+
+% This list of events is where we'll insert our errors.
+
+events() ->
+ [
+ init, % The compactor process is spawned
+ files_opened, % After compaction files have opened
+
+ purge_init, % Just before applying purge changes
+ purge_done, % Just after finish purge updates
+
+ % The first phase is when we copy all document bodies and attachment
+ % data to the new database file in order of update sequence so
+ % that we can resume on crash.
+
+ seq_init, % Before the first change is copied
+ {seq_copy, 0}, % After change N is copied
+ {seq_copy, ?INIT_DOCS div 2},
+ {seq_copy, ?INIT_DOCS - 2},
+ seq_done, % After last change is copied
+
+ % The id copy phases come in two flavors. Before a compaction
+ % swap is attempted they're copied from the id_tree in the
+ % database being compacted. After a swap attempt they are
+ % stored in an emsort file on disk. Thus the two sets of
+ % related events here.
+
+ md_sort_init, % Just before metadata sort starts
+ md_sort_done, % Just after metadata sort finishes
+ md_copy_init, % Just before metadata copy starts
+ {md_copy_row, 0}, % After docid N is copied
+ {md_copy_row, ?INIT_DOCS div 2},
+ {md_copy_row, ?INIT_DOCS - 2},
+ md_copy_done, % Just after the last docid is copied
+
+ % And then the final steps before we finish
+
+ before_final_sync, % Just before final sync
+ after_final_sync, % Just after the final sync
+ before_notify % Just before the final notification
+ ].
+
+% Mark which events only happen when documents are present
+
+requires_docs({seq_copy, _}) -> true;
+requires_docs(md_sort_init) -> true;
+requires_docs(md_sort_done) -> true;
+requires_docs(md_copy_init) -> true;
+requires_docs({md_copy_row, _}) -> true;
+requires_docs(md_copy_done) -> true;
+requires_docs(_) -> false.
+
+
+% Mark which events only happen when there's write activity during
+% a compaction.
+
+requires_write(md_sort_init) -> true;
+requires_write(md_sort_done) -> true;
+requires_write(md_copy_init) -> true;
+requires_write({md_copy_row, _}) -> true;
+requires_write(md_copy_done) -> true;
+requires_write(_) -> false.
+
+
+setup() ->
+ purge_module(),
+ ?EV_MOD:init(),
+ test_util:start_couch().
+
+
+teardown(Ctx) ->
+ test_util:stop_couch(Ctx),
+ ?EV_MOD:terminate().
+
+
+start_empty_db_test(_Event) ->
+ ?EV_MOD:clear(),
+ DbName = ?tempdb(),
+ {ok, _} = couch_db:create(DbName, [?ADMIN_CTX]),
+ DbName.
+
+
+start_populated_db_test(Event) ->
+ DbName = start_empty_db_test(Event),
+ {ok, Db} = couch_db:open_int(DbName, []),
+ try
+ populate_db(Db, ?INIT_DOCS)
+ after
+ couch_db:close(Db)
+ end,
+ DbName.
+
+
+stop_test(_Event, DbName) ->
+ couch_server:delete(DbName, [?ADMIN_CTX]).
+
+
+static_empty_db_test_() ->
+ FiltFun = fun(E) ->
+ not (requires_docs(E) or requires_write(E))
+ end,
+ Events = lists:filter(FiltFun, events()) -- [init],
+ {
+ "Idle empty database",
+ {
+ setup,
+ fun setup/0,
+ fun teardown/1,
+ [
+ {
+ foreachx,
+ fun start_empty_db_test/1,
+ fun stop_test/2,
+ [{Event, fun run_static_init/2} || Event <- Events]
+ }
+ ]
+ }
+ }.
+
+
+static_populated_db_test_() ->
+ FiltFun = fun(E) -> not requires_write(E) end,
+ Events = lists:filter(FiltFun, events()) -- [init],
+ {
+ "Idle populated database",
+ {
+ setup,
+ fun setup/0,
+ fun teardown/1,
+ [
+ {
+ foreachx,
+ fun start_populated_db_test/1,
+ fun stop_test/2,
+ [{Event, fun run_static_init/2} || Event <- Events]
+ }
+ ]
+ }
+ }.
+
+
+dynamic_empty_db_test_() ->
+ FiltFun = fun(E) -> not requires_docs(E) end,
+ Events = lists:filter(FiltFun, events()) -- [init],
+ {
+ "Writes to empty database",
+ {
+ setup,
+ fun setup/0,
+ fun teardown/1,
+ [
+ {
+ foreachx,
+ fun start_empty_db_test/1,
+ fun stop_test/2,
+ [{Event, fun run_dynamic_init/2} || Event <- Events]
+ }
+ ]
+ }
+ }.
+
+
+dynamic_populated_db_test_() ->
+ Events = events() -- [init],
+ {
+ "Writes to populated database",
+ {
+ setup,
+ fun setup/0,
+ fun teardown/1,
+ [
+ {
+ foreachx,
+ fun start_populated_db_test/1,
+ fun stop_test/2,
+ [{Event, fun run_dynamic_init/2} || Event <- Events]
+ }
+ ]
+ }
+ }.
+
+
+run_static_init(Event, DbName) ->
+ Name = lists:flatten(io_lib:format("~p", [Event])),
+ Test = {timeout, ?TIMEOUT_EUNIT, ?_test(run_static(Event, DbName))},
+ {Name, Test}.
+
+
+run_static(Event, DbName) ->
+ {ok, ContinueFun} = ?EV_MOD:set_wait(init),
+ {ok, Reason} = ?EV_MOD:set_crash(Event),
+ {ok, Db} = couch_db:open_int(DbName, []),
+ Ref = couch_db:monitor(Db),
+ {ok, CPid} = couch_db:start_compact(Db),
+ ContinueFun(CPid),
+ receive
+ {'DOWN', Ref, _, _, Reason} ->
+ wait_db_cleared(Db)
+ end,
+ run_successful_compaction(DbName),
+ couch_db:close(Db).
+
+
+run_dynamic_init(Event, DbName) ->
+ Name = lists:flatten(io_lib:format("~p", [Event])),
+ Test = {timeout, ?TIMEOUT_EUNIT, ?_test(run_dynamic(Event, DbName))},
+ {Name, Test}.
+
+
+run_dynamic(Event, DbName) ->
+ {ok, ContinueFun} = ?EV_MOD:set_wait(init),
+ {ok, Reason} = ?EV_MOD:set_crash(Event),
+ {ok, Db} = couch_db:open_int(DbName, []),
+ Ref = couch_db:monitor(Db),
+ {ok, CPid} = couch_db:start_compact(Db),
+ ok = populate_db(Db, 10),
+ ContinueFun(CPid),
+ receive
+ {'DOWN', Ref, _, _, Reason} ->
+ wait_db_cleared(Db)
+ end,
+ run_successful_compaction(DbName),
+ couch_db:close(Db).
+
+
+run_successful_compaction(DbName) ->
+ ?EV_MOD:clear(),
+ {ok, ContinueFun} = ?EV_MOD:set_wait(init),
+ {ok, Db} = couch_db:open_int(DbName, []),
+ {ok, CPid} = couch_db:start_compact(Db),
+ Ref = erlang:monitor(process, CPid),
+ ContinueFun(CPid),
+ receive
+ {'DOWN', Ref, _, _, normal} -> ok
+ end,
+ ?assertMatch({ok, _}, gen_server:call(Db#db.main_pid, get_db)),
+ couch_db:close(Db).
+
+
+wait_db_cleared(Db) ->
+ wait_db_cleared(Db, 5).
+
+
+wait_db_cleared(Db, N) when N < 0 ->
+ erlang:error({db_clear_timeout, couch_db:name(Db)});
+
+wait_db_cleared(Db, N) ->
+ case ets:lookup(couch_dbs, couch_db:name(Db)) of
+ [] ->
+ ok;
+ [NewDb] when NewDb#db.main_pid /= Db#db.main_pid ->
+ ok;
+ _ ->
+ timer:sleep(100),
+ wait_db_cleared(Db, N - 1)
+ end.
+
+
+populate_db(_Db, NumDocs) when NumDocs =< 0 ->
+ ok;
+populate_db(Db, NumDocs) ->
+ String = [$a || _ <- lists:seq(1, erlang:min(NumDocs, 500))],
+ Docs = lists:map(
+ fun(_) ->
+ couch_doc:from_json_obj({[
+ {<<"_id">>, couch_uuids:random()},
+ {<<"string">>, list_to_binary(String)}
+ ]})
+ end,
+ lists:seq(1, 500)),
+ {ok, _} = couch_db:update_docs(Db, Docs, []),
+ populate_db(Db, NumDocs - 500).
+
+
+purge_module() ->
+ case code:which(couch_db_updater) of
+ cover_compiled ->
+ ok;
+ _ ->
+ code:delete(couch_db_updater),
+ code:purge(couch_db_updater)
+ end.
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.
[couchdb] 05/06: Optimize btree node writes
Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch compactor-optimize-emsort
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 084f72a2f639a72a1ec8694a3caf1f14c6ba9f26
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Sep 6 16:26:03 2017 -0500
Optimize btree node writes
This uses the new couch_file:append_terms/2 function to write all chunks
in a single write call.
---
src/couch/src/couch_btree.erl | 28 +++++++++++++++-------------
1 file changed, 15 insertions(+), 13 deletions(-)
diff --git a/src/couch/src/couch_btree.erl b/src/couch/src/couch_btree.erl
index ea224b1..0ae6e7a 100644
--- a/src/couch/src/couch_btree.erl
+++ b/src/couch/src/couch_btree.erl
@@ -436,20 +436,22 @@ get_node(#btree{fd = Fd}, NodePos) ->
write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) ->
% split up nodes into smaller sizes
- NodeListList = chunkify(NodeList),
+ Chunks = chunkify(NodeList),
% now write out each chunk and return the KeyPointer pairs for those nodes
- ResultList = [
- begin
- {ok, Pointer, Size} = couch_file:append_term(
- Fd, {NodeType, ANodeList}, [{compression, Comp}]),
- {LastKey, _} = lists:last(ANodeList),
- SubTreeSize = reduce_tree_size(NodeType, Size, ANodeList),
- {LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList), SubTreeSize}}
- end
- ||
- ANodeList <- NodeListList
- ],
- {ok, ResultList}.
+ ToWrite = [{NodeType, Chunk} || Chunk <- Chunks],
+ WriteOpts = [{compression, Comp}],
+ {ok, PtrSizes} = couch_file:append_terms(Fd, ToWrite, WriteOpts),
+ {ok, group_kps(Bt, NodeType, Chunks, PtrSizes)}.
+
+
+group_kps(_Bt, _NodeType, [], []) ->
+ [];
+
+group_kps(Bt, NodeType, [Chunk | RestChunks], [{Ptr, Size} | RestPtrSizes]) ->
+ {LastKey, _} = lists:last(Chunk),
+ SubTreeSize = reduce_tree_size(NodeType, Size, Chunk),
+ KP = {LastKey, {Ptr, reduce_node(Bt, NodeType, Chunk), SubTreeSize}},
+ [KP | group_kps(Bt, NodeType, RestChunks, RestPtrSizes)].
write_node(Bt, _OldNode, NodeType, [], NewList) ->
--
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.
[couchdb] 01/06: Reorder compaction functions
Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch compactor-optimize-emsort
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit a7a66c3b74a932adc5f71e4bc5d046928be61389
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Aug 16 11:23:07 2017 -0500
Reorder compaction functions
Strictly copy/paste moving of code to make a more logical ordering of
start to finish reading top down.
---
src/couch/src/couch_db_updater.erl | 373 +++++++++++++++++++------------------
1 file changed, 188 insertions(+), 185 deletions(-)
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 78e0b8c..f786048 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -1014,40 +1014,6 @@ sync_header(Db, NewHeader) ->
waiting_delayed_commit=nil
}.
-copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
- {ok, {BodyData, BinInfos0}} = couch_db:read_doc(SrcDb, SrcSp),
- BinInfos = case BinInfos0 of
- _ when is_binary(BinInfos0) ->
- couch_compress:decompress(BinInfos0);
- _ when is_list(BinInfos0) ->
- % pre 1.2 file format
- BinInfos0
- end,
- % copy the bin values
- NewBinInfos = lists:map(
- fun({Name, Type, BinSp, AttLen, RevPos, ExpectedMd5}) ->
- % 010 UPGRADE CODE
- {NewBinSp, AttLen, AttLen, ActualMd5, _IdentityMd5} =
- couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
- check_md5(ExpectedMd5, ActualMd5),
- {Name, Type, NewBinSp, AttLen, AttLen, RevPos, ExpectedMd5, identity};
- ({Name, Type, BinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc1}) ->
- {NewBinSp, AttLen, _, ActualMd5, _IdentityMd5} =
- couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
- check_md5(ExpectedMd5, ActualMd5),
- Enc = case Enc1 of
- true ->
- % 0110 UPGRADE CODE
- gzip;
- false ->
- % 0110 UPGRADE CODE
- identity;
- _ ->
- Enc1
- end,
- {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc}
- end, BinInfos),
- {BodyData, NewBinInfos}.
merge_lookups(Infos, []) ->
Infos;
@@ -1065,157 +1031,6 @@ merge_lookups([FDI | RestInfos], Lookups) ->
check_md5(Md5, Md5) -> ok;
check_md5(_, _) -> throw(md5_mismatch).
-copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
- DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
- LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
- % COUCHDB-968, make sure we prune duplicates during compaction
- NewInfos0 = lists:usort(fun(#full_doc_info{id=A}, #full_doc_info{id=B}) ->
- A =< B
- end, merge_lookups(MixedInfos, LookupResults)),
-
- NewInfos1 = lists:map(fun(Info) ->
- {NewRevTree, FinalAcc} = couch_key_tree:mapfold(fun
- (_Rev, #leaf{ptr=Sp}=Leaf, leaf, SizesAcc) ->
- {Body, AttInfos} = copy_doc_attachments(Db, Sp, DestFd),
- % In the future, we should figure out how to do this for
- % upgrade purposes.
- EJsonBody = case is_binary(Body) of
- true ->
- couch_compress:decompress(Body);
- false ->
- Body
- end,
- SummaryChunk = make_doc_summary(NewDb, {Body, AttInfos}),
- ExternalSize = ?term_size(EJsonBody),
- {ok, Pos, SummarySize} = couch_file:append_raw_chunk(
- DestFd, SummaryChunk),
- AttSizes = [{element(3,A), element(4,A)} || A <- AttInfos],
- NewLeaf = Leaf#leaf{
- ptr = Pos,
- sizes = #size_info{
- active = SummarySize,
- external = ExternalSize
- },
- atts = AttSizes
- },
- {NewLeaf, add_sizes(leaf, NewLeaf, SizesAcc)};
- (_Rev, _Leaf, branch, SizesAcc) ->
- {?REV_MISSING, SizesAcc}
- end, {0, 0, []}, Info#full_doc_info.rev_tree),
- {FinalAS, FinalES, FinalAtts} = FinalAcc,
- TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts),
- NewActiveSize = FinalAS + TotalAttSize,
- NewExternalSize = FinalES + TotalAttSize,
- Info#full_doc_info{
- rev_tree = NewRevTree,
- sizes = #size_info{
- active = NewActiveSize,
- external = NewExternalSize
- }
- }
- end, NewInfos0),
-
- NewInfos = stem_full_doc_infos(Db, NewInfos1),
- RemoveSeqs =
- case Retry of
- nil ->
- [];
- OldDocIdTree ->
- % Compaction is being rerun to catch up to writes during the
- % first pass. This means we may have docs that already exist
- % in the seq_tree in the .data file. Here we lookup any old
- % update_seqs so that they can be removed.
- Ids = [Id || #full_doc_info{id=Id} <- NewInfos],
- Existing = couch_btree:lookup(OldDocIdTree, Ids),
- [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
- end,
-
- {ok, SeqTree} = couch_btree:add_remove(
- NewDb#db.seq_tree, NewInfos, RemoveSeqs),
-
- FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
- {{Id, Seq}, FDI}
- end, NewInfos),
- {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
- update_compact_task(length(NewInfos)),
- NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
-
-
-copy_compact(Db, NewDb0, Retry) ->
- Compression = couch_compress:get_compression_method(),
- NewDb = NewDb0#db{compression=Compression},
- TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
- BufferSize = list_to_integer(
- config:get("database_compaction", "doc_buffer_size", "524288")),
- CheckpointAfter = couch_util:to_integer(
- config:get("database_compaction", "checkpoint_after",
- BufferSize * 10)),
-
- EnumBySeqFun =
- fun(DocInfo, _Offset,
- {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize}) ->
-
- Seq = case DocInfo of
- #full_doc_info{} -> DocInfo#full_doc_info.update_seq;
- #doc_info{} -> DocInfo#doc_info.high_seq
- end,
-
- AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
- if AccUncopiedSize2 >= BufferSize ->
- NewDb2 = copy_docs(
- Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
- AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
- if AccCopiedSize2 >= CheckpointAfter ->
- CommNewDb2 = commit_compaction_data(NewDb2#db{update_seq=Seq}),
- {ok, {CommNewDb2, [], 0, 0}};
- true ->
- {ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}}
- end;
- true ->
- {ok, {AccNewDb, [DocInfo | AccUncopied], AccUncopiedSize2,
- AccCopiedSize}}
- end
- end,
-
- TaskProps0 = [
- {type, database_compaction},
- {database, Db#db.name},
- {progress, 0},
- {changes_done, 0},
- {total_changes, TotalChanges}
- ],
- case (Retry =/= nil) and couch_task_status:is_task_added() of
- true ->
- couch_task_status:update([
- {retry, true},
- {progress, 0},
- {changes_done, 0},
- {total_changes, TotalChanges}
- ]);
- false ->
- couch_task_status:add_task(TaskProps0),
- couch_task_status:set_update_frequency(500)
- end,
-
- {ok, _, {NewDb2, Uncopied, _, _}} =
- couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
- {NewDb, [], 0, 0},
- [{start_key, NewDb#db.update_seq + 1}]),
-
- NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
-
- % copy misc header values
- if NewDb3#db.security /= Db#db.security ->
- {ok, Ptr, _} = couch_file:append_term(
- NewDb3#db.fd, Db#db.security,
- [{compression, NewDb3#db.compression}]),
- NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
- true ->
- NewDb4 = NewDb3
- end,
-
- commit_compaction_data(NewDb4#db{update_seq=Db#db.update_seq}).
-
start_copy_compact(#db{}=Db) ->
erlang:put(io_priority, {db_compact, Db#db.name}),
@@ -1307,6 +1122,194 @@ copy_purge_info(OldDb, NewDb) ->
end.
+copy_compact(Db, NewDb0, Retry) ->
+ Compression = couch_compress:get_compression_method(),
+ NewDb = NewDb0#db{compression=Compression},
+ TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
+ BufferSize = list_to_integer(
+ config:get("database_compaction", "doc_buffer_size", "524288")),
+ CheckpointAfter = couch_util:to_integer(
+ config:get("database_compaction", "checkpoint_after",
+ BufferSize * 10)),
+
+ EnumBySeqFun =
+ fun(DocInfo, _Offset,
+ {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize}) ->
+
+ Seq = case DocInfo of
+ #full_doc_info{} -> DocInfo#full_doc_info.update_seq;
+ #doc_info{} -> DocInfo#doc_info.high_seq
+ end,
+
+ AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
+ if AccUncopiedSize2 >= BufferSize ->
+ NewDb2 = copy_docs(
+ Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
+ AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
+ if AccCopiedSize2 >= CheckpointAfter ->
+ CommNewDb2 = commit_compaction_data(NewDb2#db{update_seq=Seq}),
+ {ok, {CommNewDb2, [], 0, 0}};
+ true ->
+ {ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}}
+ end;
+ true ->
+ {ok, {AccNewDb, [DocInfo | AccUncopied], AccUncopiedSize2,
+ AccCopiedSize}}
+ end
+ end,
+
+ TaskProps0 = [
+ {type, database_compaction},
+ {database, Db#db.name},
+ {progress, 0},
+ {changes_done, 0},
+ {total_changes, TotalChanges}
+ ],
+ case (Retry =/= nil) and couch_task_status:is_task_added() of
+ true ->
+ couch_task_status:update([
+ {retry, true},
+ {progress, 0},
+ {changes_done, 0},
+ {total_changes, TotalChanges}
+ ]);
+ false ->
+ couch_task_status:add_task(TaskProps0),
+ couch_task_status:set_update_frequency(500)
+ end,
+
+ {ok, _, {NewDb2, Uncopied, _, _}} =
+ couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
+ {NewDb, [], 0, 0},
+ [{start_key, NewDb#db.update_seq + 1}]),
+
+ NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
+
+ % copy misc header values
+ if NewDb3#db.security /= Db#db.security ->
+ {ok, Ptr, _} = couch_file:append_term(
+ NewDb3#db.fd, Db#db.security,
+ [{compression, NewDb3#db.compression}]),
+ NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
+ true ->
+ NewDb4 = NewDb3
+ end,
+
+ commit_compaction_data(NewDb4#db{update_seq=Db#db.update_seq}).
+
+
+copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
+ DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
+ LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
+ % COUCHDB-968, make sure we prune duplicates during compaction
+ NewInfos0 = lists:usort(fun(#full_doc_info{id=A}, #full_doc_info{id=B}) ->
+ A =< B
+ end, merge_lookups(MixedInfos, LookupResults)),
+
+ NewInfos1 = lists:map(fun(Info) ->
+ {NewRevTree, FinalAcc} = couch_key_tree:mapfold(fun
+ (_Rev, #leaf{ptr=Sp}=Leaf, leaf, SizesAcc) ->
+ {Body, AttInfos} = copy_doc_attachments(Db, Sp, DestFd),
+ % In the future, we should figure out how to do this for
+ % upgrade purposes.
+ EJsonBody = case is_binary(Body) of
+ true ->
+ couch_compress:decompress(Body);
+ false ->
+ Body
+ end,
+ SummaryChunk = make_doc_summary(NewDb, {Body, AttInfos}),
+ ExternalSize = ?term_size(EJsonBody),
+ {ok, Pos, SummarySize} = couch_file:append_raw_chunk(
+ DestFd, SummaryChunk),
+ AttSizes = [{element(3,A), element(4,A)} || A <- AttInfos],
+ NewLeaf = Leaf#leaf{
+ ptr = Pos,
+ sizes = #size_info{
+ active = SummarySize,
+ external = ExternalSize
+ },
+ atts = AttSizes
+ },
+ {NewLeaf, add_sizes(leaf, NewLeaf, SizesAcc)};
+ (_Rev, _Leaf, branch, SizesAcc) ->
+ {?REV_MISSING, SizesAcc}
+ end, {0, 0, []}, Info#full_doc_info.rev_tree),
+ {FinalAS, FinalES, FinalAtts} = FinalAcc,
+ TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts),
+ NewActiveSize = FinalAS + TotalAttSize,
+ NewExternalSize = FinalES + TotalAttSize,
+ Info#full_doc_info{
+ rev_tree = NewRevTree,
+ sizes = #size_info{
+ active = NewActiveSize,
+ external = NewExternalSize
+ }
+ }
+ end, NewInfos0),
+
+ NewInfos = stem_full_doc_infos(Db, NewInfos1),
+ RemoveSeqs =
+ case Retry of
+ nil ->
+ [];
+ OldDocIdTree ->
+ % Compaction is being rerun to catch up to writes during the
+ % first pass. This means we may have docs that already exist
+ % in the seq_tree in the .data file. Here we lookup any old
+ % update_seqs so that they can be removed.
+ Ids = [Id || #full_doc_info{id=Id} <- NewInfos],
+ Existing = couch_btree:lookup(OldDocIdTree, Ids),
+ [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
+ end,
+
+ {ok, SeqTree} = couch_btree:add_remove(
+ NewDb#db.seq_tree, NewInfos, RemoveSeqs),
+
+ FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
+ {{Id, Seq}, FDI}
+ end, NewInfos),
+ {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
+ update_compact_task(length(NewInfos)),
+ NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
+
+
+copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
+ {ok, {BodyData, BinInfos0}} = couch_db:read_doc(SrcDb, SrcSp),
+ BinInfos = case BinInfos0 of
+ _ when is_binary(BinInfos0) ->
+ couch_compress:decompress(BinInfos0);
+ _ when is_list(BinInfos0) ->
+ % pre 1.2 file format
+ BinInfos0
+ end,
+ % copy the bin values
+ NewBinInfos = lists:map(
+ fun({Name, Type, BinSp, AttLen, RevPos, ExpectedMd5}) ->
+ % 010 UPGRADE CODE
+ {NewBinSp, AttLen, AttLen, ActualMd5, _IdentityMd5} =
+ couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+ check_md5(ExpectedMd5, ActualMd5),
+ {Name, Type, NewBinSp, AttLen, AttLen, RevPos, ExpectedMd5, identity};
+ ({Name, Type, BinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc1}) ->
+ {NewBinSp, AttLen, _, ActualMd5, _IdentityMd5} =
+ couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+ check_md5(ExpectedMd5, ActualMd5),
+ Enc = case Enc1 of
+ true ->
+ % 0110 UPGRADE CODE
+ gzip;
+ false ->
+ % 0110 UPGRADE CODE
+ identity;
+ _ ->
+ Enc1
+ end,
+ {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc}
+ end, BinInfos),
+ {BodyData, NewBinInfos}.
+
+
commit_compaction_data(#db{}=Db) ->
% Compaction needs to write headers to both the data file
% and the meta file so if we need to restart we can pick
--
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.
[couchdb] 02/06: Simplify compaction state management
Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch compactor-optimize-emsort
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 7c28a76a1f9044b52252e7127c54cee6b4508218
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Aug 16 12:17:40 2017 -0500
Simplify compaction state management
This change adds a new `#comp_st{}` record that is used to pass
compaction state through the various compaction steps. There are zero
changes to the existing compaction logic. This merely sets the stage for
adding our docid copy optimization.
---
src/couch/src/couch_db_updater.erl | 169 +++++++++++++++++++++++++++----------
1 file changed, 125 insertions(+), 44 deletions(-)
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index f786048..3142516 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -23,9 +23,16 @@
-define(IDLE_LIMIT_DEFAULT, 61000).
+-record(comp_st, {
+ old_db,
+ new_db,
+ meta_fd,
+ retry
+}).
+
-record(comp_header, {
db_header,
- meta_state
+ meta_st
}).
-record(merge_st, {
@@ -1034,56 +1041,90 @@ check_md5(_, _) -> throw(md5_mismatch).
start_copy_compact(#db{}=Db) ->
erlang:put(io_priority, {db_compact, Db#db.name}),
- #db{name=Name, filepath=Filepath, options=Options, header=Header} = Db,
- couch_log:debug("Compaction process spawned for db \"~s\"", [Name]),
+ couch_log:debug("Compaction process spawned for db \"~s\"", [Db#db.name]),
+
+ {ok, InitCompSt} = open_compaction_files(Db),
+
+ Stages = [
+ fun copy_purge_info/1,
+ fun copy_compact/1,
+ fun commit_compaction_data/1,
+ fun sort_meta_data/1,
+ fun commit_compaction_data/1,
+ fun copy_meta_data/1,
+ fun compact_final_sync/1
+ ],
- {ok, NewDb, DName, DFd, MFd, Retry} =
- open_compaction_files(Name, Header, Filepath, Options),
- erlang:monitor(process, MFd),
+ FinalCompSt = lists:foldl(fun(Stage, CompSt) ->
+ Stage(CompSt)
+ end, InitCompSt, Stages),
- % This is a bit worrisome. init_db/4 will monitor the data fd
- % but it doesn't know about the meta fd. For now I'll maintain
- % that the data fd is the old normal fd and meta fd is special
- % and hope everything works out for the best.
- unlink(DFd),
+ #comp_st{
+ new_db = FinalNewDb,
+ meta_fd = MetaFd
+ } = FinalCompSt,
- NewDb1 = copy_purge_info(Db, NewDb),
- NewDb2 = copy_compact(Db, NewDb1, Retry),
- NewDb3 = sort_meta_data(NewDb2),
- NewDb4 = commit_compaction_data(NewDb3),
- NewDb5 = copy_meta_data(NewDb4),
- NewDb6 = sync_header(NewDb5, db_to_header(NewDb5, NewDb5#db.header)),
- close_db(NewDb6),
+ close_db(FinalNewDb),
+ ok = couch_file:close(MetaFd),
- ok = couch_file:close(MFd),
- gen_server:cast(Db#db.main_pid, {compact_done, DName}).
+ gen_server:cast(Db#db.main_pid, {compact_done, FinalNewDb#db.filepath}).
-open_compaction_files(DbName, SrcHdr, DbFilePath, Options) ->
+open_compaction_files(OldDb) ->
+ #db{
+ name = DbName,
+ filepath = DbFilePath,
+ options = Options,
+ header = SrcHdr
+ } = OldDb,
DataFile = DbFilePath ++ ".compact.data",
MetaFile = DbFilePath ++ ".compact.meta",
{ok, DataFd, DataHdr} = open_compaction_file(DataFile),
{ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile),
DataHdrIsDbHdr = couch_db_header:is_header(DataHdr),
- case {DataHdr, MetaHdr} of
+ CompSt = case {DataHdr, MetaHdr} of
{#comp_header{}=A, #comp_header{}=A} ->
+ % We're restarting a compaction that did not finish
+ % before trying to swap out with the original db
DbHeader = A#comp_header.db_header,
Db0 = init_db(DbName, DataFile, DataFd, DbHeader, Options),
- Db1 = bind_emsort(Db0, MetaFd, A#comp_header.meta_state),
- {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree};
+ Db1 = bind_emsort(Db0, MetaFd, A#comp_header.meta_st),
+ #comp_st{
+ old_db = OldDb,
+ new_db = Db1,
+ meta_fd = MetaFd,
+ retry = Db0#db.id_tree
+ };
_ when DataHdrIsDbHdr ->
+ % We tried to swap out the compaction but there were
+ % writes to the database during compaction. Start
+ % a compaction retry.
ok = reset_compaction_file(MetaFd, couch_db_header:from(SrcHdr)),
Db0 = init_db(DbName, DataFile, DataFd, DataHdr, Options),
Db1 = bind_emsort(Db0, MetaFd, nil),
- {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree};
+ #comp_st{
+ old_db = OldDb,
+ new_db = Db1,
+ meta_fd = MetaFd,
+ retry = Db0#db.id_tree
+ };
_ ->
+ % We're starting a compaction from scratch
Header = couch_db_header:from(SrcHdr),
ok = reset_compaction_file(DataFd, Header),
ok = reset_compaction_file(MetaFd, Header),
Db0 = init_db(DbName, DataFile, DataFd, Header, Options),
Db1 = bind_emsort(Db0, MetaFd, nil),
- {ok, Db1, DataFile, DataFd, MetaFd, nil}
- end.
+ #comp_st{
+ old_db = OldDb,
+ new_db = Db1,
+ meta_fd = MetaFd,
+ retry = nil
+ }
+ end,
+ unlink(DataFd),
+ erlang:monitor(process, MetaFd),
+ {ok, CompSt}.
open_compaction_file(FilePath) ->
@@ -1104,25 +1145,34 @@ reset_compaction_file(Fd, Header) ->
ok = couch_file:write_header(Fd, Header).
-copy_purge_info(OldDb, NewDb) ->
+copy_purge_info(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
OldHdr = OldDb#db.header,
NewHdr = NewDb#db.header,
OldPurgeSeq = couch_db_header:purge_seq(OldHdr),
- if OldPurgeSeq > 0 ->
+ NewPurgeSeq = couch_db_header:purge_seq(NewHdr),
+ if OldPurgeSeq > NewPurgeSeq ->
{ok, PurgedIdsRevs} = couch_db:get_last_purged(OldDb),
Opts = [{compression, NewDb#db.compression}],
{ok, Ptr, _} = couch_file:append_term(NewDb#db.fd, PurgedIdsRevs, Opts),
- NewNewHdr = couch_db_header:set(NewHdr, [
- {purge_seq, OldPurgeSeq},
- {purged_docs, Ptr}
- ]),
- NewDb#db{header = NewNewHdr};
+ CompSt#comp_st{
+ new_db = NewDb#db{
+ header = couch_db_header:set(NewHdr, [
+ {purge_seq, OldPurgeSeq},
+ {purged_docs, Ptr}
+ ])
+ }
+ };
true ->
- NewDb
+ CompSt
end.
-copy_compact(Db, NewDb0, Retry) ->
+copy_compact(#comp_st{} = CompSt) ->
+ #comp_st{
+ old_db = Db,
+ new_db = NewDb0,
+ retry = Retry
+ } = CompSt,
Compression = couch_compress:get_compression_method(),
NewDb = NewDb0#db{compression=Compression},
TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
@@ -1160,12 +1210,13 @@ copy_compact(Db, NewDb0, Retry) ->
TaskProps0 = [
{type, database_compaction},
+ {retry, (Retry /= nil)},
{database, Db#db.name},
{progress, 0},
{changes_done, 0},
{total_changes, TotalChanges}
],
- case (Retry =/= nil) and couch_task_status:is_task_added() of
+ case (Retry /= nil) and couch_task_status:is_task_added() of
true ->
couch_task_status:update([
{retry, true},
@@ -1195,7 +1246,11 @@ copy_compact(Db, NewDb0, Retry) ->
NewDb4 = NewDb3
end,
- commit_compaction_data(NewDb4#db{update_seq=Db#db.update_seq}).
+ CompSt#comp_st{
+ new_db = NewDb4#db{
+ update_seq = Db#db.update_seq
+ }
+ }.
copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
@@ -1310,10 +1365,15 @@ copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
{BodyData, NewBinInfos}.
-commit_compaction_data(#db{}=Db) ->
+commit_compaction_data(#comp_st{new_db = Db} = CompSt) ->
% Compaction needs to write headers to both the data file
% and the meta file so if we need to restart we can pick
% back up from where we left off.
+ CompSt#comp_st{
+ new_db = commit_compaction_data(Db)
+ };
+
+commit_compaction_data(#db{} = Db) ->
commit_compaction_data(Db, couch_emsort:get_fd(Db#db.id_tree)),
commit_compaction_data(Db, Db#db.fd).
@@ -1330,7 +1390,7 @@ commit_compaction_data(#db{header=OldHeader}=Db0, Fd) ->
Header = db_to_header(Db1, OldHeader),
CompHeader = #comp_header{
db_header = Header,
- meta_state = MetaState
+ meta_st = MetaState
},
ok = couch_file:sync(Fd),
ok = couch_file:write_header(Fd, CompHeader),
@@ -1359,12 +1419,20 @@ bind_id_tree(Db, Fd, State) ->
Db#db{id_tree=IdBtree}.
-sort_meta_data(Db0) ->
+sort_meta_data(#comp_st{new_db = Db0} = CompSt) ->
{ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
- Db0#db{id_tree=Ems}.
+ CompSt#comp_st{
+ new_db = Db0#db{
+ id_tree = Ems
+ }
+ }.
-copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
+copy_meta_data(#comp_st{new_db = Db} = CompSt) ->
+ #db{
+ fd = Fd,
+ header = Header
+ } = Db,
Src = Db#db.id_tree,
DstState = couch_db_header:id_tree_state(Header),
{ok, IdTree0} = couch_btree:open(DstState, Fd, [
@@ -1384,7 +1452,20 @@ copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
{ok, SeqTree} = couch_btree:add_remove(
Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
),
- Db#db{id_tree=IdTree, seq_tree=SeqTree}.
+ CompSt#comp_st{
+ new_db = Db#db{
+ id_tree = IdTree,
+ seq_tree = SeqTree
+ }
+ }.
+
+
+compact_final_sync(#comp_st{new_db = NewDb0} = CompSt) ->
+ NewHdr = db_to_header(NewDb0, NewDb0#db.header),
+ NewDb1 = sync_header(NewDb0, NewHdr),
+ CompSt#comp_st{
+ new_db = NewDb1
+ }.
merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
--
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.
[couchdb] 04/06: Add multi-append functions to couch_file
Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch compactor-optimize-emsort
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 55f8718a5e124e963541971078c6d431a5a9f088
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Sep 6 13:36:16 2017 -0500
Add multi-append functions to couch_file
These functions allow the caller to append multiple terms or binaries to
a file and receive the file position and size for each individual
element. This is to optimize throughput in situations where we want to
write multiple pieces of independent data.
---
src/couch/src/couch_file.erl | 144 ++++++++++++++++++++++++++++++++++++++-----
1 file changed, 130 insertions(+), 14 deletions(-)
diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index acd4fda..a8fcc6c 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -42,6 +42,8 @@
-export([append_binary/2, append_binary_md5/2]).
-export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]).
-export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]).
+-export([pread_terms/2, pread_binaries/2, pread_iolists/2]).
+-export([append_terms/2, append_terms/3, append_binaries/2]).
-export([write_header/2, read_header/1]).
-export([delete/2, delete/3, nuke_dir/2, init_delete_dir/1]).
-export([msec_since_last_read/1]).
@@ -119,6 +121,7 @@ append_term_md5(Fd, Term, Options) ->
Comp = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION),
append_binary_md5(Fd, couch_compress:compress(Term, Comp)).
+
%%----------------------------------------------------------------------
%% Purpose: To append an Erlang binary to the end of the file.
%% Args: Erlang term to serialize and append to the file.
@@ -129,7 +132,7 @@ append_term_md5(Fd, Term, Options) ->
append_binary(Fd, Bin) ->
ioq:call(Fd, {append_bin, assemble_file_chunk(Bin)}, erlang:get(io_priority)).
-
+
append_binary_md5(Fd, Bin) ->
ioq:call(Fd,
{append_bin, assemble_file_chunk(Bin, crypto:hash(md5, Bin))},
@@ -172,21 +175,55 @@ pread_binary(Fd, Pos) ->
pread_iolist(Fd, Pos) ->
case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of
- {ok, IoList, <<>>} ->
- {ok, IoList};
- {ok, IoList, Md5} ->
- case crypto:hash(md5, IoList) of
- Md5 ->
- {ok, IoList};
- _ ->
- couch_log:emergency("File corruption in ~p at position ~B",
- [Fd, Pos]),
- exit({file_corruption, <<"file corruption">>})
- end;
- Error ->
- Error
+ {ok, IoList, Md5} ->
+ {ok, verify_md5(Fd, Pos, IoList, Md5)};
+ Error ->
+ Error
end.
+
+pread_terms(Fd, PosList) ->
+ {ok, Bins} = pread_binaries(Fd, PosList),
+ Terms = lists:map(fun(Bin) ->
+ couch_compress:decompress(Bin)
+ end, Bins),
+ {ok, Terms}.
+
+
+pread_binaries(Fd, PosList) ->
+ {ok, Data} = pread_iolists(Fd, PosList),
+ {ok, lists:map(fun erlang:iolist_to_binary/1, Data)}.
+
+
+pread_iolists(Fd, PosList) ->
+ case ioq:call(Fd, {pread_iolists, PosList}, erlang:get(io_priority)) of
+ {ok, DataMd5s} ->
+ Data = lists:zipwith(fun(Pos, {IoList, Md5}) ->
+ verify_md5(Fd, Pos, IoList, Md5)
+ end, PosList, DataMd5s),
+ {ok, Data};
+ Error ->
+ Error
+ end.
+
+
+append_terms(Fd, Terms) ->
+ append_terms(Fd, Terms, []).
+
+
+append_terms(Fd, Terms, Options) ->
+ Comp = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION),
+ Bins = lists:map(fun(Term) ->
+ couch_compress:compress(Term, Comp)
+ end, Terms),
+ append_binaries(Fd, Bins).
+
+
+append_binaries(Fd, Bins) ->
+ WriteBins = lists:map(fun assemble_file_chunk/1, Bins),
+ ioq:call(Fd, {append_bins, WriteBins}, erlang:get(io_priority)).
+
+
%%----------------------------------------------------------------------
%% Purpose: The length of a file, in bytes.
%% Returns: {ok, Bytes}
@@ -448,6 +485,30 @@ handle_call({pread_iolist, Pos}, _From, File) ->
{reply, {ok, Iolist, <<>>}, File}
end;
+handle_call({pread_iolists, PosL}, _From, File) -> % reply {ok, [{IoList, Md5}]}
+    update_read_timestamp(),
+    LocNums1 = [{Pos, 4} || Pos <- PosL], % pass 1: 4-byte length headers
+    DataSizes = read_multi_raw_iolists_int(File, LocNums1),
+    LocNums2 = lists:map(fun({LenIoList, NextPos}) ->
+        case iolist_to_binary(LenIoList) of
+            <<1:1/integer, Len:31/integer>> -> % an MD5-prefixed term
+                {NextPos, Len + 16};
+            <<0:1/integer, Len:31/integer>> ->
+                {NextPos, Len}
+        end
+    end, DataSizes),
+    Resps = read_multi_raw_iolists_int(File, LocNums2), % pass 2: payloads
+    Extracted = lists:zipwith(fun({LenIoList, _}, {FullIoList, _}) ->
+        case iolist_to_binary(LenIoList) of
+            <<1:1/integer, _:31/integer>> -> % FullIoList = Md5 ++ payload
+                {Md5, IoList} = extract_md5(FullIoList), % fresh var, no re-match
+                {IoList, Md5};
+            <<0:1/integer, _:31/integer>> ->
+                {FullIoList, <<>>} % no checksum stored for this chunk
+        end
+    end, DataSizes, Resps),
+    {reply, {ok, Extracted}, File};
+
handle_call(bytes, _From, #file{fd = Fd} = File) ->
{reply, file:position(Fd, eof), File};
@@ -481,6 +542,20 @@ handle_call({append_bin, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
{reply, Error, reset_eof(File)}
end;
+handle_call({append_bins, Bins}, _From, #file{fd = Fd, eof = Pos} = File) ->
+ {BlockResps, FinalPos} = lists:mapfoldl(fun(Bin, PosAcc) ->
+ Blocks = make_blocks(PosAcc rem ?SIZE_BLOCK, Bin),
+ Size = iolist_size(Blocks),
+ {{Blocks, {PosAcc, Size}}, PosAcc + Size}
+ end, Pos, Bins),
+ {AllBlocks, Resps} = lists:unzip(BlockResps),
+ case file:write(Fd, AllBlocks) of
+ ok ->
+ {reply, {ok, Resps}, File#file{eof = FinalPos}};
+ Error ->
+ {reply, Error, reset_eof(File)}
+ end;
+
handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
BinSize = byte_size(Bin),
case Pos rem ?SIZE_BLOCK of
@@ -623,6 +698,31 @@ read_raw_iolist_int(#file{fd = Fd, pread_limit = Limit} = F, Pos, Len) ->
{remove_block_prefixes(BlockOffset, RawBin), Size}
end.
+
+read_multi_raw_iolists_int(#file{fd = Fd, pread_limit = Limit} = F, PosLens) ->
+ LocNums = lists:map(fun({Pos, Len}) ->
+ BlockOffset = Pos rem ?SIZE_BLOCK,
+ TotalBytes = calculate_total_read_len(BlockOffset, Len),
+ case Pos + TotalBytes of
+ Size when Size > F#file.eof ->
+ couch_stats:increment_counter([pread, exceed_eof]),
+ {_Fd, Filepath} = get(couch_file_fd),
+ throw({read_beyond_eof, Filepath});
+ Size when Size > Limit ->
+ couch_stats:increment_counter([pread, exceed_limit]),
+ {_Fd, Filepath} = get(couch_file_fd),
+ throw({exceed_pread_limit, Filepath, Limit});
+ _ ->
+ {Pos, TotalBytes}
+ end
+ end, PosLens),
+ {ok, Bins} = file:pread(Fd, LocNums),
+ lists:zipwith(fun({Pos, TotalBytes}, Bin) ->
+ <<RawBin:TotalBytes/binary>> = Bin,
+ {remove_block_prefixes(Pos rem ?SIZE_BLOCK, RawBin), Pos + TotalBytes}
+ end, LocNums, Bins).
+
+
-spec extract_md5(iolist()) -> {binary(), iolist()}.
extract_md5(FullIoList) ->
{Md5List, IoList} = split_iolist(FullIoList, 16, []),
@@ -691,6 +791,22 @@ split_iolist([Byte | Rest], SplitAt, BeginAcc) when is_integer(Byte) ->
split_iolist(Rest, SplitAt - 1, [Byte | BeginAcc]).
+
+verify_md5(_Fd, _Pos, IoList, <<>>) ->
+ IoList;
+
+verify_md5(Fd, Pos, IoList, Md5) ->
+ case crypto:hash(md5, IoList) of
+ Md5 -> IoList;
+ _ -> report_md5_error(Fd, Pos)
+ end.
+
+
+report_md5_error(Fd, Pos) ->
+ couch_log:emergency("File corruption in ~p at position ~B", [Fd, Pos]),
+ exit({file_corruption, <<"file corruption">>}).
+
+
% System dbs aren't monitored by couch_stats_process_tracker
is_idle(#file{is_sys=true}) ->
case process_info(self(), monitored_by) of
--
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.