You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2018/03/16 20:00:08 UTC

[couchdb] 14/20: Add internal replication of purge requests

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository

commit 0f44181de0cb729883c11326da284cf687e3a70e
Author: Mayya Sharipova <>
AuthorDate: Mon May 1 14:36:39 2017 -0400

    Add internal replication of purge requests
    * Add initial pull replication of purge requests
     - Each internal replication job starts by pulling purge requests from
        target and applying them on source.  If a source and target were
        disconnected during a purge request, it's possible that the target
        has received a purge request not yet present on the source. Given
        that internal replication is push oriented it would be possible for the
        source and target to reconnect and have the source push a revision that
        has since been purged. To avoid this we should pull purge requests
        from the target to ensure we're up to date before beginning internal
        replication.
     - Add _local/purge-mem3-$hash docs in mem3_rep. mem3 writes a
        _local/purge-mem3-$hash document once purge requests have been
        replicated. This document will exist on the target and the
        purge_seq value will be the target's purge_seq that has been processed
        during *pull* replication.
    * Add push replication of purge requests
     - Push new purge requests from source to target, and apply them on
        target.
     - Update checkpoint docs to store the purge_seq on source
 src/mem3/src/mem3_rep.erl | 142 +++++++++++++++++++++++++++++++++++++++++-----
 src/mem3/src/mem3_rpc.erl | 128 +++++++++++++++++++++++++++++++++++++++--
 2 files changed, 252 insertions(+), 18 deletions(-)

diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 942f8a8..44aee49 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -17,7 +17,9 @@
-    find_source_seq/4
+    make_local_purge_id/2,
+    find_source_seq/4,
+    mem3_sync_purge/1
@@ -39,7 +41,8 @@
-    history = {[]}
+    history = {[]},
+    purge_seq = 0
@@ -119,6 +122,12 @@ make_local_id(SourceThing, TargetThing, Filter) ->
     <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
+%% @doc Build the id of the _local purge checkpoint document for a
+%% source/target pair. The result has the shape
+%% <<"_local/purge-v", Version, "-mem3-", SourceUUID, "-", TargetUUID>>,
+%% where Version comes from config:get("purge", "version", "1").
+%% Both UUIDs are spliced in as binaries, so they must already be binaries.
+make_local_purge_id(SourceUUID, TargetUUID) ->
+    V = ?l2b("v" ++ config:get("purge", "version", "1") ++ "-"),
+   <<"_local/purge-", V/binary, "mem3-",
+       SourceUUID/binary, "-", TargetUUID/binary>>.
 %% @doc Find and return the largest update_seq in SourceDb
 %% that the client has seen from TargetNode.
@@ -172,18 +181,58 @@ find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) ->
 repl(Db, Acc0) ->
     erlang:put(io_priority, {internal_repl, couch_db:name(Db)}),
