You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2018/03/21 17:26:37 UTC

[couchdb] 02/08: WIP - Add internal replication of purges

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 195fb5a40754b446f7963807cb817d012a0981cf
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Mar 21 12:26:11 2018 -0500

    WIP - Add internal replication of purges
---
 src/mem3/src/mem3_rep.erl | 135 ++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 124 insertions(+), 11 deletions(-)

diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 6d38af7..59d37f7 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -35,6 +35,7 @@
     infos = [],
     seq = 0,
     localid,
+    purgeid,
     source,
     target,
     filter,
@@ -169,20 +170,119 @@ find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) ->
     end.
 
 
-repl(#acc{db = Db} = Acc0) ->
-    erlang:put(io_priority, {internal_repl, couch_db:name(Db)}),
-    #acc{seq=Seq} = Acc1 = calculate_start_seq(Acc0),
-    case Seq >= couch_db:get_update_seq(Db) of
-        true ->
-            {ok, 0};
-        false ->
-            Fun = fun ?MODULE:changes_enumerator/2,
-            {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
-            {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
-            {ok, couch_db:count_changes_since(Db, LastSeq)}
+% Run one internal replication pass for the shard pair described by Acc0.
+%
+% When the "mem3"/"replicate_purges" config flag is true, purges are first
+% pulled from and then pushed to the target before document changes are
+% pushed. pull_purges/1, push_purges/1 and push_changes/1 end the pass early
+% with throw({finished, Count}); that throw is caught here and turned into
+% {ok, Count}. Otherwise the result of push_changes/1 is returned as is.
+repl(#acc{db = Db0} = Acc0) ->
+    erlang:put(io_priority, {internal_repl, couch_db:name(Db0)}),
+    Acc1 = calculate_start_seq(Acc0),
+    try
+        Acc3 = case config:get_boolean("mem3", "replicate_purges", false) of
+            true ->
+                Acc2 = pull_purges(Acc1),
+                push_purges(Acc2);
+            false ->
+                Acc1
+        end,
+        push_changes(Acc3)
+    catch
+        throw:{finished, Count} ->
+            {ok, Count}
     end.
 
 
+% Pull purges from the target shard and apply them to the source db.
+%
+% Asks the target node (mem3_rpc:load_purges/4) for up to batch_size purge
+% infos, applies any that come back with couch_db:purge_docs/3, and then
+% saves a purge checkpoint on the target. If the target reports purges still
+% remaining, the pass is ended with throw({finished, Pending}); otherwise
+% Acc0 is returned with purgeid set to the checkpoint id the target supplied.
+% NOTE(review): this relies on with_db/2 returning the fun's result - confirm.
+pull_purges(#acc{} = Acc0) ->
+    #acc{
+        batch_size = Count,
+        seq = UpdateSeq,
+        source = Source,
+        target = Target
+    } = Acc0,
+    #shard{
+        node = TgtNode,
+        name = TgtDbName
+    } = Target,
+
+    with_db(Source#shard.name, fun(Db) ->
+        SrcUUID = couch_db:get_uuid(Db),
+        {ok, LocalPurgeId, Infos, ThroughSeq, Remaining} =
+                mem3_rpc:load_purges(TgtNode, TgtDbName, SrcUUID, Count),
+
+        % Only touch the db and the remote checkpoint when there is work.
+        if Infos == [] -> ok; true ->
+            {ok, _} = couch_db:purge_docs(Db, Infos, [replicated_edits]),
+            ok = mem3_rpc:save_purge_checkpoint(
+                    TgtNode, TgtDbName, LocalPurgeId, node(), ThroughSeq)
+        end,
+
+        % More purges are waiting on the target: report everything still
+        % pending (purges to pull and push, plus changes) and end this pass.
+        % NOTE(review): push_purges/1 matches {ok, Seq} from
+        % couch_db:get_purge_seq/1 while this uses the bare value - confirm
+        % which return shape is correct.
+        if Remaining =< 0 -> ok; true ->
+            OldestPurgeSeq = couch_db:get_oldest_purge_seq(Db),
+            PurgesToPush = couch_db:get_purge_seq(Db) - OldestPurgeSeq,
+            Changes = couch_db:count_changes_since(Db, UpdateSeq),
+            throw({finished, Remaining + PurgesToPush + Changes})
+        end,
+
+        Acc0#acc{purgeid = LocalPurgeId}
+    end).
+
+
+% Push purges recorded in the source db to the target shard.
+%
+% Folds purge infos from the source db, starting at the purge_seq stored in
+% the local checkpoint doc (id = purgeid), and asks the fold to stop once at
+% least batch_size revisions have been collected. Collected infos are sent
+% to the target with mem3_rpc:purge_docs/3 and the checkpoint doc is then
+% rewritten with the last purge seq seen. If the source db's purge seq is
+% still ahead of that, the pass is ended with throw({finished, Pending});
+% otherwise Acc0 is returned unchanged.
+% NOTE(review): this relies on with_db/2 returning the fun's result - confirm.
+push_purges(#acc{} = Acc0) ->
+    #acc{
+        batch_size = BatchSize,
+        purgeid = LocalPurgeId,
+        seq = UpdateSeq,
+        source = Source,
+        target = Target
+    } = Acc0,
+    #shard{
+        node = TgtNode,
+        name = TgtDbName
+    } = Target,
+
+    with_db(Source#shard.name, fun(Db) ->
+        % Resume from the purge_seq written by the previous push, if any.
+        % NOTE(review): the 0 fallback assumes couch_db:fold_purge_infos/4
+        % accepts a start seq below the oldest retained purge - confirm.
+        StartSeq = case couch_db:open_doc(Db, LocalPurgeId, []) of
+            {ok, #doc{body = {Props}}} ->
+                couch_util:get_value(<<"purge_seq">>, Props, 0);
+            {not_found, _} ->
+                0
+        end,
+
+        FoldFun = fun({PSeq, UUID, Id, Revs}, {Count, Infos, _}) ->
+            NewCount = Count + length(Revs),
+            NewInfos = [{UUID, Id, Revs} | Infos],
+            Status = if NewCount < BatchSize -> ok; true -> stop end,
+            {Status, {NewCount, NewInfos, PSeq}}
+        end,
+        InitAcc = {0, [], StartSeq},
+        {ok, {_, Infos, ThroughSeq}} =
+            couch_db:fold_purge_infos(Db, StartSeq, FoldFun, InitAcc),
+
+        % Skip the RPC and the checkpoint rewrite when nothing was folded,
+        % mirroring the empty-batch handling in pull_purges/1.
+        if Infos == [] -> ok; true ->
+            ok = mem3_rpc:purge_docs(TgtNode, TgtDbName, Infos),
+            Doc = #doc{id = LocalPurgeId, body = purge_cp_body(ThroughSeq)},
+            {ok, _} = couch_db:update_doc(Db, Doc, [])
+        end,
+
+        % NOTE(review): pull_purges/1 uses the bare return value of
+        % couch_db:get_purge_seq/1 while this matches {ok, Seq} - confirm
+        % which return shape is correct.
+        {ok, PurgeSeq} = couch_db:get_purge_seq(Db),
+        if ThroughSeq >= PurgeSeq -> ok; true ->
+            Remaining = PurgeSeq - ThroughSeq,
+            Changes = couch_db:count_changes_since(Db, UpdateSeq),
+            throw({finished, Remaining + Changes})
+        end,
+
+        Acc0
+    end).
+
+
+% Push document changes from the source shard to the target.
+%
+% Ends the pass with throw({finished, 0}) when the accumulator's seq has
+% already reached the update seq of the db handle carried in Acc0. Otherwise
+% the source db is opened, changes since seq are folded through
+% changes_enumerator/2, the resulting accumulator is handed to
+% replicate_batch/1, and {ok, Pending} is returned, where Pending is
+% couch_db:count_changes_since/2 for the last seq that batch reported.
+% NOTE(review): this relies on with_db/2 returning the fun's result - confirm.
+push_changes(#acc{} = Acc0) ->
+    #acc{
+        source = Source,
+        db = Db0,
+        seq = Seq
+    } = Acc0,
+
+    % Avoid needlessly rewriting the internal replication
+    % checkpoint document if nothing is replicated.
+    % The update seq is bound first because remote calls are not
+    % allowed inside an `if` guard.
+    UpdateSeq = couch_db:get_update_seq(Db0),
+    if Seq < UpdateSeq -> ok; true ->
+        throw({finished, 0})
+    end,
+
+    with_db(Source#shard.name, fun(Db) ->
+        Acc1 = Acc0#acc{db = Db},
+        Fun = fun ?MODULE:changes_enumerator/2,
+        {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
+        {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
+        {ok, couch_db:count_changes_since(Db, LastSeq)}
+    end).
+
+
 calculate_start_seq(Acc) ->
     #acc{
         db = Db,
@@ -336,6 +436,19 @@ update_locals(Acc) ->
     {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
 
 
+% Build the EJSON body of an internal-replication purge checkpoint doc.
+%
+% PurgeSeq is stored under <<"purge_seq">>. <<"updated_on">> is the current
+% os:timestamp/0 reduced to whole seconds, and <<"node">> is the local node
+% name as a binary. atom_to_binary/2 is used because the one-argument form
+% is only available from OTP 23 onwards.
+purge_cp_body(PurgeSeq) ->
+    {Mega, Secs, _} = os:timestamp(),
+    NowSecs = Mega * 1000000 + Secs,
+    {[
+        {<<"type">>, <<"internal_replication">>},
+        {<<"updated_on">>, NowSecs},
+        {<<"purge_seq">>, PurgeSeq},
+        {<<"verify_module">>, <<"mem3_rep">>},
+        {<<"verify_function">>, <<"verify_purge_checkpoint">>},
+        {<<"node">>, atom_to_binary(node(), utf8)}
+    ]}.
+
+
 find_repl_doc(SrcDb, TgtUUIDPrefix) ->
     SrcUUID = couch_db:get_uuid(SrcDb),
     S = couch_util:encodeBase64Url(crypto:hash(md5, term_to_binary(SrcUUID))),

-- 
To stop receiving notification emails like this one, please contact
davisp@apache.org.