You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original rendering) is not preserved in this text.
Posted to commits@couchdb.apache.org by da...@apache.org on 2018/03/26 20:55:51 UTC

[couchdb] 20/33: WIP - internal replication

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit e590bdf407eb2952af7b357615e93910e676781b
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Mon Mar 26 13:07:57 2018 -0500

    WIP - internal replication
---
 src/mem3/src/mem3_rep.erl | 35 ++++++++++++++++++++++-------------
 src/mem3/src/mem3_rpc.erl |  4 ++--
 2 files changed, 24 insertions(+), 15 deletions(-)

diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 338d20e..edac76d 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -76,7 +76,7 @@ go(#shard{} = Source, #shard{} = Target, Opts) ->
     go(Acc).
 
 
-go(#acc{source=Source, batch_count=BC}=Acc0) ->
+go(#acc{source=Source, batch_count=BC}=Acc) ->
     case couch_db:open(Source#shard.name, [?ADMIN_CTX]) of
     {ok, Db} ->
         Resp = try
@@ -200,9 +200,9 @@ find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) ->
     end.
 
 
-repl(Acc0#acc{db = Db0}) ->
+repl(#acc{db = Db0} = Acc0) ->
     erlang:put(io_priority, {internal_repl, couch_db:name(Db0)}),
-    #acc{seq = Seq} = Acc1 = calculate_start_seq(Acc0),
+    Acc1 = calculate_start_seq(Acc0),
     try
         Acc3 = case config:get_boolean("mem3", "replicate_purges", false) of
             true ->
@@ -229,10 +229,10 @@ pull_purges(#acc{} = Acc0) ->
         name = TgtDbName
     } = Target,
 
-    {Acc2, RemToPull} = with_src_db(Acc0, fun(Db) ->
+    with_src_db(Acc0, fun(Db) ->
         SrcUUID = couch_db:get_uuid(Db),
         {ok, LocalPurgeId, Infos, ThroughSeq, Remaining} =
-                mem3_rpc:load_purge_infos(TNode, DbName, SrcUUID, Count),
+                mem3_rpc:load_purge_infos(TgtNode, TgtDbName, SrcUUID, Count),
 
         if Infos == [] -> ok; true ->
             {ok, _} = couch_db:purge_docs(Db, Infos, [replicated_edits]),
@@ -246,7 +246,7 @@ pull_purges(#acc{} = Acc0) ->
             PurgesToPush = couch_db:get_purge_seq(Db) - OldestPurgeSeq,
             Changes = couch_db:count_changes_since(Db, UpdateSeq),
             throw({finished, Remaining + PurgesToPush + Changes})
-        end
+        end,
 
         Acc0#acc{purgeid = LocalPurgeId}
     end).
@@ -265,13 +265,21 @@ push_purges(#acc{} = Acc0) ->
     } = Target,
 
     with_src_db(Acc0, fun(Db) ->
+        StartSeq = case couch_db:open_doc(Db, LocalPurgeId, []) of
+            {ok, #doc{body = {Props}}} ->
+                couch_util:get_value(<<"purge_seq">>, Props);
+            {not_found, _} ->
+                {ok, Oldest} = couch_db:get_oldest_purge_seq(Db),
+                Oldest
+        end,
+
         FoldFun = fun({PSeq, UUID, Id, Revs}, {Count, Infos, _}) ->
             NewCount = Count + length(Revs),
             NewInfos = [{UUID, Id, Revs} | Infos],
             Status = if NewCount < BatchSize -> ok; true -> stop end,
             {Status, {NewCount, NewInfos, PSeq}}
         end,
-        InitAcc = {0, [], StartSeq}
+        InitAcc = {0, [], StartSeq},
         {ok, {_, Infos, ThroughSeq}} =
             couch_db:fold_purge_infos(Db, StartSeq, FoldFun, InitAcc),
 
@@ -303,12 +311,13 @@ push_changes(#acc{} = Acc0) ->
 
     % Avoid needless rewriting the internal replication
     % checkpoint document if nothing is replicated.
-    if Seq < couch_db:get_update_seq(Db0) -> ok; true ->
+    UpdateSeq = couch_db:get_update_seq(Db0),
+    if Seq < UpdateSeq -> ok; true ->
         throw({finished, 0})
-    end
+    end,
 
     with_src_db(Acc0, fun(Db) ->
-        Acc1 = Acc#acc{db = Db},
+        Acc1 = Acc0#acc{db = Db},
         Fun = fun ?MODULE:changes_enumerator/2,
         {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
         {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
@@ -485,15 +494,15 @@ purge_cp_body(#acc{} = Acc, PurgeSeq) ->
     } = Acc,
     {Mega, Secs, _} = os:timestamp(),
     NowSecs = Mega * 1000000 + Secs,
-    Body = {[
+    {[
         {<<"type">>, <<"internal_replication">>},
         {<<"updated_on">>, NowSecs},
         {<<"purge_seq">>, PurgeSeq},
         {<<"verify_module">>, <<"mem3_rep">>},
         {<<"verify_function">>, <<"verify_purge_checkpoint">>},
         {<<"dbname">>, Source#shard.dbname},
-        {<<"source">>, atom_to_binary(Source#shard.node)},
-        {<<"target">>, atom_to_binary(Target#shard.node)},
+        {<<"source">>, atom_to_binary(Source#shard.node, latin1)},
+        {<<"target">>, atom_to_binary(Target#shard.node, latin1)},
         {<<"range">>, Source#shard.range}
     ]}.
 
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl
index ab293d7..b0b0098 100644
--- a/src/mem3/src/mem3_rpc.erl
+++ b/src/mem3/src/mem3_rpc.erl
@@ -156,7 +156,7 @@ load_purge_infos_rpc(DbName, SrcUUID, BatchSize) ->
             TgtUUID = couch_db:get_uuid(Db),
             PurgeDocId = mem3_rep:make_purge_id(SrcUUID, TgtUUID),
             StartSeq = case couch_db:open_doc(Db, PurgeDocId, []) of
-                {ok, #doc{props = {Props}}} ->
+                {ok, #doc{body = {Props}}} ->
                     couch_util:get_value(<<"purge_seq">>, Props);
                 {not_found, _} ->
                     {ok, Oldest} = couch_db:get_oldest_purge_seq(Db),
@@ -173,7 +173,7 @@ load_purge_infos_rpc(DbName, SrcUUID, BatchSize) ->
                     couch_db:fold_purge_infos(Db, StartSeq, FoldFun, InitAcc),
             {ok, PurgeSeq} = couch_db:get_purge_seq(Db),
             Remaining = PurgeSeq - ThroughSeq,
-            rexi:reply({ok, PurgeDocId, PurgeInfos, ThroughSeq, Remainin})
+            rexi:reply({ok, PurgeDocId, PurgeInfos, ThroughSeq, Remaining});
         Else ->
             rexi:reply(Else)
     end.

-- 
To stop receiving notification emails like this one, please contact
davisp@apache.org.