You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/03/02 17:04:07 UTC
[1/5] couch commit: updated
refs/heads/COUCHDB-3287-pluggable-storage-engines to 0f4e1a7 [Forced Update!]
Repository: couchdb-couch
Updated Branches:
refs/heads/COUCHDB-3287-pluggable-storage-engines 09a4de61e -> 0f4e1a7fd (forced update)
Ensure deterministic revisions for attachments
This re-fixes a corner case when recreating a document with an
attachment in a single multipart request. Since we don't detect that we
need a new revision until after the document has been serialized we need
to be able to deserialize the body so that we can generate the same
revisions regardless of the contents of the database. If we don't do
this then we end up including information from the position of the
attachment on disk in the revision calculation which can introduce
branches in the revision tree.
I've left this as a separate commit from the pluggable storage engine
work so that its called out clearly for us to revisit.
COUCHDB-3255
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/0f4e1a7f
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/0f4e1a7f
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/0f4e1a7f
Branch: refs/heads/COUCHDB-3287-pluggable-storage-engines
Commit: 0f4e1a7fdb3a3816185d2c1cebf4439a4c359872
Parents: 414880e
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Feb 8 07:25:37 2017 -0600
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Thu Mar 2 11:03:55 2017 -0600
----------------------------------------------------------------------
src/couch_bt_engine.erl | 10 +++++++++-
src/couch_db.erl | 12 +-----------
src/couch_db_updater.erl | 12 +++++++++++-
3 files changed, 21 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/0f4e1a7f/src/couch_bt_engine.erl
----------------------------------------------------------------------
diff --git a/src/couch_bt_engine.erl b/src/couch_bt_engine.erl
index cf31b09..3b31341 100644
--- a/src/couch_bt_engine.erl
+++ b/src/couch_bt_engine.erl
@@ -324,7 +324,15 @@ serialize_doc(#st{} = St, #doc{} = Doc) ->
SummaryBin = ?term_to_bin({Body, Atts}),
Md5 = couch_crypto:hash(md5, SummaryBin),
Data = couch_file:assemble_file_chunk(SummaryBin, Md5),
- Doc#doc{body = Data}.
+ % TODO: This is a terrible hack to get around the issues
+ % in COUCHDB-3255. We'll need to come back and figure
+ % out a better approach to handling the case when we
+ % need to generate a new revision id after the doc
+ % has been serialized.
+ Doc#doc{
+ body = Data,
+ meta = [{comp_body, Body} | Doc#doc.meta]
+ }.
write_doc_body(St, #doc{} = Doc) ->
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/0f4e1a7f/src/couch_db.erl
----------------------------------------------------------------------
diff --git a/src/couch_db.erl b/src/couch_db.erl
index f5281a3..ca63a40 100644
--- a/src/couch_db.erl
+++ b/src/couch_db.erl
@@ -945,7 +945,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
-new_revid(#doc{body=Body0, revs={OldStart,OldRevs}, atts=Atts, deleted=Deleted}) ->
+new_revid(#doc{body=Body, revs={OldStart,OldRevs}, atts=Atts, deleted=Deleted}) ->
DigestedAtts = lists:foldl(fun(Att, Acc) ->
[N, T, M] = couch_att:fetch([name, type, md5], Att),
case M == <<>> of
@@ -953,16 +953,6 @@ new_revid(#doc{body=Body0, revs={OldStart,OldRevs}, atts=Atts, deleted=Deleted})
false -> [{N, T, M} | Acc]
end
end, [], Atts),
- Body = case Body0 of
- {summary, [_Len, _Md5, BodyAtts], _SizeInfo, _AttsFd} ->
- {CompBody, _CompAtts} = binary_to_term(BodyAtts),
- couch_compress:decompress(CompBody);
- {summary, [_Len, BodyAtts], _SizeInfo, _AttsFd} ->
- {CompBody, _CompAtts} = binary_to_term(BodyAtts),
- couch_compress:decompress(CompBody);
- Else ->
- Else
- end,
case DigestedAtts of
Atts2 when length(Atts) =/= length(Atts2) ->
% We must have old style non-md5 attachments
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/0f4e1a7f/src/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl
index cbd2e79..86a0300 100644
--- a/src/couch_db_updater.erl
+++ b/src/couch_db_updater.erl
@@ -520,7 +520,17 @@ merge_rev_tree(OldInfo, NewDoc, Client, Limit, false)
% Update the new doc based on revisions in OldInfo
#doc_info{revs=[WinningRev | _]} = couch_doc:to_doc_info(OldInfo),
#rev_info{rev={OldPos, OldRev}} = WinningRev,
- NewRevId = couch_db:new_revid(NewDoc#doc{revs={OldPos, [OldRev]}}),
+ Body = case couch_util:get_value(comp_body, NewDoc#doc.meta) of
+ CompBody when is_binary(CompBody) ->
+ couch_compress:decompress(CompBody);
+ _ ->
+ NewDoc#doc.body
+ end,
+ RevIdDoc = NewDoc#doc{
+ revs = {OldPos, [OldRev]},
+ body = Body
+ },
+ NewRevId = couch_db:new_revid(RevIdDoc),
NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}},
% Merge our modified new doc into the tree
[3/5] couch commit: updated
refs/heads/COUCHDB-3287-pluggable-storage-engines to 0f4e1a7
Posted by da...@apache.org.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e056ae9e/test/couch_stream_tests.erl
----------------------------------------------------------------------
diff --git a/test/couch_stream_tests.erl b/test/couch_stream_tests.erl
index 3d7bf09..901b4fd 100644
--- a/test/couch_stream_tests.erl
+++ b/test/couch_stream_tests.erl
@@ -14,10 +14,11 @@
-include_lib("couch/include/couch_eunit.hrl").
+-define(ENGINE, {couch_bt_engine_stream, {Fd, []}}).
setup() ->
{ok, Fd} = couch_file:open(?tempfile(), [create, overwrite]),
- {ok, Stream} = couch_stream:open(Fd),
+ {ok, Stream} = couch_stream:open(?ENGINE, []),
{Fd, Stream}.
teardown({Fd, _}) ->
@@ -61,7 +62,8 @@ should_write_empty_binary({_, Stream}) ->
should_return_file_pointers_on_close({_, Stream}) ->
couch_stream:write(Stream, <<"foodfoob">>),
- {Ptrs, _, _, _, _} = couch_stream:close(Stream),
+ {NewEngine, _, _, _, _} = couch_stream:close(Stream),
+ {ok, Ptrs} = couch_stream:to_disk_term(NewEngine),
?_assertEqual([{0, 8}], Ptrs).
should_return_stream_size_on_close({_, Stream}) ->
@@ -69,41 +71,43 @@ should_return_stream_size_on_close({_, Stream}) ->
{_, Length, _, _, _} = couch_stream:close(Stream),
?_assertEqual(8, Length).
-should_return_valid_pointers({Fd, Stream}) ->
+should_return_valid_pointers({_Fd, Stream}) ->
couch_stream:write(Stream, <<"foodfoob">>),
- {Ptrs, _, _, _, _} = couch_stream:close(Stream),
- ?_assertEqual(<<"foodfoob">>, read_all(Fd, Ptrs)).
+ {NewEngine, _, _, _, _} = couch_stream:close(Stream),
+ ?_assertEqual(<<"foodfoob">>, read_all(NewEngine)).
should_recall_last_pointer_position({Fd, Stream}) ->
couch_stream:write(Stream, <<"foodfoob">>),
{_, _, _, _, _} = couch_stream:close(Stream),
{ok, ExpPtr} = couch_file:bytes(Fd),
- {ok, Stream2} = couch_stream:open(Fd),
+ {ok, Stream2} = couch_stream:open(?ENGINE),
ZeroBits = <<0:(8 * 10)>>,
OneBits = <<1:(8 * 10)>>,
ok = couch_stream:write(Stream2, OneBits),
ok = couch_stream:write(Stream2, ZeroBits),
- {Ptrs, 20, _, _, _} = couch_stream:close(Stream2),
+ {NewEngine, 20, _, _, _} = couch_stream:close(Stream2),
+ {ok, Ptrs} = couch_stream:to_disk_term(NewEngine),
[{ExpPtr, 20}] = Ptrs,
AllBits = iolist_to_binary([OneBits, ZeroBits]),
- ?_assertEqual(AllBits, read_all(Fd, Ptrs)).
+ ?_assertEqual(AllBits, read_all(NewEngine)).
should_stream_more_with_4K_chunk_size({Fd, _}) ->
- {ok, Stream} = couch_stream:open(Fd, [{buffer_size, 4096}]),
+ {ok, Stream} = couch_stream:open(?ENGINE, [{buffer_size, 4096}]),
lists:foldl(
fun(_, Acc) ->
Data = <<"a1b2c">>,
couch_stream:write(Stream, Data),
[Data | Acc]
end, [], lists:seq(1, 1024)),
- ?_assertMatch({[{0, 4100}, {4106, 1020}], 5120, _, _, _},
- couch_stream:close(Stream)).
+ {NewEngine, Length, _, _, _} = couch_stream:close(Stream),
+ {ok, Ptrs} = couch_stream:to_disk_term(NewEngine),
+ ?_assertMatch({[{0, 4100}, {4106, 1020}], 5120}, {Ptrs, Length}).
should_stop_on_normal_exit_of_stream_opener({Fd, _}) ->
RunnerPid = self(),
OpenerPid = spawn(
fun() ->
- {ok, StreamPid} = couch_stream:open(Fd),
+ {ok, StreamPid} = couch_stream:open(?ENGINE),
RunnerPid ! {pid, StreamPid}
end),
StreamPid = receive
@@ -115,6 +119,6 @@ should_stop_on_normal_exit_of_stream_opener({Fd, _}) ->
?_assertNot(is_process_alive(StreamPid)).
-read_all(Fd, PosList) ->
- Data = couch_stream:foldl(Fd, PosList, fun(Bin, Acc) -> [Bin, Acc] end, []),
+read_all(Engine) ->
+ Data = couch_stream:foldl(Engine, fun(Bin, Acc) -> [Bin, Acc] end, []),
iolist_to_binary(Data).
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e056ae9e/test/couchdb_compaction_daemon_tests.erl
----------------------------------------------------------------------
diff --git a/test/couchdb_compaction_daemon_tests.erl b/test/couchdb_compaction_daemon_tests.erl
index 6d423d9..28a157c 100644
--- a/test/couchdb_compaction_daemon_tests.erl
+++ b/test/couchdb_compaction_daemon_tests.erl
@@ -242,7 +242,7 @@ spawn_compaction_monitor(DbName) ->
1,
couch_db_updater,
handle_cast,
- [{compact_done, '_'}, '_'],
+ [{compact_done, '_', '_'}, '_'],
DbPid,
?TIMEOUT
),
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e056ae9e/test/couchdb_views_tests.erl
----------------------------------------------------------------------
diff --git a/test/couchdb_views_tests.erl b/test/couchdb_views_tests.erl
index 69277e6..48e5716 100644
--- a/test/couchdb_views_tests.erl
+++ b/test/couchdb_views_tests.erl
@@ -545,23 +545,27 @@ has_doc(DocId1, Rows) ->
lists:any(fun({R}) -> lists:member({<<"id">>, DocId}, R) end, Rows).
backup_db_file(DbName) ->
- DbDir = config:get("couchdb", "database_dir"),
- DbFile = filename:join([DbDir, ?b2l(DbName) ++ ".couch"]),
- {ok, _} = file:copy(DbFile, DbFile ++ ".backup"),
- ok.
+ {ok, Db} = couch_db:open_int(DbName, []),
+ try
+ SrcPath = couch_db:get_filepath(Db),
+ Src = if
+ is_list(SrcPath) -> SrcPath;
+ true -> binary_to_list(SrcPath)
+ end,
+ ok = copy_tree(Src, Src ++ ".backup")
+ after
+ couch_db:close(Db)
+ end.
restore_backup_db_file(DbName) ->
- DbDir = config:get("couchdb", "database_dir"),
-
{ok, Db} = couch_db:open_int(DbName, []),
+ Src = couch_db:get_filepath(Db),
ok = couch_db:close(Db),
DbPid = couch_db:get_pid(Db),
exit(DbPid, shutdown),
- DbFile = filename:join([DbDir, ?b2l(DbName) ++ ".couch"]),
- ok = file:delete(DbFile),
- ok = file:rename(DbFile ++ ".backup", DbFile),
- ok.
+ exit(couch_db:get_pid(Db), shutdown),
+ ok = copy_tree(Src ++ ".backup", Src).
compact_db(DbName) ->
{ok, Db} = couch_db:open_int(DbName, []),
@@ -709,3 +713,22 @@ wait_indexer(IndexerPid) ->
ok
end
end).
+
+copy_tree(Src, Dst) ->
+ case filelib:is_dir(Src) of
+ true ->
+ {ok, Files} = file:list_dir(Src),
+ copy_tree(Files, Src, Dst);
+ false ->
+ ok = filelib:ensure_dir(Dst),
+ {ok, _} = file:copy(Src, Dst),
+ ok
+ end.
+
+copy_tree([], _Src, _Dst) ->
+ ok;
+copy_tree([File | Rest], Src, Dst) ->
+ FullSrc = filename:join(Src, File),
+ FullDst = filename:join(Dst, File),
+ ok = copy_tree(FullSrc, FullDst),
+ copy_tree(Rest, Src, Dst).
[4/5] couch commit: updated
refs/heads/COUCHDB-3287-pluggable-storage-engines to 0f4e1a7
Posted by da...@apache.org.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e056ae9e/src/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl
index 1970b78..cbd2e79 100644
--- a/src/couch_db_updater.erl
+++ b/src/couch_db_updater.erl
@@ -14,74 +14,37 @@
-behaviour(gen_server).
-vsn(1).
--export([btree_by_id_split/1, btree_by_id_join/2, btree_by_id_reduce/2]).
--export([btree_by_seq_split/1, btree_by_seq_join/2, btree_by_seq_reduce/2]).
--export([make_doc_summary/2]).
+-export([add_sizes/3, upgrade_sizes/1]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
-include_lib("couch/include/couch_db.hrl").
-include("couch_db_int.hrl").
--record(comp_header, {
- db_header,
- meta_state
-}).
--record(merge_st, {
- id_tree,
- seq_tree,
- curr,
- rem_seqs,
- infos
-}).
-init({DbName, Filepath, Fd, Options}) ->
+init({Engine, DbName, FilePath, Options0}) ->
erlang:put(io_priority, {db_update, DbName}),
- case lists:member(create, Options) of
- true ->
- % create a new header and writes it to the file
- Header = couch_db_header:new(),
- ok = couch_file:write_header(Fd, Header),
- % delete any old compaction files that might be hanging around
- RootDir = config:get("couchdb", "database_dir", "."),
- couch_file:delete(RootDir, Filepath ++ ".compact"),
- couch_file:delete(RootDir, Filepath ++ ".compact.data"),
- couch_file:delete(RootDir, Filepath ++ ".compact.meta");
- false ->
- case couch_file:read_header(Fd) of
- {ok, Header} ->
- ok;
- no_valid_header ->
- % create a new header and writes it to the file
- Header = couch_db_header:new(),
- ok = couch_file:write_header(Fd, Header),
- % delete any old compaction files that might be hanging around
- file:delete(Filepath ++ ".compact"),
- file:delete(Filepath ++ ".compact.data"),
- file:delete(Filepath ++ ".compact.meta")
- end
- end,
- Db = init_db(DbName, Filepath, Fd, Header, Options),
- case lists:member(sys_db, Options) of
- false ->
- couch_stats_process_tracker:track([couchdb, open_databases]);
- true ->
- ok
- end,
- % we don't load validation funs here because the fabric query is liable to
- % race conditions. Instead see couch_db:validate_doc_update, which loads
- % them lazily
- {ok, Db#db{main_pid = self()}}.
+ DefaultSecObj = default_security_object(DbName),
+ Options = [{default_security_object, DefaultSecObj} | Options0],
+ try
+ {ok, EngineState} = couch_db_engine:init(Engine, FilePath, Options),
+ Db = init_db(DbName, FilePath, EngineState, Options),
+ maybe_track_db(Db),
+ % we don't load validation funs here because the fabric query is liable to
+ % race conditions. Instead see couch_db:validate_doc_update, which loads
+ % them lazily
+ NewDb = Db#db{main_pid = self()},
+ proc_lib:init_ack({ok, NewDb}),
+ gen_server:enter_loop(?MODULE, [], NewDb)
+ catch
+ throw:InitError ->
+ proc_lib:init_ack(InitError)
+ end.
-terminate(_Reason, Db) ->
- % If the reason we died is because our fd disappeared
- % then we don't need to try closing it again.
- if Db#db.fd_monitor == closed -> ok; true ->
- ok = couch_file:close(Db#db.fd)
- end,
+terminate(Reason, Db) ->
couch_util:shutdown_sync(Db#db.compactor_pid),
- couch_util:shutdown_sync(Db#db.fd),
+ couch_db_engine:terminate(Reason, Db),
ok.
handle_call(get_db, _From, Db) ->
@@ -105,28 +68,21 @@ handle_call(cancel_compact, _From, #db{compactor_pid = nil} = Db) ->
handle_call(cancel_compact, _From, #db{compactor_pid = Pid} = Db) ->
unlink(Pid),
exit(Pid, kill),
- RootDir = config:get("couchdb", "database_dir", "."),
- ok = couch_file:delete(RootDir, Db#db.filepath ++ ".compact"),
+ couch_server:delete_compaction_files(Db#db.name),
Db2 = Db#db{compactor_pid = nil},
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{reply, ok, Db2};
-handle_call(increment_update_seq, _From, Db) ->
- Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
- ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- couch_event:notify(Db#db.name, updated),
- {reply, {ok, Db2#db.update_seq}, Db2};
-handle_call({set_security, NewSec}, _From, #db{compression = Comp} = Db) ->
- {ok, Ptr, _} = couch_file:append_term(
- Db#db.fd, NewSec, [{compression, Comp}]),
- Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr,
- update_seq=Db#db.update_seq+1}),
- ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {reply, ok, Db2};
+handle_call({set_security, NewSec}, _From, #db{} = Db) ->
+ {ok, NewDb} = couch_db_engine:set_security(Db, NewSec),
+ NewSecDb = NewDb#db{
+ security = NewSec
+ },
+ ok = gen_server:call(couch_server, {db_updated, NewSecDb}, infinity),
+ {reply, ok, NewSecDb};
handle_call({set_revs_limit, Limit}, _From, Db) ->
- Db2 = commit_data(Db#db{revs_limit=Limit,
- update_seq=Db#db.update_seq+1}),
+ {ok, Db2} = couch_db_engine:set_revs_limit(Db, Limit),
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{reply, ok, Db2};
@@ -134,73 +90,78 @@ handle_call({purge_docs, _IdRevs}, _From,
#db{compactor_pid=Pid}=Db) when Pid /= nil ->
{reply, {error, purge_during_compaction}, Db};
handle_call({purge_docs, IdRevs}, _From, Db) ->
- #db{
- fd = Fd,
- id_tree = DocInfoByIdBTree,
- seq_tree = DocInfoBySeqBTree,
- update_seq = LastSeq,
- header = Header,
- compression = Comp
- } = Db,
- DocLookups = couch_btree:lookup(DocInfoByIdBTree,
- [Id || {Id, _Revs} <- IdRevs]),
-
- NewDocInfos = lists:zipwith(
- fun({_Id, Revs}, {ok, #full_doc_info{rev_tree=Tree}=FullDocInfo}) ->
+ DocIds = [Id || {Id, _Revs} <- IdRevs],
+ OldDocInfos = couch_db_engine:open_docs(Db, DocIds),
+
+ NewDocInfos = lists:flatmap(fun
+ ({{Id, Revs}, #full_doc_info{id = Id, rev_tree = Tree} = FDI}) ->
case couch_key_tree:remove_leafs(Tree, Revs) of
- {_, []=_RemovedRevs} -> % no change
- nil;
- {NewTree, RemovedRevs} ->
- {FullDocInfo#full_doc_info{rev_tree=NewTree},RemovedRevs}
+ {_, [] = _RemovedRevs} -> % no change
+ [];
+ {NewTree, RemovedRevs} ->
+ NewFDI = FDI#full_doc_info{rev_tree = NewTree},
+ [{FDI, NewFDI, RemovedRevs}]
end;
- (_, not_found) ->
- nil
+ ({_, not_found}) ->
+ []
+ end, lists:zip(IdRevs, OldDocInfos)),
+
+ InitUpdateSeq = couch_db_engine:get_update_seq(Db),
+ InitAcc = {InitUpdateSeq, [], []},
+ FinalAcc = lists:foldl(fun({_, #full_doc_info{} = OldFDI, RemRevs}, Acc) ->
+ #full_doc_info{
+ id = Id,
+ rev_tree = OldTree
+ } = OldFDI,
+ {SeqAcc0, FDIAcc, IdRevsAcc} = Acc,
+
+ {NewFDIAcc, NewSeqAcc} = case OldTree of
+ [] ->
+ % If we purged every #leaf{} in the doc record
+ % then we're removing it completely from the
+ % database.
+ FDIAcc;
+ _ ->
+ % Its possible to purge the #leaf{} that contains
+ % the update_seq where this doc sits in the update_seq
+ % sequence. Rather than do a bunch of complicated checks
+ % we just re-label every #leaf{} and reinsert it into
+ % the update_seq sequence.
+ {NewTree, SeqAcc1} = couch_key_tree:mapfold(fun
+ (_RevId, Leaf, leaf, InnerSeqAcc) ->
+ {Leaf#leaf{seq = InnerSeqAcc + 1}, InnerSeqAcc + 1};
+ (_RevId, Value, _Type, InnerSeqAcc) ->
+ {Value, InnerSeqAcc}
+ end, SeqAcc0, OldTree),
+
+ NewFDI = OldFDI#full_doc_info{
+ update_seq = SeqAcc1,
+ rev_tree = NewTree
+ },
+
+ {[NewFDI | FDIAcc], SeqAcc1}
end,
- IdRevs, DocLookups),
-
- SeqsToRemove = [Seq
- || {#full_doc_info{update_seq=Seq},_} <- NewDocInfos],
-
- FullDocInfoToUpdate = [FullInfo
- || {#full_doc_info{rev_tree=Tree}=FullInfo,_}
- <- NewDocInfos, Tree /= []],
-
- IdRevsPurged = [{Id, Revs}
- || {#full_doc_info{id=Id}, Revs} <- NewDocInfos],
-
- {DocInfoToUpdate, NewSeq} = lists:mapfoldl(
- fun(#full_doc_info{rev_tree=Tree}=FullInfo, SeqAcc) ->
- Tree2 = couch_key_tree:map_leafs(
- fun(_RevId, Leaf) ->
- Leaf#leaf{seq=SeqAcc+1}
- end, Tree),
- {FullInfo#full_doc_info{rev_tree=Tree2}, SeqAcc + 1}
- end, LastSeq, FullDocInfoToUpdate),
-
- IdsToRemove = [Id || {#full_doc_info{id=Id,rev_tree=[]},_}
- <- NewDocInfos],
-
- {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree,
- DocInfoToUpdate, SeqsToRemove),
- {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree,
- FullDocInfoToUpdate, IdsToRemove),
- {ok, Pointer, _} = couch_file:append_term(
- Fd, IdRevsPurged, [{compression, Comp}]),
-
- NewHeader = couch_db_header:set(Header, [
- {purge_seq, couch_db_header:purge_seq(Header) + 1},
- {purged_docs, Pointer}
- ]),
- Db2 = commit_data(
- Db#db{
- id_tree = DocInfoByIdBTree2,
- seq_tree = DocInfoBySeqBTree2,
- update_seq = NewSeq + 1,
- header=NewHeader}),
+ NewIdRevsAcc = [{Id, RemRevs} | IdRevsAcc],
+ {NewSeqAcc, NewFDIAcc, NewIdRevsAcc}
+ end, InitAcc, NewDocInfos),
+
+ {_FinalSeq, FDIs, PurgedIdRevs} = FinalAcc,
+
+ % We need to only use the list of #full_doc_info{} records
+ % that we have actually changed due to a purge.
+ PreviousFDIs = [PrevFDI || {PrevFDI, _, _} <- NewDocInfos],
+ Pairs = pair_purge_info(PreviousFDIs, FDIs),
+
+ {ok, Db2} = couch_db_engine:write_doc_infos(Db, Pairs, [], PurgedIdRevs),
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
couch_event:notify(Db#db.name, updated),
- {reply, {ok, couch_db_header:purge_seq(NewHeader), IdRevsPurged}, Db2}.
+
+ PurgeSeq = couch_db_engine:get_purge_seq(Db2),
+ {reply, {ok, PurgeSeq, PurgedIdRevs}, Db2};
+
+handle_call(Msg, From, Db) ->
+ couch_db_engine:handle_call(Msg, From, Db).
handle_cast({load_validation_funs, ValidationFuns}, Db) ->
@@ -209,65 +170,29 @@ handle_cast({load_validation_funs, ValidationFuns}, Db) ->
{noreply, Db2};
handle_cast(start_compact, Db) ->
case Db#db.compactor_pid of
- nil ->
- couch_log:info("Starting compaction for db \"~s\"", [Db#db.name]),
- Pid = spawn_link(fun() -> start_copy_compact(Db) end),
- Db2 = Db#db{compactor_pid=Pid},
- ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {noreply, Db2};
- _ ->
- % compact currently running, this is a no-op
- {noreply, Db}
- end;
-handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
- {ok, NewFd} = couch_file:open(CompactFilepath),
- {ok, NewHeader0} = couch_file:read_header(NewFd),
- NewHeader = couch_db_header:set(NewHeader0, [
- {compacted_seq, Db#db.update_seq}
- ]),
- #db{update_seq=NewSeq} = NewDb =
- init_db(Db#db.name, Filepath, NewFd, NewHeader, Db#db.options),
- unlink(NewFd),
- case Db#db.update_seq == NewSeq of
- true ->
- % suck up all the local docs into memory and write them to the new db
- {ok, _, LocalDocs} = couch_btree:foldl(Db#db.local_tree,
- fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
- {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_tree, LocalDocs),
-
- NewDb2 = commit_data(NewDb#db{
- local_tree = NewLocalBtree,
- main_pid = self(),
- filepath = Filepath,
- instance_start_time = Db#db.instance_start_time,
- revs_limit = Db#db.revs_limit
- }),
-
- couch_log:debug("CouchDB swapping files ~s and ~s.",
- [Filepath, CompactFilepath]),
- ok = file:rename(CompactFilepath, Filepath ++ ".compact"),
- RootDir = config:get("couchdb", "database_dir", "."),
- couch_file:delete(RootDir, Filepath),
- ok = file:rename(Filepath ++ ".compact", Filepath),
- % Delete the old meta compaction file after promoting
- % the compaction file.
- couch_file:delete(RootDir, Filepath ++ ".compact.meta"),
- close_db(Db),
- NewDb3 = refresh_validate_doc_funs(NewDb2),
- ok = gen_server:call(couch_server, {db_updated, NewDb3}, infinity),
- couch_event:notify(NewDb3#db.name, compacted),
- couch_log:info("Compaction for db \"~s\" completed.", [Db#db.name]),
- {noreply, NewDb3#db{compactor_pid=nil}};
- false ->
- couch_log:info("Compaction file still behind main file "
- "(update seq=~p. compact update seq=~p). Retrying.",
- [Db#db.update_seq, NewSeq]),
- close_db(NewDb),
- Pid = spawn_link(fun() -> start_copy_compact(Db) end),
- Db2 = Db#db{compactor_pid=Pid},
- ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {noreply, Db2}
+ nil ->
+ % For now we only support compacting to the same
+ % storage engine. After the first round of patches
+ % we'll add a field that sets the target engine
+ % type to compact to with a new copy compactor.
+ UpdateSeq = couch_db_engine:get_update_seq(Db),
+ Args = [Db#db.name, UpdateSeq],
+ couch_log:info("Starting compaction for db \"~s\" at ~p", Args),
+ {ok, Db2} = couch_db_engine:start_compaction(Db),
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+ {noreply, Db2};
+ _ ->
+ % compact currently running, this is a no-op
+ {noreply, Db}
end;
+handle_cast({compact_done, CompactEngine, CompactInfo}, #db{} = OldDb) ->
+ {ok, NewDb} = case couch_db_engine:get_engine(OldDb) of
+ CompactEngine ->
+ couch_db_engine:finish_compaction(OldDb, CompactInfo);
+ _ ->
+ finish_engine_swap(OldDb, CompactEngine, CompactInfo)
+ end,
+ {noreply, NewDb};
handle_cast(Msg, #db{name = Name} = Db) ->
couch_log:error("Database `~s` updater received unexpected cast: ~p",
@@ -291,9 +216,9 @@ handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts,
FullCommit2) of
{ok, Db2, UpdatedDDocIds} ->
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- if Db2#db.update_seq /= Db#db.update_seq ->
- couch_event:notify(Db2#db.name, updated);
- true -> ok
+ case {couch_db:get_update_seq(Db), couch_db:get_update_seq(Db2)} of
+ {Seq, Seq} -> ok;
+ _ -> couch_event:notify(Db2#db.name, updated)
end,
if NonRepDocs2 /= [] ->
couch_event:notify(Db2#db.name, local_updated);
@@ -336,9 +261,8 @@ handle_info({'EXIT', _Pid, normal}, Db) ->
{noreply, Db};
handle_info({'EXIT', _Pid, Reason}, Db) ->
{stop, Reason, Db};
-handle_info({'DOWN', Ref, _, _, Reason}, #db{fd_monitor=Ref, name=Name} = Db) ->
- couch_log:error("DB ~s shutting down - Fd ~p", [Name, Reason]),
- {stop, normal, Db#db{fd=undefined, fd_monitor=closed}}.
+handle_info(Msg, Db) ->
+ couch_db_engine:handle_info(Msg, Db).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -389,235 +313,32 @@ collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) ->
{GroupedDocsAcc, ClientsAcc, FullCommit}
end.
-rev_tree(DiskTree) ->
- couch_key_tree:map(fun
- (_RevId, {Del, Ptr, Seq}) ->
- #leaf{
- deleted = ?i2b(Del),
- ptr = Ptr,
- seq = Seq
- };
- (_RevId, {Del, Ptr, Seq, Size}) ->
- #leaf{
- deleted = ?i2b(Del),
- ptr = Ptr,
- seq = Seq,
- sizes = upgrade_sizes(Size)
- };
- (_RevId, {Del, Ptr, Seq, Sizes, Atts}) ->
- #leaf{
- deleted = ?i2b(Del),
- ptr = Ptr,
- seq = Seq,
- sizes = upgrade_sizes(Sizes),
- atts = Atts
- };
- (_RevId, ?REV_MISSING) ->
- ?REV_MISSING
- end, DiskTree).
-
-disk_tree(RevTree) ->
- couch_key_tree:map(fun
- (_RevId, ?REV_MISSING) ->
- ?REV_MISSING;
- (_RevId, #leaf{} = Leaf) ->
- #leaf{
- deleted = Del,
- ptr = Ptr,
- seq = Seq,
- sizes = Sizes,
- atts = Atts
- } = Leaf,
- {?b2i(Del), Ptr, Seq, split_sizes(Sizes), Atts}
- end, RevTree).
-upgrade_sizes(#size_info{}=SI) ->
- SI;
-upgrade_sizes({D, E}) ->
- #size_info{active=D, external=E};
-upgrade_sizes(S) when is_integer(S) ->
- #size_info{active=S, external=0}.
-
-split_sizes(#size_info{}=SI) ->
- {SI#size_info.active, SI#size_info.external}.
-
-join_sizes({Active, External}) when is_integer(Active), is_integer(External) ->
- #size_info{active=Active, external=External}.
-
-btree_by_seq_split(#full_doc_info{}=Info) ->
- #full_doc_info{
- id = Id,
- update_seq = Seq,
- deleted = Del,
- sizes = SizeInfo,
- rev_tree = Tree
- } = Info,
- {Seq, {Id, ?b2i(Del), split_sizes(SizeInfo), disk_tree(Tree)}}.
-
-btree_by_seq_join(Seq, {Id, Del, DiskTree}) when is_integer(Del) ->
- btree_by_seq_join(Seq, {Id, Del, {0, 0}, DiskTree});
-btree_by_seq_join(Seq, {Id, Del, Sizes, DiskTree}) when is_integer(Del) ->
- #full_doc_info{
- id = Id,
- update_seq = Seq,
- deleted = ?i2b(Del),
- sizes = join_sizes(Sizes),
- rev_tree = rev_tree(DiskTree)
- };
-btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) ->
- % Older versions stored #doc_info records in the seq_tree.
- % Compact to upgrade.
- #doc_info{
- id = Id,
- high_seq=KeySeq,
- revs =
- [#rev_info{rev=Rev,seq=Seq,deleted=false,body_sp = Bp} ||
- {Rev, Seq, Bp} <- RevInfos] ++
- [#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} ||
- {Rev, Seq, Bp} <- DeletedRevInfos]}.
-
-btree_by_id_split(#full_doc_info{}=Info) ->
- #full_doc_info{
- id = Id,
- update_seq = Seq,
- deleted = Deleted,
- sizes = SizeInfo,
- rev_tree = Tree
- } = Info,
- {Id, {Seq, ?b2i(Deleted), split_sizes(SizeInfo), disk_tree(Tree)}}.
-
-% Handle old formats before data_size was added
-btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) ->
- btree_by_id_join(Id, {HighSeq, Deleted, #size_info{}, DiskTree});
-
-btree_by_id_join(Id, {HighSeq, Deleted, Sizes, DiskTree}) ->
- #full_doc_info{
- id = Id,
- update_seq = HighSeq,
- deleted = ?i2b(Deleted),
- sizes = upgrade_sizes(Sizes),
- rev_tree = rev_tree(DiskTree)
- }.
-
-btree_by_id_reduce(reduce, FullDocInfos) ->
- lists:foldl(
- fun(Info, {NotDeleted, Deleted, Sizes}) ->
- Sizes2 = reduce_sizes(Sizes, Info#full_doc_info.sizes),
- case Info#full_doc_info.deleted of
- true ->
- {NotDeleted, Deleted + 1, Sizes2};
- false ->
- {NotDeleted + 1, Deleted, Sizes2}
- end
- end,
- {0, 0, #size_info{}}, FullDocInfos);
-btree_by_id_reduce(rereduce, Reds) ->
- lists:foldl(
- fun({NotDeleted, Deleted}, {AccNotDeleted, AccDeleted, _AccSizes}) ->
- % pre 1.2 format, will be upgraded on compaction
- {AccNotDeleted + NotDeleted, AccDeleted + Deleted, nil};
- ({NotDeleted, Deleted, Sizes}, {AccNotDeleted, AccDeleted, AccSizes}) ->
- AccSizes2 = reduce_sizes(AccSizes, Sizes),
- {AccNotDeleted + NotDeleted, AccDeleted + Deleted, AccSizes2}
- end,
- {0, 0, #size_info{}}, Reds).
-
-reduce_sizes(nil, _) ->
- nil;
-reduce_sizes(_, nil) ->
- nil;
-reduce_sizes(#size_info{}=S1, #size_info{}=S2) ->
- #size_info{
- active = S1#size_info.active + S2#size_info.active,
- external = S1#size_info.external + S2#size_info.external
- };
-reduce_sizes(S1, S2) ->
- reduce_sizes(upgrade_sizes(S1), upgrade_sizes(S2)).
-
-btree_by_seq_reduce(reduce, DocInfos) ->
- % count the number of documents
- length(DocInfos);
-btree_by_seq_reduce(rereduce, Reds) ->
- lists:sum(Reds).
-
-init_db(DbName, Filepath, Fd, Header0, Options) ->
- Header = couch_db_header:upgrade(Header0),
-
- {ok, FsyncOptions} = couch_util:parse_term(
- config:get("couchdb", "fsync_options",
- "[before_header, after_header, on_file_open]")),
-
- case lists:member(on_file_open, FsyncOptions) of
- true -> ok = couch_file:sync(Fd);
- _ -> ok
- end,
-
- Compression = couch_compress:get_compression_method(),
-
- IdTreeState = couch_db_header:id_tree_state(Header),
- SeqTreeState = couch_db_header:seq_tree_state(Header),
- LocalTreeState = couch_db_header:local_tree_state(Header),
- {ok, IdBtree} = couch_btree:open(IdTreeState, Fd,
- [{split, fun ?MODULE:btree_by_id_split/1},
- {join, fun ?MODULE:btree_by_id_join/2},
- {reduce, fun ?MODULE:btree_by_id_reduce/2},
- {compression, Compression}]),
- {ok, SeqBtree} = couch_btree:open(SeqTreeState, Fd,
- [{split, fun ?MODULE:btree_by_seq_split/1},
- {join, fun ?MODULE:btree_by_seq_join/2},
- {reduce, fun ?MODULE:btree_by_seq_reduce/2},
- {compression, Compression}]),
- {ok, LocalDocsBtree} = couch_btree:open(LocalTreeState, Fd,
- [{compression, Compression}]),
- case couch_db_header:security_ptr(Header) of
- nil ->
- Security = default_security_object(DbName),
- SecurityPtr = nil;
- SecurityPtr ->
- {ok, Security} = couch_file:pread_term(Fd, SecurityPtr)
- end,
+init_db(DbName, FilePath, EngineState, Options) ->
% convert start time tuple to microsecs and store as a binary string
{MegaSecs, Secs, MicroSecs} = os:timestamp(),
StartTime = ?l2b(io_lib:format("~p",
[(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),
- ok = couch_file:set_db_pid(Fd, self()),
- Db = #db{
- fd=Fd,
- fd_monitor = erlang:monitor(process, Fd),
- header=Header,
- id_tree = IdBtree,
- seq_tree = SeqBtree,
- local_tree = LocalDocsBtree,
- committed_update_seq = couch_db_header:update_seq(Header),
- update_seq = couch_db_header:update_seq(Header),
+
+ BDU = couch_util:get_value(before_doc_update, Options, nil),
+ ADR = couch_util:get_value(after_doc_read, Options, nil),
+
+ CleanedOpts = [Opt || Opt <- Options, Opt /= create],
+
+ InitDb = #db{
name = DbName,
- filepath = Filepath,
- security = Security,
- security_ptr = SecurityPtr,
+ filepath = FilePath,
+ engine = EngineState,
instance_start_time = StartTime,
- revs_limit = couch_db_header:revs_limit(Header),
- fsync_options = FsyncOptions,
- options = Options,
- compression = Compression,
- before_doc_update = couch_util:get_value(before_doc_update, Options, nil),
- after_doc_read = couch_util:get_value(after_doc_read, Options, nil)
+ options = CleanedOpts,
+ before_doc_update = BDU,
+ after_doc_read = ADR
},
- % If we just created a new UUID while upgrading a
- % database then we want to flush that to disk or
- % we risk sending out the uuid and having the db
- % crash which would result in it generating a new
- % uuid each time it was reopened.
- case Header /= Header0 of
- true ->
- sync_header(Db, Header);
- false ->
- Db
- end.
-
-
-close_db(#db{fd_monitor = Ref}) ->
- erlang:demonitor(Ref).
+ InitDb#db{
+ committed_update_seq = couch_db_engine:get_update_seq(InitDb),
+ security = couch_db_engine:get_security(InitDb)
+ }.
refresh_validate_doc_funs(#db{name = <<"shards/", _/binary>> = Name} = Db) ->
@@ -641,50 +362,36 @@ refresh_validate_doc_funs(Db0) ->
flush_trees(_Db, [], AccFlushedTrees) ->
{ok, lists:reverse(AccFlushedTrees)};
-flush_trees(#db{fd = Fd} = Db,
+flush_trees(#db{} = Db,
[InfoUnflushed | RestUnflushed], AccFlushed) ->
#full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
{Flushed, FinalAcc} = couch_key_tree:mapfold(
fun(_Rev, Value, Type, SizesAcc) ->
case Value of
- #doc{deleted = IsDeleted, body = {summary, _, _, _} = DocSummary} ->
- {summary, Summary, AttSizeInfo, AttsFd} = DocSummary,
- % this node value is actually an unwritten document summary,
- % write to disk.
- % make sure the Fd in the written bins is the same Fd we are
- % and convert bins, removing the FD.
- % All bins should have been written to disk already.
- case {AttsFd, Fd} of
- {nil, _} ->
- ok;
- {SameFd, SameFd} ->
- ok;
- _ ->
- % Fd where the attachments were written to is not the same
- % as our Fd. This can happen when a database is being
- % switched out during a compaction.
- couch_log:debug("File where the attachments are written has"
- " changed. Possibly retrying.", []),
- throw(retry)
- end,
- ExternalSize = ?term_size(Summary),
- {ok, NewSummaryPointer, SummarySize} =
- couch_file:append_raw_chunk(Fd, Summary),
- Leaf = #leaf{
- deleted = IsDeleted,
- ptr = NewSummaryPointer,
- seq = UpdateSeq,
- sizes = #size_info{
- active = SummarySize,
- external = ExternalSize
+ % This node is a document summary that needs to be
+ % flushed to disk.
+ #doc{} = Doc ->
+ check_doc_atts(Db, Doc),
+ ExternalSize = ?term_size(Doc#doc.body),
+ {size_info, AttSizeInfo} =
+ lists:keyfind(size_info, 1, Doc#doc.meta),
+ {ok, NewDoc, WrittenSize} =
+ couch_db_engine:write_doc_body(Db, Doc),
+ Leaf = #leaf{
+ deleted = Doc#doc.deleted,
+ ptr = NewDoc#doc.body,
+ seq = UpdateSeq,
+ sizes = #size_info{
+ active = WrittenSize,
+ external = ExternalSize
+ },
+ atts = AttSizeInfo
},
- atts = AttSizeInfo
- },
- {Leaf, add_sizes(Type, Leaf, SizesAcc)};
- #leaf{} ->
- {Value, add_sizes(Type, Value, SizesAcc)};
- _ ->
- {Value, SizesAcc}
+ {Leaf, add_sizes(Type, Leaf, SizesAcc)};
+ #leaf{} ->
+ {Value, add_sizes(Type, Value, SizesAcc)};
+ _ ->
+ {Value, SizesAcc}
end
end, {0, 0, []}, Unflushed),
{FinalAS, FinalES, FinalAtts} = FinalAcc,
@@ -698,6 +405,29 @@ flush_trees(#db{fd = Fd} = Db,
},
flush_trees(Db, RestUnflushed, [NewInfo | AccFlushed]).
+
+check_doc_atts(Db, Doc) ->
+ {atts_stream, Stream} = lists:keyfind(atts_stream, 1, Doc#doc.meta),
+ % Make sure that the attachments were written to the currently
+ % active attachment stream. If compaction swaps during a write
+ % request we may have to rewrite our attachment bodies.
+ if Stream == nil -> ok; true ->
+ case couch_db:is_active_stream(Db, Stream) of
+ true ->
+ ok;
+ false ->
+ % Stream where the attachments were written to is
+ % no longer the current attachment stream. This
+ % can happen when a database is switched at
+ % compaction time.
+ couch_log:debug("Stream where the attachments were"
+ " written has changed."
+ " Possibly retrying.", []),
+ throw(retry)
+ end
+ end.
+
+
add_sizes(Type, #leaf{sizes=Sizes, atts=AttSizes}, Acc) ->
% Maybe upgrade from disk_size only
#size_info{
@@ -710,6 +440,15 @@ add_sizes(Type, #leaf{sizes=Sizes, atts=AttSizes}, Acc) ->
NewAttsAcc = lists:umerge(AttSizes, AttsAcc),
{NewASAcc, NewESAcc, NewAttsAcc}.
+
+upgrade_sizes(#size_info{}=SI) ->
+ SI;
+upgrade_sizes({D, E}) ->
+ #size_info{active=D, external=E};
+upgrade_sizes(S) when is_integer(S) ->
+ #size_info{active=S, external=0}.
+
+
send_result(Client, Doc, NewResult) ->
% used to send a result to the client
catch(Client ! {result, self(), {doc_tag(Doc), NewResult}}).
@@ -836,58 +575,40 @@ merge_rev_tree(OldInfo, NewDoc, _Client, Limit, true) ->
{NewTree, _} = couch_key_tree:merge(OldTree, NewTree0, Limit),
OldInfo#full_doc_info{rev_tree = NewTree}.
-stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
- [Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} ||
- #full_doc_info{rev_tree=Tree}=Info <- DocInfos].
+update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, FullCommit) ->
+ UpdateSeq = couch_db_engine:get_update_seq(Db),
+ RevsLimit = couch_db_engine:get_revs_limit(Db),
-update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
- #db{
- id_tree = DocInfoByIdBTree,
- seq_tree = DocInfoBySeqBTree,
- update_seq = LastSeq,
- revs_limit = RevsLimit
- } = Db,
Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
% lookup up the old documents, if they exist.
- OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
- OldDocInfos = lists:zipwith(
- fun(_Id, {ok, FullDocInfo}) ->
- FullDocInfo;
+ OldDocLookups = couch_db_engine:open_docs(Db, Ids),
+ OldDocInfos = lists:zipwith(fun
+ (_Id, #full_doc_info{} = FDI) ->
+ FDI;
(Id, not_found) ->
#full_doc_info{id=Id}
- end,
- Ids, OldDocLookups),
+ end, Ids, OldDocLookups),
% Merge the new docs into the revision trees.
- {ok, NewFullDocInfos, RemoveSeqs, NewSeq} = merge_rev_trees(RevsLimit,
- MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq),
-
- % All documents are now ready to write.
-
- {ok, Db2} = update_local_docs(Db, NonRepDocs),
+ {ok, NewFullDocInfos, RemSeqs, _} = merge_rev_trees(RevsLimit,
+ MergeConflicts, DocsList, OldDocInfos, [], [], UpdateSeq),
% Write out the document summaries (the bodies are stored in the nodes of
% the trees, the attachments are already written to disk)
- {ok, IndexFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []),
-
- % and the indexes
- {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, IndexFullDocInfos, []),
- {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, IndexFullDocInfos, RemoveSeqs),
+ {ok, IndexFDIs} = flush_trees(Db, NewFullDocInfos, []),
+ Pairs = pair_write_info(OldDocLookups, IndexFDIs),
+ LocalDocs2 = update_local_doc_revs(LocalDocs),
+ {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2, []),
- WriteCount = length(IndexFullDocInfos),
+ WriteCount = length(IndexFDIs),
couch_stats:increment_counter([couchdb, document_inserts],
- WriteCount - length(RemoveSeqs)),
+ WriteCount - length(RemSeqs)),
couch_stats:increment_counter([couchdb, document_writes], WriteCount),
couch_stats:increment_counter(
[couchdb, local_document_writes],
- length(NonRepDocs)
+ length(LocalDocs2)
),
- Db3 = Db2#db{
- id_tree = DocInfoByIdBTree2,
- seq_tree = DocInfoBySeqBTree2,
- update_seq = NewSeq},
-
% Check if we just updated any design documents, and update the validation
% funs if we did.
UpdatedDDocIds = lists:flatmap(fun
@@ -895,549 +616,92 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
(_) -> []
end, Ids),
- Db4 = case length(UpdatedDDocIds) > 0 of
+ Db2 = case length(UpdatedDDocIds) > 0 of
true ->
- couch_event:notify(Db3#db.name, ddoc_updated),
- ddoc_cache:evict(Db3#db.name, UpdatedDDocIds),
- refresh_validate_doc_funs(Db3);
+ ddoc_cache:evict(Db1#db.name, UpdatedDDocIds),
+ refresh_validate_doc_funs(Db1);
false ->
- Db3
+ Db1
end,
- {ok, commit_data(Db4, not FullCommit), UpdatedDDocIds}.
-
-update_local_docs(Db, []) ->
- {ok, Db};
-update_local_docs(#db{local_tree=Btree}=Db, Docs) ->
- BtreeEntries = lists:map(
- fun({Client, NewDoc}) ->
- #doc{
- id = Id,
- deleted = Delete,
- revs = {0, PrevRevs},
- body = Body
- } = NewDoc,
- case PrevRevs of
- [RevStr|_] ->
+ {ok, commit_data(Db2, not FullCommit), UpdatedDDocIds}.
+
+
+update_local_doc_revs(Docs) ->
+ lists:map(fun({Client, NewDoc}) ->
+ #doc{
+ deleted = Delete,
+ revs = {0, PrevRevs}
+ } = NewDoc,
+ case PrevRevs of
+ [RevStr | _] ->
PrevRev = list_to_integer(?b2l(RevStr));
[] ->
PrevRev = 0
- end,
- case Delete of
- false ->
- send_result(Client, NewDoc, {ok,
- {0, ?l2b(integer_to_list(PrevRev + 1))}}),
- {update, {Id, {PrevRev + 1, Body}}};
- true ->
- send_result(Client, NewDoc,
- {ok, {0, <<"0">>}}),
- {remove, Id}
- end
- end, Docs),
-
- BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
- BtreeIdsUpdate = [{Key, Val} || {update, {Key, Val}} <- BtreeEntries],
-
- {ok, Btree2} =
- couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
-
- {ok, Db#db{local_tree = Btree2}}.
+ end,
+ NewRev = case Delete of
+ false ->
+ ?l2b(integer_to_list(PrevRev + 1));
+ true ->
+ <<"0">>
+ end,
+ send_result(Client, NewDoc, {ok, {0, NewRev}}),
+ NewDoc#doc{
+ revs = {0, [NewRev]}
+ }
+ end, Docs).
-db_to_header(Db, Header) ->
- couch_db_header:set(Header, [
- {update_seq, Db#db.update_seq},
- {seq_tree_state, couch_btree:get_state(Db#db.seq_tree)},
- {id_tree_state, couch_btree:get_state(Db#db.id_tree)},
- {local_tree_state, couch_btree:get_state(Db#db.local_tree)},
- {security_ptr, Db#db.security_ptr},
- {revs_limit, Db#db.revs_limit}
- ]).
commit_data(Db) ->
commit_data(Db, false).
-commit_data(#db{waiting_delayed_commit=nil} = Db, true) ->
- TRef = erlang:send_after(1000,self(),delayed_commit),
- Db#db{waiting_delayed_commit=TRef};
+commit_data(#db{waiting_delayed_commit = nil} = Db, true) ->
+ TRef = erlang:send_after(1000, self(), delayed_commit),
+ Db#db{waiting_delayed_commit = TRef};
commit_data(Db, true) ->
Db;
commit_data(Db, _) ->
#db{
- header = OldHeader,
- waiting_delayed_commit = Timer
- } = Db,
- if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end,
- case db_to_header(Db, OldHeader) of
- OldHeader -> Db#db{waiting_delayed_commit=nil};
- NewHeader -> sync_header(Db, NewHeader)
- end.
-
-sync_header(Db, NewHeader) ->
- #db{
- fd = Fd,
- filepath = FilePath,
- fsync_options = FsyncOptions,
waiting_delayed_commit = Timer
} = Db,
-
if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end,
-
- Before = lists:member(before_header, FsyncOptions),
- After = lists:member(after_header, FsyncOptions),
-
- if Before -> couch_file:sync(FilePath); true -> ok end,
- ok = couch_file:write_header(Fd, NewHeader),
- if After -> couch_file:sync(FilePath); true -> ok end,
-
- Db#db{
- header=NewHeader,
- committed_update_seq=Db#db.update_seq,
- waiting_delayed_commit=nil
+ {ok, Db1} = couch_db_engine:commit_data(Db),
+ Db1#db{
+ waiting_delayed_commit = nil,
+ committed_update_seq = couch_db_engine:get_update_seq(Db)
}.
-copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
- {ok, {BodyData, BinInfos0}} = couch_db:read_doc(SrcDb, SrcSp),
- BinInfos = case BinInfos0 of
- _ when is_binary(BinInfos0) ->
- couch_compress:decompress(BinInfos0);
- _ when is_list(BinInfos0) ->
- % pre 1.2 file format
- BinInfos0
- end,
- % copy the bin values
- NewBinInfos = lists:map(
- fun({Name, Type, BinSp, AttLen, RevPos, ExpectedMd5}) ->
- % 010 UPGRADE CODE
- {NewBinSp, AttLen, AttLen, ActualMd5, _IdentityMd5} =
- couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
- check_md5(ExpectedMd5, ActualMd5),
- {Name, Type, NewBinSp, AttLen, AttLen, RevPos, ExpectedMd5, identity};
- ({Name, Type, BinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc1}) ->
- {NewBinSp, AttLen, _, ActualMd5, _IdentityMd5} =
- couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
- check_md5(ExpectedMd5, ActualMd5),
- Enc = case Enc1 of
- true ->
- % 0110 UPGRADE CODE
- gzip;
- false ->
- % 0110 UPGRADE CODE
- identity;
- _ ->
- Enc1
- end,
- {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc}
- end, BinInfos),
- {BodyData, NewBinInfos}.
-
-merge_lookups(Infos, []) ->
- Infos;
-merge_lookups([], _) ->
- [];
-merge_lookups([#doc_info{}=DI | RestInfos], [{ok, FDI} | RestLookups]) ->
- % Assert we've matched our lookups
- if DI#doc_info.id == FDI#full_doc_info.id -> ok; true ->
- erlang:error({mismatched_doc_infos, DI#doc_info.id})
- end,
- [FDI | merge_lookups(RestInfos, RestLookups)];
-merge_lookups([FDI | RestInfos], Lookups) ->
- [FDI | merge_lookups(RestInfos, Lookups)].
-
-check_md5(Md5, Md5) -> ok;
-check_md5(_, _) -> throw(md5_mismatch).
-
-copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
- DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
- LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
- % COUCHDB-968, make sure we prune duplicates during compaction
- NewInfos0 = lists:usort(fun(#full_doc_info{id=A}, #full_doc_info{id=B}) ->
- A =< B
- end, merge_lookups(MixedInfos, LookupResults)),
-
- NewInfos1 = lists:map(fun(Info) ->
- {NewRevTree, FinalAcc} = couch_key_tree:mapfold(fun
- (_Rev, #leaf{ptr=Sp}=Leaf, leaf, SizesAcc) ->
- {Body, AttInfos} = copy_doc_attachments(Db, Sp, DestFd),
- SummaryChunk = make_doc_summary(NewDb, {Body, AttInfos}),
- ExternalSize = ?term_size(SummaryChunk),
- {ok, Pos, SummarySize} = couch_file:append_raw_chunk(
- DestFd, SummaryChunk),
- AttSizes = [{element(3,A), element(4,A)} || A <- AttInfos],
- NewLeaf = Leaf#leaf{
- ptr = Pos,
- sizes = #size_info{
- active = SummarySize,
- external = ExternalSize
- },
- atts = AttSizes
- },
- {NewLeaf, add_sizes(leaf, NewLeaf, SizesAcc)};
- (_Rev, _Leaf, branch, SizesAcc) ->
- {?REV_MISSING, SizesAcc}
- end, {0, 0, []}, Info#full_doc_info.rev_tree),
- {FinalAS, FinalES, FinalAtts} = FinalAcc,
- TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts),
- NewActiveSize = FinalAS + TotalAttSize,
- NewExternalSize = FinalES + TotalAttSize,
- Info#full_doc_info{
- rev_tree = NewRevTree,
- sizes = #size_info{
- active = NewActiveSize,
- external = NewExternalSize
- }
- }
- end, NewInfos0),
-
- NewInfos = stem_full_doc_infos(Db, NewInfos1),
- RemoveSeqs =
- case Retry of
- nil ->
- [];
- OldDocIdTree ->
- % Compaction is being rerun to catch up to writes during the
- % first pass. This means we may have docs that already exist
- % in the seq_tree in the .data file. Here we lookup any old
- % update_seqs so that they can be removed.
- Ids = [Id || #full_doc_info{id=Id} <- NewInfos],
- Existing = couch_btree:lookup(OldDocIdTree, Ids),
- [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
- end,
-
- {ok, SeqTree} = couch_btree:add_remove(
- NewDb#db.seq_tree, NewInfos, RemoveSeqs),
-
- FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
- {{Id, Seq}, FDI}
- end, NewInfos),
- {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
- update_compact_task(length(NewInfos)),
- NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
-
-
-copy_compact(Db, NewDb0, Retry) ->
- Compression = couch_compress:get_compression_method(),
- NewDb = NewDb0#db{compression=Compression},
- TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
- BufferSize = list_to_integer(
- config:get("database_compaction", "doc_buffer_size", "524288")),
- CheckpointAfter = couch_util:to_integer(
- config:get("database_compaction", "checkpoint_after",
- BufferSize * 10)),
-
- EnumBySeqFun =
- fun(DocInfo, _Offset,
- {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize}) ->
-
- Seq = case DocInfo of
- #full_doc_info{} -> DocInfo#full_doc_info.update_seq;
- #doc_info{} -> DocInfo#doc_info.high_seq
- end,
- AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
- if AccUncopiedSize2 >= BufferSize ->
- NewDb2 = copy_docs(
- Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
- AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
- if AccCopiedSize2 >= CheckpointAfter ->
- CommNewDb2 = commit_compaction_data(NewDb2#db{update_seq=Seq}),
- {ok, {CommNewDb2, [], 0, 0}};
- true ->
- {ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}}
- end;
+maybe_track_db(#db{options = Options}) ->
+ case lists:member(sys_db, Options) of
true ->
- {ok, {AccNewDb, [DocInfo | AccUncopied], AccUncopiedSize2,
- AccCopiedSize}}
- end
- end,
-
- TaskProps0 = [
- {type, database_compaction},
- {database, Db#db.name},
- {progress, 0},
- {changes_done, 0},
- {total_changes, TotalChanges}
- ],
- case (Retry =/= nil) and couch_task_status:is_task_added() of
- true ->
- couch_task_status:update([
- {retry, true},
- {progress, 0},
- {changes_done, 0},
- {total_changes, TotalChanges}
- ]);
- false ->
- couch_task_status:add_task(TaskProps0),
- couch_task_status:set_update_frequency(500)
- end,
-
- {ok, _, {NewDb2, Uncopied, _, _}} =
- couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
- {NewDb, [], 0, 0},
- [{start_key, NewDb#db.update_seq + 1}]),
-
- NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
-
- % copy misc header values
- if NewDb3#db.security /= Db#db.security ->
- {ok, Ptr, _} = couch_file:append_term(
- NewDb3#db.fd, Db#db.security,
- [{compression, NewDb3#db.compression}]),
- NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
- true ->
- NewDb4 = NewDb3
- end,
-
- commit_compaction_data(NewDb4#db{update_seq=Db#db.update_seq}).
-
-
-start_copy_compact(#db{}=Db) ->
- erlang:put(io_priority, {db_compact, Db#db.name}),
- #db{name=Name, filepath=Filepath, options=Options, header=Header} = Db,
- couch_log:debug("Compaction process spawned for db \"~s\"", [Name]),
-
- {ok, NewDb, DName, DFd, MFd, Retry} =
- open_compaction_files(Name, Header, Filepath, Options),
- erlang:monitor(process, MFd),
-
- % This is a bit worrisome. init_db/4 will monitor the data fd
- % but it doesn't know about the meta fd. For now I'll maintain
- % that the data fd is the old normal fd and meta fd is special
- % and hope everything works out for the best.
- unlink(DFd),
-
- NewDb1 = copy_purge_info(Db, NewDb),
- NewDb2 = copy_compact(Db, NewDb1, Retry),
- NewDb3 = sort_meta_data(NewDb2),
- NewDb4 = commit_compaction_data(NewDb3),
- NewDb5 = copy_meta_data(NewDb4),
- NewDb6 = sync_header(NewDb5, db_to_header(NewDb5, NewDb5#db.header)),
- close_db(NewDb6),
-
- ok = couch_file:close(MFd),
- gen_server:cast(Db#db.main_pid, {compact_done, DName}).
-
-
-open_compaction_files(DbName, SrcHdr, DbFilePath, Options) ->
- DataFile = DbFilePath ++ ".compact.data",
- MetaFile = DbFilePath ++ ".compact.meta",
- {ok, DataFd, DataHdr} = open_compaction_file(DataFile),
- {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile),
- DataHdrIsDbHdr = couch_db_header:is_header(DataHdr),
- case {DataHdr, MetaHdr} of
- {#comp_header{}=A, #comp_header{}=A} ->
- DbHeader = A#comp_header.db_header,
- Db0 = init_db(DbName, DataFile, DataFd, DbHeader, Options),
- Db1 = bind_emsort(Db0, MetaFd, A#comp_header.meta_state),
- {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree};
- _ when DataHdrIsDbHdr ->
- ok = reset_compaction_file(MetaFd, couch_db_header:from(SrcHdr)),
- Db0 = init_db(DbName, DataFile, DataFd, DataHdr, Options),
- Db1 = bind_emsort(Db0, MetaFd, nil),
- {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree};
- _ ->
- Header = couch_db_header:from(SrcHdr),
- ok = reset_compaction_file(DataFd, Header),
- ok = reset_compaction_file(MetaFd, Header),
- Db0 = init_db(DbName, DataFile, DataFd, Header, Options),
- Db1 = bind_emsort(Db0, MetaFd, nil),
- {ok, Db1, DataFile, DataFd, MetaFd, nil}
- end.
-
-
-open_compaction_file(FilePath) ->
- case couch_file:open(FilePath, [nologifmissing]) of
- {ok, Fd} ->
- case couch_file:read_header(Fd) of
- {ok, Header} -> {ok, Fd, Header};
- no_valid_header -> {ok, Fd, nil}
- end;
- {error, enoent} ->
- {ok, Fd} = couch_file:open(FilePath, [create]),
- {ok, Fd, nil}
- end.
-
-
-reset_compaction_file(Fd, Header) ->
- ok = couch_file:truncate(Fd, 0),
- ok = couch_file:write_header(Fd, Header).
-
-
-copy_purge_info(OldDb, NewDb) ->
- OldHdr = OldDb#db.header,
- NewHdr = NewDb#db.header,
- OldPurgeSeq = couch_db_header:purge_seq(OldHdr),
- if OldPurgeSeq > 0 ->
- {ok, PurgedIdsRevs} = couch_db:get_last_purged(OldDb),
- Opts = [{compression, NewDb#db.compression}],
- {ok, Ptr, _} = couch_file:append_term(NewDb#db.fd, PurgedIdsRevs, Opts),
- NewNewHdr = couch_db_header:set(NewHdr, [
- {purge_seq, OldPurgeSeq},
- {purged_docs, Ptr}
- ]),
- NewDb#db{header = NewNewHdr};
- true ->
- NewDb
+ ok;
+ false ->
+ couch_stats_process_tracker:track([couchdb, open_databases])
end.
-commit_compaction_data(#db{}=Db) ->
- % Compaction needs to write headers to both the data file
- % and the meta file so if we need to restart we can pick
- % back up from where we left off.
- commit_compaction_data(Db, couch_emsort:get_fd(Db#db.id_tree)),
- commit_compaction_data(Db, Db#db.fd).
-
-
-commit_compaction_data(#db{header=OldHeader}=Db0, Fd) ->
- % Mostly copied from commit_data/2 but I have to
- % replace the logic to commit and fsync to a specific
- % fd instead of the Filepath stuff that commit_data/2
- % does.
- DataState = couch_db_header:id_tree_state(OldHeader),
- MetaFd = couch_emsort:get_fd(Db0#db.id_tree),
- MetaState = couch_emsort:get_state(Db0#db.id_tree),
- Db1 = bind_id_tree(Db0, Db0#db.fd, DataState),
- Header = db_to_header(Db1, OldHeader),
- CompHeader = #comp_header{
- db_header = Header,
- meta_state = MetaState
- },
- ok = couch_file:sync(Fd),
- ok = couch_file:write_header(Fd, CompHeader),
- Db2 = Db1#db{
- waiting_delayed_commit=nil,
- header=Header,
- committed_update_seq=Db1#db.update_seq
- },
- bind_emsort(Db2, MetaFd, MetaState).
-
-
-bind_emsort(Db, Fd, nil) ->
- {ok, Ems} = couch_emsort:open(Fd),
- Db#db{id_tree=Ems};
-bind_emsort(Db, Fd, State) ->
- {ok, Ems} = couch_emsort:open(Fd, [{root, State}]),
- Db#db{id_tree=Ems}.
-
-
-bind_id_tree(Db, Fd, State) ->
- {ok, IdBtree} = couch_btree:open(State, Fd, [
- {split, fun ?MODULE:btree_by_id_split/1},
- {join, fun ?MODULE:btree_by_id_join/2},
- {reduce, fun ?MODULE:btree_by_id_reduce/2}
- ]),
- Db#db{id_tree=IdBtree}.
-
-
-sort_meta_data(Db0) ->
- {ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
- Db0#db{id_tree=Ems}.
-
-
-copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
- Src = Db#db.id_tree,
- DstState = couch_db_header:id_tree_state(Header),
- {ok, IdTree0} = couch_btree:open(DstState, Fd, [
- {split, fun ?MODULE:btree_by_id_split/1},
- {join, fun ?MODULE:btree_by_id_join/2},
- {reduce, fun ?MODULE:btree_by_id_reduce/2}
- ]),
- {ok, Iter} = couch_emsort:iter(Src),
- Acc0 = #merge_st{
- id_tree=IdTree0,
- seq_tree=Db#db.seq_tree,
- rem_seqs=[],
- infos=[]
- },
- Acc = merge_docids(Iter, Acc0),
- {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos),
- {ok, SeqTree} = couch_btree:add_remove(
- Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
- ),
- Db#db{id_tree=IdTree, seq_tree=SeqTree}.
-
-
-merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
- #merge_st{
- id_tree=IdTree0,
- seq_tree=SeqTree0,
- rem_seqs=RemSeqs
- } = Acc,
- {ok, IdTree1} = couch_btree:add(IdTree0, Infos),
- {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
- Acc1 = Acc#merge_st{
- id_tree=IdTree1,
- seq_tree=SeqTree1,
- rem_seqs=[],
- infos=[]
- },
- merge_docids(Iter, Acc1);
-merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
- case next_info(Iter, Curr, []) of
- {NextIter, NewCurr, FDI, Seqs} ->
- Acc1 = Acc#merge_st{
- infos = [FDI | Acc#merge_st.infos],
- rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
- curr = NewCurr
- },
- merge_docids(NextIter, Acc1);
- {finished, FDI, Seqs} ->
- Acc#merge_st{
- infos = [FDI | Acc#merge_st.infos],
- rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
- curr = undefined
- };
- empty ->
- Acc
- end.
+finish_engine_swap(_OldDb, _NewEngine, _CompactFilePath) ->
+ erlang:error(explode).
-next_info(Iter, undefined, []) ->
- case couch_emsort:next(Iter) of
- {ok, {{Id, Seq}, FDI}, NextIter} ->
- next_info(NextIter, {Id, Seq, FDI}, []);
- finished ->
- empty
- end;
-next_info(Iter, {Id, Seq, FDI}, Seqs) ->
- case couch_emsort:next(Iter) of
- {ok, {{Id, NSeq}, NFDI}, NextIter} ->
- next_info(NextIter, {Id, NSeq, NFDI}, [Seq | Seqs]);
- {ok, {{NId, NSeq}, NFDI}, NextIter} ->
- {NextIter, {NId, NSeq, NFDI}, FDI, Seqs};
- finished ->
- {finished, FDI, Seqs}
- end.
-
+pair_write_info(Old, New) ->
+ lists:map(fun(FDI) ->
+ case lists:keyfind(FDI#full_doc_info.id, #full_doc_info.id, Old) of
+ #full_doc_info{} = OldFDI -> {OldFDI, FDI};
+ false -> {not_found, FDI}
+ end
+ end, New).
-update_compact_task(NumChanges) ->
- [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
- Changes2 = Changes + NumChanges,
- Progress = case Total of
- 0 ->
- 0;
- _ ->
- (Changes2 * 100) div Total
- end,
- couch_task_status:update([{changes_done, Changes2}, {progress, Progress}]).
+pair_purge_info(Old, New) ->
+ lists:map(fun(OldFDI) ->
+ case lists:keyfind(OldFDI#full_doc_info.id, #full_doc_info.id, New) of
+ #full_doc_info{} = NewFDI -> {OldFDI, NewFDI};
+ false -> {OldFDI, not_found}
+ end
+ end, Old).
-make_doc_summary(#db{compression = Comp}, {Body0, Atts0}) ->
- Body = case couch_compress:is_compressed(Body0, Comp) of
- true ->
- Body0;
- false ->
- % pre 1.2 database file format
- couch_compress:compress(Body0, Comp)
- end,
- Atts = case couch_compress:is_compressed(Atts0, Comp) of
- true ->
- Atts0;
- false ->
- couch_compress:compress(Atts0, Comp)
- end,
- SummaryBin = ?term_to_bin({Body, Atts}),
- couch_file:assemble_file_chunk(SummaryBin, couch_crypto:hash(md5, SummaryBin)).
default_security_object(<<"shards/", _/binary>>) ->
case config:get("couchdb", "default_security", "everyone") of
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e056ae9e/src/couch_httpd_db.erl
----------------------------------------------------------------------
diff --git a/src/couch_httpd_db.erl b/src/couch_httpd_db.erl
index fe42dfe..4cf6857 100644
--- a/src/couch_httpd_db.erl
+++ b/src/couch_httpd_db.erl
@@ -217,7 +217,13 @@ handle_design_info_req(#httpd{
create_db_req(#httpd{user_ctx=UserCtx}=Req, DbName) ->
ok = couch_httpd:verify_is_server_admin(Req),
- case couch_server:create(DbName, [{user_ctx, UserCtx}]) of
+ Engine = case couch_httpd:qs_value(Req, "engine") of
+ EngineStr when is_list(EngineStr) ->
+ [{engine, iolist_to_binary(EngineStr)}];
+ _ ->
+ []
+ end,
+ case couch_server:create(DbName, [{user_ctx, UserCtx}] ++ Engine) of
{ok, Db} ->
couch_db:close(Db),
DbUrl = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName)),
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e056ae9e/src/couch_httpd_misc_handlers.erl
----------------------------------------------------------------------
diff --git a/src/couch_httpd_misc_handlers.erl b/src/couch_httpd_misc_handlers.erl
index eb75a94..3fc4d9a 100644
--- a/src/couch_httpd_misc_handlers.erl
+++ b/src/couch_httpd_misc_handlers.erl
@@ -17,8 +17,6 @@
handle_uuids_req/1,handle_config_req/1,
handle_task_status_req/1, handle_file_req/2]).
--export([increment_update_seq_req/2]).
-
-include_lib("couch/include/couch_db.hrl").
@@ -310,14 +308,3 @@ handle_approved_config_req(#httpd{method='DELETE',path_parts=[_,Section,Key]}=Re
send_json(Req, 200, list_to_binary(OldValue))
end.
-
-% httpd db handlers
-
-increment_update_seq_req(#httpd{method='POST'}=Req, Db) ->
- couch_httpd:validate_ctype(Req, "application/json"),
- {ok, NewSeq} = couch_db:increment_update_seq(Db),
- send_json(Req, {[{ok, true},
- {update_seq, NewSeq}
- ]});
-increment_update_seq_req(Req, _Db) ->
- send_method_not_allowed(Req, "POST").
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e056ae9e/src/couch_server.erl
----------------------------------------------------------------------
diff --git a/src/couch_server.erl b/src/couch_server.erl
index f365718..2c77e98 100644
--- a/src/couch_server.erl
+++ b/src/couch_server.erl
@@ -21,6 +21,8 @@
-export([handle_cast/2,code_change/3,handle_info/2,terminate/2]).
-export([dev_start/0,is_admin/2,has_admins/0,get_stats/0]).
-export([close_lru/0]).
+-export([delete_compaction_files/1]).
+-export([exists/1]).
% config_listener api
-export([handle_config_change/5, handle_config_terminate/3]).
@@ -33,6 +35,7 @@
-record(server,{
root_dir = [],
+ engines = [],
max_dbs_open=?MAX_DBS_OPEN,
dbs_open=0,
start_time="",
@@ -118,6 +121,35 @@ create(DbName, Options0) ->
delete(DbName, Options) ->
gen_server:call(couch_server, {delete, DbName, Options}, infinity).
+
+exists(DbName) ->
+ RootDir = config:get("couchdb", "database_dir", "."),
+ Engines = get_configured_engines(),
+ Possible = lists:foldl(fun({Extension, Engine}, Acc) ->
+ Path = make_filepath(RootDir, DbName, Extension),
+ case couch_db_engine:exists(Engine, Path) of
+ true ->
+ [{Engine, Path} | Acc];
+ false ->
+ Acc
+ end
+ end, [], Engines),
+ Possible /= [].
+
+
+delete_compaction_files(DbName) ->
+ delete_compaction_files(DbName, []).
+
+delete_compaction_files(DbName, DelOpts) when is_list(DbName) ->
+ RootDir = config:get("couchdb", "database_dir", "."),
+ lists:foreach(fun({Ext, Engine}) ->
+ FPath = make_filepath(RootDir, DbName, Ext),
+ couch_db_engine:delete_compaction_files(Engine, RootDir, FPath, DelOpts)
+ end, get_configured_engines()),
+ ok;
+delete_compaction_files(DbName, DelOpts) when is_binary(DbName) ->
+ delete_compaction_files(?b2l(DbName), DelOpts).
+
maybe_add_sys_db_callbacks(DbName, Options) when is_binary(DbName) ->
maybe_add_sys_db_callbacks(?b2l(DbName), Options);
maybe_add_sys_db_callbacks(DbName, Options) ->
@@ -165,9 +197,6 @@ is_admin(User, ClearPwd) ->
has_admins() ->
config:get("admins") /= [].
-get_full_filename(Server, DbName) ->
- filename:join([Server#server.root_dir, "./" ++ DbName ++ ".couch"]).
-
hash_admin_passwords() ->
hash_admin_passwords(true).
@@ -185,6 +214,7 @@ init([]) ->
% will restart us and then we will pick up the new settings.
RootDir = config:get("couchdb", "database_dir", "."),
+ Engines = get_configured_engines(),
MaxDbsOpen = list_to_integer(
config:get("couchdb", "max_dbs_open", integer_to_list(?MAX_DBS_OPEN))),
UpdateLruOnRead =
@@ -196,6 +226,7 @@ init([]) ->
ets:new(couch_dbs_pid_to_name, [set, protected, named_table]),
process_flag(trap_exit, true),
{ok, #server{root_dir=RootDir,
+ engines = Engines,
max_dbs_open=MaxDbsOpen,
update_lru_on_read=UpdateLruOnRead,
start_time=couch_util:rfc1123_date()}}.
@@ -220,6 +251,8 @@ handle_config_change("couchdb", "max_dbs_open", Max, _, _) when is_list(Max) ->
{ok, gen_server:call(couch_server,{set_max_dbs_open,list_to_integer(Max)})};
handle_config_change("couchdb", "max_dbs_open", _, _, _) ->
{ok, gen_server:call(couch_server,{set_max_dbs_open,?MAX_DBS_OPEN})};
+handle_config_change("couchdb_engines", _, _, _, _) ->
+ {ok, gen_server:call(couch_server,reload_engines)};
handle_config_change("admins", _, _, Persist, _) ->
% spawn here so couch event manager doesn't deadlock
{ok, spawn(fun() -> hash_admin_passwords(Persist) end)};
@@ -254,11 +287,15 @@ all_databases() ->
all_databases(Fun, Acc0) ->
{ok, #server{root_dir=Root}} = gen_server:call(couch_server, get_server),
NormRoot = couch_util:normpath(Root),
- FinalAcc = try
- filelib:fold_files(Root,
+ Extensions = get_engine_extensions(),
+ ExtRegExp = "(" ++ string:join(Extensions, "|") ++ ")",
+ RegExp =
"^[a-z0-9\\_\\$()\\+\\-]*" % stock CouchDB name regex
"(\\.[0-9]{10,})?" % optional shard timestamp
- "\\.couch$", % filename extension
+ "\\." ++ ExtRegExp ++ "$", % filename extension
+ FinalAcc = try
+ couch_util:fold_files(Root,
+ RegExp,
true,
fun(Filename, AccIn) ->
NormFilename = couch_util:normpath(Filename),
@@ -266,7 +303,8 @@ all_databases(Fun, Acc0) ->
[$/ | RelativeFilename] -> ok;
RelativeFilename -> ok
end,
- case Fun(couch_util:drop_dot_couch_ext(?l2b(RelativeFilename)), AccIn) of
+ Ext = filename:extension(RelativeFilename),
+ case Fun(?l2b(filename:rootname(RelativeFilename, Ext)), AccIn) of
{ok, NewAcc} -> NewAcc;
{stop, NewAcc} -> throw({stop, Fun, NewAcc})
end
@@ -293,11 +331,11 @@ maybe_close_lru_db(#server{lru=Lru}=Server) ->
{error, all_dbs_active}
end.
-open_async(Server, From, DbName, Filepath, Options) ->
+open_async(Server, From, DbName, {Module, Filepath}, Options) ->
Parent = self(),
T0 = os:timestamp(),
Opener = spawn_link(fun() ->
- Res = couch_db:start_link(DbName, Filepath, Options),
+ Res = couch_db:start_link(Module, DbName, Filepath, Options),
case {Res, lists:member(create, Options)} of
{{ok, _Db}, true} ->
couch_event:notify(DbName, created);
@@ -334,6 +372,8 @@ handle_call({set_update_lru_on_read, UpdateOnRead}, _From, Server) ->
{reply, ok, Server#server{update_lru_on_read=UpdateOnRead}};
handle_call({set_max_dbs_open, Max}, _From, Server) ->
{reply, ok, Server#server{max_dbs_open=Max}};
+handle_call(reload_engines, _From, Server) ->
+ {reply, ok, Server#server{engines = get_configured_engines()}};
handle_call(get_server, _From, Server) ->
{reply, {ok, Server}, Server};
handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) ->
@@ -351,7 +391,7 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) ->
[gen_server:reply(Waiter, {ok, Db}) || Waiter <- Waiters],
% Cancel the creation request if it exists.
case ReqType of
- {create, DbName, _Filepath, _Options, CrFrom} ->
+ {create, DbName, _Engine, _Options, CrFrom} ->
gen_server:reply(CrFrom, file_exists);
_ ->
ok
@@ -386,8 +426,8 @@ handle_call({open_result, _T0, DbName, Error}, {FromPid, _Tag}, Server) ->
true = ets:delete(couch_dbs, DbName),
true = ets:delete(couch_dbs_pid_to_name, FromPid),
NewServer = case ReqType of
- {create, DbName, Filepath, Options, CrFrom} ->
- open_async(Server, CrFrom, DbName, Filepath, Options);
+ {create, DbName, Engine, Options, CrFrom} ->
+ open_async(Server, CrFrom, DbName, Engine, Options);
_ ->
Server
end,
@@ -401,8 +441,8 @@ handle_call({open, DbName, Options}, From, Server) ->
ok ->
case make_room(Server, Options) of
{ok, Server2} ->
- Filepath = get_full_filename(Server, DbNameList),
- {noreply, open_async(Server2, From, DbName, Filepath, Options)};
+ Engine = get_engine(Server, DbNameList),
+ {noreply, open_async(Server2, From, DbName, Engine, Options)};
CloseError ->
{reply, CloseError, Server}
end;
@@ -421,14 +461,14 @@ handle_call({open, DbName, Options}, From, Server) ->
end;
handle_call({create, DbName, Options}, From, Server) ->
DbNameList = binary_to_list(DbName),
- Filepath = get_full_filename(Server, DbNameList),
+ Engine = get_engine(Server, DbNameList, Options),
case check_dbname(Server, DbNameList) of
ok ->
case ets:lookup(couch_dbs, DbName) of
[] ->
case make_room(Server, Options) of
{ok, Server2} ->
- {noreply, open_async(Server2, From, DbName, Filepath,
+ {noreply, open_async(Server2, From, DbName, Engine,
[create | Options])};
CloseError ->
{reply, CloseError, Server}
@@ -438,7 +478,7 @@ handle_call({create, DbName, Options}, From, Server) ->
% the middle of trying to open it. We allow one creator
% to wait while we figure out if it'll succeed.
CrOptions = [create | Options],
- Req = {create, DbName, Filepath, CrOptions, From},
+ Req = {create, DbName, Engine, CrOptions, From},
true = ets:insert(couch_dbs, Entry#entry{req_type = Req}),
{noreply, Server};
[_AlreadyRunningDb] ->
@@ -451,7 +491,6 @@ handle_call({delete, DbName, Options}, _From, Server) ->
DbNameList = binary_to_list(DbName),
case check_dbname(Server, DbNameList) of
ok ->
- FullFilepath = get_full_filename(Server, DbNameList),
Server2 =
case ets:lookup(couch_dbs, DbName) of
[] -> Server;
@@ -468,18 +507,16 @@ handle_call({delete, DbName, Options}, _From, Server) ->
db_closed(Server, Entry#entry.db_options)
end,
- %% Delete any leftover compaction files. If we don't do this a
- %% subsequent request for this DB will try to open them to use
- %% as a recovery.
- lists:foreach(fun(Ext) ->
- couch_file:delete(Server#server.root_dir, FullFilepath ++ Ext)
- end, [".compact", ".compact.data", ".compact.meta"]),
- couch_file:delete(Server#server.root_dir, FullFilepath ++ ".compact"),
-
couch_db_plugin:on_delete(DbName, Options),
DelOpt = [{context, delete} | Options],
- case couch_file:delete(Server#server.root_dir, FullFilepath, DelOpt) of
+
+ % Make sure and remove all compaction data
+ delete_compaction_files(DbNameList, DelOpt),
+
+ {Engine, FilePath} = get_engine(Server, DbNameList),
+ RootDir = Server#server.root_dir,
+ case couch_db_engine:delete(Engine, RootDir, FilePath, DelOpt) of
ok ->
couch_event:notify(DbName, deleted),
{reply, ok, Server2};
@@ -558,6 +595,106 @@ db_closed(Server, Options) ->
true -> Server
end.
+
+get_configured_engines() ->
+ ConfigEntries = config:get("couchdb_engines"),
+ Engines = lists:flatmap(fun({Extension, ModuleStr}) ->
+ try
+ [{Extension, list_to_atom(ModuleStr)}]
+ catch _T:_R ->
+ []
+ end
+ end, ConfigEntries),
+ case Engines of
+ [] ->
+ [{"couch", couch_bt_engine}];
+ Else ->
+ Else
+ end.
+
+
+get_engine(Server, DbName, Options) ->
+ #server{
+ root_dir = RootDir,
+ engines = Engines
+ } = Server,
+ case couch_util:get_value(engine, Options) of
+ Ext when is_binary(Ext) ->
+ ExtStr = binary_to_list(Ext),
+ case couch_util:get_value(ExtStr, Engines) of
+ Engine when is_atom(Engine) ->
+ Path = make_filepath(RootDir, DbName, ExtStr),
+ {Engine, Path};
+ _ ->
+ get_engine(Server, DbName)
+ end;
+ _ ->
+ get_engine(Server, DbName)
+ end.
+
+
+get_engine(Server, DbName) ->
+ #server{
+ root_dir = RootDir,
+ engines = Engines
+ } = Server,
+ Possible = lists:foldl(fun({Extension, Engine}, Acc) ->
+ Path = make_filepath(RootDir, DbName, Extension),
+ case couch_db_engine:exists(Engine, Path) of
+ true ->
+ [{Engine, Path} | Acc];
+ false ->
+ Acc
+ end
+ end, [], Engines),
+ case Possible of
+ [] ->
+ get_default_engine(Server, DbName);
+ [Engine] ->
+ Engine;
+ _ ->
+ erlang:error(engine_conflict)
+ end.
+
+
+get_default_engine(Server, DbName) ->
+ #server{
+ root_dir = RootDir,
+ engines = Engines
+ } = Server,
+ Default = {couch_bt_engine, make_filepath(RootDir, DbName, "couch")},
+ case config:get("couchdb", "default_engine") of
+ Extension when is_list(Extension) ->
+ case lists:keyfind(Extension, 1, Engines) of
+ {Extension, Module} ->
+ {Module, make_filepath(RootDir, DbName, Extension)};
+ false ->
+ Default
+ end;
+ _ ->
+ Default
+ end.
+
+
+make_filepath(RootDir, DbName, Extension) when is_binary(RootDir) ->
+ make_filepath(binary_to_list(RootDir), DbName, Extension);
+make_filepath(RootDir, DbName, Extension) when is_binary(DbName) ->
+ make_filepath(RootDir, binary_to_list(DbName), Extension);
+make_filepath(RootDir, DbName, Extension) when is_binary(Extension) ->
+ make_filepath(RootDir, DbName, binary_to_list(Extension));
+make_filepath(RootDir, DbName, Extension) ->
+ filename:join([RootDir, "./" ++ DbName ++ "." ++ Extension]).
+
+
+get_engine_extensions() ->
+ case config:get("couchdb_engines") of
+ [] ->
+ ["couch"];
+ Entries ->
+ [Ext || {Ext, _Mod} <- Entries]
+ end.
+
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e056ae9e/src/couch_stream.erl
----------------------------------------------------------------------
diff --git a/src/couch_stream.erl b/src/couch_stream.erl
index 913977f..1980246 100644
--- a/src/couch_stream.erl
+++ b/src/couch_stream.erl
@@ -14,21 +14,40 @@
-behaviour(gen_server).
-vsn(1).
-% public API
--export([open/1, open/2, close/1]).
--export([foldl/4, foldl/5, foldl_decode/6, range_foldl/6]).
--export([copy_to_new_stream/3, write/2]).
-% gen_server callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_cast/2, handle_call/3, handle_info/2]).
+-export([
+ open/1,
+ open/2,
+ close/1,
+
+ copy/2,
+ write/2,
+ to_disk_term/1,
+
+ foldl/3,
+ foldl/4,
+ foldl_decode/5,
+ range_foldl/5
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
-include_lib("couch/include/couch_db.hrl").
+
-define(DEFAULT_BUFFER_SIZE, 4096).
--record(stream,
- {fd = 0,
+
+-record(stream, {
+ engine,
opener_monitor,
written_pointers=[],
buffer_list = [],
@@ -42,114 +61,94 @@
identity_len = 0,
encoding_fun,
end_encoding_fun
- }).
+}).
-%%% Interface functions %%%
+open({_StreamEngine, _StreamEngineState} = Engine) ->
+ open(Engine, []).
-open(Fd) ->
- open(Fd, []).
-open(Fd, Options) ->
- gen_server:start_link(couch_stream, {Fd, self(), erlang:get(io_priority), Options}, []).
+open({_StreamEngine, _StreamEngineState} = Engine, Options) ->
+ gen_server:start_link(?MODULE, {Engine, self(), erlang:get(io_priority), Options}, []).
+
close(Pid) ->
gen_server:call(Pid, close, infinity).
-copy_to_new_stream(Fd, PosList, DestFd) ->
- {ok, Dest} = open(DestFd),
- foldl(Fd, PosList,
- fun(Bin, _) ->
- ok = write(Dest, Bin)
- end, ok),
- close(Dest).
-
-foldl(_Fd, [], _Fun, Acc) ->
- Acc;
-foldl(Fd, [Pos|Rest], Fun, Acc) ->
- {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
- foldl(Fd, Rest, Fun, Fun(Bin, Acc)).
-
-foldl(Fd, PosList, <<>>, Fun, Acc) ->
- foldl(Fd, PosList, Fun, Acc);
-foldl(Fd, PosList, Md5, Fun, Acc) ->
- foldl(Fd, PosList, Md5, couch_crypto:hash_init(md5), Fun, Acc).
-
-foldl_decode(Fd, PosList, Md5, Enc, Fun, Acc) ->
+
+copy(Src, Dst) ->
+ foldl(Src, fun(Bin, _) ->
+ ok = write(Dst, Bin)
+ end, ok).
+
+
+write(_Pid, <<>>) ->
+ ok;
+write(Pid, Bin) ->
+ gen_server:call(Pid, {write, Bin}, infinity).
+
+
+to_disk_term({Engine, EngineState}) ->
+ Engine:to_disk_term(EngineState).
+
+
+foldl({Engine, EngineState}, Fun, Acc) ->
+ Engine:foldl(EngineState, Fun, Acc).
+
+
+foldl(Engine, <<>>, Fun, Acc) ->
+ foldl(Engine, Fun, Acc);
+foldl(Engine, Md5, UserFun, UserAcc) ->
+ InitAcc = {couch_crypto:hash_init(md5), UserFun, UserAcc},
+ {Md5Acc, _, OutAcc} = foldl(Engine, fun foldl_md5/2, InitAcc),
+ Md5 = couch_crypto:hash_final(md5, Md5Acc),
+ OutAcc.
+
+
+foldl_decode(Engine, Md5, Enc, UserFun, UserAcc1) ->
{DecDataFun, DecEndFun} = case Enc of
- gzip ->
- ungzip_init();
- identity ->
- identity_enc_dec_funs()
+ gzip -> ungzip_init();
+ identity -> identity_enc_dec_funs()
end,
- Result = foldl_decode(
- DecDataFun, Fd, PosList, Md5, couch_crypto:hash_init(md5), Fun, Acc
- ),
+ InitAcc = {DecDataFun, UserFun, UserAcc1},
+ {_, _, UserAcc2} = foldl(Engine, Md5, fun foldl_decode/2, InitAcc),
DecEndFun(),
- Result.
+ UserAcc2.
+
+
+range_foldl(Engine, From, To, UserFun, UserAcc) when To >= From ->
+ NewEngine = do_seek(Engine, From),
+ InitAcc = {To - From, UserFun, UserAcc},
+ try
+ {_, _, UserAcc2} = foldl(NewEngine, fun foldl_length/2, InitAcc),
+ UserAcc2
+ catch
+ throw:{finished, UserAcc3} ->
+ UserAcc3
+ end.
-foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) ->
- Md5 = couch_crypto:hash_final(md5, Md5Acc),
- Acc;
-foldl(Fd, [{Pos, _Size}], Md5, Md5Acc, Fun, Acc) -> % 0110 UPGRADE CODE
- foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc);
-foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
- {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
- Md5 = couch_crypto:hash_final(md5, couch_crypto:hash_update(md5, Md5Acc, Bin)),
- Fun(Bin, Acc);
-foldl(Fd, [{Pos, _Size}|Rest], Md5, Md5Acc, Fun, Acc) ->
- foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc);
-foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
- {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
- foldl(Fd, Rest, Md5, couch_crypto:hash_update(md5, Md5Acc, Bin), Fun, Fun(Bin, Acc)).
-
-range_foldl(Fd, PosList, From, To, Fun, Acc) ->
- range_foldl(Fd, PosList, From, To, 0, Fun, Acc).
-
-range_foldl(_Fd, _PosList, _From, To, Off, _Fun, Acc) when Off >= To ->
- Acc;
-range_foldl(Fd, [Pos|Rest], From, To, Off, Fun, Acc) when is_integer(Pos) -> % old-style attachment
- {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
- range_foldl(Fd, [{Pos, iolist_size(Bin)}] ++ Rest, From, To, Off, Fun, Acc);
-range_foldl(Fd, [{_Pos, Size}|Rest], From, To, Off, Fun, Acc) when From > Off + Size ->
- range_foldl(Fd, Rest, From, To, Off + Size, Fun, Acc);
-range_foldl(Fd, [{Pos, Size}|Rest], From, To, Off, Fun, Acc) ->
- {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
- Bin1 = if
- From =< Off andalso To >= Off + Size -> Bin; %% the whole block is covered
- true ->
- PrefixLen = clip(From - Off, 0, Size),
- PostfixLen = clip(Off + Size - To, 0, Size),
- MatchLen = Size - PrefixLen - PostfixLen,
- <<_Prefix:PrefixLen/binary,Match:MatchLen/binary,_Postfix:PostfixLen/binary>> = iolist_to_binary(Bin),
- Match
- end,
- range_foldl(Fd, Rest, From, To, Off + Size, Fun, Fun(Bin1, Acc)).
-clip(Value, Lo, Hi) ->
- if
- Value < Lo -> Lo;
- Value > Hi -> Hi;
- true -> Value
+foldl_md5(Bin, {Md5Acc, UserFun, UserAcc}) ->
+ NewMd5Acc = couch_crypto:hash_update(md5, Md5Acc, Bin),
+ {NewMd5Acc, UserFun, UserFun(Bin, UserAcc)}.
+
+
+foldl_decode(EncBin, {DecFun, UserFun, UserAcc}) ->
+ case DecFun(EncBin) of
+ <<>> -> {DecFun, UserFun, UserAcc};
+ Dec -> {DecFun, UserFun, UserFun(Dec, UserAcc)}
end.
-foldl_decode(_DecFun, _Fd, [], Md5, Md5Acc, _Fun, Acc) ->
- Md5 = couch_crypto:hash_final(md5, Md5Acc),
- Acc;
-foldl_decode(DecFun, Fd, [{Pos, _Size}], Md5, Md5Acc, Fun, Acc) ->
- foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc);
-foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
- {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
- Md5 = couch_crypto:hash_final(md5, couch_crypto:hash_update(md5, Md5Acc, EncBin)),
- Bin = DecFun(EncBin),
- Fun(Bin, Acc);
-foldl_decode(DecFun, Fd, [{Pos, _Size}|Rest], Md5, Md5Acc, Fun, Acc) ->
- foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc);
-foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
- {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
- Bin = DecFun(EncBin),
- Md5Acc2 = couch_crypto:hash_update(md5, Md5Acc, EncBin),
- foldl_decode(DecFun, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)).
+
+foldl_length(Bin, {Length, UserFun, UserAcc}) ->
+ BinSize = size(Bin),
+ case BinSize =< Length of
+ true ->
+ {Length - BinSize, UserFun, UserFun(Bin, UserAcc)};
+ false ->
+ <<Trunc:BinSize/binary, _/binary>> = Bin,
+ throw({finished, UserFun(Trunc, UserAcc)})
+ end.
gzip_init(Options) ->
case couch_util:get_value(compression_level, Options, 0) of
@@ -192,23 +191,16 @@ identity_enc_dec_funs() ->
fun() -> [] end
}.
-write(_Pid, <<>>) ->
- ok;
-write(Pid, Bin) ->
- gen_server:call(Pid, {write, Bin}, infinity).
-
-init({Fd, OpenerPid, OpenerPriority, Options}) ->
+init({Engine, OpenerPid, OpenerPriority, Options}) ->
erlang:put(io_priority, OpenerPriority),
{EncodingFun, EndEncodingFun} =
case couch_util:get_value(encoding, Options, identity) of
- identity ->
- identity_enc_dec_funs();
- gzip ->
- gzip_init(Options)
+ identity -> identity_enc_dec_funs();
+ gzip -> gzip_init(Options)
end,
{ok, #stream{
- fd=Fd,
+ engine=Engine,
opener_monitor=erlang:monitor(process, OpenerPid),
md5=couch_crypto:hash_init(md5),
identity_md5=couch_crypto:hash_init(md5),
@@ -225,9 +217,8 @@ terminate(_Reason, _Stream) ->
handle_call({write, Bin}, _From, Stream) ->
BinSize = iolist_size(Bin),
#stream{
- fd = Fd,
+ engine = Engine,
written_len = WrittenLen,
- written_pointers = Written,
buffer_len = BufferLen,
buffer_list = Buffer,
max_buffer = Max,
@@ -242,19 +233,18 @@ handle_call({write, Bin}, _From, Stream) ->
[] ->
% case where the encoder did some internal buffering
% (zlib does it for example)
+ NewEngine = Engine,
WrittenLen2 = WrittenLen,
- Md5_2 = Md5,
- Written2 = Written;
+ Md5_2 = Md5;
WriteBin2 ->
- {ok, Pos, _} = couch_file:append_binary(Fd, WriteBin2),
+ NewEngine = do_write(Engine, WriteBin2),
WrittenLen2 = WrittenLen + iolist_size(WriteBin2),
- Md5_2 = couch_crypto:hash_update(md5, Md5, WriteBin2),
- Written2 = [{Pos, iolist_size(WriteBin2)}|Written]
+ Md5_2 = couch_crypto:hash_update(md5, Md5, WriteBin2)
end,
{reply, ok, Stream#stream{
+ engine = NewEngine,
written_len=WrittenLen2,
- written_pointers=Written2,
buffer_list=[],
buffer_len=0,
md5=Md5_2,
@@ -268,10 +258,9 @@ handle_call({write, Bin}, _From, Stream) ->
end;
handle_call(close, _From, Stream) ->
#stream{
- fd = Fd,
+ engine = Engine,
opener_monitor = MonRef,
written_len = WrittenLen,
- written_pointers = Written,
buffer_list = Buffer,
md5 = Md5,
identity_md5 = IdenMd5,
@@ -285,12 +274,11 @@ handle_call(close, _From, Stream) ->
Md5Final = couch_crypto:hash_final(md5, couch_crypto:hash_update(md5, Md5, WriteBin2)),
Result = case WriteBin2 of
[] ->
- {lists:reverse(Written), WrittenLen, IdenLen, Md5Final, IdenMd5Final};
+ {do_finalize(Engine), WrittenLen, IdenLen, Md5Final, IdenMd5Final};
_ ->
- {ok, Pos, _} = couch_file:append_binary(Fd, WriteBin2),
- StreamInfo = lists:reverse(Written, [{Pos, iolist_size(WriteBin2)}]),
+ NewEngine = do_write(Engine, WriteBin2),
StreamLen = WrittenLen + iolist_size(WriteBin2),
- {StreamInfo, StreamLen, IdenLen, Md5Final, IdenMd5Final}
+ {do_finalize(NewEngine), StreamLen, IdenLen, Md5Final, IdenMd5Final}
end,
erlang:demonitor(MonRef),
{stop, normal, Result, Stream}.
@@ -305,3 +293,17 @@ handle_info({'DOWN', Ref, _, _, _}, #stream{opener_monitor=Ref} = State) ->
{stop, normal, State};
handle_info(_Info, State) ->
{noreply, State}.
+
+
+do_seek({Engine, EngineState}, Offset) ->
+ {ok, NewState} = Engine:seek(EngineState, Offset),
+ {Engine, NewState}.
+
+do_write({Engine, EngineState}, Data) ->
+ {ok, NewState} = Engine:write(EngineState, Data),
+ {Engine, NewState}.
+
+do_finalize({Engine, EngineState}) ->
+ {ok, NewState} = Engine:finalize(EngineState),
+ {Engine, NewState}.
+
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e056ae9e/src/couch_util.erl
----------------------------------------------------------------------
diff --git a/src/couch_util.erl b/src/couch_util.erl
index d688c12..dc2ef64 100644
--- a/src/couch_util.erl
+++ b/src/couch_util.erl
@@ -12,7 +12,7 @@
-module(couch_util).
--export([priv_dir/0, normpath/1]).
+-export([priv_dir/0, normpath/1, fold_files/5]).
-export([should_flush/0, should_flush/1, to_existing_atom/1]).
-export([rand32/0, implode/2, collate/2, collate/3]).
-export([abs_pathname/1,abs_pathname/2, trim/1, drop_dot_couch_ext/1]).
@@ -33,6 +33,7 @@
-export([find_in_binary/2]).
-export([callback_exists/3, validate_callback_exists/3]).
-export([with_proc/4]).
+-export([check_md5/2]).
-include_lib("couch/include/couch_db.hrl").
@@ -62,6 +63,37 @@ normparts(["." | RestParts], Acc) ->
normparts([Part | RestParts], Acc) ->
normparts(RestParts, [Part | Acc]).
+
+fold_files(Dir, RegExp, Recursive, Fun, Acc) ->
+ {ok, Re} = re:compile(RegExp, [unicode]),
+ fold_files1(Dir, Re, Recursive, Fun, Acc).
+
+fold_files1(Dir, RegExp, Recursive, Fun, Acc) ->
+ case file:list_dir(Dir) of
+ {ok, Files} ->
+ fold_files2(Files, Dir, RegExp, Recursive, Fun, Acc);
+ {error, _} ->
+ Acc
+ end.
+
+fold_files2([], _Dir, _RegExp, _Recursive, _Fun, Acc) ->
+ Acc;
+fold_files2([File | Rest], Dir, RegExp, Recursive, Fun, Acc0) ->
+ FullName = filename:join(Dir, File),
+ case (catch re:run(File, RegExp, [{capture, none}])) of
+ match ->
+ Acc1 = Fun(FullName, Acc0),
+ fold_files2(Rest, Dir, RegExp, Recursive, Fun, Acc1);
+ _ ->
+ case Recursive andalso filelib:is_dir(FullName) of
+ true ->
+ Acc1 = fold_files1(FullName, RegExp, Recursive, Fun, Acc0),
+ fold_files2(Rest, Dir, RegExp, Recursive, Fun, Acc1);
+ false ->
+ fold_files2(Rest, Dir, RegExp, Recursive, Fun, Acc0)
+ end
+ end.
+
% works like list_to_existing_atom, except can be list or binary and it
% gives you the original value instead of an error if no existing atom.
to_existing_atom(V) when is_list(V) ->
@@ -578,6 +610,12 @@ validate_callback_exists(Module, Function, Arity) ->
{undefined_callback, CallbackStr, {Module, Function, Arity}}})
end.
+
+check_md5(_NewSig, <<>>) -> ok;
+check_md5(Sig, Sig) -> ok;
+check_md5(_, _) -> throw(md5_mismatch).
+
+
ensure_loaded(Module) when is_atom(Module) ->
case code:ensure_loaded(Module) of
{module, Module} ->
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e056ae9e/src/test_util.erl
----------------------------------------------------------------------
diff --git a/src/test_util.erl b/src/test_util.erl
index b5bb232..6bf7b46 100644
--- a/src/test_util.erl
+++ b/src/test_util.erl
@@ -34,6 +34,8 @@
-export([start/1, start/2, start/3, stop/1]).
+-export([fake_db/1]).
+
-record(test_context, {mocked = [], started = [], module}).
-define(DEFAULT_APPS,
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e056ae9e/test/couch_db_plugin_tests.erl
----------------------------------------------------------------------
diff --git a/test/couch_db_plugin_tests.erl b/test/couch_db_plugin_tests.erl
index 94dd3df..52533fe 100644
--- a/test/couch_db_plugin_tests.erl
+++ b/test/couch_db_plugin_tests.erl
@@ -43,7 +43,7 @@ data_providers() -> [].
data_subscriptions() -> [].
processes() -> [].
notify(_, _, _) -> ok.
-fake_db() -> element(2, couch_db:clustered_db(fake, totes_fake)).
+fake_db() -> test_util:fake_db([]).
setup() ->
couch_tests:setup([
[2/5] couch commit: updated
refs/heads/COUCHDB-3287-pluggable-storage-engines to 0f4e1a7
Posted by da...@apache.org.
Add storage engine test suite
This allows other storage engine implementations to reuse the same exact
test suite without having to resort to shenanigans like keeping vendored
copies up to date.
COUCHDB-3287
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/414880e1
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/414880e1
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/414880e1
Branch: refs/heads/COUCHDB-3287-pluggable-storage-engines
Commit: 414880e1a80bc409a86ffdcc02fd90bf8bd014ed
Parents: e056ae9
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Feb 5 12:21:39 2016 -0600
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Thu Mar 2 11:03:55 2017 -0600
----------------------------------------------------------------------
.gitignore | 5 +
src/test_engine_attachments.erl | 85 ++++
src/test_engine_compaction.erl | 181 +++++++++
src/test_engine_fold_changes.erl | 190 +++++++++
src/test_engine_fold_docs.erl | 390 +++++++++++++++++++
src/test_engine_get_set_props.erl | 70 ++++
src/test_engine_open_close_delete.erl | 81 ++++
src/test_engine_purge_docs.erl | 158 ++++++++
src/test_engine_read_write_docs.erl | 317 +++++++++++++++
src/test_engine_ref_counting.erl | 112 ++++++
src/test_engine_util.erl | 602 +++++++++++++++++++++++++++++
test/couch_bt_engine_tests.erl | 20 +
12 files changed, 2211 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/414880e1/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 30aa173..73fb0b6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,5 +11,10 @@ priv/*.dll
priv/*.exe
vc120.pdb
+test/engines/coverage/
+test/engines/data/
+test/engines/etc/
+test/engines/log/
+
.rebar/
.eunit
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/414880e1/src/test_engine_attachments.erl
----------------------------------------------------------------------
diff --git a/src/test_engine_attachments.erl b/src/test_engine_attachments.erl
new file mode 100644
index 0000000..a19322d
--- /dev/null
+++ b/src/test_engine_attachments.erl
@@ -0,0 +1,85 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(test_engine_attachments).
+-compile(export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+cet_write_attachment() ->
+ {ok, Engine, DbPath, St1} = test_engine_util:init_engine(dbpath),
+
+ AttBin = crypto:rand_bytes(32768),
+
+ [Att0] = test_engine_util:prep_atts(Engine, St1, [
+ {<<"ohai.txt">>, AttBin}
+ ]),
+
+ {stream, Stream} = couch_att:fetch(data, Att0),
+ ?assertEqual(true, Engine:is_active_stream(St1, Stream)),
+
+ Actions = [{create, {<<"first">>, [], [Att0]}}],
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+ {ok, St3} = Engine:commit_data(St2),
+ Engine:terminate(normal, St3),
+
+ {ok, St4} = Engine:init(DbPath, []),
+ [FDI] = Engine:open_docs(St4, [<<"first">>]),
+
+ #rev_info{
+ rev = {RevPos, PrevRevId},
+ deleted = Deleted,
+ body_sp = DocPtr
+ } = test_engine_util:prev_rev(FDI),
+
+ Doc0 = #doc{
+ id = <<"foo">>,
+ revs = {RevPos, [PrevRevId]},
+ deleted = Deleted,
+ body = DocPtr
+ },
+
+ Doc1 = Engine:read_doc_body(St4, Doc0),
+ Atts1 = if not is_binary(Doc1#doc.atts) -> Doc1#doc.atts; true ->
+ couch_compress:decompress(Doc1#doc.atts)
+ end,
+
+ StreamSrc = fun(Sp) -> Engine:open_read_stream(St4, Sp) end,
+ [Att1] = [couch_att:from_disk_term(StreamSrc, T) || T <- Atts1],
+ ReadBin = couch_att:to_binary(Att1),
+ ?assertEqual(AttBin, ReadBin).
+
+
+% N.B. This test may be overly specific for some theoretical
+% storage engines that don't re-initialize their
+% attachments streams when restarting (for instance if
+% we ever have something that stores attachemnts in
+% an external object store)
+cet_inactive_stream() ->
+ {ok, Engine, DbPath, St1} = test_engine_util:init_engine(dbpath),
+
+ AttBin = crypto:rand_bytes(32768),
+
+ [Att0] = test_engine_util:prep_atts(Engine, St1, [
+ {<<"ohai.txt">>, AttBin}
+ ]),
+
+ {stream, Stream} = couch_att:fetch(data, Att0),
+ ?assertEqual(true, Engine:is_active_stream(St1, Stream)),
+
+ Engine:terminate(normal, St1),
+ {ok, St2} = Engine:init(DbPath, []),
+
+ ?assertEqual(false, Engine:is_active_stream(St2, Stream)).
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/414880e1/src/test_engine_compaction.erl
----------------------------------------------------------------------
diff --git a/src/test_engine_compaction.erl b/src/test_engine_compaction.erl
new file mode 100644
index 0000000..b178bae
--- /dev/null
+++ b/src/test_engine_compaction.erl
@@ -0,0 +1,181 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(test_engine_compaction).
+-compile(export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+cet_compact_empty() ->
+ {ok, Engine, Path, St1} = test_engine_util:init_engine(dbpath),
+ Db1 = test_engine_util:db_as_term(Engine, St1),
+ {ok, St2, DbName, _, Term} = test_engine_util:compact(Engine, St1, Path),
+ {ok, St3, undefined} = Engine:finish_compaction(St2, DbName, [], Term),
+ Db2 = test_engine_util:db_as_term(Engine, St3),
+ Diff = test_engine_util:term_diff(Db1, Db2),
+ ?assertEqual(nodiff, Diff).
+
+
+cet_compact_doc() ->
+ {ok, Engine, Path, St1} = test_engine_util:init_engine(dbpath),
+ Actions = [{create, {<<"foo">>, []}}],
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+ Db1 = test_engine_util:db_as_term(Engine, St2),
+ {ok, St3, DbName, _, Term} = test_engine_util:compact(Engine, St2, Path),
+ {ok, St4, undefined} = Engine:finish_compaction(St3, DbName, [], Term),
+ Db2 = test_engine_util:db_as_term(Engine, St4),
+ Diff = test_engine_util:term_diff(Db1, Db2),
+ ?assertEqual(nodiff, Diff).
+
+
+cet_compact_local_doc() ->
+ {ok, Engine, Path, St1} = test_engine_util:init_engine(dbpath),
+ Actions = [{create, {<<"_local/foo">>, []}}],
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+ Db1 = test_engine_util:db_as_term(Engine, St2),
+ {ok, St3, DbName, _, Term} = test_engine_util:compact(Engine, St2, Path),
+ {ok, St4, undefined} = Engine:finish_compaction(St3, DbName, [], Term),
+ Db2 = test_engine_util:db_as_term(Engine, St4),
+ Diff = test_engine_util:term_diff(Db1, Db2),
+ ?assertEqual(nodiff, Diff).
+
+
+cet_compact_with_everything() ->
+ {ok, Engine, Path, St1} = test_engine_util:init_engine(dbpath),
+
+ % Add a whole bunch of docs
+ DocActions = lists:map(fun(Seq) ->
+ {create, {docid(Seq), [{<<"int">>, Seq}]}}
+ end, lists:seq(1, 1000)),
+
+ LocalActions = lists:map(fun(I) ->
+ {create, {local_docid(I), [{<<"int">>, I}]}}
+ end, lists:seq(1, 25)),
+
+ Actions1 = DocActions ++ LocalActions,
+
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+ {ok, St3} = Engine:set_security(St2, [{<<"readers">>, <<"ohai">>}]),
+ {ok, St4} = Engine:set_revs_limit(St3, 500),
+
+ Actions2 = [
+ {create, {<<"foo">>, []}},
+ {create, {<<"bar">>, [{<<"hooray">>, <<"purple">>}]}},
+ {conflict, {<<"bar">>, [{<<"booo">>, false}]}}
+ ],
+
+ {ok, St5} = test_engine_util:apply_actions(Engine, St4, Actions2),
+
+ [FooFDI, BarFDI] = Engine:open_docs(St5, [<<"foo">>, <<"bar">>]),
+
+ FooRev = test_engine_util:prev_rev(FooFDI),
+ BarRev = test_engine_util:prev_rev(BarFDI),
+
+ Actions3 = [
+ {batch, [
+ {purge, {<<"foo">>, FooRev#rev_info.rev}},
+ {purge, {<<"bar">>, BarRev#rev_info.rev}}
+ ]}
+ ],
+
+ {ok, St6} = test_engine_util:apply_actions(Engine, St5, Actions3),
+
+ PurgedIdRevs = [
+ {<<"bar">>, [BarRev#rev_info.rev]},
+ {<<"foo">>, [FooRev#rev_info.rev]}
+ ],
+
+ ?assertEqual(PurgedIdRevs, lists:sort(Engine:get_last_purged(St6))),
+
+ [Att0, Att1, Att2, Att3, Att4] = test_engine_util:prep_atts(Engine, St6, [
+ {<<"ohai.txt">>, crypto:rand_bytes(2048)},
+ {<<"stuff.py">>, crypto:rand_bytes(32768)},
+ {<<"a.erl">>, crypto:rand_bytes(29)},
+ {<<"a.hrl">>, crypto:rand_bytes(5000)},
+ {<<"a.app">>, crypto:rand_bytes(400)}
+ ]),
+
+ Actions4 = [
+ {create, {<<"small_att">>, [], [Att0]}},
+ {create, {<<"large_att">>, [], [Att1]}},
+ {create, {<<"multi_att">>, [], [Att2, Att3, Att4]}}
+ ],
+ {ok, St7} = test_engine_util:apply_actions(Engine, St6, Actions4),
+ {ok, St8} = Engine:commit_data(St7),
+
+ Db1 = test_engine_util:db_as_term(Engine, St8),
+
+ Config = [
+ {"database_compaction", "doc_buffer_size", "1024"},
+ {"database_compaction", "checkpoint_after", "2048"}
+ ],
+
+ {ok, St9, DbName, _, Term} = test_engine_util:with_config(Config, fun() ->
+ test_engine_util:compact(Engine, St8, Path)
+ end),
+
+ {ok, St10, undefined} = Engine:finish_compaction(St9, DbName, [], Term),
+ Db2 = test_engine_util:db_as_term(Engine, St10),
+ Diff = test_engine_util:term_diff(Db1, Db2),
+ ?assertEqual(nodiff, Diff).
+
+
+cet_recompact_updates() ->
+ {ok, Engine, Path, St1} = test_engine_util:init_engine(dbpath),
+
+ Actions1 = [
+ {create, {<<"foo">>, []}},
+ {create, {<<"bar">>, []}}
+ ],
+
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+ {ok, St3, DbName, _, Term} = test_engine_util:compact(Engine, St2, Path),
+
+ Actions2 = [
+ {update, {<<"foo">>, [{<<"updated">>, true}]}},
+ {create, {<<"baz">>, []}}
+ ],
+
+ {ok, St4} = test_engine_util:apply_actions(Engine, St3, Actions2),
+ Db1 = test_engine_util:db_as_term(Engine, St4),
+
+ {ok, St5, NewPid} = Engine:finish_compaction(St4, DbName, [], Term),
+
+ ?assertEqual(true, is_pid(NewPid)),
+ Ref = erlang:monitor(process, NewPid),
+
+ NewTerm = receive
+ {'$gen_cast', {compact_done, Engine, Term0}} ->
+ Term0;
+ {'DOWN', Ref, _, _, Reason} ->
+ erlang:error({compactor_died, Reason})
+ after 10000 ->
+ erlang:error(compactor_timed_out)
+ end,
+
+ {ok, St6, undefined} = Engine:finish_compaction(St5, DbName, [], NewTerm),
+ Db2 = test_engine_util:db_as_term(Engine, St6),
+ Diff = test_engine_util:term_diff(Db1, Db2),
+ ?assertEqual(nodiff, Diff).
+
+
+docid(I) ->
+ Str = io_lib:format("~4..0b", [I]),
+ iolist_to_binary(Str).
+
+
+local_docid(I) ->
+ Str = io_lib:format("_local/~4..0b", [I]),
+ iolist_to_binary(Str).
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/414880e1/src/test_engine_fold_changes.erl
----------------------------------------------------------------------
diff --git a/src/test_engine_fold_changes.erl b/src/test_engine_fold_changes.erl
new file mode 100644
index 0000000..6e97fda
--- /dev/null
+++ b/src/test_engine_fold_changes.erl
@@ -0,0 +1,190 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(test_engine_fold_changes).
+-compile(export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(NUM_DOCS, 100).
+
+
+cet_empty_changes() ->
+ {ok, Engine, St} = test_engine_util:init_engine(),
+
+ ?assertEqual(0, Engine:count_changes_since(St, 0)),
+ ?assertEqual({ok, []}, Engine:fold_changes(St, 0, fun fold_fun/2, [], [])).
+
+
+cet_single_change() ->
+ {ok, Engine, St1} = test_engine_util:init_engine(),
+ Actions = [{create, {<<"a">>, []}}],
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+
+ ?assertEqual(1, Engine:count_changes_since(St2, 0)),
+ ?assertEqual({ok, [{<<"a">>, 1}]},
+ Engine:fold_changes(St2, 0, fun fold_fun/2, [], [])).
+
+
+cet_two_changes() ->
+ {ok, Engine, St1} = test_engine_util:init_engine(),
+ Actions = [
+ {create, {<<"a">>, []}},
+ {create, {<<"b">>, []}}
+ ],
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+
+ ?assertEqual(2, Engine:count_changes_since(St2, 0)),
+ {ok, Changes} = Engine:fold_changes(St2, 0, fun fold_fun/2, [], []),
+ ?assertEqual([{<<"a">>, 1}, {<<"b">>, 2}], lists:reverse(Changes)).
+
+
+cet_two_changes_batch() ->
+ {ok, Engine, St1} = test_engine_util:init_engine(),
+ Actions1 = [
+ {batch, [
+ {create, {<<"a">>, []}},
+ {create, {<<"b">>, []}}
+ ]}
+ ],
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+
+ ?assertEqual(2, Engine:count_changes_since(St2, 0)),
+ {ok, Changes1} = Engine:fold_changes(St2, 0, fun fold_fun/2, [], []),
+ ?assertEqual([{<<"a">>, 1}, {<<"b">>, 2}], lists:reverse(Changes1)),
+
+ {ok, Engine, St3} = test_engine_util:init_engine(),
+ Actions2 = [
+ {batch, [
+ {create, {<<"b">>, []}},
+ {create, {<<"a">>, []}}
+ ]}
+ ],
+ {ok, St4} = test_engine_util:apply_actions(Engine, St3, Actions2),
+
+ ?assertEqual(2, Engine:count_changes_since(St4, 0)),
+ {ok, Changes2} = Engine:fold_changes(St4, 0, fun fold_fun/2, [], []),
+ ?assertEqual([{<<"b">>, 1}, {<<"a">>, 2}], lists:reverse(Changes2)).
+
+
+cet_update_one() ->
+ {ok, Engine, St1} = test_engine_util:init_engine(),
+ Actions = [
+ {create, {<<"a">>, []}},
+ {update, {<<"a">>, []}}
+ ],
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+
+ ?assertEqual(1, Engine:count_changes_since(St2, 0)),
+ ?assertEqual({ok, [{<<"a">>, 2}]},
+ Engine:fold_changes(St2, 0, fun fold_fun/2, [], [])).
+
+
+cet_update_first_of_two() ->
+ {ok, Engine, St1} = test_engine_util:init_engine(),
+ Actions = [
+ {create, {<<"a">>, []}},
+ {create, {<<"b">>, []}},
+ {update, {<<"a">>, []}}
+ ],
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+
+ ?assertEqual(2, Engine:count_changes_since(St2, 0)),
+ {ok, Changes} = Engine:fold_changes(St2, 0, fun fold_fun/2, [], []),
+ ?assertEqual([{<<"b">>, 2}, {<<"a">>, 3}], lists:reverse(Changes)).
+
+
+cet_update_second_of_two() ->
+ {ok, Engine, St1} = test_engine_util:init_engine(),
+ Actions = [
+ {create, {<<"a">>, []}},
+ {create, {<<"b">>, []}},
+ {update, {<<"b">>, []}}
+ ],
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+
+ ?assertEqual(2, Engine:count_changes_since(St2, 0)),
+ {ok, Changes} = Engine:fold_changes(St2, 0, fun fold_fun/2, [], []),
+ ?assertEqual([{<<"a">>, 1}, {<<"b">>, 3}], lists:reverse(Changes)).
+
+
+cet_check_mutation_ordering() ->
+ Actions = shuffle(lists:map(fun(Seq) ->
+ {create, {docid(Seq), []}}
+ end, lists:seq(1, ?NUM_DOCS))),
+
+ DocIdOrder = [DocId || {_, {DocId, _}} <- Actions],
+ DocSeqs = lists:zip(DocIdOrder, lists:seq(1, ?NUM_DOCS)),
+
+ {ok, Engine, St1} = test_engine_util:init_engine(),
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+
+ % First lets see that we can get the correct
+ % suffix/prefix starting at every update sequence
+ lists:foreach(fun(Seq) ->
+ {ok, Suffix} = Engine:fold_changes(St2, Seq, fun fold_fun/2, [], []),
+ ?assertEqual(lists:nthtail(Seq, DocSeqs), lists:reverse(Suffix)),
+
+ {ok, Prefix} = Engine:fold_changes(St2, Seq, fun fold_fun/2, [], [
+ {dir, rev}
+ ]),
+ ?assertEqual(lists:sublist(DocSeqs, Seq + 1), Prefix)
+ end, lists:seq(0, ?NUM_DOCS)),
+
+ ok = do_mutation_ordering(Engine, St2, ?NUM_DOCS + 1, DocSeqs, []).
+
+
+do_mutation_ordering(Engine, St, _Seq, [], FinalDocSeqs) ->
+ {ok, RevOrder} = Engine:fold_changes(St, 0, fun fold_fun/2, [], []),
+ ?assertEqual(FinalDocSeqs, lists:reverse(RevOrder)),
+ ok;
+
+do_mutation_ordering(Engine, St, Seq, [{DocId, _OldSeq} | Rest], DocSeqAcc) ->
+ Actions = [{update, {DocId, []}}],
+ {ok, NewSt} = test_engine_util:apply_actions(Engine, St, Actions),
+ NewAcc = DocSeqAcc ++ [{DocId, Seq}],
+ Expected = Rest ++ NewAcc,
+ {ok, RevOrder} = Engine:fold_changes(NewSt, 0, fun fold_fun/2, [], []),
+ ?assertEqual(Expected, lists:reverse(RevOrder)),
+ do_mutation_ordering(Engine, NewSt, Seq + 1, Rest, NewAcc).
+
+
+shuffle(List) ->
+ random:seed(os:timestamp()),
+ Paired = [{random:uniform(), I} || I <- List],
+ Sorted = lists:sort(Paired),
+ [I || {_, I} <- Sorted].
+
+
+remove_random(List) ->
+ Pos = random:uniform(length(List)),
+ remove_random(Pos, List).
+
+
+remove_random(1, [Item | Rest]) ->
+ {Item, Rest};
+
+remove_random(N, [Skip | Rest]) when N > 1 ->
+ {Item, Tail} = remove_random(N - 1, Rest),
+ {Item, [Skip | Tail]}.
+
+
+fold_fun(#full_doc_info{id=Id, update_seq=Seq}, Acc) ->
+ {ok, [{Id, Seq} | Acc]}.
+
+
+docid(I) ->
+ Str = io_lib:format("~4..0b", [I]),
+ iolist_to_binary(Str).
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/414880e1/src/test_engine_fold_docs.erl
----------------------------------------------------------------------
diff --git a/src/test_engine_fold_docs.erl b/src/test_engine_fold_docs.erl
new file mode 100644
index 0000000..34d7f3e
--- /dev/null
+++ b/src/test_engine_fold_docs.erl
@@ -0,0 +1,390 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(test_engine_fold_docs).
+-compile(export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(NUM_DOCS, 100).
+
+
+cet_fold_all() ->
+ fold_all(fold_docs, fun docid/1).
+
+
+cet_fold_all_local() ->
+ fold_all(fold_local_docs, fun local_docid/1).
+
+
+cet_fold_start_key() ->
+ fold_start_key(fold_docs, fun docid/1).
+
+
+cet_fold_start_key_local() ->
+ fold_start_key(fold_local_docs, fun local_docid/1).
+
+
+cet_fold_end_key() ->
+ fold_end_key(fold_docs, fun docid/1).
+
+
+cet_fold_end_key_local() ->
+ fold_end_key(fold_local_docs, fun local_docid/1).
+
+
+cet_fold_end_key_gt() ->
+ fold_end_key_gt(fold_docs, fun docid/1).
+
+
+cet_fold_end_key_gt_local() ->
+ fold_end_key_gt(fold_local_docs, fun local_docid/1).
+
+
+cet_fold_range() ->
+ fold_range(fold_docs, fun docid/1).
+
+
+cet_fold_range_local() ->
+ fold_range(fold_local_docs, fun local_docid/1).
+
+
+cet_fold_stop() ->
+ fold_stop(fold_docs, fun docid/1).
+
+
+cet_fold_stop_local() ->
+ fold_stop(fold_local_docs, fun local_docid/1).
+
+
+% This is a loose test but we have to have this until
+% I figure out what to do about the total_rows/offset
+% meta data included in _all_docs
+cet_fold_include_reductions() ->
+ {ok, Engine, St} = init_st(fun docid/1),
+ FoldFun = fun(_, _, nil) -> {ok, nil} end,
+ {ok, Count, nil} = Engine:fold_docs(St, FoldFun, nil, [include_reductions]),
+ ?assert(is_integer(Count)),
+ ?assert(Count >= 0).
+
+
+fold_all(FoldFun, DocIdFun) ->
+ DocIds = [DocIdFun(I) || I <- lists:seq(1, ?NUM_DOCS)],
+ {ok, Engine, St} = init_st(DocIdFun),
+
+ {ok, DocIdAccFwd} = Engine:FoldFun(St, fun fold_fun/2, [], []),
+ ?assertEqual(?NUM_DOCS, length(DocIdAccFwd)),
+ ?assertEqual(DocIds, lists:reverse(DocIdAccFwd)),
+
+ {ok, DocIdAccRev} = Engine:FoldFun(St, fun fold_fun/2, [], [{dir, rev}]),
+ ?assertEqual(?NUM_DOCS, length(DocIdAccRev)),
+ ?assertEqual(DocIds, DocIdAccRev).
+
+
+fold_start_key(FoldFun, DocIdFun) ->
+ {ok, Engine, St} = init_st(DocIdFun),
+
+ StartKeyNum = ?NUM_DOCS div 4,
+ StartKey = DocIdFun(StartKeyNum),
+
+ AllDocIds = [DocIdFun(I) || I <- lists:seq(1, ?NUM_DOCS)],
+ DocIdsFwd = [DocIdFun(I) || I <- lists:seq(StartKeyNum, ?NUM_DOCS)],
+ DocIdsRev = [DocIdFun(I) || I <- lists:seq(1, StartKeyNum)],
+
+ ?assertEqual({ok, []}, Engine:FoldFun(St, fun fold_fun/2, [], [
+ {start_key, <<255>>}
+ ])),
+
+ ?assertEqual({ok, []}, Engine:FoldFun(St, fun fold_fun/2, [], [
+ {dir, rev},
+ {start_key, <<"">>}
+ ])),
+
+ {ok, AllDocIdAccFwd} = Engine:FoldFun(St, fun fold_fun/2, [], [
+ {start_key, <<"">>}
+ ]),
+ ?assertEqual(length(AllDocIds), length(AllDocIdAccFwd)),
+ ?assertEqual(AllDocIds, lists:reverse(AllDocIdAccFwd)),
+
+ {ok, AllDocIdAccRev} = Engine:FoldFun(St, fun fold_fun/2, [], [
+ {dir, rev},
+ {start_key, <<255>>}
+ ]),
+ ?assertEqual(length(AllDocIds), length(AllDocIdAccFwd)),
+ ?assertEqual(AllDocIds, AllDocIdAccRev),
+
+ {ok, DocIdAccFwd} = Engine:FoldFun(St, fun fold_fun/2, [], [
+ {start_key, StartKey}
+ ]),
+ ?assertEqual(length(DocIdsFwd), length(DocIdAccFwd)),
+ ?assertEqual(DocIdsFwd, lists:reverse(DocIdAccFwd)),
+
+ {ok, DocIdAccRev} = Engine:FoldFun(St, fun fold_fun/2, [], [
+ {dir, rev},
+ {start_key, StartKey}
+ ]),
+ ?assertEqual(length(DocIdsRev), length(DocIdAccRev)),
+ ?assertEqual(DocIdsRev, DocIdAccRev).
+
+
+fold_end_key(FoldFun, DocIdFun) ->
+ {ok, Engine, St} = init_st(DocIdFun),
+
+ EndKeyNum = ?NUM_DOCS div 4,
+ EndKey = DocIdFun(EndKeyNum),
+
+ ?assertEqual({ok, []}, Engine:FoldFun(St, fun fold_fun/2, [], [
+ {end_key, <<"">>}
+ ])),
+
+ ?assertEqual({ok, []}, Engine:FoldFun(St, fun fold_fun/2, [], [
+ {dir, rev},
+ {end_key, <<255>>}
+ ])),
+
+ AllDocIds = [DocIdFun(I) || I <- lists:seq(1, ?NUM_DOCS)],
+
+ {ok, AllDocIdAccFwd} = Engine:FoldFun(St, fun fold_fun/2, [], [
+ {end_key, <<255>>}
+ ]),
+ ?assertEqual(length(AllDocIds), length(AllDocIdAccFwd)),
+ ?assertEqual(AllDocIds, lists:reverse(AllDocIdAccFwd)),
+
+ {ok, AllDocIdAccRev} = Engine:FoldFun(St, fun fold_fun/2, [], [
+ {dir, rev},
+ {end_key, <<"">>}
+ ]),
+ ?assertEqual(length(AllDocIds), length(AllDocIdAccFwd)),
+ ?assertEqual(AllDocIds, AllDocIdAccRev),
+
+ DocIdsFwd = [DocIdFun(I) || I <- lists:seq(1, EndKeyNum)],
+
+ {ok, DocIdAccFwd} = Engine:FoldFun(St, fun fold_fun/2, [], [
+ {end_key, EndKey}
+ ]),
+ ?assertEqual(length(DocIdsFwd), length(DocIdAccFwd)),
+ ?assertEqual(DocIdsFwd, lists:reverse(DocIdAccFwd)),
+
+ DocIdsRev = [DocIdFun(I) || I <- lists:seq(EndKeyNum, ?NUM_DOCS)],
+
+ {ok, DocIdAccRev} = Engine:FoldFun(St, fun fold_fun/2, [], [
+ {dir, rev},
+ {end_key, EndKey}
+ ]),
+ ?assertEqual(length(DocIdsRev), length(DocIdAccRev)),
+ ?assertEqual(DocIdsRev, DocIdAccRev).
+
+
+fold_end_key_gt(FoldFun, DocIdFun) ->
+ {ok, Engine, St} = init_st(DocIdFun),
+
+ EndKeyNum = ?NUM_DOCS div 4,
+ EndKey = DocIdFun(EndKeyNum),
+
+ ?assertEqual({ok, []}, Engine:FoldFun(St, fun fold_fun/2, [], [
+ {end_key_gt, <<"">>}
+ ])),
+
+ ?assertEqual({ok, []}, Engine:FoldFun(St, fun fold_fun/2, [], [
+ {dir, rev},
+ {end_key_gt, <<255>>}
+ ])),
+
+ AllDocIds = [DocIdFun(I) || I <- lists:seq(1, ?NUM_DOCS)],
+
+ {ok, AllDocIdAccFwd} = Engine:FoldFun(St, fun fold_fun/2, [], [
+ {end_key_gt, <<255>>}
+ ]),
+ ?assertEqual(length(AllDocIds), length(AllDocIdAccFwd)),
+ ?assertEqual(AllDocIds, lists:reverse(AllDocIdAccFwd)),
+
+ {ok, AllDocIdAccRev} = Engine:FoldFun(St, fun fold_fun/2, [], [
+ {dir, rev},
+ {end_key_gt, <<"">>}
+ ]),
+ ?assertEqual(length(AllDocIds), length(AllDocIdAccFwd)),
+ ?assertEqual(AllDocIds, AllDocIdAccRev),
+
+ DocIdsFwd = [DocIdFun(I) || I <- lists:seq(1, EndKeyNum - 1)],
+
+ {ok, DocIdAccFwd} = Engine:FoldFun(St, fun fold_fun/2, [], [
+ {end_key_gt, EndKey}
+ ]),
+ ?assertEqual(length(DocIdsFwd), length(DocIdAccFwd)),
+ ?assertEqual(DocIdsFwd, lists:reverse(DocIdAccFwd)),
+
+ DocIdsRev = [DocIdFun(I) || I <- lists:seq(EndKeyNum + 1, ?NUM_DOCS)],
+
+ {ok, DocIdAccRev} = Engine:FoldFun(St, fun fold_fun/2, [], [
+ {dir, rev},
+ {end_key_gt, EndKey}
+ ]),
+ ?assertEqual(length(DocIdsRev), length(DocIdAccRev)),
+ ?assertEqual(DocIdsRev, DocIdAccRev).
+
+
+fold_range(FoldFun, DocIdFun) ->
+ {ok, Engine, St} = init_st(DocIdFun),
+
+ StartKeyNum = ?NUM_DOCS div 4,
+ EndKeyNum = StartKeyNum * 3,
+
+ StartKey = DocIdFun(StartKeyNum),
+ EndKey = DocIdFun(EndKeyNum),
+
+ ?assertEqual({ok, []}, Engine:FoldFun(St, fun fold_fun/2, [], [
+ {start_key, <<"">>},
+ {end_key, <<"">>}
+ ])),
+
+ ?assertEqual({ok, []}, Engine:FoldFun(St, fun fold_fun/2, [], [
+ {dir, rev},
+ {start_key, <<"">>},
+ {end_key, <<255>>}
+ ])),
+
+ AllDocIds = [DocIdFun(I) || I <- lists:seq(1, ?NUM_DOCS)],
+
+ {ok, AllDocIdAccFwd} = Engine:FoldFun(St, fun fold_fun/2, [], [
+ {start_key, <<"">>},
+ {end_key, <<255>>}
+ ]),
+ ?assertEqual(length(AllDocIds), length(AllDocIdAccFwd)),
+ ?assertEqual(AllDocIds, lists:reverse(AllDocIdAccFwd)),
+
+ {ok, AllDocIdAccRev} = Engine:FoldFun(St, fun fold_fun/2, [], [
+ {dir, rev},
+ {start_key, <<255>>},
+ {end_key_gt, <<"">>}
+ ]),
+ ?assertEqual(length(AllDocIds), length(AllDocIdAccFwd)),
+ ?assertEqual(AllDocIds, AllDocIdAccRev),
+
+ DocIdsFwd = [DocIdFun(I) || I <- lists:seq(StartKeyNum, EndKeyNum)],
+
+ {ok, DocIdAccFwd} = Engine:FoldFun(St, fun fold_fun/2, [], [
+ {start_key, StartKey},
+ {end_key, EndKey}
+ ]),
+ ?assertEqual(length(DocIdsFwd), length(DocIdAccFwd)),
+ ?assertEqual(DocIdsFwd, lists:reverse(DocIdAccFwd)),
+
+ DocIdsRev = [DocIdFun(I) || I <- lists:seq(StartKeyNum, EndKeyNum)],
+
+ ?assertEqual({ok, []}, Engine:FoldFun(St, fun fold_fun/2, [], [
+ {dir, rev},
+ {start_key, StartKey},
+ {end_key, EndKey}
+ ])),
+
+ {ok, DocIdAccRev} = Engine:FoldFun(St, fun fold_fun/2, [], [
+ {dir, rev},
+ {start_key, EndKey},
+ {end_key, StartKey}
+ ]),
+ ?assertEqual(length(DocIdsRev), length(DocIdAccRev)),
+ ?assertEqual(DocIdsRev, DocIdAccRev).
+
+
+fold_stop(FoldFun, DocIdFun) ->
+ {ok, Engine, St} = init_st(DocIdFun),
+
+ StartKeyNum = ?NUM_DOCS div 4,
+ StartKey = DocIdFun(StartKeyNum),
+
+ ?assertEqual({ok, []}, Engine:FoldFun(St, fun fold_fun_stop/2, [], [
+ {start_key, <<255>>}
+ ])),
+
+ ?assertEqual({ok, []}, Engine:FoldFun(St, fun fold_fun_stop/2, [], [
+ {dir, rev},
+ {start_key, <<"">>}
+ ])),
+
+ SuffixDocIds = [DocIdFun(I) || I <- lists:seq(?NUM_DOCS - 3, ?NUM_DOCS)],
+
+ {ok, SuffixDocIdAcc} = Engine:FoldFun(St, fun fold_fun_stop/2, [], [
+ {start_key, DocIdFun(?NUM_DOCS - 3)}
+ ]),
+ ?assertEqual(length(SuffixDocIds), length(SuffixDocIdAcc)),
+ ?assertEqual(SuffixDocIds, lists:reverse(SuffixDocIdAcc)),
+
+ PrefixDocIds = [DocIdFun(I) || I <- lists:seq(1, 3)],
+
+ {ok, PrefixDocIdAcc} = Engine:FoldFun(St, fun fold_fun_stop/2, [], [
+ {dir, rev},
+ {start_key, DocIdFun(3)}
+ ]),
+ ?assertEqual(3, length(PrefixDocIdAcc)),
+ ?assertEqual(PrefixDocIds, PrefixDocIdAcc),
+
+ FiveDocIdsFwd = [DocIdFun(I)
+ || I <- lists:seq(StartKeyNum, StartKeyNum + 5)],
+
+ {ok, FiveDocIdAccFwd} = Engine:FoldFun(St, fun fold_fun_stop/2, [], [
+ {start_key, StartKey}
+ ]),
+ ?assertEqual(length(FiveDocIdsFwd), length(FiveDocIdAccFwd)),
+ ?assertEqual(FiveDocIdsFwd, lists:reverse(FiveDocIdAccFwd)),
+
+ FiveDocIdsRev = [DocIdFun(I)
+ || I <- lists:seq(StartKeyNum - 5, StartKeyNum)],
+
+ {ok, FiveDocIdAccRev} = Engine:FoldFun(St, fun fold_fun_stop/2, [], [
+ {dir, rev},
+ {start_key, StartKey}
+ ]),
+ ?assertEqual(length(FiveDocIdsRev), length(FiveDocIdAccRev)),
+ ?assertEqual(FiveDocIdsRev, FiveDocIdAccRev).
+
+
+init_st(DocIdFun) ->
+ {ok, Engine, St1} = test_engine_util:init_engine(),
+ Actions = lists:map(fun(Id) ->
+ {create, {DocIdFun(Id), [{<<"int">>, Id}]}}
+ end, lists:seq(1, ?NUM_DOCS)),
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+ {ok, Engine, St2}.
+
+
+fold_fun(Doc, Acc) ->
+ Id = case Doc of
+ #doc{id = Id0} -> Id0;
+ #full_doc_info{id = Id0} -> Id0
+ end,
+ {ok, [Id | Acc]}.
+
+
+fold_fun_stop(Doc, Acc) ->
+ Id = case Doc of
+ #doc{id = Id0} -> Id0;
+ #full_doc_info{id = Id0} -> Id0
+ end,
+ case length(Acc) of
+ N when N =< 4 ->
+ {ok, [Id | Acc]};
+ _ ->
+ {stop, [Id | Acc]}
+ end.
+
+
+docid(I) ->
+ Str = io_lib:format("~4..0b", [I]),
+ iolist_to_binary(Str).
+
+
+local_docid(I) ->
+ Str = io_lib:format("_local/~4..0b", [I]),
+ iolist_to_binary(Str).
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/414880e1/src/test_engine_get_set_props.erl
----------------------------------------------------------------------
diff --git a/src/test_engine_get_set_props.erl b/src/test_engine_get_set_props.erl
new file mode 100644
index 0000000..6d2a447
--- /dev/null
+++ b/src/test_engine_get_set_props.erl
@@ -0,0 +1,70 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(test_engine_get_set_props).
+-compile(export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+cet_default_props() ->
+ Engine = test_engine_util:get_engine(),
+ DbPath = test_engine_util:dbpath(),
+
+ {ok, St} = Engine:init(DbPath, [
+ create,
+ {default_security_object, dso}
+ ]),
+
+ Node = node(),
+
+ ?assertEqual(0, Engine:get_doc_count(St)),
+ ?assertEqual(0, Engine:get_del_doc_count(St)),
+ ?assertEqual(true, is_list(Engine:get_size_info(St))),
+ ?assertEqual(true, is_integer(Engine:get_disk_version(St))),
+ ?assertEqual(0, Engine:get_update_seq(St)),
+ ?assertEqual(0, Engine:get_purge_seq(St)),
+ ?assertEqual([], Engine:get_last_purged(St)),
+ ?assertEqual(dso, Engine:get_security(St)),
+ ?assertEqual(1000, Engine:get_revs_limit(St)),
+ ?assertMatch(<<_:32/binary>>, Engine:get_uuid(St)),
+ ?assertEqual([{Node, 0}], Engine:get_epochs(St)),
+ ?assertEqual(0, Engine:get_compacted_seq(St)).
+
+
+cet_set_security() ->
+ check_prop_set(get_security, set_security, dso, [{<<"readers">>, []}]).
+
+
+cet_set_revs_limit() ->
+ check_prop_set(get_revs_limit, set_revs_limit, 1000, 50).
+
+
+check_prop_set(GetFun, SetFun, Default, Value) ->
+ Engine = test_engine_util:get_engine(),
+ DbPath = test_engine_util:dbpath(),
+
+ {ok, St0} = Engine:init(DbPath, [
+ create,
+ {default_security_object, dso}
+ ]),
+ ?assertEqual(Default, Engine:GetFun(St0)),
+
+ {ok, St1} = Engine:SetFun(St0, Value),
+ ?assertEqual(Value, Engine:GetFun(St1)),
+
+ {ok, St2} = Engine:commit_data(St1),
+ Engine:terminate(normal, St2),
+
+ {ok, St3} = Engine:init(DbPath, []),
+ ?assertEqual(Value, Engine:GetFun(St3)).
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/414880e1/src/test_engine_open_close_delete.erl
----------------------------------------------------------------------
diff --git a/src/test_engine_open_close_delete.erl b/src/test_engine_open_close_delete.erl
new file mode 100644
index 0000000..b099d9f
--- /dev/null
+++ b/src/test_engine_open_close_delete.erl
@@ -0,0 +1,81 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(test_engine_open_close_delete).
+-compile(export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+cet_open_non_existent() ->
+ Engine = test_engine_util:get_engine(),
+ DbPath = test_engine_util:dbpath(),
+
+ ?assertEqual(false, Engine:exists(DbPath)),
+ ?assertThrow({not_found, no_db_file}, Engine:init(DbPath, [])),
+ ?assertEqual(false, Engine:exists(DbPath)).
+
+
+cet_open_create() ->
+ process_flag(trap_exit, true),
+ Engine = test_engine_util:get_engine(),
+ DbPath = test_engine_util:dbpath(),
+
+ ?assertEqual(false, Engine:exists(DbPath)),
+ ?assertMatch({ok, _}, Engine:init(DbPath, [create])),
+ ?assertEqual(true, Engine:exists(DbPath)).
+
+
+cet_open_when_exists() ->
+ Engine = test_engine_util:get_engine(),
+ DbPath = test_engine_util:dbpath(),
+
+ ?assertEqual(false, Engine:exists(DbPath)),
+ ?assertMatch({ok, _}, Engine:init(DbPath, [create])),
+ ?assertThrow({error, eexist}, Engine:init(DbPath, [create])).
+
+
+cet_terminate() ->
+ Engine = test_engine_util:get_engine(),
+ DbPath = test_engine_util:dbpath(),
+
+ ?assertEqual(false, Engine:exists(DbPath)),
+ {ok, St} = Engine:init(DbPath, [create]),
+ Engine:terminate(normal, St),
+ ?assertEqual(true, Engine:exists(DbPath)).
+
+
+cet_rapid_recycle() ->
+ Engine = test_engine_util:get_engine(),
+ DbPath = test_engine_util:dbpath(),
+
+ {ok, St0} = Engine:init(DbPath, [create]),
+ Engine:terminate(normal, St0),
+
+ lists:foreach(fun(_) ->
+ {ok, St1} = Engine:init(DbPath, []),
+ Engine:terminate(normal, St1)
+ end, lists:seq(1, 100)).
+
+
+cet_delete() ->
+ Engine = test_engine_util:get_engine(),
+ RootDir = test_engine_util:rootdir(),
+ DbPath = test_engine_util:dbpath(),
+
+ ?assertEqual(false, Engine:exists(DbPath)),
+ {ok, St} = Engine:init(DbPath, [create]),
+ Engine:terminate(normal, St),
+ ?assertEqual(true, Engine:exists(DbPath)),
+ ?assertEqual(ok, Engine:delete(RootDir, DbPath, [async])),
+ ?assertEqual(false, Engine:exists(DbPath)).
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/414880e1/src/test_engine_purge_docs.erl
----------------------------------------------------------------------
diff --git a/src/test_engine_purge_docs.erl b/src/test_engine_purge_docs.erl
new file mode 100644
index 0000000..e5bf249
--- /dev/null
+++ b/src/test_engine_purge_docs.erl
@@ -0,0 +1,158 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(test_engine_purge_docs).
+-compile(export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+cet_purge_simple() ->
+ {ok, Engine, St1} = test_engine_util:init_engine(),
+
+ Actions1 = [
+ {create, {<<"foo">>, [{<<"vsn">>, 1}]}}
+ ],
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+
+ ?assertEqual(1, Engine:get_doc_count(St2)),
+ ?assertEqual(0, Engine:get_del_doc_count(St2)),
+ ?assertEqual(1, Engine:get_update_seq(St2)),
+ ?assertEqual(0, Engine:get_purge_seq(St2)),
+ ?assertEqual([], Engine:get_last_purged(St2)),
+
+ [FDI] = Engine:open_docs(St2, [<<"foo">>]),
+ PrevRev = test_engine_util:prev_rev(FDI),
+ Rev = PrevRev#rev_info.rev,
+
+ Actions2 = [
+ {purge, {<<"foo">>, Rev}}
+ ],
+ {ok, St3} = test_engine_util:apply_actions(Engine, St2, Actions2),
+
+ ?assertEqual(0, Engine:get_doc_count(St3)),
+ ?assertEqual(0, Engine:get_del_doc_count(St3)),
+ ?assertEqual(2, Engine:get_update_seq(St3)),
+ ?assertEqual(1, Engine:get_purge_seq(St3)),
+ ?assertEqual([{<<"foo">>, [Rev]}], Engine:get_last_purged(St3)).
+
+
+cet_purge_conflicts() ->
+ {ok, Engine, St1} = test_engine_util:init_engine(),
+
+ Actions1 = [
+ {create, {<<"foo">>, [{<<"vsn">>, 1}]}},
+ {conflict, {<<"foo">>, [{<<"vsn">>, 2}]}}
+ ],
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+
+ ?assertEqual(1, Engine:get_doc_count(St2)),
+ ?assertEqual(0, Engine:get_del_doc_count(St2)),
+ ?assertEqual(2, Engine:get_update_seq(St2)),
+ ?assertEqual(0, Engine:get_purge_seq(St2)),
+ ?assertEqual([], Engine:get_last_purged(St2)),
+
+ [FDI1] = Engine:open_docs(St2, [<<"foo">>]),
+ PrevRev1 = test_engine_util:prev_rev(FDI1),
+ Rev1 = PrevRev1#rev_info.rev,
+
+ Actions2 = [
+ {purge, {<<"foo">>, Rev1}}
+ ],
+ {ok, St3} = test_engine_util:apply_actions(Engine, St2, Actions2),
+
+ ?assertEqual(1, Engine:get_doc_count(St3)),
+ ?assertEqual(0, Engine:get_del_doc_count(St3)),
+ ?assertEqual(4, Engine:get_update_seq(St3)),
+ ?assertEqual(1, Engine:get_purge_seq(St3)),
+ ?assertEqual([{<<"foo">>, [Rev1]}], Engine:get_last_purged(St3)),
+
+ [FDI2] = Engine:open_docs(St3, [<<"foo">>]),
+ PrevRev2 = test_engine_util:prev_rev(FDI2),
+ Rev2 = PrevRev2#rev_info.rev,
+
+ Actions3 = [
+ {purge, {<<"foo">>, Rev2}}
+ ],
+ {ok, St4} = test_engine_util:apply_actions(Engine, St3, Actions3),
+
+ ?assertEqual(0, Engine:get_doc_count(St4)),
+ ?assertEqual(0, Engine:get_del_doc_count(St4)),
+ ?assertEqual(5, Engine:get_update_seq(St4)),
+ ?assertEqual(2, Engine:get_purge_seq(St4)),
+ ?assertEqual([{<<"foo">>, [Rev2]}], Engine:get_last_purged(St4)).
+
+
+cet_add_delete_purge() ->
+ {ok, Engine, St1} = test_engine_util:init_engine(),
+
+ Actions1 = [
+ {create, {<<"foo">>, [{<<"vsn">>, 1}]}},
+ {delete, {<<"foo">>, [{<<"vsn">>, 2}]}}
+ ],
+
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+
+ ?assertEqual(0, Engine:get_doc_count(St2)),
+ ?assertEqual(1, Engine:get_del_doc_count(St2)),
+ ?assertEqual(2, Engine:get_update_seq(St2)),
+ ?assertEqual(0, Engine:get_purge_seq(St2)),
+ ?assertEqual([], Engine:get_last_purged(St2)),
+
+ [FDI] = Engine:open_docs(St2, [<<"foo">>]),
+ PrevRev = test_engine_util:prev_rev(FDI),
+ Rev = PrevRev#rev_info.rev,
+
+ Actions2 = [
+ {purge, {<<"foo">>, Rev}}
+ ],
+ {ok, St3} = test_engine_util:apply_actions(Engine, St2, Actions2),
+
+ ?assertEqual(0, Engine:get_doc_count(St3)),
+ ?assertEqual(0, Engine:get_del_doc_count(St3)),
+ ?assertEqual(3, Engine:get_update_seq(St3)),
+ ?assertEqual(1, Engine:get_purge_seq(St3)),
+ ?assertEqual([{<<"foo">>, [Rev]}], Engine:get_last_purged(St3)).
+
+
+cet_add_two_purge_one() ->
+ {ok, Engine, St1} = test_engine_util:init_engine(),
+
+ Actions1 = [
+ {create, {<<"foo">>, [{<<"vsn">>, 1}]}},
+ {create, {<<"bar">>, []}}
+ ],
+
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+
+ ?assertEqual(2, Engine:get_doc_count(St2)),
+ ?assertEqual(0, Engine:get_del_doc_count(St2)),
+ ?assertEqual(2, Engine:get_update_seq(St2)),
+ ?assertEqual(0, Engine:get_purge_seq(St2)),
+ ?assertEqual([], Engine:get_last_purged(St2)),
+
+ [FDI] = Engine:open_docs(St2, [<<"foo">>]),
+ PrevRev = test_engine_util:prev_rev(FDI),
+ Rev = PrevRev#rev_info.rev,
+
+ Actions2 = [
+ {purge, {<<"foo">>, Rev}}
+ ],
+ {ok, St3} = test_engine_util:apply_actions(Engine, St2, Actions2),
+
+ ?assertEqual(1, Engine:get_doc_count(St3)),
+ ?assertEqual(0, Engine:get_del_doc_count(St3)),
+ ?assertEqual(3, Engine:get_update_seq(St3)),
+ ?assertEqual(1, Engine:get_purge_seq(St3)),
+ ?assertEqual([{<<"foo">>, [Rev]}], Engine:get_last_purged(St3)).
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/414880e1/src/test_engine_read_write_docs.erl
----------------------------------------------------------------------
diff --git a/src/test_engine_read_write_docs.erl b/src/test_engine_read_write_docs.erl
new file mode 100644
index 0000000..4307702
--- /dev/null
+++ b/src/test_engine_read_write_docs.erl
@@ -0,0 +1,317 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(test_engine_read_write_docs).
+-compile(export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+cet_read_empty_docs() ->
+ {ok, Engine, St} = test_engine_util:init_engine(),
+
+ ?assertEqual([not_found], Engine:open_docs(St, [<<"foo">>])),
+ ?assertEqual(
+ [not_found, not_found],
+ Engine:open_docs(St, [<<"a">>, <<"b">>])
+ ).
+
+
+cet_read_empty_local_docs() ->
+ {ok, Engine, St} = test_engine_util:init_engine(),
+
+ ?assertEqual([not_found], Engine:open_local_docs(St, [<<"_local/foo">>])),
+ ?assertEqual(
+ [not_found, not_found],
+ Engine:open_local_docs(St, [<<"_local/a">>, <<"_local/b">>])
+ ).
+
+
+cet_write_one_doc() ->
+ {ok, Engine, DbPath, St1} = test_engine_util:init_engine(dbpath),
+
+ ?assertEqual(0, Engine:get_doc_count(St1)),
+ ?assertEqual(0, Engine:get_del_doc_count(St1)),
+ ?assertEqual(0, Engine:get_update_seq(St1)),
+
+ Actions = [
+ {create, {<<"foo">>, [{<<"vsn">>, 1}]}}
+ ],
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+ {ok, St3} = Engine:commit_data(St2),
+ Engine:terminate(normal, St3),
+ {ok, St4} = Engine:init(DbPath, []),
+
+ ?assertEqual(1, Engine:get_doc_count(St4)),
+ ?assertEqual(0, Engine:get_del_doc_count(St4)),
+ ?assertEqual(1, Engine:get_update_seq(St4)),
+
+ [FDI] = Engine:open_docs(St4, [<<"foo">>]),
+ #rev_info{
+ rev = {RevPos, PrevRevId},
+ deleted = Deleted,
+ body_sp = DocPtr
+ } = test_engine_util:prev_rev(FDI),
+
+ Doc0 = #doc{
+ id = <<"foo">>,
+ revs = {RevPos, [PrevRevId]},
+ deleted = Deleted,
+ body = DocPtr
+ },
+
+ Doc1 = Engine:read_doc_body(St4, Doc0),
+ Body1 = if not is_binary(Doc1#doc.body) -> Doc1#doc.body; true ->
+ couch_compress:decompress(Doc1#doc.body)
+ end,
+ ?assertEqual([{<<"vsn">>, 1}], Body1).
+
+
+cet_write_two_docs() ->
+ {ok, Engine, DbPath, St1} = test_engine_util:init_engine(dbpath),
+
+ ?assertEqual(0, Engine:get_doc_count(St1)),
+ ?assertEqual(0, Engine:get_del_doc_count(St1)),
+ ?assertEqual(0, Engine:get_update_seq(St1)),
+
+ Actions = [
+ {create, {<<"foo">>, [{<<"vsn">>, 1}]}},
+ {create, {<<"bar">>, [{<<"stuff">>, true}]}}
+ ],
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+ {ok, St3} = Engine:commit_data(St2),
+ Engine:terminate(normal, St3),
+ {ok, St4} = Engine:init(DbPath, []),
+
+ ?assertEqual(2, Engine:get_doc_count(St4)),
+ ?assertEqual(0, Engine:get_del_doc_count(St4)),
+ ?assertEqual(2, Engine:get_update_seq(St4)),
+
+ Resps = Engine:open_docs(St4, [<<"foo">>, <<"bar">>]),
+ ?assertEqual(false, lists:member(not_found, Resps)).
+
+
+cet_write_three_doc_batch() ->
+ {ok, Engine, DbPath, St1} = test_engine_util:init_engine(dbpath),
+
+ ?assertEqual(0, Engine:get_doc_count(St1)),
+ ?assertEqual(0, Engine:get_del_doc_count(St1)),
+ ?assertEqual(0, Engine:get_update_seq(St1)),
+
+ Actions = [
+ {batch, [
+ {create, {<<"foo">>, [{<<"vsn">>, 1}]}},
+ {create, {<<"bar">>, [{<<"stuff">>, true}]}},
+ {create, {<<"baz">>, []}}
+ ]}
+ ],
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+ {ok, St3} = Engine:commit_data(St2),
+ Engine:terminate(normal, St3),
+ {ok, St4} = Engine:init(DbPath, []),
+
+ ?assertEqual(3, Engine:get_doc_count(St4)),
+ ?assertEqual(0, Engine:get_del_doc_count(St4)),
+ ?assertEqual(3, Engine:get_update_seq(St4)),
+
+ Resps = Engine:open_docs(St4, [<<"foo">>, <<"bar">>, <<"baz">>]),
+ ?assertEqual(false, lists:member(not_found, Resps)).
+
+
+cet_update_doc() ->
+ {ok, Engine, DbPath, St1} = test_engine_util:init_engine(dbpath),
+
+ ?assertEqual(0, Engine:get_doc_count(St1)),
+ ?assertEqual(0, Engine:get_del_doc_count(St1)),
+ ?assertEqual(0, Engine:get_update_seq(St1)),
+
+ Actions = [
+ {create, {<<"foo">>, [{<<"vsn">>, 1}]}},
+ {update, {<<"foo">>, [{<<"vsn">>, 2}]}}
+ ],
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+ {ok, St3} = Engine:commit_data(St2),
+ Engine:terminate(normal, St3),
+ {ok, St4} = Engine:init(DbPath, []),
+
+ ?assertEqual(1, Engine:get_doc_count(St4)),
+ ?assertEqual(0, Engine:get_del_doc_count(St4)),
+ ?assertEqual(2, Engine:get_update_seq(St4)),
+
+ [FDI] = Engine:open_docs(St4, [<<"foo">>]),
+
+ #rev_info{
+ rev = {RevPos, PrevRevId},
+ deleted = Deleted,
+ body_sp = DocPtr
+ } = test_engine_util:prev_rev(FDI),
+
+ Doc0 = #doc{
+ id = <<"foo">>,
+ revs = {RevPos, [PrevRevId]},
+ deleted = Deleted,
+ body = DocPtr
+ },
+
+ Doc1 = Engine:read_doc_body(St4, Doc0),
+ Body1 = if not is_binary(Doc1#doc.body) -> Doc1#doc.body; true ->
+ couch_compress:decompress(Doc1#doc.body)
+ end,
+
+ ?assertEqual([{<<"vsn">>, 2}], Body1).
+
+
+cet_delete_doc() ->
+ {ok, Engine, DbPath, St1} = test_engine_util:init_engine(dbpath),
+
+ ?assertEqual(0, Engine:get_doc_count(St1)),
+ ?assertEqual(0, Engine:get_del_doc_count(St1)),
+ ?assertEqual(0, Engine:get_update_seq(St1)),
+
+ Actions = [
+ {create, {<<"foo">>, [{<<"vsn">>, 1}]}},
+ {delete, {<<"foo">>, []}}
+ ],
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+ {ok, St3} = Engine:commit_data(St2),
+ Engine:terminate(normal, St3),
+ {ok, St4} = Engine:init(DbPath, []),
+
+ ?assertEqual(0, Engine:get_doc_count(St4)),
+ ?assertEqual(1, Engine:get_del_doc_count(St4)),
+ ?assertEqual(2, Engine:get_update_seq(St4)),
+
+ [FDI] = Engine:open_docs(St4, [<<"foo">>]),
+
+ #rev_info{
+ rev = {RevPos, PrevRevId},
+ deleted = Deleted,
+ body_sp = DocPtr
+ } = test_engine_util:prev_rev(FDI),
+
+ Doc0 = #doc{
+ id = <<"foo">>,
+ revs = {RevPos, [PrevRevId]},
+ deleted = Deleted,
+ body = DocPtr
+ },
+
+ Doc1 = Engine:read_doc_body(St4, Doc0),
+ Body1 = if not is_binary(Doc1#doc.body) -> Doc1#doc.body; true ->
+ couch_compress:decompress(Doc1#doc.body)
+ end,
+
+ ?assertEqual([], Body1).
+
+
+cet_write_local_doc() ->
+ {ok, Engine, DbPath, St1} = test_engine_util:init_engine(dbpath),
+
+ ?assertEqual(0, Engine:get_doc_count(St1)),
+ ?assertEqual(0, Engine:get_del_doc_count(St1)),
+ ?assertEqual(0, Engine:get_update_seq(St1)),
+
+ Actions = [
+ {create, {<<"_local/foo">>, [{<<"yay">>, false}]}}
+ ],
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+ {ok, St3} = Engine:commit_data(St2),
+ Engine:terminate(normal, St3),
+ {ok, St4} = Engine:init(DbPath, []),
+
+ ?assertEqual(0, Engine:get_doc_count(St4)),
+ ?assertEqual(0, Engine:get_del_doc_count(St4)),
+ ?assertEqual(0, Engine:get_update_seq(St4)),
+
+ [not_found] = Engine:open_docs(St4, [<<"_local/foo">>]),
+ [#doc{} = Doc] = Engine:open_local_docs(St4, [<<"_local/foo">>]),
+ ?assertEqual([{<<"yay">>, false}], Doc#doc.body).
+
+
+cet_write_mixed_batch() ->
+ {ok, Engine, DbPath, St1} = test_engine_util:init_engine(dbpath),
+
+ ?assertEqual(0, Engine:get_doc_count(St1)),
+ ?assertEqual(0, Engine:get_del_doc_count(St1)),
+ ?assertEqual(0, Engine:get_update_seq(St1)),
+
+ Actions = [
+ {batch, [
+ {create, {<<"bar">>, []}},
+ {create, {<<"_local/foo">>, [{<<"yay">>, false}]}}
+ ]}
+ ],
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+ {ok, St3} = Engine:commit_data(St2),
+ Engine:terminate(normal, St3),
+ {ok, St4} = Engine:init(DbPath, []),
+
+ ?assertEqual(1, Engine:get_doc_count(St4)),
+ ?assertEqual(0, Engine:get_del_doc_count(St4)),
+ ?assertEqual(1, Engine:get_update_seq(St4)),
+
+ [#full_doc_info{}] = Engine:open_docs(St4, [<<"bar">>]),
+ [not_found] = Engine:open_docs(St4, [<<"_local/foo">>]),
+
+ [not_found] = Engine:open_local_docs(St4, [<<"bar">>]),
+ [#doc{}] = Engine:open_local_docs(St4, [<<"_local/foo">>]).
+
+
+cet_update_local_doc() ->
+ {ok, Engine, DbPath, St1} = test_engine_util:init_engine(dbpath),
+
+ ?assertEqual(0, Engine:get_doc_count(St1)),
+ ?assertEqual(0, Engine:get_del_doc_count(St1)),
+ ?assertEqual(0, Engine:get_update_seq(St1)),
+
+ Actions = [
+ {create, {<<"_local/foo">>, []}},
+ {update, {<<"_local/foo">>, [{<<"stuff">>, null}]}}
+ ],
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+ {ok, St3} = Engine:commit_data(St2),
+ Engine:terminate(normal, St3),
+ {ok, St4} = Engine:init(DbPath, []),
+
+ ?assertEqual(0, Engine:get_doc_count(St4)),
+ ?assertEqual(0, Engine:get_del_doc_count(St4)),
+ ?assertEqual(0, Engine:get_update_seq(St4)),
+
+ [not_found] = Engine:open_docs(St4, [<<"_local/foo">>]),
+ [#doc{} = Doc] = Engine:open_local_docs(St4, [<<"_local/foo">>]),
+ ?assertEqual([{<<"stuff">>, null}], Doc#doc.body).
+
+
+cet_delete_local_doc() ->
+ {ok, Engine, DbPath, St1} = test_engine_util:init_engine(dbpath),
+
+ ?assertEqual(0, Engine:get_doc_count(St1)),
+ ?assertEqual(0, Engine:get_del_doc_count(St1)),
+ ?assertEqual(0, Engine:get_update_seq(St1)),
+
+ Actions = [
+ {create, {<<"_local/foo">>, []}},
+ {delete, {<<"_local/foo">>, []}}
+ ],
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+ {ok, St3} = Engine:commit_data(St2),
+ Engine:terminate(normal, St3),
+ {ok, St4} = Engine:init(DbPath, []),
+
+ ?assertEqual(0, Engine:get_doc_count(St4)),
+ ?assertEqual(0, Engine:get_del_doc_count(St4)),
+ ?assertEqual(0, Engine:get_update_seq(St4)),
+
+ [not_found] = Engine:open_docs(St4, [<<"_local/foo">>]),
+ ?assertEqual([not_found], Engine:open_local_docs(St4, [<<"_local/foo">>])).
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/414880e1/src/test_engine_ref_counting.erl
----------------------------------------------------------------------
diff --git a/src/test_engine_ref_counting.erl b/src/test_engine_ref_counting.erl
new file mode 100644
index 0000000..5e60276
--- /dev/null
+++ b/src/test_engine_ref_counting.erl
@@ -0,0 +1,112 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(test_engine_ref_counting).
+-compile(export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(NUM_CLIENTS, 1000).
+
+
+cet_empty_monitors() ->
+ {ok, Engine, St} = test_engine_util:init_engine(),
+ Pids = Engine:monitored_by(St),
+ ?assert(is_list(Pids)),
+ ?assertEqual([], Pids -- [self(), whereis(couch_stats_process_tracker)]).
+
+
+cet_test_system_db() ->
+ Engine = test_engine_util:get_engine(),
+ DbPath = test_engine_util:dbpath(),
+
+ {ok, St} = Engine:init(DbPath, [create, sys_db]),
+ Pids = Engine:monitored_by(St),
+ ?assertEqual(1, length(Pids)).
+
+
+cet_incref_decref() ->
+ {ok, Engine, St} = test_engine_util:init_engine(),
+
+ {Pid, _} = Client = start_client(Engine, St),
+ wait_client(Client),
+
+ Pids1 = Engine:monitored_by(St),
+ ?assert(lists:member(Pid, Pids1)),
+
+ close_client(Client),
+
+ Pids2 = Engine:monitored_by(St),
+ ?assert(not lists:member(Pid, Pids2)).
+
+
+cet_incref_decref_many() ->
+ {ok, Engine, St} = test_engine_util:init_engine(),
+ Clients = lists:map(fun(_) ->
+ start_client(Engine, St)
+ end, lists:seq(1, ?NUM_CLIENTS)),
+
+ lists:foreach(fun(C) -> wait_client(C) end, Clients),
+
+ Pids1 = Engine:monitored_by(St),
+ % +2 for db pid and process tracker
+ ?assertEqual(?NUM_CLIENTS + 2, length(Pids1)),
+
+ lists:foreach(fun(C) -> close_client(C) end, Clients),
+
+ Pids2 = Engine:monitored_by(St),
+ ?assertEqual(2, length(Pids2)).
+
+
+start_client(Engine, St1) ->
+ spawn_monitor(fun() ->
+ {ok, St2} = Engine:incref(St1),
+
+ receive
+ {waiting, Pid} ->
+ Pid ! go
+ after 1000 ->
+ erlang:error(timeout)
+ end,
+
+ receive
+ close ->
+ ok
+ after 1000 ->
+ erlang:error(timeout)
+ end,
+
+ Engine:decref(St2)
+ end).
+
+
+wait_client({Pid, _Ref}) ->
+ Pid ! {waiting, self()},
+ receive
+ go -> ok
+ after 1000 ->
+ erlang:error(timeout)
+ end.
+
+
+close_client({Pid, Ref}) ->
+ Pid ! close,
+ receive
+ {'DOWN', Ref, _, _, _} ->
+ ok
+ after 1000 ->
+ erlang:error(timeout)
+ end.
+
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/414880e1/src/test_engine_util.erl
----------------------------------------------------------------------
diff --git a/src/test_engine_util.erl b/src/test_engine_util.erl
new file mode 100644
index 0000000..33048d3
--- /dev/null
+++ b/src/test_engine_util.erl
@@ -0,0 +1,602 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(test_engine_util).
+-compile(export_all).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(TEST_MODULES, [
+ test_engine_open_close_delete,
+ test_engine_get_set_props,
+ test_engine_read_write_docs,
+ test_engine_attachments,
+ test_engine_fold_docs,
+ test_engine_fold_changes,
+ test_engine_purge_docs,
+ test_engine_compaction,
+ test_engine_ref_counting
+]).
+
+
+create_tests(EngineApp) ->
+ create_tests(EngineApp, EngineApp).
+
+
+create_tests(EngineApp, EngineModule) ->
+ application:set_env(couch, test_engine, {EngineApp, EngineModule}),
+ Tests = lists:map(fun(TestMod) ->
+ {atom_to_list(TestMod), gather(TestMod)}
+ end, ?TEST_MODULES),
+ Setup = fun() ->
+ Ctx = test_util:start_couch(),
+ config:set("log", "include_sasl", "false", false),
+ Ctx
+ end,
+ {
+ setup,
+ Setup,
+ fun test_util:stop_couch/1,
+ fun(_) -> Tests end
+ }.
+
+
+gather(Module) ->
+ Exports = Module:module_info(exports),
+ Tests = lists:foldl(fun({Fun, Arity}, Acc) ->
+ case {atom_to_list(Fun), Arity} of
+ {[$c, $e, $t, $_ | _], 0} ->
+ TestFun = make_test_fun(Module, Fun),
+ [{spawn, TestFun} | Acc];
+ _ ->
+ Acc
+ end
+ end, [], Exports),
+ lists:reverse(Tests).
+
+
+make_test_fun(Module, Fun) ->
+ Name = lists:flatten(io_lib:format("~s:~s", [Module, Fun])),
+ Wrapper = fun() ->
+ process_flag(trap_exit, true),
+ Module:Fun()
+ end,
+ {Name, Wrapper}.
+
+rootdir() ->
+ config:get("couchdb", "database_dir", ".").
+
+
+dbpath() ->
+ binary_to_list(filename:join(rootdir(), couch_uuids:random())).
+
+
+get_engine() ->
+ case application:get_env(couch, test_engine) of
+ {ok, {_, Engine}} ->
+ Engine;
+ _ ->
+ couch_bt_engine
+ end.
+
+
+init_engine() ->
+ init_engine(default).
+
+
+init_engine(default) ->
+ Engine = get_engine(),
+ DbPath = dbpath(),
+ {ok, St} = Engine:init(DbPath, [
+ create,
+ {default_security_object, []}
+ ]),
+ {ok, Engine, St};
+
+init_engine(dbpath) ->
+ Engine = get_engine(),
+ DbPath = dbpath(),
+ {ok, St} = Engine:init(DbPath, [
+ create,
+ {default_security_object, []}
+ ]),
+ {ok, Engine, DbPath, St}.
+
+
+apply_actions(_Engine, St, []) ->
+ {ok, St};
+
+apply_actions(Engine, St, [Action | Rest]) ->
+ NewSt = apply_action(Engine, St, Action),
+ apply_actions(Engine, NewSt, Rest).
+
+
+apply_action(Engine, St, {batch, BatchActions}) ->
+ apply_batch(Engine, St, BatchActions);
+
+apply_action(Engine, St, Action) ->
+ apply_batch(Engine, St, [Action]).
+
+
+apply_batch(Engine, St, Actions) ->
+ UpdateSeq = Engine:get_update_seq(St) + 1,
+ AccIn = {UpdateSeq, [], [], []},
+ AccOut = lists:foldl(fun(Action, Acc) ->
+ {SeqAcc, DocAcc, LDocAcc, PurgeAcc} = Acc,
+ case Action of
+ {_, {<<"_local/", _/binary>>, _}} ->
+ LDoc = gen_local_write(Engine, St, Action),
+ {SeqAcc, DocAcc, [LDoc | LDocAcc], PurgeAcc};
+ _ ->
+ case gen_write(Engine, St, Action, SeqAcc) of
+ {_OldFDI, _NewFDI} = Pair ->
+ {SeqAcc + 1, [Pair | DocAcc], LDocAcc, PurgeAcc};
+ {Pair, NewSeqAcc, NewPurgeInfo} ->
+ NewPurgeAcc = [NewPurgeInfo | PurgeAcc],
+ {NewSeqAcc, [Pair | DocAcc], LDocAcc, NewPurgeAcc}
+ end
+ end
+ end, AccIn, Actions),
+ {_, Docs0, LDocs, PurgeIdRevs} = AccOut,
+ Docs = lists:reverse(Docs0),
+ {ok, NewSt} = Engine:write_doc_infos(St, Docs, LDocs, PurgeIdRevs),
+ NewSt.
+
+
+gen_local_write(Engine, St, {Action, {DocId, Body}}) ->
+ PrevRev = case Engine:open_local_docs(St, [DocId]) of
+ [not_found] ->
+ 0;
+ [#doc{revs = {0, []}}] ->
+ 0;
+ [#doc{revs = {0, [RevStr | _]}}] ->
+ list_to_integer(binary_to_list(RevStr))
+ end,
+ {RevId, Deleted} = case Action of
+ Action when Action == create; Action == update ->
+ {list_to_binary(integer_to_list(PrevRev + 1)), false};
+ delete ->
+ {<<"0">>, true}
+ end,
+ #doc{
+ id = DocId,
+ revs = {0, [RevId]},
+ body = Body,
+ deleted = Deleted
+ }.
+
+gen_write(Engine, St, {Action, {DocId, Body}}, UpdateSeq) ->
+ gen_write(Engine, St, {Action, {DocId, Body, []}}, UpdateSeq);
+
+gen_write(Engine, St, {create, {DocId, Body, Atts0}}, UpdateSeq) ->
+ [not_found] = Engine:open_docs(St, [DocId]),
+ Atts = [couch_att:to_disk_term(Att) || Att <- Atts0],
+
+ Rev = crypto:hash(md5, term_to_binary({DocId, Body, Atts})),
+
+ Doc0 = #doc{
+ id = DocId,
+ revs = {0, [Rev]},
+ deleted = false,
+ body = Body,
+ atts = Atts
+ },
+
+ Doc1 = make_doc_summary(Engine, St, Doc0),
+ {ok, Doc2, Len} = Engine:write_doc_body(St, Doc1),
+
+ Sizes = #size_info{
+ active = Len,
+ external = erlang:external_size(Doc1#doc.body)
+ },
+
+ Leaf = #leaf{
+ deleted = false,
+ ptr = Doc2#doc.body,
+ seq = UpdateSeq,
+ sizes = Sizes,
+ atts = Atts
+ },
+
+ {not_found, #full_doc_info{
+ id = DocId,
+ deleted = false,
+ update_seq = UpdateSeq,
+ rev_tree = [{0, {Rev, Leaf, []}}],
+ sizes = Sizes
+ }};
+
+gen_write(Engine, St, {purge, {DocId, PrevRevs0, _}}, UpdateSeq) ->
+ [#full_doc_info{} = PrevFDI] = Engine:open_docs(St, [DocId]),
+ PrevRevs = if is_list(PrevRevs0) -> PrevRevs0; true -> [PrevRevs0] end,
+
+ #full_doc_info{
+ rev_tree = PrevTree
+ } = PrevFDI,
+
+ {NewTree, RemRevs} = couch_key_tree:remove_leafs(PrevTree, PrevRevs),
+ RemovedAll = lists:sort(RemRevs) == lists:sort(PrevRevs),
+ if RemovedAll -> ok; true ->
+ % If we didn't purge all the requested revisions
+ % then its a bug in the test.
+ erlang:error({invalid_purge_test_revs, PrevRevs})
+ end,
+
+ case NewTree of
+ [] ->
+ % We've completely purged the document
+ {{PrevFDI, not_found}, UpdateSeq, {DocId, RemRevs}};
+ _ ->
+ % We have to relabel the update_seq of all
+ % leaves. See couch_db_updater for details.
+ {NewNewTree, NewUpdateSeq} = couch_key_tree:mapfold(fun
+ (_RevId, Leaf, leaf, InnerSeqAcc) ->
+ {Leaf#leaf{seq = InnerSeqAcc}, InnerSeqAcc + 1};
+ (_RevId, Value, _Type, InnerSeqAcc) ->
+ {Value, InnerSeqAcc}
+ end, UpdateSeq, NewTree),
+ NewFDI = PrevFDI#full_doc_info{
+ update_seq = NewUpdateSeq - 1,
+ rev_tree = NewNewTree
+ },
+ {{PrevFDI, NewFDI}, NewUpdateSeq, {DocId, RemRevs}}
+ end;
+
+gen_write(Engine, St, {Action, {DocId, Body, Atts0}}, UpdateSeq) ->
+ [#full_doc_info{} = PrevFDI] = Engine:open_docs(St, [DocId]),
+ Atts = [couch_att:to_disk_term(Att) || Att <- Atts0],
+
+ #full_doc_info{
+ id = DocId,
+ rev_tree = PrevRevTree
+ } = PrevFDI,
+
+ #rev_info{
+ rev = PrevRev
+ } = prev_rev(PrevFDI),
+
+ {RevPos, PrevRevId} = PrevRev,
+
+ Rev = gen_revision(Action, DocId, PrevRev, Body, Atts),
+
+ Doc0 = #doc{
+ id = DocId,
+ revs = {RevPos + 1, [Rev, PrevRevId]},
+ deleted = false,
+ body = Body,
+ atts = Atts
+ },
+
+ Doc1 = make_doc_summary(Engine, St, Doc0),
+ {ok, Doc2, Len} = Engine:write_doc_body(St, Doc1),
+
+ Deleted = case Action of
+ update -> false;
+ conflict -> false;
+ delete -> true
+ end,
+
+ Sizes = #size_info{
+ active = Len,
+ external = erlang:external_size(Doc1#doc.body)
+ },
+
+ Leaf = #leaf{
+ deleted = Deleted,
+ ptr = Doc2#doc.body,
+ seq = UpdateSeq,
+ sizes = Sizes,
+ atts = Atts
+ },
+
+ Path = gen_path(Action, RevPos, PrevRevId, Rev, Leaf),
+ RevsLimit = Engine:get_revs_limit(St),
+ NodeType = case Action of
+ conflict -> new_branch;
+ _ -> new_leaf
+ end,
+ {NewTree, NodeType} = couch_key_tree:merge(PrevRevTree, Path, RevsLimit),
+
+ NewFDI = PrevFDI#full_doc_info{
+ deleted = couch_doc:is_deleted(NewTree),
+ update_seq = UpdateSeq,
+ rev_tree = NewTree,
+ sizes = Sizes
+ },
+
+ {PrevFDI, NewFDI}.
+
+
+gen_revision(conflict, DocId, _PrevRev, Body, Atts) ->
+ crypto:hash(md5, term_to_binary({DocId, Body, Atts}));
+gen_revision(delete, DocId, PrevRev, Body, Atts) ->
+ gen_revision(update, DocId, PrevRev, Body, Atts);
+gen_revision(update, DocId, PrevRev, Body, Atts) ->
+ crypto:hash(md5, term_to_binary({DocId, PrevRev, Body, Atts})).
+
+
+gen_path(conflict, _RevPos, _PrevRevId, Rev, Leaf) ->
+ {0, {Rev, Leaf, []}};
+gen_path(delete, RevPos, PrevRevId, Rev, Leaf) ->
+ gen_path(update, RevPos, PrevRevId, Rev, Leaf);
+gen_path(update, RevPos, PrevRevId, Rev, Leaf) ->
+ {RevPos, {PrevRevId, ?REV_MISSING, [{Rev, Leaf, []}]}}.
+
+
+make_doc_summary(Engine, St, DocData) ->
+ {_, Ref} = spawn_monitor(fun() ->
+ exit({result, Engine:serialize_doc(St, DocData)})
+ end),
+ receive
+ {'DOWN', Ref, _, _, {result, Summary}} ->
+ Summary;
+ {'DOWN', Ref, _, _, Error} ->
+ erlang:error({make_doc_summary_error, Error})
+ after 1000 ->
+ erlang:error(make_doc_summary_timeout)
+ end.
+
+
+prep_atts(_Engine, _St, []) ->
+ [];
+
+prep_atts(Engine, St, [{FileName, Data} | Rest]) ->
+ {_, Ref} = spawn_monitor(fun() ->
+ {ok, Stream} = Engine:open_write_stream(St, []),
+ exit(write_att(Stream, FileName, Data, Data))
+ end),
+ Att = receive
+ {'DOWN', Ref, _, _, Resp} ->
+ Resp
+ after 5000 ->
+ erlang:error(attachment_write_timeout)
+ end,
+ [Att | prep_atts(Engine, St, Rest)].
+
+
+write_att(Stream, FileName, OrigData, <<>>) ->
+ {StreamEngine, Len, Len, Md5, Md5} = couch_stream:close(Stream),
+ couch_util:check_md5(Md5, crypto:hash(md5, OrigData)),
+ Len = size(OrigData),
+ couch_att:new([
+ {name, FileName},
+ {type, <<"application/octet-stream">>},
+ {data, {stream, StreamEngine}},
+ {att_len, Len},
+ {disk_len, Len},
+ {md5, Md5},
+ {encoding, identity}
+ ]);
+
+write_att(Stream, FileName, OrigData, Data) ->
+ {Chunk, Rest} = case size(Data) > 4096 of
+ true ->
+ <<Head:4096/binary, Tail/binary>> = Data,
+ {Head, Tail};
+ false ->
+ {Data, <<>>}
+ end,
+ ok = couch_stream:write(Stream, Chunk),
+ write_att(Stream, FileName, OrigData, Rest).
+
+
+prev_rev(#full_doc_info{} = FDI) ->
+ #doc_info{
+ revs = [#rev_info{} = PrevRev | _]
+ } = couch_doc:to_doc_info(FDI),
+ PrevRev.
+
+
+db_as_term(Engine, St) ->
+ [
+ {props, db_props_as_term(Engine, St)},
+ {docs, db_docs_as_term(Engine, St)},
+ {local_docs, db_local_docs_as_term(Engine, St)},
+ {changes, db_changes_as_term(Engine, St)}
+ ].
+
+
+db_props_as_term(Engine, St) ->
+ Props = [
+ get_doc_count,
+ get_del_doc_count,
+ get_disk_version,
+ get_update_seq,
+ get_purge_seq,
+ get_last_purged,
+ get_security,
+ get_revs_limit,
+ get_uuid,
+ get_epochs
+ ],
+ lists:map(fun(Fun) ->
+ {Fun, Engine:Fun(St)}
+ end, Props).
+
+
+db_docs_as_term(Engine, St) ->
+ FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end,
+ {ok, FDIs} = Engine:fold_docs(St, FoldFun, [], []),
+ lists:reverse(lists:map(fun(FDI) ->
+ fdi_to_term(Engine, St, FDI)
+ end, FDIs)).
+
+
+db_local_docs_as_term(Engine, St) ->
+ FoldFun = fun(Doc, Acc) -> {ok, [Doc | Acc]} end,
+ {ok, LDocs} = Engine:fold_local_docs(St, FoldFun, [], []),
+ lists:reverse(LDocs).
+
+
+db_changes_as_term(Engine, St) ->
+ FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end,
+ {ok, Changes} = Engine:fold_changes(St, 0, FoldFun, [], []),
+ lists:reverse(lists:map(fun(FDI) ->
+ fdi_to_term(Engine, St, FDI)
+ end, Changes)).
+
+
+fdi_to_term(Engine, St, FDI) ->
+ #full_doc_info{
+ id = DocId,
+ rev_tree = OldTree
+ } = FDI,
+ {NewRevTree, _} = couch_key_tree:mapfold(fun(Rev, Node, Type, Acc) ->
+ tree_to_term(Rev, Node, Type, Acc, DocId)
+ end, {Engine, St}, OldTree),
+ FDI#full_doc_info{
+ rev_tree = NewRevTree,
+ % Blank out sizes because we allow storage
+ % engines to handle this with their own
+ % definition until further notice.
+ sizes = #size_info{
+ active = -1,
+ external = -1
+ }
+ }.
+
+
+tree_to_term(_Rev, _Leaf, branch, Acc, _DocId) ->
+ {?REV_MISSING, Acc};
+
+tree_to_term({Pos, RevId}, #leaf{} = Leaf, leaf, {Engine, St}, DocId) ->
+ #leaf{
+ deleted = Deleted,
+ ptr = Ptr
+ } = Leaf,
+
+ Doc0 = #doc{
+ id = DocId,
+ revs = {Pos, [RevId]},
+ deleted = Deleted,
+ body = Ptr
+ },
+
+ Doc1 = Engine:read_doc_body(St, Doc0),
+
+ Body = if not is_binary(Doc1#doc.body) -> Doc1#doc.body; true ->
+ couch_compress:decompress(Doc1#doc.body)
+ end,
+
+ Atts1 = if not is_binary(Doc1#doc.atts) -> Doc1#doc.atts; true ->
+ couch_compress:decompress(Doc1#doc.atts)
+ end,
+
+ StreamSrc = fun(Sp) -> Engine:open_read_stream(St, Sp) end,
+ Atts2 = [couch_att:from_disk_term(StreamSrc, Att) || Att <- Atts1],
+ Atts = [att_to_term(Att) || Att <- Atts2],
+
+ NewLeaf = Leaf#leaf{
+ ptr = Body,
+ sizes = #size_info{active = -1, external = -1},
+ atts = Atts
+ },
+ {NewLeaf, {Engine, St}}.
+
+
+att_to_term(Att) ->
+ Bin = couch_att:to_binary(Att),
+ couch_att:store(data, Bin, Att).
+
+
+term_diff(T1, T2) when is_tuple(T1), is_tuple(T2) ->
+ tuple_diff(tuple_to_list(T1), tuple_to_list(T2));
+
+term_diff(L1, L2) when is_list(L1), is_list(L2) ->
+ list_diff(L1, L2);
+
+term_diff(V1, V2) when V1 == V2 ->
+ nodiff;
+
+term_diff(V1, V2) ->
+ {V1, V2}.
+
+
+tuple_diff([], []) ->
+ nodiff;
+
+tuple_diff([T1 | _], []) ->
+ {longer, T1};
+
+tuple_diff([], [T2 | _]) ->
+ {shorter, T2};
+
+tuple_diff([T1 | R1], [T2 | R2]) ->
+ case term_diff(T1, T2) of
+ nodiff ->
+ tuple_diff(R1, R2);
+ Else ->
+ {T1, Else}
+ end.
+
+
+list_diff([], []) ->
+ nodiff;
+
+list_diff([T1 | _], []) ->
+ {longer, T1};
+
+list_diff([], [T2 | _]) ->
+ {shorter, T2};
+
+list_diff([T1 | R1], [T2 | R2]) ->
+ case term_diff(T1, T2) of
+ nodiff ->
+ list_diff(R1, R2);
+ Else ->
+ {T1, Else}
+ end.
+
+
+compact(Engine, St1, DbPath) ->
+ DbName = filename:basename(DbPath),
+ {ok, St2, Pid} = Engine:start_compaction(St1, DbName, [], self()),
+ Ref = erlang:monitor(process, Pid),
+
+ % Ideally I'd assert that Pid is linked to us
+ % at this point but its technically possible
+ % that it could have finished compacting by
+ % the time we check... Quite the quandry.
+
+ Term = receive
+ {'$gen_cast', {compact_done, Engine, Term0}} ->
+ Term0;
+ {'DOWN', Ref, _, _, Reason} ->
+ erlang:error({compactor_died, Reason})
+ after 10000 ->
+ erlang:error(compactor_timed_out)
+ end,
+
+ {ok, St2, DbName, Pid, Term}.
+
+
+with_config(Config, Fun) ->
+ OldConfig = apply_config(Config),
+ try
+ Fun()
+ after
+ apply_config(OldConfig)
+ end.
+
+
+apply_config([]) ->
+ [];
+
+apply_config([{Section, Key, Value} | Rest]) ->
+ Orig = config:get(Section, Key),
+ case Value of
+ undefined -> config:delete(Section, Key);
+ _ -> config:set(Section, Key, Value)
+ end,
+ [{Section, Key, Orig} | apply_config(Rest)].
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/414880e1/test/couch_bt_engine_tests.erl
----------------------------------------------------------------------
diff --git a/test/couch_bt_engine_tests.erl b/test/couch_bt_engine_tests.erl
new file mode 100644
index 0000000..df200df
--- /dev/null
+++ b/test/couch_bt_engine_tests.erl
@@ -0,0 +1,20 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_bt_engine_tests).
+
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+couch_bt_engine_test_()->
+ test_engine_util:create_tests(couch, couch_bt_engine).
[5/5] couch commit: updated
refs/heads/COUCHDB-3287-pluggable-storage-engines to 0f4e1a7
Posted by da...@apache.org.
Implement pluggable storage engines
This change moves the main work of storage engines to run through the
new couch_db_engine behavior. This allows us to replace the storage
engine with different implementations that can be tailored to specific
work loads and environments.
COUCHDB-3287
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/e056ae9e
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/e056ae9e
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/e056ae9e
Branch: refs/heads/COUCHDB-3287-pluggable-storage-engines
Commit: e056ae9e5461db4a4ce9152d78ad2b09ea00e057
Parents: 7430850
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Feb 5 12:04:20 2016 -0600
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Thu Mar 2 11:03:55 2017 -0600
----------------------------------------------------------------------
include/couch_db.hrl | 5 +-
src/couch_att.erl | 130 ++-
src/couch_auth_cache.erl | 9 +-
src/couch_bt_engine.erl | 10 +-
src/couch_changes.erl | 21 +-
src/couch_compaction_daemon.erl | 32 +-
src/couch_db.erl | 681 ++++++-------
src/couch_db_engine.erl | 3 -
src/couch_db_int.hrl | 66 +-
src/couch_db_updater.erl | 1270 ++++++-------------------
src/couch_httpd_db.erl | 8 +-
src/couch_httpd_misc_handlers.erl | 13 -
src/couch_server.erl | 191 +++-
src/couch_stream.erl | 256 ++---
src/couch_util.erl | 40 +-
src/test_util.erl | 2 +
test/couch_db_plugin_tests.erl | 2 +-
test/couch_stream_tests.erl | 32 +-
test/couchdb_compaction_daemon_tests.erl | 2 +-
test/couchdb_views_tests.erl | 43 +-
20 files changed, 1097 insertions(+), 1719 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e056ae9e/include/couch_db.hrl
----------------------------------------------------------------------
diff --git a/include/couch_db.hrl b/include/couch_db.hrl
index 5abb316..ca57cce 100644
--- a/include/couch_db.hrl
+++ b/include/couch_db.hrl
@@ -112,7 +112,10 @@
% the json body object.
body = {[]},
- atts = [] :: [couch_att:att()], % attachments
+ % Atts can be a binary when a storage engine
+ % returns attachment info blob in compressed
+ % form.
+ atts = [] :: [couch_att:att()] | binary(), % attachments
deleted = false,
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e056ae9e/src/couch_att.erl
----------------------------------------------------------------------
diff --git a/src/couch_att.erl b/src/couch_att.erl
index 9d38cfa..e6ed7df 100644
--- a/src/couch_att.erl
+++ b/src/couch_att.erl
@@ -18,7 +18,8 @@
fetch/2,
store/2,
store/3,
- transform/3
+ transform/3,
+ copy/2
]).
-export([
@@ -233,6 +234,14 @@ transform(Field, Fun, Att) ->
store(Field, NewValue, Att).
+copy(Att, DstStream) ->
+ [{stream, SrcStream}, AttLen, OldMd5] = fetch([data, att_len, md5], Att),
+ ok = couch_stream:copy(SrcStream, DstStream),
+ {NewStream, AttLen, _, NewMd5, _} = couch_stream:close(DstStream),
+ couch_util:check_md5(OldMd5, NewMd5),
+ store(data, {stream, NewStream}, Att).
+
+
is_stub(Att) ->
stub == fetch(data, Att).
@@ -292,11 +301,12 @@ size_info(Atts) ->
%% as safe as possible, avoiding the need for complicated disk versioning
%% schemes.
to_disk_term(#att{} = Att) ->
- {_, StreamIndex} = fetch(data, Att),
+ {stream, StreamEngine} = fetch(data, Att),
+ {ok, Sp} = couch_stream:to_disk_term(StreamEngine),
{
fetch(name, Att),
fetch(type, Att),
- StreamIndex,
+ Sp,
fetch(att_len, Att),
fetch(disk_len, Att),
fetch(revpos, Att),
@@ -309,9 +319,13 @@ to_disk_term(Att) ->
fun
(data, {Props, Values}) ->
case lists:keytake(data, 1, Props) of
- {value, {_, {_Fd, Sp}}, Other} -> {Other, [Sp | Values]};
- {value, {_, Value}, Other} -> {Other, [Value | Values]};
- false -> {Props, [undefined |Values ]}
+ {value, {_, {stream, StreamEngine}}, Other} ->
+ {ok, Sp} = couch_stream:to_disk_term(StreamEngine),
+ {Other, [Sp | Values]};
+ {value, {_, Value}, Other} ->
+ {Other, [Value | Values]};
+ false ->
+ {Props, [undefined |Values ]}
end;
(Key, {Props, Values}) ->
case lists:keytake(Key, 1, Props) of
@@ -332,9 +346,11 @@ to_disk_term(Att) ->
%% compression to remove these sorts of common bits (block level compression
%% with something like a shared dictionary that is checkpointed every now and
%% then).
-from_disk_term(Fd, {Base, Extended}) when is_tuple(Base), is_list(Extended) ->
- store(Extended, from_disk_term(Fd, Base));
-from_disk_term(Fd, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
+from_disk_term(StreamSrc, {Base, Extended})
+ when is_tuple(Base), is_list(Extended) ->
+ store(Extended, from_disk_term(StreamSrc, Base));
+from_disk_term(StreamSrc, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
+ {ok, Stream} = open_stream(StreamSrc, Sp),
#att{
name=Name,
type=Type,
@@ -342,10 +358,11 @@ from_disk_term(Fd, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
disk_len=DiskLen,
md5=Md5,
revpos=RevPos,
- data={Fd,Sp},
+ data={stream, Stream},
encoding=upgrade_encoding(Enc)
};
-from_disk_term(Fd, {Name,Type,Sp,AttLen,RevPos,Md5}) ->
+from_disk_term(StreamSrc, {Name,Type,Sp,AttLen,RevPos,Md5}) ->
+ {ok, Stream} = open_stream(StreamSrc, Sp),
#att{
name=Name,
type=Type,
@@ -353,9 +370,10 @@ from_disk_term(Fd, {Name,Type,Sp,AttLen,RevPos,Md5}) ->
disk_len=AttLen,
md5=Md5,
revpos=RevPos,
- data={Fd,Sp}
+ data={stream, Stream}
};
-from_disk_term(Fd, {Name,{Type,Sp,AttLen}}) ->
+from_disk_term(StreamSrc, {Name,{Type,Sp,AttLen}}) ->
+ {ok, Stream} = open_stream(StreamSrc, Sp),
#att{
name=Name,
type=Type,
@@ -363,7 +381,7 @@ from_disk_term(Fd, {Name,{Type,Sp,AttLen}}) ->
disk_len=AttLen,
md5= <<>>,
revpos=0,
- data={Fd,Sp}
+ data={stream, Stream}
}.
@@ -477,32 +495,18 @@ to_json(Att, OutputData, DataToFollow, ShowEncoding) ->
{Name, {Props ++ DigestProp ++ DataProps ++ EncodingProps ++ HeadersProp}}.
-flush(Fd, Att) ->
- flush_data(Fd, fetch(data, Att), Att).
+flush(Db, Att) ->
+ flush_data(Db, fetch(data, Att), Att).
-flush_data(Fd, {Fd0, _}, Att) when Fd0 == Fd ->
- % already written to our file, nothing to write
- Att;
-flush_data(Fd, {OtherFd, StreamPointer}, Att) ->
- [InMd5, InDiskLen] = fetch([md5, disk_len], Att),
- {NewStreamData, Len, _IdentityLen, Md5, IdentityMd5} =
- couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
- couch_db:check_md5(IdentityMd5, InMd5),
- store([
- {data, {Fd, NewStreamData}},
- {md5, Md5},
- {att_len, Len},
- {disk_len, InDiskLen}
- ], Att);
-flush_data(Fd, Data, Att) when is_binary(Data) ->
- couch_db:with_stream(Fd, Att, fun(OutputStream) ->
+flush_data(Db, Data, Att) when is_binary(Data) ->
+ couch_db:with_stream(Db, Att, fun(OutputStream) ->
couch_stream:write(OutputStream, Data)
end);
-flush_data(Fd, Fun, Att) when is_function(Fun) ->
+flush_data(Db, Fun, Att) when is_function(Fun) ->
case fetch(att_len, Att) of
undefined ->
- couch_db:with_stream(Fd, Att, fun(OutputStream) ->
+ couch_db:with_stream(Db, Att, fun(OutputStream) ->
% Fun(MaxChunkSize, WriterFun) must call WriterFun
% once for each chunk of the attachment,
Fun(4096,
@@ -523,11 +527,11 @@ flush_data(Fd, Fun, Att) when is_function(Fun) ->
end, ok)
end);
AttLen ->
- couch_db:with_stream(Fd, Att, fun(OutputStream) ->
+ couch_db:with_stream(Db, Att, fun(OutputStream) ->
write_streamed_attachment(OutputStream, Fun, AttLen)
end)
end;
-flush_data(Fd, {follows, Parser, Ref}, Att) ->
+flush_data(Db, {follows, Parser, Ref}, Att) ->
ParserRef = erlang:monitor(process, Parser),
Fun = fun() ->
Parser ! {get_bytes, Ref, self()},
@@ -541,9 +545,23 @@ flush_data(Fd, {follows, Parser, Ref}, Att) ->
end
end,
try
- flush_data(Fd, Fun, store(data, Fun, Att))
+ flush_data(Db, Fun, store(data, Fun, Att))
after
erlang:demonitor(ParserRef, [flush])
+ end;
+flush_data(Db, {stream, StreamEngine}, Att) ->
+ case couch_db:is_active_stream(Db, StreamEngine) of
+ true ->
+ % Already written
+ Att;
+ false ->
+ NewAtt = couch_db:with_stream(Db, Att, fun(OutputStream) ->
+ couch_stream:copy(StreamEngine, OutputStream)
+ end),
+ InMd5 = fetch(md5, Att),
+ OutMd5 = fetch(md5, NewAtt),
+ couch_util:check_md5(OutMd5, InMd5),
+ NewAtt
end.
@@ -572,9 +590,9 @@ foldl(Att, Fun, Acc) ->
foldl(Bin, _Att, Fun, Acc) when is_binary(Bin) ->
Fun(Bin, Acc);
-foldl({Fd, Sp}, Att, Fun, Acc) ->
+foldl({stream, StreamEngine}, Att, Fun, Acc) ->
Md5 = fetch(md5, Att),
- couch_stream:foldl(Fd, Sp, Md5, Fun, Acc);
+ couch_stream:foldl(StreamEngine, Md5, Fun, Acc);
foldl(DataFun, Att, Fun, Acc) when is_function(DataFun) ->
Len = fetch(att_len, Att),
fold_streamed_data(DataFun, Len, Fun, Acc);
@@ -599,14 +617,15 @@ foldl({follows, Parser, Ref}, Att, Fun, Acc) ->
range_foldl(Att, From, To, Fun, Acc) ->
- {Fd, Sp} = fetch(data, Att),
- couch_stream:range_foldl(Fd, Sp, From, To, Fun, Acc).
+ {stream, StreamEngine} = fetch(data, Att),
+ couch_stream:range_foldl(StreamEngine, From, To, Fun, Acc).
foldl_decode(Att, Fun, Acc) ->
case fetch([data, encoding], Att) of
- [{Fd, Sp}, Enc] ->
- couch_stream:foldl_decode(Fd, Sp, fetch(md5, Att), Enc, Fun, Acc);
+ [{stream, StreamEngine}, Enc] ->
+ couch_stream:foldl_decode(
+ StreamEngine, fetch(md5, Att), Enc, Fun, Acc);
[Fun2, identity] ->
fold_streamed_data(Fun2, fetch(att_len, Att), Fun, Acc)
end.
@@ -620,7 +639,7 @@ to_binary(Bin, _Att) when is_binary(Bin) ->
Bin;
to_binary(Iolist, _Att) when is_list(Iolist) ->
iolist_to_binary(Iolist);
-to_binary({_Fd,_Sp}, Att) ->
+to_binary({stream, _StreamEngine}, Att) ->
iolist_to_binary(
lists:reverse(foldl(Att, fun(Bin,Acc) -> [Bin|Acc] end, []))
);
@@ -680,9 +699,25 @@ upgrade_encoding(false) -> identity;
upgrade_encoding(Encoding) -> Encoding.
+open_stream(StreamSrc, Data) ->
+ case couch_db:is_db(StreamSrc) of
+ true ->
+ couch_db:open_read_stream(StreamSrc, Data);
+ false ->
+ case is_function(StreamSrc, 1) of
+ true ->
+ StreamSrc(Data);
+ false ->
+ erlang:error({invalid_stream_source, StreamSrc})
+ end
+ end.
+
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
+% Eww...
+-include("couch_bt_engine.hrl").
%% Test utilities
@@ -737,7 +772,7 @@ attachment_disk_term_test_() ->
{disk_len, 0},
{md5, <<212,29,140,217,143,0,178,4,233,128,9,152,236,248,66,126>>},
{revpos, 4},
- {data, {fake_fd, fake_sp}},
+ {data, {stream, {couch_bt_engine_stream, {fake_fd, fake_sp}}}},
{encoding, identity}
]),
BaseDiskTerm = {
@@ -751,11 +786,12 @@ attachment_disk_term_test_() ->
Headers = [{<<"X-Foo">>, <<"bar">>}],
ExtendedAttachment = store(headers, Headers, BaseAttachment),
ExtendedDiskTerm = {BaseDiskTerm, [{headers, Headers}]},
+ FakeDb = test_util:fake_db([{engine, {couch_bt_engine, #st{fd=fake_fd}}}]),
{"Disk term tests", [
?_assertEqual(BaseDiskTerm, to_disk_term(BaseAttachment)),
- ?_assertEqual(BaseAttachment, from_disk_term(fake_fd, BaseDiskTerm)),
+ ?_assertEqual(BaseAttachment, from_disk_term(FakeDb, BaseDiskTerm)),
?_assertEqual(ExtendedDiskTerm, to_disk_term(ExtendedAttachment)),
- ?_assertEqual(ExtendedAttachment, from_disk_term(fake_fd, ExtendedDiskTerm))
+ ?_assertEqual(ExtendedAttachment, from_disk_term(FakeDb, ExtendedDiskTerm))
]}.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e056ae9e/src/couch_auth_cache.erl
----------------------------------------------------------------------
diff --git a/src/couch_auth_cache.erl b/src/couch_auth_cache.erl
index 6895992..7f1e064 100644
--- a/src/couch_auth_cache.erl
+++ b/src/couch_auth_cache.erl
@@ -326,13 +326,8 @@ refresh_entries(AuthDb) ->
AuthDb2Seq = couch_db:get_update_seq(AuthDb2),
case AuthDb2Seq > AuthDbSeq of
true ->
- {ok, _, _} = couch_db:enum_docs_since(
- AuthDb2,
- AuthDbSeq,
- fun(DocInfo, _, _) -> refresh_entry(AuthDb2, DocInfo) end,
- AuthDbSeq,
- []
- ),
+ Fun = fun(DocInfo, _) -> refresh_entry(AuthDb2, DocInfo) end,
+ {ok, _} = couch_db:fold_changes(AuthDb2, AuthDbSeq, Fun, nil),
true = ets:insert(?STATE, {auth_db, AuthDb2});
false ->
ok
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e056ae9e/src/couch_bt_engine.erl
----------------------------------------------------------------------
diff --git a/src/couch_bt_engine.erl b/src/couch_bt_engine.erl
index 7c0fdbc..cf31b09 100644
--- a/src/couch_bt_engine.erl
+++ b/src/couch_bt_engine.erl
@@ -445,11 +445,11 @@ is_active_stream(_, _) ->
fold_docs(St, UserFun, UserAcc, Options) ->
- fold_docs_int(St#st.id_tree, UserFun, UserAcc, Options).
+ fold_docs_int(St, St#st.id_tree, UserFun, UserAcc, Options).
fold_local_docs(St, UserFun, UserAcc, Options) ->
- fold_docs_int(St#st.local_tree, UserFun, UserAcc, Options).
+ fold_docs_int(St, St#st.local_tree, UserFun, UserAcc, Options).
fold_changes(St, SinceSeq, UserFun, UserAcc, Options) ->
@@ -838,7 +838,7 @@ active_size(#st{} = St, #size_info{} = SI) ->
end, SI#size_info.active, Trees).
-fold_docs_int(Tree, UserFun, UserAcc, Options) ->
+fold_docs_int(St, Tree, UserFun, UserAcc, Options) ->
Fun = case lists:member(include_deleted, Options) of
true -> fun include_deleted/4;
false -> fun skip_deleted/4
@@ -851,8 +851,10 @@ fold_docs_int(Tree, UserFun, UserAcc, Options) ->
{ok, Reds, OutAcc} = couch_btree:fold(Tree, Fun, InAcc, Options),
{_, {_, FinalUserAcc}} = OutAcc,
case lists:member(include_reductions, Options) of
- true ->
+ true when Tree == St#st.id_tree ->
{ok, fold_docs_reduce_to_count(Reds), FinalUserAcc};
+ true when Tree == St#st.local_tree ->
+ {ok, 0, FinalUserAcc};
false ->
{ok, FinalUserAcc}
end.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e056ae9e/src/couch_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_changes.erl b/src/couch_changes.erl
index ea7f65c..7dfefed 100644
--- a/src/couch_changes.erl
+++ b/src/couch_changes.erl
@@ -536,7 +536,8 @@ send_changes(Acc, Dir, FirstRound) ->
{#mrview{}, {fast_view, _, _, _}} ->
couch_mrview:view_changes_since(View, StartSeq, DbEnumFun, [{dir, Dir}], Acc);
{undefined, _} ->
- couch_db:changes_since(Db, StartSeq, DbEnumFun, [{dir, Dir}], Acc);
+ Opts = [{dir, Dir}],
+ couch_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts);
{#mrview{}, _} ->
ViewEnumFun = fun view_changes_enumerator/2,
{Go, Acc0} = couch_mrview:view_changes_since(View, StartSeq, ViewEnumFun, [{dir, Dir}], Acc),
@@ -571,18 +572,22 @@ can_optimize(_, _) ->
send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) ->
- Lookups = couch_db:get_full_doc_infos(Db, DocIds),
+ Results = couch_db:get_full_doc_infos(Db, DocIds),
FullInfos = lists:foldl(fun
- ({ok, FDI}, Acc) -> [FDI | Acc];
+ (#full_doc_info{}=FDI, Acc) -> [FDI | Acc];
(not_found, Acc) -> Acc
- end, [], Lookups),
+ end, [], Results),
send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).
send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0, {design_docs, _Style}) ->
FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end,
- KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
- {ok, FullInfos} = couch_db:fold_docs(Db, FoldFun, [], KeyOpts),
+ Opts = [
+ include_deleted,
+ {start_key, <<"_design/">>},
+ {end_key_gt, <<"_design0">>}
+ ],
+ {ok, FullInfos} = couch_db:fold_docs(Db, FoldFun, [], Opts),
send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).
@@ -757,6 +762,8 @@ changes_enumerator(Value0, Acc) ->
end,
Results = [Result || Result <- Results0, Result /= null],
Seq = case Value of
+ #full_doc_info{} ->
+ Value#full_doc_info.update_seq;
#doc_info{} ->
Value#doc_info.high_seq;
{{Seq0, _}, _} ->
@@ -816,6 +823,8 @@ view_changes_row(Results, KVs, Acc) ->
] ++ maybe_get_changes_doc({Id, Rev}, Acc)}.
+changes_row(Results, #full_doc_info{} = FDI, Acc) ->
+ changes_row(Results, couch_doc:to_doc_info(FDI), Acc);
changes_row(Results, DocInfo, Acc) ->
#doc_info{
id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e056ae9e/src/couch_compaction_daemon.erl
----------------------------------------------------------------------
diff --git a/src/couch_compaction_daemon.erl b/src/couch_compaction_daemon.erl
index f3b646d..da7481c 100644
--- a/src/couch_compaction_daemon.erl
+++ b/src/couch_compaction_daemon.erl
@@ -236,17 +236,18 @@ maybe_compact_views(DbName, [DDocName | Rest], Config) ->
db_ddoc_names(Db) ->
- {ok, _, DDocNames} = couch_db:enum_docs(
- Db,
- fun(#full_doc_info{id = <<"_design/", _/binary>>, deleted = true}, _, Acc) ->
- {ok, Acc};
- (#full_doc_info{id = <<"_design/", Id/binary>>}, _, Acc) ->
- {ok, [Id | Acc]};
- (_, _, Acc) ->
- {stop, Acc}
- end, [], [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}]),
+ FoldFun = fun ddoc_name/2,
+ Opts = [{start_key, <<"_design/">>}],
+ {ok, DDocNames} = couch_db:fold_docs(Db, FoldFun, [], Opts),
DDocNames.
+ddoc_name(#full_doc_info{id = <<"_design/", _/binary>>, deleted = true}, Acc) ->
+ {ok, Acc};
+ddoc_name(#full_doc_info{id = <<"_design/", Id/binary>>}, Acc) ->
+ {ok, [Id | Acc]};
+ddoc_name(_, Acc) ->
+ {stop, Acc}.
+
maybe_compact_view(DbName, GroupId, Config) ->
DDocId = <<"_design/", GroupId/binary>>,
@@ -391,21 +392,22 @@ check_frag(Threshold, Frag) ->
frag(Props) ->
- FileSize = couch_util:get_value(disk_size, Props),
+ {Sizes} = couch_util:get_value(sizes, Props),
+ FileSize = couch_util:get_value(file, Sizes),
MinFileSize = list_to_integer(
config:get("compaction_daemon", "min_file_size", "131072")),
case FileSize < MinFileSize of
true ->
{0, FileSize};
false ->
- case couch_util:get_value(data_size, Props) of
- null ->
- {100, FileSize};
+ case couch_util:get_value(active, Sizes) of
0 ->
{0, FileSize};
- DataSize ->
+ DataSize when is_integer(DataSize), DataSize > 0 ->
Frag = round(((FileSize - DataSize) / FileSize * 100)),
- {Frag, space_required(DataSize)}
+ {Frag, space_required(DataSize)};
+ _ ->
+ {100, FileSize}
end
end.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e056ae9e/src/couch_db.erl
----------------------------------------------------------------------
diff --git a/src/couch_db.erl b/src/couch_db.erl
index e2bc2c3..f5281a3 100644
--- a/src/couch_db.erl
+++ b/src/couch_db.erl
@@ -16,12 +16,10 @@
create/2,
open/2,
open_int/2,
+ incref/1,
reopen/1,
close/1,
- incref/1,
- decref/1,
-
clustered_db/2,
clustered_db/3,
@@ -34,13 +32,13 @@
check_is_member/1,
name/1,
- compression/1,
get_after_doc_read_fun/1,
get_before_doc_update_fun/1,
get_committed_update_seq/1,
get_compacted_seq/1,
get_compactor_pid/1,
get_db_info/1,
+ get_del_doc_count/1,
get_doc_count/1,
get_epochs/1,
get_filepath/1,
@@ -58,7 +56,6 @@
is_system_db/1,
is_clustered/1,
- increment_update_seq/1,
set_revs_limit/2,
set_security/2,
set_user_ctx/2,
@@ -67,12 +64,12 @@
ensure_full_commit/2,
load_validation_funs/1,
+ reload_validation_funs/1,
open_doc/2,
open_doc/3,
open_doc_revs/4,
open_doc_int/3,
- read_doc/2,
get_doc_info/2,
get_full_doc_info/2,
get_full_doc_infos/2,
@@ -89,16 +86,16 @@
purge_docs/2,
with_stream/3,
+ open_write_stream/2,
+ open_read_stream/2,
+ is_active_stream/2,
+ fold_docs/3,
fold_docs/4,
fold_local_docs/4,
- enum_docs/4,
- enum_docs_reduce_to_count/1,
-
- enum_docs_since/5,
- enum_docs_since_reduce_to_count/1,
- changes_since/4,
- changes_since/5,
+ fold_design_docs/4,
+ fold_changes/4,
+ fold_changes/5,
count_changes_since/2,
calculate_start_seq/3,
@@ -113,14 +110,13 @@
normalize_dbname/1,
validate_dbname/1,
- check_md5/2,
make_doc/5,
new_revid/1
]).
-export([
- start_link/3
+ start_link/4
]).
@@ -132,38 +128,9 @@
"(\\.[0-9]{10,})?$" % but allow an optional shard timestamp at the end
).
-start_link(DbName, Filepath, Options) ->
- case open_db_file(Filepath, Options) of
- {ok, Fd} ->
- {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {DbName,
- Filepath, Fd, Options}, []),
- unlink(Fd),
- gen_server:call(UpdaterPid, get_db);
- Else ->
- Else
- end.
-
-open_db_file(Filepath, Options) ->
- case couch_file:open(Filepath, Options) of
- {ok, Fd} ->
- {ok, Fd};
- {error, enoent} ->
- % couldn't find file. is there a compact version? This can happen if
- % crashed during the file switch.
- case couch_file:open(Filepath ++ ".compact", [nologifmissing]) of
- {ok, Fd} ->
- couch_log:info("Found ~s~s compaction file, using as primary"
- " storage.", [Filepath, ".compact"]),
- ok = file:rename(Filepath ++ ".compact", Filepath),
- ok = couch_file:sync(Fd),
- {ok, Fd};
- {error, enoent} ->
- {not_found, no_db_file}
- end;
- Error ->
- Error
- end.
-
+start_link(Engine, DbName, Filepath, Options) ->
+ Arg = {Engine, DbName, Filepath, Options},
+ proc_lib:start_link(couch_db_updater, init, [Arg]).
create(DbName, Options) ->
couch_server:create(DbName, Options).
@@ -189,16 +156,20 @@ open(DbName, Options) ->
Else -> Else
end.
-reopen(#db{main_pid = Pid, fd = Fd, fd_monitor = OldRef, user_ctx = UserCtx}) ->
- {ok, #db{fd = NewFd} = NewDb} = gen_server:call(Pid, get_db, infinity),
- case NewFd =:= Fd of
- true ->
- {ok, NewDb#db{user_ctx = UserCtx}};
- false ->
- erlang:demonitor(OldRef, [flush]),
- NewRef = erlang:monitor(process, NewFd),
- {ok, NewDb#db{user_ctx = UserCtx, fd_monitor = NewRef}}
- end.
+
+reopen(#db{} = Db) ->
+ % We could have just swapped out the storage engine
+ % for this database during a compaction so we just
+ % reimplement this as a close/open pair now.
+ close(Db),
+ open(Db#db.name, [{user_ctx, Db#db.user_ctx} | Db#db.options]).
+
+
+% You shouldn't call this. Its part of the ref counting between
+% couch_server and couch_db instances.
+incref(#db{} = Db) ->
+ couch_db_engine:incref(Db).
+
clustered_db(DbName, UserCtx) ->
clustered_db(DbName, UserCtx, []).
@@ -206,14 +177,6 @@ clustered_db(DbName, UserCtx) ->
clustered_db(DbName, UserCtx, SecProps) ->
{ok, #db{name = DbName, user_ctx = UserCtx, security = SecProps}}.
-incref(#db{fd = Fd} = Db) ->
- Ref = erlang:monitor(process, Fd),
- {ok, Db#db{fd_monitor = Ref}}.
-
-decref(#db{fd_monitor = Monitor}) ->
- erlang:demonitor(Monitor, [flush]),
- ok.
-
is_db(#db{}) ->
true;
is_db(_) ->
@@ -226,8 +189,8 @@ is_clustered(#db{main_pid = nil}) ->
true;
is_clustered(#db{}) ->
false;
-is_clustered(?NEW_PSE_DB = Db) ->
- ?PSE_DB_MAIN_PID(Db) == undefined.
+is_clustered(?OLD_DB_REC = Db) ->
+ ?OLD_DB_MAIN_PID(Db) == undefined.
ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) ->
ok = gen_server:call(Pid, full_commit, infinity),
@@ -238,10 +201,9 @@ ensure_full_commit(Db, RequiredSeq) ->
ok = gen_server:call(Pid, {full_commit, RequiredSeq}, infinity),
{ok, StartTime}.
-close(#db{fd_monitor=Ref}) ->
- erlang:demonitor(Ref, [flush]),
- ok;
-close(?NEW_PSE_DB) ->
+close(#db{} = Db) ->
+ ok = couch_db_engine:decref(Db);
+close(?OLD_DB_REC) ->
ok.
is_idle(#db{compactor_pid=nil, waiting_delayed_commit=nil} = Db) ->
@@ -250,20 +212,31 @@ is_idle(_Db) ->
false.
monitored_by(Db) ->
- case erlang:process_info(Db#db.fd, monitored_by) of
- undefined ->
- [];
- {monitored_by, Pids} ->
- PidTracker = whereis(couch_stats_process_tracker),
- Pids -- [Db#db.main_pid, PidTracker]
+ case couch_db_engine:monitored_by(Db) of
+ Pids when is_list(Pids) ->
+ PidTracker = whereis(couch_stats_process_tracker),
+ Pids -- [Db#db.main_pid, PidTracker];
+ undefined ->
+ []
end.
monitor(#db{main_pid=MainPid}) ->
erlang:monitor(process, MainPid).
-start_compact(#db{main_pid=Pid}) ->
- gen_server:call(Pid, start_compact).
+start_compact(#db{} = Db) ->
+ start_compact(Db, []).
+
+start_compact(#db{} = Db, Opts) ->
+ case lists:keyfind(notify, 1, Opts) of
+ {notify, Pid, Term} ->
+ % We fake a gen_server call here which sends the
+ % response back to the specified pid.
+ Db#db.main_pid ! {'$gen_call', {Pid, Term}, start_compact},
+ ok;
+ _ ->
+ gen_server:call(Db#db.main_pid, start_compact)
+ end.
cancel_compact(#db{main_pid=Pid}) ->
gen_server:call(Pid, cancel_compact).
@@ -363,7 +336,8 @@ get_missing_revs(Db, IdRevsList) ->
find_missing([], []) ->
[];
-find_missing([{Id, Revs}|RestIdRevs], [{ok, FullInfo} | RestLookupInfo]) ->
+find_missing([{Id, Revs}|RestIdRevs], [FullInfo | RestLookupInfo])
+ when is_record(FullInfo, full_doc_info) ->
case couch_key_tree:find_missing(FullInfo#full_doc_info.rev_tree, Revs) of
[] ->
find_missing(RestIdRevs, RestLookupInfo);
@@ -391,8 +365,8 @@ find_missing([{Id, Revs}|RestIdRevs], [not_found | RestLookupInfo]) ->
get_doc_info(Db, Id) ->
case get_full_doc_info(Db, Id) of
- {ok, DocInfo} ->
- {ok, couch_doc:to_doc_info(DocInfo)};
+ #full_doc_info{} = FDI ->
+ {ok, couch_doc:to_doc_info(FDI)};
Else ->
Else
end.
@@ -403,10 +377,7 @@ get_full_doc_info(Db, Id) ->
Result.
get_full_doc_infos(Db, Ids) ->
- couch_btree:lookup(Db#db.id_tree, Ids).
-
-increment_update_seq(#db{main_pid=Pid}) ->
- gen_server:call(Pid, increment_update_seq).
+ couch_db_engine:open_docs(Db, Ids).
purge_docs(#db{main_pid=Pid}, IdsRevs) ->
gen_server:call(Pid, {purge_docs, IdsRevs}).
@@ -420,37 +391,34 @@ get_before_doc_update_fun(#db{before_doc_update = Fun}) ->
get_committed_update_seq(#db{committed_update_seq=Seq}) ->
Seq.
-get_update_seq(#db{update_seq=Seq})->
- Seq.
+get_update_seq(#db{} = Db)->
+ couch_db_engine:get_update_seq(Db).
get_user_ctx(#db{user_ctx = UserCtx}) ->
UserCtx;
-get_user_ctx(?NEW_PSE_DB = Db) ->
- ?PSE_DB_USER_CTX(Db).
+get_user_ctx(?OLD_DB_REC = Db) ->
+ ?OLD_DB_USER_CTX(Db).
get_purge_seq(#db{}=Db) ->
- couch_db_header:purge_seq(Db#db.header).
+ {ok, couch_db_engine:get_purge_seq(Db)}.
get_last_purged(#db{}=Db) ->
- case couch_db_header:purged_docs(Db#db.header) of
- nil ->
- {ok, []};
- Pointer ->
- couch_file:pread_term(Db#db.fd, Pointer)
- end.
+ {ok, couch_db_engine:get_last_purged(Db)}.
get_pid(#db{main_pid = Pid}) ->
Pid.
+get_del_doc_count(Db) ->
+ {ok, couch_db_engine:get_del_doc_count(Db)}.
+
get_doc_count(Db) ->
- {ok, {Count, _, _}} = couch_btree:full_reduce(Db#db.id_tree),
- {ok, Count}.
+ {ok, couch_db_engine:get_doc_count(Db)}.
get_uuid(#db{}=Db) ->
- couch_db_header:uuid(Db#db.header).
+ couch_db_engine:get_uuid(Db).
get_epochs(#db{}=Db) ->
- Epochs = couch_db_header:epochs(Db#db.header),
+ Epochs = couch_db_engine:get_epochs(Db),
validate_epochs(Epochs),
Epochs.
@@ -461,34 +429,25 @@ get_instance_start_time(#db{instance_start_time = IST}) ->
IST.
get_compacted_seq(#db{}=Db) ->
- couch_db_header:compacted_seq(Db#db.header).
+ couch_db_engine:get_compacted_seq(Db).
get_compactor_pid(#db{compactor_pid = Pid}) ->
Pid.
get_db_info(Db) ->
- #db{fd=Fd,
- header=Header,
- compactor_pid=Compactor,
- update_seq=SeqNum,
- name=Name,
- instance_start_time=StartTime,
- committed_update_seq=CommittedUpdateSeq,
- id_tree = IdBtree
+ #db{
+ name = Name,
+ compactor_pid = Compactor,
+ instance_start_time = StartTime,
+ committed_update_seq = CommittedUpdateSeq
} = Db,
- {ok, FileSize} = couch_file:bytes(Fd),
- {ok, DbReduction} = couch_btree:full_reduce(IdBtree),
- SizeInfo0 = element(3, DbReduction),
- SizeInfo = case SizeInfo0 of
- SI when is_record(SI, size_info) ->
- SI;
- {AS, ES} ->
- #size_info{active=AS, external=ES};
- AS ->
- #size_info{active=AS}
- end,
- ActiveSize = active_size(Db, SizeInfo),
- DiskVersion = couch_db_header:disk_version(Header),
+ {ok, DocCount} = get_doc_count(Db),
+ {ok, DelDocCount} = get_del_doc_count(Db),
+ SizeInfo = couch_db_engine:get_size_info(Db),
+ FileSize = couch_util:get_value(file, SizeInfo, null),
+ ActiveSize = couch_util:get_value(active, SizeInfo, null),
+ ExternalSize = couch_util:get_value(external, SizeInfo, null),
+ DiskVersion = couch_db_engine:get_disk_version(Db),
Uuid = case get_uuid(Db) of
undefined -> null;
Uuid0 -> Uuid0
@@ -499,63 +458,39 @@ get_db_info(Db) ->
end,
InfoList = [
{db_name, Name},
- {doc_count, element(1, DbReduction)},
- {doc_del_count, element(2, DbReduction)},
- {update_seq, SeqNum},
- {purge_seq, couch_db:get_purge_seq(Db)},
- {compact_running, Compactor/=nil},
+ {engine, couch_db_engine:get_engine(Db)},
+ {doc_count, DocCount},
+ {doc_del_count, DelDocCount},
+ {update_seq, get_update_seq(Db)},
+ {purge_seq, couch_db_engine:get_purge_seq(Db)},
+ {compact_running, Compactor /= nil},
+ {sizes, {SizeInfo}},
+ % TODO: Remove this in 3.0
+ % These are legacy and have been duplicated under
+ % the sizes key since 2.0. We should make a note
+ % in our release notes that we'll remove these
+ % old versions in 3.0
{disk_size, FileSize}, % legacy
- {other, {[{data_size, SizeInfo#size_info.external}]}}, % legacy
- {data_size, ActiveSize}, % legacy
- {sizes, {[
- {file, FileSize},
- {active, ActiveSize},
- {external, SizeInfo#size_info.external}
- ]}},
+ {data_size, ActiveSize},
+ {other, {[{data_size, ExternalSize}]}},
{instance_start_time, StartTime},
{disk_format_version, DiskVersion},
{committed_update_seq, CommittedUpdateSeq},
{compacted_seq, CompactedSeq},
{uuid, Uuid}
- ],
+ ],
{ok, InfoList}.
-active_size(#db{}=Db, Size) when is_integer(Size) ->
- active_size(Db, #size_info{active=Size});
-active_size(#db{}=Db, #size_info{}=SI) ->
- Trees = [
- Db#db.id_tree,
- Db#db.seq_tree,
- Db#db.local_tree
- ],
- lists:foldl(fun(T, Acc) ->
- case couch_btree:size(T) of
- _ when Acc == null ->
- null;
- undefined ->
- null;
- Size ->
- Acc + Size
- end
- end, SI#size_info.active, Trees).
get_design_docs(#db{name = <<"shards/", _:18/binary, DbName/binary>>}) ->
{_, Ref} = spawn_monitor(fun() -> exit(fabric:design_docs(DbName)) end),
receive {'DOWN', Ref, _, _, Response} ->
Response
end;
-get_design_docs(#db{id_tree = IdBtree}) ->
- FoldFun = pipe([fun skip_deleted/4], fun
- (#full_doc_info{deleted = true}, _Reds, Acc) ->
- {ok, Acc};
- (#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, Acc) ->
- {ok, [FullDocInfo | Acc]};
- (_, _Reds, Acc) ->
- {stop, Acc}
- end),
- KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
- {ok, _, Docs} = couch_btree:fold(IdBtree, FoldFun, [], KeyOpts),
- {ok, Docs}.
+get_design_docs(#db{} = Db) ->
+ FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end,
+ {ok, Docs} = fold_design_docs(Db, FoldFun, [], []),
+ {ok, lists:reverse(Docs)}.
check_is_admin(#db{user_ctx=UserCtx}=Db) ->
@@ -645,8 +580,8 @@ get_members(#db{security=SecProps}) ->
get_security(#db{security=SecProps}) ->
{SecProps};
-get_security(?NEW_PSE_DB = Db) ->
- {?PSE_DB_SECURITY(Db)}.
+get_security(?OLD_DB_REC = Db) ->
+ {?OLD_DB_SECURITY(Db)}.
set_security(#db{main_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) ->
check_is_admin(Db),
@@ -685,8 +620,8 @@ validate_names_and_roles({Props}) when is_list(Props) ->
end,
ok.
-get_revs_limit(#db{revs_limit=Limit}) ->
- Limit.
+get_revs_limit(#db{} = Db) ->
+ couch_db_engine:get_revs_limit(Db).
set_revs_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
check_is_admin(Db),
@@ -694,13 +629,11 @@ set_revs_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
set_revs_limit(_Db, _Limit) ->
throw(invalid_revs_limit).
+
name(#db{name=Name}) ->
Name;
-name(?NEW_PSE_DB = Db) ->
- ?PSE_DB_NAME(Db).
-
-compression(#db{compression=Compression}) ->
- Compression.
+name(?OLD_DB_REC = Db) ->
+ ?OLD_DB_NAME(Db).
update_doc(Db, Doc, Options) ->
update_doc(Db, Doc, Options, interactive_edit).
@@ -831,6 +764,9 @@ load_validation_funs(#db{main_pid=Pid}=Db) ->
gen_server:cast(Pid, {load_validation_funs, Funs}),
Funs.
+reload_validation_funs(#db{} = Db) ->
+ gen_server:cast(Db#db.main_pid, {load_validation_funs, undefined}).
+
prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc,
OldFullDocInfo, LeafRevsDict, AllowConflict) ->
case Revs of
@@ -897,7 +833,7 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
[PreppedBucket | AccPrepped], AccErrors3);
prep_and_validate_updates(Db, [DocBucket|RestBuckets],
- [{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups],
+ [#full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo|RestLookups],
AllowConflict, AccPrepped, AccErrors) ->
Leafs = couch_key_tree:get_all_leafs(OldRevTree),
LeafRevsDict = dict:from_list([
@@ -948,13 +884,14 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
end,
{[], AccErrors}, Bucket),
prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3);
- {ok, #full_doc_info{rev_tree=OldTree}} ->
+ #full_doc_info{rev_tree=OldTree} ->
+ RevsLimit = get_revs_limit(Db),
OldLeafs = couch_key_tree:get_all_leafs_full(OldTree),
OldLeafsLU = [{Start, RevId} || {Start, [{RevId, _}|_]} <- OldLeafs],
NewRevTree = lists:foldl(
fun(NewDoc, AccTree) ->
{NewTree, _} = couch_key_tree:merge(AccTree,
- couch_doc:to_path(NewDoc), Db#db.revs_limit),
+ couch_doc:to_path(NewDoc), RevsLimit),
NewTree
end,
OldTree, Bucket),
@@ -1090,7 +1027,7 @@ update_docs(Db, Docs0, Options, replicated_changes) ->
DocErrors = [],
DocBuckets3 = DocBuckets
end,
- DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd)
+ DocBuckets4 = [[doc_flush_atts(Db, check_dup_atts(Doc))
|| Doc <- Bucket] || Bucket <- DocBuckets3],
{ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
{ok, DocErrors};
@@ -1144,8 +1081,8 @@ update_docs(Db, Docs0, Options, interactive_edit) ->
Options2 = if AllOrNothing -> [merge_conflicts];
true -> [] end ++ Options,
DocBuckets3 = [[
- doc_flush_atts(set_new_att_revpos(
- check_dup_atts(Doc)), Db#db.fd)
+ doc_flush_atts(Db, set_new_att_revpos(
+ check_dup_atts(Doc)))
|| Doc <- B] || B <- DocBuckets2],
{DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
@@ -1229,7 +1166,7 @@ write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets1,
% compaction. Retry by reopening the db and writing to the current file
{ok, Db2} = open(Db#db.name, [{user_ctx, Ctx}]),
DocBuckets2 = [
- [doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] ||
+ [doc_flush_atts(Db2, Doc) || Doc <- Bucket] ||
Bucket <- DocBuckets1
],
% We only retry once
@@ -1248,18 +1185,24 @@ write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets1,
prepare_doc_summaries(Db, BucketList) ->
[lists:map(
- fun(#doc{body = Body, atts = Atts} = Doc) ->
+ fun(#doc{atts = Atts} = Doc0) ->
DiskAtts = [couch_att:to_disk_term(Att) || Att <- Atts],
{ok, SizeInfo} = couch_att:size_info(Atts),
- AttsFd = case Atts of
- [Att | _] ->
- {Fd, _} = couch_att:fetch(data, Att),
- Fd;
- [] ->
- nil
+ AttsStream = case Atts of
+ [Att | _] ->
+ {stream, StreamEngine} = couch_att:fetch(data, Att),
+ StreamEngine;
+ [] ->
+ nil
end,
- SummaryChunk = couch_db_updater:make_doc_summary(Db, {Body, DiskAtts}),
- Doc#doc{body = {summary, SummaryChunk, SizeInfo, AttsFd}}
+ Doc1 = Doc0#doc{
+ atts = DiskAtts,
+ meta = [
+ {size_info, SizeInfo},
+ {atts_stream, AttsStream}
+ ] ++ Doc0#doc.meta
+ },
+ couch_db_engine:serialize_doc(Db, Doc1)
end,
Bucket) || Bucket <- BucketList].
@@ -1284,12 +1227,8 @@ set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts0}=Doc) ->
Doc#doc{atts = Atts}.
-doc_flush_atts(Doc, Fd) ->
- Doc#doc{atts=[couch_att:flush(Fd, Att) || Att <- Doc#doc.atts]}.
-
-check_md5(_NewSig, <<>>) -> ok;
-check_md5(Sig, Sig) -> ok;
-check_md5(_, _) -> throw(md5_mismatch).
+doc_flush_atts(Db, Doc) ->
+ Doc#doc{atts=[couch_att:flush(Db, Att) || Att <- Doc#doc.atts]}.
compressible_att_type(MimeType) when is_binary(MimeType) ->
@@ -1319,21 +1258,24 @@ compressible_att_type(MimeType) ->
% is present in the request, but there is no Content-MD5
% trailer, we're free to ignore this inconsistency and
% pretend that no Content-MD5 exists.
-with_stream(Fd, Att, Fun) ->
+with_stream(Db, Att, Fun) ->
[InMd5, Type, Enc] = couch_att:fetch([md5, type, encoding], Att),
BufferSize = list_to_integer(
config:get("couchdb", "attachment_stream_buffer_size", "4096")),
- {ok, OutputStream} = case (Enc =:= identity) andalso
- compressible_att_type(Type) of
- true ->
- CompLevel = list_to_integer(
- config:get("attachments", "compression_level", "0")
- ),
- couch_stream:open(Fd, [{buffer_size, BufferSize},
- {encoding, gzip}, {compression_level, CompLevel}]);
- _ ->
- couch_stream:open(Fd, [{buffer_size, BufferSize}])
+ Options = case (Enc =:= identity) andalso compressible_att_type(Type) of
+ true ->
+ CompLevel = list_to_integer(
+ config:get("attachments", "compression_level", "0")
+ ),
+ [
+ {buffer_size, BufferSize},
+ {encoding, gzip},
+ {compression_level, CompLevel}
+ ];
+ _ ->
+ [{buffer_size, BufferSize}]
end,
+ {ok, OutputStream} = open_write_stream(Db, Options),
ReqMd5 = case Fun(OutputStream) of
{md5, FooterMd5} ->
case InMd5 of
@@ -1343,9 +1285,9 @@ with_stream(Fd, Att, Fun) ->
_ ->
InMd5
end,
- {StreamInfo, Len, IdentityLen, Md5, IdentityMd5} =
+ {StreamEngine, Len, IdentityLen, Md5, IdentityMd5} =
couch_stream:close(OutputStream),
- check_md5(IdentityMd5, ReqMd5),
+ couch_util:check_md5(IdentityMd5, ReqMd5),
{AttLen, DiskLen, NewEnc} = case Enc of
identity ->
case {Md5, IdentityMd5} of
@@ -1367,7 +1309,7 @@ with_stream(Fd, Att, Fun) ->
end
end,
couch_att:store([
- {data, {Fd,StreamInfo}},
+ {data, {stream, StreamEngine}},
{att_len, AttLen},
{disk_len, DiskLen},
{md5, Md5},
@@ -1375,83 +1317,16 @@ with_stream(Fd, Att, Fun) ->
], Att).
-enum_docs_since_reduce_to_count(Reds) ->
- couch_btree:final_reduce(
- fun couch_db_updater:btree_by_seq_reduce/2, Reds).
+open_write_stream(Db, Options) ->
+ couch_db_engine:open_write_stream(Db, Options).
-enum_docs_reduce_to_count(Reds) ->
- FinalRed = couch_btree:final_reduce(
- fun couch_db_updater:btree_by_id_reduce/2, Reds),
- element(1, FinalRed).
-changes_since(Db, StartSeq, Fun, Acc) ->
- changes_since(Db, StartSeq, Fun, [], Acc).
+open_read_stream(Db, AttState) ->
+ couch_db_engine:open_read_stream(Db, AttState).
-changes_since(Db, StartSeq, Fun, Options, Acc) when is_record(Db, db) ->
- changes_since(Db#db.seq_tree, StartSeq, Fun, Options, Acc);
-changes_since(SeqTree, StartSeq, Fun, Options, Acc) ->
- Wrapper = fun(FullDocInfo, _Offset, Acc2) ->
- DocInfo = case FullDocInfo of
- #full_doc_info{} ->
- couch_doc:to_doc_info(FullDocInfo);
- #doc_info{} ->
- FullDocInfo
- end,
- Fun(DocInfo, Acc2)
- end,
- {ok, _LastReduction, AccOut} = couch_btree:fold(SeqTree,
- Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options),
- {ok, AccOut}.
-count_changes_since(Db, SinceSeq) ->
- BTree = Db#db.seq_tree,
- {ok, Changes} =
- couch_btree:fold_reduce(BTree,
- fun(_SeqStart, PartialReds, 0) ->
- {ok, couch_btree:final_reduce(BTree, PartialReds)}
- end,
- 0, [{start_key, SinceSeq + 1}]),
- Changes.
-
-enum_docs_since(Db, SinceSeq, InFun, Acc, Options) ->
- {ok, LastReduction, AccOut} = couch_btree:fold(
- Db#db.seq_tree, InFun, Acc,
- [{start_key, SinceSeq + 1} | Options]),
- {ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}.
-
-
-fold_docs(Db, InFun, InAcc, Opts) ->
- Wrapper = fun(FDI, _, Acc) -> InFun(FDI, Acc) end,
- {ok, _, AccOut} = couch_btree:fold(Db#db.id_tree, Wrapper, InAcc, Opts),
- {ok, AccOut}.
-
-fold_local_docs(Db, InFun, InAcc, Opts) ->
- Wrapper = fun(FDI, _, Acc) -> InFun(FDI, Acc) end,
- {ok, _, AccOut} = couch_btree:fold(Db#db.local_tree, Wrapper, InAcc, Opts),
- {ok, AccOut}.
-
-enum_docs(Db, InFun, InAcc, Options0) ->
- {NS, Options} = extract_namespace(Options0),
- enum_docs(Db, NS, InFun, InAcc, Options).
-
-enum_docs(Db, undefined, InFun, InAcc, Options) ->
- FoldFun = pipe([fun skip_deleted/4], InFun),
- {ok, LastReduce, OutAcc} = couch_btree:fold(
- Db#db.id_tree, FoldFun, InAcc, Options),
- {ok, enum_docs_reduce_to_count(LastReduce), OutAcc};
-enum_docs(Db, <<"_local">>, InFun, InAcc, Options) ->
- FoldFun = pipe([fun skip_deleted/4], InFun),
- {ok, _LastReduce, OutAcc} = couch_btree:fold(
- Db#db.local_tree, FoldFun, InAcc, Options),
- {ok, 0, OutAcc};
-enum_docs(Db, NS, InFun, InAcc, Options0) ->
- FoldFun = pipe([
- fun skip_deleted/4,
- stop_on_leaving_namespace(NS)], InFun),
- Options = set_namespace_range(Options0, NS),
- {ok, LastReduce, OutAcc} = couch_btree:fold(
- Db#db.id_tree, FoldFun, InAcc, Options),
- {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}.
+is_active_stream(Db, StreamEngine) ->
+ couch_db_engine:is_active_stream(Db, StreamEngine).
calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) ->
@@ -1525,22 +1400,43 @@ start_seq([], OrigNode, Seq) ->
erlang:error({epoch_mismatch, OrigNode, Seq}).
-extract_namespace(Options0) ->
- case proplists:split(Options0, [namespace]) of
- {[[{namespace, NS}]], Options} ->
- {NS, Options};
- {_, Options} ->
- {undefined, Options}
- end.
+fold_docs(Db, UserFun, UserAcc) ->
+ fold_docs(Db, UserFun, UserAcc, []).
+
+fold_docs(Db, UserFun, UserAcc, Options) ->
+ couch_db_engine:fold_docs(Db, UserFun, UserAcc, Options).
+
+
+fold_local_docs(Db, UserFun, UserAcc, Options) ->
+ couch_db_engine:fold_local_docs(Db, UserFun, UserAcc, Options).
+
+
+fold_design_docs(Db, UserFun, UserAcc, Options1) ->
+ Options2 = set_design_doc_keys(Options1),
+ couch_db_engine:fold_docs(Db, UserFun, UserAcc, Options2).
+
+
+fold_changes(Db, StartSeq, UserFun, UserAcc) ->
+ fold_changes(Db, StartSeq, UserFun, UserAcc, []).
+
+
+fold_changes(Db, StartSeq, UserFun, UserAcc, Opts) ->
+ couch_db_engine:fold_changes(Db, StartSeq, UserFun, UserAcc, Opts).
+
+
+count_changes_since(Db, SinceSeq) ->
+ couch_db_engine:count_changes_since(Db, SinceSeq).
+
%%% Internal function %%%
+
open_doc_revs_int(Db, IdRevs, Options) ->
Ids = [Id || {Id, _Revs} <- IdRevs],
LookupResults = get_full_doc_infos(Db, Ids),
lists:zipwith(
fun({Id, Revs}, Lookup) ->
case Lookup of
- {ok, #full_doc_info{rev_tree=RevTree}} ->
+ #full_doc_info{rev_tree=RevTree} ->
{FoundRevs, MissingRevs} =
case Revs of
all ->
@@ -1574,9 +1470,8 @@ open_doc_revs_int(Db, IdRevs, Options) ->
IdRevs, LookupResults).
open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, Options) ->
- case couch_btree:lookup(Db#db.local_tree, [Id]) of
- [{ok, {_, {Rev, BodyData}}}] ->
- Doc = #doc{id=Id, revs={0, [?l2b(integer_to_list(Rev))]}, body=BodyData},
+ case couch_db_engine:open_local_docs(Db, [Id]) of
+ [#doc{} = Doc] ->
apply_open_options({ok, Doc}, Options);
[not_found] ->
{not_found, missing}
@@ -1595,7 +1490,7 @@ open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) ->
{ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}}, Options);
open_doc_int(Db, Id, Options) ->
case get_full_doc_info(Db, Id) of
- {ok, FullDocInfo} ->
+ #full_doc_info{} = FullDocInfo ->
open_doc_int(Db, FullDocInfo, Options);
not_found ->
{not_found, missing}
@@ -1641,9 +1536,6 @@ doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTre
true -> [{local_seq, Seq}]
end.
-read_doc(#db{fd=Fd}, Pos) ->
- couch_file:pread_term(Fd, Pos).
-
make_doc(_Db, Id, Deleted, nil = _Bp, RevisionPath) ->
#doc{
@@ -1653,34 +1545,32 @@ make_doc(_Db, Id, Deleted, nil = _Bp, RevisionPath) ->
atts = [],
deleted = Deleted
};
-make_doc(#db{fd=Fd, revs_limit=RevsLimit}=Db, Id, Deleted, Bp, {Pos, Revs}) ->
- {BodyData, Atts0} = case Bp of
- nil ->
- {[], []};
- _ ->
- case read_doc(Db, Bp) of
- {ok, {BodyData0, Atts1}} when is_binary(Atts1) ->
- {BodyData0, couch_compress:decompress(Atts1)};
- {ok, {BodyData0, Atts1}} when is_list(Atts1) ->
- % pre 1.2 format
- {BodyData0, Atts1}
- end
- end,
- Atts = [couch_att:from_disk_term(Fd, T) || T <- Atts0],
- Doc = #doc{
+make_doc(#db{} = Db, Id, Deleted, Bp, {Pos, Revs}) ->
+ RevsLimit = get_revs_limit(Db),
+ Doc0 = couch_db_engine:read_doc_body(Db, #doc{
id = Id,
revs = {Pos, lists:sublist(Revs, 1, RevsLimit)},
- body = BodyData,
- atts = Atts,
+ body = Bp,
deleted = Deleted
- },
- after_doc_read(Db, Doc).
+ }),
+ Doc1 = case Doc0#doc.atts of
+ BinAtts when is_binary(BinAtts) ->
+ Doc0#doc{
+ atts = couch_compress:decompress(BinAtts)
+ };
+ ListAtts when is_list(ListAtts) ->
+ Doc0
+ end,
+ after_doc_read(Db, Doc1#doc{
+ atts = [couch_att:from_disk_term(Db, T) || T <- Doc1#doc.atts]
+ }).
after_doc_read(#db{} = Db, Doc) ->
DocWithBody = couch_doc:with_ejson_body(Doc),
couch_db_plugin:after_doc_read(Db, DocWithBody).
+
increment_stat(#db{options = Options}, Stat) ->
case lists:member(sys_db, Options) of
true ->
@@ -1689,71 +1579,6 @@ increment_stat(#db{options = Options}, Stat) ->
couch_stats:increment_counter(Stat)
end.
-skip_deleted(traverse, LK, {Undeleted, _, _} = Reds, Acc) when Undeleted == 0 ->
- {skip, LK, Reds, Acc};
-skip_deleted(Case, A, B, C) ->
- {Case, A, B, C}.
-
-stop_on_leaving_namespace(NS) ->
- fun
- (visit, #full_doc_info{id = Key} = FullInfo, Reds, Acc) ->
- case has_prefix(Key, NS) of
- true ->
- {visit, FullInfo, Reds, Acc};
- false ->
- {stop, FullInfo, Reds, Acc}
- end;
- (Case, KV, Reds, Acc) ->
- {Case, KV, Reds, Acc}
- end.
-
-has_prefix(Bin, Prefix) ->
- S = byte_size(Prefix),
- case Bin of
- <<Prefix:S/binary, "/", _/binary>> ->
- true;
- _Else ->
- false
- end.
-
-pipe(Filters, Final) ->
- Wrap =
- fun
- (visit, KV, Reds, Acc) ->
- Final(KV, Reds, Acc);
- (skip, _KV, _Reds, Acc) ->
- {skip, Acc};
- (stop, _KV, _Reds, Acc) ->
- {stop, Acc};
- (traverse, _, _, Acc) ->
- {ok, Acc}
- end,
- do_pipe(Filters, Wrap).
-
-do_pipe([], Fun) -> Fun;
-do_pipe([Filter|Rest], F0) ->
- F1 = fun(C0, KV0, Reds0, Acc0) ->
- {C, KV, Reds, Acc} = Filter(C0, KV0, Reds0, Acc0),
- F0(C, KV, Reds, Acc)
- end,
- do_pipe(Rest, F1).
-
-set_namespace_range(Options, undefined) -> Options;
-set_namespace_range(Options, NS) ->
- %% FIXME depending on order we might need to swap keys
- SK = select_gt(
- proplists:get_value(start_key, Options, <<"">>),
- <<NS/binary, "/">>),
- EK = select_lt(
- proplists:get_value(end_key, Options, <<NS/binary, "0">>),
- <<NS/binary, "0">>),
- [{start_key, SK}, {end_key_gt, EK}].
-
-select_gt(V1, V2) when V1 < V2 -> V2;
-select_gt(V1, _V2) -> V1.
-
-select_lt(V1, V2) when V1 > V2 -> V2;
-select_lt(V1, _V2) -> V1.
-spec normalize_dbname(list() | binary()) -> binary().
@@ -1793,6 +1618,70 @@ is_systemdb(DbName) when is_list(DbName) ->
is_systemdb(DbName) when is_binary(DbName) ->
lists:member(dbname_suffix(DbName), ?SYSTEM_DATABASES).
+
+set_design_doc_keys(Options1) ->
+ Dir = case lists:keyfind(dir, 1, Options1) of
+ {dir, D0} -> D0;
+ _ -> fwd
+ end,
+ Options2 = set_design_doc_start_key(Options1, Dir),
+ set_design_doc_end_key(Options2, Dir).
+
+
+-define(FIRST_DDOC_KEY, <<"_design/">>).
+-define(LAST_DDOC_KEY, <<"_design0">>).
+
+
+set_design_doc_start_key(Options, fwd) ->
+ Key1 = couch_util:get_value(start_key, Options, ?FIRST_DDOC_KEY),
+ Key2 = case Key1 < ?FIRST_DDOC_KEY of
+ true -> ?FIRST_DDOC_KEY;
+ false -> Key1
+ end,
+ lists:keystore(start_key, 1, Options, {start_key, Key2});
+set_design_doc_start_key(Options, rev) ->
+ Key1 = couch_util:get_value(start_key, Options, ?LAST_DDOC_KEY),
+ Key2 = case Key1 > ?LAST_DDOC_KEY of
+ true -> ?LAST_DDOC_KEY;
+ false -> Key1
+ end,
+ lists:keystore(start_key, 1, Options, {start_key, Key2}).
+
+
+set_design_doc_end_key(Options, fwd) ->
+ case couch_util:get_value(end_key_gt, Options) of
+ undefined ->
+ Key1 = couch_util:get_value(end_key, Options, ?LAST_DDOC_KEY),
+ Key2 = case Key1 > ?LAST_DDOC_KEY of
+ true -> ?LAST_DDOC_KEY;
+ false -> Key1
+ end,
+ lists:keystore(end_key, 1, Options, {end_key, Key2});
+ EKeyGT ->
+ Key2 = case EKeyGT > ?LAST_DDOC_KEY of
+ true -> ?LAST_DDOC_KEY;
+ false -> EKeyGT
+ end,
+ lists:keystore(end_key_gt, 1, Options, {end_key_gt, Key2})
+ end;
+set_design_doc_end_key(Options, rev) ->
+ case couch_util:get_value(end_key_gt, Options) of
+ undefined ->
+ Key1 = couch_util:get_value(end_key, Options, ?LAST_DDOC_KEY),
+ Key2 = case Key1 < ?FIRST_DDOC_KEY of
+ true -> ?FIRST_DDOC_KEY;
+ false -> Key1
+ end,
+ lists:keystore(end_key, 1, Options, {end_key, Key2});
+ EKeyGT ->
+ Key2 = case EKeyGT < ?FIRST_DDOC_KEY of
+ true -> ?FIRST_DDOC_KEY;
+ false -> EKeyGT
+ end,
+ lists:keystore(end_key_gt, 1, Options, {end_key_gt, Key2})
+ end.
+
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@@ -1863,19 +1752,19 @@ should_fail_validate_dbname(DbName) ->
ok
end)}.
-calculate_start_seq_test() ->
- %% uuid mismatch is always a rewind.
- Hdr1 = couch_db_header:new(),
- Hdr2 = couch_db_header:set(Hdr1, [{epochs, [{node1, 1}]}, {uuid, <<"uuid1">>}]),
- ?assertEqual(0, calculate_start_seq(#db{header=Hdr2}, node1, {1, <<"uuid2">>})),
- %% uuid matches and seq is owned by node.
- Hdr3 = couch_db_header:set(Hdr2, [{epochs, [{node1, 1}]}]),
- ?assertEqual(2, calculate_start_seq(#db{header=Hdr3}, node1, {2, <<"uuid1">>})),
- %% uuids match but seq is not owned by node.
- Hdr4 = couch_db_header:set(Hdr2, [{epochs, [{node2, 2}, {node1, 1}]}]),
- ?assertEqual(0, calculate_start_seq(#db{header=Hdr4}, node1, {3, <<"uuid1">>})),
- %% return integer if we didn't get a vector.
- ?assertEqual(4, calculate_start_seq(#db{}, foo, 4)).
+%calculate_start_seq_test() ->
+% %% uuid mismatch is always a rewind.
+% Hdr1 = couch_db_header:new(),
+% Hdr2 = couch_db_header:set(Hdr1, [{epochs, [{node1, 1}]}, {uuid, <<"uuid1">>}]),
+% ?assertEqual(0, calculate_start_seq(#db{header=Hdr2}, node1, {1, <<"uuid2">>})),
+% %% uuid matches and seq is owned by node.
+% Hdr3 = couch_db_header:set(Hdr2, [{epochs, [{node1, 1}]}]),
+% ?assertEqual(2, calculate_start_seq(#db{header=Hdr3}, node1, {2, <<"uuid1">>})),
+% %% uuids match but seq is not owned by node.
+% Hdr4 = couch_db_header:set(Hdr2, [{epochs, [{node2, 2}, {node1, 1}]}]),
+% ?assertEqual(0, calculate_start_seq(#db{header=Hdr4}, node1, {3, <<"uuid1">>})),
+% %% return integer if we didn't get a vector.
+% ?assertEqual(4, calculate_start_seq(#db{}, foo, 4)).
is_owner_test() ->
?assertNot(is_owner(foo, 1, [])),
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e056ae9e/src/couch_db_engine.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_engine.erl b/src/couch_db_engine.erl
index 7718ac5..045e75c 100644
--- a/src/couch_db_engine.erl
+++ b/src/couch_db_engine.erl
@@ -563,9 +563,6 @@
{ok, CompactedDbHandle::db_handle(), CompactorPid::pid() | undefined}.
--include("couch_db_int.hrl").
-
-
-export([
exists/2,
delete/4,
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e056ae9e/src/couch_db_int.hrl
----------------------------------------------------------------------
diff --git a/src/couch_db_int.hrl b/src/couch_db_int.hrl
index 0bbb5e0..2b41b73 100644
--- a/src/couch_db_int.hrl
+++ b/src/couch_db_int.hrl
@@ -10,35 +10,8 @@
% License for the specific language governing permissions and limitations under
% the License.
--record(db, {
- main_pid = nil,
- compactor_pid = nil,
- instance_start_time, % number of microsecs since jan 1 1970 as a binary string
- fd,
- fd_monitor,
- header = couch_db_header:new(),
- committed_update_seq,
- id_tree,
- seq_tree,
- local_tree,
- update_seq,
- name,
- filepath,
- validate_doc_funs = undefined,
- security = [],
- security_ptr = nil,
- user_ctx = #user_ctx{},
- waiting_delayed_commit = nil,
- revs_limit = 1000,
- fsync_options = [],
- options = [],
- compression,
- before_doc_update = nil, % nil | fun(Doc, Db) -> NewDoc
- after_doc_read = nil % nil | fun(Doc, Db) -> NewDoc
-}).
-
--record(new_pse_db, {
+-record(db, {
name,
filepath,
@@ -65,27 +38,36 @@
}).
--define(NEW_PSE_DB, {
+-define(OLD_DB_REC, {
db,
- _, % Name
- _, % FilePath
- _, % Engine
_, % MainPid
_, % CompactorPid
- _, % CommittedUpdateSeq
_, % InstanceStartTime
- _, % UserCtx
- _, % Security
+ _, % Fd
+ _, % FdMonitor
+ _, % Header
+ _, % CommittedUpdateSeq
+ _, % IdTree
+ _, % SeqTree
+ _, % LocalTree
+ _, % UpdateSeq
+ _, % Name
+ _, % FilePath
_, % ValidateDocFuns
- _, % BeforeDocUpdate
- _, % AfterDocRead
+ _, % Security
+ _, % SecurityPtr
+ _, % UserCtx
_, % WaitingDelayedCommit
+ _, % RevsLimit
+ _, % FsyncOptions
_, % Options
- _ % Compression
+ _, % Compression
+ _, % BeforeDocUpdate
+ _ % AfterDocRead
}).
--define(PSE_DB_NAME(Db), element(2, Db)).
--define(PSE_DB_MAIN_PID(Db), element(5, Db)).
--define(PSE_DB_USER_CTX(Db), element(9, Db)).
--define(PSE_DB_SECURITY(Db), element(10, Db)).
+-define(OLD_DB_NAME(Db), element(2, Db)).
+-define(OLD_DB_MAIN_PID(Db), element(13, Db)).
+-define(OLD_DB_USER_CTX(Db), element(18, Db)).
+-define(OLD_DB_SECURITY(Db), element(16, Db)).