Posted to commits@couchdb.apache.org by da...@apache.org on 2017/03/15 22:11:25 UTC

[1/2] couch commit: updated refs/heads/COUCHDB-3326-clustered-purge to 7a28094

Repository: couchdb-couch
Updated Branches:
  refs/heads/COUCHDB-3326-clustered-purge [created] 7a2809477


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/7a280947/src/test_engine_util.erl
----------------------------------------------------------------------
diff --git a/src/test_engine_util.erl b/src/test_engine_util.erl
index 33048d3..4d06cee 100644
--- a/src/test_engine_util.erl
+++ b/src/test_engine_util.erl
@@ -24,6 +24,7 @@
     test_engine_attachments,
     test_engine_fold_docs,
     test_engine_fold_changes,
+    test_engine_fold_purged_docs,
     test_engine_purge_docs,
     test_engine_compaction,
     test_engine_ref_counting
@@ -129,28 +130,34 @@ apply_action(Engine, St, Action) ->
     apply_batch(Engine, St, [Action]).
 
 
+apply_batch(Engine, St, [{purge, {Id, Revs}}]) ->
+    UpdateSeq = Engine:get(St, update_seq) + 1,
+    case gen_write(Engine, St, {purge, {Id, Revs}}, UpdateSeq) of
+        {_, _, purged_before} ->
+            St;
+        {Pair, _, {Id, PRevs}} ->
+            UUID = couch_uuids:new(),
+            {ok, NewSt} = Engine:purge_doc_revs(St, [Pair], [{UUID, Id, PRevs}]),
+            NewSt
+    end;
+
 apply_batch(Engine, St, Actions) ->
     UpdateSeq = Engine:get_update_seq(St) + 1,
     AccIn = {UpdateSeq, [], [], []},
     AccOut = lists:foldl(fun(Action, Acc) ->
-        {SeqAcc, DocAcc, LDocAcc, PurgeAcc} = Acc,
+        {SeqAcc, DocAcc, LDocAcc} = Acc,
         case Action of
             {_, {<<"_local/", _/binary>>, _}} ->
                 LDoc = gen_local_write(Engine, St, Action),
-                {SeqAcc, DocAcc, [LDoc | LDocAcc], PurgeAcc};
+                {SeqAcc, DocAcc, [LDoc | LDocAcc]};
             _ ->
-                case gen_write(Engine, St, Action, SeqAcc) of
-                    {_OldFDI, _NewFDI} = Pair ->
-                        {SeqAcc + 1, [Pair | DocAcc], LDocAcc, PurgeAcc};
-                    {Pair, NewSeqAcc, NewPurgeInfo} ->
-                        NewPurgeAcc = [NewPurgeInfo | PurgeAcc],
-                        {NewSeqAcc, [Pair | DocAcc], LDocAcc, NewPurgeAcc}
-                end
+                {OldFDI, NewFDI} = gen_write(Engine, St, Action, SeqAcc),
+                {SeqAcc + 1, [{OldFDI, NewFDI} | DocAcc], LDocAcc}
         end
     end, AccIn, Actions),
-    {_, Docs0, LDocs, PurgeIdRevs} = AccOut,
+    {_, Docs0, LDocs} = AccOut,
     Docs = lists:reverse(Docs0),
-    {ok, NewSt} = Engine:write_doc_infos(St, Docs, LDocs, PurgeIdRevs),
+    {ok, NewSt} = Engine:write_doc_infos(St, Docs, LDocs),
     NewSt.
 
 
@@ -218,39 +225,70 @@ gen_write(Engine, St, {create, {DocId, Body, Atts0}}, UpdateSeq) ->
     }};
 
 gen_write(Engine, St, {purge, {DocId, PrevRevs0, _}}, UpdateSeq) ->
-    [#full_doc_info{} = PrevFDI] = Engine:open_docs(St, [DocId]),
-    PrevRevs = if is_list(PrevRevs0) -> PrevRevs0; true -> [PrevRevs0] end,
-
-    #full_doc_info{
-        rev_tree = PrevTree
-    } = PrevFDI,
-
-    {NewTree, RemRevs} = couch_key_tree:remove_leafs(PrevTree, PrevRevs),
-    RemovedAll = lists:sort(RemRevs) == lists:sort(PrevRevs),
-    if RemovedAll -> ok; true ->
-        % If we didn't purge all the requested revisions
-        % then its a bug in the test.
-        erlang:error({invalid_purge_test_revs, PrevRevs})
-    end,
+    case Engine:open_docs(St, [DocId]) of
+    [not_found] ->
+        % Check if this doc has been purged before
+        FoldFun = fun({_PSeq, _UUID, Id, _Revs}, Acc) ->
+            case Id of
+                DocId -> true;
+                _ -> Acc
+            end
+        end,
+        {ok, IsPurgedBefore} = Engine:fold_purged_docs(St, 0, FoldFun, false, []),
+        case IsPurgedBefore of
+            true -> {{}, UpdateSeq, purged_before};
+            false -> erlang:error({invalid_purge_test_id, DocId})
+        end;
+    [#full_doc_info{} = PrevFDI] ->
+        PrevRevs = if is_list(PrevRevs0) -> PrevRevs0; true -> [PrevRevs0] end,
+
+        #full_doc_info{
+            rev_tree = PrevTree
+        } = PrevFDI,
+
+        {NewTree, RemRevs0} = couch_key_tree:remove_leafs(PrevTree, PrevRevs),
+        {RemRevs, NotRemRevs} = lists:partition(fun(R) ->
+                lists:member(R, RemRevs0) end, PrevRevs),
+
+        if NotRemRevs == [] -> ok; true ->
+            % Check if these Revs have been purged before
+            FoldFun = fun({_Pseq, _UUID, Id, Revs}, Acc) ->
+                case Id of
+                    DocId -> Acc ++ Revs;
+                    _ -> Acc
+                end
+            end,
+            {ok, PurgedRevs} = Engine:fold_purged_docs(St, 0, FoldFun, [], []),
+            case lists:subtract(PrevRevs, PurgedRevs) of [] -> ok; _ ->
+                % If we didn't purge all the requested revisions
+                % and they haven't been purged before
+                % then it's a bug in the test.
+                erlang:error({invalid_purge_test_revs, PrevRevs})
+            end
+        end,
+
+        case {RemRevs, NewTree} of
+            {[], _} ->
+                {{PrevFDI, PrevFDI}, UpdateSeq, purged_before};
+            {_, []} ->
+                % We've completely purged the document
+                {{PrevFDI, not_found}, UpdateSeq, {DocId, RemRevs}};
+            _ ->
+                % We have to relabel the update_seq of all
+                % leaves. See couch_db_updater for details.
+                {NewNewTree, NewUpdateSeq} = couch_key_tree:mapfold(fun
+                    (_RevId, Leaf, leaf, InnerSeqAcc) ->
+                        {Leaf#leaf{seq = InnerSeqAcc}, InnerSeqAcc + 1};
+                    (_RevId, Value, _Type, InnerSeqAcc) ->
+                        {Value, InnerSeqAcc}
+                end, UpdateSeq, NewTree),
+                NewFDI = PrevFDI#full_doc_info{
+                    update_seq = NewUpdateSeq - 1,
+                    rev_tree = NewNewTree
+                },
+                {{PrevFDI, NewFDI}, NewUpdateSeq, {DocId, RemRevs}}
 
-    case NewTree of
-        [] ->
-            % We've completely purged the document
-            {{PrevFDI, not_found}, UpdateSeq, {DocId, RemRevs}};
-        _ ->
-            % We have to relabel the update_seq of all
-            % leaves. See couch_db_updater for details.
-            {NewNewTree, NewUpdateSeq} = couch_key_tree:mapfold(fun
-                (_RevId, Leaf, leaf, InnerSeqAcc) ->
-                    {Leaf#leaf{seq = InnerSeqAcc}, InnerSeqAcc + 1};
-                (_RevId, Value, _Type, InnerSeqAcc) ->
-                    {Value, InnerSeqAcc}
-            end, UpdateSeq, NewTree),
-            NewFDI = PrevFDI#full_doc_info{
-                update_seq = NewUpdateSeq - 1,
-                rev_tree = NewNewTree
-            },
-            {{PrevFDI, NewFDI}, NewUpdateSeq, {DocId, RemRevs}}
+        end
     end;
 
 gen_write(Engine, St, {Action, {DocId, Body, Atts0}}, UpdateSeq) ->
@@ -403,7 +441,8 @@ db_as_term(Engine, St) ->
         {props, db_props_as_term(Engine, St)},
         {docs, db_docs_as_term(Engine, St)},
         {local_docs, db_local_docs_as_term(Engine, St)},
-        {changes, db_changes_as_term(Engine, St)}
+        {changes, db_changes_as_term(Engine, St)},
+        {purged_docs, db_purged_docs_as_term(Engine, St)}
     ].
 
 
@@ -414,6 +453,7 @@ db_props_as_term(Engine, St) ->
         get_disk_version,
         get_update_seq,
         get_purge_seq,
+        get_purged_docs_limit,
         get_last_purged,
         get_security,
         get_revs_limit,
@@ -447,6 +487,15 @@ db_changes_as_term(Engine, St) ->
     end, Changes)).
 
 
+db_purged_docs_as_term(Engine, St) ->
+    StartPSeq = Engine:get(St, oldest_purge_seq) - 1,
+    FoldFun = fun({PSeq, UUID, Id, Revs}, Acc) ->
+        [{PSeq, UUID, Id, Revs} | Acc]
+    end,
+    {ok, PDocs} = Engine:fold_purged_docs(St, StartPSeq, FoldFun, [], []),
+    PDocs.
+
+
 fdi_to_term(Engine, St, FDI) ->
     #full_doc_info{
         id = DocId,
@@ -573,11 +622,27 @@ compact(Engine, St1, DbPath) ->
         {'$gen_cast', {compact_done, Engine, Term0}} ->
             Term0;
         {'DOWN', Ref, _, _, Reason} ->
-            erlang:error({compactor_died, Reason})
+            erlang:error({compactor_died, Reason});
+        {'$gen_call', {Pid, Ref2}, get_disposable_purge_seq} ->
+            % assuming no client exists (no internal replications or indexes)
+            PSeq = Engine:get(St2, purge_seq),
+            OldestPSeq = Engine:get(St2, oldest_purge_seq),
+            PDocsLimit = Engine:get(St2, purged_docs_limit),
+            ExpectedDispPSeq = PSeq - PDocsLimit,
+            DisposablePSeq = if ExpectedDispPSeq > 0 -> ExpectedDispPSeq;
+                    true -> OldestPSeq - 1 end,
+            Pid ! {Ref2, {ok, DisposablePSeq}},
+            receive
+                {'$gen_cast', {compact_done, Engine, Term0}} ->
+                    Term0;
+                {'DOWN', Ref, _, _, Reason} ->
+                    erlang:error({compactor_died, Reason})
+            after 10000 ->
+                    erlang:error(compactor_timed_out)
+            end
         after 10000 ->
             erlang:error(compactor_timed_out)
     end,
-
     {ok, St2, DbName, Pid, Term}.
 
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/7a280947/test/couch_db_purge_docs_tests.erl
----------------------------------------------------------------------
diff --git a/test/couch_db_purge_docs_tests.erl b/test/couch_db_purge_docs_tests.erl
new file mode 100644
index 0000000..58d869f
--- /dev/null
+++ b/test/couch_db_purge_docs_tests.erl
@@ -0,0 +1,348 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_db_purge_docs_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+setup() ->
+    DbName = ?tempdb(),
+    {ok, _Db} = create_db(DbName),
+    DbName.
+
+teardown(DbName) ->
+    delete_db(DbName),
+    ok.
+
+couch_db_purge_docs_test_() ->
+    {
+        "Couch_db purge_docs",
+        [
+            {
+                setup,
+                fun test_util:start_couch/0, fun test_util:stop_couch/1,
+                [couch_db_purge_docs()]
+            },
+            purge_with_replication()
+        ]
+
+    }.
+
+
+couch_db_purge_docs() ->
+    {
+       foreach,
+            fun setup/0, fun teardown/1,
+            [
+                fun purge_simple/1,
+                fun add_delete_purge/1,
+                fun add_two_purge_one/1,
+                fun purge_id_not_exist/1,
+                fun purge_non_leaf_rev/1,
+                fun purge_conflicts/1,
+                fun purge_deep_tree/1
+            ]
+    }.
+
+
+purge_simple(DbName) ->
+    ?_test(
+        begin
+            {ok, Db} = couch_db:open_int(DbName, []),
+            DocId = <<"foo">>,
+            {ok, Rev} = save_doc(Db, {[{<<"_id">>, DocId}, {<<"vsn">>, 1}]}),
+            couch_db:ensure_full_commit(Db),
+
+            {ok, Db2} = couch_db:reopen(Db),
+            ?assertEqual(1, couch_db_engine:get(Db2, doc_count)),
+            ?assertEqual(0, couch_db_engine:get(Db2, del_doc_count)),
+            ?assertEqual(1, couch_db_engine:get(Db2, update_seq)),
+            ?assertEqual(0, couch_db_engine:get(Db2, purge_seq)),
+            ?assertEqual(nil, couch_db_engine:get(Db2, purge_tree_state)),
+
+            UUID = couch_uuids:new(),
+            {ok, {PurgeSeq, [{ok, PRevs}]}} = couch_db:purge_docs(Db2,
+                [{UUID, DocId, [Rev]}]),
+            ?assertEqual([Rev], PRevs),
+            ?assertEqual(1, PurgeSeq),
+
+            {ok, Db3} = couch_db:reopen(Db2),
+            {ok, PIdsRevs} = couch_db:fold_purged_docs(Db3, 0, fun fold_fun/2, [], []),
+            ?assertEqual(0, couch_db_engine:get(Db3, doc_count)),
+            ?assertEqual(0, couch_db_engine:get(Db3, del_doc_count)),
+            ?assertEqual(2, couch_db_engine:get(Db3, update_seq)),
+            ?assertEqual(1, couch_db_engine:get(Db3, purge_seq)),
+            ?assertEqual([{DocId, [Rev]}], PIdsRevs)
+        end).
+
+
+add_delete_purge(DbName) ->
+    ?_test(
+        begin
+            {ok, Db0} = couch_db:open_int(DbName, []),
+            DocId = <<"foo">>,
+            {ok, Rev} = save_doc(Db0, {[{<<"_id">>, DocId}, {<<"vsn">>, 1}]}),
+            couch_db:ensure_full_commit(Db0),
+            {ok, Db1} = couch_db:reopen(Db0),
+
+            {ok, Rev2} = save_doc(Db1, {[{<<"_id">>, DocId}, {<<"vsn">>, 2},
+                {<<"_rev">>, couch_doc:rev_to_str(Rev)}, {<<"_deleted">>, true}]}),
+            couch_db:ensure_full_commit(Db1),
+
+            {ok, Db2} = couch_db:reopen(Db1),
+            {ok, PIdsRevs1} = couch_db:fold_purged_docs(Db2, 0, fun fold_fun/2, [], []),
+            ?assertEqual(0, couch_db_engine:get(Db2, doc_count)),
+            ?assertEqual(1, couch_db_engine:get(Db2, del_doc_count)),
+            ?assertEqual(2, couch_db_engine:get(Db2, update_seq)),
+            ?assertEqual(0, couch_db_engine:get(Db2, purge_seq)),
+            ?assertEqual([], PIdsRevs1),
+
+            UUID = couch_uuids:new(),
+            {ok, {PurgeSeq, [{ok, PRevs}]}} = couch_db:purge_docs(Db2,
+                [{UUID, DocId, [Rev2]}]),
+            ?assertEqual([Rev2], PRevs),
+            ?assertEqual(1, PurgeSeq),
+
+            {ok, Db3} = couch_db:reopen(Db2),
+            {ok, PIdsRevs2} = couch_db:fold_purged_docs(Db3, 0, fun fold_fun/2, [], []),
+            ?assertEqual(0, couch_db_engine:get(Db3, doc_count)),
+            ?assertEqual(0, couch_db_engine:get(Db3, del_doc_count)),
+            ?assertEqual(3, couch_db_engine:get(Db3, update_seq)),
+            ?assertEqual(1, couch_db_engine:get(Db3, purge_seq)),
+            ?assertEqual([{DocId, [Rev2]}], PIdsRevs2)
+        end).
+
+
+add_two_purge_one(DbName) ->
+    ?_test(
+        begin
+            {ok, Db} = couch_db:open_int(DbName, []),
+            {ok, Rev} = save_doc(Db, {[{<<"_id">>, <<"foo">>}, {<<"vsn">>, 1}]}),
+            {ok, _Rev2} = save_doc(Db, {[{<<"_id">>, <<"bar">>}]}),
+            couch_db:ensure_full_commit(Db),
+
+            {ok, Db2} = couch_db:reopen(Db),
+            ?assertEqual(2, couch_db_engine:get(Db2, doc_count)),
+            ?assertEqual(0, couch_db_engine:get(Db2, del_doc_count)),
+            ?assertEqual(2, couch_db_engine:get(Db2, update_seq)),
+            ?assertEqual(0, couch_db_engine:get(Db2, purge_seq)),
+
+            UUID = couch_uuids:new(),
+            {ok, {PurgeSeq, [{ok, PRevs}]}} = couch_db:purge_docs(Db2,
+                [{UUID, <<"foo">>, [Rev]}]),
+            ?assertEqual([Rev], PRevs),
+            ?assertEqual(1, PurgeSeq),
+
+            {ok, Db3} = couch_db:reopen(Db2),
+            {ok, PIdsRevs} = couch_db:fold_purged_docs(Db3, 0, fun fold_fun/2, [], []),
+            ?assertEqual(1, couch_db_engine:get(Db3, doc_count)),
+            ?assertEqual(0, couch_db_engine:get(Db3, del_doc_count)),
+            ?assertEqual(3, couch_db_engine:get(Db3, update_seq)),
+            ?assertEqual(1, couch_db_engine:get(Db3, purge_seq)),
+            ?assertEqual([{<<"foo">>, [Rev]}], PIdsRevs)
+        end).
+
+
+purge_id_not_exist(DbName) ->
+    ?_test(
+        begin
+            {ok, Db} = couch_db:open_int(DbName, []),
+            UUID = couch_uuids:new(),
+            {ok, {PurgeSeq, [{ok, PRevs}]}} = couch_db:purge_docs(Db,
+                [{UUID, <<"foo">>, [{0, <<0>>}]}]),
+
+            ?assertEqual([], PRevs),
+            ?assertEqual(0, PurgeSeq),
+
+            {ok, Db2} = couch_db:reopen(Db),
+            {ok, PIdsRevs} = couch_db:fold_purged_docs(Db2, 0, fun fold_fun/2, [], []),
+            ?assertEqual(0, couch_db_engine:get(Db2, doc_count)),
+            ?assertEqual(0, couch_db_engine:get(Db2, del_doc_count)),
+            ?assertEqual(0, couch_db_engine:get(Db2, update_seq)),
+            ?assertEqual(0, couch_db_engine:get(Db2, purge_seq)),
+            ?assertEqual([], PIdsRevs)
+        end).
+
+
+purge_non_leaf_rev(DbName) ->
+    ?_test(
+        begin
+            {ok, Db} = couch_db:open_int(DbName, []),
+            {ok, Rev} = save_doc(Db, {[{<<"_id">>, <<"foo">>}, {<<"vsn">>, 1}]}),
+            couch_db:ensure_full_commit(Db),
+            {ok, Db2} = couch_db:reopen(Db),
+
+            {ok, _Rev2} = save_doc(Db2, {[{<<"_id">>, <<"foo">>}, {<<"vsn">>, 2},
+                {<<"_rev">>, couch_doc:rev_to_str(Rev)}]}),
+            couch_db:ensure_full_commit(Db2),
+            {ok, Db3} = couch_db:reopen(Db2),
+
+            UUID = couch_uuids:new(),
+            {ok, {PurgeSeq, [{ok, PRevs}]}} = couch_db:purge_docs(Db3,
+                [{UUID, <<"foo">>, [Rev]}]),
+            ?assertEqual([], PRevs),
+            ?assertEqual(0, PurgeSeq),
+
+            {ok, Db4} = couch_db:reopen(Db3),
+            {ok, PIdsRevs} = couch_db:fold_purged_docs(Db4, 0, fun fold_fun/2, [], []),
+            ?assertEqual(1, couch_db_engine:get(Db4, doc_count)),
+            ?assertEqual(2, couch_db_engine:get(Db4, update_seq)),
+            ?assertEqual(0, couch_db_engine:get(Db4, purge_seq)),
+            ?assertEqual([], PIdsRevs)
+        end).
+
+
+purge_conflicts(DbName) ->
+    ?_test(
+        begin
+            {ok, Db} = couch_db:open_int(DbName, []),
+            {ok, Rev} = save_doc(Db, {[{<<"_id">>, <<"foo">>}, {<<"vsn">>, <<"v1.1">>}]}),
+            couch_db:ensure_full_commit(Db),
+            {ok, Db2} = couch_db:reopen(Db),
+
+            % create a conflict
+            DocConflict = #doc{
+                id = <<"foo">>,
+                revs = {1, [couch_crypto:hash(md5, <<"v1.2">>)]},
+                body = {[ {<<"vsn">>,  <<"v1.2">>}]}
+            },
+            {ok, _} = couch_db:update_doc(Db2, DocConflict, [], replicated_changes),
+            couch_db:ensure_full_commit(Db2),
+            {ok, Db3} = couch_db:reopen(Db2),
+
+            UUID = couch_uuids:new(),
+            {ok, {PurgeSeq, [{ok, PRevs}]}} = couch_db:purge_docs(Db3,
+                [{UUID, <<"foo">>, [Rev]}]),
+            ?assertEqual([Rev], PRevs),
+            ?assertEqual(1, PurgeSeq),
+
+            {ok, Db4} = couch_db:reopen(Db3),
+            {ok, PIdsRevs} = couch_db:fold_purged_docs(Db4, 0, fun fold_fun/2, [], []),
+            % still has one doc
+            ?assertEqual(1, couch_db_engine:get(Db4, doc_count)),
+            ?assertEqual(0, couch_db_engine:get(Db4, del_doc_count)),
+            ?assertEqual(3, couch_db_engine:get(Db4, update_seq)),
+            ?assertEqual(1, couch_db_engine:get(Db4, purge_seq)),
+            ?assertEqual([{<<"foo">>, [Rev]}], PIdsRevs)
+        end).
+
+
+purge_deep_tree(DbName) ->
+    ?_test(
+        begin
+            NRevs = 300,
+            {ok, Db0} = couch_db:open_int(DbName, []),
+            {ok, InitRev} = save_doc(Db0, {[{<<"_id">>, <<"bar">>}, {<<"vsn">>, 0}]}),
+            ok = couch_db:close(Db0),
+            LastRev = lists:foldl(fun(V, PrevRev) ->
+                {ok, Db} = couch_db:open_int(DbName, []),
+                {ok, Rev} = save_doc(Db,
+                    {[{<<"_id">>, <<"bar">>},
+                    {<<"vsn">>, V},
+                    {<<"_rev">>, couch_doc:rev_to_str(PrevRev)}]}
+                ),
+                ok = couch_db:close(Db),
+                Rev
+            end, InitRev, lists:seq(2, NRevs)),
+            {ok, Db1} = couch_db:open_int(DbName, []),
+
+            % purge doc
+            UUID = couch_uuids:new(),
+            {ok, {PurgeSeq, [{ok, PRevs}]}} = couch_db:purge_docs(Db1,
+                [{UUID, <<"bar">>, [LastRev]}]),
+            ?assertEqual([LastRev], PRevs),
+            ?assertEqual(1, PurgeSeq),
+
+            {ok, Db2} = couch_db:reopen(Db1),
+            % no docs left
+            ?assertEqual(0, couch_db_engine:get(Db2, doc_count)),
+            ?assertEqual(0, couch_db_engine:get(Db2, del_doc_count)),
+            ?assertEqual(1, couch_db_engine:get(Db2, purge_seq)),
+            ?assertEqual(NRevs + 1, couch_db_engine:get(Db2, update_seq))
+        end).
+
+
+purge_with_replication() ->
+    ?_test(
+        begin
+            Ctx = test_util:start_couch([couch_replicator]),
+            Source = ?tempdb(),
+            {ok, SourceDb} = create_db(Source),
+            Target = ?tempdb(),
+            {ok, _Db} = create_db(Target),
+
+            % create Doc and do replication to Target
+            {ok, Rev} = save_doc(SourceDb,
+                {[{<<"_id">>, <<"foo">>}, {<<"vsn">>, 1}]}),
+            couch_db:ensure_full_commit(SourceDb),
+            {ok, SourceDb2} = couch_db:reopen(SourceDb),
+            RepObject = {[
+                {<<"source">>, Source},
+                {<<"target">>, Target}
+            ]},
+            {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER),
+            {ok, TargetDb} = couch_db:open_int(Target, []),
+            {ok, Doc} = couch_db:get_doc_info(TargetDb, <<"foo">>),
+
+            % purge Doc on Source and do replication to Target
+            % assert purges don't get replicated to Target
+            UUID = couch_uuids:new(),
+            {ok, _} = couch_db:purge_docs(SourceDb2, [{UUID, <<"foo">>, [Rev]}]),
+            {ok, SourceDb3} = couch_db:reopen(SourceDb2),
+            {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER),
+            {ok, TargetDb2} = couch_db:open_int(Target, []),
+            {ok, Doc2} = couch_db:get_doc_info(TargetDb2, <<"foo">>),
+            [Rev2] = Doc2#doc_info.revs,
+            ?assertEqual(Rev, Rev2#rev_info.rev),
+            ?assertEqual(Doc, Doc2),
+            ?assertEqual(0, couch_db_engine:get(SourceDb3, doc_count)),
+            ?assertEqual(1, couch_db_engine:get(SourceDb3, purge_seq)),
+            ?assertEqual(1, couch_db_engine:get(TargetDb2, doc_count)),
+            ?assertEqual(0, couch_db_engine:get(TargetDb2, purge_seq)),
+
+            % replicate from Target to Source
+            % assert that Doc reappears on Source
+            RepObject2 = {[
+                {<<"source">>, Target},
+                {<<"target">>, Source}
+            ]},
+            {ok, _} = couch_replicator:replicate(RepObject2, ?ADMIN_USER),
+            {ok, SourceDb4} = couch_db:reopen(SourceDb3),
+            {ok, Doc3} = couch_db:get_doc_info(SourceDb4, <<"foo">>),
+            [Rev3] = Doc3#doc_info.revs,
+            ?assertEqual(Rev, Rev3#rev_info.rev),
+            ?assertEqual(1, couch_db_engine:get(SourceDb4, doc_count)),
+            ?assertEqual(1, couch_db_engine:get(SourceDb4, purge_seq)),
+
+            delete_db(Source),
+            delete_db(Target),
+            ok = application:stop(couch_replicator),
+            ok = test_util:stop_couch(Ctx)
+        end).
+
+
+create_db(DbName) ->
+    couch_db:create(DbName, [?ADMIN_CTX, overwrite]).
+
+delete_db(DbName) ->
+    couch_server:delete(DbName, [?ADMIN_CTX]).
+
+save_doc(Db, Json) ->
+    Doc = couch_doc:from_json_obj(Json),
+    couch_db:update_doc(Db, Doc, []).
+
+fold_fun({_PSeq, _UUID, Id, Revs}, Acc) ->
+    [{Id, Revs} | Acc].
+
+


[2/2] couch commit: updated refs/heads/COUCHDB-3326-clustered-purge to 7a28094

Posted by da...@apache.org.
Implement new purge API

This is the basis for clustered purge.

COUCHDB-3326
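
For reference, a minimal sketch of the new API from a caller's point of
view, pieced together from the tests added in this commit
(couch_db_purge_docs_tests.erl). It assumes an already-open #db{} handle
and a previously fetched Rev, and elides error handling:

    % Each purge request carries a client-supplied UUID so that a
    % replayed (replicated) request can be recognized and skipped.
    UUID = couch_uuids:new(),
    {ok, {PurgeSeq, [{ok, PurgedRevs}]}} =
        couch_db:purge_docs(Db, [{UUID, <<"foo">>, [Rev]}]),

    % Enumerate purge requests recorded after a given purge_seq.
    FoldFun = fun({PSeq, ReqUUID, Id, Revs}, Acc) ->
        [{PSeq, ReqUUID, Id, Revs} | Acc]
    end,
    {ok, PurgedDocs} = couch_db:fold_purged_docs(Db, 0, FoldFun, [], []).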


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/7a280947
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/7a280947
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/7a280947

Branch: refs/heads/COUCHDB-3326-clustered-purge
Commit: 7a28094771d0f1896796fa61b33cc8fb96188b58
Parents: 0f4e1a7
Author: Mayya Sharipova <ma...@ca.ibm.com>
Authored: Wed Jun 22 09:58:04 2016 -0400
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Wed Mar 15 17:10:47 2017 -0500

----------------------------------------------------------------------
 src/couch_bt_engine.erl              | 265 +++++++++++++++----
 src/couch_bt_engine.hrl              |   4 +-
 src/couch_bt_engine_compactor.erl    | 126 ++++++++--
 src/couch_bt_engine_header.erl       |  27 +-
 src/couch_db.erl                     |  81 +++++-
 src/couch_db_engine.erl              | 110 ++++++--
 src/couch_db_header.erl              | 405 ------------------------------
 src/couch_db_updater.erl             | 237 +++++++++++------
 src/couch_util.erl                   |  10 +
 src/test_engine_compaction.erl       | 130 +++++++++-
 src/test_engine_fold_purged_docs.erl | 134 ++++++++++
 src/test_engine_get_set_props.erl    |   2 +
 src/test_engine_purge_docs.erl       |  33 ++-
 src/test_engine_util.erl             | 157 ++++++++----
 test/couch_db_purge_docs_tests.erl   | 348 +++++++++++++++++++++++++
 15 files changed, 1411 insertions(+), 658 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/7a280947/src/couch_bt_engine.erl
----------------------------------------------------------------------
diff --git a/src/couch_bt_engine.erl b/src/couch_bt_engine.erl
index 3b31341..a6a3dab 100644
--- a/src/couch_bt_engine.erl
+++ b/src/couch_bt_engine.erl
@@ -34,23 +34,28 @@
     get_doc_count/1,
     get_epochs/1,
     get_last_purged/1,
+    get_oldest_purge_seq/1,
     get_purge_seq/1,
+    get_purged_docs_limit/1,
     get_revs_limit/1,
     get_security/1,
     get_size_info/1,
     get_update_seq/1,
     get_uuid/1,
 
+    set_purged_docs_limit/2,
     set_revs_limit/2,
     set_security/2,
 
     open_docs/2,
     open_local_docs/2,
+    open_purged_docs/2,
     read_doc_body/2,
 
     serialize_doc/2,
     write_doc_body/2,
-    write_doc_infos/4,
+    write_doc_infos/3,
+    purge_doc_revs/3,
 
     commit_data/1,
 
@@ -62,6 +67,7 @@
     fold_local_docs/4,
     fold_changes/5,
     count_changes_since/2,
+    fold_purged_docs/5,
 
     start_compaction/4,
     finish_compaction/4
@@ -83,7 +89,9 @@
     seq_tree_reduce/2,
 
     local_tree_split/1,
-    local_tree_join/2
+    local_tree_join/2,
+
+    purge_tree_reduce/2
 ]).
 
 
@@ -98,6 +106,17 @@
 -include("couch_bt_engine.hrl").
 
 
+-record(pacc, {
+    add_docs = [],
+    rem_ids = [],
+    rem_seqs = [],
+    add_upurges = [],
+    add_purges = [],
+    update_seq,
+    purge_seq
+}).
+
+
 exists(FilePath) ->
     case filelib:is_file(FilePath) of
         true ->
@@ -205,6 +224,7 @@ get_doc_count(#st{} = St) ->
     {ok, {Count, _, _}} = couch_btree:full_reduce(St#st.id_tree),
     Count.
 
+
 get_epochs(#st{header = Header}) ->
     couch_bt_engine_header:get(Header, epochs).
 
@@ -223,6 +243,10 @@ get_purge_seq(#st{header = Header}) ->
     couch_bt_engine_header:get(Header, purge_seq).
 
 
+get_purged_docs_limit(#st{header = Header}) ->
+    couch_bt_engine_header:get(Header, purged_docs_limit).
+
+
 get_revs_limit(#st{header = Header}) ->
     couch_bt_engine_header:get(Header, revs_limit).
 
@@ -248,14 +272,10 @@ get_size_info(#st{} = St) ->
     ].
 
 
-get_security(#st{header = Header} = St) ->
-    case couch_bt_engine_header:get(Header, security_ptr) of
-        undefined ->
-            [];
-        Pointer ->
-            {ok, SecProps} = couch_file:pread_term(St#st.fd, Pointer),
-            SecProps
-    end.
+get_oldest_purge_seq(#st{} = St) ->
+    FoldFun = fun({K, _V}, _) -> {stop, K} end,
+    {ok, _, OldestSeq} = couch_btree:foldl(St#st.purge_tree, FoldFun, 0),
+    OldestSeq.
 
 
 get_update_seq(#st{header = Header}) ->
@@ -266,6 +286,16 @@ get_uuid(#st{header = Header}) ->
     couch_bt_engine_header:get(Header, uuid).
 
 
+set_purged_docs_limit(#st{header = Header} = St, PurgedDocsLimit) ->
+    NewSt = St#st{
+        header = couch_bt_engine_header:set(Header, [
+            {purged_docs_limit, PurgedDocsLimit}
+        ]),
+        needs_commit = true
+    },
+    {ok, increment_update_seq(NewSt)}.
+
+
 set_revs_limit(#st{header = Header} = St, RevsLimit) ->
     NewSt = St#st{
         header = couch_bt_engine_header:set(Header, [
@@ -304,6 +334,14 @@ open_local_docs(#st{} = St, DocIds) ->
     end, Results).
 
 
+open_purged_docs(St, UUIDs) ->
+    Results = couch_btree:lookup(St#st.upurge_tree, UUIDs),
+    lists:map(fun
+        ({ok, IdRevs}) -> IdRevs;
+        (not_found) -> not_found
+    end, Results).
+
+
 read_doc_body(#st{} = St, #doc{} = Doc) ->
     {ok, {Body, Atts}} = couch_file:pread_term(St#st.fd, Doc#doc.body),
     Doc#doc{
@@ -343,7 +381,7 @@ write_doc_body(St, #doc{} = Doc) ->
     {ok, Doc#doc{body = Ptr}, Written}.
 
 
-write_doc_infos(#st{} = St, Pairs, LocalDocs, PurgedIdRevs) ->
+write_doc_infos(#st{} = St, Pairs, LocalDocs) ->
     #st{
         id_tree = IdTree,
         seq_tree = SeqTree,
@@ -383,23 +421,9 @@ write_doc_infos(#st{} = St, Pairs, LocalDocs, PurgedIdRevs) ->
         erlang:max(Seq, Acc)
     end, get_update_seq(St), Add),
 
-    NewHeader = case PurgedIdRevs of
-        [] ->
-            couch_bt_engine_header:set(St#st.header, [
-                {update_seq, NewUpdateSeq}
-            ]);
-        _ ->
-            {ok, Ptr, _} = couch_file:append_term(St#st.fd, PurgedIdRevs),
-            OldPurgeSeq = couch_bt_engine_header:get(St#st.header, purge_seq),
-            % We bump NewUpdateSeq because we have to ensure that
-            % indexers see that they need to process the new purge
-            % information.
-            couch_bt_engine_header:set(St#st.header, [
-                {update_seq, NewUpdateSeq + 1},
-                {purge_seq, OldPurgeSeq + 1},
-                {purged_docs, Ptr}
-            ])
-    end,
+    NewHeader = couch_bt_engine_header:set(St#st.header, [
+        {update_seq, NewUpdateSeq}
+    ]),
 
     {ok, St#st{
         header = NewHeader,
@@ -410,6 +434,62 @@ write_doc_infos(#st{} = St, Pairs, LocalDocs, PurgedIdRevs) ->
     }}.
 
 
+purge_doc_revs(#st{} = St, DocInfos, Purges) ->
+    #st{
+        id_tree = IdTree,
+        seq_tree = SeqTree,
+        purge_tree = PurgeTree,
+        upurge_tree = UPurgeTree
+    } = St,
+    UpdateSeq = couch_bt_engine_header:get(St#st.header, update_seq),
+    PurgeSeq = couch_bt_engine_header:get(St#st.header, purge_seq),
+    PAcc0 = #pacc{update_seq = UpdateSeq, purge_seq = PurgeSeq},
+
+    PAcc = lists:foldl(fun(DocInfoPurge, Acc) ->
+        {{OldFDI, NewFDI}, {UUId, DocId, Revs}} = DocInfoPurge,
+        NextPSeq = Acc#pacc.purge_seq + 1,
+        Acc2 = Acc#pacc{
+            rem_seqs = [OldFDI#full_doc_info.update_seq|Acc#pacc.rem_seqs],
+            add_upurges = [{UUId, {DocId, Revs}}|Acc#pacc.add_upurges],
+            add_purges =  [{NextPSeq, UUId}|Acc#pacc.add_purges],
+            purge_seq = NextPSeq
+        },
+        case NewFDI of
+            #full_doc_info{id = DocId, update_seq = NewUSeq} ->
+                Acc2#pacc{
+                    add_docs = [NewFDI|Acc2#pacc.add_docs],
+                    update_seq = NewUSeq
+                };
+            not_found ->
+                Acc2#pacc{rem_ids = [DocId|Acc#pacc.rem_ids]}
+        end
+    end, PAcc0, lists:zip(DocInfos, Purges)),
+
+    % We bump NewUpdateSeq because we have to ensure that
+    % indexers see that they need to process the new purge
+    % information.
+    NewUpdateSeq = if UpdateSeq == PAcc#pacc.update_seq -> UpdateSeq + 1;
+        true -> PAcc#pacc.update_seq end,
+    Header2 = couch_bt_engine_header:set(St#st.header, [
+        {update_seq, NewUpdateSeq},
+        {purge_seq, PAcc#pacc.purge_seq}
+    ]),
+    {ok, IdTree2} = couch_btree:add_remove(IdTree,
+            PAcc#pacc.add_docs, PAcc#pacc.rem_ids),
+    {ok, SeqTree2} = couch_btree:add_remove(SeqTree,
+            PAcc#pacc.add_docs, PAcc#pacc.rem_seqs),
+    {ok, UPurgeTree2} = couch_btree:add(UPurgeTree, PAcc#pacc.add_upurges),
+    {ok, PurgeTree2} = couch_btree:add(PurgeTree, PAcc#pacc.add_purges),
+    {ok, St#st{
+        header = Header2,
+        id_tree = IdTree2,
+        seq_tree = SeqTree2,
+        purge_tree = PurgeTree2,
+        upurge_tree = UPurgeTree2,
+        needs_commit = true
+    }}.
+
+
 commit_data(St) ->
     #st{
         fd = Fd,
@@ -469,6 +549,35 @@ fold_changes(St, SinceSeq, UserFun, UserAcc, Options) ->
     {ok, FinalUserAcc}.
 
 
+fold_purged_docs(St, StartSeq0, UserFun, UserAcc, Options) ->
+    StartSeq = StartSeq0 + 1,
+    PurgeTree = St#st.purge_tree,
+    {ok, _, MinSeq} = couch_btree:foldl(PurgeTree,
+        fun({K, _V}, _) -> {stop, K} end, 0),
+    if (MinSeq =< StartSeq) ->
+        Fun = fun drop_reductions2/4,
+        UFun = fun(PSeq, UUID, {PAcc, UUIDAcc}) ->
+            {ok, {[PSeq| PAcc],[UUID| UUIDAcc]}}
+        end,
+        PUAcc = {[], []},
+        InAcc = {UFun, PUAcc},
+        Opts = [{start_key, StartSeq}] ++ Options,
+        {ok, _, OutAcc} = couch_btree:fold(PurgeTree, Fun, InAcc, Opts),
+        {_, {PSeqs0, UUIDs0}} = OutAcc,
+        PSeqs = lists:reverse(PSeqs0),
+        UUIDs = lists:reverse(UUIDs0),
+        DocResults = couch_btree:lookup(St#st.upurge_tree, UUIDs),
+        FinalAcc = pfoldl(UserFun, UserAcc, PSeqs, DocResults),
+        {ok, FinalAcc};
+    true ->
+        throw({invalid_start_purge_seq, StartSeq0})
+    end.
+
+pfoldl(F, Accu, [PSeq| PSeqs], [{ok, {UUID, {Id, Revs}}}| DocResults]) ->
+    pfoldl(F, F({PSeq, UUID, Id, Revs}, Accu), PSeqs, DocResults);
+pfoldl(_F, Accu, [], []) -> Accu.
+
+
 count_changes_since(St, SinceSeq) ->
     BTree = St#st.seq_tree,
     FoldFun = fun(_SeqStart, PartialReds, 0) ->
@@ -494,8 +603,8 @@ finish_compaction(OldState, DbName, Options, CompactFilePath) ->
             finish_compaction_int(OldState, NewState1);
         false ->
             couch_log:info("Compaction file still behind main file "
-                           "(update seq=~p. compact update seq=~p). Retrying.",
-                           [OldSeq, NewSeq]),
+                    "(update seq=~p. compact update seq=~p). Retrying.",
+                    [OldSeq, NewSeq]),
             ok = decref(NewState1),
             start_compaction(OldState, DbName, Options, self())
     end.
@@ -593,6 +702,13 @@ seq_tree_reduce(rereduce, Reds) ->
     lists:sum(Reds).
 
 
+purge_tree_reduce(reduce, IdRevs) ->
+    % count the number of purge requests
+    length(IdRevs);
+purge_tree_reduce(rereduce, Reds) ->
+    lists:sum(Reds).
+
+
 local_tree_split(#doc{} = Doc) ->
     #doc{
         id = Id,
@@ -672,7 +788,8 @@ init_state(FilePath, Fd, Header0, Options) ->
     Compression = couch_compress:get_compression_method(),
 
     Header1 = couch_bt_engine_header:upgrade(Header0),
-    Header = set_default_security_object(Fd, Header1, Compression, Options),
+    Header2 = set_default_security_object(Fd, Header1, Compression, Options),
+    Header = upgrade_purge_info(Fd, Header2),
 
     IdTreeState = couch_bt_engine_header:id_tree_state(Header),
     {ok, IdTree} = couch_btree:open(IdTreeState, Fd, [
@@ -697,6 +814,16 @@ init_state(FilePath, Fd, Header0, Options) ->
             {compression, Compression}
         ]),
 
+    PurgeTreeState = couch_bt_engine_header:purge_tree_state(Header),
+    {ok, PurgeTree} = couch_btree:open(PurgeTreeState, Fd, [
+        {reduce, fun ?MODULE:purge_tree_reduce/2}
+    ]),
+
+    UPurgeTreeState = couch_bt_engine_header:upurge_tree_state(Header),
+    {ok, UPurgeTree} = couch_btree:open(UPurgeTreeState, Fd, [
+        {reduce, fun ?MODULE:purge_tree_reduce/2}
+    ]),
+
     ok = couch_file:set_db_pid(Fd, self()),
 
     St = #st{
@@ -709,7 +836,9 @@ init_state(FilePath, Fd, Header0, Options) ->
         id_tree = IdTree,
         seq_tree = SeqTree,
         local_tree = LocalTree,
-        compression = Compression
+        compression = Compression,
+        purge_tree = PurgeTree,
+        upurge_tree = UPurgeTree
     },
 
     % If this is a new database we've just created a
@@ -728,7 +857,9 @@ update_header(St, Header) ->
     couch_bt_engine_header:set(Header, [
         {seq_tree_state, couch_btree:get_state(St#st.seq_tree)},
         {id_tree_state, couch_btree:get_state(St#st.id_tree)},
-        {local_tree_state, couch_btree:get_state(St#st.local_tree)}
+        {local_tree_state, couch_btree:get_state(St#st.local_tree)},
+        {purge_tree_state, couch_btree:get_state(St#st.purge_tree)},
+        {upurge_tree_state, couch_btree:get_state(St#st.upurge_tree)}
     ]).
 
 
@@ -753,6 +884,46 @@ set_default_security_object(Fd, Header, Compression, Options) ->
     end.
 
 
+% This function is here, and not in couch_bt_engine_header
+% because it requires modifying file contents
+upgrade_purge_info(Fd, Header) ->
+    case couch_bt_engine_header:get(Header, purge_tree_state) of
+    nil ->
+        Header;
+    Ptr when is_tuple(Ptr) ->
+        Header;
+    Ptr when is_integer(Ptr)->
+        % old PurgeDocs format - upgrade to purge_tree
+        {ok, PurgedIdRevs} = couch_file:pread_term(Fd, Ptr),
+        PSeq = couch_bt_engine_header:purge_seq(Header),
+
+        {NPSeq, AddU0, AddP0} = lists:foldl(fun({Id, Revs}, {Seq, UAcc, PAcc}) ->
+            UUId = couch_uuids:new(),
+            NewUAcc = [{UUId, {Id, Revs}} | UAcc],
+            NewPAcc = [{Seq+1, UUId} | PAcc],
+            {Seq+1, NewUAcc, NewPAcc}
+        end, {PSeq-1, [], []}, PurgedIdRevs),
+        AddU = lists:reverse(AddU0),
+        AddP = lists:reverse(AddP0),
+
+        {ok, UPTree0} = couch_btree:open(nil, Fd, [
+            {reduce, fun ?MODULE:purge_tree_reduce/2}
+        ]),
+        {ok, UPTree} = couch_btree:add_remove(UPTree0, AddU, []),
+        UPTreeState = couch_btree:get_state(UPTree),
+        {ok, PTree0} = couch_btree:open(nil, Fd, [
+            {reduce, fun ?MODULE:purge_tree_reduce/2}
+        ]),
+        {ok, PTree} = couch_btree:add_remove(PTree0, AddP, []),
+        PTreeState = couch_btree:get_state(PTree),
+        couch_bt_engine_header:set(Header, [
+            {purge_seq, NPSeq},
+            {purge_tree_state, PTreeState},
+            {upurge_tree_state, UPTreeState}
+        ])
+    end.
+
+
 delete_compaction_files(FilePath) ->
     RootDir = config:get("couchdb", "database_dir", "."),
     DelOpts = [{context, delete}],
@@ -898,34 +1069,38 @@ drop_reductions(_, _, _, Acc) ->
     {ok, Acc}.
 
 
+drop_reductions2(visit, {PurgeSeq, UUID}, _Reds, {UserFun, UserAcc}) ->
+    {Go, NewUserAcc} = UserFun(PurgeSeq, UUID, UserAcc),
+    {Go, {UserFun, NewUserAcc}};
+drop_reductions2(_, _, _, Acc) ->
+    {ok, Acc}.
+
+
 fold_docs_reduce_to_count(Reds) ->
     RedFun = fun id_tree_reduce/2,
     FinalRed = couch_btree:final_reduce(RedFun, Reds),
     element(1, FinalRed).
 
 
-finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) ->
+finish_compaction_int(#st{} = OldSt, #st{} = NewSt) ->
     #st{
         filepath = FilePath,
         local_tree = OldLocal
     } = OldSt,
     #st{
-        filepath = CompactDataPath,
-        header = Header,
-        local_tree = NewLocal1
-    } = NewSt1,
+        filepath = CompactDataPath
+    } = NewSt,
 
     % suck up all the local docs into memory and write them to the new db
-    LoadFun = fun(Value, _Offset, Acc) ->
-        {ok, [Value | Acc]}
-    end,
+    LoadFun = fun(Value, _Offset, Acc) ->  {ok, [Value | Acc]} end,
     {ok, _, LocalDocs} = couch_btree:foldl(OldLocal, LoadFun, []),
-    {ok, NewLocal2} = couch_btree:add(NewLocal1, LocalDocs),
+    {ok, NewLocal2} = couch_btree:add(NewSt#st.local_tree, LocalDocs),
 
-    {ok, NewSt2} = commit_data(NewSt1#st{
-        header = couch_bt_engine_header:set(Header, [
+    {ok, NewSt2} = commit_data(NewSt#st{
+        header = couch_bt_engine_header:set(NewSt#st.header, [
             {compacted_seq, get_update_seq(OldSt)},
-            {revs_limit, get_revs_limit(OldSt)}
+            {revs_limit, get_revs_limit(OldSt)},
+            {purged_docs_limit, get_purged_docs_limit(OldSt)}
         ]),
         local_tree = NewLocal2
     }),
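
A note on the fold_purged_docs/5 implementation above: StartSeq0 is
exclusive (the fold begins at StartSeq0 + 1), and the function throws if
purge requests at or below the requested start have already been pruned.
A sketch of a defensive caller (the wrapper name is hypothetical):

    %% Hypothetical helper illustrating the fold_purged_docs/5 contract.
    fold_purged_from(St, StartPSeq, FoldFun, Acc) ->
        try
            couch_bt_engine:fold_purged_docs(St, StartPSeq, FoldFun, Acc, [])
        catch
            throw:{invalid_start_purge_seq, StartPSeq} ->
                % Older purge info was compacted away; the caller has to
                % reset, e.g. rescan from get_oldest_purge_seq(St) - 1.
                {error, purge_seq_too_old}
        end.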

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/7a280947/src/couch_bt_engine.hrl
----------------------------------------------------------------------
diff --git a/src/couch_bt_engine.hrl b/src/couch_bt_engine.hrl
index 7f52d8f..ee3d566 100644
--- a/src/couch_bt_engine.hrl
+++ b/src/couch_bt_engine.hrl
@@ -20,5 +20,7 @@
     id_tree,
     seq_tree,
     local_tree,
-    compression
+    compression,
+    purge_tree,
+    upurge_tree
 }).

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/7a280947/src/couch_bt_engine_compactor.erl
----------------------------------------------------------------------
diff --git a/src/couch_bt_engine_compactor.erl b/src/couch_bt_engine_compactor.erl
index 7f3b5d7..7e2c8a4 100644
--- a/src/couch_bt_engine_compactor.erl
+++ b/src/couch_bt_engine_compactor.erl
@@ -54,13 +54,13 @@ start(#st{} = St, DbName, Options, Parent) ->
     % and hope everything works out for the best.
     unlink(DFd),
 
-    NewSt1 = copy_purge_info(St, NewSt),
-    NewSt2 = copy_compact(DbName, St, NewSt1, Retry),
+    NewSt2 = copy_compact(DbName, St, NewSt, Retry),
     NewSt3 = sort_meta_data(NewSt2),
     NewSt4 = commit_compaction_data(NewSt3),
     NewSt5 = copy_meta_data(NewSt4),
-    {ok, NewSt6} = couch_bt_engine:commit_data(NewSt5),
-    ok = couch_bt_engine:decref(NewSt6),
+    NewSt6 = copy_purge_info(St, NewSt5, Parent),
+    {ok, NewSt7} = couch_bt_engine:commit_data(NewSt6),
+    ok = couch_bt_engine:decref(NewSt7),
     ok = couch_file:close(MFd),
 
     % Done
@@ -72,7 +72,9 @@ open_compaction_files(SrcHdr, DbFilePath, Options) ->
     MetaFile = DbFilePath ++ ".compact.meta",
     {ok, DataFd, DataHdr} = open_compaction_file(DataFile),
     {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile),
+
     DataHdrIsDbHdr = couch_bt_engine_header:is_header(DataHdr),
+
     case {DataHdr, MetaHdr} of
         {#comp_header{}=A, #comp_header{}=A} ->
             DbHeader = A#comp_header.db_header,
@@ -88,7 +90,9 @@ open_compaction_files(SrcHdr, DbFilePath, Options) ->
             St1 = bind_emsort(St0, MetaFd, nil),
             {ok, St1, DataFile, DataFd, MetaFd, St0#st.id_tree};
         _ ->
-            Header = couch_bt_engine_header:from(SrcHdr),
+            Header0 = couch_bt_engine_header:from(SrcHdr),
+            % set purge_seq of NewSt to -1 on the 1st round of compaction
+            Header = couch_bt_engine_header:set(Header0, [{purge_seq, -1}]),
             ok = reset_compaction_file(DataFd, Header),
             ok = reset_compaction_file(MetaFd, Header),
             St0 = couch_bt_engine:init_state(DataFile, DataFd, Header, Options),
@@ -97,25 +101,6 @@ open_compaction_files(SrcHdr, DbFilePath, Options) ->
     end.
 
 
-copy_purge_info(OldSt, NewSt) ->
-    OldHdr = OldSt#st.header,
-    NewHdr = NewSt#st.header,
-    OldPurgeSeq = couch_bt_engine_header:purge_seq(OldHdr),
-    case OldPurgeSeq > 0 of
-        true ->
-            Purged = couch_bt_engine:get_last_purged(OldSt),
-            Opts = [{compression, NewSt#st.compression}],
-            {ok, Ptr, _} = couch_file:append_term(NewSt#st.fd, Purged, Opts),
-            NewNewHdr = couch_bt_engine_header:set(NewHdr, [
-                {purge_seq, OldPurgeSeq},
-                {purged_docs, Ptr}
-            ]),
-            NewSt#st{header = NewNewHdr};
-        false ->
-            NewSt
-    end.
-
-
 copy_compact(DbName, St, NewSt0, Retry) ->
     Compression = couch_compress:get_compression_method(),
     NewSt = NewSt0#st{compression = Compression},
@@ -316,6 +301,99 @@ copy_doc_attachments(#st{} = SrcSt, SrcSp, DstSt) ->
     {BodyData, NewBinInfos}.
 
 
+copy_purge_info(OldSt, NewSt, Parent) ->
+    OldPSeq = couch_bt_engine:get(OldSt, purge_seq),
+    NewPSeq = couch_bt_engine:get(NewSt, purge_seq),
+    % During the 1st round of compaction we copy all purges from OldSt to
+    %   NewSt, respecting purged_docs_limit. During recompaction rounds we
+    %   copy only the purges that occurred while compaction was running,
+    %   and remove completely purged docs from the db's Id and Seq trees.
+    % Because the 1st and subsequent rounds need different copy operations,
+    % we use NewPSeq == -1 to mark the 1st round of compaction.
+    % We can't use NewPSeq == 0 to mark the 1st round: if compaction started
+    % with OldPSeq = 0 and NewPSeq = 0, and purge requests arrived during
+    % compaction (so OldPSeq > 0 by the end), then on recompaction we would
+    % need copy_new_purge_info, but NewPSeq would still be 0 after the
+    % 1st round, so checking for 0 would wrongly send us to
+    % copy_purge_info_from_start.
+    case NewPSeq of
+        OldPSeq  -> NewSt; % nothing to copy
+        -1 when OldPSeq > 0 -> copy_purge_info_from_start(OldSt, NewSt, Parent);
+        -1 when OldPSeq == 0  -> % just update the header
+            NewHeader =
+                couch_bt_engine_header:set(NewSt#st.header, [{purge_seq, 0}]),
+            NewSt#st{header=NewHeader};
+        _ -> copy_new_purge_info(OldSt, NewSt)
+    end.
+
+
+copy_purge_info_from_start(OldSt, NewSt, Parent) ->
+    % purge requests up to and including DisposPSeq can be disregarded
+    {ok, DisposPSeq} = gen_server:call(Parent, get_disposable_purge_seq),
+    FoldFun = fun({P, U, Id, Rs}, {AccP, AccU}) ->
+        {[{P, U}|AccP], [{U, {Id, Rs}}|AccU]}
+    end,
+    % copy purge requests from OldSt to NewSt starting AFTER DisposPSeq
+    {ok, {AddP, AddU}} = couch_bt_engine:fold_purged_docs(
+        OldSt, DisposPSeq, FoldFun, {[], []}, []),
+    {ok, NewPTree} = couch_btree:add(NewSt#st.purge_tree, lists:reverse(AddP)),
+    {ok, NewUTree} = couch_btree:add(NewSt#st.upurge_tree, lists:reverse(AddU)),
+    NewHeader = couch_bt_engine_header:set(NewSt#st.header,
+        [{purge_seq, couch_bt_engine:get(OldSt, purge_seq)}]
+    ),
+    NewSt#st{purge_tree = NewPTree, upurge_tree = NewUTree, header = NewHeader}.
+
+
+copy_new_purge_info(OldSt, NewSt) ->
+    % collect purges since NewPSeq
+    FoldFun = fun({P, U, Id, Rs}, {AccP, AccU, AccI, AccR}) ->
+        {[{P,U}|AccP], [{U,{Id, Rs}}|AccU], [Id|AccI], [{Id, Rs}|AccR]}
+    end,
+    NewPSeq = couch_bt_engine:get(NewSt, purge_seq),
+    InitAcc = {[], [], [], []},
+    {ok, {AddP, AddU, DocIds, PIdsRevs}} =
+        couch_bt_engine:fold_purged_docs(OldSt, NewPSeq, FoldFun, InitAcc, []),
+    {ok, NewPTree} = couch_btree:add(NewSt#st.purge_tree, lists:reverse(AddP)),
+    {ok, NewUTree} = couch_btree:add(NewSt#st.upurge_tree, lists:reverse(AddU)),
+
+    % Since purging a document will change the update_seq,
+    % finish_compaction will restart compaction in order to process
+    % the new updates, which takes care of handling partially
+    % purged documents.
+    % collect only Ids and Seqs of docs that were completely purged
+    OldDocInfos = couch_bt_engine:open_docs(NewSt, DocIds),
+    FoldFun2 = fun({OldFDI, {Id,Revs}}, {Acc1, Acc2}) ->
+        #full_doc_info{rev_tree=Tree, update_seq=UpSeq} = OldFDI,
+        case couch_key_tree:remove_leafs(Tree, Revs) of
+            {[]= _NewTree, _} ->
+                {[Id| Acc1], [UpSeq| Acc2]};
+            _ ->
+                {Acc1, Acc2}
+        end
+    end,
+    {RemIds, RemSeqs} =
+        lists:foldl(FoldFun2, {[], []}, lists:zip(OldDocInfos, PIdsRevs)),
+    % remove from NewSt docs that were completely purged
+    {NewIdTree, NewSeqTree} = case {RemIds, RemSeqs} of
+        {[], []} ->
+            {NewSt#st.id_tree, NewSt#st.seq_tree};
+        _ ->
+            {ok, NewIT} = couch_btree:add_remove(NewSt#st.id_tree, [], RemIds),
+            {ok, NewST} = couch_btree:add_remove(NewSt#st.seq_tree, [], RemSeqs),
+            {NewIT, NewST}
+    end,
+    NewHeader = couch_bt_engine_header:set(NewSt#st.header,
+        [{purge_seq, couch_bt_engine:get(OldSt, purge_seq)}]
+    ),
+    NewSt#st{
+        header = NewHeader,
+        id_tree = NewIdTree,
+        seq_tree = NewSeqTree,
+        purge_tree = NewPTree,
+        upurge_tree = NewUTree
+    }.
+
+
 sort_meta_data(St0) ->
     {ok, Ems} = couch_emsort:merge(St0#st.id_tree),
     St0#st{id_tree=Ems}.
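
The get_disposable_purge_seq call in copy_purge_info_from_start/3 above
is answered by the compactor's parent (the db updater; the test harness
in test_engine_util.erl fakes the reply in its compact helper). When no
client (internal replication or index) still needs older purge
checkpoints, the reply reduces to roughly this sketch:

    %% Purge requests with purge_seq =< DisposablePSeq need not be copied
    %% into the compacted file. No-clients case only, mirroring the
    %% handler in test_engine_util.
    disposable_purge_seq(St) ->
        PSeq = couch_bt_engine:get_purge_seq(St),
        OldestPSeq = couch_bt_engine:get_oldest_purge_seq(St),
        Limit = couch_bt_engine:get_purged_docs_limit(St),
        case PSeq - Limit of
            Expected when Expected > 0 -> Expected;
            _ -> OldestPSeq - 1
        end.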

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/7a280947/src/couch_bt_engine_header.erl
----------------------------------------------------------------------
diff --git a/src/couch_bt_engine_header.erl b/src/couch_bt_engine_header.erl
index 3d24f31..cfe76e1 100644
--- a/src/couch_bt_engine_header.erl
+++ b/src/couch_bt_engine_header.erl
@@ -31,8 +31,10 @@
     seq_tree_state/1,
     latest/1,
     local_tree_state/1,
+    purge_tree_state/1,
+    upurge_tree_state/1,
     purge_seq/1,
-    purged_docs/1,
+    purged_docs_limit/1,
     security_ptr/1,
     revs_limit/1,
     uuid/1,
@@ -61,12 +63,14 @@
     seq_tree_state = nil,
     local_tree_state = nil,
     purge_seq = 0,
-    purged_docs = nil,
+    purge_tree_state = nil, %purge tree: purge_seq -> uuid
     security_ptr = nil,
     revs_limit = 1000,
     uuid,
     epochs,
-    compacted_seq
+    compacted_seq,
+    upurge_tree_state = nil, %upurge tree: uuid -> {docid, revs}
+    purged_docs_limit = 1000
 }).
 
 
@@ -150,12 +154,20 @@ local_tree_state(Header) ->
     get_field(Header, local_tree_state).
 
 
+purge_tree_state(Header) ->
+    get_field(Header, purge_tree_state).
+
+
+upurge_tree_state(Header) ->
+    get_field(Header, upurge_tree_state).
+
+
 purge_seq(Header) ->
     get_field(Header, purge_seq).
 
 
-purged_docs(Header) ->
-    get_field(Header, purged_docs).
+purged_docs_limit(Header) ->
+    get_field(Header, purged_docs_limit).
 
 
 security_ptr(Header) ->
@@ -303,6 +315,7 @@ upgrade_compacted_seq(#db_header{}=Header) ->
             Header
     end.
 
+
 latest(?LATEST_DISK_VERSION) ->
     true;
 latest(N) when is_integer(N), N < ?LATEST_DISK_VERSION ->
@@ -323,7 +336,7 @@ mk_header(Vsn) ->
         bar, % seq_tree_state
         bam, % local_tree_state
         1, % purge_seq
-        baz, % purged_docs
+        baz, % purge_tree_state
         bang, % security_ptr
         999 % revs_limit
     }.
@@ -343,7 +356,7 @@ upgrade_v3_test() ->
     ?assertEqual(bar, seq_tree_state(NewHeader)),
     ?assertEqual(bam, local_tree_state(NewHeader)),
     ?assertEqual(1, purge_seq(NewHeader)),
-    ?assertEqual(baz, purged_docs(NewHeader)),
+    ?assertEqual(baz, purge_tree_state(NewHeader)),
     ?assertEqual(bang, security_ptr(NewHeader)),
     ?assertEqual(999, revs_limit(NewHeader)),
     ?assertEqual(undefined, uuid(NewHeader)),
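
To make the relationship between the two new tree states concrete:
purge_tree is keyed by purge_seq and stores only the request UUID, while
upurge_tree is keyed by UUID and stores the {DocId, Revs} payload. One
purge request recorded at purge_seq N therefore yields two entries, as
built in couch_bt_engine:purge_doc_revs/3:

    %% purge_tree : {N, UUID}               - ordered log, foldable by seq
    %% upurge_tree: {UUID, {DocId, Revs}}   - payload and idempotency lookup

fold_purged_docs/5 walks purge_tree to collect {PSeq, UUID} pairs and then
resolves the payloads with couch_btree:lookup on upurge_tree.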

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/7a280947/src/couch_db.erl
----------------------------------------------------------------------
diff --git a/src/couch_db.erl b/src/couch_db.erl
index ca63a40..fad9d76 100644
--- a/src/couch_db.erl
+++ b/src/couch_db.erl
@@ -44,18 +44,21 @@
     get_filepath/1,
     get_instance_start_time/1,
     get_last_purged/1,
+    get_oldest_purge_seq/1,
     get_pid/1,
+    get_purge_seq/1,
+    get_purged_docs_limit/1,
     get_revs_limit/1,
     get_security/1,
     get_update_seq/1,
     get_user_ctx/1,
     get_uuid/1,
-    get_purge_seq/1,
 
     is_db/1,
     is_system_db/1,
     is_clustered/1,
 
+    set_purged_docs_limit/2,
     set_revs_limit/2,
     set_security/2,
     set_user_ctx/2,
@@ -69,6 +72,7 @@
     open_doc/2,
     open_doc/3,
     open_doc_revs/4,
+    open_purged_docs/2,
     open_doc_int/3,
     get_doc_info/2,
     get_full_doc_info/2,
@@ -82,7 +86,6 @@
     update_docs/2,
     update_docs/3,
     delete_doc/3,
-
     purge_docs/2,
 
     with_stream/3,
@@ -97,6 +100,8 @@
     fold_changes/4,
     fold_changes/5,
     count_changes_since/2,
+    fold_purged_docs/4,
+    fold_purged_docs/5,
 
     calculate_start_seq/3,
     owner_of/2,
@@ -363,6 +368,8 @@ find_missing([{Id, Revs}|RestIdRevs], [FullInfo | RestLookupInfo])
 find_missing([{Id, Revs}|RestIdRevs], [not_found | RestLookupInfo]) ->
     [{Id, Revs, []} | find_missing(RestIdRevs, RestLookupInfo)].
 
+
+%   returns {ok, DocInfo} or not_found
 get_doc_info(Db, Id) ->
     case get_full_doc_info(Db, Id) of
     #full_doc_info{} = FDI ->
@@ -371,7 +378,7 @@ get_doc_info(Db, Id) ->
         Else
     end.
 
-%   returns {ok, DocInfo} or not_found
+
 get_full_doc_info(Db, Id) ->
     [Result] = get_full_doc_infos(Db, [Id]),
     Result.
@@ -379,8 +386,52 @@ get_full_doc_info(Db, Id) ->
 get_full_doc_infos(Db, Ids) ->
     couch_db_engine:open_docs(Db, Ids).
 
-purge_docs(#db{main_pid=Pid}, IdsRevs) ->
-    gen_server:call(Pid, {purge_docs, IdsRevs}).
+
+purge_docs(#db{}=Db, UUIdsIdsRevs) ->
+    purge_docs(Db, UUIdsIdsRevs, interactive_edit).
+
+-spec purge_docs(#db{}, [{UUId, Id, [Rev]}], interactive_edit) ->
+        {ok, {PurgeSeq, [Reply]}} when
+    UUId     :: binary(),
+    Id       :: binary(),
+    Rev      :: {non_neg_integer(), binary()},
+    PurgeSeq :: non_neg_integer(),
+    Reply    :: {ok, []}
+              | {ok, [Rev]}.
+purge_docs(#db{main_pid=Pid}, UUIdsIdsRevs, interactive_edit) ->
+    gen_server:call(Pid, {purge_docs, UUIdsIdsRevs});
+
+purge_docs(#db{main_pid=Pid}=Db, UUIdsIdsRevs0, replicated_changes) ->
+    % filter out purge requests that have been already applied:
+    % their UUIDs exist in upurge_tree
+    UUIDs = [UUID || {UUID, _Id, _Revs} <- UUIdsIdsRevs0],
+    Results = open_purged_docs(Db, UUIDs),
+    UUIdsIdsRevs = lists:foldr(fun
+        ({not_found, UUIdIdRevs}, Acc0) -> [UUIdIdRevs | Acc0];
+        ({_, _}, Acc0) -> Acc0
+    end, [], lists:zip(Results, UUIdsIdsRevs0)),
+    case UUIdsIdsRevs of
+        [] -> {ok, []};
+        _ -> gen_server:call(Pid, {purge_docs, UUIdsIdsRevs})
+    end.
+
+
+-spec open_purged_docs(#db{}, [UUId]) -> [PurgedReq] when
+    UUId        :: binary(),
+    PurgedReq   :: {Id, [Rev]}
+                 | not_found,
+    Id          :: binary(),
+    Rev         :: {non_neg_integer(), binary()}.
+open_purged_docs(Db, UUIDs) ->
+    couch_db_engine:open_purged_docs(Db, UUIDs).
+
+
+set_purged_docs_limit(#db{main_pid=Pid}=Db, Limit)
+        when is_integer(Limit), Limit > 0 ->
+    check_is_admin(Db),
+    gen_server:call(Pid, {set_purged_docs_limit, Limit}, infinity);
+set_purged_docs_limit(_Db, _Limit) ->
+    throw(invalid_purged_docs_limit).
 
 get_after_doc_read_fun(#db{after_doc_read = Fun}) ->
     Fun.
@@ -400,10 +451,13 @@ get_user_ctx(?OLD_DB_REC = Db) ->
     ?OLD_DB_USER_CTX(Db).
 
 get_purge_seq(#db{}=Db) ->
-    {ok, couch_db_engine:get_purge_seq(Db)}.
+    couch_db_engine:get_purge_seq(Db).
 
-get_last_purged(#db{}=Db) ->
-    {ok, couch_db_engine:get_last_purged(Db)}.
+get_oldest_purge_seq(#db{}=Db) ->
+    {ok, couch_db_engine:get_oldest_purge_seq(Db)}.
+
+get_purged_docs_limit(#db{}=Db) ->
+    couch_db_engine:get_purged_docs_limit(Db).
 
 get_pid(#db{main_pid = Pid}) ->
     Pid.
@@ -995,6 +1049,7 @@ doc_tag(#doc{meta=Meta}) ->
         Else -> throw({invalid_doc_tag, Else})
     end.
 
+
 update_docs(Db, Docs0, Options, replicated_changes) ->
     increment_stat(Db, [couchdb, database_writes]),
     Docs = tag_docs(Docs0),
@@ -1075,7 +1130,6 @@ update_docs(Db, Docs0, Options, interactive_edit) ->
                         check_dup_atts(Doc)))
                 || Doc <- B] || B <- DocBuckets2],
         {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
-
         {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
 
         ResultsDict = lists:foldl(fun({Key, Resp}, ResultsAcc) ->
@@ -1086,6 +1140,7 @@ update_docs(Db, Docs0, Options, interactive_edit) ->
         end, Docs)}
     end.
 
+
 % Returns the first available document on disk. Input list is a full rev path
 % for the doc.
 make_first_doc_on_disk(_Db, _Id, _Pos, []) ->
@@ -1397,6 +1452,14 @@ fold_docs(Db, UserFun, UserAcc, Options) ->
     couch_db_engine:fold_docs(Db, UserFun, UserAcc, Options).
 
 
+fold_purged_docs(Db, StartPurgeSeq, Fun, Acc) ->
+    fold_purged_docs(Db, StartPurgeSeq, Fun, Acc, []).
+
+
+fold_purged_docs(Db, StartPurgeSeq, UFun, UAcc, Opts) ->
+    couch_db_engine:fold_purged_docs(Db, StartPurgeSeq, UFun, UAcc, Opts).
+
+
 fold_local_docs(Db, UserFun, UserAcc, Options) ->
     couch_db_engine:fold_local_docs(Db, UserFun, UserAcc, Options).
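
One consequence of keying purge requests by UUID that is worth spelling
out: replaying the same request through the replicated_changes path is a
no-op. A sketch (assumes the handle is reopened between calls so the
lookup sees the first purge):

    Req = {UUID, <<"foo">>, [Rev]},
    {ok, _} = couch_db:purge_docs(Db, [Req], replicated_changes),
    {ok, Db2} = couch_db:reopen(Db),
    % The UUID is now in upurge_tree, so open_purged_docs/2 finds it and
    % the request is filtered out before reaching the db updater:
    {ok, []} = couch_db:purge_docs(Db2, [Req], replicated_changes).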
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/7a280947/src/couch_db_engine.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_engine.erl b/src/couch_db_engine.erl
index 045e75c..bdf2313 100644
--- a/src/couch_db_engine.erl
+++ b/src/couch_db_engine.erl
@@ -59,6 +59,10 @@
         % Need to enumerate these
     ].
 
+-type purge_fold_options() :: [
+        % Need to enumerate these
+    ].
+
 -type db_handle() :: any().
 
 -type doc_fold_fun() :: fun((#full_doc_info{}, UserAcc::any()) ->
@@ -73,6 +77,10 @@
         {ok, NewUserAcc::any()} |
         {stop, NewUserAcc::any()}).
 
+-type purge_fold_fun() :: fun((
+    {PurgeSeq::non_neg_integer(), UUID::binary(), Id::docid(), Revs::revs()},
+    UserAcc::any()) -> NewUserAcc::any()).
+
 
 % This is called by couch_server to determine which
 % engine should be used for the given database. DbPath
@@ -329,31 +337,20 @@
 % #full_doc_info{} records. The first element of the pair is
 % the #full_doc_info{} that exists on disk. The second element
 % is the new version that should be written to disk. There are
-% three basic cases that should be followed:
+% two basic cases that should be considered:
 %
 %     1. {not_found, #full_doc_info{}} - A new document was created
 %     2. {#full_doc_info{}, #full_doc_info{}} - A document was updated
-%     3. {#full_doc_info{}, not_found} - A document was purged completely
 %
-% Number one and two are fairly straight forward as long as proper
-% accounting for moving entries in the udpate sequence are accounted
-% for. However, case 3 you'll notice is "purged completely" which
-% means it needs to be removed from the database including the
-% update sequence. Also, for engines that are not using append
-% only storage like the legacy engine, case 2 can be the result of
-% a purge so special care will be needed to see which revisions
-% should be removed.
+% Both cases are fairly straightforward as long as entries that move
+% within the update sequence are properly accounted for.
 %
 % The LocalDocs variable is applied separately. It's important to
 % note for new storage engine authors that these documents are
 % separate because they should *not* be included as part of the
 % changes index for the database.
 %
-% The PurgedDocIdRevs is the list of Ids and Revisions that were
-% purged during this update. While its not guaranteed by the API,
-% currently there will never be purge changes comingled with
-% standard updates.
-%
 % Traditionally an invocation of write_doc_infos should be all
 % or nothing in so much that if an error occurs (or the VM dies)
 % then the database doesn't retain any of the changes. However
@@ -364,8 +361,36 @@
 -callback write_doc_infos(
     DbHandle::db_handle(),
     Pairs::doc_pairs(),
-    LocalDocs::[#doc{}],
-    PurgedDocIdRevs::[{docid(), revs()}]) ->
+    LocalDocs::[#doc{}]) ->
+        {ok, NewDbHandle::db_handle()}.
+
+
+% This function is called from the context of couch_db_updater
+% and as such is guaranteed single threaded for the given
+% DbHandle.
+%
+% The Pairs argument is a list of pairs (2-tuples) of
+% #full_doc_info{} records.
+% The first element of the pair is the #full_doc_info{} that exists
+% on disk. The second element is the new version that should be written
+% to disk. There are two basic cases that should be considered:
+%
+%     1. {#full_doc_info{}, #full_doc_info{}} - A document was partially purged
+%     2. {#full_doc_info{}, not_found} - A document was completely purged
+%
+% In case 1, non-tail-append engines may have to remove revisions
+% specifically rather than rely on compaction to remove them.
+%
+% Case 2 is the "purged completely" case, which means the document
+% needs to be removed from the database entirely, including its entry
+% in the update sequence.
+%
+% The Purges argument is a list of 3-tuples, each representing a purge
+% request. Each tuple consists of the purge UUID, the DocId, and the
+% Revisions that were purged.
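+%
+% For example, a single purge request might look like this
+% (hypothetical values):
+%
+%     {<<"uuid-0001">>, <<"mydoc">>, [{1, <<"revhash">>}]}
+%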
+-callback purge_doc_revs(
+    DbHandle::db_handle(),
+    Pairs::doc_pairs(),
+    Purges::[{binary(), docid(), revs()}]) ->
         {ok, NewDbHandle::db_handle()}.
 
 
@@ -424,16 +449,16 @@
 %
 %     1. start_key - Start iteration at the provided key, or
 %        just after it if the key doesn't exist
-%     2. end_key - Stop iteration prior to visiting the provided
+%     2. end_key_gt - Stop iteration prior to visiting the provided
 %        key
-%     3. end_key_gt - Stop iteration just after the provided key
+%     3. end_key - Stop iteration just after the provided key
 %     4. dir - The atom fwd or rev. This is to be able to iterate
 %        over documents in reverse order. The logic for comparing
 %        start_key, end_key, and end_key_gt are then reversed (ie,
 %        when rev, start_key should be greater than end_key if the
 %        user wishes to see results)
 %     5. include_reductions - This is a hack for _all_docs since
-%        it currently relies on reductiosn to count an offset. This
+%        it currently relies on reductions to count an offset. This
 %        is a terrible hack that will need to be addressed by the
 %        API in the future. If this option is present the supplied
 %        user function expects three arguments, where the first
@@ -480,12 +505,12 @@
 % This function is called to fold over the documents (not local
 % documents) in order of their most recent update. Each document
 % in the database should have exactly one entry in this sequence.
-% If a document is updated during a call to this funciton it should
+% If a document is updated during a call to this function it should
 % not be included twice as that will probably lead to Very Bad Things.
 %
 % This should behave similarly to fold_docs/4 in that the supplied
 % user function should be invoked with a #full_doc_info{} record
-% as the first arugment and the current user accumulator as the
+% as the first argument and the current user accumulator as the
 % second argument. The same semantics for the return value from the
 % user function should be handled as in fold_docs/4.
 %
@@ -506,6 +531,21 @@
 
 % This function may be called by many processes concurrently.
 %
+% This function is called to fold over purge requests in increasing
+% purge_seq order, starting with the oldest.
+%
+% The StartPurgeSeq parameter is exclusive: the fold starts with the
+% first purge request *after* StartPurgeSeq.
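+%
+% A minimal sketch of a compatible fold function (it collects
+% {Id, Revs} pairs, newest first):
+%
+%     FoldFun = fun({_PSeq, _UUID, Id, Revs}, Acc) ->
+%         [{Id, Revs} | Acc]
+%     end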
+-callback fold_purged_docs(
+    DbHandle::db_handle(),
+    StartPurgeSeq::non_neg_integer(),
+    UserFold::purge_fold_fun(),
+    UserAcc::any(),
+    purge_fold_options()) ->
+        {ok, LastUserAcc::any()}.
+
+
+% This function may be called by many processes concurrently.
+%
 % This function is called to count the number of documents changed
 % since the given UpdateSeq (ie, not including the possible change
 % at exactly UpdateSeq). It is currently only used internally to
@@ -596,11 +636,13 @@
 
     open_docs/2,
     open_local_docs/2,
+    open_purged_docs/2,
     read_doc_body/2,
 
     serialize_doc/2,
     write_doc_body/2,
-    write_doc_infos/4,
+    write_doc_infos/3,
+    purge_doc_revs/3,
     commit_data/1,
 
     open_write_stream/2,
@@ -610,6 +652,7 @@
     fold_docs/4,
     fold_local_docs/4,
     fold_changes/5,
+    fold_purged_docs/5,
     count_changes_since/2,
 
     start_compaction/1,
@@ -774,6 +817,11 @@ open_local_docs(#db{} = Db, DocIds) ->
     Engine:open_local_docs(EngineState, DocIds).
 
 
+open_purged_docs(#db{} = Db, UUIDs) ->
+    #db{engine = {Engine, EngineState}} = Db,
+    Engine:open_purged_docs(EngineState, UUIDs).
+
+
 read_doc_body(#db{} = Db, RawDoc) ->
     #db{engine = {Engine, EngineState}} = Db,
     Engine:read_doc_body(EngineState, RawDoc).
@@ -789,10 +837,17 @@ write_doc_body(#db{} = Db, #doc{} = Doc) ->
     Engine:write_doc_body(EngineState, Doc).
 
 
-write_doc_infos(#db{} = Db, DocUpdates, LocalDocs, PurgedDocIdRevs) ->
+write_doc_infos(#db{} = Db, DocUpdates, LocalDocs) ->
     #db{engine = {Engine, EngineState}} = Db,
     {ok, NewSt} = Engine:write_doc_infos(
-            EngineState, DocUpdates, LocalDocs, PurgedDocIdRevs),
+            EngineState, DocUpdates, LocalDocs),
+    {ok, Db#db{engine = {Engine, NewSt}}}.
+
+
+purge_doc_revs(#db{} = Db, DocUpdates, Purges) ->
+    #db{engine = {Engine, EngineState}} = Db,
+    {ok, NewSt} = Engine:purge_doc_revs(
+        EngineState, DocUpdates, Purges),
     {ok, Db#db{engine = {Engine, NewSt}}}.
 
 
@@ -832,6 +887,11 @@ fold_changes(#db{} = Db, StartSeq, UserFun, UserAcc, Options) ->
     Engine:fold_changes(EngineState, StartSeq, UserFun, UserAcc, Options).
 
 
+fold_purged_docs(#db{} = Db, StartPurgeSeq, UserFun, UserAcc, Options) ->
+    #db{engine = {Engine, EngineState}} = Db,
+    Engine:fold_purged_docs(EngineState, StartPurgeSeq, UserFun, UserAcc, Options).
+
+
 count_changes_since(#db{} = Db, StartSeq) ->
     #db{engine = {Engine, EngineState}} = Db,
     Engine:count_changes_since(EngineState, StartSeq).

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/7a280947/src/couch_db_header.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_header.erl b/src/couch_db_header.erl
deleted file mode 100644
index 355364f..0000000
--- a/src/couch_db_header.erl
+++ /dev/null
@@ -1,405 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_db_header).
-
-
--export([
-    new/0,
-    from/1,
-    is_header/1,
-    upgrade/1,
-    set/2
-]).
-
--export([
-    disk_version/1,
-    update_seq/1,
-    id_tree_state/1,
-    seq_tree_state/1,
-    latest/1,
-    local_tree_state/1,
-    purge_seq/1,
-    purged_docs/1,
-    security_ptr/1,
-    revs_limit/1,
-    uuid/1,
-    epochs/1,
-    compacted_seq/1
-]).
-
-
-% This should be updated anytime a header change happens that requires more
-% than filling in new defaults.
-%
-% As long the changes are limited to new header fields (with inline
-% defaults) added to the end of the record, then there is no need to increment
-% the disk revision number.
-%
-% if the disk revision is incremented, then new upgrade logic will need to be
-% added to couch_db_updater:init_db.
-
--define(LATEST_DISK_VERSION, 6).
-
--record(db_header, {
-    disk_version = ?LATEST_DISK_VERSION,
-    update_seq = 0,
-    unused = 0,
-    id_tree_state = nil,
-    seq_tree_state = nil,
-    local_tree_state = nil,
-    purge_seq = 0,
-    purged_docs = nil,
-    security_ptr = nil,
-    revs_limit = 1000,
-    uuid,
-    epochs,
-    compacted_seq
-}).
-
-
-new() ->
-    #db_header{
-        uuid = couch_uuids:random(),
-        epochs = [{node(), 0}]
-    }.
-
-
-from(Header0) ->
-    Header = upgrade(Header0),
-    #db_header{
-        uuid = Header#db_header.uuid,
-        epochs = Header#db_header.epochs,
-        compacted_seq = Header#db_header.compacted_seq
-    }.
-
-
-is_header(Header) ->
-    try
-        upgrade(Header),
-        true
-    catch _:_ ->
-        false
-    end.
-
-
-upgrade(Header) ->
-    Funs = [
-        fun upgrade_tuple/1,
-        fun upgrade_disk_version/1,
-        fun upgrade_uuid/1,
-        fun upgrade_epochs/1,
-        fun upgrade_compacted_seq/1
-    ],
-    lists:foldl(fun(F, HdrAcc) ->
-        F(HdrAcc)
-    end, Header, Funs).
-
-
-set(Header0, Fields) ->
-    % A subtlety here is that if a database was open during
-    % the release upgrade that updates to uuids and epochs then
-    % this dynamic upgrade also assigns a uuid and epoch.
-    Header = upgrade(Header0),
-    lists:foldl(fun({Field, Value}, HdrAcc) ->
-        set_field(HdrAcc, Field, Value)
-    end, Header, Fields).
-
-
-disk_version(Header) ->
-    get_field(Header, disk_version).
-
-
-update_seq(Header) ->
-    get_field(Header, update_seq).
-
-
-id_tree_state(Header) ->
-    get_field(Header, id_tree_state).
-
-
-seq_tree_state(Header) ->
-    get_field(Header, seq_tree_state).
-
-
-local_tree_state(Header) ->
-    get_field(Header, local_tree_state).
-
-
-purge_seq(Header) ->
-    get_field(Header, purge_seq).
-
-
-purged_docs(Header) ->
-    get_field(Header, purged_docs).
-
-
-security_ptr(Header) ->
-    get_field(Header, security_ptr).
-
-
-revs_limit(Header) ->
-    get_field(Header, revs_limit).
-
-
-uuid(Header) ->
-    get_field(Header, uuid).
-
-
-epochs(Header) ->
-    get_field(Header, epochs).
-
-
-compacted_seq(Header) ->
-    get_field(Header, compacted_seq).
-
-
-get_field(Header, Field) ->
-    Idx = index(Field),
-    case Idx > tuple_size(Header) of
-        true -> undefined;
-        false -> element(index(Field), Header)
-    end.
-
-
-set_field(Header, Field, Value) ->
-    setelement(index(Field), Header, Value).
-
-
-index(Field) ->
-    couch_util:get_value(Field, indexes()).
-
-
-indexes() ->
-    Fields = record_info(fields, db_header),
-    Indexes = lists:seq(2, record_info(size, db_header)),
-    lists:zip(Fields, Indexes).
-
-
-upgrade_tuple(Old) when is_record(Old, db_header) ->
-    Old;
-upgrade_tuple(Old) when is_tuple(Old) ->
-    NewSize = record_info(size, db_header),
-    if tuple_size(Old) < NewSize -> ok; true ->
-        erlang:error({invalid_header_size, Old})
-    end,
-    {_, New} = lists:foldl(fun(Val, {Idx, Hdr}) ->
-        {Idx+1, setelement(Idx, Hdr, Val)}
-    end, {1, #db_header{}}, tuple_to_list(Old)),
-    if is_record(New, db_header) -> ok; true ->
-        erlang:error({invalid_header_extension, {Old, New}})
-    end,
-    New.
-
--define(OLD_DISK_VERSION_ERROR,
-    "Database files from versions smaller than 0.10.0 are no longer supported").
-
-upgrade_disk_version(#db_header{}=Header) ->
-    case element(2, Header) of
-        1 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
-        2 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
-        3 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
-        4 -> Header#db_header{security_ptr = nil}; % [0.10 - 0.11)
-        5 -> Header; % pre 1.2
-        ?LATEST_DISK_VERSION -> Header;
-        _ ->
-            Reason = "Incorrect disk header version",
-            throw({database_disk_version_error, Reason})
-    end.
-
-
-upgrade_uuid(#db_header{}=Header) ->
-    case Header#db_header.uuid of
-        undefined ->
-            % Upgrading this old db file to a newer
-            % on disk format that includes a UUID.
-            Header#db_header{uuid=couch_uuids:random()};
-        _ ->
-            Header
-    end.
-
-
-upgrade_epochs(#db_header{}=Header) ->
-    NewEpochs = case Header#db_header.epochs of
-        undefined ->
-            % This node is taking over ownership of shard with
-            % and old version of couch file. Before epochs there
-            % was always an implicit assumption that a file was
-            % owned since eternity by the node it was on. This
-            % just codifies that assumption.
-            [{node(), 0}];
-        [{Node, _} | _] = Epochs0 when Node == node() ->
-            % Current node is the current owner of this db
-            Epochs0;
-        Epochs1 ->
-            % This node is taking over ownership of this db
-            % and marking the update sequence where it happened.
-            [{node(), Header#db_header.update_seq} | Epochs1]
-    end,
-    % Its possible for a node to open a db and claim
-    % ownership but never make a write to the db. This
-    % removes nodes that claimed ownership but never
-    % changed the database.
-    DedupedEpochs = remove_dup_epochs(NewEpochs),
-    Header#db_header{epochs=DedupedEpochs}.
-
-
-% This is slightly relying on the udpate_seq's being sorted
-% in epochs due to how we only ever push things onto the
-% front. Although if we ever had a case where the update_seq
-% is not monotonically increasing I don't know that we'd
-% want to remove dupes (by calling a sort on the input to this
-% function). So for now we don't sort but are relying on the
-% idea that epochs is always sorted.
-remove_dup_epochs([_]=Epochs) ->
-    Epochs;
-remove_dup_epochs([{N1, S}, {_N2, S}]) ->
-    % Seqs match, keep the most recent owner
-    [{N1, S}];
-remove_dup_epochs([_, _]=Epochs) ->
-    % Seqs don't match.
-    Epochs;
-remove_dup_epochs([{N1, S}, {_N2, S} | Rest]) ->
-    % Seqs match, keep the most recent owner
-    remove_dup_epochs([{N1, S} | Rest]);
-remove_dup_epochs([{N1, S1}, {N2, S2} | Rest]) ->
-    % Seqs don't match, recurse to check others
-    [{N1, S1} | remove_dup_epochs([{N2, S2} | Rest])].
-
-
-upgrade_compacted_seq(#db_header{}=Header) ->
-    case Header#db_header.compacted_seq of
-        undefined ->
-            Header#db_header{compacted_seq=0};
-        _ ->
-            Header
-    end.
-
-latest(?LATEST_DISK_VERSION) ->
-    true;
-latest(N) when is_integer(N), N < ?LATEST_DISK_VERSION ->
-    false;
-latest(_Else) ->
-    undefined.
-
--ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
-
-mk_header(Vsn) ->
-    {
-        db_header, % record name
-        Vsn, % disk version
-        100, % update_seq
-        0, % unused
-        foo, % id_tree_state
-        bar, % seq_tree_state
-        bam, % local_tree_state
-        1, % purge_seq
-        baz, % purged_docs
-        bang, % security_ptr
-        999 % revs_limit
-    }.
-
-
-upgrade_v3_test() ->
-    Vsn3Header = mk_header(3),
-    NewHeader = upgrade_tuple(Vsn3Header),
-
-    % Tuple upgrades don't change
-    ?assert(is_record(NewHeader, db_header)),
-    ?assertEqual(3, disk_version(NewHeader)),
-    ?assertEqual(100, update_seq(NewHeader)),
-    ?assertEqual(foo, id_tree_state(NewHeader)),
-    ?assertEqual(bar, seq_tree_state(NewHeader)),
-    ?assertEqual(bam, local_tree_state(NewHeader)),
-    ?assertEqual(1, purge_seq(NewHeader)),
-    ?assertEqual(baz, purged_docs(NewHeader)),
-    ?assertEqual(bang, security_ptr(NewHeader)),
-    ?assertEqual(999, revs_limit(NewHeader)),
-    ?assertEqual(undefined, uuid(NewHeader)),
-    ?assertEqual(undefined, epochs(NewHeader)),
-
-    ?assertThrow({database_disk_version_error, _},
-                 upgrade_disk_version(NewHeader)).
-
-
-upgrade_v5_test() ->
-    Vsn5Header = mk_header(5),
-    NewHeader = upgrade_disk_version(upgrade_tuple(Vsn5Header)),
-
-    ?assert(is_record(NewHeader, db_header)),
-    ?assertEqual(5, disk_version(NewHeader)),
-
-    % Security ptr isn't changed for v5 headers
-    ?assertEqual(bang, security_ptr(NewHeader)).
-
-
-upgrade_uuid_test() ->
-    Vsn5Header = mk_header(5),
-
-    % Upgraded headers get a new UUID
-    NewHeader = upgrade_uuid(upgrade_disk_version(upgrade_tuple(Vsn5Header))),
-    ?assertMatch(<<_:32/binary>>, uuid(NewHeader)),
-
-    % Headers with a UUID don't have their UUID changed
-    NewNewHeader = upgrade_uuid(upgrade_disk_version(upgrade_tuple(NewHeader))),
-    ?assertEqual(uuid(NewHeader), uuid(NewNewHeader)),
-
-    % Derived empty headers maintain the same UUID
-    ResetHeader = from(NewNewHeader),
-    ?assertEqual(uuid(NewHeader), uuid(ResetHeader)).
-
-
-upgrade_epochs_test() ->
-    Vsn5Header = mk_header(5),
-
-    % Upgraded headers get a default epochs set
-    NewHeader = upgrade(Vsn5Header),
-    ?assertEqual([{node(), 0}], epochs(NewHeader)),
-
-    % Fake an old entry in epochs
-    FakeFields = [
-        {update_seq, 20},
-        {epochs, [{'someothernode@someotherhost', 0}]}
-    ],
-    NotOwnedHeader = set(NewHeader, FakeFields),
-
-    OwnedEpochs = [
-        {node(), 20},
-        {'someothernode@someotherhost', 0}
-    ],
-
-    % Upgrading a header not owned by the local node updates
-    % the epochs appropriately.
-    NowOwnedHeader = upgrade(NotOwnedHeader),
-    ?assertEqual(OwnedEpochs, epochs(NowOwnedHeader)),
-
-    % Headers with epochs stay the same after upgrades
-    NewNewHeader = upgrade(NowOwnedHeader),
-    ?assertEqual(OwnedEpochs, epochs(NewNewHeader)),
-
-    % Getting a reset header maintains the epoch data
-    ResetHeader = from(NewNewHeader),
-    ?assertEqual(OwnedEpochs, epochs(ResetHeader)).
-
-
-get_uuid_from_old_header_test() ->
-    Vsn5Header = mk_header(5),
-    ?assertEqual(undefined, uuid(Vsn5Header)).
-
-
-get_epochs_from_old_header_test() ->
-    Vsn5Header = mk_header(5),
-    ?assertEqual(undefined, epochs(Vsn5Header)).
-
-
--endif.

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/7a280947/src/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl
index 86a0300..315fbcd 100644
--- a/src/couch_db_updater.erl
+++ b/src/couch_db_updater.erl
@@ -86,79 +86,49 @@ handle_call({set_revs_limit, Limit}, _From, Db) ->
     ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
     {reply, ok, Db2};
 
-handle_call({purge_docs, _IdRevs}, _From,
-        #db{compactor_pid=Pid}=Db) when Pid /= nil ->
-    {reply, {error, purge_during_compaction}, Db};
-handle_call({purge_docs, IdRevs}, _From, Db) ->
-    DocIds = [Id || {Id, _Revs} <- IdRevs],
-    OldDocInfos = couch_db_engine:open_docs(Db, DocIds),
-
-    NewDocInfos = lists:flatmap(fun
-        ({{Id, Revs}, #full_doc_info{id = Id, rev_tree = Tree} = FDI}) ->
-            case couch_key_tree:remove_leafs(Tree, Revs) of
-                {_, [] = _RemovedRevs} -> % no change
-                    [];
-                {NewTree, RemovedRevs} ->
-                    NewFDI = FDI#full_doc_info{rev_tree = NewTree},
-                    [{FDI, NewFDI, RemovedRevs}]
-            end;
-        ({_, not_found}) ->
-            []
-    end, lists:zip(IdRevs, OldDocInfos)),
-
-    InitUpdateSeq = couch_db_engine:get_update_seq(Db),
-    InitAcc = {InitUpdateSeq, [], []},
-    FinalAcc = lists:foldl(fun({_, #full_doc_info{} = OldFDI, RemRevs}, Acc) ->
-        #full_doc_info{
-            id = Id,
-            rev_tree = OldTree
-        } = OldFDI,
-        {SeqAcc0, FDIAcc, IdRevsAcc} = Acc,
-
-        {NewFDIAcc, NewSeqAcc} = case OldTree of
-            [] ->
-                % If we purged every #leaf{} in the doc record
-                % then we're removing it completely from the
-                % database.
-                FDIAcc;
-            _ ->
-                % Its possible to purge the #leaf{} that contains
-                % the update_seq where this doc sits in the update_seq
-                % sequence. Rather than do a bunch of complicated checks
-                % we just re-label every #leaf{} and reinsert it into
-                % the update_seq sequence.
-                {NewTree, SeqAcc1} = couch_key_tree:mapfold(fun
-                    (_RevId, Leaf, leaf, InnerSeqAcc) ->
-                        {Leaf#leaf{seq = InnerSeqAcc + 1}, InnerSeqAcc + 1};
-                    (_RevId, Value, _Type, InnerSeqAcc) ->
-                        {Value, InnerSeqAcc}
-                end, SeqAcc0, OldTree),
-
-                NewFDI = OldFDI#full_doc_info{
-                    update_seq = SeqAcc1,
-                    rev_tree = NewTree
-                },
-
-                {[NewFDI | FDIAcc], SeqAcc1}
-        end,
-        NewIdRevsAcc = [{Id, RemRevs} | IdRevsAcc],
-        {NewSeqAcc, NewFDIAcc, NewIdRevsAcc}
-    end, InitAcc, NewDocInfos),
-
-    {_FinalSeq, FDIs, PurgedIdRevs} = FinalAcc,
-
-    % We need to only use the list of #full_doc_info{} records
-    % that we have actually changed due to a purge.
-    PreviousFDIs = [PrevFDI || {PrevFDI, _, _} <- NewDocInfos],
-    Pairs = pair_purge_info(PreviousFDIs, FDIs),
-
-    {ok, Db2} = couch_db_engine:write_doc_infos(Db, Pairs, [], PurgedIdRevs),
-
+handle_call({set_purged_docs_limit, Limit}, _From, Db) ->
+    {ok, Db2} = couch_db_engine:set_purged_docs_limit(Db, Limit),
     ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
-    couch_event:notify(Db#db.name, updated),
+    {reply, ok, Db2};
 
-    PurgeSeq = couch_db_engine:get_purge_seq(Db2),
-    {reply, {ok, PurgeSeq, PurgedIdRevs}, Db2};
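+% Every purge request in UUIDsIdsRevs receives a positional reply:
+% {ok, []} when nothing was purged for it, or {ok, PurgedRevs} with
+% the revisions that were actually removed.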
+handle_call({purge_docs, UUIDsIdsRevs}, _From, Db) ->
+    UpdateSeq = couch_db_engine:get(Db, update_seq),
+    Ids = [Id || {_UUID, Id, _Revs} <- UUIDsIdsRevs],
+    OldDocInfos = couch_db_engine:open_docs(Db, Ids),
+    DocInfosPurges = lists:zip(OldDocInfos, UUIDsIdsRevs),
+
+    FoldFun = fun({OldDocInfo, {UUId, Id, Revs}}, {UpdSeq, RAcc, PaAcc, PuAcc}) ->
+        case purge_doc(UpdSeq, OldDocInfo, Id, Revs) of
+            not_found ->
+                {UpdSeq, [{ok, []} | RAcc], PaAcc, PuAcc};
+            {NewUpdSeq, Pair, {Id, PurgedRevs}} ->
+                {NewUpdSeq, [{ok, PurgedRevs} | RAcc], [Pair | PaAcc],
+                    [{UUId, Id, PurgedRevs} | PuAcc]}
+        end
+    end,
+    InitAcc = {UpdateSeq, [], [], []},
+    {_USeq, Replies, Pairs, Purges} =
+            lists:foldl(FoldFun, InitAcc, DocInfosPurges),
+
+    Db2 = if Pairs == [] -> Db; true ->
+        {ok, Db1} = couch_db_engine:purge_doc_revs(
+            Db, lists:reverse(Pairs), lists:reverse(Purges)
+        ),
+        ok = gen_server:call(couch_server, {db_updated, Db1}, infinity),
+        couch_event:notify(Db#db.name, updated),
+        Db1
+    end,
+    PurgeSeq = couch_db_engine:get(Db2, purge_seq),
+    {reply, {ok, {PurgeSeq, lists:reverse(Replies)}}, Db2};
+
+handle_call(get_disposable_purge_seq, _From, Db) ->
+    DisposablePurgeSeq = get_disposable_purge_seq(Db),
+    {reply, {ok, DisposablePurgeSeq}, Db};
 
 handle_call(Msg, From, Db) ->
     couch_db_engine:handle_call(Msg, From, Db).
@@ -608,7 +578,7 @@ update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, FullCommit) ->
     Pairs = pair_write_info(OldDocLookups, IndexFDIs),
     LocalDocs2 = update_local_doc_revs(LocalDocs),
 
-    {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2, []),
+    {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2),
 
     WriteCount = length(IndexFDIs),
     couch_stats:increment_counter([couchdb, document_inserts],
@@ -662,6 +632,120 @@ update_local_doc_revs(Docs) ->
     end, Docs).
 
 
+purge_doc(UpdateSeq, OldDocInfo, Id, Revs) ->
+    case OldDocInfo of
+        #full_doc_info{rev_tree = Tree} = FDI ->
+            case couch_key_tree:remove_leafs(Tree, Revs) of
+                {_, [] = _RemovedRevs} -> % no change
+                    not_found;
+                {NewTree, RemovedRevs} ->
+                    case NewTree of
+                        [] ->
+                            % If we purged every #leaf{} in the doc record
+                            % then we're removing it completely from the
+                            % database.
+                            {UpdateSeq, {FDI, not_found}, {Id, RemovedRevs}};
+                        _ ->
+                            % Its possible to purge the #leaf{} that contains
+                            % the update_seq where this doc sits in the
+                            % update_seq sequence. Rather than do a bunch of
+                            % complicated checks we just re-label every #leaf{}
+                            % and reinsert it into the update_seq sequence.
+                            {NewTree2, NewUpdateSeq} = couch_key_tree:mapfold(fun
+                                (_RevId, Leaf, leaf, InnerSeqAcc) ->
+                                    {Leaf#leaf{seq = InnerSeqAcc + 1},
+                                        InnerSeqAcc + 1};
+                                (_RevId, Value, _Type, InnerSeqAcc) ->
+                                    {Value, InnerSeqAcc}
+                            end, UpdateSeq, NewTree),
+
+                            NewFDI = FDI#full_doc_info{
+                                update_seq = NewUpdateSeq,
+                                rev_tree = NewTree2
+                            },
+                            {NewUpdateSeq, {FDI, NewFDI}, {Id, RemovedRevs}}
+                    end
+            end;
+        not_found ->
+            not_found
+    end.
+
+
+% Find the purge_seq such that all purge requests with a seq at or
+% below it can be removed from the purge trees.
+get_disposable_purge_seq(#db{name=DbName} = Db) ->
+    PSeq = couch_db_engine:get(Db, purge_seq),
+    OldestPSeq = couch_db_engine:get(Db, oldest_purge_seq),
+    PDocsLimit = couch_db_engine:get(Db, purged_docs_limit),
+    ExpectedDispPSeq = PSeq - PDocsLimit,
+    % a client's purge_seq is allowed to be up to "allowed_purge_seq_lag"
+    % behind ExpectedDispPSeq
+    AllowedPSeqLag = config:get_integer("couchdb", "allowed_purge_seq_lag", 100),
+    ClientAllowedMinPSeq = ExpectedDispPSeq - AllowedPSeqLag,
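+    % e.g. with hypothetical numbers purge_seq = 1200,
+    % purged_docs_limit = 1000 and allowed_purge_seq_lag = 100:
+    % ExpectedDispPSeq = 200 and ClientAllowedMinPSeq = 100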
+    DisposablePSeq = if OldestPSeq > ClientAllowedMinPSeq ->
+        % DisposablePSeq is the last pseq we can remove;
+        % it should be one less than OldestPSeq when #purges is within limit
+        OldestPSeq - 1;
+    true ->
+        % Find the smallest checkpointed purge_seq among clients
+        Opts = [
+            {start_key, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge-")},
+            {end_key_gt, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge1")}
+        ],
+        FoldFun = fun(#doc{id=DocID, body={Props}}, MinPSeq) ->
+            ClientPSeq = couch_util:get_value(<<"purge_seq">>, Props),
+            MinPSeq2 = if ClientPSeq >= ClientAllowedMinPSeq ->
+                erlang:min(MinPSeq, ClientPSeq);
+            true ->
+                case check_client_exists(Props, DbName, DocID) of
+                    true ->  erlang:min(MinPSeq, ClientPSeq);
+                    false -> MinPSeq % ignore nonexisting clients
+                end
+            end,
+            {ok, MinPSeq2}
+        end,
+        {ok, ClientPSeq} = couch_db_engine:fold_local_docs(Db, FoldFun, PSeq, Opts),
+        erlang:min(ClientPSeq, ExpectedDispPSeq)
+    end,
+    DisposablePSeq.
+
+
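+% A purge checkpoint doc (a _local/purge-* doc) is expected to carry
+% the fields read here and in get_disposable_purge_seq/1; a sketch of
+% its body with hypothetical values:
+%
+%     {[{<<"purge_seq">>, 42},
+%       {<<"verify_module">>, Mod}, {<<"verify_function">>, Fun},
+%       {<<"verify_options">>, Args},
+%       {<<"timestamp_utc">>, <<"2017-03-15T22:11:25Z">>}]}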
+check_client_exists(Props, DbName, DocID) ->
+    % warns about clients that have not checkpointed for more than
+    % "allowed_purge_time_lag"
+    AllowedPTimeLag = config:get_integer("couchdb",
+        "allowed_purge_time_lag", 86400), % secs in 1 day
+    M = couch_util:get_value(<<"verify_module">>, Props),
+    F = couch_util:get_value(<<"verify_function">>, Props),
+    A = couch_util:get_value(<<"verify_options">>, Props),
+    ClientExists = try erlang:apply(M, F, A) of
+        true ->
+            % warn if we haven't heard from this client for more than
+            % AllowedPTimeLag
+            ClientTime = couch_util:get_value(<<"timestamp_utc">>, Props),
+            {ok, [Y, Mon, D, H, Min, S], []} =
+                io_lib:fread("~4d-~2d-~2dT~2d:~2d:~2dZ", ClientTime),
+            SecsClient = calendar:datetime_to_gregorian_seconds(
+                {{Y, Mon, D}, {H, Min, S}}),
+            SecsNow = calendar:datetime_to_gregorian_seconds(
+                calendar:now_to_universal_time(os:timestamp())),
+            if SecsClient + AllowedPTimeLag > SecsNow -> ok; true ->
+                couch_log:warning("Processing of purge requests is much behind on: ~p."
+                "Prevents purge_tree of db:~p from compacting.", [A, DbName])
+            end,
+            true;
+        false ->
+            couch_log:warning("Client ~p doesn't exists, but its : ~p doc on ~p"
+            "still exists. Accessed during compaction of purge_tree",
+                [A, DocID, DbName]),
+            false
+    catch
+        error:Error ->
+            couch_log:error("error in evaluating if client exists: ~p", [Error]),
+        false
+    end,
+    ClientExists.
+
+
 commit_data(Db) ->
     commit_data(Db, false).
 
@@ -704,15 +788,6 @@ pair_write_info(Old, New) ->
     end, New).
 
 
-pair_purge_info(Old, New) ->
-    lists:map(fun(OldFDI) ->
-        case lists:keyfind(OldFDI#full_doc_info.id, #full_doc_info.id, New) of
-            #full_doc_info{} = NewFDI -> {OldFDI, NewFDI};
-            false -> {OldFDI, not_found}
-        end
-    end, Old).
-
-
 default_security_object(<<"shards/", _/binary>>) ->
     case config:get("couchdb", "default_security", "everyone") of
         "admin_only" ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/7a280947/src/couch_util.erl
----------------------------------------------------------------------
diff --git a/src/couch_util.erl b/src/couch_util.erl
index dc2ef64..8a739a9 100644
--- a/src/couch_util.erl
+++ b/src/couch_util.erl
@@ -34,6 +34,7 @@
 -export([callback_exists/3, validate_callback_exists/3]).
 -export([with_proc/4]).
 -export([check_md5/2]).
+-export([utc_string/0]).
 
 -include_lib("couch/include/couch_db.hrl").
 
@@ -643,3 +644,12 @@ with_proc(M, F, A, Timeout) ->
         erlang:demonitor(Ref, [flush]),
         {error, timeout}
     end.
+
+
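+% Returns the current UTC time as an ISO 8601 string, for example
+% (assuming a wall clock of 2017-03-15 22:11:25 UTC):
+%
+%     couch_util:utc_string() -> "2017-03-15T22:11:25Z"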
+utc_string() ->
+    {{Year, Month, Day}, {Hour, Minute, Second}} =
+        calendar:now_to_universal_time(os:timestamp()),
+    lists:flatten(
+        io_lib:format("~.4.0w-~.2.0w-~.2.0wT~.2.0w:~.2.0w:~.2.0wZ",
+            [Year, Month, Day, Hour, Minute, Second])
+    ).

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/7a280947/src/test_engine_compaction.erl
----------------------------------------------------------------------
diff --git a/src/test_engine_compaction.erl b/src/test_engine_compaction.erl
index b178bae..321ce94 100644
--- a/src/test_engine_compaction.erl
+++ b/src/test_engine_compaction.erl
@@ -82,12 +82,9 @@ cet_compact_with_everything() ->
 
     FooRev = test_engine_util:prev_rev(FooFDI),
     BarRev = test_engine_util:prev_rev(BarFDI),
-
     Actions3 = [
-        {batch, [
-            {purge, {<<"foo">>, FooRev#rev_info.rev}},
-            {purge, {<<"bar">>, BarRev#rev_info.rev}}
-        ]}
+        {purge, {<<"foo">>, FooRev#rev_info.rev}},
+        {purge, {<<"bar">>, BarRev#rev_info.rev}}
     ],
 
     {ok, St6} = test_engine_util:apply_actions(Engine, St5, Actions3),
@@ -97,7 +94,15 @@ cet_compact_with_everything() ->
         {<<"foo">>, [FooRev#rev_info.rev]}
     ],
 
-    ?assertEqual(PurgedIdRevs, lists:sort(Engine:get_last_purged(St6))),
+    {ok, St7} = test_engine_util:apply_actions(Engine, St6, Actions3),
+    {ok, PIdRevs7} = Engine:fold_purged_docs(St7, 0, fun fold_fun/2, [], []),
+    ?assertEqual(
+        [
+            {<<"foo">>, [FooRev#rev_info.rev]},
+            {<<"bar">>, [BarRev#rev_info.rev]}
+        ],
+        lists:reverse(PIdRevs7)
+    ),
 
     [Att0, Att1, Att2, Att3, Att4] = test_engine_util:prep_atts(Engine, St6, [
             {<<"ohai.txt">>, crypto:rand_bytes(2048)},
@@ -127,11 +132,21 @@ cet_compact_with_everything() ->
     end),
 
     {ok, St10, undefined} = Engine:finish_compaction(St9, DbName, [], Term),
+    {ok, PIdRevs10} = Engine:fold_purged_docs(St10, 0, fun fold_fun/2, [], []),
+    ?assertEqual(
+        [
+            {<<"foo">>, [FooRev#rev_info.rev]},
+            {<<"bar">>, [BarRev#rev_info.rev]}
+        ],
+        lists:reverse(PIdRevs10)
+    ),
+
     Db2 = test_engine_util:db_as_term(Engine, St10),
     Diff = test_engine_util:term_diff(Db1, Db2),
     ?assertEqual(nodiff, Diff).
 
 
 cet_recompact_updates() ->
     {ok, Engine, Path, St1} = test_engine_util:init_engine(dbpath),
 
@@ -171,6 +186,105 @@ cet_recompact_updates() ->
     ?assertEqual(nodiff, Diff).
 
 
+cet_recompact_purge() ->
+    {ok, Engine, Path, St1} = test_engine_util:init_engine(dbpath),
+
+    Actions1 = [
+        {create, {<<"foo">>, []}},
+        {create, {<<"bar">>, []}},
+        {conflict, {<<"bar">>, [{<<"vsn">>, 2}]}},
+        {create, {<<"baz">>, []}}
+    ],
+
+    {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+    {ok, St3, DbName, _, Term} = test_engine_util:compact(Engine, St2, Path),
+
+    [BarFDI, BazFDI] = Engine:open_docs(St3, [<<"bar">>, <<"baz">>]),
+    BarRev = test_engine_util:prev_rev(BarFDI),
+    BazRev = test_engine_util:prev_rev(BazFDI),
+    Actions2 = [
+        {purge, {<<"bar">>, BarRev#rev_info.rev}},
+        {purge, {<<"baz">>, BazRev#rev_info.rev}}
+    ],
+    {ok, St4} = test_engine_util:apply_actions(Engine, St3, Actions2),
+    Db1 = test_engine_util:db_as_term(Engine, St4),
+
+    {ok, St5, NewPid} = Engine:finish_compaction(St4, DbName, [], Term),
+
+    ?assertEqual(true, is_pid(NewPid)),
+    Ref = erlang:monitor(process, NewPid),
+
+    NewTerm = receive
+        {'$gen_cast', {compact_done, Engine, Term0}} ->
+            Term0;
+        {'DOWN', Ref, _, _, Reason} ->
+            erlang:error({compactor_died, Reason});
+        {'$gen_call', {NewPid, Ref2}, get_disposable_purge_seq} ->
+            NewPid ! {Ref2, {ok, 0}},
+            receive
+                {'$gen_cast', {compact_done, Engine, Term0}} ->
+                    Term0;
+                {'DOWN', Ref, _, _, Reason} ->
+                    erlang:error({compactor_died, Reason})
+                after 10000 ->
+                    erlang:error(compactor_timed_out)
+            end
+        after 10000 ->
+            erlang:error(compactor_timed_out)
+    end,
+
+    {ok, St6, undefined} = Engine:finish_compaction(St5, DbName, [], NewTerm),
+    Db2 = test_engine_util:db_as_term(Engine, St6),
+    Diff = test_engine_util:term_diff(Db1, Db2),
+    ?assertEqual(nodiff, Diff).
+
+
+cet_compact_purged_docs_limit() ->
+    {ok, Engine, Path, St1} = test_engine_util:init_engine(dbpath),
+    % create NumDocs docs
+    NumDocs = 1200,
+    {RActions, RIds} = lists:foldl(fun(Id, {CActions, CIds}) ->
+        Id1 = docid(Id),
+        Action = {create, {Id1, [{<<"int">>, Id}]}},
+        {[Action| CActions], [Id1| CIds]}
+    end, {[], []}, lists:seq(1, NumDocs)),
+    Ids = lists:reverse(RIds),
+    {ok, St2} = test_engine_util:apply_actions(Engine, St1,
+        lists:reverse(RActions)),
+
+    % purge NumDocs docs
+    FDIs = Engine:open_docs(St2, Ids),
+    RevActions2 = lists:foldl(fun(FDI, CActions) ->
+        Id = FDI#full_doc_info.id,
+        PrevRev = test_engine_util:prev_rev(FDI),
+        Rev = PrevRev#rev_info.rev,
+        [{purge, {Id, Rev}}| CActions]
+    end, [], FDIs),
+    {ok, St3} = test_engine_util:apply_actions(Engine, St2,
+        lists:reverse(RevActions2)),
+
+    % check that, before compaction, all NumDocs purge requests are in
+    % the purge_tree, even though NumDocs=1200 is greater than
+    % purged_docs_limit=1000
+    {ok, PurgedIdRevs} = Engine:fold_purged_docs(St3, 0, fun fold_fun/2, [], []),
+    ?assertEqual(1, Engine:get(St3, oldest_purge_seq)),
+    ?assertEqual(NumDocs, length(PurgedIdRevs)),
+
+    % compact db
+    {ok, St4, DbName, _, Term} = test_engine_util:compact(Engine, St3, Path),
+    {ok, St5, undefined} = Engine:finish_compaction(St4, DbName, [], Term),
+
+    % check that, after compaction, only purged_docs_limit purge
+    % requests remain in the purge_tree
+    PurgedDocsLimit = Engine:get(St5, purged_docs_limit),
+    OldestPSeq = Engine:get(St5, oldest_purge_seq),
+    {ok, PurgedIdRevs2} = Engine:fold_purged_docs(
+        St5, OldestPSeq - 1, fun fold_fun/2, [], []),
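+    % with NumDocs = 1200 and purged_docs_limit = 1000, the oldest
+    % remaining purge_seq after compaction is 1200 - 1000 + 1 = 201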
+    ExpectedOldestPSeq = NumDocs - PurgedDocsLimit + 1,
+    ?assertEqual(ExpectedOldestPSeq, OldestPSeq),
+    ?assertEqual(PurgedDocsLimit, length(PurgedIdRevs2)).
+
+
 docid(I) ->
     Str = io_lib:format("~4..0b", [I]),
     iolist_to_binary(Str).
@@ -179,3 +293,7 @@ docid(I) ->
 local_docid(I) ->
     Str = io_lib:format("_local/~4..0b", [I]),
     iolist_to_binary(Str).
+
+
+fold_fun({_PSeq, _UUID, Id, Revs}, Acc) ->
+    [{Id, Revs} | Acc].

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/7a280947/src/test_engine_fold_purged_docs.erl
----------------------------------------------------------------------
diff --git a/src/test_engine_fold_purged_docs.erl b/src/test_engine_fold_purged_docs.erl
new file mode 100644
index 0000000..cb4238b
--- /dev/null
+++ b/src/test_engine_fold_purged_docs.erl
@@ -0,0 +1,134 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(test_engine_fold_purged_docs).
+-compile(export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(NUM_DOCS, 100).
+
+
+cet_empty_purged_docs() ->
+    {ok, Engine, St} = test_engine_util:init_engine(),
+    ?assertEqual({ok, []}, Engine:fold_purged_docs(St, 0, fun fold_fun/2, [], [])).
+
+
+cet_all_purged_docs() ->
+    {ok, Engine, St1} = test_engine_util:init_engine(),
+
+    {RActions, RIds} = lists:foldl(fun(Id, {CActions, CIds}) ->
+        Id1 = docid(Id),
+        Action = {create, {Id1, [{<<"int">>, Id}]}},
+        {[Action| CActions], [Id1| CIds]}
+     end, {[], []}, lists:seq(1, ?NUM_DOCS)),
+    Actions = lists:reverse(RActions),
+    Ids = lists:reverse(RIds),
+    {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+
+    FDIs = Engine:open_docs(St2, Ids),
+    {RevActions2, RevIdRevs} = lists:foldl(fun(FDI, {CActions, CIdRevs}) ->
+        Id = FDI#full_doc_info.id,
+        PrevRev = test_engine_util:prev_rev(FDI),
+        Rev = PrevRev#rev_info.rev,
+        Action = {purge, {Id, Rev}},
+        {[Action| CActions], [{Id, [Rev]}| CIdRevs]}
+     end, {[], []}, FDIs),
+    {Actions2, IdsRevs} = {lists:reverse(RevActions2), lists:reverse(RevIdRevs)},
+
+    {ok, St3} = test_engine_util:apply_actions(Engine, St2, Actions2),
+    {ok, PurgedIdRevs} = Engine:fold_purged_docs(St3, 0, fun fold_fun/2, [], []),
+    ?assertEqual(IdsRevs, lists:reverse(PurgedIdRevs)).
+
+
+cet_start_seq() ->
+    {ok, Engine, St1} = test_engine_util:init_engine(),
+    Actions1 = [
+        {create, {docid(1), [{<<"int">>, 1}]}},
+        {create, {docid(2), [{<<"int">>, 2}]}},
+        {create, {docid(3), [{<<"int">>, 3}]}},
+        {create, {docid(4), [{<<"int">>, 4}]}},
+        {create, {docid(5), [{<<"int">>, 5}]}}
+    ],
+    Ids = [docid(1), docid(2), docid(3), docid(4), docid(5)],
+    {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+
+    FDIs = Engine:open_docs(St2, Ids),
+    {RActions2, RIdRevs} = lists:foldl(fun(FDI, {CActions, CIdRevs}) ->
+        Id = FDI#full_doc_info.id,
+        PrevRev = test_engine_util:prev_rev(FDI),
+        Rev = PrevRev#rev_info.rev,
+        Action = {purge, {Id, Rev}},
+        {[Action| CActions], [{Id, [Rev]}| CIdRevs]}
+    end, {[], []}, FDIs),
+    {ok, St3} = test_engine_util:apply_actions(Engine, St2, lists:reverse(RActions2)),
+
+    StartSeq = 3,
+    StartSeqIdRevs = lists:nthtail(StartSeq, lists:reverse(RIdRevs)),
+    {ok, PurgedIdRevs} = Engine:fold_purged_docs(St3, StartSeq, fun fold_fun/2, [], []),
+    ?assertEqual(StartSeqIdRevs, lists:reverse(PurgedIdRevs)).
+
+
+cet_id_rev_repeated() ->
+    {ok, Engine, St1} = test_engine_util:init_engine(),
+
+    Actions1 = [
+        {create, {<<"foo">>, [{<<"vsn">>, 1}]}},
+        {conflict, {<<"foo">>, [{<<"vsn">>, 2}]}}
+    ],
+    {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+
+    [FDI1] = Engine:open_docs(St2, [<<"foo">>]),
+    PrevRev1 = test_engine_util:prev_rev(FDI1),
+    Rev1 = PrevRev1#rev_info.rev,
+    Actions2 = [
+        {purge, {<<"foo">>, Rev1}}
+    ],
+    {ok, St3} = test_engine_util:apply_actions(Engine, St2, Actions2),
+    PurgedIdRevs0 = [{<<"foo">>, [Rev1]}],
+    {ok, PurgedIdRevs1} = Engine:fold_purged_docs(St3, 0, fun fold_fun/2, [], []),
+    ?assertEqual(PurgedIdRevs0, PurgedIdRevs1),
+    ?assertEqual(1, Engine:get(St3, purge_seq)),
+
+    % purge the same Id,Rev when the doc still exists
+    {ok, St4} = test_engine_util:apply_actions(Engine, St3, Actions2),
+    {ok, PurgedIdRevs2} = Engine:fold_purged_docs(St4, 0, fun fold_fun/2, [], []),
+    ?assertEqual(PurgedIdRevs0, PurgedIdRevs2),
+    ?assertEqual(1, Engine:get(St4, purge_seq)),
+
+    [FDI2] = Engine:open_docs(St4, [<<"foo">>]),
+    PrevRev2 = test_engine_util:prev_rev(FDI2),
+    Rev2 = PrevRev2#rev_info.rev,
+    Actions3 = [
+        {purge, {<<"foo">>, Rev2}}
+    ],
+    {ok, St5} = test_engine_util:apply_actions(Engine, St4, Actions3),
+    PurgedIdRevs00 = [{<<"foo">>, [Rev1]}, {<<"foo">>, [Rev2]}],
+
+    % purge the same Id,Rev when the doc was completely purged
+    {ok, St6} = test_engine_util:apply_actions(Engine, St5, Actions3),
+    {ok, PurgedIdRevs3} = Engine:fold_purged_docs(St6, 0, fun fold_fun/2, [], []),
+    ?assertEqual(PurgedIdRevs00, lists:reverse(PurgedIdRevs3)),
+    ?assertEqual(2, Engine:get(St6, purge_seq)).
+
+
+fold_fun({_PSeq, _UUID, Id, Revs}, Acc) ->
+    [{Id, Revs} | Acc].
+
+
+docid(I) ->
+    Str = io_lib:format("~4..0b", [I]),
+    iolist_to_binary(Str).
+

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/7a280947/src/test_engine_get_set_props.erl
----------------------------------------------------------------------
diff --git a/src/test_engine_get_set_props.erl b/src/test_engine_get_set_props.erl
index 6d2a447..1e509dc 100644
--- a/src/test_engine_get_set_props.erl
+++ b/src/test_engine_get_set_props.erl
@@ -34,6 +34,8 @@ cet_default_props() ->
     ?assertEqual(true, is_integer(Engine:get_disk_version(St))),
     ?assertEqual(0, Engine:get_update_seq(St)),
     ?assertEqual(0, Engine:get_purge_seq(St)),
+    ?assertEqual(true, is_integer(Engine:get_purged_docs_limit(St))),
+    ?assertEqual(true, Engine:get_purged_docs_limit(St) > 0),
     ?assertEqual([], Engine:get_last_purged(St)),
     ?assertEqual(dso, Engine:get_security(St)),
     ?assertEqual(1000, Engine:get_revs_limit(St)),

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/7a280947/src/test_engine_purge_docs.erl
----------------------------------------------------------------------
diff --git a/src/test_engine_purge_docs.erl b/src/test_engine_purge_docs.erl
index e5bf249..191c2f2 100644
--- a/src/test_engine_purge_docs.erl
+++ b/src/test_engine_purge_docs.erl
@@ -25,12 +25,13 @@ cet_purge_simple() ->
         {create, {<<"foo">>, [{<<"vsn">>, 1}]}}
     ],
     {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+    {ok, PIdRevs2} = Engine:fold_purged_docs(St2, 0, fun fold_fun/2, [], []),
 
     ?assertEqual(1, Engine:get_doc_count(St2)),
     ?assertEqual(0, Engine:get_del_doc_count(St2)),
     ?assertEqual(1, Engine:get_update_seq(St2)),
     ?assertEqual(0, Engine:get_purge_seq(St2)),
-    ?assertEqual([], Engine:get_last_purged(St2)),
+    ?assertEqual([], PIdRevs2),
 
     [FDI] = Engine:open_docs(St2, [<<"foo">>]),
     PrevRev = test_engine_util:prev_rev(FDI),
@@ -40,12 +41,13 @@ cet_purge_simple() ->
         {purge, {<<"foo">>, Rev}}
     ],
     {ok, St3} = test_engine_util:apply_actions(Engine, St2, Actions2),
+    {ok, PIdRevs3} = Engine:fold_purged_docs(St3, 0, fun fold_fun/2, [], []),
 
     ?assertEqual(0, Engine:get_doc_count(St3)),
     ?assertEqual(0, Engine:get_del_doc_count(St3)),
     ?assertEqual(2, Engine:get_update_seq(St3)),
     ?assertEqual(1, Engine:get_purge_seq(St3)),
-    ?assertEqual([{<<"foo">>, [Rev]}], Engine:get_last_purged(St3)).
+    ?assertEqual([{<<"foo">>, [Rev]}], PIdRevs3).
 
 
 cet_purge_conflicts() ->
@@ -56,12 +58,13 @@ cet_purge_conflicts() ->
         {conflict, {<<"foo">>, [{<<"vsn">>, 2}]}}
     ],
     {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+    {ok, PIdRevs2} = Engine:fold_purged_docs(St2, 0, fun fold_fun/2, [], []),
 
     ?assertEqual(1, Engine:get_doc_count(St2)),
     ?assertEqual(0, Engine:get_del_doc_count(St2)),
     ?assertEqual(2, Engine:get_update_seq(St2)),
     ?assertEqual(0, Engine:get_purge_seq(St2)),
-    ?assertEqual([], Engine:get_last_purged(St2)),
+    ?assertEqual([], PIdRevs2),
 
     [FDI1] = Engine:open_docs(St2, [<<"foo">>]),
     PrevRev1 = test_engine_util:prev_rev(FDI1),
@@ -71,12 +74,13 @@ cet_purge_conflicts() ->
         {purge, {<<"foo">>, Rev1}}
     ],
     {ok, St3} = test_engine_util:apply_actions(Engine, St2, Actions2),
+    {ok, PIdRevs3} = Engine:fold_purged_docs(St3, 0, fun fold_fun/2, [], []),
 
     ?assertEqual(1, Engine:get_doc_count(St3)),
     ?assertEqual(0, Engine:get_del_doc_count(St3)),
     ?assertEqual(4, Engine:get_update_seq(St3)),
     ?assertEqual(1, Engine:get_purge_seq(St3)),
-    ?assertEqual([{<<"foo">>, [Rev1]}], Engine:get_last_purged(St3)),
+    ?assertEqual([{<<"foo">>, [Rev1]}], PIdRevs3),
 
     [FDI2] = Engine:open_docs(St3, [<<"foo">>]),
     PrevRev2 = test_engine_util:prev_rev(FDI2),
@@ -86,12 +90,14 @@ cet_purge_conflicts() ->
         {purge, {<<"foo">>, Rev2}}
     ],
     {ok, St4} = test_engine_util:apply_actions(Engine, St3, Actions3),
+    {ok, PIdRevs4} = Engine:fold_purged_docs(St4, 0, fun fold_fun/2, [], []),
 
     ?assertEqual(0, Engine:get_doc_count(St4)),
     ?assertEqual(0, Engine:get_del_doc_count(St4)),
     ?assertEqual(5, Engine:get_update_seq(St4)),
     ?assertEqual(2, Engine:get_purge_seq(St4)),
-    ?assertEqual([{<<"foo">>, [Rev2]}], Engine:get_last_purged(St4)).
+    ?assertEqual([{<<"foo">>, [Rev1]}, {<<"foo">>, [Rev2]}],
+            lists:reverse(PIdRevs4)).
 
 
 cet_add_delete_purge() ->
@@ -103,12 +109,14 @@ cet_add_delete_purge() ->
     ],
 
     {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+    {ok, PIdRevs2} = Engine:fold_purged_docs(St2, 0, fun fold_fun/2, [], []),
+
 
     ?assertEqual(0, Engine:get_doc_count(St2)),
     ?assertEqual(1, Engine:get_del_doc_count(St2)),
     ?assertEqual(2, Engine:get_update_seq(St2)),
     ?assertEqual(0, Engine:get_purge_seq(St2)),
-    ?assertEqual([], Engine:get_last_purged(St2)),
+    ?assertEqual([], PIdRevs2),
 
     [FDI] = Engine:open_docs(St2, [<<"foo">>]),
     PrevRev = test_engine_util:prev_rev(FDI),
@@ -118,12 +126,13 @@ cet_add_delete_purge() ->
         {purge, {<<"foo">>, Rev}}
     ],
     {ok, St3} = test_engine_util:apply_actions(Engine, St2, Actions2),
+    {ok, PIdRevs3} = Engine:fold_purged_docs(St3, 0, fun fold_fun/2, [], []),
 
     ?assertEqual(0, Engine:get_doc_count(St3)),
     ?assertEqual(0, Engine:get_del_doc_count(St3)),
     ?assertEqual(3, Engine:get_update_seq(St3)),
     ?assertEqual(1, Engine:get_purge_seq(St3)),
-    ?assertEqual([{<<"foo">>, [Rev]}], Engine:get_last_purged(St3)).
+    ?assertEqual([{<<"foo">>, [Rev]}], PIdRevs3).
 
 
 cet_add_two_purge_one() ->
@@ -135,12 +144,13 @@ cet_add_two_purge_one() ->
     ],
 
     {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+    {ok, PIdRevs2} = Engine:fold_purged_docs(St2, 0, fun fold_fun/2, [], []),
 
     ?assertEqual(2, Engine:get_doc_count(St2)),
     ?assertEqual(0, Engine:get_del_doc_count(St2)),
     ?assertEqual(2, Engine:get_update_seq(St2)),
     ?assertEqual(0, Engine:get_purge_seq(St2)),
-    ?assertEqual([], Engine:get_last_purged(St2)),
+    ?assertEqual([], PIdRevs2),
 
     [FDI] = Engine:open_docs(St2, [<<"foo">>]),
     PrevRev = test_engine_util:prev_rev(FDI),
@@ -150,9 +160,14 @@ cet_add_two_purge_one() ->
         {purge, {<<"foo">>, Rev}}
     ],
     {ok, St3} = test_engine_util:apply_actions(Engine, St2, Actions2),
+    {ok, PIdRevs3} = Engine:fold_purged_docs(St3, 0, fun fold_fun/2, [], []),
 
     ?assertEqual(1, Engine:get_doc_count(St3)),
     ?assertEqual(0, Engine:get_del_doc_count(St3)),
     ?assertEqual(3, Engine:get_update_seq(St3)),
     ?assertEqual(1, Engine:get_purge_seq(St3)),
-    ?assertEqual([{<<"foo">>, [Rev]}], Engine:get_last_purged(St3)).
+    ?assertEqual([{<<"foo">>, [Rev]}], PIdRevs3).
+
+
+fold_fun({_Pseq, _UUID, Id, Revs}, Acc) ->
+    [{Id, Revs} | Acc].