You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/01 11:11:18 UTC
[32/48] mem3 commit: updated refs/heads/windsor-merge to ff02b9a
Update whitespace and exports formatting
Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/27e315be
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/27e315be
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/27e315be
Branch: refs/heads/windsor-merge
Commit: 27e315be18d1dc0584a08a5baac06370cfeca912
Parents: 88a6491
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Dec 6 11:36:47 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:27 2014 +0100
----------------------------------------------------------------------
src/mem3_rep.erl | 32 +++++++++++++++++++++++++++++++-
1 file changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/27e315be/src/mem3_rep.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rep.erl b/src/mem3_rep.erl
index 1a528e7..2460f64 100644
--- a/src/mem3_rep.erl
+++ b/src/mem3_rep.erl
@@ -12,13 +12,25 @@
-module(mem3_rep).
--export([go/2, go/3, changes_enumerator/3, make_local_id/2]).
+
+-export([
+ go/2,
+ go/3,
+ make_local_id/2
+]).
+
+-export([
+ changes_enumerator/3
+]).
+
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
+
-define(CTX, #user_ctx{roles = [<<"_admin">>]}).
+
-record(acc, {
batch_size,
batch_count,
@@ -32,12 +44,15 @@
db
}).
+
go(Source, Target) ->
go(Source, Target, []).
+
go(DbName, Node, Opts) when is_binary(DbName), is_atom(Node) ->
go(#shard{name=DbName, node=node()}, #shard{name=DbName, node=Node}, Opts);
+
go(#shard{} = Source, #shard{} = Target, Opts) ->
mem3_sync_security:maybe_sync(Source, Target),
BatchSize = case proplists:get_value(batch_size, Opts) of
@@ -61,6 +76,7 @@ go(#shard{} = Source, #shard{} = Target, Opts) ->
},
go(Acc).
+
go(#acc{source=Source, batch_count=BC}=Acc0) ->
case couch_db:open(Source#shard.name, [{user_ctx,?CTX}]) of
{ok, Db} ->
@@ -84,6 +100,7 @@ go(#acc{source=Source, batch_count=BC}=Acc0) ->
{error, missing_source}
end.
+
repl(#db{name=DbName, seq_tree=Bt}=Db, #acc{localid=LocalId}=Acc0) ->
erlang:put(io_priority, {internal_repl, DbName}),
Seq = calculate_start_seq(Db, Acc0#acc.target, LocalId),
@@ -93,9 +110,11 @@ repl(#db{name=DbName, seq_tree=Bt}=Db, #acc{localid=LocalId}=Acc0) ->
{ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
{ok, couch_db:count_changes_since(Db, LastSeq)}.
+
make_local_id(#shard{}=Source, #shard{}=Target) ->
make_local_id(Source, Target, undefined).
+
make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}, Filter) ->
S = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(SourceNode))),
T = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(TargetNode))),
@@ -109,6 +128,7 @@ make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}, Filter) ->
end,
<<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
+
changes_enumerator(#doc_info{id=DocId}, Reds, #acc{db=Db}=Acc) ->
{ok, FDI} = couch_db:get_full_doc_info(Db, DocId),
changes_enumerator(FDI, Reds, Acc);
@@ -130,6 +150,7 @@ changes_enumerator(#full_doc_info{}=FDI, _,
Go = if Count < Acc1#acc.batch_size -> ok; true -> stop end,
{Go, Acc1}.
+
filter_doc(Filter, FullDocInfo) when is_function(Filter) ->
try Filter(FullDocInfo) of
discard -> discard;
@@ -140,6 +161,7 @@ filter_doc(Filter, FullDocInfo) when is_function(Filter) ->
filter_doc(_, _) ->
keep.
+
replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
case find_missing_revs(Acc) of
[] ->
@@ -150,6 +172,7 @@ replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
update_locals(Acc),
{ok, Acc#acc{revcount=0, infos=[]}}.
+
find_missing_revs(Acc) ->
#acc{target = #shard{node=Node, name=Name}, infos = Infos} = Acc,
IdsRevs = lists:map(fun(FDI) ->
@@ -159,18 +182,21 @@ find_missing_revs(Acc) ->
Options = [{io_priority, {internal_repl, Name}}, {user_ctx, ?CTX}],
rexi_call(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs, Options]}).
+
open_docs(#acc{source=Source, infos=Infos}, Missing) ->
lists:flatmap(fun({Id, Revs, _}) ->
FDI = lists:keyfind(Id, #full_doc_info.id, Infos),
open_doc_revs(Source, FDI, Revs)
end, Missing).
+
save_on_target(Node, Name, Docs) ->
Options = [replicated_changes, full_commit, {user_ctx, ?CTX},
{io_priority, {internal_repl, Name}}],
rexi_call(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
ok.
+
update_locals(Acc) ->
#acc{seq=Seq, source=Db, target=Target, localid=Id} = Acc,
#shard{name=Name, node=Node} = Target,
@@ -183,6 +209,7 @@ update_locals(Acc) ->
Options = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
rexi_call(Node, {fabric_rpc, update_docs, [Name, [Doc], Options]}).
+
rexi_call(Node, MFA) ->
Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
Ref = rexi:cast(Node, self(), MFA, [sync]),
@@ -200,6 +227,7 @@ rexi_call(Node, MFA) ->
rexi_monitor:stop(Mon)
end.
+
calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
case couch_db:open_doc(Db, LocalId, [ejson_body]) of
{ok, #doc{body = {SProps}}} ->
@@ -216,12 +244,14 @@ calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
0
end.
+
open_doc_revs(Db, #full_doc_info{id=Id, rev_tree=RevTree}, Revs) ->
{FoundRevs, _} = couch_key_tree:get_key_leafs(RevTree, Revs),
lists:map(fun({#leaf{deleted=IsDel, ptr=SummaryPtr}, FoundRevPath}) ->
couch_db:make_doc(Db, Id, IsDel, SummaryPtr, FoundRevPath)
end, FoundRevs).
+
iso8601_timestamp() ->
{_,_,Micro} = Now = os:timestamp(),
{{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now),