You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2018/03/26 20:55:51 UTC
[couchdb] 20/33: WIP - internal replication
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit e590bdf407eb2952af7b357615e93910e676781b
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Mon Mar 26 13:07:57 2018 -0500
WIP - internal replication
---
src/mem3/src/mem3_rep.erl | 35 ++++++++++++++++++++++-------------
src/mem3/src/mem3_rpc.erl | 4 ++--
2 files changed, 24 insertions(+), 15 deletions(-)
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 338d20e..edac76d 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -76,7 +76,7 @@ go(#shard{} = Source, #shard{} = Target, Opts) ->
go(Acc).
-go(#acc{source=Source, batch_count=BC}=Acc0) ->
+go(#acc{source=Source, batch_count=BC}=Acc) ->
case couch_db:open(Source#shard.name, [?ADMIN_CTX]) of
{ok, Db} ->
Resp = try
@@ -200,9 +200,9 @@ find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) ->
end.
-repl(Acc0#acc{db = Db0}) ->
+repl(#acc{db = Db0} = Acc0) ->
erlang:put(io_priority, {internal_repl, couch_db:name(Db0)}),
- #acc{seq = Seq} = Acc1 = calculate_start_seq(Acc0),
+ Acc1 = calculate_start_seq(Acc0),
try
Acc3 = case config:get_boolean("mem3", "replicate_purges", false) of
true ->
@@ -229,10 +229,10 @@ pull_purges(#acc{} = Acc0) ->
name = TgtDbName
} = Target,
- {Acc2, RemToPull} = with_src_db(Acc0, fun(Db) ->
+ with_src_db(Acc0, fun(Db) ->
SrcUUID = couch_db:get_uuid(Db),
{ok, LocalPurgeId, Infos, ThroughSeq, Remaining} =
- mem3_rpc:load_purge_infos(TNode, DbName, SrcUUID, Count),
+ mem3_rpc:load_purge_infos(TgtNode, TgtDbName, SrcUUID, Count),
if Infos == [] -> ok; true ->
{ok, _} = couch_db:purge_docs(Db, Infos, [replicated_edits]),
@@ -246,7 +246,7 @@ pull_purges(#acc{} = Acc0) ->
PurgesToPush = couch_db:get_purge_seq(Db) - OldestPurgeSeq,
Changes = couch_db:count_changes_since(Db, UpdateSeq),
throw({finished, Remaining + PurgesToPush + Changes})
- end
+ end,
Acc0#acc{purgeid = LocalPurgeId}
end).
@@ -265,13 +265,21 @@ push_purges(#acc{} = Acc0) ->
} = Target,
with_src_db(Acc0, fun(Db) ->
+ StartSeq = case couch_db:open_doc(Db, LocalPurgeId, []) of
+ {ok, #doc{body = {Props}}} ->
+ couch_util:get_value(<<"purge_seq">>, Props);
+ {not_found, _} ->
+ {ok, Oldest} = couch_db:get_oldest_purge_seq(Db),
+ Oldest
+ end,
+
FoldFun = fun({PSeq, UUID, Id, Revs}, {Count, Infos, _}) ->
NewCount = Count + length(Revs),
NewInfos = [{UUID, Id, Revs} | Infos],
Status = if NewCount < BatchSize -> ok; true -> stop end,
{Status, {NewCount, NewInfos, PSeq}}
end,
- InitAcc = {0, [], StartSeq}
+ InitAcc = {0, [], StartSeq},
{ok, {_, Infos, ThroughSeq}} =
couch_db:fold_purge_infos(Db, StartSeq, FoldFun, InitAcc),
@@ -303,12 +311,13 @@ push_changes(#acc{} = Acc0) ->
% Avoid needlessly rewriting the internal replication
% checkpoint document if nothing is replicated.
- if Seq < couch_db:get_update_seq(Db0) -> ok; true ->
+ UpdateSeq = couch_db:get_update_seq(Db0),
+ if Seq < UpdateSeq -> ok; true ->
throw({finished, 0})
- end
+ end,
with_src_db(Acc0, fun(Db) ->
- Acc1 = Acc#acc{db = Db},
+ Acc1 = Acc0#acc{db = Db},
Fun = fun ?MODULE:changes_enumerator/2,
{ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
{ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
@@ -485,15 +494,15 @@ purge_cp_body(#acc{} = Acc, PurgeSeq) ->
} = Acc,
{Mega, Secs, _} = os:timestamp(),
NowSecs = Mega * 1000000 + Secs,
- Body = {[
+ {[
{<<"type">>, <<"internal_replication">>},
{<<"updated_on">>, NowSecs},
{<<"purge_seq">>, PurgeSeq},
{<<"verify_module">>, <<"mem3_rep">>},
{<<"verify_function">>, <<"verify_purge_checkpoint">>},
{<<"dbname">>, Source#shard.dbname},
- {<<"source">>, atom_to_binary(Source#shard.node)},
- {<<"target">>, atom_to_binary(Target#shard.node)},
+ {<<"source">>, atom_to_binary(Source#shard.node, latin1)},
+ {<<"target">>, atom_to_binary(Target#shard.node, latin1)},
{<<"range">>, Source#shard.range}
]}.
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl
index ab293d7..b0b0098 100644
--- a/src/mem3/src/mem3_rpc.erl
+++ b/src/mem3/src/mem3_rpc.erl
@@ -156,7 +156,7 @@ load_purge_infos_rpc(DbName, SrcUUID, BatchSize) ->
TgtUUID = couch_db:get_uuid(Db),
PurgeDocId = mem3_rep:make_purge_id(SrcUUID, TgtUUID),
StartSeq = case couch_db:open_doc(Db, PurgeDocId, []) of
- {ok, #doc{props = {Props}}} ->
+ {ok, #doc{body = {Props}}} ->
couch_util:get_value(<<"purge_seq">>, Props);
{not_found, _} ->
{ok, Oldest} = couch_db:get_oldest_purge_seq(Db),
@@ -173,7 +173,7 @@ load_purge_infos_rpc(DbName, SrcUUID, BatchSize) ->
couch_db:fold_purge_infos(Db, StartSeq, FoldFun, InitAcc),
{ok, PurgeSeq} = couch_db:get_purge_seq(Db),
Remaining = PurgeSeq - ThroughSeq,
- rexi:reply({ok, PurgeDocId, PurgeInfos, ThroughSeq, Remainin})
+ rexi:reply({ok, PurgeDocId, PurgeInfos, ThroughSeq, Remaining});
Else ->
rexi:reply(Else)
end.
--
To stop receiving notification emails like this one, please contact
davisp@apache.org.