You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/03/15 22:46:35 UTC
[2/2] couch commit: updated refs/heads/COUCHDB-3326-clustered-purge
to 511aef2
Implement new purge API
This is the basis for clustered purge.
COUCHDB-3326
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/511aef20
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/511aef20
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/511aef20
Branch: refs/heads/COUCHDB-3326-clustered-purge
Commit: 511aef20b5111bfaaceafeaf29e731d56849d773
Parents: 0f4e1a7
Author: Mayya Sharipova <ma...@ca.ibm.com>
Authored: Wed Jun 22 09:58:04 2016 -0400
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Wed Mar 15 17:46:13 2017 -0500
----------------------------------------------------------------------
src/couch_bt_engine.erl | 259 ++++++++++++++++---
src/couch_bt_engine.hrl | 4 +-
src/couch_bt_engine_compactor.erl | 126 ++++++++--
src/couch_bt_engine_header.erl | 27 +-
src/couch_db.erl | 82 +++++-
src/couch_db_engine.erl | 110 ++++++--
src/couch_db_header.erl | 405 ------------------------------
src/couch_db_updater.erl | 237 +++++++++++------
src/couch_util.erl | 10 +
src/test_engine_compaction.erl | 124 ++++++++-
src/test_engine_fold_purged_docs.erl | 134 ++++++++++
src/test_engine_get_set_props.erl | 2 +
src/test_engine_purge_docs.erl | 33 ++-
src/test_engine_util.erl | 157 ++++++++----
test/couch_db_purge_docs_tests.erl | 348 +++++++++++++++++++++++++
15 files changed, 1407 insertions(+), 651 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/511aef20/src/couch_bt_engine.erl
----------------------------------------------------------------------
diff --git a/src/couch_bt_engine.erl b/src/couch_bt_engine.erl
index 3b31341..f5edb45 100644
--- a/src/couch_bt_engine.erl
+++ b/src/couch_bt_engine.erl
@@ -34,23 +34,28 @@
get_doc_count/1,
get_epochs/1,
get_last_purged/1,
+ get_oldest_purge_seq/1,
get_purge_seq/1,
+ get_purged_docs_limit/1,
get_revs_limit/1,
get_security/1,
get_size_info/1,
get_update_seq/1,
get_uuid/1,
+ set_purged_docs_limit/2,
set_revs_limit/2,
set_security/2,
open_docs/2,
open_local_docs/2,
+ open_purged_docs/2,
read_doc_body/2,
serialize_doc/2,
write_doc_body/2,
- write_doc_infos/4,
+ write_doc_infos/3,
+ purge_doc_revs/3,
commit_data/1,
@@ -62,6 +67,7 @@
fold_local_docs/4,
fold_changes/5,
count_changes_since/2,
+ fold_purged_docs/5,
start_compaction/4,
finish_compaction/4
@@ -83,7 +89,9 @@
seq_tree_reduce/2,
local_tree_split/1,
- local_tree_join/2
+ local_tree_join/2,
+
+ purge_tree_reduce/2
]).
@@ -98,6 +106,17 @@
-include("couch_bt_engine.hrl").
+-record(pacc, {
+ add_docs = [],
+ rem_ids = [],
+ rem_seqs = [],
+ add_upurges = [],
+ add_purges = [],
+ update_seq,
+ purge_seq
+}).
+
+
exists(FilePath) ->
case filelib:is_file(FilePath) of
true ->
@@ -205,6 +224,7 @@ get_doc_count(#st{} = St) ->
{ok, {Count, _, _}} = couch_btree:full_reduce(St#st.id_tree),
Count.
+
get_epochs(#st{header = Header}) ->
couch_bt_engine_header:get(Header, epochs).
@@ -223,6 +243,10 @@ get_purge_seq(#st{header = Header}) ->
couch_bt_engine_header:get(Header, purge_seq).
+get_purged_docs_limit(#st{header = Header}) ->
+ couch_bt_engine_header:get(Header, purged_docs_limit).
+
+
get_revs_limit(#st{header = Header}) ->
couch_bt_engine_header:get(Header, revs_limit).
@@ -258,6 +282,12 @@ get_security(#st{header = Header} = St) ->
end.
+get_oldest_purge_seq(#st{} = St) ->
+ FoldFun = fun({K, _V}, _) -> {stop, K} end,
+ {ok, _, OldestSeq} = couch_btree:foldl(St#st.purge_tree, FoldFun, 0),
+ OldestSeq.
+
+
get_update_seq(#st{header = Header}) ->
couch_bt_engine_header:get(Header, update_seq).
@@ -266,6 +296,16 @@ get_uuid(#st{header = Header}) ->
couch_bt_engine_header:get(Header, uuid).
+set_purged_docs_limit(#st{header = Header} = St, PurgedDocsLimit) ->
+ NewSt = St#st{
+ header = couch_bt_engine_header:set(Header, [
+ {purged_docs_limit, PurgedDocsLimit}
+ ]),
+ needs_commit = true
+ },
+ {ok, increment_update_seq(NewSt)}.
+
+
set_revs_limit(#st{header = Header} = St, RevsLimit) ->
NewSt = St#st{
header = couch_bt_engine_header:set(Header, [
@@ -304,6 +344,14 @@ open_local_docs(#st{} = St, DocIds) ->
end, Results).
+open_purged_docs(St, UUIDs) ->
+ Results = couch_btree:lookup(St#st.upurge_tree, UUIDs),
+ lists:map(fun
+ ({ok, IdRevs}) -> IdRevs;
+ (not_found) -> not_found
+ end, Results).
+
+
read_doc_body(#st{} = St, #doc{} = Doc) ->
{ok, {Body, Atts}} = couch_file:pread_term(St#st.fd, Doc#doc.body),
Doc#doc{
@@ -343,7 +391,7 @@ write_doc_body(St, #doc{} = Doc) ->
{ok, Doc#doc{body = Ptr}, Written}.
-write_doc_infos(#st{} = St, Pairs, LocalDocs, PurgedIdRevs) ->
+write_doc_infos(#st{} = St, Pairs, LocalDocs) ->
#st{
id_tree = IdTree,
seq_tree = SeqTree,
@@ -383,23 +431,9 @@ write_doc_infos(#st{} = St, Pairs, LocalDocs, PurgedIdRevs) ->
erlang:max(Seq, Acc)
end, get_update_seq(St), Add),
- NewHeader = case PurgedIdRevs of
- [] ->
- couch_bt_engine_header:set(St#st.header, [
- {update_seq, NewUpdateSeq}
- ]);
- _ ->
- {ok, Ptr, _} = couch_file:append_term(St#st.fd, PurgedIdRevs),
- OldPurgeSeq = couch_bt_engine_header:get(St#st.header, purge_seq),
- % We bump NewUpdateSeq because we have to ensure that
- % indexers see that they need to process the new purge
- % information.
- couch_bt_engine_header:set(St#st.header, [
- {update_seq, NewUpdateSeq + 1},
- {purge_seq, OldPurgeSeq + 1},
- {purged_docs, Ptr}
- ])
- end,
+ NewHeader = couch_bt_engine_header:set(St#st.header, [
+ {update_seq, NewUpdateSeq}
+ ]),
{ok, St#st{
header = NewHeader,
@@ -410,6 +444,62 @@ write_doc_infos(#st{} = St, Pairs, LocalDocs, PurgedIdRevs) ->
}}.
+purge_doc_revs(#st{} = St, DocInfos, Purges) ->
+ #st{
+ id_tree = IdTree,
+ seq_tree = SeqTree,
+ purge_tree = PurgeTree,
+ upurge_tree = UPurgeTree
+ } = St,
+ UpdateSeq = couch_bt_engine_header:get(St#st.header, update_seq),
+ PurgeSeq = couch_bt_engine_header:get(St#st.header, purge_seq),
+ PAcc0 = #pacc{update_seq = UpdateSeq, purge_seq = PurgeSeq},
+
+ PAcc = lists:foldl(fun(DocInfoPurge, Acc) ->
+ {{OldFDI, NewFDI}, {UUId, DocId, Revs}} = DocInfoPurge,
+ NextPSeq = Acc#pacc.purge_seq + 1,
+ Acc2 = Acc#pacc{
+ rem_seqs = [OldFDI#full_doc_info.update_seq|Acc#pacc.rem_seqs],
+ add_upurges = [{UUId, {DocId, Revs}}|Acc#pacc.add_upurges],
+ add_purges = [{NextPSeq, UUId}|Acc#pacc.add_purges],
+ purge_seq = NextPSeq
+ },
+ case NewFDI of
+ #full_doc_info{id = DocId, update_seq = NewUSeq} ->
+ Acc2#pacc{
+ add_docs = [NewFDI|Acc2#pacc.add_docs],
+ update_seq = NewUSeq
+ };
+ not_found ->
+ Acc2#pacc{rem_ids = [DocId|Acc#pacc.rem_ids]}
+ end
+ end, PAcc0, lists:zip(DocInfos, Purges)),
+
+ % We bump NewUpdateSeq because we have to ensure that
+ % indexers see that they need to process the new purge
+ % information.
+ NewUpdateSeq = if UpdateSeq == PAcc#pacc.update_seq -> UpdateSeq + 1;
+ true -> PAcc#pacc.update_seq end,
+ Header2 = couch_bt_engine_header:set(St#st.header, [
+ {update_seq, NewUpdateSeq},
+ {purge_seq, PAcc#pacc.purge_seq}
+ ]),
+ {ok, IdTree2} = couch_btree:add_remove(IdTree,
+ PAcc#pacc.add_docs, PAcc#pacc.rem_ids),
+ {ok, SeqTree2} = couch_btree:add_remove(SeqTree,
+ PAcc#pacc.add_docs, PAcc#pacc.rem_seqs),
+ {ok, UPurgeTree2} = couch_btree:add(UPurgeTree, PAcc#pacc.add_upurges),
+ {ok, PurgeTree2} = couch_btree:add(PurgeTree, PAcc#pacc.add_purges),
+ {ok, St#st{
+ header = Header2,
+ id_tree = IdTree2,
+ seq_tree = SeqTree2,
+ purge_tree = PurgeTree2,
+ upurge_tree = UPurgeTree2,
+ needs_commit = true
+ }}.
+
+
commit_data(St) ->
#st{
fd = Fd,
@@ -469,6 +559,35 @@ fold_changes(St, SinceSeq, UserFun, UserAcc, Options) ->
{ok, FinalUserAcc}.
+fold_purged_docs(St, StartSeq0, UserFun, UserAcc, Options) ->
+ StartSeq = StartSeq0 + 1,
+ PurgeTree = St#st.purge_tree,
+ {ok, _, MinSeq} = couch_btree:foldl(PurgeTree,
+ fun({K, _V}, _) -> {stop, K} end, 0),
+ if (MinSeq =< StartSeq) ->
+ Fun = fun drop_reductions2/4,
+ UFun = fun(PSeq, UUID, {PAcc, UUIDAcc}) ->
+ {ok, {[PSeq| PAcc],[UUID| UUIDAcc]}}
+ end,
+ PUAcc = {[], []},
+ InAcc = {UFun, PUAcc},
+ Opts = [{start_key, StartSeq}] ++ Options,
+ {ok, _, OutAcc} = couch_btree:fold(PurgeTree, Fun, InAcc, Opts),
+ {_, {PSeqs0, UUIDs0}} = OutAcc,
+ PSeqs = lists:reverse(PSeqs0),
+ UUIDs = lists:reverse(UUIDs0),
+ DocResults = couch_btree:lookup(St#st.upurge_tree, UUIDs),
+ FinalAcc = pfoldl(UserFun, UserAcc, PSeqs, DocResults),
+ {ok, FinalAcc};
+ true ->
+ throw({invalid_start_purge_seq, StartSeq0})
+ end.
+
+pfoldl(F, Accu, [PSeq| PSeqs], [{ok, {UUID, {Id, Revs}}}| DocResults]) ->
+ pfoldl(F, F({PSeq, UUID, Id, Revs}, Accu), PSeqs, DocResults);
+pfoldl(_F, Accu, [], []) -> Accu.
+
+
count_changes_since(St, SinceSeq) ->
BTree = St#st.seq_tree,
FoldFun = fun(_SeqStart, PartialReds, 0) ->
@@ -494,8 +613,8 @@ finish_compaction(OldState, DbName, Options, CompactFilePath) ->
finish_compaction_int(OldState, NewState1);
false ->
couch_log:info("Compaction file still behind main file "
- "(update seq=~p. compact update seq=~p). Retrying.",
- [OldSeq, NewSeq]),
+ "(update seq=~p. compact update seq=~p). Retrying.",
+ [OldSeq, NewSeq]),
ok = decref(NewState1),
start_compaction(OldState, DbName, Options, self())
end.
@@ -593,6 +712,13 @@ seq_tree_reduce(rereduce, Reds) ->
lists:sum(Reds).
+purge_tree_reduce(reduce, IdRevs) ->
+ % count the number of purge requests
+ length(IdRevs);
+purge_tree_reduce(rereduce, Reds) ->
+ lists:sum(Reds).
+
+
local_tree_split(#doc{} = Doc) ->
#doc{
id = Id,
@@ -672,7 +798,8 @@ init_state(FilePath, Fd, Header0, Options) ->
Compression = couch_compress:get_compression_method(),
Header1 = couch_bt_engine_header:upgrade(Header0),
- Header = set_default_security_object(Fd, Header1, Compression, Options),
+ Header2 = set_default_security_object(Fd, Header1, Compression, Options),
+ Header = upgrade_purge_info(Fd, Header2),
IdTreeState = couch_bt_engine_header:id_tree_state(Header),
{ok, IdTree} = couch_btree:open(IdTreeState, Fd, [
@@ -697,6 +824,16 @@ init_state(FilePath, Fd, Header0, Options) ->
{compression, Compression}
]),
+ PurgeTreeState = couch_bt_engine_header:purge_tree_state(Header),
+ {ok, PurgeTree} = couch_btree:open(PurgeTreeState, Fd, [
+ {reduce, fun ?MODULE:purge_tree_reduce/2}
+ ]),
+
+ UPurgeTreeState = couch_bt_engine_header:upurge_tree_state(Header),
+ {ok, UPurgeTree} = couch_btree:open(UPurgeTreeState, Fd, [
+ {reduce, fun ?MODULE:purge_tree_reduce/2}
+ ]),
+
ok = couch_file:set_db_pid(Fd, self()),
St = #st{
@@ -709,7 +846,9 @@ init_state(FilePath, Fd, Header0, Options) ->
id_tree = IdTree,
seq_tree = SeqTree,
local_tree = LocalTree,
- compression = Compression
+ compression = Compression,
+ purge_tree = PurgeTree,
+ upurge_tree = UPurgeTree
},
% If this is a new database we've just created a
@@ -728,7 +867,9 @@ update_header(St, Header) ->
couch_bt_engine_header:set(Header, [
{seq_tree_state, couch_btree:get_state(St#st.seq_tree)},
{id_tree_state, couch_btree:get_state(St#st.id_tree)},
- {local_tree_state, couch_btree:get_state(St#st.local_tree)}
+ {local_tree_state, couch_btree:get_state(St#st.local_tree)},
+ {purge_tree_state, couch_btree:get_state(St#st.purge_tree)},
+ {upurge_tree_state, couch_btree:get_state(St#st.upurge_tree)}
]).
@@ -753,6 +894,46 @@ set_default_security_object(Fd, Header, Compression, Options) ->
end.
+% This function is here, and not in couch_bt_engine_header
+% because it requires modifying file contents
+upgrade_purge_info(Fd, Header) ->
+ case couch_bt_engine_header:get(Header, purge_tree_state) of
+ nil ->
+ Header;
+ Ptr when is_tuple(Ptr) ->
+ Header;
+ Ptr when is_integer(Ptr)->
+ % old PurgeDocs format - upgrade to purge_tree
+ {ok, PurgedIdRevs} = couch_file:pread_term(Fd, Ptr),
+ PSeq = couch_bt_engine_header:purge_seq(Header),
+
+ {NPSeq, AddU0, AddP0} = lists:foldl(fun({Id, Revs}, {Seq, UAcc, PAcc}) ->
+ UUId = couch_uuids:new(),
+ NewUAcc = [{UUId, {Id, Revs}} | UAcc],
+ NewPAcc = [{Seq+1, UUId} | PAcc],
+ {Seq+1, NewUAcc, NewPAcc}
+ end, {PSeq-1, [], []}, PurgedIdRevs),
+ AddU = lists:reverse(AddU0),
+ AddP = lists:reverse(AddP0),
+
+ {ok, UPTree0} = couch_btree:open(nil, Fd, [
+ {reduce, fun ?MODULE:purge_tree_reduce/2}
+ ]),
+ {ok, UPTree} = couch_btree:add_remove(UPTree0, AddU, []),
+ UPTreeState = couch_btree:get_state(UPTree),
+ {ok, PTree0} = couch_btree:open(nil, Fd, [
+ {reduce, fun ?MODULE:purge_tree_reduce/2}
+ ]),
+ {ok, PTree} = couch_btree:add_remove(PTree0, AddP, []),
+ PTreeState = couch_btree:get_state(PTree),
+ couch_bt_engine_header:set(Header, [
+ {purge_seq, NPSeq},
+ {purge_tree_state, PTreeState},
+ {upurge_tree_state, UPTreeState}
+ ])
+ end.
+
+
delete_compaction_files(FilePath) ->
RootDir = config:get("couchdb", "database_dir", "."),
DelOpts = [{context, delete}],
@@ -898,34 +1079,38 @@ drop_reductions(_, _, _, Acc) ->
{ok, Acc}.
+drop_reductions2(visit, {PurgeSeq, UUID}, _Reds, {UserFun, UserAcc}) ->
+ {Go, NewUserAcc} = UserFun(PurgeSeq, UUID, UserAcc),
+ {Go, {UserFun, NewUserAcc}};
+drop_reductions2(_, _, _, Acc) ->
+ {ok, Acc}.
+
+
fold_docs_reduce_to_count(Reds) ->
RedFun = fun id_tree_reduce/2,
FinalRed = couch_btree:final_reduce(RedFun, Reds),
element(1, FinalRed).
-finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) ->
+finish_compaction_int(#st{} = OldSt, #st{} = NewSt) ->
#st{
filepath = FilePath,
local_tree = OldLocal
} = OldSt,
#st{
- filepath = CompactDataPath,
- header = Header,
- local_tree = NewLocal1
- } = NewSt1,
+ filepath = CompactDataPath
+ } = NewSt,
% suck up all the local docs into memory and write them to the new db
- LoadFun = fun(Value, _Offset, Acc) ->
- {ok, [Value | Acc]}
- end,
+ LoadFun = fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end,
{ok, _, LocalDocs} = couch_btree:foldl(OldLocal, LoadFun, []),
- {ok, NewLocal2} = couch_btree:add(NewLocal1, LocalDocs),
+ {ok, NewLocal2} = couch_btree:add(NewSt#st.local_tree, LocalDocs),
- {ok, NewSt2} = commit_data(NewSt1#st{
- header = couch_bt_engine_header:set(Header, [
+ {ok, NewSt2} = commit_data(NewSt#st{
+ header = couch_bt_engine_header:set(NewSt#st.header, [
{compacted_seq, get_update_seq(OldSt)},
- {revs_limit, get_revs_limit(OldSt)}
+ {revs_limit, get_revs_limit(OldSt)},
+ {purged_docs_limit, get_purged_docs_limit(OldSt)}
]),
local_tree = NewLocal2
}),
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/511aef20/src/couch_bt_engine.hrl
----------------------------------------------------------------------
diff --git a/src/couch_bt_engine.hrl b/src/couch_bt_engine.hrl
index 7f52d8f..ee3d566 100644
--- a/src/couch_bt_engine.hrl
+++ b/src/couch_bt_engine.hrl
@@ -20,5 +20,7 @@
id_tree,
seq_tree,
local_tree,
- compression
+ compression,
+ purge_tree,
+ upurge_tree
}).
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/511aef20/src/couch_bt_engine_compactor.erl
----------------------------------------------------------------------
diff --git a/src/couch_bt_engine_compactor.erl b/src/couch_bt_engine_compactor.erl
index 7f3b5d7..7e2c8a4 100644
--- a/src/couch_bt_engine_compactor.erl
+++ b/src/couch_bt_engine_compactor.erl
@@ -54,13 +54,13 @@ start(#st{} = St, DbName, Options, Parent) ->
% and hope everything works out for the best.
unlink(DFd),
- NewSt1 = copy_purge_info(St, NewSt),
- NewSt2 = copy_compact(DbName, St, NewSt1, Retry),
+ NewSt2 = copy_compact(DbName, St, NewSt, Retry),
NewSt3 = sort_meta_data(NewSt2),
NewSt4 = commit_compaction_data(NewSt3),
NewSt5 = copy_meta_data(NewSt4),
- {ok, NewSt6} = couch_bt_engine:commit_data(NewSt5),
- ok = couch_bt_engine:decref(NewSt6),
+ NewSt6 = copy_purge_info(St, NewSt5, Parent),
+ {ok, NewSt7} = couch_bt_engine:commit_data(NewSt6),
+ ok = couch_bt_engine:decref(NewSt7),
ok = couch_file:close(MFd),
% Done
@@ -72,7 +72,9 @@ open_compaction_files(SrcHdr, DbFilePath, Options) ->
MetaFile = DbFilePath ++ ".compact.meta",
{ok, DataFd, DataHdr} = open_compaction_file(DataFile),
{ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile),
+
DataHdrIsDbHdr = couch_bt_engine_header:is_header(DataHdr),
+
case {DataHdr, MetaHdr} of
{#comp_header{}=A, #comp_header{}=A} ->
DbHeader = A#comp_header.db_header,
@@ -88,7 +90,9 @@ open_compaction_files(SrcHdr, DbFilePath, Options) ->
St1 = bind_emsort(St0, MetaFd, nil),
{ok, St1, DataFile, DataFd, MetaFd, St0#st.id_tree};
_ ->
- Header = couch_bt_engine_header:from(SrcHdr),
+ Header0 = couch_bt_engine_header:from(SrcHdr),
+ % set purge_seq of NewSt to -1 on the 1st round of compaction
+ Header = couch_bt_engine_header:set(Header0, [{purge_seq, -1}]),
ok = reset_compaction_file(DataFd, Header),
ok = reset_compaction_file(MetaFd, Header),
St0 = couch_bt_engine:init_state(DataFile, DataFd, Header, Options),
@@ -97,25 +101,6 @@ open_compaction_files(SrcHdr, DbFilePath, Options) ->
end.
-copy_purge_info(OldSt, NewSt) ->
- OldHdr = OldSt#st.header,
- NewHdr = NewSt#st.header,
- OldPurgeSeq = couch_bt_engine_header:purge_seq(OldHdr),
- case OldPurgeSeq > 0 of
- true ->
- Purged = couch_bt_engine:get_last_purged(OldSt),
- Opts = [{compression, NewSt#st.compression}],
- {ok, Ptr, _} = couch_file:append_term(NewSt#st.fd, Purged, Opts),
- NewNewHdr = couch_bt_engine_header:set(NewHdr, [
- {purge_seq, OldPurgeSeq},
- {purged_docs, Ptr}
- ]),
- NewSt#st{header = NewNewHdr};
- false ->
- NewSt
- end.
-
-
copy_compact(DbName, St, NewSt0, Retry) ->
Compression = couch_compress:get_compression_method(),
NewSt = NewSt0#st{compression = Compression},
@@ -316,6 +301,99 @@ copy_doc_attachments(#st{} = SrcSt, SrcSp, DstSt) ->
{BodyData, NewBinInfos}.
+copy_purge_info(OldSt, NewSt, Parent) ->
+ OldPSeq = couch_bt_engine:get(OldSt, purge_seq),
+ NewPSeq = couch_bt_engine:get(NewSt, purge_seq),
+ % During 1st round of compaction, we copy all purges from OldSt to NewSt
+ % respecting purged_docs_limit. During recompaction rounds, we copy
+ % purges occurred during compact and remove from db's ID and Seq trees
+ % completely purged Docs
+ % As we do different copy_purge operations on the 1st and subsequent rounds
+ % of compaction, we need NewPSeq==-1 to indicate the 1st round of compaction.
+ % We can't use NewPSeq==0 to indicate 1st round, as it will be wrong
+ % for the case when compaction started with OldPSeq=0 and NewPSeq=0
+ % and during compaction there were purge requests, and OldPSeq>0
+ % On recompaction, we need operations for copy_new_purge_info
+ % but since NewPSeq is still 0 after the 1st round, checking for 0 will lead us
+ % wrongly to copy_purge_info_from_start.
+ case NewPSeq of
+ OldPSeq -> NewSt; % nothing to copy
+ -1 when OldPSeq > 0 -> copy_purge_info_from_start(OldSt, NewSt, Parent);
+ -1 when OldPSeq == 0 -> % just update the header
+ NewHeader =
+ couch_bt_engine_header:set(NewSt#st.header, [{purge_seq, 0}]),
+ NewSt#st{header=NewHeader};
+ _ -> copy_new_purge_info(OldSt, NewSt)
+ end.
+
+
+copy_purge_info_from_start(OldSt, NewSt, Parent) ->
+ % purge requests happened up to & including DisposPSeq can be disregarded
+ {ok, DisposPSeq} = gen_server:call(Parent, get_disposable_purge_seq),
+ FoldFun = fun({P, U, Id, Rs}, {AccP, AccU}) ->
+ {[{P, U}|AccP], [{U, {Id, Rs}}|AccU]}
+ end,
+ % copy purge requests from OldSt to NewSt starting AFTER DisposPSeq
+ {ok, {AddP, AddU}} = couch_bt_engine:fold_purged_docs(
+ OldSt, DisposPSeq, FoldFun, {[], []}, []),
+ {ok, NewPTree} = couch_btree:add(NewSt#st.purge_tree, lists:reverse(AddP)),
+ {ok, NewUTree} = couch_btree:add(NewSt#st.upurge_tree, lists:reverse(AddU)),
+ NewHeader = couch_bt_engine_header:set(NewSt#st.header,
+ [{purge_seq, couch_bt_engine:get(OldSt, purge_seq)}]
+ ),
+ NewSt#st{purge_tree = NewPTree, upurge_tree = NewUTree, header = NewHeader}.
+
+
+copy_new_purge_info(OldSt, NewSt) ->
+ % collect purges since NewPSeq
+ FoldFun = fun({P, U, Id, Rs}, {AccP, AccU, AccI, AccR}) ->
+ {[{P,U}|AccP], [{U,{Id, Rs}}|AccU], [Id|AccI], [{Id, Rs}|AccR]}
+ end,
+ NewPSeq = couch_bt_engine:get(NewSt, purge_seq),
+ InitAcc = {[], [], [], []},
+ {ok, {AddP, AddU, DocIds, PIdsRevs}} =
+ couch_bt_engine:fold_purged_docs(OldSt, NewPSeq, FoldFun, InitAcc, []),
+ {ok, NewPTree} = couch_btree:add(NewSt#st.purge_tree, lists:reverse(AddP)),
+ {ok, NewUTree} = couch_btree:add(NewSt#st.upurge_tree, lists:reverse(AddU)),
+
+ % Since purging a document will change the update_seq,
+ % finish_compaction will restart compaction in order to process
+ % the new updates, which takes care of handling partially
+ % purged documents.
+ % collect only Ids and Seqs of docs that were completely purged
+ OldDocInfos = couch_bt_engine:open_docs(NewSt, DocIds),
+ FoldFun2 = fun({OldFDI, {Id,Revs}}, {Acc1, Acc2}) ->
+ #full_doc_info{rev_tree=Tree, update_seq=UpSeq} = OldFDI,
+ case couch_key_tree:remove_leafs(Tree, Revs) of
+ {[]= _NewTree, _} ->
+ {[Id| Acc1], [UpSeq| Acc2]};
+ _ ->
+ {Acc1, Acc2}
+ end
+ end,
+ {RemIds, RemSeqs} =
+ lists:foldl(FoldFun2, {[], []}, lists:zip(OldDocInfos, PIdsRevs)),
+ % remove from NewSt docs that were completely purged
+ {NewIdTree, NewSeqTree} = case {RemIds, RemSeqs} of
+ {[], []} ->
+ {NewSt#st.id_tree, NewSt#st.seq_tree};
+ _ ->
+ {ok, NewIT} = couch_btree:add_remove(NewSt#st.id_tree, [], RemIds),
+ {ok, NewST} = couch_btree:add_remove(NewSt#st.seq_tree, [], RemSeqs),
+ {NewIT, NewST}
+ end,
+ NewHeader = couch_bt_engine_header:set(NewSt#st.header,
+ [{purge_seq, couch_bt_engine:get(OldSt, purge_seq)}]
+ ),
+ NewSt#st{
+ header = NewHeader,
+ id_tree = NewIdTree,
+ seq_tree = NewSeqTree,
+ purge_tree = NewPTree,
+ upurge_tree = NewUTree
+ }.
+
+
sort_meta_data(St0) ->
{ok, Ems} = couch_emsort:merge(St0#st.id_tree),
St0#st{id_tree=Ems}.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/511aef20/src/couch_bt_engine_header.erl
----------------------------------------------------------------------
diff --git a/src/couch_bt_engine_header.erl b/src/couch_bt_engine_header.erl
index 3d24f31..cfe76e1 100644
--- a/src/couch_bt_engine_header.erl
+++ b/src/couch_bt_engine_header.erl
@@ -31,8 +31,10 @@
seq_tree_state/1,
latest/1,
local_tree_state/1,
+ purge_tree_state/1,
+ upurge_tree_state/1,
purge_seq/1,
- purged_docs/1,
+ purged_docs_limit/1,
security_ptr/1,
revs_limit/1,
uuid/1,
@@ -61,12 +63,14 @@
seq_tree_state = nil,
local_tree_state = nil,
purge_seq = 0,
- purged_docs = nil,
+ purge_tree_state = nil, %purge tree: purge_seq -> uuid
security_ptr = nil,
revs_limit = 1000,
uuid,
epochs,
- compacted_seq
+ compacted_seq,
+ upurge_tree_state = nil, %upurge tree: uuid -> {docid, revs}
+ purged_docs_limit = 1000
}).
@@ -150,12 +154,20 @@ local_tree_state(Header) ->
get_field(Header, local_tree_state).
+purge_tree_state(Header) ->
+ get_field(Header, purge_tree_state).
+
+
+upurge_tree_state(Header) ->
+ get_field(Header, upurge_tree_state).
+
+
purge_seq(Header) ->
get_field(Header, purge_seq).
-purged_docs(Header) ->
- get_field(Header, purged_docs).
+purged_docs_limit(Header) ->
+ get_field(Header, purged_docs_limit).
security_ptr(Header) ->
@@ -303,6 +315,7 @@ upgrade_compacted_seq(#db_header{}=Header) ->
Header
end.
+
latest(?LATEST_DISK_VERSION) ->
true;
latest(N) when is_integer(N), N < ?LATEST_DISK_VERSION ->
@@ -323,7 +336,7 @@ mk_header(Vsn) ->
bar, % seq_tree_state
bam, % local_tree_state
1, % purge_seq
- baz, % purged_docs
+ baz, % purge_info
bang, % security_ptr
999 % revs_limit
}.
@@ -343,7 +356,7 @@ upgrade_v3_test() ->
?assertEqual(bar, seq_tree_state(NewHeader)),
?assertEqual(bam, local_tree_state(NewHeader)),
?assertEqual(1, purge_seq(NewHeader)),
- ?assertEqual(baz, purged_docs(NewHeader)),
+ ?assertEqual(baz, purge_tree_state(NewHeader)),
?assertEqual(bang, security_ptr(NewHeader)),
?assertEqual(999, revs_limit(NewHeader)),
?assertEqual(undefined, uuid(NewHeader)),
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/511aef20/src/couch_db.erl
----------------------------------------------------------------------
diff --git a/src/couch_db.erl b/src/couch_db.erl
index ca63a40..40d7aef 100644
--- a/src/couch_db.erl
+++ b/src/couch_db.erl
@@ -43,19 +43,21 @@
get_epochs/1,
get_filepath/1,
get_instance_start_time/1,
- get_last_purged/1,
+ get_oldest_purge_seq/1,
get_pid/1,
+ get_purge_seq/1,
+ get_purged_docs_limit/1,
get_revs_limit/1,
get_security/1,
get_update_seq/1,
get_user_ctx/1,
get_uuid/1,
- get_purge_seq/1,
is_db/1,
is_system_db/1,
is_clustered/1,
+ set_purged_docs_limit/2,
set_revs_limit/2,
set_security/2,
set_user_ctx/2,
@@ -69,6 +71,7 @@
open_doc/2,
open_doc/3,
open_doc_revs/4,
+ open_purged_docs/2,
open_doc_int/3,
get_doc_info/2,
get_full_doc_info/2,
@@ -82,7 +85,6 @@
update_docs/2,
update_docs/3,
delete_doc/3,
-
purge_docs/2,
with_stream/3,
@@ -97,6 +99,8 @@
fold_changes/4,
fold_changes/5,
count_changes_since/2,
+ fold_purged_docs/4,
+ fold_purged_docs/5,
calculate_start_seq/3,
owner_of/2,
@@ -363,6 +367,8 @@ find_missing([{Id, Revs}|RestIdRevs], [FullInfo | RestLookupInfo])
find_missing([{Id, Revs}|RestIdRevs], [not_found | RestLookupInfo]) ->
[{Id, Revs, []} | find_missing(RestIdRevs, RestLookupInfo)].
+
+% returns {ok, DocInfo} or not_found
get_doc_info(Db, Id) ->
case get_full_doc_info(Db, Id) of
#full_doc_info{} = FDI ->
@@ -371,7 +377,7 @@ get_doc_info(Db, Id) ->
Else
end.
-% returns {ok, DocInfo} or not_found
+
get_full_doc_info(Db, Id) ->
[Result] = get_full_doc_infos(Db, [Id]),
Result.
@@ -379,8 +385,52 @@ get_full_doc_info(Db, Id) ->
get_full_doc_infos(Db, Ids) ->
couch_db_engine:open_docs(Db, Ids).
-purge_docs(#db{main_pid=Pid}, IdsRevs) ->
- gen_server:call(Pid, {purge_docs, IdsRevs}).
+
+purge_docs(#db{main_pid=Pid}=Db, UUIdsIdsRevs) ->
+ purge_docs(#db{main_pid=Pid}=Db, UUIdsIdsRevs, interactive_edit).
+
+-spec purge_docs(#db{}, [{UUId, Id, [Rev]}], interactive_edit) ->
+ {ok, {PurgeSeq, [Reply]}} when
+ UUId :: binary(),
+ Id :: binary(),
+ Rev :: {non_neg_integer(), binary()},
+ PurgeSeq :: non_neg_integer(),
+ Reply :: {ok, []}
+ | {ok, [Rev]}.
+purge_docs(#db{main_pid=Pid}, UUIdsIdsRevs, interactive_edit) ->
+ gen_server:call(Pid, {purge_docs, UUIdsIdsRevs});
+
+purge_docs(#db{main_pid=Pid}=Db, UUIdsIdsRevs0, replicated_changes) ->
+ % filter out purge requests that have been already applied:
+ % their UUIDs exist in upurge_tree
+ UUIDs = [UUID || {UUID, _Id, _Revs} <- UUIdsIdsRevs0],
+ Results = open_purged_docs(Db, UUIDs),
+ UUIdsIdsRevs = lists:foldr(fun(
+ {not_found, UUIdIdrevs}, Acc0) -> [UUIdIdrevs|Acc0];
+ ({_, _}, Acc0) -> Acc0
+ end, [], lists:zip(Results, UUIdsIdsRevs0)),
+ case UUIdsIdsRevs of
+ [] -> {ok, []};
+ _ -> gen_server:call(Pid, {purge_docs, UUIdsIdsRevs})
+ end.
+
+
+-spec open_purged_docs(#db{}, [UUId]) -> [PurgedReq] when
+ UUId :: binary(),
+ PurgedReq :: {Id, [Rev]}
+ | not_found,
+ Id :: binary(),
+ Rev :: {non_neg_integer(), binary()}.
+open_purged_docs(Db, UUIDs) ->
+ couch_db_engine:open_purged_docs(Db, UUIDs).
+
+
+set_purged_docs_limit(#db{main_pid=Pid}=Db, Limit)
+ when is_integer(Limit), Limit > 0 ->
+ check_is_admin(Db),
+ gen_server:call(Pid, {set_purged_docs_limit, Limit}, infinity);
+set_purged_docs_limit(_Db, _Limit) ->
+ throw(invalid_purged_docs_limit).
get_after_doc_read_fun(#db{after_doc_read = Fun}) ->
Fun.
@@ -400,10 +450,13 @@ get_user_ctx(?OLD_DB_REC = Db) ->
?OLD_DB_USER_CTX(Db).
get_purge_seq(#db{}=Db) ->
- {ok, couch_db_engine:get_purge_seq(Db)}.
+ couch_db_engine:get_purge_seq(Db).
-get_last_purged(#db{}=Db) ->
- {ok, couch_db_engine:get_last_purged(Db)}.
+get_oldest_purge_seq(#db{}=Db) ->
+ {ok, couch_db_engine:get_oldest_purge_seq(Db)}.
+
+get_purged_docs_limit(#db{}=Db) ->
+ couch_db_engine:get_purged_docs_limit(Db).
get_pid(#db{main_pid = Pid}) ->
Pid.
@@ -995,6 +1048,7 @@ doc_tag(#doc{meta=Meta}) ->
Else -> throw({invalid_doc_tag, Else})
end.
+
update_docs(Db, Docs0, Options, replicated_changes) ->
increment_stat(Db, [couchdb, database_writes]),
Docs = tag_docs(Docs0),
@@ -1075,7 +1129,6 @@ update_docs(Db, Docs0, Options, interactive_edit) ->
check_dup_atts(Doc)))
|| Doc <- B] || B <- DocBuckets2],
{DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
-
{ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
ResultsDict = lists:foldl(fun({Key, Resp}, ResultsAcc) ->
@@ -1086,6 +1139,7 @@ update_docs(Db, Docs0, Options, interactive_edit) ->
end, Docs)}
end.
+
% Returns the first available document on disk. Input list is a full rev path
% for the doc.
make_first_doc_on_disk(_Db, _Id, _Pos, []) ->
@@ -1397,6 +1451,14 @@ fold_docs(Db, UserFun, UserAcc, Options) ->
couch_db_engine:fold_docs(Db, UserFun, UserAcc, Options).
+fold_purged_docs(Db, StartPurgeSeq, Fun, Acc) ->
+ fold_purged_docs(Db, StartPurgeSeq, Fun, Acc, []).
+
+
+fold_purged_docs(Db, StartPurgeSeq, UFun, UAcc, Opts) ->
+ couch_db_engine:fold_purged_docs(Db, StartPurgeSeq, UFun, UAcc, Opts).
+
+
fold_local_docs(Db, UserFun, UserAcc, Options) ->
couch_db_engine:fold_local_docs(Db, UserFun, UserAcc, Options).
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/511aef20/src/couch_db_engine.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_engine.erl b/src/couch_db_engine.erl
index 045e75c..bdf2313 100644
--- a/src/couch_db_engine.erl
+++ b/src/couch_db_engine.erl
@@ -59,6 +59,10 @@
% Need to enumerate these
].
+-type purge_fold_options() :: [
+ % Need to enumerate these
+ ].
+
-type db_handle() :: any().
-type doc_fold_fun() :: fun((#full_doc_info{}, UserAcc::any()) ->
@@ -73,6 +77,10 @@
{ok, NewUserAcc::any()} |
{stop, NewUserAcc::any()}).
+-type purge_fold_fun() :: fun((
+ {PurgeSeq::non_neg_integer(), UUID:: binary(), Id::docid(), Revs::revs()},
+ UserAcc::any()) -> NewUserAcc::any()).
+
% This is called by couch_server to determine which
% engine should be used for the given database. DbPath
@@ -329,31 +337,20 @@
% #full_doc_info{} records. The first element of the pair is
% the #full_doc_info{} that exists on disk. The second element
% is the new version that should be written to disk. There are
-% three basic cases that should be followed:
+% two basic cases that should be considered:
%
% 1. {not_found, #full_doc_info{}} - A new document was created
% 2. {#full_doc_info{}, #full_doc_info{}} - A document was updated
-% 3. {#full_doc_info{}, not_found} - A document was purged completely
%
-% Number one and two are fairly straight forward as long as proper
-% accounting for moving entries in the udpate sequence are accounted
-% for. However, case 3 you'll notice is "purged completely" which
-% means it needs to be removed from the database including the
-% update sequence. Also, for engines that are not using append
-% only storage like the legacy engine, case 2 can be the result of
-% a purge so special care will be needed to see which revisions
-% should be removed.
+% The cases are fairly straight forward as long as proper
+% accounting for moving entries in the update sequence are accounted
+% for.
%
% The LocalDocs variable is applied separately. Its important to
% note for new storage engine authors that these documents are
% separate because they should *not* be included as part of the
% changes index for the database.
%
-% The PurgedDocIdRevs is the list of Ids and Revisions that were
-% purged during this update. While its not guaranteed by the API,
-% currently there will never be purge changes comingled with
-% standard updates.
-%
% Traditionally an invocation of write_doc_infos should be all
% or nothing in so much that if an error occurs (or the VM dies)
% then the database doesn't retain any of the changes. However
@@ -364,8 +361,36 @@
-callback write_doc_infos(
DbHandle::db_handle(),
Pairs::doc_pairs(),
- LocalDocs::[#doc{}],
- PurgedDocIdRevs::[{docid(), revs()}]) ->
+ LocalDocs::[#doc{}]) ->
+ {ok, NewDbHandle::db_handle()}.
+
+
+% This function is called from the context of couch_db_updater
+% and as such is guaranteed single threaded for the given
+% DbHandle.
+%
+% The Pairs argument is a list of pairs (2-tuples) of
+% #full_doc_info{} records.
+% The first element of the pair is the #full_doc_info{} that exists
+% on disk. The second element is the new version that should be written
+% to disk. There are two basic cases that should be considered:
+%
+% 1. {#full_doc_info{}, #full_doc_info{}} - A document was partially purged
+% 2. {#full_doc_info{}, not_found} - A document was completely purged
+%
+% In case 1, non-tail-append engines may have to remove revisions
+% specifically rather than rely on compaction to remove them.
+%
+% Case 2, you'll notice, is "purged completely", which
+% means the document needs to be removed from the database,
+% including the update sequence.
+%
+% The Purges argument is a list of 3-tuples, each representing a purge request.
+% Each tuple consists of the purge UUID, DocId and Revisions that were purged.
+-callback purge_doc_revs(
+ DbHandle::db_handle(),
+ Pairs::doc_pairs(),
+ Purges::[{binary(), docid(), revs()}]) ->
{ok, NewDbHandle::db_handle()}.
@@ -424,16 +449,16 @@
%
% 1. start_key - Start iteration at the provided key or
% or just after if the key doesn't exist
-% 2. end_key - Stop iteration prior to visiting the provided
+% 2. end_key_gt - Stop iteration prior to visiting the provided
% key
-% 3. end_key_gt - Stop iteration just after the provided key
+% 3. end_key - Stop iteration just after the provided key
% 4. dir - The atom fwd or rev. This is to be able to iterate
% over documents in reverse order. The logic for comparing
% start_key, end_key, and end_key_gt are then reversed (ie,
% when rev, start_key should be greater than end_key if the
% user wishes to see results)
% 5. include_reductions - This is a hack for _all_docs since
-% it currently relies on reductiosn to count an offset. This
+% it currently relies on reductions to count an offset. This
% is a terrible hack that will need to be addressed by the
% API in the future. If this option is present the supplied
% user function expects three arguments, where the first
@@ -480,12 +505,12 @@
% This function is called to fold over the documents (not local
% documents) in order of their most recent update. Each document
% in the database should have exactly one entry in this sequence.
-% If a document is updated during a call to this funciton it should
+% If a document is updated during a call to this function it should
% not be included twice as that will probably lead to Very Bad Things.
%
% This should behave similarly to fold_docs/4 in that the supplied
% user function should be invoked with a #full_doc_info{} record
-% as the first arugment and the current user accumulator as the
+% as the first argument and the current user accumulator as the
% second argument. The same semantics for the return value from the
% user function should be handled as in fold_docs/4.
%
@@ -506,6 +531,21 @@
% This function may be called by many processes concurrently.
%
+% This function is called to fold over purge requests in increasing
+% purge_seq order (ie, oldest purge first).
+%
+% The fold starts *after* StartPurgeSeq, ie at the next purge_seq greater than it.
+-callback fold_purged_docs(
+ DbHandle::db_handle(),
+ StartPurgeSeq::non_neg_integer(),
+ UserFold::purge_fold_fun(),
+ UserAcc::any(),
+ purge_fold_options()) ->
+ {ok, LastUserAcc::any()}.
+
+
+% This function may be called by many processes concurrently.
+%
% This function is called to count the number of documents changed
% since they given UpdateSeq (ie, not including the possible change
% at exactly UpdateSeq). It is currently only used internally to
@@ -596,11 +636,13 @@
open_docs/2,
open_local_docs/2,
+ open_purged_docs/2,
read_doc_body/2,
serialize_doc/2,
write_doc_body/2,
- write_doc_infos/4,
+ write_doc_infos/3,
+ purge_doc_revs/3,
commit_data/1,
open_write_stream/2,
@@ -610,6 +652,7 @@
fold_docs/4,
fold_local_docs/4,
fold_changes/5,
+ fold_purged_docs/5,
count_changes_since/2,
start_compaction/1,
@@ -774,6 +817,11 @@ open_local_docs(#db{} = Db, DocIds) ->
Engine:open_local_docs(EngineState, DocIds).
+open_purged_docs(#db{} = Db, UUIDs) ->
+ #db{engine = {Engine, EngineState}} = Db,
+ Engine:open_purged_docs(EngineState, UUIDs).
+
+
read_doc_body(#db{} = Db, RawDoc) ->
#db{engine = {Engine, EngineState}} = Db,
Engine:read_doc_body(EngineState, RawDoc).
@@ -789,10 +837,17 @@ write_doc_body(#db{} = Db, #doc{} = Doc) ->
Engine:write_doc_body(EngineState, Doc).
-write_doc_infos(#db{} = Db, DocUpdates, LocalDocs, PurgedDocIdRevs) ->
+write_doc_infos(#db{} = Db, DocUpdates, LocalDocs) ->
#db{engine = {Engine, EngineState}} = Db,
{ok, NewSt} = Engine:write_doc_infos(
- EngineState, DocUpdates, LocalDocs, PurgedDocIdRevs),
+ EngineState, DocUpdates, LocalDocs),
+ {ok, Db#db{engine = {Engine, NewSt}}}.
+
+
+purge_doc_revs(#db{} = Db, DocUpdates, Purges) ->
+ #db{engine = {Engine, EngineState}} = Db,
+ {ok, NewSt} = Engine:purge_doc_revs(
+ EngineState, DocUpdates, Purges),
{ok, Db#db{engine = {Engine, NewSt}}}.
@@ -832,6 +887,11 @@ fold_changes(#db{} = Db, StartSeq, UserFun, UserAcc, Options) ->
Engine:fold_changes(EngineState, StartSeq, UserFun, UserAcc, Options).
+fold_purged_docs(#db{} = Db, StartPurgeSeq, UserFun, UserAcc, Options) ->
+ #db{engine = {Engine, EngineState}} = Db,
+ Engine:fold_purged_docs(EngineState, StartPurgeSeq, UserFun, UserAcc, Options).
+
+
count_changes_since(#db{} = Db, StartSeq) ->
#db{engine = {Engine, EngineState}} = Db,
Engine:count_changes_since(EngineState, StartSeq).
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/511aef20/src/couch_db_header.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_header.erl b/src/couch_db_header.erl
deleted file mode 100644
index 355364f..0000000
--- a/src/couch_db_header.erl
+++ /dev/null
@@ -1,405 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_db_header).
-
-
--export([
- new/0,
- from/1,
- is_header/1,
- upgrade/1,
- set/2
-]).
-
--export([
- disk_version/1,
- update_seq/1,
- id_tree_state/1,
- seq_tree_state/1,
- latest/1,
- local_tree_state/1,
- purge_seq/1,
- purged_docs/1,
- security_ptr/1,
- revs_limit/1,
- uuid/1,
- epochs/1,
- compacted_seq/1
-]).
-
-
-% This should be updated anytime a header change happens that requires more
-% than filling in new defaults.
-%
-% As long the changes are limited to new header fields (with inline
-% defaults) added to the end of the record, then there is no need to increment
-% the disk revision number.
-%
-% if the disk revision is incremented, then new upgrade logic will need to be
-% added to couch_db_updater:init_db.
-
--define(LATEST_DISK_VERSION, 6).
-
--record(db_header, {
- disk_version = ?LATEST_DISK_VERSION,
- update_seq = 0,
- unused = 0,
- id_tree_state = nil,
- seq_tree_state = nil,
- local_tree_state = nil,
- purge_seq = 0,
- purged_docs = nil,
- security_ptr = nil,
- revs_limit = 1000,
- uuid,
- epochs,
- compacted_seq
-}).
-
-
-new() ->
- #db_header{
- uuid = couch_uuids:random(),
- epochs = [{node(), 0}]
- }.
-
-
-from(Header0) ->
- Header = upgrade(Header0),
- #db_header{
- uuid = Header#db_header.uuid,
- epochs = Header#db_header.epochs,
- compacted_seq = Header#db_header.compacted_seq
- }.
-
-
-is_header(Header) ->
- try
- upgrade(Header),
- true
- catch _:_ ->
- false
- end.
-
-
-upgrade(Header) ->
- Funs = [
- fun upgrade_tuple/1,
- fun upgrade_disk_version/1,
- fun upgrade_uuid/1,
- fun upgrade_epochs/1,
- fun upgrade_compacted_seq/1
- ],
- lists:foldl(fun(F, HdrAcc) ->
- F(HdrAcc)
- end, Header, Funs).
-
-
-set(Header0, Fields) ->
- % A subtlety here is that if a database was open during
- % the release upgrade that updates to uuids and epochs then
- % this dynamic upgrade also assigns a uuid and epoch.
- Header = upgrade(Header0),
- lists:foldl(fun({Field, Value}, HdrAcc) ->
- set_field(HdrAcc, Field, Value)
- end, Header, Fields).
-
-
-disk_version(Header) ->
- get_field(Header, disk_version).
-
-
-update_seq(Header) ->
- get_field(Header, update_seq).
-
-
-id_tree_state(Header) ->
- get_field(Header, id_tree_state).
-
-
-seq_tree_state(Header) ->
- get_field(Header, seq_tree_state).
-
-
-local_tree_state(Header) ->
- get_field(Header, local_tree_state).
-
-
-purge_seq(Header) ->
- get_field(Header, purge_seq).
-
-
-purged_docs(Header) ->
- get_field(Header, purged_docs).
-
-
-security_ptr(Header) ->
- get_field(Header, security_ptr).
-
-
-revs_limit(Header) ->
- get_field(Header, revs_limit).
-
-
-uuid(Header) ->
- get_field(Header, uuid).
-
-
-epochs(Header) ->
- get_field(Header, epochs).
-
-
-compacted_seq(Header) ->
- get_field(Header, compacted_seq).
-
-
-get_field(Header, Field) ->
- Idx = index(Field),
- case Idx > tuple_size(Header) of
- true -> undefined;
- false -> element(index(Field), Header)
- end.
-
-
-set_field(Header, Field, Value) ->
- setelement(index(Field), Header, Value).
-
-
-index(Field) ->
- couch_util:get_value(Field, indexes()).
-
-
-indexes() ->
- Fields = record_info(fields, db_header),
- Indexes = lists:seq(2, record_info(size, db_header)),
- lists:zip(Fields, Indexes).
-
-
-upgrade_tuple(Old) when is_record(Old, db_header) ->
- Old;
-upgrade_tuple(Old) when is_tuple(Old) ->
- NewSize = record_info(size, db_header),
- if tuple_size(Old) < NewSize -> ok; true ->
- erlang:error({invalid_header_size, Old})
- end,
- {_, New} = lists:foldl(fun(Val, {Idx, Hdr}) ->
- {Idx+1, setelement(Idx, Hdr, Val)}
- end, {1, #db_header{}}, tuple_to_list(Old)),
- if is_record(New, db_header) -> ok; true ->
- erlang:error({invalid_header_extension, {Old, New}})
- end,
- New.
-
--define(OLD_DISK_VERSION_ERROR,
- "Database files from versions smaller than 0.10.0 are no longer supported").
-
-upgrade_disk_version(#db_header{}=Header) ->
- case element(2, Header) of
- 1 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
- 2 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
- 3 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
- 4 -> Header#db_header{security_ptr = nil}; % [0.10 - 0.11)
- 5 -> Header; % pre 1.2
- ?LATEST_DISK_VERSION -> Header;
- _ ->
- Reason = "Incorrect disk header version",
- throw({database_disk_version_error, Reason})
- end.
-
-
-upgrade_uuid(#db_header{}=Header) ->
- case Header#db_header.uuid of
- undefined ->
- % Upgrading this old db file to a newer
- % on disk format that includes a UUID.
- Header#db_header{uuid=couch_uuids:random()};
- _ ->
- Header
- end.
-
-
-upgrade_epochs(#db_header{}=Header) ->
- NewEpochs = case Header#db_header.epochs of
- undefined ->
- % This node is taking over ownership of shard with
- % and old version of couch file. Before epochs there
- % was always an implicit assumption that a file was
- % owned since eternity by the node it was on. This
- % just codifies that assumption.
- [{node(), 0}];
- [{Node, _} | _] = Epochs0 when Node == node() ->
- % Current node is the current owner of this db
- Epochs0;
- Epochs1 ->
- % This node is taking over ownership of this db
- % and marking the update sequence where it happened.
- [{node(), Header#db_header.update_seq} | Epochs1]
- end,
- % Its possible for a node to open a db and claim
- % ownership but never make a write to the db. This
- % removes nodes that claimed ownership but never
- % changed the database.
- DedupedEpochs = remove_dup_epochs(NewEpochs),
- Header#db_header{epochs=DedupedEpochs}.
-
-
-% This is slightly relying on the udpate_seq's being sorted
-% in epochs due to how we only ever push things onto the
-% front. Although if we ever had a case where the update_seq
-% is not monotonically increasing I don't know that we'd
-% want to remove dupes (by calling a sort on the input to this
-% function). So for now we don't sort but are relying on the
-% idea that epochs is always sorted.
-remove_dup_epochs([_]=Epochs) ->
- Epochs;
-remove_dup_epochs([{N1, S}, {_N2, S}]) ->
- % Seqs match, keep the most recent owner
- [{N1, S}];
-remove_dup_epochs([_, _]=Epochs) ->
- % Seqs don't match.
- Epochs;
-remove_dup_epochs([{N1, S}, {_N2, S} | Rest]) ->
- % Seqs match, keep the most recent owner
- remove_dup_epochs([{N1, S} | Rest]);
-remove_dup_epochs([{N1, S1}, {N2, S2} | Rest]) ->
- % Seqs don't match, recurse to check others
- [{N1, S1} | remove_dup_epochs([{N2, S2} | Rest])].
-
-
-upgrade_compacted_seq(#db_header{}=Header) ->
- case Header#db_header.compacted_seq of
- undefined ->
- Header#db_header{compacted_seq=0};
- _ ->
- Header
- end.
-
-latest(?LATEST_DISK_VERSION) ->
- true;
-latest(N) when is_integer(N), N < ?LATEST_DISK_VERSION ->
- false;
-latest(_Else) ->
- undefined.
-
--ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
-
-mk_header(Vsn) ->
- {
- db_header, % record name
- Vsn, % disk version
- 100, % update_seq
- 0, % unused
- foo, % id_tree_state
- bar, % seq_tree_state
- bam, % local_tree_state
- 1, % purge_seq
- baz, % purged_docs
- bang, % security_ptr
- 999 % revs_limit
- }.
-
-
-upgrade_v3_test() ->
- Vsn3Header = mk_header(3),
- NewHeader = upgrade_tuple(Vsn3Header),
-
- % Tuple upgrades don't change
- ?assert(is_record(NewHeader, db_header)),
- ?assertEqual(3, disk_version(NewHeader)),
- ?assertEqual(100, update_seq(NewHeader)),
- ?assertEqual(foo, id_tree_state(NewHeader)),
- ?assertEqual(bar, seq_tree_state(NewHeader)),
- ?assertEqual(bam, local_tree_state(NewHeader)),
- ?assertEqual(1, purge_seq(NewHeader)),
- ?assertEqual(baz, purged_docs(NewHeader)),
- ?assertEqual(bang, security_ptr(NewHeader)),
- ?assertEqual(999, revs_limit(NewHeader)),
- ?assertEqual(undefined, uuid(NewHeader)),
- ?assertEqual(undefined, epochs(NewHeader)),
-
- ?assertThrow({database_disk_version_error, _},
- upgrade_disk_version(NewHeader)).
-
-
-upgrade_v5_test() ->
- Vsn5Header = mk_header(5),
- NewHeader = upgrade_disk_version(upgrade_tuple(Vsn5Header)),
-
- ?assert(is_record(NewHeader, db_header)),
- ?assertEqual(5, disk_version(NewHeader)),
-
- % Security ptr isn't changed for v5 headers
- ?assertEqual(bang, security_ptr(NewHeader)).
-
-
-upgrade_uuid_test() ->
- Vsn5Header = mk_header(5),
-
- % Upgraded headers get a new UUID
- NewHeader = upgrade_uuid(upgrade_disk_version(upgrade_tuple(Vsn5Header))),
- ?assertMatch(<<_:32/binary>>, uuid(NewHeader)),
-
- % Headers with a UUID don't have their UUID changed
- NewNewHeader = upgrade_uuid(upgrade_disk_version(upgrade_tuple(NewHeader))),
- ?assertEqual(uuid(NewHeader), uuid(NewNewHeader)),
-
- % Derived empty headers maintain the same UUID
- ResetHeader = from(NewNewHeader),
- ?assertEqual(uuid(NewHeader), uuid(ResetHeader)).
-
-
-upgrade_epochs_test() ->
- Vsn5Header = mk_header(5),
-
- % Upgraded headers get a default epochs set
- NewHeader = upgrade(Vsn5Header),
- ?assertEqual([{node(), 0}], epochs(NewHeader)),
-
- % Fake an old entry in epochs
- FakeFields = [
- {update_seq, 20},
- {epochs, [{'someothernode@someotherhost', 0}]}
- ],
- NotOwnedHeader = set(NewHeader, FakeFields),
-
- OwnedEpochs = [
- {node(), 20},
- {'someothernode@someotherhost', 0}
- ],
-
- % Upgrading a header not owned by the local node updates
- % the epochs appropriately.
- NowOwnedHeader = upgrade(NotOwnedHeader),
- ?assertEqual(OwnedEpochs, epochs(NowOwnedHeader)),
-
- % Headers with epochs stay the same after upgrades
- NewNewHeader = upgrade(NowOwnedHeader),
- ?assertEqual(OwnedEpochs, epochs(NewNewHeader)),
-
- % Getting a reset header maintains the epoch data
- ResetHeader = from(NewNewHeader),
- ?assertEqual(OwnedEpochs, epochs(ResetHeader)).
-
-
-get_uuid_from_old_header_test() ->
- Vsn5Header = mk_header(5),
- ?assertEqual(undefined, uuid(Vsn5Header)).
-
-
-get_epochs_from_old_header_test() ->
- Vsn5Header = mk_header(5),
- ?assertEqual(undefined, epochs(Vsn5Header)).
-
-
--endif.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/511aef20/src/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl
index 86a0300..315fbcd 100644
--- a/src/couch_db_updater.erl
+++ b/src/couch_db_updater.erl
@@ -86,79 +86,49 @@ handle_call({set_revs_limit, Limit}, _From, Db) ->
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{reply, ok, Db2};
-handle_call({purge_docs, _IdRevs}, _From,
- #db{compactor_pid=Pid}=Db) when Pid /= nil ->
- {reply, {error, purge_during_compaction}, Db};
-handle_call({purge_docs, IdRevs}, _From, Db) ->
- DocIds = [Id || {Id, _Revs} <- IdRevs],
- OldDocInfos = couch_db_engine:open_docs(Db, DocIds),
-
- NewDocInfos = lists:flatmap(fun
- ({{Id, Revs}, #full_doc_info{id = Id, rev_tree = Tree} = FDI}) ->
- case couch_key_tree:remove_leafs(Tree, Revs) of
- {_, [] = _RemovedRevs} -> % no change
- [];
- {NewTree, RemovedRevs} ->
- NewFDI = FDI#full_doc_info{rev_tree = NewTree},
- [{FDI, NewFDI, RemovedRevs}]
- end;
- ({_, not_found}) ->
- []
- end, lists:zip(IdRevs, OldDocInfos)),
-
- InitUpdateSeq = couch_db_engine:get_update_seq(Db),
- InitAcc = {InitUpdateSeq, [], []},
- FinalAcc = lists:foldl(fun({_, #full_doc_info{} = OldFDI, RemRevs}, Acc) ->
- #full_doc_info{
- id = Id,
- rev_tree = OldTree
- } = OldFDI,
- {SeqAcc0, FDIAcc, IdRevsAcc} = Acc,
-
- {NewFDIAcc, NewSeqAcc} = case OldTree of
- [] ->
- % If we purged every #leaf{} in the doc record
- % then we're removing it completely from the
- % database.
- FDIAcc;
- _ ->
- % Its possible to purge the #leaf{} that contains
- % the update_seq where this doc sits in the update_seq
- % sequence. Rather than do a bunch of complicated checks
- % we just re-label every #leaf{} and reinsert it into
- % the update_seq sequence.
- {NewTree, SeqAcc1} = couch_key_tree:mapfold(fun
- (_RevId, Leaf, leaf, InnerSeqAcc) ->
- {Leaf#leaf{seq = InnerSeqAcc + 1}, InnerSeqAcc + 1};
- (_RevId, Value, _Type, InnerSeqAcc) ->
- {Value, InnerSeqAcc}
- end, SeqAcc0, OldTree),
-
- NewFDI = OldFDI#full_doc_info{
- update_seq = SeqAcc1,
- rev_tree = NewTree
- },
-
- {[NewFDI | FDIAcc], SeqAcc1}
- end,
- NewIdRevsAcc = [{Id, RemRevs} | IdRevsAcc],
- {NewSeqAcc, NewFDIAcc, NewIdRevsAcc}
- end, InitAcc, NewDocInfos),
-
- {_FinalSeq, FDIs, PurgedIdRevs} = FinalAcc,
-
- % We need to only use the list of #full_doc_info{} records
- % that we have actually changed due to a purge.
- PreviousFDIs = [PrevFDI || {PrevFDI, _, _} <- NewDocInfos],
- Pairs = pair_purge_info(PreviousFDIs, FDIs),
-
- {ok, Db2} = couch_db_engine:write_doc_infos(Db, Pairs, [], PurgedIdRevs),
-
+handle_call({set_purged_docs_limit, Limit}, _From, Db) ->
+ {ok, Db2} = couch_db_engine:set_purged_docs_limit(Db, Limit),
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- couch_event:notify(Db#db.name, updated),
+ {reply, ok, Db2};
- PurgeSeq = couch_db_engine:get_purge_seq(Db2),
- {reply, {ok, PurgeSeq, PurgedIdRevs}, Db2};
+handle_call({purge_docs, UUIDsIdsRevs}, _From, Db) ->
+ UpdateSeq = couch_db_engine:get_update_seq(Db, update_seq),
+ Ids = [Id||{_UUID, Id, _Revs} <- UUIDsIdsRevs],
+ OldDocInfos = couch_db_engine:open_docs(Db, Ids),
+ DocInfosPurges = lists:zip(OldDocInfos, UUIDsIdsRevs),
+
+ FoldFun = fun({OldDocInfo, {UUId, Id, Revs}}, {UpdSeq, RAcc, PaAcc, PuAcc}) ->
+ case purge_doc(UpdSeq, OldDocInfo, Id, Revs) of
+ not_found ->
+ {UpdSeq,
+ [{ok, []}| RAcc],
+ PaAcc,
+ PuAcc};
+ {NewUpdSeq, Pair, {Id, PurgedRevs}} ->
+ {NewUpdSeq,
+ [{ok, PurgedRevs}| RAcc],
+ [Pair| PaAcc],
+ [{UUId, Id, PurgedRevs}| PuAcc]}
+ end
+ end,
+ InitAcc = {UpdateSeq, [], [], []},
+ {_USeq, Replies, Pairs, Purges} =
+ lists:foldl(FoldFun, InitAcc, DocInfosPurges),
+
+ Db2 = if Pairs == [] -> Db; true ->
+ {ok, Db1} = couch_db_engine:purge_doc_revs(
+ Db, lists:reverse(Pairs), lists:reverse(Purges)
+ ),
+ ok = gen_server:call(couch_server, {db_updated, Db1}, infinity),
+ couch_event:notify(Db#db.name, updated),
+ Db1
+ end,
+ PurgeSeq = couch_db_engine:get(Db2, purge_seq),
+ {reply, {ok, {PurgeSeq, lists:reverse(Replies)}}, Db2};
+
+handle_call(get_disposable_purge_seq, _From, Db) ->
+ NewOldestPurgeSeq = get_disposable_purge_seq(Db),
+ {reply, {ok, NewOldestPurgeSeq}, Db};
handle_call(Msg, From, Db) ->
couch_db_engine:handle_call(Msg, From, Db).
@@ -608,7 +578,7 @@ update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, FullCommit) ->
Pairs = pair_write_info(OldDocLookups, IndexFDIs),
LocalDocs2 = update_local_doc_revs(LocalDocs),
- {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2, []),
+ {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2),
WriteCount = length(IndexFDIs),
couch_stats:increment_counter([couchdb, document_inserts],
@@ -662,6 +632,120 @@ update_local_doc_revs(Docs) ->
end, Docs).
+purge_doc(UpdateSeq, OldDocInfo, Id, Revs) ->
+ case OldDocInfo of
+ #full_doc_info{rev_tree = Tree} = FDI ->
+ case couch_key_tree:remove_leafs(Tree, Revs) of
+ {_, [] = _RemovedRevs} -> % no change
+ not_found;
+ {NewTree, RemovedRevs} ->
+ case NewTree of
+ [] ->
+ % If we purged every #leaf{} in the doc record
+ % then we're removing it completely from the
+ % database.
+ {UpdateSeq, {FDI, not_found}, {Id, RemovedRevs}};
+ _ ->
+ % Its possible to purge the #leaf{} that contains
+ % the update_seq where this doc sits in the
+ % update_seq sequence. Rather than do a bunch of
+ % complicated checks we just re-label every #leaf{}
+ % and reinsert it into the update_seq sequence.
+ {NewTree2, NewUpdateSeq} = couch_key_tree:mapfold(fun
+ (_RevId, Leaf, leaf, InnerSeqAcc) ->
+ {Leaf#leaf{seq = InnerSeqAcc + 1},
+ InnerSeqAcc + 1};
+ (_RevId, Value, _Type, InnerSeqAcc) ->
+ {Value, InnerSeqAcc}
+ end, UpdateSeq, NewTree),
+
+ NewFDI = FDI#full_doc_info{
+ update_seq = NewUpdateSeq,
+ rev_tree = NewTree2
+ },
+ {NewUpdateSeq, {FDI, NewFDI}, {Id, RemovedRevs}}
+ end
+ end;
+ not_found ->
+ not_found
+ end.
+
+
+% find the purge seq such that all purge requests with a purge_seq
+% at or before it can be removed from the purge trees
+get_disposable_purge_seq(#db{name=DbName} = Db) ->
+ PSeq = couch_db_engine:get(Db, purge_seq),
+ OldestPSeq = couch_db_engine:get(Db, oldest_purge_seq),
+ PDocsLimit = couch_db_engine:get(Db, purged_docs_limit),
+ ExpectedDispPSeq = PSeq - PDocsLimit,
+ % client's purge_seq can be up to "allowed_purge_seq_lag"
+ % behind ExpectedDispPSeq
+ AllowedPSeqLag = config:get_integer("couchdb", "allowed_purge_seq_lag", 100),
+ ClientAllowedMinPSeq = ExpectedDispPSeq - AllowedPSeqLag,
+ DisposablePSeq = if OldestPSeq > ClientAllowedMinPSeq ->
+ % DisposablePSeq is the last pseq we can remove;
+ % it should be one less than OldestPSeq when #purges is within limit
+ OldestPSeq - 1;
+ true ->
+ % Find the smallest checkpointed purge_seq among clients
+ Opts = [
+ {start_key, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge-")},
+ {end_key_gt, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge1")}
+ ],
+ FoldFun = fun(#doc{id=DocID, body={Props}}, MinPSeq) ->
+ ClientPSeq = couch_util:get_value(<<"purge_seq">>, Props),
+ MinPSeq2 = if ClientPSeq >= ClientAllowedMinPSeq ->
+ erlang:min(MinPSeq, ClientPSeq);
+ true ->
+ case check_client_exists(Props, DbName, DocID) of
+ true -> erlang:min(MinPSeq, ClientPSeq);
+ false -> MinPSeq % ignore nonexisting clients
+ end
+ end,
+ {ok, MinPSeq2}
+ end,
+ {ok, ClientPSeq} = couch_db_engine:fold_local_docs(Db, FoldFun, PSeq, Opts),
+ erlang:min(ClientPSeq, ExpectedDispPSeq)
+ end,
+ DisposablePSeq.
+
+
+check_client_exists(Props, DbName, DocID) ->
+ % will warn about clients that have not checkpointed
+ % for more than "allowed_purge_time_lag"
+ AllowedPTimeLag = config:get_integer("couchdb",
+ "allowed_purge_time_lag", 86400), % secs in 1 day
+ M = couch_util:get_value(<<"verify_module">>, Props),
+ F = couch_util:get_value(<<"verify_function">>, Props),
+ A = couch_util:get_value(<<"verify_options">>, Props),
+ ClientExists = try erlang:apply(M, F, A) of
+ true ->
+ % warn if we haven't heard of this client more than AllowedPTimeLag
+ ClientTime = couch_util:get_value(<<"timestamp_utc">>, Props),
+ {ok, [Y, Mon, D, H, Min, S], [] }=
+ io_lib:fread("~4d-~2d-~2dT~2d:~2d:~2dZ", ClientTime),
+ SecsClient = calendar:datetime_to_gregorian_seconds(
+ {{Y, Mon, D}, {H, Min, S}}),
+ SecsNow = calendar:datetime_to_gregorian_seconds(
+ calendar:now_to_universal_time(os:timestamp())),
+ if SecsClient + AllowedPTimeLag > SecsNow -> ok; true ->
+ couch_log:warning("Processing of purge requests is much behind on: ~p."
+ "Prevents purge_tree of db:~p from compacting.", [A, DbName])
+ end,
+ true;
+ false ->
+ couch_log:warning("Client ~p doesn't exists, but its : ~p doc on ~p"
+ "still exists. Accessed during compaction of purge_tree",
+ [A, DocID, DbName]),
+ false
+ catch
+ error:Error ->
+ couch_log:error("error in evaluating if client exists: ~p", [Error]),
+ false
+ end,
+ ClientExists.
+
+
commit_data(Db) ->
commit_data(Db, false).
@@ -704,15 +788,6 @@ pair_write_info(Old, New) ->
end, New).
-pair_purge_info(Old, New) ->
- lists:map(fun(OldFDI) ->
- case lists:keyfind(OldFDI#full_doc_info.id, #full_doc_info.id, New) of
- #full_doc_info{} = NewFDI -> {OldFDI, NewFDI};
- false -> {OldFDI, not_found}
- end
- end, Old).
-
-
default_security_object(<<"shards/", _/binary>>) ->
case config:get("couchdb", "default_security", "everyone") of
"admin_only" ->
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/511aef20/src/couch_util.erl
----------------------------------------------------------------------
diff --git a/src/couch_util.erl b/src/couch_util.erl
index dc2ef64..8a739a9 100644
--- a/src/couch_util.erl
+++ b/src/couch_util.erl
@@ -34,6 +34,7 @@
-export([callback_exists/3, validate_callback_exists/3]).
-export([with_proc/4]).
-export([check_md5/2]).
+-export([utc_string/0]).
-include_lib("couch/include/couch_db.hrl").
@@ -643,3 +644,12 @@ with_proc(M, F, A, Timeout) ->
erlang:demonitor(Ref, [flush]),
{error, timeout}
end.
+
+
+utc_string() ->
+ {{Year, Month, Day}, {Hour, Minute, Second}} =
+ calendar:now_to_universal_time(os:timestamp()),
+ lists:flatten(
+ io_lib:format("~.4.0w-~.2.0w-~.2.0wT~.2.0w:~.2.0w:~.2.0wZ",
+ [Year, Month, Day, Hour, Minute, Second])
+ ).
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/511aef20/src/test_engine_compaction.erl
----------------------------------------------------------------------
diff --git a/src/test_engine_compaction.erl b/src/test_engine_compaction.erl
index b178bae..13b29fe 100644
--- a/src/test_engine_compaction.erl
+++ b/src/test_engine_compaction.erl
@@ -82,12 +82,9 @@ cet_compact_with_everything() ->
FooRev = test_engine_util:prev_rev(FooFDI),
BarRev = test_engine_util:prev_rev(BarFDI),
-
Actions3 = [
- {batch, [
- {purge, {<<"foo">>, FooRev#rev_info.rev}},
- {purge, {<<"bar">>, BarRev#rev_info.rev}}
- ]}
+ {purge, {<<"foo">>, FooRev#rev_info.rev}},
+ {purge, {<<"bar">>, BarRev#rev_info.rev}}
],
{ok, St6} = test_engine_util:apply_actions(Engine, St5, Actions3),
@@ -97,7 +94,9 @@ cet_compact_with_everything() ->
{<<"foo">>, [FooRev#rev_info.rev]}
],
- ?assertEqual(PurgedIdRevs, lists:sort(Engine:get_last_purged(St6))),
+ {ok, St7} = test_engine_util:apply_actions(Engine, St6, Actions3),
+ {ok, PIdRevs7} = Engine:fold_purged_docs(St7, 0, fun fold_fun/2, [], []),
+ ?assertEqual(PurgedIdRevs, lists:reverse(PIdRevs7)),
[Att0, Att1, Att2, Att3, Att4] = test_engine_util:prep_atts(Engine, St6, [
{<<"ohai.txt">>, crypto:rand_bytes(2048)},
@@ -127,11 +126,21 @@ cet_compact_with_everything() ->
end),
{ok, St10, undefined} = Engine:finish_compaction(St9, DbName, [], Term),
+ {ok, PIdRevs11} = Engine:fold_purged_docs(St10, 0, fun fold_fun/2, [], []),
+ ?assertEqual(
+ [
+ {<<"foo">>, [FooRev#rev_info.rev]},
+ {<<"bar">>, [BarRev#rev_info.rev]}
+ ],
+ lists:reverse(PIdRevs11)
+ ),
+
Db2 = test_engine_util:db_as_term(Engine, St10),
Diff = test_engine_util:term_diff(Db1, Db2),
?assertEqual(nodiff, Diff).
+
cet_recompact_updates() ->
{ok, Engine, Path, St1} = test_engine_util:init_engine(dbpath),
@@ -171,6 +180,105 @@ cet_recompact_updates() ->
?assertEqual(nodiff, Diff).
+cet_recompact_purge() ->
+ {ok, Engine, Path, St1} = test_engine_util:init_engine(dbpath),
+
+ Actions1 = [
+ {create, {<<"foo">>, []}},
+ {create, {<<"bar">>, []}},
+ {conflict, {<<"bar">>, [{<<"vsn">>, 2}]}},
+ {create, {<<"baz">>, []}}
+ ],
+
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+ {ok, St3, DbName, _, Term} = test_engine_util:compact(Engine, St2, Path),
+
+ [BarFDI, BazFDI] = Engine:open_docs(St3, [<<"bar">>, <<"baz">>]),
+ BarRev = test_engine_util:prev_rev(BarFDI),
+ BazRev = test_engine_util:prev_rev(BazFDI),
+ Actions2 = [
+ {purge, {<<"bar">>, BarRev#rev_info.rev}},
+ {purge, {<<"baz">>, BazRev#rev_info.rev}}
+ ],
+ {ok, St4} = test_engine_util:apply_actions(Engine, St3, Actions2),
+ Db1 = test_engine_util:db_as_term(Engine, St4),
+
+ {ok, St5, NewPid} = Engine:finish_compaction(St4, DbName, [], Term),
+
+ ?assertEqual(true, is_pid(NewPid)),
+ Ref = erlang:monitor(process, NewPid),
+
+ NewTerm = receive
+ {'$gen_cast', {compact_done, Engine, Term0}} ->
+ Term0;
+ {'DOWN', Ref, _, _, Reason} ->
+ erlang:error({compactor_died, Reason});
+ {'$gen_call', {NewPid, Ref2}, get_disposable_purge_seq} ->
+ NewPid!{Ref2, {ok, 0}},
+ receive
+ {'$gen_cast', {compact_done, Engine, Term0}} ->
+ Term0;
+ {'DOWN', Ref, _, _, Reason} ->
+ erlang:error({compactor_died, Reason})
+ after 10000 ->
+ erlang:error(compactor_timed_out)
+ end
+ after 10000 ->
+ erlang:error(compactor_timed_out)
+ end,
+
+ {ok, St6, undefined} = Engine:finish_compaction(St5, DbName, [], NewTerm),
+ Db2 = test_engine_util:db_as_term(Engine, St6),
+ Diff = test_engine_util:term_diff(Db1, Db2),
+ ?assertEqual(nodiff, Diff).
+
+
+cet_compact_purged_docs_limit() ->
+ {ok, Engine, Path, St1} = test_engine_util:init_engine(dbpath),
+ % create NumDocs docs
+ NumDocs = 1200,
+ {RActions, RIds} = lists:foldl(fun(Id, {CActions, CIds}) ->
+ Id1 = docid(Id),
+ Action = {create, {Id1, [{<<"int">>, Id}]}},
+ {[Action| CActions], [Id1| CIds]}
+ end, {[], []}, lists:seq(1, NumDocs)),
+ Ids = lists:reverse(RIds),
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1,
+ lists:reverse(RActions)),
+
+ % purge NumDocs docs
+ FDIs = Engine:open_docs(St2, Ids),
+ RevActions2 = lists:foldl(fun(FDI, CActions) ->
+ Id = FDI#full_doc_info.id,
+ PrevRev = test_engine_util:prev_rev(FDI),
+ Rev = PrevRev#rev_info.rev,
+ [{purge, {Id, Rev}}| CActions]
+ end, [], FDIs),
+ {ok, St3} = test_engine_util:apply_actions(Engine, St2,
+ lists:reverse(RevActions2)),
+
+ % check that before compaction all NumDocs purge requests
+ % are in the purge_tree,
+ % even if NumDocs=1200 is greater than purged_docs_limit=1000
+ {ok, PurgedIdRevs} = Engine:fold_purged_docs(St3, 0, fun fold_fun/2, [], []),
+ ?assertEqual(1, Engine:get(St3, oldest_purge_seq)),
+ ?assertEqual(NumDocs, length(PurgedIdRevs)),
+
+ % compact db
+ {ok, St4, DbName, _, Term} = test_engine_util:compact(Engine, St3, Path),
+ {ok, St5, undefined} = Engine:finish_compaction(St4, DbName, [], Term),
+
+ % check that after compaction only purged_docs_limit purge_requests
+ % are in purge_tree
+ PurgedDocsLimit = Engine:get(St5, purged_docs_limit),
+ OldestPSeq = Engine:get(St5, oldest_purge_seq),
+ {ok, PurgedIdRevs2} = Engine:fold_purged_docs(
+ St5, OldestPSeq - 1, fun fold_fun/2, [], []),
+ ExpectedOldestPSeq = NumDocs - PurgedDocsLimit + 1,
+ ?assertEqual(ExpectedOldestPSeq, OldestPSeq),
+ ?assertEqual(PurgedDocsLimit, length(PurgedIdRevs2)).
+
+
docid(I) ->
Str = io_lib:format("~4..0b", [I]),
iolist_to_binary(Str).
@@ -179,3 +287,7 @@ docid(I) ->
local_docid(I) ->
Str = io_lib:format("_local/~4..0b", [I]),
iolist_to_binary(Str).
+
+
+fold_fun({_PSeq, _UUID, Id, Revs}, Acc) ->
+ [{Id, Revs} | Acc].
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/511aef20/src/test_engine_fold_purged_docs.erl
----------------------------------------------------------------------
diff --git a/src/test_engine_fold_purged_docs.erl b/src/test_engine_fold_purged_docs.erl
new file mode 100644
index 0000000..cb4238b
--- /dev/null
+++ b/src/test_engine_fold_purged_docs.erl
@@ -0,0 +1,134 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(test_engine_fold_purged_docs).
+-compile(export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(NUM_DOCS, 100).
+
+
+cet_empty_purged_docs() ->
+ {ok, Engine, St} = test_engine_util:init_engine(),
+ ?assertEqual({ok, []}, Engine:fold_purged_docs(St, 0, fun fold_fun/2, [], [])).
+
+
+cet_all_purged_docs() ->
+ {ok, Engine, St1} = test_engine_util:init_engine(),
+
+ {RActions, RIds} = lists:foldl(fun(Id, {CActions, CIds}) ->
+ Id1 = docid(Id),
+ Action = {create, {Id1, [{<<"int">>, Id}]}},
+ {[Action| CActions], [Id1| CIds]}
+ end, {[], []}, lists:seq(1, ?NUM_DOCS)),
+ Actions = lists:reverse(RActions),
+ Ids = lists:reverse(RIds),
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+
+ FDIs = Engine:open_docs(St2, Ids),
+ {RevActions2, RevIdRevs} = lists:foldl(fun(FDI, {CActions, CIdRevs}) ->
+ Id = FDI#full_doc_info.id,
+ PrevRev = test_engine_util:prev_rev(FDI),
+ Rev = PrevRev#rev_info.rev,
+ Action = {purge, {Id, Rev}},
+ {[Action| CActions], [{Id, [Rev]}| CIdRevs]}
+ end, {[], []}, FDIs),
+ {Actions2, IdsRevs} = {lists:reverse(RevActions2), lists:reverse(RevIdRevs)},
+
+ {ok, St3} = test_engine_util:apply_actions(Engine, St2, Actions2),
+ {ok, PurgedIdRevs} = Engine:fold_purged_docs(St3, 0, fun fold_fun/2, [], []),
+ ?assertEqual(IdsRevs, lists:reverse(PurgedIdRevs)).
+
+
+cet_start_seq() ->
+ {ok, Engine, St1} = test_engine_util:init_engine(),
+ Actions1 = [
+ {create, {docid(1), [{<<"int">>, 1}]}},
+ {create, {docid(2), [{<<"int">>, 2}]}},
+ {create, {docid(3), [{<<"int">>, 3}]}},
+ {create, {docid(4), [{<<"int">>, 4}]}},
+ {create, {docid(5), [{<<"int">>, 5}]}}
+ ],
+ Ids = [docid(1), docid(2), docid(3), docid(4), docid(5)],
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+
+ FDIs = Engine:open_docs(St2, Ids),
+ {RActions2, RIdRevs} = lists:foldl(fun(FDI, {CActions, CIdRevs}) ->
+ Id = FDI#full_doc_info.id,
+ PrevRev = test_engine_util:prev_rev(FDI),
+ Rev = PrevRev#rev_info.rev,
+ Action = {purge, {Id, Rev}},
+ {[Action| CActions], [{Id, [Rev]}| CIdRevs]}
+ end, {[], []}, FDIs),
+ {ok, St3} = test_engine_util:apply_actions(Engine, St2, lists:reverse(RActions2)),
+
+ StartSeq = 3,
+ StartSeqIdRevs = lists:nthtail(StartSeq, lists:reverse(RIdRevs)),
+ {ok, PurgedIdRevs} = Engine:fold_purged_docs(St3, StartSeq, fun fold_fun/2, [], []),
+ ?assertEqual(StartSeqIdRevs, lists:reverse(PurgedIdRevs)).
+
+
+cet_id_rev_repeated() ->
+ {ok, Engine, St1} = test_engine_util:init_engine(),
+
+ Actions1 = [
+ {create, {<<"foo">>, [{<<"vsn">>, 1}]}},
+ {conflict, {<<"foo">>, [{<<"vsn">>, 2}]}}
+ ],
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+
+ [FDI1] = Engine:open_docs(St2, [<<"foo">>]),
+ PrevRev1 = test_engine_util:prev_rev(FDI1),
+ Rev1 = PrevRev1#rev_info.rev,
+ Actions2 = [
+ {purge, {<<"foo">>, Rev1}}
+ ],
+ {ok, St3} = test_engine_util:apply_actions(Engine, St2, Actions2),
+ PurgedIdRevs0 = [{<<"foo">>, [Rev1]}],
+ {ok, PurgedIdRevs1} = Engine:fold_purged_docs(St3, 0, fun fold_fun/2, [], []),
+ ?assertEqual(PurgedIdRevs0, PurgedIdRevs1),
+ ?assertEqual(1, Engine:get(St3, purge_seq)),
+
+ % purge the same Id,Rev when the doc still exists
+ {ok, St4} = test_engine_util:apply_actions(Engine, St3, Actions2),
+ {ok, PurgedIdRevs2} = Engine:fold_purged_docs(St4, 0, fun fold_fun/2, [], []),
+ ?assertEqual(PurgedIdRevs0, PurgedIdRevs2),
+ ?assertEqual(1, Engine:get(St4, purge_seq)),
+
+ [FDI2] = Engine:open_docs(St4, [<<"foo">>]),
+ PrevRev2 = test_engine_util:prev_rev(FDI2),
+ Rev2 = PrevRev2#rev_info.rev,
+ Actions3 = [
+ {purge, {<<"foo">>, Rev2}}
+ ],
+ {ok, St5} = test_engine_util:apply_actions(Engine, St4, Actions3),
+ PurgedIdRevs00 = [{<<"foo">>, [Rev1]}, {<<"foo">>, [Rev2]}],
+
+ % purge the same Id,Rev when the doc was completely purged
+ {ok, St6} = test_engine_util:apply_actions(Engine, St5, Actions3),
+ {ok, PurgedIdRevs3} = Engine:fold_purged_docs(St6, 0, fun fold_fun/2, [], []),
+ ?assertEqual(PurgedIdRevs00, lists:reverse(PurgedIdRevs3)),
+ ?assertEqual(2, Engine:get(St6, purge_seq)).
+
+
+fold_fun({_PSeq, _UUID, Id, Revs}, Acc) ->
+ [{Id, Revs} | Acc].
+
+
+docid(I) ->
+ Str = io_lib:format("~4..0b", [I]),
+ iolist_to_binary(Str).
+
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/511aef20/src/test_engine_get_set_props.erl
----------------------------------------------------------------------
diff --git a/src/test_engine_get_set_props.erl b/src/test_engine_get_set_props.erl
index 6d2a447..1e509dc 100644
--- a/src/test_engine_get_set_props.erl
+++ b/src/test_engine_get_set_props.erl
@@ -34,6 +34,8 @@ cet_default_props() ->
?assertEqual(true, is_integer(Engine:get_disk_version(St))),
?assertEqual(0, Engine:get_update_seq(St)),
?assertEqual(0, Engine:get_purge_seq(St)),
+ ?assertEqual(true, is_integer(Engine:get_purged_docs_limit(St))),
+ ?assertEqual(true, Engine:get_purged_docs_limit(St) > 0),
?assertEqual([], Engine:get_last_purged(St)),
?assertEqual(dso, Engine:get_security(St)),
?assertEqual(1000, Engine:get_revs_limit(St)),
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/511aef20/src/test_engine_purge_docs.erl
----------------------------------------------------------------------
diff --git a/src/test_engine_purge_docs.erl b/src/test_engine_purge_docs.erl
index e5bf249..191c2f2 100644
--- a/src/test_engine_purge_docs.erl
+++ b/src/test_engine_purge_docs.erl
@@ -25,12 +25,13 @@ cet_purge_simple() ->
{create, {<<"foo">>, [{<<"vsn">>, 1}]}}
],
{ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+ {ok, PIdRevs2} = Engine:fold_purged_docs(St2, 0, fun fold_fun/2, [], []),
?assertEqual(1, Engine:get_doc_count(St2)),
?assertEqual(0, Engine:get_del_doc_count(St2)),
?assertEqual(1, Engine:get_update_seq(St2)),
?assertEqual(0, Engine:get_purge_seq(St2)),
- ?assertEqual([], Engine:get_last_purged(St2)),
+ ?assertEqual([], PIdRevs2),
[FDI] = Engine:open_docs(St2, [<<"foo">>]),
PrevRev = test_engine_util:prev_rev(FDI),
@@ -40,12 +41,13 @@ cet_purge_simple() ->
{purge, {<<"foo">>, Rev}}
],
{ok, St3} = test_engine_util:apply_actions(Engine, St2, Actions2),
+ {ok, PIdRevs3} = Engine:fold_purged_docs(St3, 0, fun fold_fun/2, [], []),
?assertEqual(0, Engine:get_doc_count(St3)),
?assertEqual(0, Engine:get_del_doc_count(St3)),
?assertEqual(2, Engine:get_update_seq(St3)),
?assertEqual(1, Engine:get_purge_seq(St3)),
- ?assertEqual([{<<"foo">>, [Rev]}], Engine:get_last_purged(St3)).
+ ?assertEqual([{<<"foo">>, [Rev]}], PIdRevs3).
cet_purge_conflicts() ->
@@ -56,12 +58,13 @@ cet_purge_conflicts() ->
{conflict, {<<"foo">>, [{<<"vsn">>, 2}]}}
],
{ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+ {ok, PIdRevs2} = Engine:fold_purged_docs(St2, 0, fun fold_fun/2, [], []),
?assertEqual(1, Engine:get_doc_count(St2)),
?assertEqual(0, Engine:get_del_doc_count(St2)),
?assertEqual(2, Engine:get_update_seq(St2)),
?assertEqual(0, Engine:get_purge_seq(St2)),
- ?assertEqual([], Engine:get_last_purged(St2)),
+ ?assertEqual([], PIdRevs2),
[FDI1] = Engine:open_docs(St2, [<<"foo">>]),
PrevRev1 = test_engine_util:prev_rev(FDI1),
@@ -71,12 +74,13 @@ cet_purge_conflicts() ->
{purge, {<<"foo">>, Rev1}}
],
{ok, St3} = test_engine_util:apply_actions(Engine, St2, Actions2),
+ {ok, PIdRevs3} = Engine:fold_purged_docs(St3, 0, fun fold_fun/2, [], []),
?assertEqual(1, Engine:get_doc_count(St3)),
?assertEqual(0, Engine:get_del_doc_count(St3)),
?assertEqual(4, Engine:get_update_seq(St3)),
?assertEqual(1, Engine:get_purge_seq(St3)),
- ?assertEqual([{<<"foo">>, [Rev1]}], Engine:get_last_purged(St3)),
+ ?assertEqual([{<<"foo">>, [Rev1]}], PIdRevs3),
[FDI2] = Engine:open_docs(St3, [<<"foo">>]),
PrevRev2 = test_engine_util:prev_rev(FDI2),
@@ -86,12 +90,14 @@ cet_purge_conflicts() ->
{purge, {<<"foo">>, Rev2}}
],
{ok, St4} = test_engine_util:apply_actions(Engine, St3, Actions3),
+ {ok, PIdRevs4} = Engine:fold_purged_docs(St4, 0, fun fold_fun/2, [], []),
?assertEqual(0, Engine:get_doc_count(St4)),
?assertEqual(0, Engine:get_del_doc_count(St4)),
?assertEqual(5, Engine:get_update_seq(St4)),
?assertEqual(2, Engine:get_purge_seq(St4)),
- ?assertEqual([{<<"foo">>, [Rev2]}], Engine:get_last_purged(St4)).
+ ?assertEqual([{<<"foo">>, [Rev1]}, {<<"foo">>, [Rev2]}],
+ lists:reverse(PIdRevs4)).
cet_add_delete_purge() ->
@@ -103,12 +109,14 @@ cet_add_delete_purge() ->
],
{ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+ {ok, PIdRevs2} = Engine:fold_purged_docs(St2, 0, fun fold_fun/2, [], []),
+
?assertEqual(0, Engine:get_doc_count(St2)),
?assertEqual(1, Engine:get_del_doc_count(St2)),
?assertEqual(2, Engine:get_update_seq(St2)),
?assertEqual(0, Engine:get_purge_seq(St2)),
- ?assertEqual([], Engine:get_last_purged(St2)),
+ ?assertEqual([], PIdRevs2),
[FDI] = Engine:open_docs(St2, [<<"foo">>]),
PrevRev = test_engine_util:prev_rev(FDI),
@@ -118,12 +126,13 @@ cet_add_delete_purge() ->
{purge, {<<"foo">>, Rev}}
],
{ok, St3} = test_engine_util:apply_actions(Engine, St2, Actions2),
+ {ok, PIdRevs3} = Engine:fold_purged_docs(St3, 0, fun fold_fun/2, [], []),
?assertEqual(0, Engine:get_doc_count(St3)),
?assertEqual(0, Engine:get_del_doc_count(St3)),
?assertEqual(3, Engine:get_update_seq(St3)),
?assertEqual(1, Engine:get_purge_seq(St3)),
- ?assertEqual([{<<"foo">>, [Rev]}], Engine:get_last_purged(St3)).
+ ?assertEqual([{<<"foo">>, [Rev]}], PIdRevs3).
cet_add_two_purge_one() ->
@@ -135,12 +144,13 @@ cet_add_two_purge_one() ->
],
{ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+ {ok, PIdRevs2} = Engine:fold_purged_docs(St2, 0, fun fold_fun/2, [], []),
?assertEqual(2, Engine:get_doc_count(St2)),
?assertEqual(0, Engine:get_del_doc_count(St2)),
?assertEqual(2, Engine:get_update_seq(St2)),
?assertEqual(0, Engine:get_purge_seq(St2)),
- ?assertEqual([], Engine:get_last_purged(St2)),
+ ?assertEqual([], PIdRevs2),
[FDI] = Engine:open_docs(St2, [<<"foo">>]),
PrevRev = test_engine_util:prev_rev(FDI),
@@ -150,9 +160,14 @@ cet_add_two_purge_one() ->
{purge, {<<"foo">>, Rev}}
],
{ok, St3} = test_engine_util:apply_actions(Engine, St2, Actions2),
+ {ok, PIdRevs3} = Engine:fold_purged_docs(St3, 0, fun fold_fun/2, [], []),
?assertEqual(1, Engine:get_doc_count(St3)),
?assertEqual(0, Engine:get_del_doc_count(St3)),
?assertEqual(3, Engine:get_update_seq(St3)),
?assertEqual(1, Engine:get_purge_seq(St3)),
- ?assertEqual([{<<"foo">>, [Rev]}], Engine:get_last_purged(St3)).
+ ?assertEqual([{<<"foo">>, [Rev]}], PIdRevs3).
+
+
+fold_fun({_PSeq, _UUID, Id, Revs}, Acc) ->
+ [{Id, Revs} | Acc].