You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2015/11/12 12:07:45 UTC

couch-replicator commit: updated refs/heads/master to 3f268ab

Repository: couchdb-couch-replicator
Updated Branches:
  refs/heads/master 437c6571f -> 3f268abba


Fix couch_replicator_manager rescans

When couch_replicator_manager starts, it scans every _replicator database
looking for replications to start. When it starts a replication, it
modifies a document in the _replicator database. This change ends up
sending a message back to couch_replicator_manager to rescan the
database. Nothing de-duplicated these rescan messages, so each one
started another scan. This would result in many processes re-scanning
the same database over and over.

To fix this, we track the DbName for every scanning process so that if we
get a change to a database, we can ignore the change because a scanner
pid is already running. However, we also have to track whether we need to
restart the scanning pid when it finishes, so that we process any
changes that occurred during the scan.

COUCHDB-2878


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/3f268abb
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/3f268abb
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/3f268abb

Branch: refs/heads/master
Commit: 3f268abba89bd5b93f43185465e66ef42b3876ad
Parents: 437c657
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Nov 4 18:48:07 2015 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Nov 12 10:53:00 2015 +0000

----------------------------------------------------------------------
 src/couch_replicator_manager.erl | 73 +++++++++++++++++++++++++----------
 1 file changed, 52 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/3f268abb/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index c6f7960..f0ac7a8 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -220,7 +220,13 @@ handle_call({rep_error, RepId, Error}, _From, State) ->
     {reply, ok, replication_error(State, RepId, Error)};
 
 handle_call({rep_db_checkpoint, DbName, EndSeq}, _From, State) ->
-    true = ets:insert(?DB_TO_SEQ, {DbName, EndSeq}),
+    Entry = case ets:lookup(?DB_TO_SEQ, DbName) of
+        [] ->
+            {DbName, EndSeq, false};
+        [{DbName, _OldSeq, Rescan}] ->
+            {DbName, EndSeq, Rescan}
+    end,
+    true = ets:insert(?DB_TO_SEQ, Entry),
     {reply, ok, State};
 
 handle_call(rep_db_changed, _From, State) ->
@@ -232,14 +238,29 @@ handle_call(Msg, From, State) ->
     {stop, {error, {unexpected_call, Msg}}, State}.
 
 handle_cast({resume_scan, DbName}, State) ->
-    Since = case ets:lookup(?DB_TO_SEQ, DbName) of
-        [] -> 0;
-        [{DbName, EndSeq}] -> EndSeq
+    Pids = State#state.rep_start_pids,
+    NewPids = case lists:keyfind(DbName, 1, Pids) of
+        {DbName, _Pid} ->
+            Entry = case ets:lookup(?DB_TO_SEQ, DbName) of
+                [] ->
+                    {DbName, 0, true};
+                [{DbName, EndSeq, _Rescan}] ->
+                    {DbName, EndSeq, true}
+            end,
+            true = ets:insert(?DB_TO_SEQ, Entry),
+            Pids;
+        false ->
+            Since = case ets:lookup(?DB_TO_SEQ, DbName) of
+                [] -> 0;
+                [{DbName, EndSeq, _Rescan}] -> EndSeq
+            end,
+            true = ets:insert(?DB_TO_SEQ, {DbName, Since, false}),
+            ensure_rep_ddoc_exists(DbName),
+            Pid = start_changes_reader(DbName, Since),
+            couch_log:debug("Scanning ~s from update_seq ~p", [DbName, Since]),
+            [{DbName, Pid} | Pids]
     end,
-    ensure_rep_ddoc_exists(DbName),
-    Pid = start_changes_reader(DbName, Since),
-    couch_log:debug("Scanning ~s from update_seq ~p", [DbName, Since]),
-    {noreply, State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}};
+    {noreply, State#state{rep_start_pids = NewPids}};
 
 handle_cast({set_max_retries, MaxRetries}, State) ->
     {noreply, State#state{max_retries = MaxRetries}};
@@ -270,16 +291,22 @@ handle_info({'EXIT', From, Reason}, #state{event_listener = From} = State) ->
     couch_log:error("Database update notifier died. Reason: ~p", [Reason]),
     {stop, {db_update_notifier_died, Reason}, State};
 
-handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) ->
-    % one of the replication start processes terminated successfully
-    {noreply, State#state{rep_start_pids = Pids -- [From]}};
-
 handle_info({'EXIT', From, Reason}, #state{rep_start_pids = Pids} = State) ->
-    case lists:member(From, Pids) of
-        true ->
-            Fmt = "~s : Known replication pid ~w died :: ~w",
-            couch_log:error(Fmt, [?MODULE, From, Reason]),
-            {noreply, State#state{rep_start_pids = Pids -- [From]}};
+    case lists:keytake(From, 2, Pids) of
+        {value, {DbName, From}, NewPids} ->
+            if Reason == normal -> ok; true ->
+                Fmt = "~s : Known replication pid ~w died :: ~w",
+                couch_log:error(Fmt, [?MODULE, From, Reason])
+            end,
+            NewState = State#state{rep_start_pids = NewPids},
+            case ets:lookup(?DB_TO_SEQ, DbName) of
+                [{DbName, _EndSeq, true}] ->
+                    handle_cast({resume_scan, DbName}, NewState);
+                _ ->
+                    {noreply, NewState}
+            end;
+        false when Reason == normal ->
+            {noreply, State};
         false ->
             Fmt = "~s : Unknown pid ~w died :: ~w",
             couch_log:error(Fmt, [?MODULE, From, Reason]),
@@ -306,11 +333,11 @@ terminate(_Reason, State) ->
     } = State,
     stop_all_replications(),
     lists:foreach(
-        fun(Pid) ->
+        fun({_Tag, Pid}) ->
             catch unlink(Pid),
             catch exit(Pid, stop)
         end,
-        [ScanPid | StartPids]),
+        [{scanner, ScanPid} | StartPids]),
     true = ets:delete(?REP_TO_STATE),
     true = ets:delete(?DOC_TO_REP),
     true = ets:delete(?DB_TO_SEQ),
@@ -496,7 +523,9 @@ maybe_start_replication(State, DbName, DocId, RepDoc) ->
         couch_log:notice("Delaying replication `~s` start by ~p seconds.",
             [pp_rep_id(RepId), DelaySecs]),
         Pid = spawn_link(?MODULE, start_replication, [Rep, DelaySecs]),
-        State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
+        State#state{
+            rep_start_pids = [{rep_start, Pid} | State#state.rep_start_pids]
+        };
     #rep_state{rep = #rep{doc_id = DocId}} ->
         State;
     #rep_state{starting = false, rep = #rep{db_name = DbName, doc_id = OtherDocId}} ->
@@ -612,7 +641,9 @@ maybe_retry_replication(RepState, Error, State) ->
         "~nRestarting replication in ~p seconds.",
         [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), Wait]),
     Pid = spawn_link(?MODULE, start_replication, [Rep, Wait]),
-    State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}.
+    State#state{
+        rep_start_pids = [{rep_start, Pid} | State#state.rep_start_pids]
+    }.
 
 
 stop_all_replications() ->