You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ji...@apache.org on 2018/07/23 03:46:00 UTC

[couchdb] 02/09: [03/10] Clustered Purge: Update couch_bt_engine

This is an automated email from the ASF dual-hosted git repository.

jiangphcn pushed a commit to branch COUCHDB-3326-clustered-purge-pr5-implementation
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 712363cf3dba9935928b9b9fa64147ee4fa4215c
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Tue Apr 24 12:25:43 2018 -0500

    [03/10] Clustered Purge: Update couch_bt_engine
    
    This commit updates the couch_bt_engine storage engine implementation to
    satisfy the newly defined single-node purge APIs. This is accomplished
    by storing two new database btrees.
    
    The purge_seq_tree orders purge requests by their purge_seq. This tree
    is used to satisfy the fold_purge_infos API for database components to
    enumerate the list of purge requests in a defined order.
    
    The second index is the purge_tree, which orders purge requests by their
    UUID to allow efficient lookups when filtering replicated purge
    requests.
    
    COUCHDB-3326
    
    Co-authored-by: Mayya Sharipova <ma...@ca.ibm.com>
    Co-authored-by: jiangphcn <ji...@cn.ibm.com>
---
 src/couch/src/couch_bt_engine.erl           | 251 ++++++++++++++++++++++++----
 src/couch/src/couch_bt_engine.hrl           |   4 +-
 src/couch/src/couch_bt_engine_compactor.erl | 122 ++++++++++++--
 src/couch/src/couch_bt_engine_header.erl    |  35 ++--
 4 files changed, 344 insertions(+), 68 deletions(-)

diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 2583f10..1096c03 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -35,8 +35,9 @@
     get_disk_version/1,
     get_doc_count/1,
     get_epochs/1,
-    get_last_purged/1,
     get_purge_seq/1,
+    get_oldest_purge_seq/1,
+    get_purge_infos_limit/1,
     get_revs_limit/1,
     get_security/1,
     get_size_info/1,
@@ -44,15 +45,18 @@
     get_uuid/1,
 
     set_revs_limit/2,
+    set_purge_infos_limit/2,
     set_security/2,
 
     open_docs/2,
     open_local_docs/2,
     read_doc_body/2,
+    load_purge_infos/2,
 
     serialize_doc/2,
     write_doc_body/2,
-    write_doc_infos/4,
+    write_doc_infos/3,
+    purge_docs/3,
 
     commit_data/1,
 
@@ -63,6 +67,7 @@
     fold_docs/4,
     fold_local_docs/4,
     fold_changes/5,
+    fold_purge_infos/5,
     count_changes_since/2,
 
     start_compaction/4,
@@ -85,7 +90,13 @@
     seq_tree_reduce/2,
 
     local_tree_split/1,
-    local_tree_join/2
+    local_tree_join/2,
+
+    purge_tree_split/1,
+    purge_tree_join/2,
+    purge_tree_reduce/2,
+    purge_seq_tree_split/1,
+    purge_seq_tree_join/2
 ]).
 
 
