You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ji...@apache.org on 2018/08/09 09:43:24 UTC

[couchdb] 07/10: [07/10] Clustered Purge: Internal replication

This is an automated email from the ASF dual-hosted git repository.

jiangphcn pushed a commit to branch COUCHDB-3326-clustered-purge-pr5-implementation
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 8025828e5654652f7f477256a602841288ab2685
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Tue Apr 24 12:27:14 2018 -0500

    [07/10] Clustered Purge: Internal replication
    
    This commit implements the internal replication of purge requests. This
    part of the anti-entropy process is important for ensuring that shard
    copies continue to be eventually consistent even if updates happen to
    shards independently due to a network split or other event that prevents
    a purge request from reaching a given copy.
    
    The main addition to internal replication is that we both pull and push
    purge requests between the source and target shards. The push direction
    is obvious given that internal replication is in the push direction
    already. Pull isn't quite as obvious but is required so that we don't
    push an update that was already purged on the target.
    
    Note that internal replication also has to maintain _local doc
    checkpoints to prevent compaction from removing old purge requests;
    otherwise shard copies could end up missing purge requests, which would
    prevent the shard copies from ever reaching a consistent state.
    
    COUCHDB-3326
    
    Co-authored-by: Mayya Sharipova <ma...@ca.ibm.com>
    Co-authored-by: jiangphcn <ji...@cn.ibm.com>
---
 .../src/cpse_test_purge_replication.erl            | 202 ++++++++++++++++++++
 src/couch_pse_tests/src/cpse_util.erl              |  32 +++-
 src/mem3/src/mem3_epi.erl                          |   3 +-
 .../src/{mem3_epi.erl => mem3_plugin_couch_db.erl} |  39 +---
 src/mem3/src/mem3_rep.erl                          | 206 +++++++++++++++++++--
 src/mem3/src/mem3_rpc.erl                          |  71 ++++++-
 6 files changed, 499 insertions(+), 54 deletions(-)