-    #acc{seq=Seq} = Acc1 = calculate_start_seq(Acc0#acc{source = Db}),
-    case Seq >= couch_db:get_update_seq(Db) of
-        true ->
-            {ok, 0};
-        false ->
-            Fun = fun ?MODULE:changes_enumerator/2,
-            {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
-            {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
-            {ok, couch_db:count_changes_since(Db, LastSeq)}
+    #acc{source = Db2} = Acc1 = pull_purges_from_target(Db, Acc0),
+    #acc{seq=Seq} = Acc2 = calculate_start_seq(Acc1),
+    try
+        % this throws an exception: {invalid_start_purge_seq, PurgeSeq0}
+        Acc3 = replicate_purged_docs(Acc2),
+        Fun = fun ?MODULE:changes_enumerator/2,
+        {ok, Acc4} = couch_db:fold_changes(Db2, Seq, Fun, Acc3),
+        {ok, #acc{seq = LastSeq}} = replicate_batch(Acc4),
+        {ok, couch_db:count_changes_since(Db2, LastSeq)}
+    catch
+        throw:{invalid_start_purge_seq, PurgeSeq} ->
+            couch_log:error(
+                "Oldest_purge_seq on source is greated than "
+                "the last source's purge_seq: ~p known to target!"
+                "Can't synchronize purges between: ~p and ~p!",
+                [PurgeSeq, Acc2#acc.source,]
+            )
+%% @doc Pull purge requests from the target shard and apply the ones the
+%% source has not seen yet.
+%%
+%% Asks the target (mem3_rpc:load_purges/3) for a list of
+%% {UUID, Id, Revs} purge requests plus the id of the purge checkpoint doc
+%% and a purge sequence. Each UUID is looked up on the source with
+%% couch_db:open_purged_docs/2; entries that come back as not_found are
+%% purged on the source. Whenever the target returned a non-empty list, the
+%% purge checkpoint is saved on the target, even if nothing had to be applied.
+%%
+%% Returns Acc with its source field set to the database handle to use from
+%% here on (a freshly opened handle when purges were applied, otherwise Db).
+%%
+%% NOTE(review): the handle is reopened with the target shard's name
+%% (DbName); this is only correct if source and target shards share a name.
+%% TODO confirm, or reopen via couch_db:name(Db).
+%% NOTE(review): the result of couch_db:purge_docs/2 is not checked before
+%% the checkpoint is written - confirm that it cannot fail silently.
+pull_purges_from_target(Db, #acc{target=#shard{node=TNode, name=DbName}}=Acc) ->
+    SourceUUID = couch_db:get_uuid(Db),
+    {TUUIDsIdsRevs, TargetPDocID, TargetPSeq} =
+            mem3_rpc:load_purges(TNode, DbName, SourceUUID),
+    Acc2 = case TUUIDsIdsRevs of
+        [] -> Acc#acc{source = Db};
+        _ ->
+            % check which Target UUIDs have not been applied to Source
+            UUIDs = [UUID || {UUID, _Id, _Revs} <- TUUIDsIdsRevs],
+            PurgedDocs = couch_db:open_purged_docs(Db, UUIDs),
+            % lists:zip/2 requires both lists to have the same length
+            Results = lists:zip(TUUIDsIdsRevs, PurgedDocs),
+            Unapplied = lists:filtermap(fun
+                ({UUIDIdRevs, not_found}) -> {true, UUIDIdRevs};
+                (_) -> false
+            end, Results),
+            Acc1 = case Unapplied of
+                [] -> Acc#acc{source = Db};
+                _ ->
+                    % purge Db on Source and reopen it
+                    couch_db:purge_docs(Db, Unapplied),
+                    couch_db:close(Db),
+                    {ok, Db2} = couch_db:open(DbName, [?ADMIN_CTX]),
+                    Acc#acc{source = Db2}
+            end,
+            % update on Target target_purge_seq known to Source
+            mem3_rpc:save_purge_checkpoint(TNode, DbName, TargetPDocID,
+                    TargetPSeq, node()),
+            Acc1
+    end,
+    Acc2.
 calculate_start_seq(Acc) ->
         source = Db,
@@ -215,7 +264,33 @@ calculate_start_seq(Acc) ->
                     Seq = TargetSeq,
                     History = couch_util:get_value(<<"history">>, TProps, {[]})
-            Acc1#acc{seq = Seq, history = History};
+            SourcePurgeSeq0 = couch_util:get_value(<<"purge_seq">>, SProps),
+            TargetPurgeSeq0 = couch_util:get_value(<<"purge_seq">>, TProps),
+            % before purge upgrade, purge_seq was not saved in checkpoint file,
+            % thus get purge_seq directly from dbs
+            SourcePurgeSeq = case is_integer(SourcePurgeSeq0) of
+                true ->
+                    SourcePurgeSeq0;
+                false ->
+                    {ok, SPS} = couch_db:get_purge_seq(Db),
+                    SPS
+            end,
+            TargetPurgeSeq = case is_integer(TargetPurgeSeq0) of
+                true ->
+                    TargetPurgeSeq0;
+                false ->
+                    mem3_rpc:get_purge_seq(Node, Name, [
+                        {io_priority, {internal_repl, Name}},
+                        ?ADMIN_CTX
+                    ])
+            end,
+            case SourcePurgeSeq =< TargetPurgeSeq of
+                true ->
+                    PurgeSeq = SourcePurgeSeq;
+                false ->
+                    PurgeSeq = TargetPurgeSeq
+            end,
+            Acc1#acc{seq = Seq, history = History, purge_seq = PurgeSeq};
         {not_found, _} ->
@@ -251,6 +326,27 @@ changes_enumerator(#full_doc_info{}=FDI, #acc{revcount=C, infos=Infos}=Acc0) ->
     {Go, Acc1}.
+%% @doc Push purge requests recorded on the source to the target.
+%%
+%% Folds over the source's purged docs starting from the purge_seq stored in
+%% the accumulator and collects them as {UUID, Id, Revs} tuples. Each entry
+%% is prepended, so the list is in reverse fold order. If nothing was
+%% collected, Acc0 is returned unchanged. Otherwise the requests are sent
+%% through purge_on_target/3 and the accumulator's purge_seq is set to the
+%% source's current purge sequence.
+replicate_purged_docs(Acc0) ->
+    #acc{
+        source = Db,
+        target = #shard{node=Node, name=Name},
+        purge_seq = PurgeSeq0
+    } = Acc0,
+    PFoldFun = fun({_PSeq, UUID, Id, Revs}, Acc) ->
+        [{UUID, Id, Revs} | Acc]
+    end,
+    {ok, UUIDsIdsRevs} = couch_db:fold_purged_docs(Db, PurgeSeq0, PFoldFun, [], []),
+    case UUIDsIdsRevs of
+        [] ->
+            Acc0;
+        _ ->
+            ok = purge_on_target(Node, Name, UUIDsIdsRevs),
+            {ok, PurgeSeq} = couch_db:get_purge_seq(Db),
+            Acc0#acc{purge_seq = PurgeSeq}
+    end.
 replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
     case find_missing_revs(Acc) of
     [] ->
@@ -324,8 +420,19 @@ save_on_target(Node, Name, Docs) ->
+%% @doc Send the given {UUID, Id, Revs} purge requests to shard Name on
+%% Node via mem3_rpc:purge_docs/4, passing the replicated_changes and
+%% full_commit options, admin context and internal-replication io priority.
+%% Always returns ok.
+%%
+%% NOTE(review): the value returned by mem3_rpc:purge_docs/4 is discarded,
+%% so a non-exception failure reply would go unnoticed - confirm intended.
+purge_on_target(Node, Name, UUIdsIdsRevs) ->
+    mem3_rpc:purge_docs(Node, Name, UUIdsIdsRevs,[
+        replicated_changes,
+        full_commit,
+        ?ADMIN_CTX,
+        {io_priority, {internal_repl, Name}}
+    ]),
+    ok.
 update_locals(Acc) ->
-    #acc{seq=Seq, source=Db, target=Target, localid=Id, history=History} = Acc,
+    #acc{seq=Seq, source=Db, target=Target, localid=Id,
+        history=History, purge_seq = PurgeSeq} = Acc,
     #shard{name=Name, node=Node} = Target,
     NewEntry = [
         {<<"source_node">>, atom_to_binary(node(), utf8)},
@@ -333,7 +440,8 @@ update_locals(Acc) ->
         {<<"source_seq">>, Seq},
         {<<"timestamp">>, list_to_binary(iso8601_timestamp())}
-    NewBody = mem3_rpc:save_checkpoint(Node, Name, Id, Seq, NewEntry, History),
+    NewBody = mem3_rpc:save_checkpoint(Node, Name, Id, Seq, PurgeSeq,
+        NewEntry, History),
     {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
@@ -369,6 +477,12 @@ find_repl_doc(SrcDb, TgtUUIDPrefix) ->
+% used during compaction to check if _local/purge doc is current
+    Node = couch_util:get_value(<<"node">>, Opts),
+    lists:member(Node, mem3:nodes()).
 is_prefix(Prefix, Subject) ->
     binary:longest_common_prefix([Prefix, Subject]) == size(Prefix).
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl
index c2bd58f..b9c1b39 100644
--- a/src/mem3/src/mem3_rpc.erl
+++ b/src/mem3/src/mem3_rpc.erl
@@ -19,15 +19,22 @@
+    get_purge_seq/3,
+    purge_docs/4,
-    save_checkpoint/6
+    save_checkpoint/7,
+    load_purges/3,
+    save_purge_checkpoint/5
 % Private RPC callbacks
-    save_checkpoint_rpc/5
+    save_checkpoint_rpc/5,
+    save_checkpoint_rpc/6,
+    load_purges_rpc/2,
+    save_purge_checkpoint_rpc/4
@@ -43,16 +50,34 @@ update_docs(Node, DbName, Docs, Options) ->
     rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}).
+%% @doc Dispatch fabric_rpc:get_purge_seq(DbName, Options) to Node through
+%% rexi_call/2 and return its result. rexi_call/2 is defined outside this
+%% view; presumably it runs the MFA on Node and returns the reply.
+get_purge_seq(Node, DbName, Options) ->
+    rexi_call(Node, {fabric_rpc, get_purge_seq, [DbName, Options]}).
+%% @doc Dispatch fabric_rpc:purge_docs(DbName, PUUIdsIdsRevs, Options) to
+%% Node through rexi_call/2 and return its result. mem3_rep passes the
+%% purge requests as a list of {UUID, Id, Revs} tuples.
+purge_docs(Node, DbName, PUUIdsIdsRevs, Options) ->
+    rexi_call(Node, {fabric_rpc, purge_docs, [DbName, PUUIdsIdsRevs, Options]}).
 load_checkpoint(Node, DbName, SourceNode, SourceUUID) ->
     Args = [DbName, SourceNode, SourceUUID],
     rexi_call(Node, {mem3_rpc, load_checkpoint_rpc, Args}).
-save_checkpoint(Node, DbName, DocId, Seq, Entry, History) ->
-    Args = [DbName, DocId, Seq, Entry, History],
+save_checkpoint(Node, DbName, DocId, Seq, PurgeSeq, Entry, History) ->
+    Args = [DbName, DocId, Seq, PurgeSeq, Entry, History],
     rexi_call(Node, {mem3_rpc, save_checkpoint_rpc, Args}).
+%% @doc Dispatch mem3_rpc:load_purges_rpc(DbName, SourceUUID) to Node
+%% through rexi_call/2 and return its result. The caller in mem3_rep
+%% matches the result as {UUIDsIdsRevs, PurgeDocId, PurgeSeq}.
+load_purges(Node, DbName, SourceUUID) ->
+    Args = [DbName, SourceUUID],
+    rexi_call(Node, {mem3_rpc, load_purges_rpc, Args}).
+%% @doc Dispatch mem3_rpc:save_purge_checkpoint_rpc(DbName, DocId, PurgeSeq,
+%% SourceNode) to Node through rexi_call/2 and return its result.
+save_purge_checkpoint(Node, DbName, DocId, PurgeSeq, SourceNode) ->
+    Args = [DbName, DocId, PurgeSeq, SourceNode],
+    rexi_call(Node, {mem3_rpc, save_purge_checkpoint_rpc, Args}).
 find_common_seq(Node, DbName, SourceUUID, SourceEpochs) ->
     Args = [DbName, SourceUUID, SourceEpochs],
     rexi_call(Node, {mem3_rpc, find_common_seq_rpc, Args}).
@@ -81,6 +106,7 @@ load_checkpoint_rpc(DbName, SourceNode, SourceUUID) ->
+% Remove after all nodes in the cluster are upgraded to clustered purge
 save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) ->
     erlang:put(io_priority, {internal_repl, DbName}),
     case get_or_create_db(DbName, [?ADMIN_CTX]) of
@@ -111,6 +137,40 @@ save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) ->
+%% @doc RPC callback: write the internal replication checkpoint document,
+%% including the source's purge sequence, into database DbName.
+%%
+%% The new history entry is NewEntry0 prefixed with this node's name, the
+%% database UUID and its current update sequence. The document body holds
+%% "seq", "purge_seq", "target_uuid" and the history produced by
+%% add_checkpoint/2.
+%%
+%% Replies through rexi:reply/1 with:
+%%   {ok, Body}      when the document was written;
+%%   {error, Else}   when couch_db:update_doc/3 returned anything else;
+%%   the thrown term when update_doc threw;
+%%   {error, Reason} when update_doc raised an error-class exception;
+%%   whatever get_or_create_db/2 returned when it did not return {ok, Db}.
+save_checkpoint_rpc(DbName, Id, SourceSeq, SourcePurgeSeq,
+        NewEntry0, History0) ->
+    erlang:put(io_priority, {internal_repl, DbName}),
+    case get_or_create_db(DbName, [?ADMIN_CTX]) of
+        {ok, Db} ->
+            NewEntry = {[
+                {<<"target_node">>, atom_to_binary(node(), utf8)},
+                {<<"target_uuid">>, couch_db:get_uuid(Db)},
+                {<<"target_seq">>, couch_db:get_update_seq(Db)}
+            ] ++ NewEntry0},
+            Body = {[
+                {<<"seq">>, SourceSeq},
+                {<<"purge_seq">>, SourcePurgeSeq},
+                {<<"target_uuid">>, couch_db:get_uuid(Db)},
+                {<<"history">>, add_checkpoint(NewEntry, History0)}
+            ]},
+            Doc = #doc{id = Id, body = Body},
+            rexi:reply(try couch_db:update_doc(Db, Doc, []) of
+                {ok, _} ->
+                    {ok, Body};
+                Else ->
+                    {error, Else}
+            catch
+                % a bare pattern in a catch clause matches class throw only
+                Exception ->
+                    Exception;
+                error:Reason ->
+                    {error, Reason}
+            end);
+        Error ->
+            rexi:reply(Error)
+    end.
 find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) ->
     erlang:put(io_priority, {internal_repl, DbName}),
     case get_or_create_db(DbName, [?ADMIN_CTX]) of
@@ -128,6 +188,66 @@ find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) ->
+%% @doc RPC callback: collect the purge requests of database DbName that
+%% the source identified by SourceUUID has not been sent yet.
+%%
+%% The starting point is the "purge_seq" stored in the purge checkpoint doc
+%% whose id is built by mem3_rep:make_local_purge_id/2. When that doc does
+%% not exist, it is max(OldestPurgeSeq - 1, 0). If the starting point
+%% equals the current purge sequence the list is empty. Otherwise purged
+%% docs are folded from the starting point and collected as
+%% {UUID, Id, Revs} tuples, prepended and therefore in reverse fold order.
+%%
+%% Replies through rexi:reply/1 with {ok, {UUIDsIdsRevs, DocId, CurPSeq}},
+%% or with whatever get_or_create_db/2 returned when it was not {ok, Db}.
+%%
+%% NOTE(review): if the checkpoint doc exists but has no "purge_seq" key,
+%% LastPSeq is whatever couch_util:get_value/2 returns for a missing key
+%% (presumably undefined) - confirm fold_purged_docs handles that.
+load_purges_rpc(DbName, SourceUUID) ->
+    erlang:put(io_priority, {internal_repl, DbName}),
+    case get_or_create_db(DbName, [?ADMIN_CTX]) of
+    {ok, Db} ->
+        TargetUUID = couch_db:get_uuid(Db),
+        DocId = mem3_rep:make_local_purge_id(SourceUUID, TargetUUID),
+        LastPSeq = case couch_db:open_doc(Db, DocId, []) of
+            {ok, #doc{body={Props}} } ->
+                couch_util:get_value(<<"purge_seq">>, Props);
+            {not_found, _} ->
+                % synchronize only last purge
+                {ok, OldestPSeq} = couch_db:get_oldest_purge_seq(Db),
+                erlang:max(OldestPSeq-1, 0)
+        end,
+        {ok, CurPSeq} = couch_db:get_purge_seq(Db),
+        UUIDsIdsRevs = if (LastPSeq == CurPSeq) -> []; true ->
+            FoldFun = fun({_PSeq, UUID, Id, Revs}, Acc) ->
+                [{UUID, Id, Revs} | Acc]
+            end,
+            {ok, UUIDsIdsRevs0} = couch_db:fold_purged_docs(
+                Db, LastPSeq, FoldFun, [], []
+            ),
+            UUIDsIdsRevs0
+        end,
+        rexi:reply({ok, {UUIDsIdsRevs, DocId, CurPSeq}});
+    Error ->
+        rexi:reply(Error)
+    end.
+%% @doc RPC callback: write the purge checkpoint document Id into database
+%% DbName.
+%%
+%% The body records PurgeSeq, a UTC timestamp string, the type
+%% "internal_replication" and a verification hook: module "mem3_rep",
+%% function "mem3_sync_purge", with Node as its "node" option.
+%%
+%% Replies through rexi:reply/1 with:
+%%   {ok, Body}      when the document was written;
+%%   {error, Else}   when couch_db:update_doc/3 returned anything else;
+%%   the thrown term when update_doc threw;
+%%   {error, Reason} when update_doc raised an error-class exception;
+%%   whatever get_or_create_db/2 returned when it did not return {ok, Db}.
+save_purge_checkpoint_rpc(DbName, Id, PurgeSeq, Node) ->
+    erlang:put(io_priority, {internal_repl, DbName}),
+    case get_or_create_db(DbName, [?ADMIN_CTX]) of
+        {ok, Db} ->
+            Timestamp = couch_util:utc_string(),
+            Body = {[
+                {<<"purge_seq">>, PurgeSeq},
+                {<<"timestamp_utc">>, Timestamp},
+                {<<"verify_module">>, <<"mem3_rep">>},
+                {<<"verify_function">>, <<"mem3_sync_purge">>},
+                {<<"verify_options">>, {[{<<"node">>, Node}]}},
+                {<<"type">>, <<"internal_replication">>}
+            ]},
+            Doc = #doc{id = Id, body = Body},
+            rexi:reply(try couch_db:update_doc(Db, Doc, []) of
+                {ok, _} ->
+                    {ok, Body};
+                Else ->
+                    {error, Else}
+            catch
+                % a bare pattern in a catch clause matches class throw only
+                Exception ->
+                    Exception;
+                error:Reason ->
+                    {error, Reason}
+            end);
+        Error ->
+            rexi:reply(Error)
+    end.
 %% @doc Return the sequence where two files with the same UUID diverged.
 compare_epochs(SourceEpochs, TargetEpochs) ->

To stop receiving notification emails like this one, please contact