You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/28 14:15:41 UTC

[10/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Move changes_reader to a dedicated module

Also, export read_changes for better upgrade handling.

BugzID: 24294


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/7e286402
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/7e286402
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/7e286402

Branch: refs/heads/master
Commit: 7e28640275f9063ff80d19ccd3cd722f6fe0865a
Parents: 44328a6
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Fri Oct 18 11:31:59 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 14:16:03 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator.erl                |  81 +--------------------
 src/couch_replicator_changes_reader.erl | 105 +++++++++++++++++++++++++++
 2 files changed, 108 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/7e286402/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index bb81cf0..5891084 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -252,7 +252,9 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
     ]),
     % This starts the _changes reader process. It adds the changes from
     % the source db to the ChangesQueue.
-    ChangesReader = spawn_changes_reader(StartSeq, Source, ChangesQueue, Options),
+    {ok, ChangesReader} = couch_replicator_changes_reader:start_link(
+        StartSeq, Source, ChangesQueue, Options
+    ),
     % Changes manager - responsible for dequeing batches from the changes queue
     % and deliver them to the worker processes.
     ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
@@ -624,83 +626,6 @@ fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
     end.
 
 
-spawn_changes_reader(StartSeq, #httpdb{} = Db, ChangesQueue, Options) ->
-    Parent = self(),
-    spawn_link(fun() ->
-        put(last_seq, StartSeq),
-        put(retries_left, Db#httpdb.retries),
-        read_changes(Parent, StartSeq, Db#httpdb{retries = 0}, ChangesQueue, Options, 1)
-    end);
-spawn_changes_reader(StartSeq, Db, ChangesQueue, Options) ->
-    Parent = self(),
-    spawn_link(fun() ->
-        read_changes(Parent, StartSeq, Db, ChangesQueue, Options, 1)
-    end).
-
-read_changes(Parent, StartSeq, Db, ChangesQueue, Options, Ts) ->
-    try
-        couch_replicator_api_wrap:changes_since(Db, all_docs, StartSeq,
-            fun(#doc_info{high_seq = Seq, id = Id} = DocInfo) ->
-                case Id of
-                <<>> ->
-                    % Previous CouchDB releases had a bug which allowed a doc
-                    % with an empty ID to be inserted into databases. Such doc
-                    % is impossible to GET.
-                    couch_log:error("Replicator: ignoring document with empty ID in "
-                        "source database `~s` (_changes sequence ~p)",
-                        [couch_replicator_api_wrap:db_uri(Db), Seq]);
-                _ ->
-                    ok = couch_work_queue:queue(ChangesQueue, DocInfo)
-                end,
-                put(last_seq, Seq);
-            ({last_seq, LS}) ->
-                case get_value(continuous, Options) of
-                true ->
-                    % LS should never be undefined, but it doesn't hurt to be
-                    % defensive inside the replicator.
-                    Seq = case LS of undefined -> get(last_seq); _ -> LS end,
-                    OldSeq = get(last_seq),
-                    if Seq == OldSeq -> ok; true ->
-                        Msg = {report_seq_done, {Ts, Seq}, #rep_stats{}},
-                        ok = gen_server:call(Parent, Msg, infinity)
-                    end,
-                    put(last_seq, Seq),
-                    throw(recurse);
-                _ ->
-                    % This clause is unreachable today, but let's plan ahead
-                    % for the future where we checkpoint against last_seq
-                    % instead of the sequence of the last change.  The two can
-                    % differ substantially in the case of a restrictive filter.
-                    ok
-                end
-            end, Options),
-        couch_work_queue:close(ChangesQueue)
-    catch
-        throw:recurse ->
-            LS = get(last_seq),
-            read_changes(Parent, LS, Db, ChangesQueue, Options, Ts+1);
-        exit:{http_request_failed, _, _, _} = Error ->
-        case get(retries_left) of
-        N when N > 0 ->
-            put(retries_left, N - 1),
-            LastSeq = get(last_seq),
-            Db2 = case LastSeq of
-            StartSeq ->
-                couch_log:notice("Retrying _changes request to source database ~s"
-                    " with since=~p in ~p seconds",
-                    [couch_replicator_api_wrap:db_uri(Db), LastSeq, Db#httpdb.wait / 1000]),
-                ok = timer:sleep(Db#httpdb.wait),
-                Db#httpdb{wait = 2 * Db#httpdb.wait};
-            _ ->
-                couch_log:notice("Retrying _changes request to source database ~s"
-                    " with since=~p", [couch_replicator_api_wrap:db_uri(Db), LastSeq]),
-                Db
-            end,
-            read_changes(Parent, LastSeq, Db2, ChangesQueue, Options, Ts);
-        _ ->
-            exit(Error)
-        end
-    end.
 
 
 spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/7e286402/src/couch_replicator_changes_reader.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_changes_reader.erl b/src/couch_replicator_changes_reader.erl
new file mode 100644
index 0000000..390a87e
--- /dev/null
+++ b/src/couch_replicator_changes_reader.erl
@@ -0,0 +1,141 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_changes_reader).
+
+% Reads the source database's _changes feed and queues each #doc_info{} on
+% the replication's changes queue, from a process linked to the caller of
+% start_link/4.
+
+% Public API
+-export([start_link/4]).
+
+% Exported for code reloading: the reader (re)enters read_changes/6 only via
+% the remote call ?MODULE:read_changes/6, so a newly loaded version of this
+% module takes over the next time the _changes request is issued.
+-export([read_changes/6]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator_api_wrap.hrl").
+-include("couch_replicator.hrl").
+
+-import(couch_util, [
+    get_value/2
+]).
+
+% Spawns the linked changes reader and returns {ok, Pid}, the start_link
+% convention callers match on, e.g.
+% {ok, ChangesReader} = couch_replicator_changes_reader:start_link(...).
+%
+% Both clauses seed last_seq with StartSeq so read_changes/6 always has a
+% sequence to fall back to. For HTTP sources the retry budget is moved into
+% the process dictionary (retries_left) and the Db handed to read_changes/6
+% has retries = 0, so retrying is driven by read_changes/6 itself, which can
+% resume from last_seq rather than StartSeq.
+start_link(StartSeq, #httpdb{} = Db, ChangesQueue, Options) ->
+    Parent = self(),
+    Pid = spawn_link(fun() ->
+        put(last_seq, StartSeq),
+        put(retries_left, Db#httpdb.retries),
+        ?MODULE:read_changes(Parent, StartSeq, Db#httpdb{retries = 0},
+            ChangesQueue, Options, 1)
+    end),
+    {ok, Pid};
+start_link(StartSeq, Db, ChangesQueue, Options) ->
+    Parent = self(),
+    Pid = spawn_link(fun() ->
+        put(last_seq, StartSeq),
+        ?MODULE:read_changes(Parent, StartSeq, Db, ChangesQueue, Options, 1)
+    end),
+    {ok, Pid}.
+
+% Streams changes since StartSeq from Db into ChangesQueue, closing the
+% queue once couch_replicator_api_wrap:changes_since/5 returns normally.
+%
+% Parent - receives gen_server:call(Parent, {report_seq_done, {Ts, Seq},
+%          #rep_stats{}}) when a continuous feed delivers a last_seq that
+%          differs from the most recent sequence seen.
+% Ts     - tag paired with the reported sequence; incremented each time a
+%          continuous feed is re-requested.
+%
+% Process dictionary state: last_seq (most recent sequence seen) and, for
+% HTTP sources, retries_left (remaining retry budget).
+read_changes(Parent, StartSeq, Db, ChangesQueue, Options, Ts) ->
+    try
+        couch_replicator_api_wrap:changes_since(Db, all_docs, StartSeq,
+            fun(#doc_info{high_seq = Seq, id = Id} = DocInfo) ->
+                case Id of
+                <<>> ->
+                    % Previous CouchDB releases had a bug which allowed a doc
+                    % with an empty ID to be inserted into databases. Such doc
+                    % is impossible to GET.
+                    couch_log:error("Replicator: ignoring document with empty ID in "
+                        "source database `~s` (_changes sequence ~p)",
+                        [couch_replicator_api_wrap:db_uri(Db), Seq]);
+                _ ->
+                    ok = couch_work_queue:queue(ChangesQueue, DocInfo)
+                end,
+                put(last_seq, Seq);
+            ({last_seq, LS}) ->
+                case get_value(continuous, Options) of
+                true ->
+                    % LS should never be undefined, but it doesn't hurt to be
+                    % defensive inside the replicator.
+                    Seq = case LS of undefined -> get(last_seq); _ -> LS end,
+                    OldSeq = get(last_seq),
+                    if Seq == OldSeq -> ok; true ->
+                        Msg = {report_seq_done, {Ts, Seq}, #rep_stats{}},
+                        ok = gen_server:call(Parent, Msg, infinity)
+                    end,
+                    put(last_seq, Seq),
+                    % Abandon this request; the throw:recurse handler below
+                    % re-issues it from last_seq with Ts + 1.
+                    throw(recurse);
+                _ ->
+                    % This clause is unreachable today, but let's plan ahead
+                    % for the future where we checkpoint against last_seq
+                    % instead of the sequence of the last change.  The two can
+                    % differ substantially in the case of a restrictive filter.
+                    ok
+                end
+            end, Options),
+        couch_work_queue:close(ChangesQueue)
+    catch
+        throw:recurse ->
+            LS = get(last_seq),
+            ?MODULE:read_changes(Parent, LS, Db, ChangesQueue, Options, Ts+1);
+        exit:{http_request_failed, _, _, _} = Error ->
+        % is_integer/1 matters: a missing retries_left (undefined) would
+        % otherwise pass N > 0, because atoms sort after numbers in Erlang.
+        case get(retries_left) of
+        N when is_integer(N), N > 0 ->
+            put(retries_left, N - 1),
+            LastSeq = get(last_seq),
+            % No progress since this request started: back off and double
+            % the wait. Otherwise retry immediately from the new position.
+            Db2 = case LastSeq of
+            StartSeq ->
+                couch_log:notice("Retrying _changes request to source database ~s"
+                    " with since=~p in ~p seconds",
+                    [couch_replicator_api_wrap:db_uri(Db), LastSeq, Db#httpdb.wait / 1000]),
+                ok = timer:sleep(Db#httpdb.wait),
+                Db#httpdb{wait = 2 * Db#httpdb.wait};
+            _ ->
+                couch_log:notice("Retrying _changes request to source database ~s"
+                    " with since=~p", [couch_replicator_api_wrap:db_uri(Db), LastSeq]),
+                Db
+            end,
+            ?MODULE:read_changes(Parent, LastSeq, Db2, ChangesQueue, Options, Ts);
+        _ ->
+            exit(Error)
+        end
+    end.