You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2018/03/21 18:25:31 UTC

[couchdb] branch COUCHDB-3326-clustered-purge-davisp-refactor updated: WIP - Internal replication

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/COUCHDB-3326-clustered-purge-davisp-refactor by this push:
     new 1a46337  WIP - Internal replication
1a46337 is described below

commit 1a4633787fdaec5594668fc23346e7787c8c1039
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Mar 21 13:22:23 2018 -0500

    WIP - Internal replication
---
 src/mem3/src/mem3_rep.erl | 84 +++++++++++++++++++++++++++++++++++++++--------
 src/mem3/src/mem3_rpc.erl | 72 ++++++++++++++++++++++++++++++++++++++--
 2 files changed, 141 insertions(+), 15 deletions(-)

diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 59d37f7..338d20e 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -17,6 +17,8 @@
     go/2,
     go/3,
     make_local_id/2,
+    make_purge_id/2,
+    verify_purge_checkpoint/3,
     find_source_seq/4
 ]).
 
@@ -119,6 +121,34 @@ make_local_id(SourceThing, TargetThing, Filter) ->
     <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
 
 
+%% @doc Return the _local doc id of the mem3 purge checkpoint for the
+%% given source/target database UUID pair.
+make_purge_id(SourceUUID, TargetUUID) ->
+    <<"_local/purge-mem3-", SourceUUID/binary, "-", TargetUUID/binary>>.
+
+
+%% @doc Verifier named in purge checkpoint bodies (see purge_cp_body/2).
+%% Returns true while both the recorded source and target nodes still host
+%% a copy of the recorded range of the recorded database, false otherwise.
+%% Node names are compared as binaries, so a node unknown to this VM gives
+%% false rather than a badarg from binary_to_existing_atom/2.
+verify_purge_checkpoint(_Db, _DocId, Props) ->
+    DbName = couch_util:get_value(<<"dbname">>, Props),
+    SourceBin = couch_util:get_value(<<"source">>, Props),
+    TargetBin = couch_util:get_value(<<"target">>, Props),
+    Range = couch_util:get_value(<<"range">>, Props),
+    try
+        % One shard-map lookup; keep the nodes holding the recorded range.
+        Nodes = [atom_to_binary(S#shard.node, latin1)
+                || S <- mem3:shards(DbName), S#shard.range == Range],
+        lists:member(SourceBin, Nodes) andalso lists:member(TargetBin, Nodes)
+    catch
+        % mem3 has no shards for DbName any more: checkpoint is not valid.
+        error:database_does_not_exist ->
+            false
+    end.
+
+
 %% @doc Find and return the largest update_seq in SourceDb
 %% that the client has seen from TargetNode.
 %%
@@ -192,7 +222,6 @@ pull_purges(#acc{} = Acc0) ->
     #acc{
         batch_size = Count,
         seq = UpdateSeq,
-        source = Source,
         target = Target
     } = Acc0,
     #shard{
@@ -200,15 +229,16 @@ pull_purges(#acc{} = Acc0) ->
         name = TgtDbName
     } = Target,
 
-    {Acc2, RemToPull} = with_db(Source#shard.name, fun(Db) ->
+    {Acc2, RemToPull} = with_src_db(Acc0, fun(Db) ->
         SrcUUID = couch_db:get_uuid(Db),
         {ok, LocalPurgeId, Infos, ThroughSeq, Remaining} =
-                mem3_rpc:load_purges(TNode, DbName, SrcUUID, Count),
+                mem3_rpc:load_purge_infos(TNode, DbName, SrcUUID, Count),
 
         if Infos == [] -> ok; true ->
             {ok, _} = couch_db:purge_docs(Db, Infos, [replicated_edits]),
+            Body = purge_cp_body(Acc0, ThroughSeq),
             ok = mem3_rpc:save_purge_checkpoint(
-                    TgtNode, TgtDbName, LocalPurgeId, node(), ThroughSeq)
+                    TgtNode, TgtDbName, LocalPurgeId, Body)
         end,
 
         if Remaining =< 0 -> ok; true ->
@@ -227,7 +257,6 @@ push_purges(#acc{} = Acc0) ->
         batch_size = BatchSize,
         purgeid = LocalPurgeId,
         seq = UpdateSeq,
-        source = Source,
         target = Target
     } = Acc0,
     #shard{
@@ -235,7 +264,7 @@ push_purges(#acc{} = Acc0) ->
         name = TgtDbName
     } = Target,
 
-    with_db(Source#shard.name, fun(Db) ->
+    with_src_db(Acc0, fun(Db) ->
         FoldFun = fun({PSeq, UUID, Id, Revs}, {Count, Infos, _}) ->
             NewCount = Count + length(Revs),
             NewInfos = [{UUID, Id, Revs} | Infos],
@@ -246,9 +275,14 @@ push_purges(#acc{} = Acc0) ->
         {ok, {_, Infos, ThroughSeq}} =
             couch_db:fold_purge_infos(Db, StartSeq, FoldFun, InitAcc),
 
-        ok = mem3_rpc:purge_docs(TgtNode, TgtDbName, Infos),
-        Doc = #doc{id = LocalPurgeId, body = purge_cp_body(ThroughSeq)},
-        {ok, _} = couch_db:update_doc(Db, Doc, []),
+        if Infos == [] -> ok; true ->
+            ok = purge_on_target(TgtNode, TgtDbName, Infos),
+            Doc = #doc{
+                id = LocalPurgeId,
+                body = purge_cp_body(Acc0, ThroughSeq)
+            },
+            {ok, _} = couch_db:update_doc(Db, Doc, [])
+        end,
 
         {ok, PurgeSeq} = couch_db:get_purge_seq(Db),
         if ThroughSeq >= PurgeSeq -> ok; true ->
@@ -263,7 +297,6 @@ push_purges(#acc{} = Acc0) ->
 
 push_changes(#acc{} = Acc0) ->
     #acc{
-        source = Source,
         db = Db0,
         seq = Seq
     } = Acc0,
@@ -274,7 +307,7 @@ push_changes(#acc{} = Acc0) ->
         throw({finished, 0})
     end
 
-    with_db(Source#shard.name, fun(Db) ->
+    with_src_db(Acc0, fun(Db) ->
         Acc1 = Acc#acc{db = Db},
         Fun = fun ?MODULE:changes_enumerator/2,
         {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
@@ -423,6 +456,15 @@ save_on_target(Node, Name, Docs) ->
     ok.
 
 
+%% @doc Ask Node to purge PurgeInfos from shard Name via mem3_rpc:purge_docs/4,
+%% passing replicated_changes, full_commit, the admin ctx and internal_repl
+%% io priority. Returns ok; any reply other than {ok, _} is a badmatch.
+purge_on_target(Node, Name, PurgeInfos) ->
+    IoPriority = {io_priority, {internal_repl, Name}},
+    Options = [replicated_changes, full_commit, ?ADMIN_CTX, IoPriority],
+    {ok, _} = mem3_rpc:purge_docs(Node, Name, PurgeInfos, Options),
+    ok.
+
 update_locals(Acc) ->
     #acc{seq=Seq, db=Db, target=Target, localid=Id, history=History} = Acc,
     #shard{name=Name, node=Node} = Target,
@@ -436,7 +478,11 @@ update_locals(Acc) ->
     {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
 
 
-purge_cp_body(PurgeSeq) ->
+purge_cp_body(#acc{} = Acc, PurgeSeq) ->
+    #acc{
+        source = Source,
+        target = Target
+    } = Acc,
     {Mega, Secs, _} = os:timestamp(),
     NowSecs = Mega * 1000000 + Secs,
     Body = {[
@@ -445,7 +491,10 @@ purge_cp_body(PurgeSeq) ->
         {<<"purge_seq">>, PurgeSeq},
         {<<"verify_module">>, <<"mem3_rep">>},
         {<<"verify_function">>, <<"verify_purge_checkpoint">>},
-        {<<"node">>, atom_to_binary(node())}
+        {<<"dbname">>, Source#shard.dbname},
+        {<<"source">>, atom_to_binary(Source#shard.node)},
+        {<<"target">>, atom_to_binary(Target#shard.node)},
+        {<<"range">>, Source#shard.range}
     ]}.
 
 
@@ -481,6 +530,15 @@ find_repl_doc(SrcDb, TgtUUIDPrefix) ->
     end.
 
 
+%% @doc Open the source shard of Acc with the admin ctx, return Fun(Db),
+%% and close the handle afterwards, also when Fun raises or throws.
+with_src_db(#acc{source = Source}, Fun) ->
+    SrcName = Source#shard.name,
+    {ok, SrcDb} = couch_db:open(SrcName, [?ADMIN_CTX]),
+    try Fun(SrcDb)
+    after couch_db:close(SrcDb) end.
+
+
 is_prefix(Prefix, Subject) ->
     binary:longest_common_prefix([Prefix, Subject]) == size(Prefix).
 
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl
index c2bd58f..ab293d7 100644
--- a/src/mem3/src/mem3_rpc.erl
+++ b/src/mem3/src/mem3_rpc.erl
@@ -20,14 +20,21 @@
     get_missing_revs/4,
     update_docs/4,
     load_checkpoint/4,
-    save_checkpoint/6
+    save_checkpoint/6,
+
+    load_purge_infos/4,
+    save_purge_checkpoint/4,
+    purge_docs/4
 ]).
 
 % Private RPC callbacks
 -export([
     find_common_seq_rpc/3,
     load_checkpoint_rpc/3,
-    save_checkpoint_rpc/5
+    save_checkpoint_rpc/5,
+
+    load_purge_infos_rpc/3,
+    save_purge_checkpoint_rpc/3
 ]).
 
 
@@ -58,6 +65,20 @@ find_common_seq(Node, DbName, SourceUUID, SourceEpochs) ->
     rexi_call(Node, {mem3_rpc, find_common_seq_rpc, Args}).
 
 
+%% @doc Load the next batch of purge infos for SourceUUID from DbName on Node.
+load_purge_infos(Node, DbName, SourceUUID, Count) ->
+    rexi_call(Node, {mem3_rpc, load_purge_infos_rpc, [DbName, SourceUUID, Count]}).
+
+%% @doc Save Body as purge checkpoint doc PurgeDocId of DbName on Node.
+save_purge_checkpoint(Node, DbName, PurgeDocId, Body) ->
+    Args = [DbName, PurgeDocId, Body],
+    rexi_call(Node, {mem3_rpc, save_purge_checkpoint_rpc, Args}).
+
+%% @doc Run fabric_rpc:purge_docs/3 with PurgeInfos and Options on Node.
+purge_docs(Node, DbName, PurgeInfos, Options) ->
+    rexi_call(Node, {fabric_rpc, purge_docs, [DbName, PurgeInfos, Options]}).
+
+
 load_checkpoint_rpc(DbName, SourceNode, SourceUUID) ->
     erlang:put(io_priority, {internal_repl, DbName}),
     case get_or_create_db(DbName, [?ADMIN_CTX]) of
@@ -128,6 +149,53 @@ find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) ->
     end.
 
 
+%% @doc rexi worker: reply {ok, PurgeDocId, PurgeInfos, ThroughSeq, Remaining}
+%% folding from SrcUUID's checkpoint purge_seq (or the oldest) to >= BatchSize revs.
+load_purge_infos_rpc(DbName, SrcUUID, BatchSize) ->
+    erlang:put(io_priority, {internal_repl, DbName}),
+    case get_or_create_db(DbName, [?ADMIN_CTX]) of
+        {ok, Db} ->
+            PurgeDocId = mem3_rep:make_purge_id(SrcUUID, couch_db:get_uuid(Db)),
+            StartSeq = case couch_db:open_doc(Db, PurgeDocId, []) of
+                {ok, #doc{body = {Props}}} ->
+                    couch_util:get_value(<<"purge_seq">>, Props);
+                {not_found, _} ->
+                    {ok, Oldest} = couch_db:get_oldest_purge_seq(Db),
+                    Oldest
+            end,
+            FoldFun = fun({PSeq, UUID, Id, Revs}, {Count, Infos, _}) ->
+                NewCount = Count + length(Revs),
+                NewInfos = [{UUID, Id, Revs} | Infos],
+                Status = if NewCount < BatchSize -> ok; true -> stop end,
+                {Status, {NewCount, NewInfos, PSeq}}
+            end,
+            {ok, {_, PurgeInfos, ThroughSeq}} = couch_db:fold_purge_infos(
+                    Db, StartSeq, FoldFun, {0, [], StartSeq}),
+            {ok, PurgeSeq} = couch_db:get_purge_seq(Db),
+            Remaining = PurgeSeq - ThroughSeq,
+            rexi:reply({ok, PurgeDocId, PurgeInfos, ThroughSeq, Remaining});
+        Else ->
+            rexi:reply(Else)
+    end.
+
+
+%% @doc rexi worker: write Body as doc PurgeDocId in DbName. Replies ok,
+%% {error, Other}, a caught {Class, Reason}, or get_or_create_db's error.
+save_purge_checkpoint_rpc(DbName, PurgeDocId, Body) ->
+    erlang:put(io_priority, {internal_repl, DbName}),
+    case get_or_create_db(DbName, [?ADMIN_CTX]) of
+        {ok, Db} ->
+            Doc = #doc{id = PurgeDocId, body = Body},
+            Resp = try couch_db:update_doc(Db, Doc, []) of
+                {ok, _} -> ok;
+                Else -> {error, Else}
+            catch T:R -> {T, R} end,
+            rexi:reply(Resp);
+        Error ->
+            rexi:reply(Error)
+    end.
+
+
 %% @doc Return the sequence where two files with the same UUID diverged.
 compare_epochs(SourceEpochs, TargetEpochs) ->
     compare_rev_epochs(

-- 
To stop receiving notification emails like this one, please contact
davisp@apache.org.