You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/28 14:15:55 UTC

[24/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Fix couch_replicator_mangaer changes feed upgrades

The changes feed readers were unnecessarily holding onto anonymous
functions. This makes it so they dont.

BugzId: 27666


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/0ae9e60e
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/0ae9e60e
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/0ae9e60e

Branch: refs/heads/master
Commit: 0ae9e60eb71ac7336472902379f5e446c34674ac
Parents: e87a923
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Jan 30 19:57:48 2014 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 16:40:22 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator_manager.erl | 71 +++++++++++++++++++----------------
 1 file changed, 39 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/0ae9e60e/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index b0c1c9b..21e759f 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -24,6 +24,9 @@
 -export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
 -export([code_change/3, terminate/2]).
 
+% changes callbacks
+-export([changes_reader/3, changes_reader_cb/2]).
+
 % config_listener callback
 -export([handle_config_change/5]).
 
@@ -135,7 +138,7 @@ init(_) ->
     % Automatically start node local changes feed loop
     LocalRepDb = ?l2b(config:get("replicator", "db", "_replicator")),
     ensure_rep_db_exists(LocalRepDb),
-    Pid = changes_feed_loop(LocalRepDb, 0),
+    Pid = start_changes_reader(LocalRepDb, 0),
     {ok, #state{
         event_listener = start_event_listener(),
         scan_pid = ScanPid,
@@ -198,7 +201,7 @@ handle_cast({resume_scan, DbName}, State) ->
         [{DbName, EndSeq}] -> EndSeq
     end,
     ensure_rep_ddoc_exists(DbName),
-    Pid = changes_feed_loop(DbName, Since),
+    Pid = start_changes_reader(DbName, Since),
     couch_log:debug("Scanning ~s from update_seq ~p", [DbName, Since]),
     {noreply, State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}};
 
@@ -273,37 +276,41 @@ terminate(_Reason, State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-changes_feed_loop(DbName, Since) ->
-    Server = self(),
-    spawn_link(fun() ->
-        UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
-        DbOpenOptions = [{user_ctx, UserCtx}, sys_db],
-        {ok, Db} = couch_db:open_int(DbName, DbOpenOptions),
-        ChangesFeedFun = couch_changes:handle_changes(
-            #changes_args{
-                include_docs = true,
-                since = Since,
-                feed = "continuous",
-                timeout = infinity
-            },
-            {json_req, null},
-            Db
-        ),
-        EnumFun = fun
-        ({change, Change, _}, _) ->
-            case has_valid_rep_id(Change) of
-                true ->
-                    Msg = {rep_db_update, DbName, Change},
-                    ok = gen_server:call(Server, Msg, infinity);
-                false ->
-                    ok
-            end;
-        (_, _) ->
-            ok
-        end,
-        ChangesFeedFun(EnumFun)
-    end).
 
+start_changes_reader(DbName, Since) ->
+    spawn_link(?MODULE, changes_reader, [self(), DbName, Since]).
+
+changes_reader(Server, DbName, Since) ->
+    UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
+    DbOpenOptions = [{user_ctx, UserCtx}, sys_db],
+    {ok, Db} = couch_db:open_int(DbName, DbOpenOptions),
+    ChangesFeedFun = couch_changes:handle_changes(
+        #changes_args{
+            include_docs = true,
+            since = Since,
+            feed = "continuous",
+            timeout = infinity
+        },
+        {json_req, null},
+        Db
+    ),
+    ChangesFeedFun({fun ?MODULE:changes_reader_cb/2, {Server, DbName}}).
+
+changes_reader_cb({change, Change}, {Server, DbName}) ->
+    case has_valid_rep_id(Change) of
+        true ->
+            Msg = {rep_db_update, DbName, Change},
+            ok = gen_server:call(Server, Msg, infinity);
+        false ->
+            ok
+    end,
+    {ok, {Server, DbName}};
+changes_reader_cb({stop, EndSeq, _Pending}, {Server, DbName}) ->
+    Msg = {rep_db_checkpoint, DbName, EndSeq},
+    ok = gen_server:call(Server, Msg, infinity),
+    {ok, {Server, DbName}};
+changes_reader_cb(_, Acc) ->
+    {ok, Acc}.
 
 has_valid_rep_id({Change}) ->
     has_valid_rep_id(get_json_value(<<"id">>, Change));