You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/01 11:11:18 UTC

[32/48] mem3 commit: updated refs/heads/windsor-merge to ff02b9a

Update whitespace and exports formatting


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/27e315be
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/27e315be
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/27e315be

Branch: refs/heads/windsor-merge
Commit: 27e315be18d1dc0584a08a5baac06370cfeca912
Parents: 88a6491
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Dec 6 11:36:47 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:27 2014 +0100

----------------------------------------------------------------------
 src/mem3_rep.erl | 32 +++++++++++++++++++++++++++++++-
 1 file changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/27e315be/src/mem3_rep.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rep.erl b/src/mem3_rep.erl
index 1a528e7..2460f64 100644
--- a/src/mem3_rep.erl
+++ b/src/mem3_rep.erl
@@ -12,13 +12,25 @@
 
 -module(mem3_rep).
 
--export([go/2, go/3, changes_enumerator/3, make_local_id/2]).
+
+-export([
+    go/2,
+    go/3,
+    make_local_id/2
+]).
+
+-export([
+    changes_enumerator/3
+]).
+
 
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
+
 -define(CTX, #user_ctx{roles = [<<"_admin">>]}).
 
+
 -record(acc, {
     batch_size,
     batch_count,
@@ -32,12 +44,15 @@
     db
 }).
 
+
 go(Source, Target) ->
     go(Source, Target, []).
 
+
 go(DbName, Node, Opts) when is_binary(DbName), is_atom(Node) ->
     go(#shard{name=DbName, node=node()}, #shard{name=DbName, node=Node}, Opts);
 
+
 go(#shard{} = Source, #shard{} = Target, Opts) ->
     mem3_sync_security:maybe_sync(Source, Target),
     BatchSize = case proplists:get_value(batch_size, Opts) of
@@ -61,6 +76,7 @@ go(#shard{} = Source, #shard{} = Target, Opts) ->
     },
     go(Acc).
 
+
 go(#acc{source=Source, batch_count=BC}=Acc0) ->
     case couch_db:open(Source#shard.name, [{user_ctx,?CTX}]) of
     {ok, Db} ->
@@ -84,6 +100,7 @@ go(#acc{source=Source, batch_count=BC}=Acc0) ->
         {error, missing_source}
     end.
 
+
 repl(#db{name=DbName, seq_tree=Bt}=Db, #acc{localid=LocalId}=Acc0) ->
     erlang:put(io_priority, {internal_repl, DbName}),
     Seq = calculate_start_seq(Db, Acc0#acc.target, LocalId),
@@ -93,9 +110,11 @@ repl(#db{name=DbName, seq_tree=Bt}=Db, #acc{localid=LocalId}=Acc0) ->
     {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
     {ok, couch_db:count_changes_since(Db, LastSeq)}.
 
+
 make_local_id(#shard{}=Source, #shard{}=Target) ->
     make_local_id(Source, Target, undefined).
 
+
 make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}, Filter) ->
     S = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(SourceNode))),
     T = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(TargetNode))),
@@ -109,6 +128,7 @@ make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}, Filter) ->
     end,
     <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
 
+
 changes_enumerator(#doc_info{id=DocId}, Reds, #acc{db=Db}=Acc) ->
     {ok, FDI} = couch_db:get_full_doc_info(Db, DocId),
     changes_enumerator(FDI, Reds, Acc);
@@ -130,6 +150,7 @@ changes_enumerator(#full_doc_info{}=FDI, _,
     Go = if Count < Acc1#acc.batch_size -> ok; true -> stop end,
     {Go, Acc1}.
 
+
 filter_doc(Filter, FullDocInfo) when is_function(Filter) ->
     try Filter(FullDocInfo) of
         discard -> discard;
@@ -140,6 +161,7 @@ filter_doc(Filter, FullDocInfo) when is_function(Filter) ->
 filter_doc(_, _) ->
     keep.
 
+
 replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
     case find_missing_revs(Acc) of
     [] ->
@@ -150,6 +172,7 @@ replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
     update_locals(Acc),
     {ok, Acc#acc{revcount=0, infos=[]}}.
 
+
 find_missing_revs(Acc) ->
     #acc{target = #shard{node=Node, name=Name}, infos = Infos} = Acc,
     IdsRevs = lists:map(fun(FDI) ->
@@ -159,18 +182,21 @@ find_missing_revs(Acc) ->
     Options = [{io_priority, {internal_repl, Name}}, {user_ctx, ?CTX}],
     rexi_call(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs, Options]}).
 
+
 open_docs(#acc{source=Source, infos=Infos}, Missing) ->
     lists:flatmap(fun({Id, Revs, _}) ->
         FDI = lists:keyfind(Id, #full_doc_info.id, Infos),
         open_doc_revs(Source, FDI, Revs)
     end, Missing).
 
+
 save_on_target(Node, Name, Docs) ->
     Options = [replicated_changes, full_commit, {user_ctx, ?CTX},
         {io_priority, {internal_repl, Name}}],
     rexi_call(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
     ok.
 
+
 update_locals(Acc) ->
     #acc{seq=Seq, source=Db, target=Target, localid=Id} = Acc,
     #shard{name=Name, node=Node} = Target,
@@ -183,6 +209,7 @@ update_locals(Acc) ->
     Options = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
     rexi_call(Node, {fabric_rpc, update_docs, [Name, [Doc], Options]}).
 
+
 rexi_call(Node, MFA) ->
     Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
     Ref = rexi:cast(Node, self(), MFA, [sync]),
@@ -200,6 +227,7 @@ rexi_call(Node, MFA) ->
         rexi_monitor:stop(Mon)
     end.
 
+
 calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
     case couch_db:open_doc(Db, LocalId, [ejson_body]) of
     {ok, #doc{body = {SProps}}} ->
@@ -216,12 +244,14 @@ calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
         0
     end.
 
+
 open_doc_revs(Db, #full_doc_info{id=Id, rev_tree=RevTree}, Revs) ->
     {FoundRevs, _} = couch_key_tree:get_key_leafs(RevTree, Revs),
     lists:map(fun({#leaf{deleted=IsDel, ptr=SummaryPtr}, FoundRevPath}) ->
                   couch_db:make_doc(Db, Id, IsDel, SummaryPtr, FoundRevPath)
     end, FoundRevs).
 
+
 iso8601_timestamp() ->
     {_,_,Micro} = Now = os:timestamp(),
     {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now),