diff --git a/src/couch_pse_tests/src/cpse_test_purge_replication.erl b/src/couch_pse_tests/src/cpse_test_purge_replication.erl
new file mode 100644
index 0000000..fb09eeb
--- /dev/null
+++ b/src/couch_pse_tests/src/cpse_test_purge_replication.erl
@@ -0,0 +1,202 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(cpse_test_purge_replication).
+-compile(export_all).
+-compile(nowarn_export_all).
+
+% Purge tests for HTTP (couch_replicator) and internal (mem3_rep) replication.
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+% Start the extra applications these tests depend on.
+setup_all() ->
+    cpse_util:setup_all([mem3, fabric, couch_replicator]).
+
+% Create a fresh source and target database; returns their names.
+setup_each() ->
+    {ok, Src} = cpse_util:create_db(),
+    {ok, Tgt} = cpse_util:create_db(),
+    {couch_db:name(Src), couch_db:name(Tgt)}.
+
+% Delete both databases created by setup_each/0.
+teardown_each({SrcDb, TgtDb}) ->
+    ok = couch_server:delete(SrcDb, []),
+    ok = couch_server:delete(TgtDb, []).
+
+% couch_replicator does not carry purges; replicating back restores the doc.
+cpse_purge_http_replication({Source, Target}) ->
+    {ok, Rev1} = cpse_util:save_doc(Source, {[{'_id', foo}, {vsn, 1}]}),
+
+    cpse_util:assert_db_props(?MODULE, ?LINE, Source, [
+        {doc_count, 1},
+        {del_doc_count, 0},
+        {update_seq, 1},
+        {changes, 1},
+        {purge_seq, 0},
+        {purge_infos, []}
+    ]),
+    % Copy foo to the target before purging it on the source.
+    RepObject = {[
+        {<<"source">>, Source},
+        {<<"target">>, Target}
+    ]},
+
+    {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER),
+    {ok, Doc1} = cpse_util:open_doc(Target, foo),
+
+    cpse_util:assert_db_props(?MODULE, ?LINE, Target, [
+        {doc_count, 1},
+        {del_doc_count, 0},
+        {update_seq, 1},
+        {changes, 1},
+        {purge_seq, 0},
+        {purge_infos, []}
+    ]),
+
+    PurgeInfos = [
+        {cpse_util:uuid(), <<"foo">>, [Rev1]}
+    ],
+
+    {ok, [{ok, PRevs}]} = cpse_util:purge(Source, PurgeInfos),
+    ?assertEqual([Rev1], PRevs),
+
+    cpse_util:assert_db_props(?MODULE, ?LINE, Source, [
+        {doc_count, 0},
+        {del_doc_count, 0},
+        {update_seq, 2},
+        {changes, 0},
+        {purge_seq, 1},
+        {purge_infos, PurgeInfos}
+    ]),
+
+    % Show that a purge on the source is
+    % not replicated to the target
+    {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER),
+    {ok, Doc2} = cpse_util:open_doc(Target, foo),
+    [Rev2] = Doc2#doc_info.revs,
+    ?assertEqual(Rev1, Rev2#rev_info.rev),
+    ?assertEqual(Doc1, Doc2),
+
+    cpse_util:assert_db_props(?MODULE, ?LINE, Target, [
+        {doc_count, 1},
+        {del_doc_count, 0},
+        {update_seq, 1},
+        {changes, 1},
+        {purge_seq, 0},
+        {purge_infos, []}
+    ]),
+
+    % Show that replicating from the target
+    % back to the source reintroduces the doc
+    RepObject2 = {[
+        {<<"source">>, Target},
+        {<<"target">>, Source}
+    ]},
+
+    {ok, _} = couch_replicator:replicate(RepObject2, ?ADMIN_USER),
+    {ok, Doc3} = cpse_util:open_doc(Source, foo),
+    [Revs3] = Doc3#doc_info.revs,
+    ?assertEqual(Rev1, Revs3#rev_info.rev),
+
+    cpse_util:assert_db_props(?MODULE, ?LINE, Source, [
+        {doc_count, 1},
+        {del_doc_count, 0},
+        {update_seq, 3},
+        {changes, 1},
+        {purge_seq, 1},
+        {purge_infos, PurgeInfos}
+    ]).
+
+% With mem3 replicate_purges off, mem3_rep:go/2 exchanges no purges.
+cpse_purge_internal_repl_disabled({Source, Target}) ->
+    cpse_util:with_config([{"mem3", "replicate_purges", "false"}], fun() ->
+        repl(Source, Target),
+
+        {ok, [Rev1, Rev2]} = cpse_util:save_docs(Source, [
+            {[{'_id', foo1}, {vsn, 1}]},
+            {[{'_id', foo2}, {vsn, 2}]}
+        ]),
+
+        repl(Source, Target),
+        % Purge foo1 on the source and foo2 on the target independently.
+        PurgeInfos1 = [
+            {cpse_util:uuid(), <<"foo1">>, [Rev1]}
+        ],
+        {ok, [{ok, PRevs1}]} = cpse_util:purge(Source, PurgeInfos1),
+        ?assertEqual([Rev1], PRevs1),
+
+        PurgeInfos2 = [
+            {cpse_util:uuid(), <<"foo2">>, [Rev2]}
+        ],
+        {ok, [{ok, PRevs2}]} = cpse_util:purge(Target, PurgeInfos2),
+        ?assertEqual([Rev2], PRevs2),
+        % Neither direction may exchange purges while the option is off.
+        SrcShard = make_shard(Source),
+        TgtShard = make_shard(Target),
+        ?assertEqual({ok, 0}, mem3_rep:go(SrcShard, TgtShard)),
+        ?assertEqual({ok, 0}, mem3_rep:go(TgtShard, SrcShard)),
+
+        ?assertMatch({ok, #doc_info{}}, cpse_util:open_doc(Source, <<"foo2">>)),
+        ?assertMatch({ok, #doc_info{}}, cpse_util:open_doc(Target, <<"foo1">>))
+    end).
+
+% A purge made on the target reaches the source via internal replication.
+cpse_purge_repl_simple_pull({Source, Target}) ->
+    repl(Source, Target),
+
+    {ok, Rev} = cpse_util:save_doc(Source, {[{'_id', foo}, {vsn, 1}]}),
+    repl(Source, Target),
+    % Purge on the target only; repl/2 below must carry it to the source.
+    PurgeInfos = [
+        {cpse_util:uuid(), <<"foo">>, [Rev]}
+    ],
+    {ok, [{ok, PRevs}]} = cpse_util:purge(Target, PurgeInfos),
+    ?assertEqual([Rev], PRevs),
+    repl(Source, Target).
+
+% A purge made on the source reaches the target via internal replication.
+cpse_purge_repl_simple_push({Source, Target}) ->
+    repl(Source, Target),
+
+    {ok, Rev} = cpse_util:save_doc(Source, {[{'_id', foo}, {vsn, 1}]}),
+    repl(Source, Target),
+    % Purge on the source only; repl/2 below must carry it to the target.
+    PurgeInfos = [
+        {cpse_util:uuid(), <<"foo">>, [Rev]}
+    ],
+    {ok, [{ok, PRevs}]} = cpse_util:purge(Source, PurgeInfos),
+    ?assertEqual([Rev], PRevs),
+    repl(Source, Target).
+
+% Run mem3_rep Source -> Target; assert 0 pending and identical copies.
+repl(Source, Target) ->
+    SrcShard = make_shard(Source),
+    TgtShard = make_shard(Target),
+
+    ?assertEqual({ok, 0}, mem3_rep:go(SrcShard, TgtShard)),
+    % replication-mode terms exclude the uuid and _local/purge-mem3* docs.
+    SrcTerm = cpse_util:db_as_term(Source, replication),
+    TgtTerm = cpse_util:db_as_term(Target, replication),
+
+    Diff = cpse_util:term_diff(SrcTerm, TgtTerm),
+    ?assertEqual(nodiff, Diff).
+
+% Shard record for DbName on this node covering range [0, 16#FFFFFFFF].
+make_shard(DbName) ->
+    #shard{
+        name = DbName,
+        node = node(),
+        dbname = DbName,
+        range = [0, 16#FFFFFFFF]
+    }.
diff --git a/src/couch_pse_tests/src/cpse_util.erl b/src/couch_pse_tests/src/cpse_util.erl
index bbb596f..b25e1d6 100644
--- a/src/couch_pse_tests/src/cpse_util.erl
+++ b/src/couch_pse_tests/src/cpse_util.erl
@@ -62,6 +62,7 @@ setup_all(ExtraApps) ->
     EngineModStr = atom_to_list(EngineMod),
     config:set("couchdb_engines", Extension, EngineModStr, false),
     config:set("log", "include_sasl", "false", false),
+    config:set("mem3", "replicate_purges", "true", false),
     Ctx.
 
 
@@ -453,17 +454,25 @@ prev_rev(#full_doc_info{} = FDI) ->
 
 
 db_as_term(Db) ->
+    db_as_term(Db, compact).
+% With Type = replication, the uuid prop and purge-mem3 local docs are skipped.
+db_as_term(DbName, Type) when is_binary(DbName) ->
+    couch_util:with_db(DbName, fun(Db) ->
+        db_as_term(Db, Type)
+    end);
+% Clause for an already-open db handle (binary names are opened above).
+db_as_term(Db, Type) ->
     [
-        {props, db_props_as_term(Db)},
+        {props, db_props_as_term(Db, Type)},
         {docs, db_docs_as_term(Db)},
-        {local_docs, db_local_docs_as_term(Db)},
+        {local_docs, db_local_docs_as_term(Db, Type)},
         {changes, db_changes_as_term(Db)},
         {purged_docs, db_purged_docs_as_term(Db)}
     ].
 
 
-db_props_as_term(Db) ->
-    Props = [
+db_props_as_term(Db, Type) ->
+    Props0 = [
         get_doc_count,
         get_del_doc_count,
         get_disk_version,
@@ -475,6 +484,9 @@ db_props_as_term(Db) ->
         get_uuid,
         get_epochs
     ],
+    Props = if Type /= replication -> Props0; true ->
+        Props0 -- [get_uuid]
+    end,
     lists:map(fun(Fun) ->
         {Fun, couch_db_engine:Fun(Db)}
     end, Props).
@@ -488,8 +500,16 @@ db_docs_as_term(Db) ->
     end, FDIs)).
 
 
-db_local_docs_as_term(Db) ->
-    FoldFun = fun(Doc, Acc) -> {ok, [Doc | Acc]} end,
+db_local_docs_as_term(Db, Type) ->
+    % Collect every local doc in fold order; for Type = replication, leave
+    % out mem3's purge checkpoint docs (_local/purge-mem3*).
+    FoldFun = fun
+        (#doc{id = <<?LOCAL_DOC_PREFIX, "purge-mem3", _/binary>>}, Acc)
+                when Type == replication ->
+            {ok, Acc};
+        (Doc, Acc) ->
+            {ok, [Doc | Acc]}
+    end,
     {ok, LDocs} = couch_db:fold_local_docs(Db, FoldFun, [], []),
     lists:reverse(LDocs).
 
diff --git a/src/mem3/src/mem3_epi.erl b/src/mem3/src/mem3_epi.erl
index ebcd596..4bf2bf5 100644
--- a/src/mem3/src/mem3_epi.erl
+++ b/src/mem3/src/mem3_epi.erl
@@ -30,7 +30,8 @@ app() ->
 
 providers() ->
     [
-         {chttpd_handlers, mem3_httpd_handlers}
+        {couch_db, mem3_plugin_couch_db}, % provides is_valid_purge_client/2
+        {chttpd_handlers, mem3_httpd_handlers}
     ].
 
 
diff --git a/src/mem3/src/mem3_epi.erl b/src/mem3/src/mem3_plugin_couch_db.erl
similarity index 52%
copy from src/mem3/src/mem3_epi.erl
copy to src/mem3/src/mem3_plugin_couch_db.erl
index ebcd596..8cb5d78 100644
--- a/src/mem3/src/mem3_epi.erl
+++ b/src/mem3/src/mem3_plugin_couch_db.erl
@@ -2,7 +2,7 @@
 % use this file except in compliance with the License. You may obtain a copy of
 % the License at
 %
-% http://www.apache.org/licenses/LICENSE-2.0
+%   http://www.apache.org/licenses/LICENSE-2.0
 %
 % Unless required by applicable law or agreed to in writing, software
 % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
@@ -10,41 +10,12 @@
 % License for the specific language governing permissions and limitations under
 % the License.
 
-
--module(mem3_epi).
-
--behaviour(couch_epi_plugin).
+-module(mem3_plugin_couch_db).
 
 -export([
-    app/0,
-    providers/0,
-    services/0,
-    data_subscriptions/0,
-    data_providers/0,
-    processes/0,
-    notify/3
+    is_valid_purge_client/2
 ]).
 
-app() ->
-    mem3.
-
-providers() ->
-    [
-         {chttpd_handlers, mem3_httpd_handlers}
-    ].
-
-
-services() ->
-    [].
-
-data_subscriptions() ->
-    [].
-
-data_providers() ->
-    [].
-
-processes() ->
-    [].
 
-notify(_Key, _Old, _New) ->
-    ok.
+is_valid_purge_client(DbName, Props) -> % couch_db plugin hook (see mem3_epi)
+    mem3_rep:verify_purge_checkpoint(DbName, Props).
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 670f990..03178cf 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -17,6 +17,8 @@
     go/2,
     go/3,
     make_local_id/2,
+    make_purge_id/2,
+    verify_purge_checkpoint/2,
     find_source_seq/4
 ]).
 
@@ -35,6 +37,7 @@
     infos = [],
     seq = 0,
     localid,
+    purgeid,
     source,
     target,
     filter,
@@ -118,6 +121,40 @@ make_local_id(SourceThing, TargetThing, Filter) ->
     <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
 
 
+make_purge_id(SourceUUID, TargetUUID) -> % id of the purge checkpoint _local doc
+    <<"_local/purge-mem3-", SourceUUID/binary, "-", TargetUUID/binary>>.
+
+
+verify_purge_checkpoint(DbName, Props) ->
+    % True only for an internal_replication purge checkpoint whose source
+    % and target nodes both hold a copy of DbName for the recorded range.
+    % Any failure (missing keys, unknown node atoms, a database that no
+    % longer exists) deliberately yields false rather than crashing.
+    try
+        Type = couch_util:get_value(<<"type">>, Props),
+        if Type =/= <<"internal_replication">> -> false; true ->
+            SourceBin = couch_util:get_value(<<"source">>, Props),
+            TargetBin = couch_util:get_value(<<"target">>, Props),
+            Range = couch_util:get_value(<<"range">>, Props),
+
+            % existing_atom: never create atoms from document contents.
+            Source = binary_to_existing_atom(SourceBin, latin1),
+            Target = binary_to_existing_atom(TargetBin, latin1),
+
+            Shards = mem3:shards(DbName),
+            Nodes = lists:foldl(fun(Shard, Acc) ->
+                case Shard#shard.range == Range of
+                    true -> [Shard#shard.node | Acc];
+                    false -> Acc
+                end
+            end, [], Shards),
+            lists:member(Source, Nodes) andalso lists:member(Target, Nodes)
+        end
+    catch _:_ ->
+        false
+    end.
+
+
 %% @doc Find and return the largest update_seq in SourceDb
 %% that the client has seen from TargetNode.
 %%
@@ -169,20 +206,132 @@ find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) ->
     end.
 
 
-repl(#acc{db = Db} = Acc0) ->
-    erlang:put(io_priority, {internal_repl, couch_db:name(Db)}),
-    #acc{seq=Seq} = Acc1 = calculate_start_seq(Acc0),
-    case Seq >= couch_db:get_update_seq(Db) of
-        true ->
-            {ok, 0};
-        false ->
-            Fun = fun ?MODULE:changes_enumerator/2,
-            {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
-            {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
-            {ok, couch_db:count_changes_since(Db, LastSeq)}
+repl(#acc{db = Db0} = Acc0) ->
+    erlang:put(io_priority, {internal_repl, couch_db:name(Db0)}),
+    Acc1 = calculate_start_seq(Acc0),
+    % A phase may end the pass early by throwing {finished, Count}. Purges
+    % go first so revisions already purged on the target are not pushed.
+    ReplPurges = config:get_boolean("mem3", "replicate_purges", false),
+    try
+        Acc2 = case ReplPurges of
+            true -> push_purges(pull_purges(Acc1));
+            false -> Acc1
+        end,
+        push_changes(Acc2)
+    catch
+        throw:{finished, Count} ->
+            {ok, Count}
     end.
 
 
+pull_purges(#acc{} = Acc0) ->
+    % Pull one batch of purge requests recorded on the target, apply them to
+    % the source and advance the target-side checkpoint. If the target still
+    % has more, end this pass early with an estimate of the outstanding work.
+    #acc{batch_size = Count, seq = UpdateSeq, target = Target} = Acc0,
+    #shard{node = TgtNode, name = TgtDbName} = Target,
+
+    with_src_db(Acc0, fun(Db) ->
+        SrcUUID = couch_db:get_uuid(Db),
+        {LocalPurgeId, Infos, ThroughSeq, Remaining} =
+                mem3_rpc:load_purge_infos(TgtNode, TgtDbName, SrcUUID, Count),
+
+        if Infos == [] -> ok; true ->
+            % Apply as replicated purges (same option purge_on_target/3 sends);
+            % the source may already hold some of these UUIDs, e.g. ones it
+            % pushed earlier. NOTE(review): confirm in couch_db:purge_docs/3.
+            {ok, _} = couch_db:purge_docs(Db, Infos, [replicated_changes]),
+            Body = purge_cp_body(Acc0, ThroughSeq),
+            mem3_rpc:save_purge_checkpoint(
+                    TgtNode, TgtDbName, LocalPurgeId, Body)
+        end,
+
+        if Remaining =< 0 -> ok; true ->
+            PurgeSeq = couch_db:get_purge_seq(Db),
+            OldestPurgeSeq = couch_db:get_oldest_purge_seq(Db),
+            PurgesToPush = PurgeSeq - OldestPurgeSeq,
+            Changes = couch_db:count_changes_since(Db, UpdateSeq),
+            throw({finished, Remaining + PurgesToPush + Changes})
+        end,
+
+        % push_purges/1 reuses this checkpoint id for the source-side doc.
+        Acc0#acc{purgeid = LocalPurgeId}
+    end).
+
+
+push_purges(#acc{} = Acc0) ->
+    % Send the next batch of the source's purge requests to the target,
+    % starting after the purge_seq stored in the source-side checkpoint
+    % (or just before the oldest retained purge when none exists yet).
+    % If more remain after this batch, end the pass early with the number
+    % of outstanding purges plus pending document changes.
+    #acc{batch_size = BatchSize, purgeid = PurgeDocId, seq = UpdateSeq,
+            target = #shard{node = TgtNode, name = TgtDbName}} = Acc0,
+
+    with_src_db(Acc0, fun(Db) ->
+        FromSeq = case couch_db:open_doc(Db, PurgeDocId, []) of
+            {not_found, _} ->
+                erlang:max(0, couch_db:get_oldest_purge_seq(Db) - 1);
+            {ok, #doc{body = {CpProps}}} ->
+                couch_util:get_value(<<"purge_seq">>, CpProps)
+        end,
+
+        % Ask the fold to stop once BatchSize revisions are collected; the
+        % accumulator is {RevCount, Infos, LastPurgeSeqSeen}.
+        Collect = fun({PSeq, UUID, DocId, Revs}, {N, Acc, _}) ->
+            N1 = N + length(Revs),
+            Next = case N1 < BatchSize of true -> ok; false -> stop end,
+            {Next, {N1, [{UUID, DocId, Revs} | Acc], PSeq}}
+        end,
+        {ok, {_, Batch, LastSeq}} = couch_db:fold_purge_infos(
+            Db, FromSeq, Collect, {0, [], FromSeq}),
+
+        case Batch of
+            [] ->
+                ok;
+            _ ->
+                ok = purge_on_target(TgtNode, TgtDbName, Batch),
+                CpDoc = #doc{id = PurgeDocId,
+                        body = purge_cp_body(Acc0, LastSeq)},
+                {ok, _} = couch_db:update_doc(Db, CpDoc, [])
+        end,
+
+        SrcPurgeSeq = couch_db:get_purge_seq(Db),
+        case LastSeq >= SrcPurgeSeq of
+            true ->
+                ok;
+            false ->
+                Pending = couch_db:count_changes_since(Db, UpdateSeq),
+                throw({finished, SrcPurgeSeq - LastSeq + Pending})
+        end,
+
+        % All purges pushed; the accumulator moves on to push_changes/1.
+        Acc0
+    end).
+
+
+push_changes(#acc{db = Db0, seq = Seq} = Acc0) ->
+    % Replicate source changes after Seq to the target; returns {ok, N} with
+    % N = source changes still outstanding after this batch.
+    % NOTE(review): Db0 is the accumulator's handle, which may be older than
+    % the one opened below; confirm a stale update_seq is acceptable here.
+    % Avoid needlessly rewriting the internal replication checkpoint
+    % document when nothing is left to replicate.
+    case Seq >= couch_db:get_update_seq(Db0) of
+        true -> throw({finished, 0});
+        false -> ok
+    end,
+
+    with_src_db(Acc0, fun(Db) ->
+        Enum = fun ?MODULE:changes_enumerator/2,
+        {ok, FoldedAcc} =
+            couch_db:fold_changes(Db, Seq, Enum, Acc0#acc{db = Db}),
+        {ok, #acc{seq = LastSeq}} = replicate_batch(FoldedAcc),
+        Remaining = couch_db:count_changes_since(Db, LastSeq),
+        {ok, Remaining}
+    end).
+
+
 calculate_start_seq(Acc) ->
     #acc{
         db = Db,
@@ -323,6 +472,15 @@ save_on_target(Node, Name, Docs) ->
     ok.
 
 
+purge_on_target(Node, Name, PurgeInfos) ->
+    % Apply PurgeInfos to the target shard copy, flagged as replicated
+    % changes with full_commit, at internal-replication IO priority.
+    % The RPC's return value is discarded; this function returns ok.
+    Opts = [replicated_changes, full_commit, ?ADMIN_CTX,
+            {io_priority, {internal_repl, Name}}],
+    _ = mem3_rpc:purge_docs(Node, Name, PurgeInfos, Opts),
+    ok.
+
 update_locals(Acc) ->
     #acc{seq=Seq, db=Db, target=Target, localid=Id, history=History} = Acc,
     #shard{name=Name, node=Node} = Target,
@@ -336,6 +494,23 @@ update_locals(Acc) ->
     {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
 
 
+purge_cp_body(#acc{source = Src, target = Tgt}, PurgeSeq) ->
+    % Body of a purge checkpoint _local doc; verify_purge_checkpoint/2 reads
+    % back type, source, target and range. updated_on is OS time in seconds.
+    {MegaSecs, WholeSecs, _MicroSecs} = os:timestamp(),
+    UpdatedOn = MegaSecs * 1000000 + WholeSecs,
+    SrcNode = atom_to_binary(Src#shard.node, latin1),
+    TgtNode = atom_to_binary(Tgt#shard.node, latin1),
+    {[
+        {<<"type">>, <<"internal_replication">>},
+        {<<"updated_on">>, UpdatedOn},
+        {<<"purge_seq">>, PurgeSeq},
+        {<<"source">>, SrcNode},
+        {<<"target">>, TgtNode},
+        {<<"range">>, Src#shard.range}
+    ]}.
+
+
 find_repl_doc(SrcDb, TgtUUIDPrefix) ->
     SrcUUID = couch_db:get_uuid(SrcDb),
     S = couch_util:encodeBase64Url(couch_hash:md5_hash(term_to_binary(SrcUUID))),
@@ -366,6 +541,15 @@ find_repl_doc(SrcDb, TgtUUIDPrefix) ->
     end.
 
 
+with_src_db(#acc{source = Source}, Fun) ->
+    % Run Fun with a freshly opened admin handle on the source shard,
+    % closing it afterwards even when Fun throws (e.g. {finished, N}).
+    {ok, SrcDb} = couch_db:open(Source#shard.name, [?ADMIN_CTX]),
+    try Fun(SrcDb)
+    after couch_db:close(SrcDb)
+    end.
+
+
 is_prefix(Prefix, Subject) ->
     binary:longest_common_prefix([Prefix, Subject]) == size(Prefix).
 
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl
index c2bd58f..35d1d0a 100644
--- a/src/mem3/src/mem3_rpc.erl
+++ b/src/mem3/src/mem3_rpc.erl
@@ -20,14 +20,21 @@
     get_missing_revs/4,
     update_docs/4,
     load_checkpoint/4,
-    save_checkpoint/6
+    save_checkpoint/6,
+
+    load_purge_infos/4,
+    save_purge_checkpoint/4,
+    purge_docs/4
 ]).
 
 % Private RPC callbacks
 -export([
     find_common_seq_rpc/3,
     load_checkpoint_rpc/3,
-    save_checkpoint_rpc/5
+    save_checkpoint_rpc/5,
+
+    load_purge_infos_rpc/3,
+    save_purge_checkpoint_rpc/3
 ]).
 
 
@@ -58,6 +65,20 @@ find_common_seq(Node, DbName, SourceUUID, SourceEpochs) ->
     rexi_call(Node, {mem3_rpc, find_common_seq_rpc, Args}).
 
 
+load_purge_infos(Node, DbName, SourceUUID, Count) ->
+    % Next batch of Node's purge infos not yet pulled by SourceUUID.
+    rexi_call(Node, {mem3_rpc, load_purge_infos_rpc, [DbName, SourceUUID, Count]}).
+
+
+save_purge_checkpoint(Node, DbName, PurgeDocId, Body) ->
+    % Store the purge checkpoint doc PurgeDocId on Node's copy of DbName.
+    rexi_call(Node, {mem3_rpc, save_purge_checkpoint_rpc, [DbName, PurgeDocId, Body]}).
+
+
+purge_docs(Node, DbName, PurgeInfos, Options) -> % purge on Node's copy
+    rexi_call(Node, {fabric_rpc, purge_docs, [DbName, PurgeInfos, Options]}).
+
+
 load_checkpoint_rpc(DbName, SourceNode, SourceUUID) ->
     erlang:put(io_priority, {internal_repl, DbName}),
     case get_or_create_db(DbName, [?ADMIN_CTX]) of
@@ -128,6 +149,52 @@ find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) ->
     end.
 
 
+load_purge_infos_rpc(DbName, SrcUUID, BatchSize) ->
+    % Runs on the target node: reply (via rexi) with the next batch of purge
+    % infos not yet pulled by the source with SrcUUID, as
+    % {ok, {CheckpointId, Infos, ThroughSeq, Remaining}}.
+    erlang:put(io_priority, {internal_repl, DbName}),
+    case get_or_create_db(DbName, [?ADMIN_CTX]) of
+        {ok, Db} ->
+            CpId = mem3_rep:make_purge_id(SrcUUID, couch_db:get_uuid(Db)),
+            FromSeq = case couch_db:open_doc(Db, CpId, []) of
+                {not_found, _} ->
+                    erlang:max(0, couch_db:get_oldest_purge_seq(Db) - 1);
+                {ok, #doc{body = {CpProps}}} ->
+                    couch_util:get_value(<<"purge_seq">>, CpProps)
+            end,
+            % Ask the fold to stop once BatchSize revisions are collected.
+            Collect = fun({PSeq, UUID, DocId, Revs}, {N, Acc, _}) ->
+                N1 = N + length(Revs),
+                Next = case N1 < BatchSize of true -> ok; false -> stop end,
+                {Next, {N1, [{UUID, DocId, Revs} | Acc], PSeq}}
+            end,
+            {ok, {_, Batch, LastSeq}} = couch_db:fold_purge_infos(
+                Db, FromSeq, Collect, {0, [], FromSeq}),
+            % Remaining: purges beyond this batch, left for later passes.
+            Remaining = couch_db:get_purge_seq(Db) - LastSeq,
+            rexi:reply({ok, {CpId, Batch, LastSeq, Remaining}});
+        Else ->
+            rexi:reply(Else)
+    end.
+
+
+save_purge_checkpoint_rpc(DbName, PurgeDocId, Body) ->
+    % Runs on the target node: store the purge checkpoint doc and reply with
+    % update_doc's result, or {Class, Reason} if the update raised.
+    erlang:put(io_priority, {internal_repl, DbName}),
+    case get_or_create_db(DbName, [?ADMIN_CTX]) of
+        {ok, Db} ->
+            CpDoc = #doc{id = PurgeDocId, body = Body},
+            Reply = try couch_db:update_doc(Db, CpDoc, [])
+            catch Class:Reason -> {Class, Reason}
+            end,
+            rexi:reply(Reply);
+        Error ->
+            rexi:reply(Error)
+    end.
+
+
 %% @doc Return the sequence where two files with the same UUID diverged.
 compare_epochs(SourceEpochs, TargetEpochs) ->
     compare_rev_epochs(