You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/01 11:11:21 UTC
[35/48] mem3 commit: updated refs/heads/windsor-merge to ff02b9a
Add a new mem3_rpc module for replication RPCs
This is intended to make the local/remote code execution contexts much
clearer.
Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/fcbc821b
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/fcbc821b
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/fcbc821b
Branch: refs/heads/windsor-merge
Commit: fcbc821b4f9cd3fa66124860ebce921a8fe428f0
Parents: ade6ab1
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Dec 6 11:55:47 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:27 2014 +0100
----------------------------------------------------------------------
src/mem3_rep.erl | 41 ++++++++++++---------------------
src/mem3_rpc.erl | 64 +++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 79 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/fcbc821b/src/mem3_rep.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rep.erl b/src/mem3_rep.erl
index 9904965..d01eaf3 100644
--- a/src/mem3_rep.erl
+++ b/src/mem3_rep.erl
@@ -133,7 +133,7 @@ calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
case couch_db:open_doc(Db, LocalId, [ejson_body]) of
{ok, #doc{body = {SProps}}} ->
Opts = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
- try rexi_call(Node, {fabric_rpc, open_doc, [Name, LocalId, Opts]}) of
+ try mem3_rpc:load_checkpoint(Node, Name, LocalId, Opts) of
#doc{body = {TProps}} ->
SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0),
TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0),
@@ -185,14 +185,19 @@ find_missing_revs(Acc) ->
#doc_info{id=Id, revs=RevInfos} = couch_doc:to_doc_info(FDI),
{Id, [R || #rev_info{rev=R} <- RevInfos]}
end, Infos),
- Options = [{io_priority, {internal_repl, Name}}, {user_ctx, ?CTX}],
- rexi_call(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs, Options]}).
+ mem3_rpc:get_missing_revs(Node, Name, IdsRevs, [
+ {io_priority, {internal_repl, Name}},
+ {user_ctx, ?CTX}
+ ]).
save_on_target(Node, Name, Docs) ->
- Options = [replicated_changes, full_commit, {user_ctx, ?CTX},
- {io_priority, {internal_repl, Name}}],
- rexi_call(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
+ mem3_rpc:update_docs(Node, Name, Docs, [
+ replicated_changes,
+ full_commit,
+ {user_ctx, ?CTX},
+ {io_priority, {internal_repl, Name}}
+ ]),
ok.
@@ -219,26 +224,10 @@ update_locals(Acc) ->
{<<"timestamp">>, list_to_binary(iso8601_timestamp())}
]}},
{ok, _} = couch_db:update_doc(Db, Doc, []),
- Options = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
- rexi_call(Node, {fabric_rpc, update_docs, [Name, [Doc], Options]}).
-
-
-rexi_call(Node, MFA) ->
- Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
- Ref = rexi:cast(Node, self(), MFA, [sync]),
- try
- receive {Ref, {ok, Reply}} ->
- Reply;
- {Ref, Error} ->
- erlang:error(Error);
- {rexi_DOWN, Mon, _, Reason} ->
- erlang:error({rexi_DOWN, {Node, Reason}})
- after 600000 ->
- erlang:error(timeout)
- end
- after
- rexi_monitor:stop(Mon)
- end.
+ mem3_rpc:save_checkpoint(Node, Name, Doc, [
+ {user_ctx, ?CTX},
+ {io_priority, {internal_repl, Name}}
+ ]).
filter_doc(Filter, FullDocInfo) when is_function(Filter) ->
http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/fcbc821b/src/mem3_rpc.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rpc.erl b/src/mem3_rpc.erl
new file mode 100644
index 0000000..d71cc93
--- /dev/null
+++ b/src/mem3_rpc.erl
@@ -0,0 +1,64 @@
+% Copyright 2013 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_rpc).
+
+
+-export([
+ get_missing_revs/4,
+ update_docs/4,
+ load_checkpoint/4,
+ save_checkpoint/4
+]).
+
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(CTX, #user_ctx{roles = [<<"_admin">>]}).
+
+
+get_missing_revs(Node, DbName, IdsRevs, Options) ->
+ rexi_call(Node, {fabric_rpc, get_missing_revs, [DbName, IdsRevs, Options]}).
+
+
+update_docs(Node, DbName, Docs, Options) ->
+ rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}).
+
+
+load_checkpoint(Node, DbName, DocId, Opts) ->
+ rexi_call(Node, {fabric_rpc, open_doc, [DbName, DocId, Opts]}).
+
+
+save_checkpoint(Node, DbName, Doc, Options) ->
+ rexi_call(Node, {fabric_rpc, update_docs, [DbName, [Doc], Options]}).
+
+
+rexi_call(Node, MFA) ->
+ Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
+ Ref = rexi:cast(Node, self(), MFA, [sync]),
+ try
+ receive {Ref, {ok, Reply}} ->
+ Reply;
+ {Ref, Error} ->
+ erlang:error(Error);
+ {rexi_DOWN, Mon, _, Reason} ->
+ erlang:error({rexi_DOWN, {Node, Reason}})
+ after 600000 ->
+ erlang:error(timeout)
+ end
+ after
+ rexi_monitor:stop(Mon)
+ end.