You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2018/03/21 18:26:19 UTC
[couchdb] 01/07: WIP - Add internal replication of purges
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 1b2eeeb93bddf8c889ec362ce47cf43ee0c7669b
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Mar 21 12:26:11 2018 -0500
WIP - Add internal replication of purges
---
src/mem3/src/mem3_rep.erl | 193 +++++++++++++++++++++++++++++++++++++++++++---
src/mem3/src/mem3_rpc.erl | 72 ++++++++++++++++-
2 files changed, 252 insertions(+), 13 deletions(-)
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 6d38af7..338d20e 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -17,6 +17,8 @@
go/2,
go/3,
make_local_id/2,
+ make_purge_id/2,
+ verify_purge_checkpoint/3,
find_source_seq/4
]).
@@ -35,6 +37,7 @@
infos = [],
seq = 0,
localid,
+ purgeid,
source,
target,
filter,
@@ -118,6 +121,34 @@ make_local_id(SourceThing, TargetThing, Filter) ->
<<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
+%% @doc Build the `_local' doc id used to checkpoint purge replication
+%% between a source/target shard pair, keyed by the two db UUIDs.
+make_purge_id(SourceUUID, TargetUUID) ->
+    <<"_local/purge-mem3-", SourceUUID/binary, "-", TargetUUID/binary>>.
+
+
+%% @doc Callback referenced from purge checkpoint docs (see
+%% purge_cp_body/2). Returns true while both the recorded source and
+%% target nodes still host a shard copy for the recorded range, i.e.
+%% while the checkpoint is still needed.
+verify_purge_checkpoint(_Db, _DocId, Props) ->
+    DbName = couch_util:get_value(<<"dbname">>, Props),
+    SourceBin = couch_util:get_value(<<"source">>, Props),
+    TargetBin = couch_util:get_value(<<"target">>, Props),
+    Range = couch_util:get_value(<<"range">>, Props),
+
+    % Node names were written with atom_to_binary/2 on this cluster,
+    % so the atoms already exist; *_to_existing_atom avoids growing
+    % the atom table from doc contents.
+    Source = binary_to_existing_atom(SourceBin, latin1),
+    Target = binary_to_existing_atom(TargetBin, latin1),
+
+    try
+        % Bind the shard list once and reuse it in the fold (the WIP
+        % version bound Shards and then called mem3:shards/1 again).
+        Shards = mem3:shards(DbName),
+        Nodes = lists:foldl(fun(Shard, Acc) ->
+            case Shard#shard.range == Range of
+                true -> [Shard#shard.node | Acc];
+                false -> Acc
+            end
+        end, [], Shards),
+        lists:member(Source, Nodes) andalso lists:member(Target, Nodes)
+    catch
+        error:database_does_not_exist ->
+            false
+    end.
+
+
%% @doc Find and return the largest update_seq in SourceDb
%% that the client has seen from TargetNode.
%%
@@ -169,20 +200,122 @@ find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) ->
end.
-repl(#acc{db = Db} = Acc0) ->
- erlang:put(io_priority, {internal_repl, couch_db:name(Db)}),
- #acc{seq=Seq} = Acc1 = calculate_start_seq(Acc0),
- case Seq >= couch_db:get_update_seq(Db) of
- true ->
- {ok, 0};
- false ->
- Fun = fun ?MODULE:changes_enumerator/2,
- {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
- {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
- {ok, couch_db:count_changes_since(Db, LastSeq)}
+%% Main internal replication driver: optionally pull then push purges,
+%% then push document changes. Steps short-circuit with
+%% throw({finished, Count}) where Count estimates remaining work.
+%%
+%% NOTE: `repl(Acc0#acc{db = Db0})' in the WIP patch is a record
+%% *update* expression, which is illegal in a function head; the
+%% pattern form `#acc{db = Db0} = Acc0' is used instead. The unused
+%% `Seq' binding was also dropped (push_changes/1 reads #acc.seq).
+repl(#acc{db = Db0} = Acc0) ->
+    erlang:put(io_priority, {internal_repl, couch_db:name(Db0)}),
+    Acc1 = calculate_start_seq(Acc0),
+    try
+        Acc3 = case config:get_boolean("mem3", "replicate_purges", false) of
+            true ->
+                Acc2 = pull_purges(Acc1),
+                push_purges(Acc2);
+            false ->
+                Acc1
+        end,
+        push_changes(Acc3)
+    catch
+        throw:{finished, Count} ->
+            {ok, Count}
+    end.
+%% Pull purge requests the target has that the source has not yet
+%% applied, apply them locally, and checkpoint progress on the target.
+%% Throws {finished, Count} when a full batch remains outstanding so
+%% the job is rescheduled. Returns Acc with #acc.purgeid set for
+%% push_purges/1.
+pull_purges(#acc{} = Acc0) ->
+    #acc{
+        batch_size = Count,
+        seq = UpdateSeq,
+        target = Target
+    } = Acc0,
+    #shard{
+        node = TgtNode,
+        name = TgtDbName
+    } = Target,
+
+    % The fun's result (the updated #acc{}) is returned directly; the
+    % WIP `{Acc2, RemToPull} = ...' match could never succeed. The RPC
+    % call also used unbound TNode/DbName instead of TgtNode/TgtDbName.
+    with_src_db(Acc0, fun(Db) ->
+        SrcUUID = couch_db:get_uuid(Db),
+        {ok, LocalPurgeId, Infos, ThroughSeq, Remaining} =
+            mem3_rpc:load_purge_infos(TgtNode, TgtDbName, SrcUUID, Count),
+
+        if Infos == [] -> ok; true ->
+            {ok, _} = couch_db:purge_docs(Db, Infos, [replicated_edits]),
+            Body = purge_cp_body(Acc0, ThroughSeq),
+            ok = mem3_rpc:save_purge_checkpoint(
+                TgtNode, TgtDbName, LocalPurgeId, Body)
+        end,
+
+        if Remaining =< 0 -> ok; true ->
+            % Bail out with an estimate of outstanding work so the
+            % caller reschedules instead of looping here.
+            % NOTE(review): these couch_db calls are used as bare
+            % values while load_purge_infos_rpc matches {ok, _} on the
+            % same API -- confirm the couch_db return shapes.
+            OldestPurgeSeq = couch_db:get_oldest_purge_seq(Db),
+            PurgesToPush = couch_db:get_purge_seq(Db) - OldestPurgeSeq,
+            Changes = couch_db:count_changes_since(Db, UpdateSeq),
+            throw({finished, Remaining + PurgesToPush + Changes})
+        end,
+
+        % Remember the checkpoint doc id for push_purges/1.
+        Acc0#acc{purgeid = LocalPurgeId}
+    end).
+
+
+%% Push local purge requests that the target has not seen yet, then
+%% checkpoint the through-seq in the local doc written under
+%% #acc.purgeid. Throws {finished, Count} if a batch limit left work
+%% outstanding.
+push_purges(#acc{} = Acc0) ->
+    #acc{
+        batch_size = BatchSize,
+        purgeid = LocalPurgeId,
+        seq = UpdateSeq,
+        target = Target
+    } = Acc0,
+    #shard{
+        node = TgtNode,
+        name = TgtDbName
+    } = Target,
+
+    with_src_db(Acc0, fun(Db) ->
+        % StartSeq was unbound in the WIP patch; resume from the purge
+        % checkpoint doc, falling back to the oldest available purge
+        % seq on first run (mirrors load_purge_infos_rpc).
+        StartSeq = case couch_db:open_doc(Db, LocalPurgeId, []) of
+            {ok, #doc{body = {Props}}} ->
+                couch_util:get_value(<<"purge_seq">>, Props);
+            {not_found, _} ->
+                {ok, Oldest} = couch_db:get_oldest_purge_seq(Db),
+                Oldest
+        end,
+        FoldFun = fun({PSeq, UUID, Id, Revs}, {Count, Infos, _}) ->
+            NewCount = Count + length(Revs),
+            NewInfos = [{UUID, Id, Revs} | Infos],
+            Status = if NewCount < BatchSize -> ok; true -> stop end,
+            {Status, {NewCount, NewInfos, PSeq}}
+        end,
+        InitAcc = {0, [], StartSeq},
+        {ok, {_, Infos, ThroughSeq}} =
+            couch_db:fold_purge_infos(Db, StartSeq, FoldFun, InitAcc),
+
+        if Infos == [] -> ok; true ->
+            ok = purge_on_target(TgtNode, TgtDbName, Infos),
+            Doc = #doc{
+                id = LocalPurgeId,
+                body = purge_cp_body(Acc0, ThroughSeq)
+            },
+            {ok, _} = couch_db:update_doc(Db, Doc, [])
+        end,
+
+        % NOTE(review): pull_purges/1 uses couch_db:get_purge_seq/1 as
+        % a bare value; confirm which return shape is current.
+        {ok, PurgeSeq} = couch_db:get_purge_seq(Db),
+        if ThroughSeq >= PurgeSeq -> ok; true ->
+            Remaining = PurgeSeq - ThroughSeq,
+            Changes = couch_db:count_changes_since(Db, UpdateSeq),
+            throw({finished, Remaining + Changes})
+        end,
+
+        Acc0
+    end).
+
+
+%% Push document updates since #acc.seq to the target shard. Returns
+%% {ok, PendingChangeCount}; throws {finished, 0} when already caught
+%% up so the checkpoint doc is not rewritten needlessly.
+push_changes(#acc{} = Acc0) ->
+    #acc{
+        db = Db0,
+        seq = Seq
+    } = Acc0,
+
+    % Avoid needlessly rewriting the internal replication checkpoint
+    % document if nothing is replicated. A `case' is required here:
+    % the WIP used `if Seq < couch_db:get_update_seq(Db0)', but remote
+    % calls are not legal in guard expressions.
+    case Seq < couch_db:get_update_seq(Db0) of
+        true -> ok;
+        false -> throw({finished, 0})
+    end,
+
+    with_src_db(Acc0, fun(Db) ->
+        % Re-bind #acc.db to the freshly opened handle (the WIP
+        % referenced an unbound variable `Acc' here).
+        Acc1 = Acc0#acc{db = Db},
+        Fun = fun ?MODULE:changes_enumerator/2,
+        {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
+        {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
+        {ok, couch_db:count_changes_since(Db, LastSeq)}
+    end).
+
+
calculate_start_seq(Acc) ->
#acc{
db = Db,
@@ -323,6 +456,15 @@ save_on_target(Node, Name, Docs) ->
ok.
+%% Apply a batch of purge infos on the target shard via RPC; crashes
+%% with badmatch unless the remote call returns {ok, _}.
+purge_on_target(Node, Name, PurgeInfos) ->
+    {ok, _} = mem3_rpc:purge_docs(Node, Name, PurgeInfos, [
+        replicated_changes,
+        full_commit,
+        ?ADMIN_CTX,
+        {io_priority, {internal_repl, Name}}
+    ]),
+    ok.
+
update_locals(Acc) ->
#acc{seq=Seq, db=Db, target=Target, localid=Id, history=History} = Acc,
#shard{name=Name, node=Node} = Target,
@@ -336,6 +478,26 @@ update_locals(Acc) ->
{ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
+%% Build the EJSON body of a purge checkpoint `_local' doc. The
+%% verify_module/verify_function fields point cleanup logic at
+%% mem3_rep:verify_purge_checkpoint/3 to decide if the doc is stale.
+purge_cp_body(#acc{} = Acc, PurgeSeq) ->
+    #acc{
+        source = Source,
+        target = Target
+    } = Acc,
+    {Mega, Secs, _} = os:timestamp(),
+    NowSecs = Mega * 1000000 + Secs,
+    % The WIP bound the tuple to an unused `Body' variable; return it
+    % directly. atom_to_binary/2 is used because the /1 form does not
+    % exist on the OTP releases this targets, and latin1 matches the
+    % binary_to_existing_atom/2 call in verify_purge_checkpoint/3.
+    {[
+        {<<"type">>, <<"internal_replication">>},
+        {<<"updated_on">>, NowSecs},
+        {<<"purge_seq">>, PurgeSeq},
+        {<<"verify_module">>, <<"mem3_rep">>},
+        {<<"verify_function">>, <<"verify_purge_checkpoint">>},
+        {<<"dbname">>, Source#shard.dbname},
+        {<<"source">>, atom_to_binary(Source#shard.node, latin1)},
+        {<<"target">>, atom_to_binary(Target#shard.node, latin1)},
+        {<<"range">>, Source#shard.range}
+    ]}.
+
+
find_repl_doc(SrcDb, TgtUUIDPrefix) ->
SrcUUID = couch_db:get_uuid(SrcDb),
S = couch_util:encodeBase64Url(crypto:hash(md5, term_to_binary(SrcUUID))),
@@ -368,6 +530,15 @@ find_repl_doc(SrcDb, TgtUUIDPrefix) ->
end.
+%% Open the source shard as admin, run Fun(Db), and always close the
+%% handle, even if Fun throws.
+with_src_db(#acc{source = Source}, Fun) ->
+    #shard{name = SrcName} = Source,
+    {ok, SrcDb} = couch_db:open(SrcName, [?ADMIN_CTX]),
+    try
+        Fun(SrcDb)
+    after
+        couch_db:close(SrcDb)
+    end.
+
+
is_prefix(Prefix, Subject) ->
binary:longest_common_prefix([Prefix, Subject]) == size(Prefix).
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl
index c2bd58f..ab293d7 100644
--- a/src/mem3/src/mem3_rpc.erl
+++ b/src/mem3/src/mem3_rpc.erl
@@ -20,14 +20,21 @@
get_missing_revs/4,
update_docs/4,
load_checkpoint/4,
- save_checkpoint/6
+ save_checkpoint/6,
+
+ load_purge_infos/4,
+ save_purge_checkpoint/4,
+ purge_docs/4
]).
% Private RPC callbacks
-export([
find_common_seq_rpc/3,
load_checkpoint_rpc/3,
- save_checkpoint_rpc/5
+ save_checkpoint_rpc/5,
+
+ load_purge_infos_rpc/3,
+ save_purge_checkpoint_rpc/3
]).
@@ -58,6 +65,20 @@ find_common_seq(Node, DbName, SourceUUID, SourceEpochs) ->
rexi_call(Node, {mem3_rpc, find_common_seq_rpc, Args}).
+%% Ask Node for the purge infos (plus checkpoint doc id, through-seq
+%% and remaining count) that the shard identified by SourceUUID has
+%% not yet applied, limited to roughly Count revisions.
+load_purge_infos(Node, DbName, SourceUUID, Count) ->
+    Args = [DbName, SourceUUID, Count],
+    rexi_call(Node, {mem3_rpc, load_purge_infos_rpc, Args}).
+
+
+%% Persist the purge replication checkpoint doc on the remote node.
+save_purge_checkpoint(Node, DbName, PurgeDocId, Body) ->
+    Args = [DbName, PurgeDocId, Body],
+    % Dispatch to the exported RPC callback save_purge_checkpoint_rpc/3;
+    % the WIP named the public function itself, which is not an RPC
+    % endpoint and would recurse remotely.
+    rexi_call(Node, {mem3_rpc, save_purge_checkpoint_rpc, Args}).
+
+
+%% Forward a batch of purge infos to the remote shard via fabric_rpc.
+purge_docs(Node, DbName, PurgeInfos, Options) ->
+    rexi_call(Node, {fabric_rpc, purge_docs, [DbName, PurgeInfos, Options]}).
+
+
load_checkpoint_rpc(DbName, SourceNode, SourceUUID) ->
erlang:put(io_priority, {internal_repl, DbName}),
case get_or_create_db(DbName, [?ADMIN_CTX]) of
@@ -128,6 +149,53 @@ find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) ->
end.
+%% RPC endpoint: compute the purge infos the calling shard (SrcUUID)
+%% still needs, resuming from its checkpoint doc when one exists.
+%% Replies {ok, PurgeDocId, PurgeInfos, ThroughSeq, Remaining}.
+load_purge_infos_rpc(DbName, SrcUUID, BatchSize) ->
+    erlang:put(io_priority, {internal_repl, DbName}),
+    case get_or_create_db(DbName, [?ADMIN_CTX]) of
+        {ok, Db} ->
+            TgtUUID = couch_db:get_uuid(Db),
+            PurgeDocId = mem3_rep:make_purge_id(SrcUUID, TgtUUID),
+            StartSeq = case couch_db:open_doc(Db, PurgeDocId, []) of
+                {ok, #doc{body = {Props}}} ->
+                    % The #doc record stores the EJSON body in its
+                    % `body' field; the WIP matched a nonexistent
+                    % `props' field.
+                    couch_util:get_value(<<"purge_seq">>, Props);
+                {not_found, _} ->
+                    {ok, Oldest} = couch_db:get_oldest_purge_seq(Db),
+                    Oldest
+            end,
+            FoldFun = fun({PSeq, UUID, Id, Revs}, {Count, Infos, _}) ->
+                NewCount = Count + length(Revs),
+                NewInfos = [{UUID, Id, Revs} | Infos],
+                % Stop once the batch holds at least BatchSize revs.
+                Status = if NewCount < BatchSize -> ok; true -> stop end,
+                {Status, {NewCount, NewInfos, PSeq}}
+            end,
+            InitAcc = {0, [], StartSeq},
+            {ok, {_, PurgeInfos, ThroughSeq}} =
+                couch_db:fold_purge_infos(Db, StartSeq, FoldFun, InitAcc),
+            {ok, PurgeSeq} = couch_db:get_purge_seq(Db),
+            Remaining = PurgeSeq - ThroughSeq,
+            rexi:reply({ok, PurgeDocId, PurgeInfos, ThroughSeq, Remaining});
+        Else ->
+            rexi:reply(Else)
+    end.
+
+
+%% RPC endpoint: write/overwrite the purge checkpoint `_local' doc.
+%% Replies `ok' on success, {error, Else} on an unexpected update_doc
+%% return, or {Class, Reason} if couch_db:update_doc/3 raised.
+save_purge_checkpoint_rpc(DbName, PurgeDocId, Body) ->
+    erlang:put(io_priority, {internal_repl, DbName}),
+    case get_or_create_db(DbName, [?ADMIN_CTX]) of
+        {ok, Db} ->
+            Doc = #doc{id = PurgeDocId, body = Body},
+            Resp = try couch_db:update_doc(Db, Doc, []) of
+                {ok, _} -> ok;
+                Else -> {error, Else}
+            % Convert any raised exception into a replyable term so the
+            % caller sees {Class, Reason} instead of a rexi EXIT.
+            catch T:R ->
+                {T, R}
+            end,
+            rexi:reply(Resp);
+        Error ->
+            rexi:reply(Error)
+    end.
+
+
%% @doc Return the sequence where two files with the same UUID diverged.
compare_epochs(SourceEpochs, TargetEpochs) ->
compare_rev_epochs(
--
To stop receiving notification emails like this one, please contact
davisp@apache.org.