You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/28 14:15:55 UTC
[24/37] couch-replicator commit: updated refs/heads/master to aafb5f9
Fix couch_replicator_mangaer changes feed upgrades
The changes feed readers were unnecessarily holding onto anonymous
functions. This makes it so they dont.
BugzId: 27666
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/0ae9e60e
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/0ae9e60e
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/0ae9e60e
Branch: refs/heads/master
Commit: 0ae9e60eb71ac7336472902379f5e446c34674ac
Parents: e87a923
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Jan 30 19:57:48 2014 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 16:40:22 2014 +0100
----------------------------------------------------------------------
src/couch_replicator_manager.erl | 71 +++++++++++++++++++----------------
1 file changed, 39 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/0ae9e60e/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index b0c1c9b..21e759f 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -24,6 +24,9 @@
-export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
-export([code_change/3, terminate/2]).
+% changes callbacks
+-export([changes_reader/3, changes_reader_cb/2]).
+
% config_listener callback
-export([handle_config_change/5]).
@@ -135,7 +138,7 @@ init(_) ->
% Automatically start node local changes feed loop
LocalRepDb = ?l2b(config:get("replicator", "db", "_replicator")),
ensure_rep_db_exists(LocalRepDb),
- Pid = changes_feed_loop(LocalRepDb, 0),
+ Pid = start_changes_reader(LocalRepDb, 0),
{ok, #state{
event_listener = start_event_listener(),
scan_pid = ScanPid,
@@ -198,7 +201,7 @@ handle_cast({resume_scan, DbName}, State) ->
[{DbName, EndSeq}] -> EndSeq
end,
ensure_rep_ddoc_exists(DbName),
- Pid = changes_feed_loop(DbName, Since),
+ Pid = start_changes_reader(DbName, Since),
couch_log:debug("Scanning ~s from update_seq ~p", [DbName, Since]),
{noreply, State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}};
@@ -273,37 +276,41 @@ terminate(_Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-changes_feed_loop(DbName, Since) ->
- Server = self(),
- spawn_link(fun() ->
- UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
- DbOpenOptions = [{user_ctx, UserCtx}, sys_db],
- {ok, Db} = couch_db:open_int(DbName, DbOpenOptions),
- ChangesFeedFun = couch_changes:handle_changes(
- #changes_args{
- include_docs = true,
- since = Since,
- feed = "continuous",
- timeout = infinity
- },
- {json_req, null},
- Db
- ),
- EnumFun = fun
- ({change, Change, _}, _) ->
- case has_valid_rep_id(Change) of
- true ->
- Msg = {rep_db_update, DbName, Change},
- ok = gen_server:call(Server, Msg, infinity);
- false ->
- ok
- end;
- (_, _) ->
- ok
- end,
- ChangesFeedFun(EnumFun)
- end).
+start_changes_reader(DbName, Since) ->
+ spawn_link(?MODULE, changes_reader, [self(), DbName, Since]).
+
+changes_reader(Server, DbName, Since) ->
+ UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
+ DbOpenOptions = [{user_ctx, UserCtx}, sys_db],
+ {ok, Db} = couch_db:open_int(DbName, DbOpenOptions),
+ ChangesFeedFun = couch_changes:handle_changes(
+ #changes_args{
+ include_docs = true,
+ since = Since,
+ feed = "continuous",
+ timeout = infinity
+ },
+ {json_req, null},
+ Db
+ ),
+ ChangesFeedFun({fun ?MODULE:changes_reader_cb/2, {Server, DbName}}).
+
+changes_reader_cb({change, Change}, {Server, DbName}) ->
+ case has_valid_rep_id(Change) of
+ true ->
+ Msg = {rep_db_update, DbName, Change},
+ ok = gen_server:call(Server, Msg, infinity);
+ false ->
+ ok
+ end,
+ {ok, {Server, DbName}};
+changes_reader_cb({stop, EndSeq, _Pending}, {Server, DbName}) ->
+ Msg = {rep_db_checkpoint, DbName, EndSeq},
+ ok = gen_server:call(Server, Msg, infinity),
+ {ok, {Server, DbName}};
+changes_reader_cb(_, Acc) ->
+ {ok, Acc}.
has_valid_rep_id({Change}) ->
has_valid_rep_id(get_json_value(<<"id">>, Change));