You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/28 14:22:50 UTC
[30/50] mem3 commit: updated refs/heads/master to 64c0c74
Reorder functions into a logical progression
This just moves functions around in the mem3_rep module to give a better
logical progression. Purely stylistic but it should make things easier
to read and find.
Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/ade6ab16
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/ade6ab16
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/ade6ab16
Branch: refs/heads/master
Commit: ade6ab16709146f3196505e24067e5151d142148
Parents: 27e315b
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Dec 6 11:42:49 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:27 2014 +0100
----------------------------------------------------------------------
src/mem3_rep.erl | 96 +++++++++++++++++++++++++--------------------------
1 file changed, 48 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/ade6ab16/src/mem3_rep.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rep.erl b/src/mem3_rep.erl
index 2460f64..9904965 100644
--- a/src/mem3_rep.erl
+++ b/src/mem3_rep.erl
@@ -101,16 +101,6 @@ go(#acc{source=Source, batch_count=BC}=Acc0) ->
end.
-repl(#db{name=DbName, seq_tree=Bt}=Db, #acc{localid=LocalId}=Acc0) ->
- erlang:put(io_priority, {internal_repl, DbName}),
- Seq = calculate_start_seq(Db, Acc0#acc.target, LocalId),
- Acc1 = Acc0#acc{source=Db, seq=Seq},
- Fun = fun ?MODULE:changes_enumerator/3,
- {ok, _, Acc2} = couch_btree:fold(Bt, Fun, Acc1, [{start_key, Seq + 1}]),
- {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
- {ok, couch_db:count_changes_since(Db, LastSeq)}.
-
-
make_local_id(#shard{}=Source, #shard{}=Target) ->
make_local_id(Source, Target, undefined).
@@ -129,6 +119,33 @@ make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}, Filter) ->
<<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
+repl(#db{name=DbName, seq_tree=Bt}=Db, #acc{localid=LocalId}=Acc0) ->
+ erlang:put(io_priority, {internal_repl, DbName}),
+ Seq = calculate_start_seq(Db, Acc0#acc.target, LocalId),
+ Acc1 = Acc0#acc{source=Db, seq=Seq},
+ Fun = fun ?MODULE:changes_enumerator/3,
+ {ok, _, Acc2} = couch_btree:fold(Bt, Fun, Acc1, [{start_key, Seq + 1}]),
+ {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
+ {ok, couch_db:count_changes_since(Db, LastSeq)}.
+
+
+calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
+ case couch_db:open_doc(Db, LocalId, [ejson_body]) of
+ {ok, #doc{body = {SProps}}} ->
+ Opts = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
+ try rexi_call(Node, {fabric_rpc, open_doc, [Name, LocalId, Opts]}) of
+ #doc{body = {TProps}} ->
+ SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0),
+ TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0),
+ erlang:min(SourceSeq, TargetSeq)
+ catch error:{not_found, _} ->
+ 0
+ end;
+ {not_found, _} ->
+ 0
+ end.
+
+
changes_enumerator(#doc_info{id=DocId}, Reds, #acc{db=Db}=Acc) ->
{ok, FDI} = couch_db:get_full_doc_info(Db, DocId),
changes_enumerator(FDI, Reds, Acc);
@@ -151,17 +168,6 @@ changes_enumerator(#full_doc_info{}=FDI, _,
{Go, Acc1}.
-filter_doc(Filter, FullDocInfo) when is_function(Filter) ->
- try Filter(FullDocInfo) of
- discard -> discard;
- _ -> keep
- catch _:_ ->
- keep
- end;
-filter_doc(_, _) ->
- keep.
-
-
replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
case find_missing_revs(Acc) of
[] ->
@@ -183,6 +189,13 @@ find_missing_revs(Acc) ->
rexi_call(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs, Options]}).
+save_on_target(Node, Name, Docs) ->
+ Options = [replicated_changes, full_commit, {user_ctx, ?CTX},
+ {io_priority, {internal_repl, Name}}],
+ rexi_call(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
+ ok.
+
+
open_docs(#acc{source=Source, infos=Infos}, Missing) ->
lists:flatmap(fun({Id, Revs, _}) ->
FDI = lists:keyfind(Id, #full_doc_info.id, Infos),
@@ -190,11 +203,11 @@ open_docs(#acc{source=Source, infos=Infos}, Missing) ->
end, Missing).
-save_on_target(Node, Name, Docs) ->
- Options = [replicated_changes, full_commit, {user_ctx, ?CTX},
- {io_priority, {internal_repl, Name}}],
- rexi_call(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
- ok.
+open_doc_revs(Db, #full_doc_info{id=Id, rev_tree=RevTree}, Revs) ->
+ {FoundRevs, _} = couch_key_tree:get_key_leafs(RevTree, Revs),
+ lists:map(fun({#leaf{deleted=IsDel, ptr=SummaryPtr}, FoundRevPath}) ->
+ couch_db:make_doc(Db, Id, IsDel, SummaryPtr, FoundRevPath)
+ end, FoundRevs).
update_locals(Acc) ->
@@ -228,28 +241,15 @@ rexi_call(Node, MFA) ->
end.
-calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
- case couch_db:open_doc(Db, LocalId, [ejson_body]) of
- {ok, #doc{body = {SProps}}} ->
- Opts = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
- try rexi_call(Node, {fabric_rpc, open_doc, [Name, LocalId, Opts]}) of
- #doc{body = {TProps}} ->
- SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0),
- TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0),
- erlang:min(SourceSeq, TargetSeq)
- catch error:{not_found, _} ->
- 0
- end;
- {not_found, _} ->
- 0
- end.
-
-
-open_doc_revs(Db, #full_doc_info{id=Id, rev_tree=RevTree}, Revs) ->
- {FoundRevs, _} = couch_key_tree:get_key_leafs(RevTree, Revs),
- lists:map(fun({#leaf{deleted=IsDel, ptr=SummaryPtr}, FoundRevPath}) ->
- couch_db:make_doc(Db, Id, IsDel, SummaryPtr, FoundRevPath)
- end, FoundRevs).
+filter_doc(Filter, FullDocInfo) when is_function(Filter) ->
+ try Filter(FullDocInfo) of
+ discard -> discard;
+ _ -> keep
+ catch _:_ ->
+ keep
+ end;
+filter_doc(_, _) ->
+ keep.
iso8601_timestamp() ->