You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ra...@apache.org on 2011/11/18 09:56:57 UTC
git commit: Asynchronous file writes
Updated Branches:
refs/heads/COUCHDB-1342 c2fa181d3 -> 8d698f571
Asynchronous file writes
This change updates the file module so that it can do
asynchronous writes. Basically, it replies immediately
to the process asking to write something to the file, with
the position where the chunks will be written in the
file, while a dedicated child process keeps collecting
chunks and writing them to the file (batching them
when possible). After issuing a series of write requests
to the file module, the caller can call its 'flush'
function, which blocks the caller until all the
chunks it requested to write have actually been written
to the file.
This maximizes use of the IO subsystem: for example, while
the updater is traversing and modifying the btrees and
doing CPU-bound tasks, the writes are happening in
parallel.
Originally described at http://s.apache.org/TVu
Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/8d698f57
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/8d698f57
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/8d698f57
Branch: refs/heads/COUCHDB-1342
Commit: 8d698f5713b4883686a71d830a896b0a14ef40e5
Parents: c2fa181
Author: Filipe David Borba Manana <fd...@apache.org>
Authored: Sat Nov 5 13:14:13 2011 +0000
Committer: Randall Leeds <ra...@apache.org>
Committed: Fri Nov 18 00:56:39 2011 -0800
----------------------------------------------------------------------
src/couch_mrview/src/couch_mrview_index.erl | 3 +-
src/couch_mrview/src/couch_mrview_updater.erl | 8 +-
src/couch_mrview/src/couch_mrview_util.erl | 1 +
src/couchdb/couch_db.erl | 36 +-
src/couchdb/couch_db.hrl | 1 -
src/couchdb/couch_db_updater.erl | 54 +--
src/couchdb/couch_file.erl | 425 ++++++++++++++------
test/etap/010-file-basics.t | 6 +
test/etap/011-file-headers.t | 7 +
test/etap/020-btree-basics.t | 21 +-
test/etap/021-btree-reductions.t | 1 +
test/etap/050-stream.t | 1 +
test/etap/200-view-group-no-db-leaks.t | 1 -
test/etap/201-view-group-shutdown.t | 1 -
test/etap/240-replication-compact.t | 1 -
15 files changed, 385 insertions(+), 182 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/src/couch_mrview/src/couch_mrview_index.erl
----------------------------------------------------------------------
diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl
index a604651..21d38d1 100644
--- a/src/couch_mrview/src/couch_mrview_index.erl
+++ b/src/couch_mrview/src/couch_mrview_index.erl
@@ -128,7 +128,8 @@ finish_update(State) ->
commit(State) ->
Header = {State#mrst.sig, couch_mrview_util:make_header(State)},
- couch_file:write_header(State#mrst.fd, Header).
+ ok = couch_file:write_header(State#mrst.fd, Header),
+ ok = couch_file:flush(State#mrst.fd).
compact(Db, State, Opts) ->
http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/src/couch_mrview/src/couch_mrview_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl
index 3014664..ef6f2e7 100644
--- a/src/couch_mrview/src/couch_mrview_updater.erl
+++ b/src/couch_mrview/src/couch_mrview_updater.erl
@@ -55,7 +55,8 @@ start_update(Partial, State, NumChanges) ->
purge(_Db, PurgeSeq, PurgedIdRevs, State) ->
#mrst{
id_btree=IdBtree,
- views=Views
+ views=Views,
+ fd=Fd
} = State,
Ids = [Id || {Id, _Revs} <- PurgedIdRevs],
@@ -87,6 +88,7 @@ purge(_Db, PurgeSeq, PurgedIdRevs, State) ->
end,
Views2 = lists:map(RemKeysFun, Views),
+ ok = couch_file:flush(Fd),
{ok, State#mrst{
id_btree=IdBtree2,
views=Views2,
@@ -229,7 +231,8 @@ insert_results(DocId, [KVs | RKVs], [{Id, VKVs} | RVKVs], VKVAcc, VIdKeys) ->
write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys) ->
#mrst{
id_btree=IdBtree,
- first_build=FirstBuild
+ first_build=FirstBuild,
+ fd=Fd
} = State,
{ok, ToRemove, IdBtree2} = update_id_btree(IdBtree, DocIdKeys, FirstBuild),
@@ -245,6 +248,7 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys) ->
View#mrview{btree=VBtree2, update_seq=NewUpdateSeq}
end,
+ ok = couch_file:flush(Fd),
State#mrst{
views=lists:zipwith(UpdateView, State#mrst.views, ViewKVs),
update_seq=UpdateSeq,
http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/src/couch_mrview/src/couch_mrview_util.erl
----------------------------------------------------------------------
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index a13b154..065a2cd 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -546,6 +546,7 @@ delete_file(FName) ->
reset_index(Db, Fd, #mrst{sig=Sig}=State) ->
ok = couch_file:truncate(Fd, 0),
ok = couch_file:write_header(Fd, {Sig, nil}),
+ ok = couch_file:flush(Fd),
init_state(Db, Fd, reset_state(State), nil).
http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/src/couchdb/couch_db.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 0656b52..ca6424d 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -232,7 +232,7 @@ get_full_doc_info(Db, Id) ->
Result.
get_full_doc_infos(Db, Ids) ->
- couch_btree:lookup(by_id_btree(Db), Ids).
+ couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids).
increment_update_seq(#db{update_pid=UpdatePid}) ->
gen_server:call(UpdatePid, increment_update_seq).
@@ -267,7 +267,7 @@ get_db_info(Db) ->
local_docs_btree = LocalBtree
} = Db,
{ok, Size} = couch_file:bytes(Fd),
- {ok, DbReduction} = couch_btree:full_reduce(by_id_btree(Db)),
+ {ok, DbReduction} = couch_btree:full_reduce(IdBtree),
InfoList = [
{db_name, Name},
{doc_count, element(1, DbReduction)},
@@ -301,7 +301,7 @@ sum_tree_sizes(Acc, [T | Rest]) ->
sum_tree_sizes(Acc + Sz, Rest)
end.
-get_design_docs(Db) ->
+get_design_docs(#db{fulldocinfo_by_id_btree = IdBtree} = Db) ->
FoldFun = skip_deleted(fun
(#full_doc_info{deleted = true}, _Reds, Acc) ->
{ok, Acc};
@@ -311,7 +311,7 @@ get_design_docs(Db) ->
{stop, Acc}
end),
KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
- {ok, _, Docs} = couch_btree:fold(by_id_btree(Db), FoldFun, [], KeyOpts),
+ {ok, _, Docs} = couch_btree:fold(IdBtree, FoldFun, [], KeyOpts),
Docs.
check_is_admin(#db{user_ctx=#user_ctx{name=Name,roles=Roles}}=Db) ->
@@ -702,7 +702,7 @@ update_docs(Db, Docs, Options, replicated_changes) ->
DocErrors = [],
DocBuckets3 = DocBuckets
end,
- DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.updater_fd)
+ DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd)
|| Doc <- Bucket] || Bucket <- DocBuckets3],
{ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
{ok, DocErrors};
@@ -758,7 +758,7 @@ update_docs(Db, Docs, Options, interactive_edit) ->
true -> [] end ++ Options,
DocBuckets3 = [[
doc_flush_atts(set_new_att_revpos(
- check_dup_atts(Doc)), Db#db.updater_fd)
+ check_dup_atts(Doc)), Db#db.fd)
|| Doc <- B] || B <- DocBuckets2],
{DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
@@ -832,7 +832,7 @@ write_and_commit(#db{update_pid=UpdatePid}=Db, DocBuckets1,
% compaction. Retry by reopening the db and writing to the current file
{ok, Db2} = open_ref_counted(Db#db.main_pid, self()),
DocBuckets2 = [
- [doc_flush_atts(Doc, Db2#db.updater_fd) || Doc <- Bucket] ||
+ [doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] ||
Bucket <- DocBuckets1
],
% We only retry once
@@ -1037,12 +1037,12 @@ changes_since(Db, StartSeq, Fun, Acc) ->
changes_since(Db, StartSeq, Fun, Options, Acc) ->
Wrapper = fun(DocInfo, _Offset, Acc2) -> Fun(DocInfo, Acc2) end,
- {ok, _LastReduction, AccOut} = couch_btree:fold(by_seq_btree(Db),
+ {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree,
Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options),
{ok, AccOut}.
count_changes_since(Db, SinceSeq) ->
- BTree = by_seq_btree(Db),
+ BTree = Db#db.docinfo_by_seq_btree,
{ok, Changes} =
couch_btree:fold_reduce(BTree,
fun(_SeqStart, PartialReds, 0) ->
@@ -1053,13 +1053,14 @@ count_changes_since(Db, SinceSeq) ->
enum_docs_since(Db, SinceSeq, InFun, Acc, Options) ->
{ok, LastReduction, AccOut} = couch_btree:fold(
- by_seq_btree(Db), InFun, Acc, [{start_key, SinceSeq + 1} | Options]),
+ Db#db.docinfo_by_seq_btree, InFun, Acc,
+ [{start_key, SinceSeq + 1} | Options]),
{ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}.
enum_docs(Db, InFun, InAcc, Options) ->
FoldFun = skip_deleted(InFun),
{ok, LastReduce, OutAcc} = couch_btree:fold(
- by_id_btree(Db), FoldFun, InAcc, Options),
+ Db#db.fulldocinfo_by_id_btree, FoldFun, InAcc, Options),
{ok, enum_docs_reduce_to_count(LastReduce), OutAcc}.
% server functions
@@ -1161,7 +1162,7 @@ open_doc_revs_int(Db, IdRevs, Options) ->
IdRevs, LookupResults).
open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, Options) ->
- case couch_btree:lookup(local_btree(Db), [Id]) of
+ case couch_btree:lookup(Db#db.local_docs_btree, [Id]) of
[{ok, {_, {Rev, BodyData}}}] ->
Doc = #doc{id=Id, revs={0, [?l2b(integer_to_list(Rev))]}, body=BodyData},
apply_open_options({ok, Doc}, Options);
@@ -1232,7 +1233,7 @@ read_doc(#db{fd=Fd}, Pos) ->
couch_file:pread_term(Fd, Pos).
-make_doc(#db{updater_fd = Fd} = Db, Id, Deleted, Bp, RevisionPath) ->
+make_doc(#db{fd = Fd} = Db, Id, Deleted, Bp, RevisionPath) ->
{BodyData, Atts} =
case Bp of
nil ->
@@ -1303,15 +1304,6 @@ increment_stat(#db{options = Options}, Stat) ->
couch_stats_collector:increment(Stat)
end.
-local_btree(#db{local_docs_btree = BTree, fd = ReaderFd}) ->
- BTree#btree{fd = ReaderFd}.
-
-by_seq_btree(#db{docinfo_by_seq_btree = BTree, fd = ReaderFd}) ->
- BTree#btree{fd = ReaderFd}.
-
-by_id_btree(#db{fulldocinfo_by_id_btree = BTree, fd = ReaderFd}) ->
- BTree#btree{fd = ReaderFd}.
-
skip_deleted(FoldFun) ->
fun
(visit, KV, Reds, Acc) ->
http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/src/couchdb/couch_db.hrl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index cc97351..d14d5b3 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -170,7 +170,6 @@
compactor_pid = nil,
instance_start_time, % number of microsecs since jan 1 1970 as a binary string
fd,
- updater_fd,
fd_ref_counter,
header = #db_header{},
committed_update_seq,
http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/src/couchdb/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index e15a944..03a04aa 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -42,14 +42,12 @@ init({MainPid, DbName, Filepath, Fd, Options}) ->
file:delete(Filepath ++ ".compact")
end
end,
- ReaderFd = open_reader_fd(Filepath, Options),
- Db = init_db(DbName, Filepath, Fd, ReaderFd, Header, Options),
+ Db = init_db(DbName, Filepath, Fd, Header, Options),
Db2 = refresh_validate_doc_funs(Db),
{ok, Db2#db{main_pid = MainPid}}.
terminate(_Reason, Db) ->
- ok = couch_file:close(Db#db.updater_fd),
ok = couch_file:close(Db#db.fd),
couch_util:shutdown_sync(Db#db.compactor_pid),
couch_util:shutdown_sync(Db#db.fd_ref_counter),
@@ -69,7 +67,7 @@ handle_call(increment_update_seq, _From, Db) ->
handle_call({set_security, NewSec}, _From, #db{compression = Comp} = Db) ->
{ok, Ptr, _} = couch_file:append_term(
- Db#db.updater_fd, NewSec, [{compression, Comp}]),
+ Db#db.fd, NewSec, [{compression, Comp}]),
Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr,
update_seq=Db#db.update_seq+1}),
ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
@@ -86,7 +84,7 @@ handle_call({purge_docs, _IdRevs}, _From,
{reply, {error, purge_during_compaction}, Db};
handle_call({purge_docs, IdRevs}, _From, Db) ->
#db{
- updater_fd = Fd,
+ fd = Fd,
fulldocinfo_by_id_btree = DocInfoByIdBTree,
docinfo_by_seq_btree = DocInfoBySeqBTree,
update_seq = LastSeq,
@@ -175,10 +173,9 @@ handle_call(cancel_compact, _From, #db{compactor_pid = Pid} = Db) ->
handle_call({compact_done, CompactFilepath}, _From, #db{filepath=Filepath}=Db) ->
{ok, NewFd} = couch_file:open(CompactFilepath),
- ReaderFd = open_reader_fd(CompactFilepath, Db#db.options),
{ok, NewHeader} = couch_file:read_header(NewFd),
#db{update_seq=NewSeq} = NewDb =
- init_db(Db#db.name, Filepath, NewFd, ReaderFd, NewHeader, Db#db.options),
+ init_db(Db#db.name, Filepath, NewFd, NewHeader, Db#db.options),
unlink(NewFd),
case Db#db.update_seq == NewSeq of
true ->
@@ -414,7 +411,7 @@ simple_upgrade_record(Old, _New) ->
-define(OLD_DISK_VERSION_ERROR,
"Database files from versions smaller than 0.10.0 are no longer supported").
-init_db(DbName, Filepath, Fd, ReaderFd, Header0, Options) ->
+init_db(DbName, Filepath, Fd, Header0, Options) ->
Header1 = simple_upgrade_record(Header0, #db_header{}),
Header =
case element(2, Header1) of
@@ -461,11 +458,10 @@ init_db(DbName, Filepath, Fd, ReaderFd, Header0, Options) ->
{MegaSecs, Secs, MicroSecs} = now(),
StartTime = ?l2b(io_lib:format("~p",
[(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),
- {ok, RefCntr} = couch_ref_counter:start([Fd, ReaderFd]),
+ {ok, RefCntr} = couch_ref_counter:start([Fd]),
#db{
update_pid=self(),
- fd = ReaderFd,
- updater_fd = Fd,
+ fd = Fd,
fd_ref_counter = RefCntr,
header=Header,
fulldocinfo_by_id_btree = IdBtree,
@@ -484,15 +480,6 @@ init_db(DbName, Filepath, Fd, ReaderFd, Header0, Options) ->
compression = Compression
}.
-open_reader_fd(Filepath, Options) ->
- {ok, Fd} = case lists:member(sys_db, Options) of
- true ->
- couch_file:open(Filepath, [read_only, sys_db]);
- false ->
- couch_file:open(Filepath, [read_only])
- end,
- unlink(Fd),
- Fd.
close_db(#db{fd_ref_counter = RefCntr}) ->
couch_ref_counter:drop(RefCntr).
@@ -515,7 +502,7 @@ refresh_validate_doc_funs(Db) ->
flush_trees(_Db, [], AccFlushedTrees) ->
{ok, lists:reverse(AccFlushedTrees)};
-flush_trees(#db{updater_fd = Fd} = Db,
+flush_trees(#db{fd = Fd} = Db,
[InfoUnflushed | RestUnflushed], AccFlushed) ->
#full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
{Flushed, LeafsSize} = couch_key_tree:mapfold(
@@ -672,7 +659,8 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
fulldocinfo_by_id_btree = DocInfoByIdBTree,
docinfo_by_seq_btree = DocInfoBySeqBTree,
update_seq = LastSeq,
- revs_limit = RevsLimit
+ revs_limit = RevsLimit,
+ fd = Fd
} = Db,
Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
% lookup up the old documents, if they exist.
@@ -707,6 +695,7 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
fulldocinfo_by_id_btree = DocInfoByIdBTree2,
docinfo_by_seq_btree = DocInfoBySeqBTree2,
update_seq = NewSeq},
+ couch_file:flush(Fd),
% Check if we just updated any design documents, and update the validation
% funs if we did.
@@ -782,8 +771,7 @@ commit_data(Db, true) ->
Db;
commit_data(Db, _) ->
#db{
- updater_fd = Fd,
- filepath = Filepath,
+ fd = Fd,
header = OldHeader,
fsync_options = FsyncOptions,
waiting_delayed_commit = Timer
@@ -794,14 +782,15 @@ commit_data(Db, _) ->
Db#db{waiting_delayed_commit=nil};
Header ->
case lists:member(before_header, FsyncOptions) of
- true -> ok = couch_file:sync(Filepath);
+ true -> ok = couch_file:sync(Fd);
_ -> ok
end,
ok = couch_file:write_header(Fd, Header),
+ ok = couch_file:flush(Fd),
case lists:member(after_header, FsyncOptions) of
- true -> ok = couch_file:sync(Filepath);
+ true -> ok = couch_file:sync(Fd);
_ -> ok
end,
@@ -811,7 +800,7 @@ commit_data(Db, _) ->
end.
-copy_doc_attachments(#db{updater_fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
+copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
{ok, {BodyData, BinInfos0}} = couch_db:read_doc(SrcDb, SrcSp),
BinInfos = case BinInfos0 of
_ when is_binary(BinInfos0) ->
@@ -844,11 +833,12 @@ copy_doc_attachments(#db{updater_fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
end, BinInfos),
{BodyData, NewBinInfos}.
-copy_docs(Db, #db{updater_fd = DestFd} = NewDb, InfoBySeq0, Retry) ->
+copy_docs(Db, #db{fd = DestFd} = NewDb, InfoBySeq0, Retry) ->
% COUCHDB-968, make sure we prune duplicates during compaction
InfoBySeq = lists:usort(fun(#doc_info{id=A}, #doc_info{id=B}) -> A =< B end,
InfoBySeq0),
Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
+ ok = couch_file:flush(DestFd),
LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
NewFullDocInfos1 = lists:map(
@@ -898,6 +888,8 @@ copy_docs(Db, #db{updater_fd = DestFd} = NewDb, InfoBySeq0, Retry) ->
copy_compact(Db, NewDb0, Retry) ->
FsyncOptions = [Op || Op <- NewDb0#db.fsync_options, Op == before_header],
NewDb = NewDb0#db{fsync_options=FsyncOptions},
+ ok = couch_file:flush(Db#db.fd),
+ ok = couch_file:flush(NewDb#db.fd),
TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
BufferSize = list_to_integer(
couch_config:get("database_compaction", "doc_buffer_size", "524288")),
@@ -956,13 +948,14 @@ copy_compact(Db, NewDb0, Retry) ->
% copy misc header values
if NewDb3#db.security /= Db#db.security ->
{ok, Ptr, _} = couch_file:append_term(
- NewDb3#db.updater_fd, Db#db.security,
+ NewDb3#db.fd, Db#db.security,
[{compression, NewDb3#db.compression}]),
NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
true ->
NewDb4 = NewDb3
end,
+ ok = couch_file:flush(NewDb4#db.fd),
commit_data(NewDb4#db{update_seq=Db#db.update_seq}).
start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=PurgeSeq}}=Db) ->
@@ -982,8 +975,7 @@ start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=P
Retry = false,
ok = couch_file:write_header(Fd, Header=#db_header{})
end,
- ReaderFd = open_reader_fd(CompactFile, Db#db.options),
- NewDb = init_db(Name, CompactFile, Fd, ReaderFd, Header, Db#db.options),
+ NewDb = init_db(Name, CompactFile, Fd, Header, Db#db.options),
NewDb2 = if PurgeSeq > 0 ->
{ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
{ok, Pointer, _} = couch_file:append_term(
http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/src/couchdb/couch_file.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl
index 2c2f11a..253ee17 100644
--- a/src/couchdb/couch_file.erl
+++ b/src/couchdb/couch_file.erl
@@ -17,13 +17,20 @@
-define(SIZE_BLOCK, 4096).
+% Queue a max of 1 meg of data before blocking write callers. Prevents memory
+% blowup when streaming in large binaries, etc with slow disk IO.
+-define(MAX_QUEUED_BYTES, 1048576).
+
-record(file, {
- fd,
- eof = 0
+ reader = nil,
+ writer = nil,
+ eof = 0,
+ queued_write_bytes = 0,
+ blocked_writers = []
}).
% public API
--export([open/1, open/2, close/1, bytes/1, sync/1, truncate/2]).
+-export([open/1, open/2, close/1, bytes/1, flush/1, sync/1, truncate/2]).
-export([pread_term/2, pread_iolist/2, pread_binary/2]).
-export([append_binary/2, append_binary_md5/2]).
-export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]).
@@ -143,8 +150,8 @@ pread_binary(Fd, Pos) ->
pread_iolist(Fd, Pos) ->
case gen_server:call(Fd, {pread_iolist, Pos}, infinity) of
- {ok, IoList, <<>>} ->
- {ok, IoList};
+ {ok, _IoList} = Ok ->
+ Ok;
{ok, IoList, Md5} ->
case couch_util:md5(IoList) of
Md5 ->
@@ -181,13 +188,19 @@ truncate(Fd, Pos) ->
%% or {error, Reason}.
%%----------------------------------------------------------------------
-sync(Filepath) when is_list(Filepath) ->
- {ok, Fd} = file:open(Filepath, [append, raw]),
- try ok = file:sync(Fd) after ok = file:close(Fd) end;
sync(Fd) ->
gen_server:call(Fd, sync, infinity).
%%----------------------------------------------------------------------
+%% Purpose: Ensure that all the data the caller previously asked to write
+%% to the file has been written to the file (not necessarily fsync'ed).
+%% Returns: ok
+%%----------------------------------------------------------------------
+
+flush(Fd) ->
+ gen_server:call(Fd, flush, infinity).
+
+%%----------------------------------------------------------------------
%% Purpose: Close the file.
%% Returns: ok
%%----------------------------------------------------------------------
@@ -247,6 +260,14 @@ read_header(Fd) ->
case gen_server:call(Fd, find_header, infinity) of
{ok, Bin} ->
{ok, binary_to_term(Bin)};
+ no_valid_header ->
+ flush(Fd),
+ case gen_server:call(Fd, find_header, infinity) of
+ {ok, Bin} ->
+ {ok, binary_to_term(Bin)};
+ Else ->
+ Else
+ end;
Else ->
Else
end.
@@ -256,7 +277,7 @@ write_header(Fd, Data) ->
Md5 = couch_util:md5(Bin),
% now we assemble the final header binary and write to disk
FinalBin = <<Md5/binary, Bin/binary>>,
- gen_server:call(Fd, {write_header, FinalBin}, infinity).
+ ok = gen_server:call(Fd, {write_header, FinalBin}, infinity).
@@ -268,12 +289,23 @@ init_status_error(ReturnPid, Ref, Error) ->
% server functions
init({Filepath, Options, ReturnPid, Ref}) ->
- process_flag(trap_exit, true),
- OpenOptions = file_open_options(Options),
+ try
+ maybe_create_file(Filepath, Options),
+ process_flag(trap_exit, true),
+ Reader = spawn_reader(Filepath),
+ {Writer, Eof} = spawn_writer(Filepath),
+ maybe_track_open_os_files(Options),
+ {ok, #file{reader = Reader, writer = Writer, eof = Eof}}
+ catch
+ throw:{error, Err} ->
+ init_status_error(ReturnPid, Ref, Err)
+ end.
+
+maybe_create_file(Filepath, Options) ->
case lists:member(create, Options) of
true ->
filelib:ensure_dir(Filepath),
- case file:open(Filepath, OpenOptions) of
+ case file:open(Filepath, [read, write, binary]) of
{ok, Fd} ->
{ok, Length} = file:position(Fd, eof),
case Length > 0 of
@@ -285,40 +317,19 @@ init({Filepath, Options, ReturnPid, Ref}) ->
true ->
{ok, 0} = file:position(Fd, 0),
ok = file:truncate(Fd),
- ok = file:sync(Fd),
- maybe_track_open_os_files(Options),
- {ok, #file{fd=Fd}};
+ ok = file:sync(Fd);
false ->
ok = file:close(Fd),
- init_status_error(ReturnPid, Ref, file_exists)
+ throw({error, file_exists})
end;
false ->
- maybe_track_open_os_files(Options),
- {ok, #file{fd=Fd}}
+ ok
end;
Error ->
- init_status_error(ReturnPid, Ref, Error)
+ throw({error, Error})
end;
false ->
- % open in read mode first, so we don't create the file if it doesn't exist.
- case file:open(Filepath, [read, raw]) of
- {ok, Fd_Read} ->
- {ok, Fd} = file:open(Filepath, OpenOptions),
- ok = file:close(Fd_Read),
- maybe_track_open_os_files(Options),
- {ok, Eof} = file:position(Fd, eof),
- {ok, #file{fd=Fd, eof=Eof}};
- Error ->
- init_status_error(ReturnPid, Ref, Error)
- end
- end.
-
-file_open_options(Options) ->
- [read, raw, binary] ++ case lists:member(read_only, Options) of
- true ->
- [];
- false ->
- [append]
+ ok
end.
maybe_track_open_os_files(FileOptions) ->
@@ -329,86 +340,110 @@ maybe_track_open_os_files(FileOptions) ->
couch_stats_collector:track_process_count({couchdb, open_os_files})
end.
-terminate(_Reason, #file{fd = Fd}) ->
- ok = file:close(Fd).
-
-
-handle_call({pread_iolist, Pos}, _From, File) ->
- {RawData, NextPos} = try
- % up to 8Kbs of read ahead
- read_raw_iolist_int(File, Pos, 2 * ?SIZE_BLOCK - (Pos rem ?SIZE_BLOCK))
- catch
- _:_ ->
- read_raw_iolist_int(File, Pos, 4)
- end,
- <<Prefix:1/integer, Len:31/integer, RestRawData/binary>> =
- iolist_to_binary(RawData),
- case Prefix of
- 1 ->
- {Md5, IoList} = extract_md5(
- maybe_read_more_iolist(RestRawData, 16 + Len, NextPos, File)),
- {reply, {ok, IoList, Md5}, File};
- 0 ->
- IoList = maybe_read_more_iolist(RestRawData, Len, NextPos, File),
- {reply, {ok, IoList, <<>>}, File}
- end;
-
-handle_call(bytes, _From, #file{fd = Fd} = File) ->
- {reply, file:position(Fd, eof), File};
-
-handle_call(sync, _From, #file{fd=Fd}=File) ->
- {reply, file:sync(Fd), File};
-
-handle_call({truncate, Pos}, _From, #file{fd=Fd}=File) ->
- {ok, Pos} = file:position(Fd, Pos),
- case file:truncate(Fd) of
- ok ->
- {reply, ok, File#file{eof = Pos}};
- Error ->
- {reply, Error, File}
- end;
-
-handle_call({append_bin, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
- Blocks = make_blocks(Pos rem ?SIZE_BLOCK, Bin),
- Size = iolist_size(Blocks),
- case file:write(Fd, Blocks) of
- ok ->
- {reply, {ok, Pos, Size}, File#file{eof = Pos + Size}};
- Error ->
- {reply, Error, File}
+terminate(_Reason, #file{reader = Reader, writer = Writer}) ->
+ couch_util:shutdown_sync(Reader),
+ couch_util:shutdown_sync(Writer).
+
+handle_call({pread_iolist, Pos}, From, #file{reader = Reader} = File) ->
+ Reader ! {read, Pos, From},
+ {noreply, File};
+
+handle_call(bytes, _From, #file{eof = Eof} = File) ->
+ {reply, {ok, Eof}, File};
+
+handle_call(sync, From, #file{writer = W} = File) ->
+ W ! {sync, From},
+ {noreply, File};
+
+handle_call({truncate, Pos}, _From, #file{writer = W} = File) ->
+ W ! {truncate, Pos, self()},
+ receive {W, truncated, Pos} -> ok end,
+ {reply, ok, File#file{eof = Pos}};
+
+handle_call({append_bin, Bin}, From, #file{writer = W, eof = Pos} = File) ->
+ if File#file.queued_write_bytes > ?MAX_QUEUED_BYTES ->
+ % We have too much data queued already. Put the writer on the blocked
+ % list so it's blocked until we write already queued data and free
+ % some memory.
+ BlockedWriters = [{From, Bin} | File#file.blocked_writers],
+ {noreply, File#file{blocked_writers = BlockedWriters}};
+ true ->
+ Bytes = iolist_size(Bin),
+ Size = calculate_total_read_len(Pos rem ?SIZE_BLOCK, Bytes),
+ gen_server:reply(From, {ok, Pos, Size}),
+ W ! {chunk, Bin},
+ NewFile = File#file{
+ eof = Pos + Size,
+ queued_write_bytes = File#file.queued_write_bytes + Bytes
+ },
+ {noreply, NewFile}
end;
-handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
- BinSize = byte_size(Bin),
- case Pos rem ?SIZE_BLOCK of
+handle_call({write_header, Bin}, From, #file{writer = W, eof = Pos} = File) ->
+ gen_server:reply(From, ok),
+ W ! {header, Bin},
+ Pos2 = case Pos rem ?SIZE_BLOCK of
0 ->
- Padding = <<>>;
+ Pos + 5;
BlockOffset ->
- Padding = <<0:(8*(?SIZE_BLOCK-BlockOffset))>>
+ Pos + 5 + (?SIZE_BLOCK - BlockOffset)
end,
- FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(5, [Bin])],
- case file:write(Fd, FinalBin) of
- ok ->
- {reply, ok, File#file{eof = Pos + iolist_size(FinalBin)}};
- Error ->
- {reply, Error, File}
- end;
+ File2 = File#file{
+ eof = Pos2 + calculate_total_read_len(5, byte_size(Bin))
+ },
+ {noreply, File2};
+
+handle_call(flush, From, #file{writer = W} = File) ->
+ W ! {flush, From},
+ {noreply, File};
-handle_call(find_header, _From, #file{fd = Fd, eof = Pos} = File) ->
- {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
+handle_call(find_header, From, #file{reader = Reader, eof = Eof} = File) ->
+ Reader ! {find_header, Eof, From},
+ {noreply, File}.
-handle_cast(close, Fd) ->
- {stop,normal,Fd}.
+handle_cast(close, File) ->
+ {stop,normal,File}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+handle_info({dequeued, Bytes}, File) ->
+ #file{
+ writer = Writer,
+ queued_write_bytes = QueuedBytes,
+ eof = Pos,
+ blocked_writers = Blocked
+ } = File,
+ {Blocked2, QueuedBytes2, Pos2} = enqueue_blocked_writes(
+ Writer, lists:reverse(Blocked), QueuedBytes - Bytes, Pos),
+ NewFile = File#file{
+ queued_write_bytes = QueuedBytes2,
+ eof = Pos2,
+ blocked_writers = Blocked2
+ },
+ {noreply, NewFile};
handle_info({'EXIT', _, normal}, Fd) ->
{noreply, Fd};
+handle_info({'EXIT', Pid, Reason}, #file{writer = Pid} = Fd) ->
+ {stop, {write_loop_died, Reason}, Fd};
+handle_info({'EXIT', Pid, Reason}, #file{reader = Pid} = Fd) ->
+ {stop, {read_loop_died, Reason}, Fd};
handle_info({'EXIT', _, Reason}, Fd) ->
{stop, Reason, Fd}.
+enqueue_blocked_writes(Writer, [{NextFrom, NextBin} | RestWriteReqs],
+ QueuedBytes, Pos) when QueuedBytes =< ?MAX_QUEUED_BYTES ->
+ Bytes = iolist_size(NextBin),
+ Size = calculate_total_read_len(Pos rem ?SIZE_BLOCK, Bytes),
+ gen_server:reply(NextFrom, {ok, Pos, Size}),
+ Writer ! {chunk, NextBin},
+ enqueue_blocked_writes(
+ Writer, RestWriteReqs, QueuedBytes + Bytes, Pos + Size);
+enqueue_blocked_writes(_Writer, RemainingWriteReqs, QueuedBytes, Pos) ->
+ {lists:reverse(RemainingWriteReqs), QueuedBytes, Pos}.
+
+
find_header(_Fd, -1) ->
no_valid_header;
find_header(Fd, Block) ->
@@ -441,19 +476,17 @@ maybe_read_more_iolist(Buffer, DataSize, _, _)
when DataSize =< byte_size(Buffer) ->
<<Data:DataSize/binary, _/binary>> = Buffer,
[Data];
-maybe_read_more_iolist(Buffer, DataSize, NextPos, File) ->
+maybe_read_more_iolist(Buffer, DataSize, NextPos, Fd) ->
{Missing, _} =
- read_raw_iolist_int(File, NextPos, DataSize - byte_size(Buffer)),
+ read_raw_iolist_int(Fd, NextPos, DataSize - byte_size(Buffer)),
[Buffer, Missing].
--spec read_raw_iolist_int(#file{}, Pos::non_neg_integer(), Len::non_neg_integer()) ->
- {Data::iolist(), CurPos::non_neg_integer()}.
-read_raw_iolist_int(Fd, {Pos, _Size}, Len) -> % 0110 UPGRADE CODE
- read_raw_iolist_int(Fd, Pos, Len);
-read_raw_iolist_int(#file{fd = Fd}, Pos, Len) ->
+read_raw_iolist_int(ReadFd, {Pos, _Size}, Len) -> % 0110 UPGRADE CODE
+ read_raw_iolist_int(ReadFd, Pos, Len);
+read_raw_iolist_int(ReadFd, Pos, Len) ->
BlockOffset = Pos rem ?SIZE_BLOCK,
TotalBytes = calculate_total_read_len(BlockOffset, Len),
- {ok, <<RawBin:TotalBytes/binary>>} = file:pread(Fd, Pos, TotalBytes),
+ {ok, <<RawBin:TotalBytes/binary>>} = file:pread(ReadFd, Pos, TotalBytes),
{remove_block_prefixes(BlockOffset, RawBin), Pos + TotalBytes}.
-spec extract_md5(iolist()) -> {binary(), iolist()}.
@@ -487,16 +520,24 @@ remove_block_prefixes(BlockOffset, Bin) ->
[Bin]
end.
-make_blocks(_BlockOffset, []) ->
- [];
make_blocks(0, IoList) ->
- [<<0>> | make_blocks(1, IoList)];
+ case iolist_size(IoList) of
+ 0 ->
+ [];
+ _ ->
+ [<<0>> | make_blocks(1, IoList)]
+ end;
make_blocks(BlockOffset, IoList) ->
- case split_iolist(IoList, (?SIZE_BLOCK - BlockOffset), []) of
- {Begin, End} ->
- [Begin | make_blocks(0, End)];
- _SplitRemaining ->
- IoList
+ case iolist_size(IoList) of
+ 0 ->
+ [];
+ _ ->
+ case split_iolist(IoList, (?SIZE_BLOCK - BlockOffset), []) of
+ {Begin, End} ->
+ [Begin | make_blocks(0, End)];
+ _SplitRemaining ->
+ IoList
+ end
end.
%% @doc Returns a tuple where the first element contains the leading SplitAt
@@ -522,3 +563,153 @@ split_iolist([Sublist| Rest], SplitAt, BeginAcc) when is_list(Sublist) ->
end;
split_iolist([Byte | Rest], SplitAt, BeginAcc) when is_integer(Byte) ->
split_iolist(Rest, SplitAt - 1, [Byte | BeginAcc]).
+
+
+%% @doc Starts the linked writer process for Filepath.
+%% The child opens the file with [binary, append, raw], seeks to the end to
+%% learn the current size, reports it to the caller and then runs
+%% writer_loop/3. The caller blocks (no timeout) until the child answers.
+%% Returns {WriterPid, Eof}. If file:open/2 returns anything other than
+%% {ok, Fd}, that result is thrown wrapped as {error, OpenResult}.
+spawn_writer(Filepath) ->
+    Owner = self(),
+    Init = fun() ->
+        case file:open(Filepath, [binary, append, raw]) of
+            {ok, Fd} ->
+                {ok, Size} = file:position(Fd, eof),
+                Owner ! {self(), {ok, Size}},
+                writer_loop(Owner, Fd, Size);
+            OpenResult ->
+                % Hand the failure to the caller; the child then terminates.
+                Owner ! {self(), OpenResult}
+        end
+    end,
+    Writer = spawn_link(Init),
+    receive
+        {Writer, {ok, Eof}} ->
+            {Writer, Eof};
+        {Writer, Failure} ->
+            throw({error, Failure})
+    end.
+
+
+%% @doc Starts the linked reader process for Filepath.
+%% The child opens the file with [binary, read, raw], acknowledges with `ok'
+%% and then runs reader_loop/1. The caller blocks (no timeout) until the
+%% child answers. Returns the reader pid. If file:open/2 returns anything
+%% other than {ok, Fd}, that result is thrown wrapped as {error, OpenResult}.
+spawn_reader(Filepath) ->
+    Owner = self(),
+    Init = fun() ->
+        case file:open(Filepath, [binary, read, raw]) of
+            {ok, Fd} ->
+                Owner ! {self(), ok},
+                reader_loop(Fd);
+            OpenResult ->
+                % Hand the failure to the caller; the child then terminates.
+                Owner ! {self(), OpenResult}
+        end
+    end,
+    Reader = spawn_link(Init),
+    receive
+        {Reader, ok} ->
+            Reader;
+        {Reader, Failure} ->
+            throw({error, Failure})
+    end.
+
+
+%% @doc Idle state of the writer process: nothing is buffered.
+%% Eof is the writer's view of the current file size. Messages handled:
+%%   {chunk, Chunk}        - start buffering in writer_collect_chunks/4
+%%   {header, Header}      - write a header and advance Eof
+%%   {truncate, Pos, From} - cut the file at Pos, answer From with
+%%                           {self(), truncated, Pos}
+%%   {flush, From}         - nothing pending, reply ok right away
+%%   {sync, From}          - file:sync/1, then reply ok
+%%   stop                  - close the file and exit with reason `done'
+writer_loop(Parent, Fd, Eof) ->
+    receive
+        stop ->
+            ok = file:close(Fd),
+            exit(done);
+        {chunk, First} ->
+            writer_collect_chunks(Parent, Fd, Eof, [First]);
+        {header, Header} ->
+            writer_loop(Parent, Fd, write_header_blocks(Fd, Eof, Header));
+        {flush, Client} ->
+            gen_server:reply(Client, ok),
+            writer_loop(Parent, Fd, Eof);
+        {sync, Client} ->
+            ok = file:sync(Fd),
+            gen_server:reply(Client, ok),
+            writer_loop(Parent, Fd, Eof);
+        {truncate, Pos, Requester} ->
+            {ok, Pos} = file:position(Fd, Pos),
+            ok = file:truncate(Fd),
+            % Plain message, not gen_server:reply/2: Requester is a pid here.
+            Requester ! {self(), truncated, Pos},
+            writer_loop(Parent, Fd, Pos)
+    end.
+
+%% @doc Batching state of the writer process.
+%% Acc holds the chunks received so far, newest first. More chunks already
+%% waiting in the mailbox are added to the batch. Any other request first
+%% writes the batch via write_blocks/4 and is then served as in
+%% writer_loop/3. When no matching message is waiting (`after 0'), the batch
+%% is written and the process returns to writer_loop/3.
+writer_collect_chunks(Parent, Fd, Eof, Acc) ->
+    receive
+        {chunk, Next} ->
+            writer_collect_chunks(Parent, Fd, Eof, [Next | Acc]);
+        {flush, Client} ->
+            Flushed = write_blocks(Parent, Fd, Eof, Acc),
+            gen_server:reply(Client, ok),
+            writer_loop(Parent, Fd, Flushed);
+        {sync, Client} ->
+            Flushed = write_blocks(Parent, Fd, Eof, Acc),
+            ok = file:sync(Fd),
+            gen_server:reply(Client, ok),
+            writer_loop(Parent, Fd, Flushed);
+        {header, Header} ->
+            AfterData = write_blocks(Parent, Fd, Eof, Acc),
+            writer_loop(Parent, Fd, write_header_blocks(Fd, AfterData, Header));
+        {truncate, Pos, Requester} ->
+            % The batch is still written (so Parent gets its dequeued
+            % notification); the resulting offset is irrelevant because the
+            % file is cut at Pos immediately afterwards.
+            _ = write_blocks(Parent, Fd, Eof, Acc),
+            {ok, Pos} = file:position(Fd, Pos),
+            ok = file:truncate(Fd),
+            Requester ! {self(), truncated, Pos},
+            writer_loop(Parent, Fd, Pos)
+    after 0 ->
+        writer_loop(Parent, Fd, write_blocks(Parent, Fd, Eof, Acc))
+    end.
+
+
+%% @doc Writes one batch of chunks and returns the new end-of-file offset.
+%% Data is the batch in reverse arrival order; it is put back in order and
+%% framed by make_blocks/2 starting at the current offset within the block.
+%% After the write, Parent is told how many payload bytes (before framing)
+%% left the queue via {dequeued, Bytes}.
+write_blocks(Parent, Fd, Eof, Data) ->
+    InOrder = lists:reverse(Data),
+    Framed = make_blocks(Eof rem ?SIZE_BLOCK, InOrder),
+    ok = file:write(Fd, Framed),
+    Parent ! {dequeued, iolist_size(Data)},
+    Eof + iolist_size(Framed).
+
+%% @doc Writes Header (a binary) at the next block boundary and returns the
+%% new end-of-file offset.
+%% If Eof is not block aligned, the rest of the current block is filled with
+%% zero bytes. The header itself is a 1 byte, the 32-bit header size, and the
+%% header body framed by make_blocks/2 starting at block offset 5 (the five
+%% bytes just written).
+write_header_blocks(Fd, Eof, Header) ->
+    Padding = case Eof rem ?SIZE_BLOCK of
+        0 ->
+            <<>>;
+        Used ->
+            <<0:(8 * (?SIZE_BLOCK - Used))>>
+    end,
+    Marker = <<1, (byte_size(Header)):32/integer>>,
+    Framed = [Padding, Marker | make_blocks(5, [Header])],
+    ok = file:write(Fd, Framed),
+    Eof + iolist_size(Framed).
+
+
+%% @doc Main loop of the reader process. Messages handled:
+%%   {read, Pos, From}        - read the item at Pos; read_iolist/3 replies
+%%   {find_header, Eof, From} - search for a header starting from the block
+%%                              containing Eof and reply with the result
+%%   stop                     - close the file and exit with reason `done'
+reader_loop(Fd) ->
+    receive
+        stop ->
+            ok = file:close(Fd),
+            exit(done);
+        {read, Pos, Client} ->
+            read_iolist(Fd, Pos, Client),
+            reader_loop(Fd);
+        {find_header, Eof, Client} ->
+            Found = find_header(Fd, Eof div ?SIZE_BLOCK),
+            gen_server:reply(Client, Found),
+            reader_loop(Fd)
+    end.
+
+
+-compile({inline, [read_iolist/3]}).
+
+%% @doc Reads the item stored at Pos and replies to From with
+%% gen_server:reply/2.
+%% The item starts with 32 bits: a 1-bit flag and a 31-bit length. Flag 1
+%% means 16 extra bytes are read and split off by extract_md5/1, and the
+%% reply is {ok, IoList, Md5}. Flag 0 replies {ok, IoList}.
+read_iolist(Fd, Pos, From) ->
+    {RawData, NextPos} = try
+        % Optimistic read-ahead to the end of the following block (noted as
+        % "up to 8Kbs" by the author). The length arithmetic stays inside the
+        % try on purpose: any failure here falls back to reading just the
+        % 4-byte item prefix.
+        read_raw_iolist_int(Fd, Pos, 2 * ?SIZE_BLOCK - (Pos rem ?SIZE_BLOCK))
+    catch
+        _:_ ->
+            read_raw_iolist_int(Fd, Pos, 4)
+    end,
+    <<Md5Flag:1/integer, Len:31/integer, Body/binary>> =
+        iolist_to_binary(RawData),
+    Reply = case Md5Flag of
+        1 ->
+            Full = maybe_read_more_iolist(Body, 16 + Len, NextPos, Fd),
+            {Md5, IoList} = extract_md5(Full),
+            {ok, IoList, Md5};
+        0 ->
+            {ok, maybe_read_more_iolist(Body, Len, NextPos, Fd)}
+    end,
+    gen_server:reply(From, Reply).
http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/test/etap/010-file-basics.t
----------------------------------------------------------------------
diff --git a/test/etap/010-file-basics.t b/test/etap/010-file-basics.t
index fb1b29e..a1b9a98 100755
--- a/test/etap/010-file-basics.t
+++ b/test/etap/010-file-basics.t
@@ -57,6 +57,7 @@ test() ->
?etap_match(couch_file:append_binary(Fd, <<"fancy!">>), {ok, Size, _},
"Appending a binary returns the current file size."),
+ ok = couch_file:flush(Fd),
etap:is({ok, foo}, couch_file:pread_term(Fd, 0),
"Reading the first term returns what we wrote: foo"),
@@ -70,25 +71,30 @@ test() ->
),
{ok, BinPos, _} = couch_file:append_binary(Fd, <<131,100,0,3,102,111,111>>),
+ ok = couch_file:flush(Fd),
etap:is({ok, foo}, couch_file:pread_term(Fd, BinPos),
"Reading a term from a written binary term representation succeeds."),
BigBin = list_to_binary(lists:duplicate(100000, 0)),
{ok, BigBinPos, _} = couch_file:append_binary(Fd, BigBin),
+ ok = couch_file:flush(Fd),
etap:is({ok, BigBin}, couch_file:pread_binary(Fd, BigBinPos),
"Reading a large term from a written representation succeeds."),
ok = couch_file:write_header(Fd, hello),
+ ok = couch_file:flush(Fd),
etap:is({ok, hello}, couch_file:read_header(Fd),
"Reading a header succeeds."),
{ok, BigBinPos2, _} = couch_file:append_binary(Fd, BigBin),
+ ok = couch_file:flush(Fd),
etap:is({ok, BigBin}, couch_file:pread_binary(Fd, BigBinPos2),
"Reading a large term from a written representation succeeds 2."),
% append_binary == append_iolist?
% Possible bug in pread_iolist or iolist() -> append_binary
{ok, IOLPos, _} = couch_file:append_binary(Fd, ["foo", $m, <<"bam">>]),
+ ok = couch_file:flush(Fd),
{ok, IoList} = couch_file:pread_iolist(Fd, IOLPos),
etap:is(<<"foombam">>, iolist_to_binary(IoList),
"Reading an results in a binary form of the written iolist()"),
http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/test/etap/011-file-headers.t
----------------------------------------------------------------------
diff --git a/test/etap/011-file-headers.t b/test/etap/011-file-headers.t
index a26b032..82c028f 100755
--- a/test/etap/011-file-headers.t
+++ b/test/etap/011-file-headers.t
@@ -45,6 +45,7 @@ test() ->
etap:is_greater(Size1, 0,
"Writing a header allocates space in the file."),
+ ok = couch_file:flush(Fd),
etap:is({ok, {<<"some_data">>, 32}}, couch_file:read_header(Fd),
"Reading the header returns what we wrote."),
@@ -55,6 +56,7 @@ test() ->
etap:is_greater(Size2, Size1,
"Writing a second header allocates more space."),
+ ok = couch_file:flush(Fd),
etap:is({ok, [foo, <<"more">>]}, couch_file:read_header(Fd),
"Reading the second header does not return the first header."),
@@ -69,6 +71,7 @@ test() ->
"Rewriting the same second header returns the same second size."),
couch_file:write_header(Fd, erlang:make_tuple(5000, <<"CouchDB">>)),
+ ok = couch_file:flush(Fd),
etap:is(
couch_file:read_header(Fd),
{ok, erlang:make_tuple(5000, <<"CouchDB">>)},
@@ -82,6 +85,7 @@ test() ->
% Destroy the 0x1 byte that marks a header
check_header_recovery(fun(CouchFd, RawFd, Expect, HeaderPos) ->
+ ok = couch_file:flush(CouchFd),
etap:isnt(Expect, couch_file:read_header(CouchFd),
"Should return a different header before corruption."),
file:pwrite(RawFd, HeaderPos, <<0>>),
@@ -91,6 +95,7 @@ test() ->
% Corrupt the size.
check_header_recovery(fun(CouchFd, RawFd, Expect, HeaderPos) ->
+ ok = couch_file:flush(CouchFd),
etap:isnt(Expect, couch_file:read_header(CouchFd),
"Should return a different header before corruption."),
% +1 for 0x1 byte marker
@@ -101,6 +106,7 @@ test() ->
% Corrupt the MD5 signature
check_header_recovery(fun(CouchFd, RawFd, Expect, HeaderPos) ->
+ ok = couch_file:flush(CouchFd),
etap:isnt(Expect, couch_file:read_header(CouchFd),
"Should return a different header before corruption."),
% +5 = +1 for 0x1 byte and +4 for term size.
@@ -111,6 +117,7 @@ test() ->
% Corrupt the data
check_header_recovery(fun(CouchFd, RawFd, Expect, HeaderPos) ->
+ ok = couch_file:flush(CouchFd),
etap:isnt(Expect, couch_file:read_header(CouchFd),
"Should return a different header before corruption."),
% +21 = +1 for 0x1 byte, +4 for term size and +16 for MD5 sig
http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/test/etap/020-btree-basics.t
----------------------------------------------------------------------
diff --git a/test/etap/020-btree-basics.t b/test/etap/020-btree-basics.t
index 6886ee1..372b494 100755
--- a/test/etap/020-btree-basics.t
+++ b/test/etap/020-btree-basics.t
@@ -97,6 +97,7 @@ test_kvs(KeyValues) ->
"After removing all keys btree size is 0."),
{Btree4, _} = lists:foldl(fun(KV, {BtAcc, PrevSize}) ->
+ ok = couch_file:flush(Fd),
{ok, BtAcc2} = couch_btree:add_remove(BtAcc, [KV], []),
case couch_btree:size(BtAcc2) > PrevSize of
true ->
@@ -106,6 +107,7 @@ test_kvs(KeyValues) ->
end,
{BtAcc2, couch_btree:size(BtAcc2)}
end, {Btree3, couch_btree:size(Btree3)}, KeyValues),
+ ok = couch_file:flush(Fd),
etap:ok(test_btree(Btree4, KeyValues),
"Adding all keys one at a time returns a complete btree."),
@@ -113,6 +115,7 @@ test_kvs(KeyValues) ->
"Non empty btrees have a size > 0."),
{Btree5, _} = lists:foldl(fun({K, _}, {BtAcc, PrevSize}) ->
+ ok = couch_file:flush(Fd),
{ok, BtAcc2} = couch_btree:add_remove(BtAcc, [], [K]),
case couch_btree:size(BtAcc2) < PrevSize of
true ->
@@ -122,6 +125,8 @@ test_kvs(KeyValues) ->
end,
{BtAcc2, couch_btree:size(BtAcc2)}
end, {Btree4, couch_btree:size(Btree4)}, KeyValues),
+ ok = couch_file:flush(Fd),
+
etap:ok(test_btree(Btree5, []),
"Removing all keys one at a time returns an empty btree."),
etap:is(0, couch_btree:size(Btree5),
@@ -129,6 +134,7 @@ test_kvs(KeyValues) ->
KeyValuesRev = lists:reverse(KeyValues),
{Btree6, _} = lists:foldl(fun(KV, {BtAcc, PrevSize}) ->
+ ok = couch_file:flush(Fd),
{ok, BtAcc2} = couch_btree:add_remove(BtAcc, [KV], []),
case couch_btree:size(BtAcc2) > PrevSize of
true ->
@@ -170,14 +176,16 @@ test_kvs(KeyValues) ->
etap:is(couch_file:close(Fd), ok, "closing out"),
true.
-test_btree(Btree, KeyValues) ->
+test_btree(#btree{fd = Fd} = Btree, KeyValues) ->
+ ok = couch_file:flush(Fd),
ok = test_key_access(Btree, KeyValues),
ok = test_lookup_access(Btree, KeyValues),
ok = test_final_reductions(Btree, KeyValues),
ok = test_traversal_callbacks(Btree, KeyValues),
true.
-test_add_remove(Btree, OutKeyValues, RemainingKeyValues) ->
+test_add_remove(#btree{fd = Fd} = Btree, OutKeyValues, RemainingKeyValues) ->
+ ok = couch_file:flush(Fd),
Btree2 = lists:foldl(fun({K, _}, BtAcc) ->
{ok, BtAcc2} = couch_btree:add_remove(BtAcc, [], [K]),
BtAcc2
@@ -190,7 +198,7 @@ test_add_remove(Btree, OutKeyValues, RemainingKeyValues) ->
end, Btree2, OutKeyValues),
true = test_btree(Btree3, OutKeyValues ++ RemainingKeyValues).
-test_key_access(Btree, List) ->
+test_key_access(#btree{fd = Fd} = Btree, List) ->
FoldFun = fun(Element, {[HAcc|TAcc], Count}) ->
case Element == HAcc of
true -> {ok, {TAcc, Count + 1}};
@@ -199,18 +207,21 @@ test_key_access(Btree, List) ->
end,
Length = length(List),
Sorted = lists:sort(List),
+ ok = couch_file:flush(Fd),
{ok, _, {[], Length}} = couch_btree:foldl(Btree, FoldFun, {Sorted, 0}),
{ok, _, {[], Length}} = couch_btree:fold(Btree, FoldFun, {Sorted, 0}, [{dir, rev}]),
ok.
-test_lookup_access(Btree, KeyValues) ->
+test_lookup_access(#btree{fd = Fd} = Btree, KeyValues) ->
+ ok = couch_file:flush(Fd),
FoldFun = fun({Key, Value}, {Key, Value}) -> {stop, true} end,
lists:foreach(fun({Key, Value}) ->
[{ok, {Key, Value}}] = couch_btree:lookup(Btree, [Key]),
{ok, _, true} = couch_btree:foldl(Btree, FoldFun, {Key, Value}, [{start_key, Key}])
end, KeyValues).
-test_final_reductions(Btree, KeyValues) ->
+test_final_reductions(#btree{fd = Fd} = Btree, KeyValues) ->
+ ok = couch_file:flush(Fd),
KVLen = length(KeyValues),
FoldLFun = fun(_X, LeadingReds, Acc) ->
CountToStart = KVLen div 3 + Acc,
http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/test/etap/021-btree-reductions.t
----------------------------------------------------------------------
diff --git a/test/etap/021-btree-reductions.t b/test/etap/021-btree-reductions.t
index 331e49a..e597772 100755
--- a/test/etap/021-btree-reductions.t
+++ b/test/etap/021-btree-reductions.t
@@ -47,6 +47,7 @@ test()->
end, {"odd", []}, lists:seq(1, rows())),
{ok, Btree2} = couch_btree:add_remove(Btree, EvenOddKVs, []),
+ ok = couch_file:flush(Fd),
GroupFun = fun({K1, _}, {K2, _}) -> K1 == K2 end,
FoldFun = fun(GroupedKey, Unreduced, Acc) ->
http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/test/etap/050-stream.t
----------------------------------------------------------------------
diff --git a/test/etap/050-stream.t b/test/etap/050-stream.t
index de0dfad..622079a 100755
--- a/test/etap/050-stream.t
+++ b/test/etap/050-stream.t
@@ -26,6 +26,7 @@ main(_) ->
ok.
read_all(Fd, PosList) ->
+ ok = couch_file:flush(Fd),
Data = couch_stream:foldl(Fd, PosList, fun(Bin, Acc) -> [Bin, Acc] end, []),
iolist_to_binary(Data).
http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/test/etap/200-view-group-no-db-leaks.t
----------------------------------------------------------------------
diff --git a/test/etap/200-view-group-no-db-leaks.t b/test/etap/200-view-group-no-db-leaks.t
index 4586ff4..9b74e01 100755
--- a/test/etap/200-view-group-no-db-leaks.t
+++ b/test/etap/200-view-group-no-db-leaks.t
@@ -25,7 +25,6 @@
compactor_pid = nil,
instance_start_time, % number of microsecs since jan 1 1970 as a binary string
fd,
- updater_fd,
fd_ref_counter,
header = nil,
committed_update_seq,
http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/test/etap/201-view-group-shutdown.t
----------------------------------------------------------------------
diff --git a/test/etap/201-view-group-shutdown.t b/test/etap/201-view-group-shutdown.t
index 2fabef6..dfd218e 100755
--- a/test/etap/201-view-group-shutdown.t
+++ b/test/etap/201-view-group-shutdown.t
@@ -25,7 +25,6 @@
compactor_pid = nil,
instance_start_time, % number of microsecs since jan 1 1970 as a binary string
fd,
- updater_fd,
fd_ref_counter,
header = nil,
committed_update_seq,
http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/test/etap/240-replication-compact.t
----------------------------------------------------------------------
diff --git a/test/etap/240-replication-compact.t b/test/etap/240-replication-compact.t
index c8b265e..3e3d608 100755
--- a/test/etap/240-replication-compact.t
+++ b/test/etap/240-replication-compact.t
@@ -30,7 +30,6 @@
compactor_pid = nil,
instance_start_time, % number of microsecs since jan 1 1970 as a binary string
fd,
- updater_fd,
fd_ref_counter,
header = nil,
committed_update_seq,