@@ -217,18 +228,31 @@ get_epochs(#st{header = Header}) ->
     couch_bt_engine_header:get(Header, epochs).


-get_last_purged(#st{header = Header} = St) ->
-    case couch_bt_engine_header:get(Header, purged_docs) of
-        nil ->
-            [];
-        Pointer ->
-            {ok, PurgeInfo} = couch_file:pread_term(St#st.fd, Pointer),
-            PurgeInfo
-    end.
+% Newest purge sequence recorded in the purge_seq_tree, or 0 when no
+% purge request has been stored yet.
+get_purge_seq(#st{purge_seq_tree = PurgeSeqTree}) ->
+    edge_purge_seq(PurgeSeqTree, [{dir, rev}]).
+
+
+% Oldest purge sequence still present in the purge_seq_tree, or 0 when
+% the tree is empty.
+get_oldest_purge_seq(#st{purge_seq_tree = PurgeSeqTree}) ->
+    edge_purge_seq(PurgeSeqTree, []).
+
+
+% Folds PurgeSeqTree with FoldOpts, stops at the first purge info visited
+% and returns its purge sequence; returns 0 if nothing is visited.
+edge_purge_seq(PurgeSeqTree, FoldOpts) ->
+    StopAtFirst = fun({Seq, _UUID, _DocId, _Revs}, _Reds, _Acc) ->
+        {stop, Seq}
+    end,
+    {ok, _, Seq} = couch_btree:fold(PurgeSeqTree, StopAtFirst, 0, FoldOpts),
+    Seq.


-get_purge_seq(#st{header = Header}) ->
-    couch_bt_engine_header:get(Header, purge_seq).
+% Returns the purge_infos_limit field of the database header.
+get_purge_infos_limit(#st{header = Header}) ->
+    couch_bt_engine_header:get(Header, purge_infos_limit).


 get_revs_limit(#st{header = Header}) ->
@@ -284,6 +301,16 @@ set_revs_limit(#st{header = Header} = St, RevsLimit) ->
     {ok, increment_update_seq(NewSt)}.


+% Stores PurgeInfosLimit in the header, marks the state as needing a
+% commit and passes it through increment_update_seq/1.
+set_purge_infos_limit(#st{header = Header} = St, PurgeInfosLimit) ->
+    NewHeader = couch_bt_engine_header:set(Header, [
+        {purge_infos_limit, PurgeInfosLimit}
+    ]),
+    Updated = St#st{header = NewHeader, needs_commit = true},
+    {ok, increment_update_seq(Updated)}.
+
+
 set_security(#st{header = Header} = St, NewSecurity) ->
     Options = [{compression, St#st.compression}],
     {ok, Ptr, _} = couch_file:append_term(St#st.fd, NewSecurity, Options),
@@ -320,6 +347,17 @@ read_doc_body(#st{} = St, #doc{} = Doc) ->
     }.


+% Looks UUIDs up in the purge_tree. Each {ok, Info} lookup result is
+% unwrapped to Info; not_found results are passed through unchanged.
+load_purge_infos(St, UUIDs) ->
+    Results = couch_btree:lookup(St#st.purge_tree, UUIDs),
+    [unwrap_purge_info(Result) || Result <- Results].
+
+
+unwrap_purge_info({ok, Info}) -> Info;
+unwrap_purge_info(not_found) -> not_found.
+
+
 serialize_doc(#st{} = St, #doc{} = Doc) ->
     Compress = fun(Term) ->
         case couch_compress:is_compressed(Term, St#st.compression) of
@@ -351,7 +386,7 @@ write_doc_body(St, #doc{} = Doc) ->
     {ok, Doc#doc{body = Ptr}, Written}.
 
 
-write_doc_infos(#st{} = St, Pairs, LocalDocs, PurgedIdRevs) ->
+write_doc_infos(#st{} = St, Pairs, LocalDocs) ->
     #st{
         id_tree = IdTree,
         seq_tree = SeqTree,
@@ -391,23 +426,9 @@ write_doc_infos(#st{} = St, Pairs, LocalDocs, PurgedIdRevs) ->
         erlang:max(Seq, Acc)
     end, get_update_seq(St), Add),
 
-    NewHeader = case PurgedIdRevs of
-        [] ->
-            couch_bt_engine_header:set(St#st.header, [
-                {update_seq, NewUpdateSeq}
-            ]);
-        _ ->
-            {ok, Ptr, _} = couch_file:append_term(St#st.fd, PurgedIdRevs),
-            OldPurgeSeq = couch_bt_engine_header:get(St#st.header, purge_seq),
-            % We bump NewUpdateSeq because we have to ensure that
-            % indexers see that they need to process the new purge
-            % information.
-            couch_bt_engine_header:set(St#st.header, [
-                {update_seq, NewUpdateSeq + 1},
-                {purge_seq, OldPurgeSeq + 1},
-                {purged_docs, Ptr}
-            ])
-    end,
+    NewHeader = couch_bt_engine_header:set(St#st.header, [
+        {update_seq, NewUpdateSeq}
+    ]),
 
     {ok, St#st{
         header = NewHeader,
@@ -418,6 +439,46 @@ write_doc_infos(#st{} = St, Pairs, LocalDocs, PurgedIdRevs) ->
     }}.


+% Applies one batch of purges. Pairs holds {OldFDI, NewFDI | not_found}
+% entries: a NewFDI replaces the old entry in the id and seq trees, while
+% not_found removes the doc from both. PurgeInfos go into both purge trees.
+purge_docs(#st{} = St, Pairs, PurgeInfos) ->
+    #st{
+        header = Header0,
+        id_tree = IdTree0,
+        seq_tree = SeqTree0,
+        purge_tree = PurgeTree0,
+        purge_seq_tree = PurgeSeqTree0
+    } = St,
+
+    ToAdd = [NewFDI || {_, NewFDI} <- Pairs, NewFDI /= not_found],
+    RemIds = [OldFDI#full_doc_info.id || {OldFDI, not_found} <- Pairs],
+    RemSeqs = [OldFDI#full_doc_info.update_seq || {OldFDI, _} <- Pairs],
+
+    % A purge must always advance update_seq so that indexers notice there
+    % is new purge information to process: when no rewritten doc carries a
+    % sequence above the current one, bump the current one by 1.
+    CurrSeq = couch_bt_engine_header:get(Header0, update_seq),
+    NewFDISeqs = [FDI#full_doc_info.update_seq || FDI <- ToAdd],
+    MaxSeq = lists:max([CurrSeq | NewFDISeqs]),
+    UpdateSeq = if MaxSeq == CurrSeq -> CurrSeq + 1; true -> MaxSeq end,
+    Header1 = couch_bt_engine_header:set(Header0, [{update_seq, UpdateSeq}]),
+
+    {ok, IdTree1} = couch_btree:add_remove(IdTree0, ToAdd, RemIds),
+    {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, ToAdd, RemSeqs),
+    {ok, PurgeTree1} = couch_btree:add(PurgeTree0, PurgeInfos),
+    {ok, PurgeSeqTree1} = couch_btree:add(PurgeSeqTree0, PurgeInfos),
+
+    {ok, St#st{
+        header = Header1,
+        id_tree = IdTree1,
+        seq_tree = SeqTree1,
+        purge_tree = PurgeTree1,
+        purge_seq_tree = PurgeSeqTree1,
+        needs_commit = true
+    }}.
+
+
 commit_data(St) ->
     #st{
         fd = Fd,
@@ -480,6 +541,24 @@ fold_changes(St, SinceSeq, UserFun, UserAcc, Options) ->
     {ok, FinalUserAcc}.


+% Folds UserFun(PurgeInfo, Acc) over the purge_seq_tree starting at purge
+% sequence StartSeq0 + 1, with Options appended to the fold options.
+% Raises {invalid_start_purge_seq, StartSeq0} if StartSeq0 + 1 is below
+% the oldest purge sequence still stored in the tree.
+fold_purge_infos(St, StartSeq0, UserFun, UserAcc, Options) ->
+    PurgeSeqTree = St#st.purge_seq_tree,
+    FirstSeq = StartSeq0 + 1,
+    case get_oldest_purge_seq(St) of
+        OldestSeq when OldestSeq =< FirstSeq -> ok;
+        _ -> erlang:error({invalid_start_purge_seq, StartSeq0})
+    end,
+    FoldFun = fun(Info, _Reds, Acc) -> UserFun(Info, Acc) end,
+    FoldOpts = [{start_key, FirstSeq}] ++ Options,
+    {ok, _, FinalAcc} =
+        couch_btree:fold(PurgeSeqTree, FoldFun, UserAcc, FoldOpts),
+    {ok, FinalAcc}.
+
+
 count_changes_since(St, SinceSeq) ->
     BTree = St#st.seq_tree,
     FoldFun = fun(_SeqStart, PartialReds, 0) ->
@@ -619,6 +695,13 @@ local_tree_split(#doc{revs = {0, [Rev]}} = Doc) when is_integer(Rev) ->
     {Id, {Rev, BodyData}}.
 
 
+local_tree_join(Id, {Rev, BodyData}) when is_binary(Rev) ->
+    #doc{
+        id = Id,
+        revs = {0, [Rev]},
+        body = BodyData
+    };
+
 local_tree_join(Id, {Rev, BodyData}) when is_integer(Rev) ->
     #doc{
         id = Id,
@@ -627,6 +710,32 @@ local_tree_join(Id, {Rev, BodyData}) when is_integer(Rev) ->
     }.


+% Purge infos are {PurgeSeq, UUID, DocId, Revs} tuples. The purge_tree
+% splits them into {UUID, {PurgeSeq, DocId, Revs}} and the purge_seq_tree
+% into {PurgeSeq, {UUID, DocId, Revs}}; the join callbacks reverse that.
+purge_tree_split({Seq, Uuid, DocId, RevList}) ->
+    {Uuid, {Seq, DocId, RevList}}.
+
+
+purge_tree_join(Uuid, {Seq, DocId, RevList}) ->
+    {Seq, Uuid, DocId, RevList}.
+
+
+purge_seq_tree_split({Seq, Uuid, DocId, RevList}) ->
+    {Seq, {Uuid, DocId, RevList}}.
+
+
+purge_seq_tree_join(Seq, {Uuid, DocId, RevList}) ->
+    {Seq, Uuid, DocId, RevList}.
+
+
+% Shared by both purge trees: reduce counts its entries, rereduce sums.
+purge_tree_reduce(reduce, KVs) ->
+    length(KVs);
+purge_tree_reduce(rereduce, Counts) ->
+    lists:foldl(fun erlang:'+'/2, 0, Counts).
+
+
 set_update_seq(#st{header = Header} = St, UpdateSeq) ->
     {ok, St#st{
         header = couch_bt_engine_header:set(Header, [
@@ -682,7 +788,8 @@ init_state(FilePath, Fd, Header0, Options) ->
     Compression = couch_compress:get_compression_method(),
 
     Header1 = couch_bt_engine_header:upgrade(Header0),
-    Header = set_default_security_object(Fd, Header1, Compression, Options),
+    Header2 = set_default_security_object(Fd, Header1, Compression, Options),
+    Header = upgrade_purge_info(Fd, Header2),
 
     IdTreeState = couch_bt_engine_header:id_tree_state(Header),
     {ok, IdTree} = couch_btree:open(IdTreeState, Fd, [
@@ -707,6 +814,20 @@ init_state(FilePath, Fd, Header0, Options) ->
             {compression, Compression}
         ]),
 
+    PurgeTreeState = couch_bt_engine_header:purge_tree_state(Header),
+    {ok, PurgeTree} = couch_btree:open(PurgeTreeState, Fd, [
+        {split, fun ?MODULE:purge_tree_split/1},
+        {join, fun ?MODULE:purge_tree_join/2},
+        {reduce, fun ?MODULE:purge_tree_reduce/2}
+    ]),
+
+    PurgeSeqTreeState = couch_bt_engine_header:purge_seq_tree_state(Header),
+    {ok, PurgeSeqTree} = couch_btree:open(PurgeSeqTreeState, Fd, [
+        {split, fun ?MODULE:purge_seq_tree_split/1},
+        {join, fun ?MODULE:purge_seq_tree_join/2},
+        {reduce, fun ?MODULE:purge_tree_reduce/2}
+    ]),
+
     ok = couch_file:set_db_pid(Fd, self()),
 
     St = #st{
@@ -719,7 +840,9 @@ init_state(FilePath, Fd, Header0, Options) ->
         id_tree = IdTree,
         seq_tree = SeqTree,
         local_tree = LocalTree,
-        compression = Compression
+        compression = Compression,
+        purge_tree = PurgeTree,
+        purge_seq_tree = PurgeSeqTree
     },
 
     % If this is a new database we've just created a
@@ -738,7 +861,9 @@ update_header(St, Header) ->
     couch_bt_engine_header:set(Header, [
         {seq_tree_state, couch_btree:get_state(St#st.seq_tree)},
         {id_tree_state, couch_btree:get_state(St#st.id_tree)},
-        {local_tree_state, couch_btree:get_state(St#st.local_tree)}
+        {local_tree_state, couch_btree:get_state(St#st.local_tree)},
+        {purge_tree_state, couch_btree:get_state(St#st.purge_tree)},
+        {purge_seq_tree_state, couch_btree:get_state(St#st.purge_seq_tree)}
     ]).
 
 
@@ -763,6 +888,62 @@ set_default_security_object(Fd, Header, Compression, Options) ->
     end.


+% Migrates pre-clustered-purge metadata to the purge_tree/purge_seq_tree
+% pair. It lives here rather than in couch_bt_engine_header because it
+% has to write new btree nodes to Fd.
+%
+% Older headers used the same record slots for other data: purge_tree_state
+% held the integer purge_seq and purge_seq_tree_state held either nil or a
+% pointer to the [{DocId, Revs}] list of the last purge. Headers whose
+% purge_tree_state is already nil or a btree state tuple are returned as is.
+upgrade_purge_info(Fd, Header) ->
+    case couch_bt_engine_header:get(Header, purge_tree_state) of
+        nil ->
+            Header;
+        Ptr when is_tuple(Ptr) ->
+            Header;
+        PurgeSeq when is_integer(PurgeSeq) ->
+            OldPurgedPtr = couch_bt_engine_header:get(
+                    Header, purge_seq_tree_state),
+            upgrade_purge_info(Fd, Header, PurgeSeq, OldPurgedPtr)
+    end.
+
+
+% Nothing to migrate. Always clear purge_tree_state: leaving a non-zero
+% integer purge_seq there would later be opened as a btree root.
+upgrade_purge_info(_Fd, Header, _PurgeSeq, nil) ->
+    couch_bt_engine_header:set(Header, [{purge_tree_state, nil}]);
+
+% Rebuilds both purge trees from the last purge's {DocId, Revs} pairs,
+% giving each pair a fresh UUID and consecutive purge sequences starting
+% at the old purge_seq.
+upgrade_purge_info(Fd, Header, PurgeSeq, OldPurgedPtr) ->
+    {ok, PurgedIdsRevs} = couch_file:pread_term(Fd, OldPurgedPtr),
+    {Infos, _NextSeq} = lists:foldl(fun({Id, Revs}, {InfoAcc, PSeq}) ->
+        Info = {PSeq, couch_uuids:random(), Id, Revs},
+        {[Info | InfoAcc], PSeq + 1}
+    end, {[], PurgeSeq}, PurgedIdsRevs),
+
+    {ok, PurgeTree} = couch_btree:open(nil, Fd, [
+        {split, fun ?MODULE:purge_tree_split/1},
+        {join, fun ?MODULE:purge_tree_join/2},
+        {reduce, fun ?MODULE:purge_tree_reduce/2}
+    ]),
+    {ok, PurgeTree2} = couch_btree:add(PurgeTree, Infos),
+
+    {ok, PurgeSeqTree} = couch_btree:open(nil, Fd, [
+        {split, fun ?MODULE:purge_seq_tree_split/1},
+        {join, fun ?MODULE:purge_seq_tree_join/2},
+        {reduce, fun ?MODULE:purge_tree_reduce/2}
+    ]),
+    {ok, PurgeSeqTree2} = couch_btree:add(PurgeSeqTree, Infos),
+
+    couch_bt_engine_header:set(Header, [
+        {purge_tree_state, couch_btree:get_state(PurgeTree2)},
+        {purge_seq_tree_state, couch_btree:get_state(PurgeSeqTree2)}
+    ]).
+
+
 delete_compaction_files(FilePath) ->
     RootDir = config:get("couchdb", "database_dir", "."),
     DelOpts = [{context, delete}],
@@ -840,7 +1016,9 @@ active_size(#st{} = St, #size_info{} = SI) ->
     Trees = [
         St#st.id_tree,
         St#st.seq_tree,
-        St#st.local_tree
+        St#st.local_tree,
+        St#st.purge_tree,
+        St#st.purge_seq_tree
     ],
     lists:foldl(fun(T, Acc) ->
         case couch_btree:size(T) of
@@ -933,7 +1111,8 @@ finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) ->
     {ok, NewSt2} = commit_data(NewSt1#st{
         header = couch_bt_engine_header:set(Header, [
             {compacted_seq, get_update_seq(OldSt)},
-            {revs_limit, get_revs_limit(OldSt)}
+            {revs_limit, get_revs_limit(OldSt)},
+            {purge_infos_limit, get_purge_infos_limit(OldSt)}
         ]),
         local_tree = NewLocal2
     }),
diff --git a/src/couch/src/couch_bt_engine.hrl b/src/couch/src/couch_bt_engine.hrl
index 7f52d8f..1f5bcc9 100644
--- a/src/couch/src/couch_bt_engine.hrl
+++ b/src/couch/src/couch_bt_engine.hrl
@@ -20,5 +20,7 @@
     id_tree,
     seq_tree,
     local_tree,
-    compression
+    compression,
+    purge_tree,
+    purge_seq_tree
 }).
diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl
index 2c5b78e..10de686 100644
--- a/src/couch/src/couch_bt_engine_compactor.erl
+++ b/src/couch/src/couch_bt_engine_compactor.erl
@@ -56,7 +56,7 @@ start(#st{} = St, DbName, Options, Parent) ->
     % and hope everything works out for the best.
     unlink(DFd),
 
-    NewSt1 = copy_purge_info(St, NewSt),
+    NewSt1 = copy_purge_info(DbName, St, NewSt, Retry),
     NewSt2 = copy_compact(DbName, St, NewSt1, Retry),
     NewSt3 = sort_meta_data(NewSt2),
     NewSt4 = commit_compaction_data(NewSt3),
@@ -99,23 +99,123 @@ open_compaction_files(SrcHdr, DbFilePath, Options) ->
     end.


-copy_purge_info(OldSt, NewSt) ->
-    OldHdr = OldSt#st.header,
-    NewHdr = NewSt#st.header,
-    OldPurgeSeq = couch_bt_engine_header:purge_seq(OldHdr),
-    case OldPurgeSeq > 0 of
+% Copies OldSt's purge infos into the compaction target NewSt, starting
+% right after the newest purge sequence NewSt already holds. Infos are
+% buffered and passed to copy_purge_infos/5 whenever their accumulated
+% term size reaches doc_buffer_size; the target is committed once about
+% checkpoint_after bytes have been flushed since the previous commit.
+copy_purge_info(DbName, OldSt, NewSt, Retry) ->
+    MinPurgeSeq = couch_util:with_db(DbName, fun(Db) ->
+        couch_db:get_minimum_purge_seq(Db)
+    end),
+    OldPSTree = OldSt#st.purge_seq_tree,
+    StartSeq = couch_bt_engine:get_purge_seq(NewSt) + 1,
+    BufferSize = config:get_integer(
+            "database_compaction", "doc_buffer_size", 524288),
+    % Use get_integer: a value set in the config is returned by config:get/3
+    % as a string, and an integer never compares >= a string.
+    CheckpointAfter = config:get_integer(
+            "database_compaction", "checkpoint_after", BufferSize * 10),
+
+    EnumFun = fun(Info, _Reds, {StAcc0, InfosAcc, InfosSize, CopiedSize}) ->
+        NewInfosSize = InfosSize + ?term_size(Info),
+        if NewInfosSize >= BufferSize ->
+            StAcc1 = copy_purge_infos(
+                    OldSt, StAcc0, [Info | InfosAcc], MinPurgeSeq, Retry),
+            NewCopiedSize = CopiedSize + NewInfosSize,
+            if NewCopiedSize >= CheckpointAfter ->
+                StAcc2 = commit_compaction_data(StAcc1),
+                {ok, {StAcc2, [], 0, 0}};
+            true ->
+                {ok, {StAcc1, [], 0, NewCopiedSize}}
+            end;
         true ->
-            Purged = couch_bt_engine:get_last_purged(OldSt),
-            Opts = [{compression, NewSt#st.compression}],
-            {ok, Ptr, _} = couch_file:append_term(NewSt#st.fd, Purged, Opts),
-            NewNewHdr = couch_bt_engine_header:set(NewHdr, [
-                {purge_seq, OldPurgeSeq},
-                {purged_docs, Ptr}
-            ]),
-            NewSt#st{header = NewNewHdr};
-        false ->
-            NewSt
-    end.
+            NewInfosAcc = [Info | InfosAcc],
+            {ok, {StAcc0, NewInfosAcc, NewInfosSize, CopiedSize}}
+        end
+    end,
+
+    InitAcc = {NewSt, [], 0, 0},
+    Opts = [{start_key, StartSeq}],
+    {ok, _, FinalAcc} = couch_btree:fold(OldPSTree, EnumFun, InitAcc, Opts),
+    {NewStAcc, Infos, _, _} = FinalAcc,
+    copy_purge_infos(OldSt, NewStAcc, Infos, MinPurgeSeq, Retry).
+
+
+% Writes one buffered batch of purge infos into the compaction target.
+% The target's id_tree is an emsort during compaction, so it is re-bound
+% to the real btree while the batch is applied and bound back to the
+% emsort before returning. Infos whose purge sequence is not greater than
+% MinPurgeSeq are dropped rather than copied.
+copy_purge_infos(OldSt, NewSt0, Infos, MinPurgeSeq, Retry) ->
+    #st{
+        id_tree = OldIdTree
+    } = OldSt,
+
+    % Re-bind our id_tree to the backing btree
+    NewIdTreeState = couch_bt_engine_header:id_tree_state(NewSt0#st.header),
+    MetaFd = couch_emsort:get_fd(NewSt0#st.id_tree),
+    MetaState = couch_emsort:get_state(NewSt0#st.id_tree),
+    NewSt1 = bind_id_tree(NewSt0, NewSt0#st.fd, NewIdTreeState),
+
+    #st{
+        id_tree = NewIdTree0,
+        seq_tree = NewSeqTree0,
+        purge_tree = NewPurgeTree0,
+        purge_seq_tree = NewPurgeSeqTree0
+    } = NewSt1,
+
+    % Copy over the purge infos
+    InfosToAdd = lists:filter(fun({PSeq, _, _, _}) ->
+        PSeq > MinPurgeSeq
+    end, Infos),
+    {ok, NewPurgeTree1} = couch_btree:add(NewPurgeTree0, InfosToAdd),
+    {ok, NewPurgeSeqTree1} = couch_btree:add(NewPurgeSeqTree0, InfosToAdd),
+
+    NewSt2 = NewSt1#st{
+        purge_tree = NewPurgeTree1,
+        purge_seq_tree = NewPurgeSeqTree1
+    },
+
+    % If we're performing a retry compaction we have to check if
+    % any of the referenced docs have been completely purged
+    % from the database. Any doc that has been completely purged
+    % must then be removed from our partially compacted database.
+    NewSt3 = if Retry == nil -> NewSt2; true ->
+        AllDocIds = [DocId || {_PurgeSeq, _UUID, DocId, _Revs} <- Infos],
+        UniqDocIds = lists:usort(AllDocIds),
+        OldIdResults = couch_btree:lookup(OldIdTree, UniqDocIds),
+        OldZipped = lists:zip(UniqDocIds, OldIdResults),
+
+        % The list of non-existent docs in the database being compacted
+        MaybeRemDocIds = [DocId || {DocId, not_found} <- OldZipped],
+
+        % Remove anything that still exists in the partially compacted db
+        NewIdResults = couch_btree:lookup(NewIdTree0, MaybeRemDocIds),
+        ToRemove = [Doc || {ok, Doc} <- NewIdResults],
+
+        {RemIds, RemSeqs} = lists:unzip(lists:map(fun(FDI) ->
+            #full_doc_info{
+                id = Id,
+                update_seq = Seq
+            } = FDI,
+            {Id, Seq}
+        end, ToRemove)),
+
+        {ok, NewIdTree1} = couch_btree:add_remove(NewIdTree0, [], RemIds),
+        {ok, NewSeqTree1} = couch_btree:add_remove(NewSeqTree0, [], RemSeqs),
+
+        NewSt2#st{
+            id_tree = NewIdTree1,
+            seq_tree = NewSeqTree1
+        }
+    end,
+
+    Header = couch_bt_engine:update_header(NewSt3, NewSt3#st.header),
+    NewSt4 = NewSt3#st{
+        header = Header
+    },
+    bind_emsort(NewSt4, MetaFd, MetaState).


 copy_compact(DbName, St, NewSt0, Retry) ->
diff --git a/src/couch/src/couch_bt_engine_header.erl b/src/couch/src/couch_bt_engine_header.erl
index 3d24f31..467bb2f 100644
--- a/src/couch/src/couch_bt_engine_header.erl
+++ b/src/couch/src/couch_bt_engine_header.erl
@@ -31,8 +31,9 @@
     seq_tree_state/1,
     latest/1,
     local_tree_state/1,
-    purge_seq/1,
-    purged_docs/1,
+    purge_tree_state/1,
+    purge_seq_tree_state/1,
+    purge_infos_limit/1,
     security_ptr/1,
     revs_limit/1,
     uuid/1,
@@ -51,7 +52,7 @@
 % if the disk revision is incremented, then new upgrade logic will need to be
 % added to couch_db_updater:init_db.
 
--define(LATEST_DISK_VERSION, 6).
+-define(LATEST_DISK_VERSION, 7).
 
 -record(db_header, {
     disk_version = ?LATEST_DISK_VERSION,
@@ -60,13 +61,14 @@
     id_tree_state = nil,
     seq_tree_state = nil,
     local_tree_state = nil,
-    purge_seq = 0,
-    purged_docs = nil,
+    purge_tree_state = nil,
+    purge_seq_tree_state = nil, %purge tree: purge_seq -> uuid
     security_ptr = nil,
     revs_limit = 1000,
     uuid,
     epochs,
-    compacted_seq
+    compacted_seq,
+    purge_infos_limit = 1000
 }).
 
 
@@ -150,12 +152,16 @@ local_tree_state(Header) ->
     get_field(Header, local_tree_state).


-purge_seq(Header) ->
-    get_field(Header, purge_seq).
+% State of the purge_tree btree. Headers written before this change kept
+% the integer purge_seq in this record slot.
+purge_tree_state(Header) ->
+    get_field(Header, purge_tree_state).


-purged_docs(Header) ->
-    get_field(Header, purged_docs).
+% State of the purge_seq_tree btree. Headers written before this change
+% kept the purged_docs pointer in this record slot.
+purge_seq_tree_state(Header) ->
+    get_field(Header, purge_seq_tree_state).


 security_ptr(Header) ->
@@ -178,6 +180,11 @@ compacted_seq(Header) ->
     get_field(Header, compacted_seq).


+% Value of the purge_infos_limit field (the db_header record default is 1000).
+purge_infos_limit(Header) ->
+    get_field(Header, purge_infos_limit).
+
+
 get_field(Header, Field) ->
     get_field(Header, Field, undefined).
 
@@ -229,6 +235,7 @@ upgrade_disk_version(#db_header{}=Header) ->
         3 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
         4 -> Header#db_header{security_ptr = nil}; % [0.10 - 0.11)
         5 -> Header; % pre 1.2
+        6 -> Header; % pre clustered purge
         ?LATEST_DISK_VERSION -> Header;
         _ ->
             Reason = "Incorrect disk header version",
@@ -322,8 +329,8 @@ mk_header(Vsn) ->
         foo, % id_tree_state
         bar, % seq_tree_state
         bam, % local_tree_state
-        1, % purge_seq
-        baz, % purged_docs
+        flam, % was purge_seq - now purge_tree_state
+        baz, % was purged_docs - now purge_seq_tree_state
         bang, % security_ptr
         999 % revs_limit
     }.
@@ -342,8 +349,8 @@ upgrade_v3_test() ->
     ?assertEqual(foo, id_tree_state(NewHeader)),
     ?assertEqual(bar, seq_tree_state(NewHeader)),
     ?assertEqual(bam, local_tree_state(NewHeader)),
-    ?assertEqual(1, purge_seq(NewHeader)),
-    ?assertEqual(baz, purged_docs(NewHeader)),
+    ?assertEqual(flam, purge_tree_state(NewHeader)),
+    ?assertEqual(baz, purge_seq_tree_state(NewHeader)),
     ?assertEqual(bang, security_ptr(NewHeader)),
     ?assertEqual(999, revs_limit(NewHeader)),
     ?assertEqual(undefined, uuid(NewHeader)),