You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/01 11:11:21 UTC

[35/48] mem3 commit: updated refs/heads/windsor-merge to ff02b9a

Add a new mem3_rpc module for replication RPCs

This is intended to make the local/remote code execution contexts much
clearer.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/fcbc821b
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/fcbc821b
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/fcbc821b

Branch: refs/heads/windsor-merge
Commit: fcbc821b4f9cd3fa66124860ebce921a8fe428f0
Parents: ade6ab1
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Dec 6 11:55:47 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:27 2014 +0100

----------------------------------------------------------------------
 src/mem3_rep.erl | 41 ++++++++++++---------------------
 src/mem3_rpc.erl | 64 +++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 79 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/fcbc821b/src/mem3_rep.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rep.erl b/src/mem3_rep.erl
index 9904965..d01eaf3 100644
--- a/src/mem3_rep.erl
+++ b/src/mem3_rep.erl
@@ -133,7 +133,7 @@ calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
     case couch_db:open_doc(Db, LocalId, [ejson_body]) of
     {ok, #doc{body = {SProps}}} ->
         Opts = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
-        try rexi_call(Node, {fabric_rpc, open_doc, [Name, LocalId, Opts]}) of
+        try mem3_rpc:load_checkpoint(Node, Name, LocalId, Opts) of
         #doc{body = {TProps}} ->
             SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0),
             TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0),
@@ -185,14 +185,19 @@ find_missing_revs(Acc) ->
         #doc_info{id=Id, revs=RevInfos} = couch_doc:to_doc_info(FDI),
         {Id, [R || #rev_info{rev=R} <- RevInfos]}
     end, Infos),
-    Options = [{io_priority, {internal_repl, Name}}, {user_ctx, ?CTX}],
-    rexi_call(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs, Options]}).
+    mem3_rpc:get_missing_revs(Node, Name, IdsRevs, [
+        {io_priority, {internal_repl, Name}},
+        {user_ctx, ?CTX}
+    ]).
 
 
 save_on_target(Node, Name, Docs) ->
-    Options = [replicated_changes, full_commit, {user_ctx, ?CTX},
-        {io_priority, {internal_repl, Name}}],
-    rexi_call(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
+    mem3_rpc:update_docs(Node, Name, Docs, [
+        replicated_changes,
+        full_commit,
+        {user_ctx, ?CTX},
+        {io_priority, {internal_repl, Name}}
+    ]),
     ok.
 
 
@@ -219,26 +224,10 @@ update_locals(Acc) ->
         {<<"timestamp">>, list_to_binary(iso8601_timestamp())}
     ]}},
     {ok, _} = couch_db:update_doc(Db, Doc, []),
-    Options = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
-    rexi_call(Node, {fabric_rpc, update_docs, [Name, [Doc], Options]}).
-
-
-rexi_call(Node, MFA) ->
-    Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
-    Ref = rexi:cast(Node, self(), MFA, [sync]),
-    try
-        receive {Ref, {ok, Reply}} ->
-            Reply;
-        {Ref, Error} ->
-            erlang:error(Error);
-        {rexi_DOWN, Mon, _, Reason} ->
-            erlang:error({rexi_DOWN, {Node, Reason}})
-        after 600000 ->
-            erlang:error(timeout)
-        end
-    after
-        rexi_monitor:stop(Mon)
-    end.
+    mem3_rpc:save_checkpoint(Node, Name, Doc, [
+        {user_ctx, ?CTX},
+        {io_priority, {internal_repl, Name}}
+    ]).
 
 
 filter_doc(Filter, FullDocInfo) when is_function(Filter) ->

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/fcbc821b/src/mem3_rpc.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rpc.erl b/src/mem3_rpc.erl
new file mode 100644
index 0000000..d71cc93
--- /dev/null
+++ b/src/mem3_rpc.erl
@@ -0,0 +1,64 @@
+% Copyright 2013 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_rpc).
+
+
+-export([
+    get_missing_revs/4,
+    update_docs/4,
+    load_checkpoint/4,
+    save_checkpoint/4
+]).
+
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(CTX, #user_ctx{roles = [<<"_admin">>]}).
+
+
+get_missing_revs(Node, DbName, IdsRevs, Options) -> % Passes Node and a fabric_rpc:get_missing_revs/3 MFA to rexi_call/2.
+    rexi_call(Node, {fabric_rpc, get_missing_revs, [DbName, IdsRevs, Options]}).
+
+
+update_docs(Node, DbName, Docs, Options) -> % Passes Node and a fabric_rpc:update_docs/3 MFA to rexi_call/2.
+    rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}).
+
+
+load_checkpoint(Node, DbName, DocId, Opts) -> % Passes Node and a fabric_rpc:open_doc/3 MFA for DocId to rexi_call/2.
+    rexi_call(Node, {fabric_rpc, open_doc, [DbName, DocId, Opts]}).
+
+
+save_checkpoint(Node, DbName, Doc, Options) -> % Same fabric_rpc:update_docs/3 MFA as update_docs/4, with Doc wrapped as [Doc].
+    rexi_call(Node, {fabric_rpc, update_docs, [DbName, [Doc], Options]}).
+
+
+rexi_call(Node, MFA) -> % Casts MFA to Node via rexi:cast/4 and blocks in receive for the reply.
+    Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]), % Mon is stopped in the outer after clause
+    Ref = rexi:cast(Node, self(), MFA, [sync]), % messages are matched on Ref below
+    try
+        receive {Ref, {ok, Reply}} -> % {ok, Reply}: return the unwrapped Reply
+            Reply;
+        {Ref, Error} -> % any other message tagged with Ref is raised as an error
+            erlang:error(Error);
+        {rexi_DOWN, Mon, _, Reason} -> % rexi_DOWN for Mon: raise with Node and Reason
+            erlang:error({rexi_DOWN, {Node, Reason}})
+        after 600000 -> % no matching message within 600000 ms (10 minutes)
+            erlang:error(timeout)
+        end
+    after
+        rexi_monitor:stop(Mon) % runs whether the receive returned or raised
+    end.