You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/28 14:15:32 UTC

[01/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Repository: couchdb-couch-replicator
Updated Branches:
  refs/heads/master edd5dc40a -> aafb5f9d9


Allow last_seq through doc_ids check

BugzID: 17898
BugzID: 17840


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/d809436d
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/d809436d
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/d809436d

Branch: refs/heads/master
Commit: d809436d1ecd719334432c8978607ee9eb4b970b
Parents: edd5dc4
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Thu Mar 7 12:57:18 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 12:28:51 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator_api_wrap.erl | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/d809436d/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index 7410b07..bf37400 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -430,7 +430,9 @@ changes_since(#httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb,
                                 UserFun(DocInfo);
                             false ->
                                 ok
-                            end
+                            end;
+                        (LastSeq) ->
+                            UserFun(LastSeq)
                         end,
                         parse_changes_feed(Options, UserFun2, DataStreamFun2)
                     end)


[12/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Provide an abstract interface to replicator stats

Switch to an orddict under the hood.

BugzID: 24298


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/65a1a106
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/65a1a106
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/65a1a106

Branch: refs/heads/master
Commit: 65a1a106a4ec3e4e3adbe5b26566c5d0eea3c940
Parents: 6f2dc4c
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Fri Oct 18 15:53:23 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 15:02:53 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator.erl                | 28 +++++-----
 src/couch_replicator.hrl                |  8 ---
 src/couch_replicator_changes_reader.erl |  2 +-
 src/couch_replicator_stats.erl          | 83 ++++++++++++++++++++++++++++
 src/couch_replicator_utils.erl          | 14 +----
 src/couch_replicator_worker.erl         | 53 +++++++++---------
 6 files changed, 127 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/65a1a106/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index 5891084..47bbd3a 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -64,7 +64,7 @@
     changes_manager,
     changes_reader,
     workers,
-    stats = #rep_stats{},
+    stats = couch_replicator_stats:new(),
     session_id,
     source_db_compaction_notifier = nil,
     target_db_compaction_notifier = nil,
@@ -694,11 +694,11 @@ do_checkpoint(State) ->
             {<<"start_last_seq">>, StartSeq},
             {<<"end_last_seq">>, NewSeq},
             {<<"recorded_seq">>, NewSeq},
-            {<<"missing_checked">>, Stats#rep_stats.missing_checked},
-            {<<"missing_found">>, Stats#rep_stats.missing_found},
-            {<<"docs_read">>, Stats#rep_stats.docs_read},
-            {<<"docs_written">>, Stats#rep_stats.docs_written},
-            {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
+            {<<"missing_checked">>, couch_replicator_stats:missing_checked(Stats)},
+            {<<"missing_found">>, couch_replicator_stats:missing_found(Stats)},
+            {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
+            {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
+            {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
         ]},
         BaseHistory = [
             {<<"session_id">>, SessionId},
@@ -714,9 +714,9 @@ do_checkpoint(State) ->
             [
                 {<<"start_time">>, StartTime},
                 {<<"end_time">>, EndTime},
-                {<<"docs_read">>, Stats#rep_stats.docs_read},
-                {<<"docs_written">>, Stats#rep_stats.docs_written},
-                {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
+                {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
+                {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
+                {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
             ]
         end,
         % limit history to 50 entries
@@ -921,12 +921,12 @@ rep_stats(State) ->
         source_seq = SourceCurSeq
     } = State,
     [
-        {revisions_checked, Stats#rep_stats.missing_checked},
-        {missing_revisions_found, Stats#rep_stats.missing_found},
-        {docs_read, Stats#rep_stats.docs_read},
-        {docs_written, Stats#rep_stats.docs_written},
+        {revisions_checked, couch_replicator_stats:missing_checked(Stats)},
+        {missing_revisions_found, couch_replicator_stats:missing_found(Stats)},
+        {docs_read, couch_replicator_stats:docs_read(Stats)},
+        {docs_written, couch_replicator_stats:docs_written(Stats)},
         {changes_pending, pending(SourceCurSeq, CommittedSeq)},
-        {doc_write_failures, Stats#rep_stats.doc_write_failures},
+        {doc_write_failures, couch_replicator_stats:doc_write_failures(Stats)},
         {checkpointed_source_seq, CommittedSeq}
     ].
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/65a1a106/src/couch_replicator.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.hrl b/src/couch_replicator.hrl
index 018aa4b..893d9e4 100644
--- a/src/couch_replicator.hrl
+++ b/src/couch_replicator.hrl
@@ -20,11 +20,3 @@
     user_ctx,
     doc_id
 }).
-
--record(rep_stats, {
-    missing_checked = 0,
-    missing_found = 0,
-    docs_read = 0,
-    docs_written = 0,
-    doc_write_failures = 0
-}).

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/65a1a106/src/couch_replicator_changes_reader.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_changes_reader.erl b/src/couch_replicator_changes_reader.erl
index 045e074..f28dd51 100644
--- a/src/couch_replicator_changes_reader.erl
+++ b/src/couch_replicator_changes_reader.erl
@@ -93,7 +93,7 @@ process_change({last_seq, LS}, {Parent, _, _, true = _Continuous, Ts}) ->
     Seq = case LS of undefined -> get(last_seq); _ -> LS end,
     OldSeq = get(last_seq),
     if Seq == OldSeq -> ok; true ->
-        Msg = {report_seq_done, {Ts, Seq}, #rep_stats{}},
+        Msg = {report_seq_done, {Ts, Seq}, couch_replicator_stats:new()},
         ok = gen_server:call(Parent, Msg, infinity)
     end,
     put(last_seq, Seq),

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/65a1a106/src/couch_replicator_stats.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_stats.erl b/src/couch_replicator_stats.erl
new file mode 100644
index 0000000..af8ba4e
--- /dev/null
+++ b/src/couch_replicator_stats.erl
@@ -0,0 +1,83 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_stats).
+
+-record(rep_stats, {
+    missing_checked = 0,
+    missing_found = 0,
+    docs_read = 0,
+    docs_written = 0,
+    doc_write_failures = 0
+}).
+
+-export([
+    new/0,
+    new/1,
+    get/2,
+    increment/2,
+    sum_stats/2
+]).
+
+-export([
+    missing_checked/1,
+    missing_found/1,
+    docs_read/1,
+    docs_written/1,
+    doc_write_failures/1
+]).
+
+new() ->
+    orddict:new().
+
+new(Initializers) when is_list(Initializers) ->
+    orddict:from_list(Initializers).
+
+missing_checked(Stats) ->
+    get(missing_checked, upgrade(Stats)).
+
+missing_found(Stats) ->
+    get(missing_found, upgrade(Stats)).
+
+docs_read(Stats) ->
+    get(docs_read, upgrade(Stats)).
+
+docs_written(Stats) ->
+    get(docs_written, upgrade(Stats)).
+
+doc_write_failures(Stats) ->
+    get(doc_write_failures, upgrade(Stats)).
+
+get(Field, Stats) ->
+    case orddict:find(Field, upgrade(Stats)) of
+        {ok, Value} ->
+            Value;
+        error ->
+            0
+    end.
+
+increment(Field, Stats) ->
+    orddict:update_counter(Field, 1, upgrade(Stats)).
+
+sum_stats(S1, S2) ->
+    orddict:merge(fun(_, V1, V2) -> V1+V2 end, upgrade(S1), upgrade(S2)).
+
+upgrade(#rep_stats{} = Stats) ->
+    orddict:from_list([
+        {missing_checked, Stats#rep_stats.missing_checked},
+        {missing_found, Stats#rep_stats.missing_found},
+        {docs_read, Stats#rep_stats.docs_read},
+        {docs_written, Stats#rep_stats.docs_written},
+        {doc_write_failures, Stats#rep_stats.doc_write_failures}
+    ]);
+upgrade(Stats) ->
+    Stats.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/65a1a106/src/couch_replicator_utils.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_utils.erl b/src/couch_replicator_utils.erl
index f489ff1..9d716c9 100644
--- a/src/couch_replicator_utils.erl
+++ b/src/couch_replicator_utils.erl
@@ -392,17 +392,9 @@ handle_db_event(DbName, compacted, Server) ->
 handle_db_event(_DbName, _Event, Server) ->
     {ok, Server}.
 
-
-sum_stats(#rep_stats{} = S1, #rep_stats{} = S2) ->
-    #rep_stats{
-        missing_checked =
-            S1#rep_stats.missing_checked + S2#rep_stats.missing_checked,
-        missing_found = S1#rep_stats.missing_found + S2#rep_stats.missing_found,
-        docs_read = S1#rep_stats.docs_read + S2#rep_stats.docs_read,
-        docs_written = S1#rep_stats.docs_written + S2#rep_stats.docs_written,
-        doc_write_failures =
-            S1#rep_stats.doc_write_failures + S2#rep_stats.doc_write_failures
-    }.
+% Obsolete - remove in next release
+sum_stats(S1, S2) ->
+    couch_replicator_stats:sum_stats(S1, S2).
 
 mp_parse_doc({headers, H}, []) ->
     case couch_util:get_value("content-type", H) of

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/65a1a106/src/couch_replicator_worker.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_worker.erl b/src/couch_replicator_worker.erl
index fce445b..a0638c6 100644
--- a/src/couch_replicator_worker.erl
+++ b/src/couch_replicator_worker.erl
@@ -31,9 +31,6 @@
 -define(MAX_BULK_ATTS_PER_DOC, 8).
 -define(STATS_DELAY, 10000000).              % 10 seconds (in microseconds)
 
--define(inc_stat(StatPos, Stats, Inc),
-    setelement(StatPos, Stats, element(StatPos, Stats) + Inc)).
-
 -import(couch_replicator_utils, [
     open_db/1,
     close_db/1,
@@ -61,7 +58,7 @@
     writer = nil,
     pending_fetch = nil,
     flush_waiter = nil,
-    stats = #rep_stats{},
+    stats = couch_replicator_stats:new(),
     source_db_compaction_notifier = nil,
     target_db_compaction_notifier = nil,
     batch = #batch{}
@@ -306,7 +303,7 @@ fetch_doc(Source, {Id, Revs, PAs}, DocHandler, Acc) ->
 
 
 local_doc_handler({ok, Doc}, {Target, DocList, Stats, Cp}) ->
-    Stats2 = ?inc_stat(#rep_stats.docs_read, Stats, 1),
+    Stats2 = couch_replicator_stats:increment(docs_read, Stats),
     case batch_doc(Doc) of
     true ->
         {ok, {Target, [Doc | DocList], Stats2, Cp}};
@@ -317,9 +314,9 @@ local_doc_handler({ok, Doc}, {Target, DocList, Stats, Cp}) ->
         close_db(Target2),
         Stats3 = case Success of
         true ->
-            ?inc_stat(#rep_stats.docs_written, Stats2, 1);
+            couch_replicator_stats:increment(docs_written, Stats2);
         false ->
-            ?inc_stat(#rep_stats.doc_write_failures, Stats2, 1)
+            couch_replicator_stats:increment(doc_write_failures, Stats2)
         end,
         Stats4 = maybe_report_stats(Cp, Stats3),
         {ok, {Target, DocList, Stats4, Cp}}
@@ -336,16 +333,16 @@ remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) ->
     % source. The data property of each attachment is a function that starts
     % streaming the attachment data from the remote source, therefore it's
     % convenient to call it ASAP to avoid ibrowse inactivity timeouts.
-    Stats = #rep_stats{docs_read = 1},
+    Stats = couch_replicator_stats:new([{docs_read, 1}]),
     couch_log:debug("Worker flushing doc with attachments", []),
     Target2 = open_db(Target),
     Success = (flush_doc(Target2, Doc) =:= ok),
     close_db(Target2),
     {Result, Stats2} = case Success of
     true ->
-        {{ok, Acc}, ?inc_stat(#rep_stats.docs_written, Stats, 1)};
+        {{ok, Acc}, couch_replicator_stats:increment(docs_written, Stats)};
     false ->
-        {{skip, Acc}, ?inc_stat(#rep_stats.doc_write_failures, Stats, 1)}
+        {{skip, Acc}, couch_replicator_stats:increment(doc_write_failures, Stats)}
     end,
     ok = gen_server:call(Parent, {add_stats, Stats2}, infinity),
     Result;
@@ -376,7 +373,7 @@ after_full_flush(#state{stats = Stats, flush_waiter = Waiter} = State) ->
     gen_server:reply(Waiter, {ok, Stats}),
     erlang:put(last_stats_report, now()),
     State#state{
-        stats = #rep_stats{},
+        stats = couch_replicator_stats:new(),
         flush_waiter = nil,
         writer = nil,
         batch = #batch{}
@@ -389,8 +386,8 @@ maybe_flush_docs(Doc,State) ->
         stats = Stats, cp = Cp
     } = State,
     {Batch2, WStats} = maybe_flush_docs(Target, Batch, Doc),
-    Stats2 = couch_replicator_utils:sum_stats(Stats, WStats),
-    Stats3 = ?inc_stat(#rep_stats.docs_read, Stats2, 1),
+    Stats2 = couch_replicator_stats:sum_stats(Stats, WStats),
+    Stats3 = couch_replicator_stats:increment(docs_read, Stats2),
     Stats4 = maybe_report_stats(Cp, Stats3),
     State#state{stats = Stats4, batch = Batch2}.
 
@@ -402,9 +399,9 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
         couch_log:debug("Worker flushing doc with attachments", []),
         case flush_doc(Target, Doc) of
         ok ->
-            {Batch, #rep_stats{docs_written = 1}};
+            {Batch, couch_replicator_stats:new([{docs_written, 1}])};
         _ ->
-            {Batch, #rep_stats{doc_write_failures = 1}}
+            {Batch, couch_replicator_stats:new([{doc_write_failures, 1}])}
         end;
     true ->
         JsonDoc = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs, attachments])),
@@ -414,7 +411,8 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
             Stats = flush_docs(Target, [JsonDoc | DocAcc]),
             {#batch{}, Stats};
         SizeAcc2 ->
-            {#batch{docs = [JsonDoc | DocAcc], size = SizeAcc2}, #rep_stats{}}
+            Stats = couch_replicator_stats:new(),
+            {#batch{docs = [JsonDoc | DocAcc], size = SizeAcc2}, Stats}
         end
     end;
 
@@ -425,7 +423,8 @@ maybe_flush_docs(#db{} = Target, #batch{docs = DocAcc, size = SizeAcc}, Doc) ->
         Stats = flush_docs(Target, [Doc | DocAcc]),
         {#batch{}, Stats};
     SizeAcc2 ->
-        {#batch{docs = [Doc | DocAcc], size = SizeAcc2}, #rep_stats{}}
+        Stats = couch_replicator_stats:new(),
+        {#batch{docs = [Doc | DocAcc], size = SizeAcc2}, Stats}
     end.
 
 
@@ -440,7 +439,7 @@ batch_doc(#doc{atts = Atts}) ->
 
 
 flush_docs(_Target, []) ->
-    #rep_stats{};
+    couch_replicator_stats:new();
 
 flush_docs(Target, DocList) ->
     {ok, Errors} = couch_replicator_api_wrap:update_docs(
@@ -453,10 +452,10 @@ flush_docs(Target, DocList) ->
                 [get_value(id, Props, ""), get_value(rev, Props, ""), DbUri,
                     get_value(error, Props, ""), get_value(reason, Props, "")])
         end, Errors),
-    #rep_stats{
-        docs_written = length(DocList) - length(Errors),
-        doc_write_failures = length(Errors)
-    }.
+    couch_replicator_stats:new([
+        {docs_written, length(DocList) - length(Errors)},
+        {doc_write_failures, length(Errors)}
+    ]).
 
 flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->
     try couch_replicator_api_wrap:update_doc(Target, Doc, [], replicated_changes) of
@@ -495,10 +494,10 @@ find_missing(DocInfos, Target) ->
     MissingRevsCount = lists:foldl(
         fun({_Id, MissingRevs, _PAs}, Acc) -> Acc + length(MissingRevs) end,
         0, Missing),
-    Stats = #rep_stats{
-        missing_checked = AllRevsCount,
-        missing_found = MissingRevsCount
-    },
+    Stats = couch_replicator_stats:new([
+        {missing_checked, AllRevsCount},
+        {missing_found, MissingRevsCount}
+    ]),
     {Missing, Stats}.
 
 
@@ -508,7 +507,7 @@ maybe_report_stats(Cp, Stats) ->
     true ->
         ok = gen_server:call(Cp, {add_stats, Stats}, infinity),
         erlang:put(last_stats_report, Now),
-        #rep_stats{};
+        couch_replicator_stats:new();
     false ->
         Stats
     end.


[09/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Fix a non-tail-recursive call that leaks memory

By recursing in the protected body of a try/catch we weren't properly
using tail recursion when restarting the changes feed. This leads to
mailbox accumulation of messages which slowly eats all RAM on the node.

BugzId: 20601


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/44328a6c
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/44328a6c
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/44328a6c

Branch: refs/heads/master
Commit: 44328a6cca9686334acc2932b3a46ccc780f2322
Parents: 20b837e
Author: Paul J. Davis <pa...@gmail.com>
Authored: Mon Jul 8 14:49:41 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 13:48:38 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator.erl | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/44328a6c/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index 3ef5e9c..bb81cf0 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -665,7 +665,7 @@ read_changes(Parent, StartSeq, Db, ChangesQueue, Options, Ts) ->
                         ok = gen_server:call(Parent, Msg, infinity)
                     end,
                     put(last_seq, Seq),
-                    read_changes(Parent, Seq, Db, ChangesQueue, Options, Ts + 1);
+                    throw(recurse);
                 _ ->
                     % This clause is unreachable today, but let's plan ahead
                     % for the future where we checkpoint against last_seq
@@ -675,7 +675,11 @@ read_changes(Parent, StartSeq, Db, ChangesQueue, Options, Ts) ->
                 end
             end, Options),
         couch_work_queue:close(ChangesQueue)
-    catch exit:{http_request_failed, _, _, _} = Error ->
+    catch
+        throw:recurse ->
+            LS = get(last_seq),
+            read_changes(Parent, LS, Db, ChangesQueue, Options, Ts+1);
+        exit:{http_request_failed, _, _, _} = Error ->
         case get(retries_left) of
         N when N > 0 ->
             put(retries_left, N - 1),


[31/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Fix badmatch with variable name duplication

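get_pending_count/2 matched the result of couch_db:open/2 against the
already-bound Db argument, which caused a badmatch. Bind the reopened
database to a fresh CountDb variable instead.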


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/1b323802
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/1b323802
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/1b323802

Branch: refs/heads/master
Commit: 1b3238023d3edfe9283f7c140b71d83b9f746768
Parents: 9b2dd48
Author: Paul J. Davis <pa...@gmail.com>
Authored: Mon Aug 11 15:07:02 2014 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Mon Aug 11 15:07:02 2014 -0500

----------------------------------------------------------------------
 src/couch_replicator_api_wrap.erl | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/1b323802/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index fbbff19..ae4f16e 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -159,9 +159,9 @@ get_pending_count(#httpdb{} = Db, Seq) ->
         {ok, couch_util:get_value(<<"pending">>, Props, null)}
     end);
 get_pending_count(#db{name=DbName}=Db, Seq) when is_number(Seq) ->
-    {ok, Db} = couch_db:open(DbName, [{user_ctx, Db#db.user_ctx}]),
-    Pending = couch_db:count_changes_since(Db, Seq),
-    couch_db:close(Db),
+    {ok, CountDb} = couch_db:open(DbName, [{user_ctx, Db#db.user_ctx}]),
+    Pending = couch_db:count_changes_since(CountDb, Seq),
+    couch_db:close(CountDb),
     {ok, Pending}.
 
 


[27/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Retry if doc is missing

The catch-all clause in remote_doc_handler is the root cause of
missing updates in replication. This occurs when we receive a
{{not_found,missing}, Rev} message instead of the expected {ok, Doc}
message, and causes us to carry on blindly, including writing a
checkpoint.

This patch changes the catch-all clause to one that handles the
missing doc case and causes a retry. Any other response will be a
function_clause which will cause a messier retry.

BugzID: 31722


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/75e5ba17
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/75e5ba17
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/75e5ba17

Branch: refs/heads/master
Commit: 75e5ba177431483522f983ffab305213f0ccbfe6
Parents: 22e9ff4
Author: Robert Newson <rn...@apache.org>
Authored: Thu Jun 19 14:36:32 2014 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 11:57:31 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator_api_wrap.erl | 2 ++
 src/couch_replicator_worker.erl   | 9 +++++++--
 2 files changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/75e5ba17/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index c6fe691..fbbff19 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -262,6 +262,8 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
     receive
         {'DOWN', Ref, process, Pid, {exit_ok, Ret}} ->
             Ret;
+        {'DOWN', Ref, process, Pid, {{nocatch, missing_doc}, _}} ->
+            throw(missing_doc);
         {'DOWN', Ref, process, Pid, {{nocatch, {missing_stub,_} = Stub}, _}} ->
             throw(Stub);
         {'DOWN', Ref, process, Pid, request_uri_too_long} ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/75e5ba17/src/couch_replicator_worker.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_worker.erl b/src/couch_replicator_worker.erl
index bb31d5d..7a6cc96 100644
--- a/src/couch_replicator_worker.erl
+++ b/src/couch_replicator_worker.erl
@@ -295,6 +295,11 @@ fetch_doc(Source, {Id, Revs, PAs}, DocHandler, Acc) ->
         couch_replicator_api_wrap:open_doc_revs(
             Source, Id, Revs, [{atts_since, PAs}, latest], DocHandler, Acc)
     catch
+    throw:missing_doc ->
+        couch_log:error("Retrying fetch and update of document `~s` as it is "
+            "unexpectedly missing. Missing revisions are: ~s",
+            [Id, couch_doc:revs_to_strs(Revs)]),
+        couch_replicator_api_wrap:open_doc_revs(Source, Id, Revs, [latest], DocHandler, Acc);
     throw:{missing_stub, _} ->
         couch_log:error("Retrying fetch and update of document `~s` due to out of "
             "sync attachment stubs. Missing revisions are: ~s",
@@ -347,8 +352,8 @@ remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) ->
     end,
     ok = gen_server:call(Parent, {add_stats, Stats2}, infinity),
     Result;
-remote_doc_handler(_, Acc) ->
-    {ok, Acc}.
+remote_doc_handler({{not_found, missing}, _}, _Acc) ->
+    throw(missing_doc).
 
 
 spawn_writer(Target, #batch{docs = DocList, size = Size}) ->


[20/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Fall back to using source seq for Apache CouchDB

If the checkpointed sequence is a number, we assume that the source server
is Apache CouchDB, so we fall back to subtracting it from the update_seq
value in the database info blob.

BugzId: 26015


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/777da3d1
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/777da3d1
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/777da3d1

Branch: refs/heads/master
Commit: 777da3d1ed2d9ded240016273f3669c0660eaae9
Parents: a18aa43
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Dec 11 11:58:04 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 15:18:00 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator_api_wrap.erl | 25 +++++++++++++++++--------
 1 file changed, 17 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/777da3d1/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index 3b2e1de..fd8edc4 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -125,15 +125,24 @@ get_db_info(#db{name = DbName, user_ctx = UserCtx}) ->
     {ok, [{couch_util:to_binary(K), V} || {K, V} <- Info]}.
 
 
+get_pending_count(#httpdb{} = Db, Seq) when is_number(Seq) ->
+    % Source looks like Apache CouchDB and not Cloudant so we fall
+    % back to using update sequence differences.
+    send_req(Db, [], fun(200, _, {Props}) ->
+        case get_value(<<"update_seq">>, Props) of
+            UpdateSeq when is_number(UpdateSeq) ->
+                {ok, UpdateSeq - Seq};
+            _ ->
+                {ok, null}
+        end
+    end);
 get_pending_count(#httpdb{} = Db, Seq) ->
-    send_req(
-        Db,
-        [{path, "_changes"}, {qs, [{"since", Seq}, {"limit", "0"}]}],
-        fun(200, _, {Props}) ->
-            {ok, couch_util:get_value(<<"pending">>, Props, null)}
-        end);
-get_pending_count(#db{name=DbName, user_ctx = UserCtx}, Seq) ->
-    {ok, Db} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
+    Options = [{path, "_changes"}, {qs, [{"since", Seq}, {"limit", "0"}]}],
+    send_req(Db, Options, fun(200, _, {Props}) ->
+        {ok, couch_util:get_value(<<"pending">>, Props, null)}
+    end);
+get_pending_count(#db{name=DbName}=Db, Seq) when is_number(Seq) ->
+    {ok, Db} = couch_db:open(DbName, [{user_ctx, Db#db.user_ctx}]),
     Pending = couch_db:count_changes_since(Db, Seq),
     couch_db:close(Db),
     {ok, Pending}.


[34/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Fix the owner check for local replicator dbs


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/9640ccf2
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/9640ccf2
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/9640ccf2

Branch: refs/heads/master
Commit: 9640ccf2e9182bcc7119c4cbde01714fad71b24a
Parents: c6be20d
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Aug 12 12:40:14 2014 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Tue Aug 12 12:40:14 2014 -0500

----------------------------------------------------------------------
 src/couch_replicator_manager.erl | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/9640ccf2/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index 099b215..411e9e1 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -405,11 +405,13 @@ process_update(State, DbName, {Change}) ->
         end
     end.
 
-owner(DbName, DocId) ->
+owner(<<"shards/", _/binary>> = DbName, DocId) ->
     Live = [node()|nodes()],
     Nodes = lists:sort([N || #shard{node=N} <- mem3:shards(DbName, DocId),
 			     lists:member(N, Live)]),
-    node() =:= hd(mem3_util:rotate_list({DbName, DocId}, Nodes)).
+    node() =:= hd(mem3_util:rotate_list({DbName, DocId}, Nodes));
+owner(_DbName, _DocId) ->
+    true.
 
 rep_db_update_error(Error, DbName, DocId) ->
     case Error of


[05/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Cancel replication jobs on every node, not just owner node

It is safe to attempt to cancel a job owned by another node, just in
case ownership changed at the same time as the document was deleted.

BugzID: 19518


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/dd96e219
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/dd96e219
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/dd96e219

Branch: refs/heads/master
Commit: dd96e21930032fe6a2f6707ba3031268cc4ee187
Parents: ed447f8
Author: Robert Newson <ro...@cloudant.com>
Authored: Tue May 7 17:05:08 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 13:22:05 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator_manager.erl | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/dd96e219/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index 6afddd8..71e6be0 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -346,12 +346,11 @@ process_update(State, DbName, {Change}) ->
     {RepProps} = JsonRepDoc = get_json_value(doc, Change),
     DocId = get_json_value(<<"_id">>, RepProps),
     case {is_owner(DbName, DocId), get_json_value(deleted, Change, false)} of
-    {false, _} ->
-        replication_complete(DbName, DocId),
-        State;
-    {true, true} ->
+    {_, true} ->
         rep_doc_deleted(DbName, DocId),
         State;
+    {false, false} ->
+        State;
     {true, false} ->
         case get_json_value(<<"_replication_state">>, RepProps) of
         undefined ->


[24/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Fix couch_replicator_manager changes feed upgrades

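The changes feed readers were unnecessarily holding onto anonymous
functions. This makes it so they don't: the reader is now spawned via an
exported function, and its callback is referenced as an external
`fun ?MODULE:changes_reader_cb/2`.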

BugzId: 27666


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/0ae9e60e
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/0ae9e60e
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/0ae9e60e

Branch: refs/heads/master
Commit: 0ae9e60eb71ac7336472902379f5e446c34674ac
Parents: e87a923
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Jan 30 19:57:48 2014 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 16:40:22 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator_manager.erl | 71 +++++++++++++++++++----------------
 1 file changed, 39 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/0ae9e60e/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index b0c1c9b..21e759f 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -24,6 +24,9 @@
 -export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
 -export([code_change/3, terminate/2]).
 
+% changes callbacks
+-export([changes_reader/3, changes_reader_cb/2]).
+
 % config_listener callback
 -export([handle_config_change/5]).
 
@@ -135,7 +138,7 @@ init(_) ->
     % Automatically start node local changes feed loop
     LocalRepDb = ?l2b(config:get("replicator", "db", "_replicator")),
     ensure_rep_db_exists(LocalRepDb),
-    Pid = changes_feed_loop(LocalRepDb, 0),
+    Pid = start_changes_reader(LocalRepDb, 0),
     {ok, #state{
         event_listener = start_event_listener(),
         scan_pid = ScanPid,
@@ -198,7 +201,7 @@ handle_cast({resume_scan, DbName}, State) ->
         [{DbName, EndSeq}] -> EndSeq
     end,
     ensure_rep_ddoc_exists(DbName),
-    Pid = changes_feed_loop(DbName, Since),
+    Pid = start_changes_reader(DbName, Since),
     couch_log:debug("Scanning ~s from update_seq ~p", [DbName, Since]),
     {noreply, State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}};
 
@@ -273,37 +276,41 @@ terminate(_Reason, State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-changes_feed_loop(DbName, Since) ->
-    Server = self(),
-    spawn_link(fun() ->
-        UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
-        DbOpenOptions = [{user_ctx, UserCtx}, sys_db],
-        {ok, Db} = couch_db:open_int(DbName, DbOpenOptions),
-        ChangesFeedFun = couch_changes:handle_changes(
-            #changes_args{
-                include_docs = true,
-                since = Since,
-                feed = "continuous",
-                timeout = infinity
-            },
-            {json_req, null},
-            Db
-        ),
-        EnumFun = fun
-        ({change, Change, _}, _) ->
-            case has_valid_rep_id(Change) of
-                true ->
-                    Msg = {rep_db_update, DbName, Change},
-                    ok = gen_server:call(Server, Msg, infinity);
-                false ->
-                    ok
-            end;
-        (_, _) ->
-            ok
-        end,
-        ChangesFeedFun(EnumFun)
-    end).
 
+start_changes_reader(DbName, Since) ->
+    spawn_link(?MODULE, changes_reader, [self(), DbName, Since]).
+
+changes_reader(Server, DbName, Since) ->
+    UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
+    DbOpenOptions = [{user_ctx, UserCtx}, sys_db],
+    {ok, Db} = couch_db:open_int(DbName, DbOpenOptions),
+    ChangesFeedFun = couch_changes:handle_changes(
+        #changes_args{
+            include_docs = true,
+            since = Since,
+            feed = "continuous",
+            timeout = infinity
+        },
+        {json_req, null},
+        Db
+    ),
+    ChangesFeedFun({fun ?MODULE:changes_reader_cb/2, {Server, DbName}}).
+
+changes_reader_cb({change, Change}, {Server, DbName}) ->
+    case has_valid_rep_id(Change) of
+        true ->
+            Msg = {rep_db_update, DbName, Change},
+            ok = gen_server:call(Server, Msg, infinity);
+        false ->
+            ok
+    end,
+    {ok, {Server, DbName}};
+changes_reader_cb({stop, EndSeq, _Pending}, {Server, DbName}) ->
+    Msg = {rep_db_checkpoint, DbName, EndSeq},
+    ok = gen_server:call(Server, Msg, infinity),
+    {ok, {Server, DbName}};
+changes_reader_cb(_, Acc) ->
+    {ok, Acc}.
 
 has_valid_rep_id({Change}) ->
     has_valid_rep_id(get_json_value(<<"id">>, Change));


[21/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Change term formatting for error logs

HTTP errors were nearly unreadable using the `~p` format after a long
error message. This just uses `~w` so the term is printed on a single
line.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/b80c7c04
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/b80c7c04
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/b80c7c04

Branch: refs/heads/master
Commit: b80c7c043e6e2b43b9eedcef6a7845208261f1d1
Parents: 777da3d
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Dec 11 10:51:43 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 15:21:03 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator_api_wrap.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/b80c7c04/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index fd8edc4..16f7893 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -264,7 +264,7 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
             ),
             #httpdb{retries = Retries, wait = Wait0} = HttpDb,
             Wait = 2 * erlang:min(Wait0 * 2, ?MAX_WAIT),
-            ?LOG_INFO("Retrying GET to ~s in ~p seconds due to error ~p",
+            couch_log:notice("Retrying GET to ~s in ~p seconds due to error ~w",
                 [Url, Wait / 1000, error_reason(Else)]
             ),
             ok = timer:sleep(Wait),


[18/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Remove progress field from replication tasks

The progress field doesn't make much sense in the context of a
replication. The new changes_pending field should be the correct gauge
of how far behind a replication is.

BugzId: 26015


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/353f51f5
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/353f51f5
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/353f51f5

Branch: refs/heads/master
Commit: 353f51f5bc48948501b42e99e712b5a48f9f8fcf
Parents: 06cf699
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Dec 11 10:10:19 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 15:17:45 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator.erl | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/353f51f5/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index 82c1360..006770c 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -287,7 +287,6 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
         {doc_write_failures, 0},
         {source_seq, HighestSeq},
         {checkpointed_source_seq, CommittedSeq},
-        {progress, 0},
         {checkpoint_interval, CheckpointInterval}
     ]),
     couch_task_status:set_update_frequency(1000),
@@ -917,8 +916,7 @@ update_task(State) ->
     } = State,
     couch_task_status:update(
         rep_stats(State) ++ [
-        {source_seq, HighestSeq},
-        {progress, 0}
+        {source_seq, HighestSeq}
     ]).
 
 


[15/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Fix messages leaking from streaming HTTP responses

When we cancel a streaming HTTP response (i.e., Transfer-Encoding:
chunked), it's possible to leave ibrowse messages in the mailbox. For
processes like the changes reader that continually retry HTTP
requests, these messages can build up and cause the process to slow to a
crawl, which in turn causes replications to seemingly get stuck.

This patch changes the HTTP request functions to be wholly contained in
a try/catch/after expression. Using the after clause, we can be sure that
all ibrowse-related messages are removed before returning to the
calling function. We also release the worker from the same after clause
and tell release_worker whether it is a changes feed worker. This is
important because these workers are not managed by a pool and must be
stopped directly.

BugzId: 25341


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/1746e93d
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/1746e93d
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/1746e93d

Branch: refs/heads/master
Commit: 1746e93de32ea273dc3cc211a41e0cf94e761dcc
Parents: 756f500
Author: Benjamin Anderson <b...@banjiewen.net>
Authored: Wed Nov 13 18:26:50 2013 -0800
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 15:05:23 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator_httpc.erl | 98 +++++++++++++++++++++++--------------
 1 file changed, 62 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/1746e93d/src/couch_replicator_httpc.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index 6aeff93..0242885 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -39,8 +39,25 @@ send_req(HttpDb, Params1, Callback) ->
         [{K, ?b2l(iolist_to_binary(V))} || {K, V} <- get_value(qs, Params1, [])]),
     Params = ?replace(Params2, ibrowse_options,
         lists:keysort(1, get_value(ibrowse_options, Params2, []))),
-    {Worker, Response} = send_ibrowse_req(HttpDb, Params),
-    process_response(Response, Worker, HttpDb, Params, Callback).
+    {Worker, Response, IsChanges} = send_ibrowse_req(HttpDb, Params),
+    Ret = try
+        process_response(Response, Worker, HttpDb, Params, Callback)
+    catch
+        throw:{retry, NewHttpDb0, NewParams0} ->
+            {retry, NewHttpDb0, NewParams0}
+    after
+        release_worker(Worker, HttpDb, IsChanges),
+        clean_mailbox(Response)
+    end,
+    % This is necessary to keep this tail-recursive. Calling
+    % send_req in the catch clause would turn it into a body
+    % recursive call accidentally.
+    case Ret of
+        {retry, #httpdb{}=NewHttpDb, NewParams} ->
+            send_req(NewHttpDb, NewParams, Callback);
+        _ ->
+            Ret
+    end.
 
 
 send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
@@ -50,10 +67,10 @@ send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
     Headers2 = oauth_header(HttpDb, Params) ++ Headers1,
     Url = full_url(HttpDb, Params),
     Body = get_value(body, Params, []),
-    case get_value(path, Params) of
-    "_changes" ->
+    IsChanges = get_value(path, Params) == "_changes",
+    if IsChanges ->
         {ok, Worker} = ibrowse:spawn_link_worker_process(Url);
-    _ ->
+    true ->
         {ok, Worker} = couch_replicator_httpc_pool:get_worker(HttpDb#httpdb.httpc_pool)
     end,
     IbrowseOptions = [
@@ -67,22 +84,21 @@ send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
     end,
     Response = ibrowse:send_req_direct(
         Worker, Url, Headers2, Method, Body, IbrowseOptions, Timeout),
-    {Worker, Response}.
+    {Worker, Response, IsChanges}.
 
 
-process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, Callback) ->
-    send_req(HttpDb, Params, Callback);
+process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, _Cb) ->
+    throw({retry, HttpDb, Params});
 
-process_response({error, {'EXIT', {normal, _}}}, _Worker, HttpDb, Params, Cb) ->
+process_response({error, {'EXIT',{normal,_}}}, _Worker, HttpDb, Params, _Cb) ->
     % ibrowse worker terminated because remote peer closed the socket
     % -> not an error
-    send_req(HttpDb, Params, Cb);
+    throw({retry, HttpDb, Params});
 
 process_response({ibrowse_req_id, ReqId}, Worker, HttpDb, Params, Callback) ->
     process_stream_response(ReqId, Worker, HttpDb, Params, Callback);
 
 process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
-    release_worker(Worker, HttpDb),
     case list_to_integer(Code) of
     Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) ->
         EJson = case Body of
@@ -95,11 +111,11 @@ process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
     R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
         do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
     Error ->
-        maybe_retry({code, Error}, Worker, HttpDb, Params, Callback)
+        maybe_retry({code, Error}, Worker, HttpDb, Params)
     end;
 
-process_response(Error, Worker, HttpDb, Params, Callback) ->
-    maybe_retry(Error, Worker, HttpDb, Params, Callback).
+process_response(Error, Worker, HttpDb, Params, _Callback) ->
+    maybe_retry(Error, Worker, HttpDb, Params).
 
 
 process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
@@ -113,12 +129,10 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
             ibrowse:stream_next(ReqId),
             try
                 Ret = Callback(Ok, Headers, StreamDataFun),
-                release_worker(Worker, HttpDb),
-                clean_mailbox_req(ReqId),
                 Ret
-            catch throw:{maybe_retry_req, Err} ->
-                clean_mailbox_req(ReqId),
-                maybe_retry(Err, Worker, HttpDb, Params, Callback)
+            catch
+                throw:{maybe_retry_req, Err} ->
+                    maybe_retry(Err, Worker, HttpDb, Params)
             end;
         R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
             do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
@@ -126,50 +140,63 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
             report_error(Worker, HttpDb, Params, {code, Error})
         end;
     {ibrowse_async_response, ReqId, {error, _} = Error} ->
-        maybe_retry(Error, Worker, HttpDb, Params, Callback)
+        maybe_retry(Error, Worker, HttpDb, Params)
     after HttpDb#httpdb.timeout + 500 ->
         % Note: ibrowse should always reply with timeouts, but this doesn't
         % seem to be always true when there's a very high rate of requests
         % and many open connections.
-        maybe_retry(timeout, Worker, HttpDb, Params, Callback)
+        maybe_retry(timeout, Worker, HttpDb, Params)
     end.
 
 
-clean_mailbox_req(ReqId) ->
+% Only streaming HTTP requests send messages back from
+% the ibrowse worker process. We can detect that based
+% on the ibrowse_req_id format. This just drops all
+% messages for the given ReqId on the floor since we're
+% no longer in the HTTP request.
+clean_mailbox({ibrowse_req_id, ReqId}) ->
     receive
     {ibrowse_async_response, ReqId, _} ->
-        clean_mailbox_req(ReqId);
+        clean_mailbox({ibrowse_req_id, ReqId});
     {ibrowse_async_response_end, ReqId} ->
-        clean_mailbox_req(ReqId)
+        clean_mailbox({ibrowse_req_id, ReqId})
     after 0 ->
         ok
-    end.
+    end;
+clean_mailbox(_) ->
+    ok.
 
 
-release_worker(Worker, #httpdb{httpc_pool = Pool}) ->
+release_worker(Worker, _, true) ->
+    true = unlink(Worker),
+    ibrowse_http_client:stop(Worker),
+    receive
+        {'EXIT', Worker, _} -> ok
+        after 0 -> ok
+    end;
+release_worker(Worker, #httpdb{httpc_pool = Pool}, false) ->
     ok = couch_replicator_httpc_pool:release_worker(Pool, Worker).
 
 
-maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params, _Cb) ->
+maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params) ->
     report_error(Worker, HttpDb, Params, {error, Error});
 
-maybe_retry(Error, Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
-    Params, Cb) ->
-    release_worker(Worker, HttpDb),
+maybe_retry(Error, _Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
+    Params) ->
     Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
     Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
     couch_log:notice("Retrying ~s request to ~s in ~p seconds due to error ~s",
         [Method, Url, Wait / 1000, error_cause(Error)]),
     ok = timer:sleep(Wait),
     Wait2 = erlang:min(Wait * 2, ?MAX_WAIT),
-    send_req(HttpDb#httpdb{retries = Retries - 1, wait = Wait2}, Params, Cb).
+    NewHttpDb = HttpDb#httpdb{retries = Retries - 1, wait = Wait2},
+    throw({retry, NewHttpDb, Params}).
 
 
-report_error(Worker, HttpDb, Params, Error) ->
+report_error(_Worker, HttpDb, Params, Error) ->
     Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
     Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
     do_report_error(Url, Method, Error),
-    release_worker(Worker, HttpDb),
     exit({http_request_failed, Method, Url, Error}).
 
 
@@ -255,11 +282,10 @@ oauth_header(#httpdb{url = BaseUrl, oauth = OAuth}, ConnParams) ->
         "OAuth " ++ oauth:header_params_encode(OAuthParams)}].
 
 
-do_redirect(Worker, Code, Headers, #httpdb{url = Url} = HttpDb, Params, Cb) ->
-    release_worker(Worker, HttpDb),
+do_redirect(_Worker, Code, Headers, #httpdb{url = Url} = HttpDb, Params, _Cb) ->
     RedirectUrl = redirect_url(Headers, Url),
     {HttpDb2, Params2} = after_redirect(RedirectUrl, Code, HttpDb, Params),
-    send_req(HttpDb2, Params2, Cb).
+    throw({retry, HttpDb2, Params2}).
 
 
 redirect_url(RespHeaders, OrigUrl) ->


[36/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Adjust the restart window

The test suite was causing the couch_replicator application to exit due
to couch_replicator_manager restarting frequently as the tests changed
the replicator_db. This just increases our tolerance to the frequency of
those restarts: the supervisor now allows 10 restarts per second instead
of 10 restarts per 3600 seconds.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/a85e9b89
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/a85e9b89
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/a85e9b89

Branch: refs/heads/master
Commit: a85e9b89b6482712565188b55e968bb573dd8dc7
Parents: 7ff3a52
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Aug 12 18:42:10 2014 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Tue Aug 12 18:42:10 2014 -0500

----------------------------------------------------------------------
 src/couch_replicator_sup.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/a85e9b89/src/couch_replicator_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_sup.erl b/src/couch_replicator_sup.erl
index bfbc207..57ad63b 100644
--- a/src/couch_replicator_sup.erl
+++ b/src/couch_replicator_sup.erl
@@ -39,5 +39,5 @@ init(_Args) ->
             supervisor,
             [couch_replicator_job_sup]}
     ],
-    {ok, {{one_for_one,10,3600}, Children}}.
+    {ok, {{one_for_one,10,1}, Children}}.
 


[04/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Increase dt between heartbeat and inactive timeouts

The current theory is that on occasion the inactivity timeout in the
ibrowse client fires before the replicator receives a heartbeat line.
This patch replaces the 5 second delta with the "div 3" logic that
results in a 20 second delta by default.  This is the logic that we used
back when the replicator used heartbeats instead of timeouts to control
the frequency of heartbeat lines.

As an aside, the timeout value we're adjusting here actually serves a
dual purpose.  When a _changes response is streaming this parameter
controls the frequency of heartbeat newlines, useful e.g. for highly
selective filtered feeds.  In between streams of _changes it controls
how long the _changes coordinator will wait for updates to the DB before
terminating the response with a last_seq entry.

BugzID: 17709


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/ed447f8c
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/ed447f8c
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/ed447f8c

Branch: refs/heads/master
Commit: ed447f8c01880c7f99f5829a8ef485fd8d399376
Parents: 7898313
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Fri Apr 26 10:54:41 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 13:18:32 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator_api_wrap.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/ed447f8c/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index bf37400..fa64377 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -390,7 +390,7 @@ update_docs(Db, DocList, Options, UpdateType) ->
 
 changes_since(#httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb,
               Style, StartSeq, UserFun, Options) ->
-    Timeout = erlang:max(1000, InactiveTimeout - 5000),
+    Timeout = erlang:max(1000, InactiveTimeout div 3),
     BaseQArgs = case get_value(continuous, Options, false) of
     false ->
         [{"feed", "normal"}];


[23/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Clean up couch_replicator_httpc_pool pid on error

db_open cleans up the couch_replicator_httpc_pool process
if an error occurs accessing the remote Db.

BugzID: 27202


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/e87a9230
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/e87a9230
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/e87a9230

Branch: refs/heads/master
Commit: e87a9230ca24f79bcebf958978ac54ba6b9023eb
Parents: 603e4a8
Author: Mike Wallace <mi...@googlemail.com>
Authored: Mon Jan 27 14:31:22 2014 +0000
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 15:26:37 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator_api_wrap.erl | 45 +++++++++++++++++++++++-----------
 1 file changed, 31 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/e87a9230/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index 16f7893..c6fe691 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -69,20 +69,37 @@ db_open(Db, Options) ->
 
 db_open(#httpdb{} = Db1, _Options, Create) ->
     {ok, Db} = couch_replicator_httpc:setup(Db1),
-    case Create of
-    false ->
-        ok;
-    true ->
-        send_req(Db, [{method, put}], fun(_, _, _) -> ok end)
-    end,
-    send_req(Db, [{method, head}],
-        fun(200, _, _) ->
-            {ok, Db};
-        (401, _, _) ->
-            throw({unauthorized, ?l2b(db_uri(Db))});
-        (_, _, _) ->
-            throw({db_not_found, ?l2b(db_uri(Db))})
-        end);
+    try
+        case Create of
+        false ->
+            ok;
+        true ->
+            send_req(Db, [{method, put}],
+                fun(401, _, _) ->
+                    throw({unauthorized, ?l2b(db_uri(Db))});
+                (_, _, _) ->
+                    ok
+                end)
+        end,
+        send_req(Db, [{method, head}],
+            fun(200, _, _) ->
+                {ok, Db};
+            (401, _, _) ->
+                throw({unauthorized, ?l2b(db_uri(Db))});
+            (_, _, _) ->
+                throw({db_not_found, ?l2b(db_uri(Db))})
+            end)
+    catch
+        throw:Error ->
+            db_close(Db),
+            throw(Error);
+        error:Error ->
+            db_close(Db),
+            erlang:error(Error);
+        exit:Error ->
+            db_close(Db),
+            erlang:exit(Error)
+    end;
 db_open(DbName, Options, Create) ->
     try
         case Create of


[14/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Report a correct value for changes_pending

This uses the work done in BugzId: 24236, which provides a new pending
field in _changes feeds. The source _changes feed is polled with a
`?since=seq&limit=0` query string to get the remaining changes to be
processed for a given sequence.

BugzId: 26015


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/756f500b
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/756f500b
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/756f500b

Branch: refs/heads/master
Commit: 756f500b3194ddf7b4ccbb9421024cec055f9ee3
Parents: 0c095c0
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Dec 10 15:43:38 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 15:04:05 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator.erl          | 58 +++++++++++++---------------------
 src/couch_replicator_api_wrap.erl | 15 +++++++++
 2 files changed, 37 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/756f500b/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index ef44c54..8041283 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -240,8 +240,8 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
         source_name = SourceName,
         target_name = TargetName,
         start_seq = {_Ts, StartSeq},
-        source_seq = SourceCurSeq,
         committed_seq = {_, CommittedSeq},
+        highest_seq_done = {_, HighestSeq},
         checkpoint_interval = CheckpointInterval
     } = State = init_state(Rep),
 
@@ -283,9 +283,9 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
         {missing_revisions_found, 0},
         {docs_read, 0},
         {docs_written, 0},
-        {changes_pending, pending(SourceCurSeq, CommittedSeq)},
+        {changes_pending, get_pending_count(State)},
         {doc_write_failures, 0},
-        {source_seq, SourceCurSeq},
+        {source_seq, HighestSeq},
         {checkpointed_source_seq, CommittedSeq},
         {progress, 0},
         {checkpoint_interval, CheckpointInterval}
@@ -431,13 +431,11 @@ handle_call({report_seq_done, Seq, StatsInc}, From,
         "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
         [Seq, ThroughSeq, NewThroughSeq, HighestDone,
             NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
-    SourceCurSeq = source_cur_seq(State),
     NewState = State#rep_state{
         stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
         current_through_seq = NewThroughSeq,
         seqs_in_progress = NewSeqsInProgress,
-        highest_seq_done = NewHighestDone,
-        source_seq = SourceCurSeq
+        highest_seq_done = NewHighestDone
     },
     update_task(NewState),
     {noreply, NewState}.
@@ -589,7 +587,6 @@ init_state(Rep) ->
             start_db_compaction_notifier(Target, self()),
         source_monitor = db_monitor(Source),
         target_monitor = db_monitor(Target),
-        source_seq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),
         use_checkpoints = get_value(use_checkpoints, Options, true),
         checkpoint_interval = get_value(checkpoint_interval, Options, 5000)
     },
@@ -650,10 +647,8 @@ do_checkpoint(#rep_state{use_checkpoints=false} = State) ->
     NewState = State#rep_state{checkpoint_history = {[{<<"use_checkpoints">>, false}]} },
     {ok, NewState};
 do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) ->
-    SourceCurSeq = source_cur_seq(State),
-    NewState = State#rep_state{source_seq = SourceCurSeq},
-    update_task(NewState),
-    {ok, NewState};
+    update_task(State),
+    {ok, State};
 do_checkpoint(State) ->
     #rep_state{
         source_name=SourceName,
@@ -727,9 +722,7 @@ do_checkpoint(State) ->
                 Source, SourceLog#doc{body = NewRepHistory}, source),
             {TgtRevPos, TgtRevId} = update_checkpoint(
                 Target, TargetLog#doc{body = NewRepHistory}, target),
-            SourceCurSeq = source_cur_seq(State),
             NewState = State#rep_state{
-                source_seq = SourceCurSeq,
                 checkpoint_history = NewRepHistory,
                 committed_seq = NewTsSeq,
                 source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
@@ -882,54 +875,47 @@ db_monitor(_HttpDb) ->
     nil.
 
 
-source_cur_seq(#rep_state{source = #httpdb{} = Db, source_seq = Seq}) ->
-    case (catch couch_replicator_api_wrap:get_db_info(Db#httpdb{retries = 3})) of
-    {ok, Info} ->
-        get_value(<<"update_seq">>, Info, Seq);
+get_pending_count(#rep_state{source = #httpdb{} = Db0}=St) ->
+    {_, Seq} = St#rep_state.highest_seq_done,
+    Db = Db0#httpdb{retries = 3},
+    case (catch couch_replicator_api_wrap:get_pending_count(Db, Seq)) of
+    {ok, Pending} ->
+        Pending;
     _ ->
-        Seq
+        null
     end;
-source_cur_seq(#rep_state{source = Db, source_seq = Seq}) ->
-    {ok, Info} = couch_replicator_api_wrap:get_db_info(Db),
-    get_value(<<"update_seq">>, Info, Seq).
+get_pending_count(#rep_state{source = Db}=St) ->
+    {_, Seq} = St#rep_state.highest_seq_done,
+    {ok, Pending} = couch_replicator_api_wrap:get_pending_count(Db, Seq),
+    Pending.
 
 
 update_task(State) ->
     #rep_state{
-        current_through_seq = {_, CurSeq},
-        source_seq = SourceCurSeq
+        highest_seq_done = {_, HighestSeq}
     } = State,
     couch_task_status:update(
         rep_stats(State) ++ [
-        {source_seq, SourceCurSeq},
-        case {unpack_seq(CurSeq), unpack_seq(SourceCurSeq)} of
-            {_, 0} ->
-                {progress, 0};
-            {CurSeq1, SourceCurSeq1} ->
-                {progress, (CurSeq1 * 100) div SourceCurSeq1}
-        end
+        {source_seq, HighestSeq},
+        {progress, 0}
     ]).
 
 
 rep_stats(State) ->
     #rep_state{
         committed_seq = {_, CommittedSeq},
-        stats = Stats,
-        source_seq = SourceCurSeq
+        stats = Stats
     } = State,
     [
         {revisions_checked, couch_replicator_stats:missing_checked(Stats)},
         {missing_revisions_found, couch_replicator_stats:missing_found(Stats)},
         {docs_read, couch_replicator_stats:docs_read(Stats)},
         {docs_written, couch_replicator_stats:docs_written(Stats)},
-        {changes_pending, pending(SourceCurSeq, CommittedSeq)},
+        {changes_pending, get_pending_count(State)},
         {doc_write_failures, couch_replicator_stats:doc_write_failures(Stats)},
         {checkpointed_source_seq, CommittedSeq}
     ].
 
-pending(SourceCurSeq, CommittedSeq) ->
-    unpack_seq(SourceCurSeq) - unpack_seq(CommittedSeq).
-
 unpack_seq(Seq) when is_number(Seq) ->
     Seq;
 unpack_seq([SeqNum, _]) ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/756f500b/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index fa64377..3b2e1de 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -26,6 +26,7 @@
     db_open/3,
     db_close/1,
     get_db_info/1,
+    get_pending_count/2,
     update_doc/3,
     update_doc/4,
     update_docs/3,
@@ -124,6 +125,20 @@ get_db_info(#db{name = DbName, user_ctx = UserCtx}) ->
     {ok, [{couch_util:to_binary(K), V} || {K, V} <- Info]}.
 
 
+get_pending_count(#httpdb{} = Db, Seq) ->
+    send_req(
+        Db,
+        [{path, "_changes"}, {qs, [{"since", Seq}, {"limit", "0"}]}],
+        fun(200, _, {Props}) ->
+            {ok, couch_util:get_value(<<"pending">>, Props, null)}
+        end);
+get_pending_count(#db{name=DbName, user_ctx = UserCtx}, Seq) ->
+    {ok, Db} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
+    Pending = couch_db:count_changes_since(Db, Seq),
+    couch_db:close(Db),
+    {ok, Pending}.
+
+
 ensure_full_commit(#httpdb{} = Db) ->
     send_req(
         Db,


[16/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Only set an ibrowse request timeout for non-_changes requests

BugzID: 25246


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/f10dec72
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/f10dec72
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/f10dec72

Branch: refs/heads/master
Commit: f10dec729805cf63e0c835198fca2253e198c870
Parents: 1746e93
Author: Benjamin Anderson <b...@banjiewen.net>
Authored: Wed Dec 11 11:25:32 2013 -0800
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 15:17:28 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator_httpc.erl | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f10dec72/src/couch_replicator_httpc.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index 0242885..ecd6a3e 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -69,19 +69,20 @@ send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
     Body = get_value(body, Params, []),
     IsChanges = get_value(path, Params) == "_changes",
     if IsChanges ->
-        {ok, Worker} = ibrowse:spawn_link_worker_process(Url);
+        {ok, Worker} = ibrowse:spawn_link_worker_process(Url),
+        Timeout = infinity;
     true ->
-        {ok, Worker} = couch_replicator_httpc_pool:get_worker(HttpDb#httpdb.httpc_pool)
+        {ok, Worker} = couch_replicator_httpc_pool:get_worker(HttpDb#httpdb.httpc_pool),
+        Timeout = case config:get("replicator", "request_timeout", "infinity") of
+            "infinity" -> infinity;
+            Milliseconds -> list_to_integer(Milliseconds)
+        end
     end,
     IbrowseOptions = [
         {response_format, binary}, {inactivity_timeout, HttpDb#httpdb.timeout} |
         lists:ukeymerge(1, get_value(ibrowse_options, Params, []),
             HttpDb#httpdb.ibrowse_options)
     ],
-    Timeout = case config:get("replicator", "request_timeout", "infinity") of
-        "infinity" -> infinity;
-        Milliseconds -> list_to_integer(Milliseconds)
-    end,
     Response = ibrowse:send_req_direct(
         Worker, Url, Headers2, Method, Body, IbrowseOptions, Timeout),
     {Worker, Response, IsChanges}.


[07/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Fix replication checkpoint frequency

The patch to track and checkpoint last_seq values seen in the _changes
feed resulted in high-frequency updates to the checkpoint document. This
is because the "needs a checkpoint" comparison included the replicator's
incrementing counter, which the changes reader bumped each time it saw
last_seq, even when it was the same last_seq as the previous time.

This change compares the new last_seq value to the old last_seq value
and, if they are equal, skips notifying the replication manager so that
no superfluous checkpoint is made.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/6acc906b
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/6acc906b
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/6acc906b

Branch: refs/heads/master
Commit: 6acc906b41130319961c210ac76bdd3835eff6ed
Parents: d6079a6
Author: Paul J. Davis <pa...@gmail.com>
Authored: Mon Jun 17 20:02:58 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 13:48:06 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator.erl | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/6acc906b/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index 895bbcc..3ef5e9c 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -659,7 +659,11 @@ read_changes(Parent, StartSeq, Db, ChangesQueue, Options, Ts) ->
                     % LS should never be undefined, but it doesn't hurt to be
                     % defensive inside the replicator.
                     Seq = case LS of undefined -> get(last_seq); _ -> LS end,
-                    ok = gen_server:call(Parent, {report_seq_done, {Ts, Seq}, #rep_stats{}}, infinity),
+                    OldSeq = get(last_seq),
+                    if Seq == OldSeq -> ok; true ->
+                        Msg = {report_seq_done, {Ts, Seq}, #rep_stats{}},
+                        ok = gen_server:call(Parent, Msg, infinity)
+                    end,
                     put(last_seq, Seq),
                     read_changes(Parent, Seq, Db, ChangesQueue, Options, Ts + 1);
                 _ ->


[02/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Send message asynchronously from event manager

The replicator_manager was blocking for too long responding to
resume_scan events and causing general breakage.

BugzID: 17907


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/8d03f0c5
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/8d03f0c5
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/8d03f0c5

Branch: refs/heads/master
Commit: 8d03f0c59b4302ebd7765c3d8e899bf3ef150dc5
Parents: d809436
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Thu Mar 7 16:48:29 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 12:38:39 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator_manager.erl | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/8d03f0c5/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index 949ceef..6afddd8 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -177,16 +177,6 @@ handle_call({rep_complete, RepId}, _From, State) ->
 handle_call({rep_error, RepId, Error}, _From, State) ->
     {reply, ok, replication_error(State, RepId, Error)};
 
-handle_call({resume_scan, DbName}, _From, State) ->
-    Since = case ets:lookup(?DB_TO_SEQ, DbName) of
-        [] -> 0;
-        [{DbName, EndSeq}] -> EndSeq
-    end,
-    ensure_rep_ddoc_exists(DbName),
-    Pid = changes_feed_loop(DbName, Since),
-    couch_log:debug("Scanning ~s from update_seq ~p", [DbName, Since]),
-    {reply, ok, State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}};
-
 handle_call({rep_db_checkpoint, DbName, EndSeq}, _From, State) ->
     true = ets:insert(?DB_TO_SEQ, {DbName, EndSeq}),
     {reply, ok, State};
@@ -199,6 +189,16 @@ handle_call(Msg, From, State) ->
         [Msg, From]),
     {stop, {error, {unexpected_call, Msg}}, State}.
 
+handle_cast({resume_scan, DbName}, State) ->
+    Since = case ets:lookup(?DB_TO_SEQ, DbName) of
+        [] -> 0;
+        [{DbName, EndSeq}] -> EndSeq
+    end,
+    ensure_rep_ddoc_exists(DbName),
+    Pid = changes_feed_loop(DbName, Since),
+    couch_log:debug("Scanning ~s from update_seq ~p", [DbName, Since]),
+    {noreply, State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}};
+
 handle_cast({set_max_retries, MaxRetries}, State) ->
     {noreply, State#state{max_retries = MaxRetries}};
 
@@ -320,7 +320,7 @@ db_update_notifier() ->
                     ensure_rep_ddoc_exists(DbName);
                 updated when IsRepDb ->
                     Msg = {resume_scan, DbName},
-                    ok = gen_server:call(Server, Msg, infinity);
+                    ok = gen_server:cast(Server, Msg);
                 deleted when IsRepDb ->
                     clean_up_replications(DbName);
                 _ ->
@@ -800,7 +800,7 @@ scan_all_dbs(Server) when is_pid(Server) ->
                 RelativeFilename -> ok
             end,
             DbName = ?l2b(filename:rootname(RelativeFilename, ".couch")),
-	    gen_server:call(Server, {resume_scan, DbName}),
+	    gen_server:cast(Server, {resume_scan, DbName}),
 	    ok
 	end, ok).
 


[08/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Make replication request timeout configurable

This is intended as a defensive measure against failures in code that's out
of our control, such as ibrowse.

BugzID: 20550


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/20b837e2
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/20b837e2
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/20b837e2

Branch: refs/heads/master
Commit: 20b837e248e584bafece0b7c40353bc42b440592
Parents: 6acc906
Author: Benjamin Anderson <b...@banjiewen.net>
Authored: Mon Jun 24 20:44:18 2013 -0700
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 13:48:29 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator_httpc.erl | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/20b837e2/src/couch_replicator_httpc.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index 1f08750..6aeff93 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -61,8 +61,12 @@ send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
         lists:ukeymerge(1, get_value(ibrowse_options, Params, []),
             HttpDb#httpdb.ibrowse_options)
     ],
+    Timeout = case config:get("replicator", "request_timeout", "infinity") of
+        "infinity" -> infinity;
+        Milliseconds -> list_to_integer(Milliseconds)
+    end,
     Response = ibrowse:send_req_direct(
-        Worker, Url, Headers2, Method, Body, IbrowseOptions, infinity),
+        Worker, Url, Headers2, Method, Body, IbrowseOptions, Timeout),
     {Worker, Response}.
 
 


[29/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
start_link functions should return {ok, Pid}


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/087bef99
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/087bef99
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/087bef99

Branch: refs/heads/master
Commit: 087bef992c4a296b0e7fa3aff2c7bb63fedd8505
Parents: a85ab81
Author: Paul J. Davis <pa...@gmail.com>
Authored: Mon Aug 11 15:03:06 2014 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Mon Aug 11 15:03:06 2014 -0500

----------------------------------------------------------------------
 src/couch_replicator_changes_reader.erl | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/087bef99/src/couch_replicator_changes_reader.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_changes_reader.erl b/src/couch_replicator_changes_reader.erl
index f28dd51..623e8f9 100644
--- a/src/couch_replicator_changes_reader.erl
+++ b/src/couch_replicator_changes_reader.erl
@@ -28,17 +28,17 @@
 
 start_link(StartSeq, #httpdb{} = Db, ChangesQueue, Options) ->
     Parent = self(),
-    spawn_link(fun() ->
+    {ok, spawn_link(fun() ->
         put(last_seq, StartSeq),
         put(retries_left, Db#httpdb.retries),
         ?MODULE:read_changes(Parent, StartSeq, Db#httpdb{retries = 0},
             ChangesQueue, Options, 1)
-    end);
+    end)};
 start_link(StartSeq, Db, ChangesQueue, Options) ->
     Parent = self(),
-    spawn_link(fun() ->
+    {ok, spawn_link(fun() ->
         ?MODULE:read_changes(Parent, StartSeq, Db, ChangesQueue, Options, 1)
-    end).
+    end)}.
 
 read_changes(Parent, StartSeq, Db, ChangesQueue, Options, Ts) ->
     Continuous = couch_util:get_value(continuous, Options),


[11/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Extract code that queues items into its own fun

BugzID: 24294


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/6f2dc4cf
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/6f2dc4cf
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/6f2dc4cf

Branch: refs/heads/master
Commit: 6f2dc4cf8d5e14e8c416c6d1be4b65e82ebb029a
Parents: 7e28640
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Fri Oct 18 11:50:14 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 14:57:50 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator_changes_reader.erl | 70 ++++++++++++++--------------
 1 file changed, 36 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/6f2dc4cf/src/couch_replicator_changes_reader.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_changes_reader.erl b/src/couch_replicator_changes_reader.erl
index 390a87e..045e074 100644
--- a/src/couch_replicator_changes_reader.erl
+++ b/src/couch_replicator_changes_reader.erl
@@ -31,7 +31,8 @@ start_link(StartSeq, #httpdb{} = Db, ChangesQueue, Options) ->
     spawn_link(fun() ->
         put(last_seq, StartSeq),
         put(retries_left, Db#httpdb.retries),
-        ?MODULE:read_changes(Parent, StartSeq, Db#httpdb{retries = 0}, ChangesQueue, Options, 1)
+        ?MODULE:read_changes(Parent, StartSeq, Db#httpdb{retries = 0},
+            ChangesQueue, Options, 1)
     end);
 start_link(StartSeq, Db, ChangesQueue, Options) ->
     Parent = self(),
@@ -40,41 +41,11 @@ start_link(StartSeq, Db, ChangesQueue, Options) ->
     end).
 
 read_changes(Parent, StartSeq, Db, ChangesQueue, Options, Ts) ->
+    Continuous = couch_util:get_value(continuous, Options),
     try
         couch_replicator_api_wrap:changes_since(Db, all_docs, StartSeq,
-            fun(#doc_info{high_seq = Seq, id = Id} = DocInfo) ->
-                case Id of
-                <<>> ->
-                    % Previous CouchDB releases had a bug which allowed a doc
-                    % with an empty ID to be inserted into databases. Such doc
-                    % is impossible to GET.
-                    couch_log:error("Replicator: ignoring document with empty ID in "
-                        "source database `~s` (_changes sequence ~p)",
-                        [couch_replicator_api_wrap:db_uri(Db), Seq]);
-                _ ->
-                    ok = couch_work_queue:queue(ChangesQueue, DocInfo)
-                end,
-                put(last_seq, Seq);
-            ({last_seq, LS}) ->
-                case get_value(continuous, Options) of
-                true ->
-                    % LS should never be undefined, but it doesn't hurt to be
-                    % defensive inside the replicator.
-                    Seq = case LS of undefined -> get(last_seq); _ -> LS end,
-                    OldSeq = get(last_seq),
-                    if Seq == OldSeq -> ok; true ->
-                        Msg = {report_seq_done, {Ts, Seq}, #rep_stats{}},
-                        ok = gen_server:call(Parent, Msg, infinity)
-                    end,
-                    put(last_seq, Seq),
-                    throw(recurse);
-                _ ->
-                    % This clause is unreachable today, but let's plan ahead
-                    % for the future where we checkpoint against last_seq
-                    % instead of the sequence of the last change.  The two can
-                    % differ substantially in the case of a restrictive filter.
-                    ok
-                end
+            fun(Item) ->
+                process_change(Item, {Parent, Db, ChangesQueue, Continuous, Ts})
             end, Options),
         couch_work_queue:close(ChangesQueue)
     catch
@@ -103,3 +74,34 @@ read_changes(Parent, StartSeq, Db, ChangesQueue, Options, Ts) ->
             exit(Error)
         end
     end.
+
+
+process_change(#doc_info{id = <<>>} = DocInfo, {_, Db, _, _, _}) ->
+    % Previous CouchDB releases had a bug which allowed a doc with an empty ID
+    % to be inserted into databases. Such doc is impossible to GET.
+    couch_log:error("Replicator: ignoring document with empty ID in "
+        "source database `~s` (_changes sequence ~p)",
+        [couch_replicator_api_wrap:db_uri(Db), DocInfo#doc_info.high_seq]);
+
+process_change(#doc_info{} = DocInfo, {_, _, ChangesQueue, _, _}) ->
+    ok = couch_work_queue:queue(ChangesQueue, DocInfo),
+    put(last_seq, DocInfo#doc_info.high_seq);
+
+process_change({last_seq, LS}, {Parent, _, _, true = _Continuous, Ts}) ->
+    % LS should never be undefined, but it doesn't hurt to be defensive inside
+    % the replicator.
+    Seq = case LS of undefined -> get(last_seq); _ -> LS end,
+    OldSeq = get(last_seq),
+    if Seq == OldSeq -> ok; true ->
+        Msg = {report_seq_done, {Ts, Seq}, #rep_stats{}},
+        ok = gen_server:call(Parent, Msg, infinity)
+    end,
+    put(last_seq, Seq),
+    throw(recurse);
+
+process_change({last_seq, _}, _) ->
+    % This clause is unreachable today, but let's plan ahead for the future
+    % where we checkpoint against last_seq instead of the sequence of the last
+    % change.  The two can differ substantially in the case of a restrictive
+    % filter.
+    ok.


[03/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Handle last_seq returns from failed filters

Fabric changes will now return a last sequence (last_seq) when a continuous request has timed out and docs
have failed to pass the filter. This enables more accurate tracking of progress.

BugzID: 17709


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/78983131
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/78983131
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/78983131

Branch: refs/heads/master
Commit: 789831310b75b19c43ef3037e1abd36bc7bd9bb4
Parents: 8d03f0c
Author: Bob Dionne <bo...@cloudant.com>
Authored: Mon Mar 4 09:48:33 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 12:38:53 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator.erl | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/78983131/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index d2cdbe5..895bbcc 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -408,6 +408,8 @@ handle_call({report_seq_done, Seq, StatsInc}, From,
         current_through_seq = ThroughSeq, stats = Stats} = State) ->
     gen_server:reply(From, ok),
     {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
+    [] ->
+        {Seq, []};
     [Seq | Rest] ->
         {Seq, Rest};
     [_ | _] ->
@@ -623,17 +625,19 @@ fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
 
 
 spawn_changes_reader(StartSeq, #httpdb{} = Db, ChangesQueue, Options) ->
+    Parent = self(),
     spawn_link(fun() ->
         put(last_seq, StartSeq),
         put(retries_left, Db#httpdb.retries),
-        read_changes(StartSeq, Db#httpdb{retries = 0}, ChangesQueue, Options)
+        read_changes(Parent, StartSeq, Db#httpdb{retries = 0}, ChangesQueue, Options, 1)
     end);
 spawn_changes_reader(StartSeq, Db, ChangesQueue, Options) ->
+    Parent = self(),
     spawn_link(fun() ->
-        read_changes(StartSeq, Db, ChangesQueue, Options)
+        read_changes(Parent, StartSeq, Db, ChangesQueue, Options, 1)
     end).
 
-read_changes(StartSeq, Db, ChangesQueue, Options) ->
+read_changes(Parent, StartSeq, Db, ChangesQueue, Options, Ts) ->
     try
         couch_replicator_api_wrap:changes_since(Db, all_docs, StartSeq,
             fun(#doc_info{high_seq = Seq, id = Id} = DocInfo) ->
@@ -655,7 +659,9 @@ read_changes(StartSeq, Db, ChangesQueue, Options) ->
                     % LS should never be undefined, but it doesn't hurt to be
                     % defensive inside the replicator.
                     Seq = case LS of undefined -> get(last_seq); _ -> LS end,
-                    read_changes(Seq, Db, ChangesQueue, Options);
+                    ok = gen_server:call(Parent, {report_seq_done, {Ts, Seq}, #rep_stats{}}, infinity),
+                    put(last_seq, Seq),
+                    read_changes(Parent, Seq, Db, ChangesQueue, Options, Ts + 1);
                 _ ->
                     % This clause is unreachable today, but let's plan ahead
                     % for the future where we checkpoint against last_seq
@@ -682,7 +688,7 @@ read_changes(StartSeq, Db, ChangesQueue, Options) ->
                     " with since=~p", [couch_replicator_api_wrap:db_uri(Db), LastSeq]),
                 Db
             end,
-            read_changes(LastSeq, Db2, ChangesQueue, Options);
+            read_changes(Parent, LastSeq, Db2, ChangesQueue, Options, Ts);
         _ ->
             exit(Error)
         end


[13/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Remove old code_change, set module version to 1


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/0c095c0a
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/0c095c0a
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/0c095c0a

Branch: refs/heads/master
Commit: 0c095c0af6d98218b08dc1326dc7409ff8c61286
Parents: 65a1a10
Author: Robert Newson <ro...@cloudant.com>
Authored: Fri Nov 22 16:46:18 2013 +0000
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 15:03:10 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator.erl            |  5 +----
 src/couch_replicator_httpc_pool.erl | 13 ++-----------
 src/couch_replicator_manager.erl    |  1 +
 src/couch_replicator_notifier.erl   |  1 +
 src/couch_replicator_worker.erl     |  1 +
 5 files changed, 6 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/0c095c0a/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index 47bbd3a..ef44c54 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -12,6 +12,7 @@
 
 -module(couch_replicator).
 -behaviour(gen_server).
+-vsn(1).
 
 % public API
 -export([replicate/2]).
@@ -466,10 +467,6 @@ handle_cast({report_seq, Seq},
     {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}}.
 
 
-code_change(OldVsn, OldState, Extra) when tuple_size(OldState) =:= 30 ->
-    code_change(OldVsn, erlang:append_element(OldState, true), Extra);
-code_change(OldVsn, OldState, Extra) when tuple_size(OldState) =:= 31 ->
-    code_change(OldVsn, erlang:append_element(OldState, 5000), Extra);
 code_change(_OldVsn, #rep_state{}=State, _Extra) ->
     {ok, State}.
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/0c095c0a/src/couch_replicator_httpc_pool.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_httpc_pool.erl b/src/couch_replicator_httpc_pool.erl
index 0a42284..c895048 100644
--- a/src/couch_replicator_httpc_pool.erl
+++ b/src/couch_replicator_httpc_pool.erl
@@ -12,6 +12,7 @@
 
 -module(couch_replicator_httpc_pool).
 -behaviour(gen_server).
+-vsn(1).
 
 % public API
 -export([start_link/2, stop/1]).
@@ -163,17 +164,7 @@ handle_info({'DOWN', Ref, process, _, _}, #state{callers = Callers} = State) ->
             {noreply, State}
     end.
 
-code_change(_OldVsn, OldState, _Extra) when tuple_size(OldState) =:= 7 ->
-    case element(7, OldState) of
-        EtsTable when is_integer(EtsTable) ->
-            NewState = setelement(7, OldState, ets:tab2list(EtsTable)),
-            ets:delete(EtsTable),
-            {ok, NewState};
-        Callers when is_list(Callers) ->
-            % Already upgraded
-            {ok, OldState}
-    end;
-code_change(_OldVsn, State, _Extra) ->
+code_change(_OldVsn, #state{}=State, _Extra) ->
     {ok, State}.
 
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/0c095c0a/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index 949622a..b0c1c9b 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -12,6 +12,7 @@
 
 -module(couch_replicator_manager).
 -behaviour(gen_server).
+-vsn(1).
 -behaviour(config_listener).
 
 % public API

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/0c095c0a/src/couch_replicator_notifier.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_notifier.erl b/src/couch_replicator_notifier.erl
index 39fd68b..99b199a 100644
--- a/src/couch_replicator_notifier.erl
+++ b/src/couch_replicator_notifier.erl
@@ -13,6 +13,7 @@
 -module(couch_replicator_notifier).
 
 -behaviour(gen_event).
+-vsn(1).
 
 % public API
 -export([start_link/1, stop/1, notify/1]).

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/0c095c0a/src/couch_replicator_worker.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_worker.erl b/src/couch_replicator_worker.erl
index a0638c6..5c8601d 100644
--- a/src/couch_replicator_worker.erl
+++ b/src/couch_replicator_worker.erl
@@ -12,6 +12,7 @@
 
 -module(couch_replicator_worker).
 -behaviour(gen_server).
+-vsn(1).
 
 % public API
 -export([start_link/5]).


[26/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Owner must be live

BugzID: 31712


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/22e9ff48
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/22e9ff48
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/22e9ff48

Branch: refs/heads/master
Commit: 22e9ff4838b29f0a1080a82a4845808caf96fa6c
Parents: f90b6e0
Author: Robert Newson <rn...@apache.org>
Authored: Mon Jun 16 13:37:58 2014 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 11:56:46 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator_manager.erl | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/22e9ff48/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index bcb6462..c145b75 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -380,7 +380,7 @@ rescan(#state{scan_pid = ScanPid} = State) ->
 process_update(State, DbName, {Change}) ->
     {RepProps} = JsonRepDoc = get_json_value(doc, Change),
     DocId = get_json_value(<<"_id">>, RepProps),
-    case {is_owner(DbName, DocId), get_json_value(deleted, Change, false)} of
+    case {owner(DbName, DocId), get_json_value(deleted, Change, false)} of
     {_, true} ->
         rep_doc_deleted(DbName, DocId),
         State;
@@ -405,12 +405,11 @@ process_update(State, DbName, {Change}) ->
         end
     end.
 
-
-is_owner(<<"shards/", _/binary>>=DbName, DocId) ->
-    mem3_util:owner(mem3:dbname(DbName), DocId);
-is_owner(_, _) ->
-    true.
-
+owner(DbName, DocId) ->
+    Live = [node()|nodes()],
+    Nodes = lists:sort([N || #shard{node=N} <- mem3:shards(DbName, DocId),
+			     lists:member(N, Live)]),
+    node() =:= hd(mem3_util:rotate_list({DbName, DocId}, Nodes)).
 
 rep_db_update_error(Error, DbName, DocId) ->
     case Error of


[32/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Update changes_reader_cb for new Pending field

We've added a Pending count to changes messages, which is displayed in
replication task statuses. This change simply ignores that value in the
replication manager.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/e5e5d84b
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/e5e5d84b
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/e5e5d84b

Branch: refs/heads/master
Commit: e5e5d84b5c19b49c40a46530d7e3bcd1c8473110
Parents: 1b32380
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Aug 12 12:38:15 2014 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Tue Aug 12 12:38:15 2014 -0500

----------------------------------------------------------------------
 src/couch_replicator_manager.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/e5e5d84b/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index 9b1e818..3566433 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -308,7 +308,7 @@ changes_reader(Server, DbName, Since) ->
     ),
     ChangesFeedFun({fun ?MODULE:changes_reader_cb/3, {Server, DbName}}).
 
-changes_reader_cb({change, Change}, _, {Server, DbName}) ->
+changes_reader_cb({change, Change, _}, _, {Server, DbName}) ->
     case has_valid_rep_id(Change) of
         true ->
             Msg = {rep_db_update, DbName, Change},


[10/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Move changes_reader to a dedicated module

Also, export read_changes for better upgrade handling.

BugzID: 24294


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/7e286402
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/7e286402
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/7e286402

Branch: refs/heads/master
Commit: 7e28640275f9063ff80d19ccd3cd722f6fe0865a
Parents: 44328a6
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Fri Oct 18 11:31:59 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 14:16:03 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator.erl                |  81 +--------------------
 src/couch_replicator_changes_reader.erl | 105 +++++++++++++++++++++++++++
 2 files changed, 108 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/7e286402/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index bb81cf0..5891084 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -252,7 +252,9 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
     ]),
     % This starts the _changes reader process. It adds the changes from
     % the source db to the ChangesQueue.
-    ChangesReader = spawn_changes_reader(StartSeq, Source, ChangesQueue, Options),
+    {ok, ChangesReader} = couch_replicator_changes_reader:start_link(
+        StartSeq, Source, ChangesQueue, Options
+    ),
     % Changes manager - responsible for dequeing batches from the changes queue
     % and deliver them to the worker processes.
     ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
@@ -624,83 +626,6 @@ fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
     end.
 
 
-spawn_changes_reader(StartSeq, #httpdb{} = Db, ChangesQueue, Options) ->
-    Parent = self(),
-    spawn_link(fun() ->
-        put(last_seq, StartSeq),
-        put(retries_left, Db#httpdb.retries),
-        read_changes(Parent, StartSeq, Db#httpdb{retries = 0}, ChangesQueue, Options, 1)
-    end);
-spawn_changes_reader(StartSeq, Db, ChangesQueue, Options) ->
-    Parent = self(),
-    spawn_link(fun() ->
-        read_changes(Parent, StartSeq, Db, ChangesQueue, Options, 1)
-    end).
-
-read_changes(Parent, StartSeq, Db, ChangesQueue, Options, Ts) ->
-    try
-        couch_replicator_api_wrap:changes_since(Db, all_docs, StartSeq,
-            fun(#doc_info{high_seq = Seq, id = Id} = DocInfo) ->
-                case Id of
-                <<>> ->
-                    % Previous CouchDB releases had a bug which allowed a doc
-                    % with an empty ID to be inserted into databases. Such doc
-                    % is impossible to GET.
-                    couch_log:error("Replicator: ignoring document with empty ID in "
-                        "source database `~s` (_changes sequence ~p)",
-                        [couch_replicator_api_wrap:db_uri(Db), Seq]);
-                _ ->
-                    ok = couch_work_queue:queue(ChangesQueue, DocInfo)
-                end,
-                put(last_seq, Seq);
-            ({last_seq, LS}) ->
-                case get_value(continuous, Options) of
-                true ->
-                    % LS should never be undefined, but it doesn't hurt to be
-                    % defensive inside the replicator.
-                    Seq = case LS of undefined -> get(last_seq); _ -> LS end,
-                    OldSeq = get(last_seq),
-                    if Seq == OldSeq -> ok; true ->
-                        Msg = {report_seq_done, {Ts, Seq}, #rep_stats{}},
-                        ok = gen_server:call(Parent, Msg, infinity)
-                    end,
-                    put(last_seq, Seq),
-                    throw(recurse);
-                _ ->
-                    % This clause is unreachable today, but let's plan ahead
-                    % for the future where we checkpoint against last_seq
-                    % instead of the sequence of the last change.  The two can
-                    % differ substantially in the case of a restrictive filter.
-                    ok
-                end
-            end, Options),
-        couch_work_queue:close(ChangesQueue)
-    catch
-        throw:recurse ->
-            LS = get(last_seq),
-            read_changes(Parent, LS, Db, ChangesQueue, Options, Ts+1);
-        exit:{http_request_failed, _, _, _} = Error ->
-        case get(retries_left) of
-        N when N > 0 ->
-            put(retries_left, N - 1),
-            LastSeq = get(last_seq),
-            Db2 = case LastSeq of
-            StartSeq ->
-                couch_log:notice("Retrying _changes request to source database ~s"
-                    " with since=~p in ~p seconds",
-                    [couch_replicator_api_wrap:db_uri(Db), LastSeq, Db#httpdb.wait / 1000]),
-                ok = timer:sleep(Db#httpdb.wait),
-                Db#httpdb{wait = 2 * Db#httpdb.wait};
-            _ ->
-                couch_log:notice("Retrying _changes request to source database ~s"
-                    " with since=~p", [couch_replicator_api_wrap:db_uri(Db), LastSeq]),
-                Db
-            end,
-            read_changes(Parent, LastSeq, Db2, ChangesQueue, Options, Ts);
-        _ ->
-            exit(Error)
-        end
-    end.
 
 
 spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/7e286402/src/couch_replicator_changes_reader.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_changes_reader.erl b/src/couch_replicator_changes_reader.erl
new file mode 100644
index 0000000..390a87e
--- /dev/null
+++ b/src/couch_replicator_changes_reader.erl
@@ -0,0 +1,105 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_changes_reader).
+
+% Public API
+-export([start_link/4]).
+
+% Exported for code reloading
+-export([read_changes/6]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator_api_wrap.hrl").
+-include("couch_replicator.hrl").
+
+-import(couch_util, [
+    get_value/2
+]).
+
+start_link(StartSeq, #httpdb{} = Db, ChangesQueue, Options) ->
+    Parent = self(),
+    spawn_link(fun() ->
+        put(last_seq, StartSeq),
+        put(retries_left, Db#httpdb.retries),
+        ?MODULE:read_changes(Parent, StartSeq, Db#httpdb{retries = 0}, ChangesQueue, Options, 1)
+    end);
+start_link(StartSeq, Db, ChangesQueue, Options) ->
+    Parent = self(),
+    spawn_link(fun() ->
+        ?MODULE:read_changes(Parent, StartSeq, Db, ChangesQueue, Options, 1)
+    end).
+
+read_changes(Parent, StartSeq, Db, ChangesQueue, Options, Ts) ->
+    try
+        couch_replicator_api_wrap:changes_since(Db, all_docs, StartSeq,
+            fun(#doc_info{high_seq = Seq, id = Id} = DocInfo) ->
+                case Id of
+                <<>> ->
+                    % Previous CouchDB releases had a bug which allowed a doc
+                    % with an empty ID to be inserted into databases. Such doc
+                    % is impossible to GET.
+                    couch_log:error("Replicator: ignoring document with empty ID in "
+                        "source database `~s` (_changes sequence ~p)",
+                        [couch_replicator_api_wrap:db_uri(Db), Seq]);
+                _ ->
+                    ok = couch_work_queue:queue(ChangesQueue, DocInfo)
+                end,
+                put(last_seq, Seq);
+            ({last_seq, LS}) ->
+                case get_value(continuous, Options) of
+                true ->
+                    % LS should never be undefined, but it doesn't hurt to be
+                    % defensive inside the replicator.
+                    Seq = case LS of undefined -> get(last_seq); _ -> LS end,
+                    OldSeq = get(last_seq),
+                    if Seq == OldSeq -> ok; true ->
+                        Msg = {report_seq_done, {Ts, Seq}, #rep_stats{}},
+                        ok = gen_server:call(Parent, Msg, infinity)
+                    end,
+                    put(last_seq, Seq),
+                    throw(recurse);
+                _ ->
+                    % This clause is unreachable today, but let's plan ahead
+                    % for the future where we checkpoint against last_seq
+                    % instead of the sequence of the last change.  The two can
+                    % differ substantially in the case of a restrictive filter.
+                    ok
+                end
+            end, Options),
+        couch_work_queue:close(ChangesQueue)
+    catch
+        throw:recurse ->
+            LS = get(last_seq),
+            read_changes(Parent, LS, Db, ChangesQueue, Options, Ts+1);
+        exit:{http_request_failed, _, _, _} = Error ->
+        case get(retries_left) of
+        N when N > 0 ->
+            put(retries_left, N - 1),
+            LastSeq = get(last_seq),
+            Db2 = case LastSeq of
+            StartSeq ->
+                couch_log:notice("Retrying _changes request to source database ~s"
+                    " with since=~p in ~p seconds",
+                    [couch_replicator_api_wrap:db_uri(Db), LastSeq, Db#httpdb.wait / 1000]),
+                ok = timer:sleep(Db#httpdb.wait),
+                Db#httpdb{wait = 2 * Db#httpdb.wait};
+            _ ->
+                couch_log:notice("Retrying _changes request to source database ~s"
+                    " with since=~p", [couch_replicator_api_wrap:db_uri(Db), LastSeq]),
+                Db
+            end,
+            read_changes(Parent, LastSeq, Db2, ChangesQueue, Options, Ts);
+        _ ->
+            exit(Error)
+        end
+    end.


[25/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Don't die when a known replicator related pid dies

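The couch_replicator_manager used to die whenever any replication-related
pid exited abnormally. Now, if the exiting pid is one of the known
replication start pids, the manager logs the error, forgets the pid and
keeps running; an exit from an unknown pid still stops the manager. This
means an error starting replications for a given database or document no
longer cancels and restarts all known replications running on the node.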

BugzId: 27666


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/f90b6e0e
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/f90b6e0e
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/f90b6e0e

Branch: refs/heads/master
Commit: f90b6e0ee0dceca7d43c1694353e2784afb92cb6
Parents: 0ae9e60
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Jan 30 20:01:19 2014 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 11:56:41 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator_manager.erl | 12 ++++++++++++
 1 file changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f90b6e0e/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index 21e759f..bcb6462 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -234,6 +234,18 @@ handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) ->
     % one of the replication start processes terminated successfully
     {noreply, State#state{rep_start_pids = Pids -- [From]}};
 
+handle_info({'EXIT', From, Reason}, #state{rep_start_pids = Pids} = State) ->
+    case lists:member(From, Pids) of
+        true ->
+            Fmt = "~s : Known replication pid ~w died :: ~w",
+            couch_log:error(Fmt, [?MODULE, From, Reason]),
+            {noreply, State#state{rep_start_pids = Pids -- [From]}};
+        false ->
+            Fmt = "~s : Unknown pid ~w died :: ~w",
+            couch_log:error(Fmt, [?MODULE, From, Reason]),
+            {stop, {unexpected_exit, From, Reason}, State}
+    end;
+
 handle_info({'DOWN', _Ref, _, _, _}, State) ->
     % From a db monitor created by a replication process. Ignore.
     {noreply, State};


[28/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
squash! Fix couch_replicator_manager changes feed upgrades


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/a85ab816
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/a85ab816
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/a85ab816

Branch: refs/heads/master
Commit: a85ab8168a2c0817b009d75b01538f188ad9bbc3
Parents: 75e5ba1
Author: Robert Newson <rn...@apache.org>
Authored: Fri Aug 8 10:27:06 2014 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 8 10:27:06 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator_manager.erl | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/a85ab816/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index c145b75..9b1e818 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -25,7 +25,7 @@
 -export([code_change/3, terminate/2]).
 
 % changes callbacks
--export([changes_reader/3, changes_reader_cb/2]).
+-export([changes_reader/3, changes_reader_cb/3]).
 
 % config_listener callback
 -export([handle_config_change/5]).
@@ -306,9 +306,9 @@ changes_reader(Server, DbName, Since) ->
         {json_req, null},
         Db
     ),
-    ChangesFeedFun({fun ?MODULE:changes_reader_cb/2, {Server, DbName}}).
+    ChangesFeedFun({fun ?MODULE:changes_reader_cb/3, {Server, DbName}}).
 
-changes_reader_cb({change, Change}, {Server, DbName}) ->
+changes_reader_cb({change, Change}, _, {Server, DbName}) ->
     case has_valid_rep_id(Change) of
         true ->
             Msg = {rep_db_update, DbName, Change},
@@ -317,11 +317,11 @@ changes_reader_cb({change, Change}, {Server, DbName}) ->
             ok
     end,
     {ok, {Server, DbName}};
-changes_reader_cb({stop, EndSeq, _Pending}, {Server, DbName}) ->
+changes_reader_cb({stop, EndSeq, _Pending}, _, {Server, DbName}) ->
     Msg = {rep_db_checkpoint, DbName, EndSeq},
     ok = gen_server:call(Server, Msg, infinity),
     {ok, {Server, DbName}};
-changes_reader_cb(_, Acc) ->
+changes_reader_cb(_, _, Acc) ->
     {ok, Acc}.
 
 has_valid_rep_id({Change}) ->


[19/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Remove unused unpack_seqs function

This is no longer needed now that we're not calculating progress based
on update_seq values.

BugzId: 26015


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/a18aa43c
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/a18aa43c
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/a18aa43c

Branch: refs/heads/master
Commit: a18aa43c6a49fd4d43aebc0a58e7d724e6fe277a
Parents: 353f51f
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Dec 11 10:11:14 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 15:17:52 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator.erl | 10 ----------
 1 file changed, 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/a18aa43c/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index 006770c..0c06145 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -934,13 +934,3 @@ rep_stats(State) ->
         {doc_write_failures, couch_replicator_stats:doc_write_failures(Stats)},
         {checkpointed_source_seq, CommittedSeq}
     ].
-
-unpack_seq(Seq) when is_number(Seq) ->
-    Seq;
-unpack_seq([SeqNum, _]) ->
-    SeqNum;
-unpack_seq(Seq) when is_binary(Seq) ->
-    Pattern = "^\\[?(?<seqnum>[0-9]+)",
-    Options = [{capture, [seqnum], list}],
-    {match, [SeqNum]} = re:run(Seq, Pattern, Options),
-    list_to_integer(SeqNum).


[30/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Add the mod entry so the application starts correctly.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/9b2dd482
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/9b2dd482
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/9b2dd482

Branch: refs/heads/master
Commit: 9b2dd48231417c2b84dae746f8bf81accf09bb12
Parents: 087bef9
Author: Paul J. Davis <pa...@gmail.com>
Authored: Mon Aug 11 15:03:37 2014 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Mon Aug 11 15:03:37 2014 -0500

----------------------------------------------------------------------
 src/couch_replicator.app.src | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/9b2dd482/src/couch_replicator.app.src
----------------------------------------------------------------------
diff --git a/src/couch_replicator.app.src b/src/couch_replicator.app.src
index 06c6c68..f4470f2 100644
--- a/src/couch_replicator.app.src
+++ b/src/couch_replicator.app.src
@@ -13,6 +13,7 @@
 {application, couch_replicator, [
     {description, "CouchDB replicator"},
     {vsn, git},
+    {mod, {couch_replicator_app, []}},
     {modules, [
         couch_replicator,
         couch_replicator_api_wrap,


[06/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Update to use new couch_event application


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/d6079a6f
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/d6079a6f
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/d6079a6f

Branch: refs/heads/master
Commit: d6079a6f2420cb8d704f10e0d2b3953453d1fc83
Parents: dd96e21
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Apr 23 17:57:15 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 13:44:46 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator.app.src     |  3 +-
 src/couch_replicator_manager.erl | 67 +++++++++++++++++++++--------------
 src/couch_replicator_utils.erl   | 24 ++++++++-----
 3 files changed, 58 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/d6079a6f/src/couch_replicator.app.src
----------------------------------------------------------------------
diff --git a/src/couch_replicator.app.src b/src/couch_replicator.app.src
index 740d007..06c6c68 100644
--- a/src/couch_replicator.app.src
+++ b/src/couch_replicator.app.src
@@ -37,7 +37,8 @@
         stdlib,
         couch_log,
         mem3,
-        couch
+        couch,
+        couch_event
     ]}
 ]}.
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/d6079a6f/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index 71e6be0..949622a 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -26,6 +26,8 @@
 % config_listener callback
 -export([handle_config_change/5]).
 
+-export([handle_db_event/3]).
+
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("mem3/include/mem3.hrl").
 -include("couch_replicator.hrl").
@@ -56,7 +58,7 @@
 ]).
 
 -record(state, {
-    db_notifier = nil,
+    event_listener = nil,
     scan_pid = nil,
     rep_start_pids = [],
     max_retries
@@ -134,7 +136,7 @@ init(_) ->
     ensure_rep_db_exists(LocalRepDb),
     Pid = changes_feed_loop(LocalRepDb, 0),
     {ok, #state{
-        db_notifier = db_update_notifier(),
+        event_listener = start_event_listener(),
         scan_pid = ScanPid,
         max_retries = retries_value(
             config:get("replicator", "max_replication_retry_count", "10")),
@@ -220,7 +222,7 @@ handle_info({'EXIT', From, Reason}, #state{scan_pid = From} = State) ->
     couch_log:error("Background scanner died. Reason: ~p", [Reason]),
     {stop, {scanner_died, Reason}, State};
 
-handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) ->
+handle_info({'EXIT', From, Reason}, #state{event_listener = From} = State) ->
     couch_log:error("Database update notifier died. Reason: ~p", [Reason]),
     {stop, {db_update_notifier_died, Reason}, State};
 
@@ -252,7 +254,7 @@ terminate(_Reason, State) ->
     #state{
         scan_pid = ScanPid,
         rep_start_pids = StartPids,
-        db_notifier = DbNotifier
+        event_listener = Listener
     } = State,
     stop_all_replications(),
     lists:foreach(
@@ -264,7 +266,7 @@ terminate(_Reason, State) ->
     true = ets:delete(?REP_TO_STATE),
     true = ets:delete(?DOC_TO_REP),
     true = ets:delete(?DB_TO_SEQ),
-    couch_db_update_notifier:stop(DbNotifier).
+    couch_event:stop_listener(Listener).
 
 
 code_change(_OldVsn, State, _Extra) ->
@@ -309,28 +311,41 @@ has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
 has_valid_rep_id(_Else) ->
     true.
 
-db_update_notifier() ->
-    Server = self(),
-    {ok, Notifier} = couch_db_update_notifier:start_link(fun
-        ({Event, DbName})
-                when Event == created; Event == updated; Event == deleted ->
-            IsRepDb = is_replicator_db(DbName),
-            case Event of
-                created when IsRepDb ->
-                    ensure_rep_ddoc_exists(DbName);
-                updated when IsRepDb ->
-                    Msg = {resume_scan, DbName},
-                    ok = gen_server:cast(Server, Msg);
-                deleted when IsRepDb ->
-                    clean_up_replications(DbName);
-                _ ->
-                    ok
-            end;
-        (_Event) ->
+
+start_event_listener() ->
+    {ok, Pid} = couch_event:link_listener(
+            ?MODULE, handle_db_event, self(), [all_dbs]
+        ),
+    Pid.
+
+
+handle_db_event(DbName, created, Server) ->
+    case is_replicator_db(DbName) of
+	true ->
+	    ensure_rep_ddoc_exists(DbName);
+	_ ->
+	    ok
+    end,
+    {ok, Server};
+handle_db_event(DbName, updated, Server) ->
+    case is_replicator_db(DbName) of
+        true ->
+	    Msg = {resume_scan, DbName},
+	    ok = gen_server:cast(Server, Msg);
+        _ ->
             ok
-        end
-    ),
-    Notifier.
+    end,
+    {ok, Server};
+handle_db_event(DbName, deleted, Server) ->
+    case is_replicator_db(DbName) of
+        true ->
+            clean_up_replications(DbName);
+        _ ->
+            ok
+    end,
+    {ok, Server};
+handle_db_event(_DbName, _Event, Server) ->
+    {ok, Server}.
 
 rescan(#state{scan_pid = nil} = State) ->
     true = ets:delete_all_objects(?DB_TO_SEQ),

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/d6079a6f/src/couch_replicator_utils.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_utils.erl b/src/couch_replicator_utils.erl
index dff8e50..f489ff1 100644
--- a/src/couch_replicator_utils.erl
+++ b/src/couch_replicator_utils.erl
@@ -19,6 +19,8 @@
 -export([sum_stats/2, is_deleted/1]).
 -export([mp_parse_doc/2]).
 
+-export([handle_db_event/3]).
+
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("ibrowse/include/ibrowse.hrl").
 -include("couch_replicator_api_wrap.hrl").
@@ -370,21 +372,25 @@ close_db(_HttpDb) ->
 
 
 start_db_compaction_notifier(#db{name = DbName}, Server) ->
-    {ok, Notifier} = couch_db_update_notifier:start_link(
-        fun({compacted, DbName1}) when DbName1 =:= DbName ->
-                ok = gen_server:cast(Server, {db_compacted, DbName});
-            (_) ->
-                ok
-        end),
-    Notifier;
+    {ok, Pid} = couch_event:link_listener(
+            ?MODULE, handle_db_event, Server, [{dbname, DbName}]
+        ),
+    Pid;
 start_db_compaction_notifier(_, _) ->
     nil.
 
 
 stop_db_compaction_notifier(nil) ->
     ok;
-stop_db_compaction_notifier(Notifier) ->
-    couch_db_update_notifier:stop(Notifier).
+stop_db_compaction_notifier(Listener) ->
+    couch_event:stop_listener(Listener).
+
+
+handle_db_event(DbName, compacted, Server) ->
+    gen_server:cast(Server, {db_compacted, DbName}),
+    {ok, Server};
+handle_db_event(_DbName, _Event, Server) ->
+    {ok, Server}.
 
 
 sum_stats(#rep_stats{} = S1, #rep_stats{} = S2) ->


[22/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Move attachment code into couch_att

This is an attempt to isolate the attachment record and
some related code. This will allow seamless upgrades
over time.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/603e4a87
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/603e4a87
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/603e4a87

Branch: refs/heads/master
Commit: 603e4a87cce891f558527c676ef8895556b11b96
Parents: b80c7c0
Author: Brian Mitchell <br...@p2p.io>
Authored: Wed Dec 11 23:10:56 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 15:23:53 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator_worker.erl | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/603e4a87/src/couch_replicator_worker.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_worker.erl b/src/couch_replicator_worker.erl
index 5c8601d..bb31d5d 100644
--- a/src/couch_replicator_worker.erl
+++ b/src/couch_replicator_worker.erl
@@ -434,7 +434,8 @@ batch_doc(#doc{atts = []}) ->
 batch_doc(#doc{atts = Atts}) ->
     (length(Atts) =< ?MAX_BULK_ATTS_PER_DOC) andalso
         lists:all(
-            fun(#att{disk_len = L, data = Data}) ->
+            fun(Att) ->
+                [L, Data] = couch_att:fetch([disk_len, data], Att),
                 (L =< ?MAX_BULK_ATT_SIZE) andalso (Data =/= stub)
             end, Atts).
 


[35/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Expose current_through_seq in task statuses

This lets clients know how far through the source update sequence the
replication has processed, even if that progress is not yet committed.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/7ff3a52e
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/7ff3a52e
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/7ff3a52e

Branch: refs/heads/master
Commit: 7ff3a52e9701ad1ef95be8791a9df32087a23777
Parents: 9640ccf
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Aug 12 16:01:30 2014 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Tue Aug 12 16:01:30 2014 -0500

----------------------------------------------------------------------
 src/couch_replicator.erl | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/7ff3a52e/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index 0c06145..6bfd3bb 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -912,11 +912,13 @@ get_pending_count_int(#rep_state{source = Db}=St) ->
 
 update_task(State) ->
     #rep_state{
+        current_through_seq = {_, ThroughSeq},
         highest_seq_done = {_, HighestSeq}
     } = State,
     couch_task_status:update(
         rep_stats(State) ++ [
-        {source_seq, HighestSeq}
+        {source_seq, HighestSeq},
+        {through_seq, ThroughSeq}
     ]).
 
 


[33/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Fix return from the changes_reader_cb

CouchDB's changes enumerator doesn't support stopping the read midstream,
so the callback must return the bare accumulator rather than an
{ok, Acc} tagged tuple.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/c6be20d1
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/c6be20d1
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/c6be20d1

Branch: refs/heads/master
Commit: c6be20d1d1e644d76dc0f55ff3eec0e2c5170889
Parents: e5e5d84
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Aug 12 12:39:28 2014 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Tue Aug 12 12:39:28 2014 -0500

----------------------------------------------------------------------
 src/couch_replicator_manager.erl | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/c6be20d1/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index 3566433..099b215 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -316,13 +316,13 @@ changes_reader_cb({change, Change, _}, _, {Server, DbName}) ->
         false ->
             ok
     end,
-    {ok, {Server, DbName}};
+    {Server, DbName};
 changes_reader_cb({stop, EndSeq, _Pending}, _, {Server, DbName}) ->
     Msg = {rep_db_checkpoint, DbName, EndSeq},
     ok = gen_server:call(Server, Msg, infinity),
-    {ok, {Server, DbName}};
+    {Server, DbName};
 changes_reader_cb(_, _, Acc) ->
-    {ok, Acc}.
+    Acc.
 
 has_valid_rep_id({Change}) ->
     has_valid_rep_id(get_json_value(<<"id">>, Change));


[37/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Update to use couch_stats


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/aafb5f9d
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/aafb5f9d
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/aafb5f9d

Branch: refs/heads/master
Commit: aafb5f9d9b97d0cadb21b10e338cca6c9caf6afd
Parents: a85e9b8
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Aug 21 01:47:01 2014 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Thu Aug 21 01:47:01 2014 -0500

----------------------------------------------------------------------
 priv/stats_descriptions.cfg             | 56 ++++++++++++++++++++++++++++
 src/couch_replicator.app.src            |  3 +-
 src/couch_replicator.erl                | 10 +++++
 src/couch_replicator_changes_reader.erl |  3 ++
 src/couch_replicator_httpc.erl          |  9 +++++
 5 files changed, 80 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/aafb5f9d/priv/stats_descriptions.cfg
----------------------------------------------------------------------
diff --git a/priv/stats_descriptions.cfg b/priv/stats_descriptions.cfg
new file mode 100644
index 0000000..2564f92
--- /dev/null
+++ b/priv/stats_descriptions.cfg
@@ -0,0 +1,56 @@
+{[couch_replicator, changes_read_failures], [
+    {type, counter},
+    {desc, <<"number of failed replicator changes read failures">>}
+]}.
+{[couch_replicator, changes_reader_deaths], [
+    {type, counter},
+    {desc, <<"number of failed replicator changes readers">>}
+]}.
+{[couch_replicator, changes_manager_deaths], [
+    {type, counter},
+    {desc, <<"number of failed replicator changes managers">>}
+]}.
+{[couch_replicator, changes_queue_deaths], [
+    {type, counter},
+    {desc, <<"number of failed replicator changes work queues">>}
+]}.
+{[couch_replicator, checkpoints, success], [
+    {type, counter},
+    {desc, <<"number of checkpoints successfully saves">>}
+]}.
+{[couch_replicator, checkpoints, failure], [
+    {type, counter},
+    {desc, <<"number of failed checkpoint saves">>}
+]}.
+{[couch_replicator, failed_starts], [
+    {type, counter},
+    {desc, <<"number of replications that have failed to start">>}
+]}.
+{[couch_replicator, requests], [
+    {type, counter},
+    {desc, <<"number of HTTP requests made by the replicator">>}
+]}.
+{[couch_replicator, responses, failure], [
+    {type, counter},
+    {desc, <<"number of failed HTTP responses received by the replicator">>}
+]}.
+{[couch_replicator, responses, success], [
+    {type, counter},
+    {desc, <<"number of successful HTTP responses received by the replicator">>}
+]}.
+{[couch_replicator, stream_responses, failure], [
+    {type, counter},
+    {desc, <<"number of failed streaming HTTP responses received by the replicator">>}
+]}.
+{[couch_replicator, stream_responses, success], [
+    {type, counter},
+    {desc, <<"number of successful streaming HTTP responses received by the replicator">>}
+]}.
+{[couch_replicator, worker_deaths], [
+    {type, counter},
+    {desc, <<"number of failed replicator workers">>}
+]}.
+{[couch_replicator, workers_started], [
+    {type, counter},
+    {desc, <<"number of replicator workers started">>}
+]}.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/aafb5f9d/src/couch_replicator.app.src
----------------------------------------------------------------------
diff --git a/src/couch_replicator.app.src b/src/couch_replicator.app.src
index f4470f2..4f12195 100644
--- a/src/couch_replicator.app.src
+++ b/src/couch_replicator.app.src
@@ -39,7 +39,8 @@
         couch_log,
         mem3,
         couch,
-        couch_event
+        couch_event,
+        couch_stats
     ]}
 ]}.
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/aafb5f9d/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index 6bfd3bb..ee45d0b 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -265,6 +265,7 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
     MaxConns = get_value(http_connections, Options),
     Workers = lists:map(
         fun(_) ->
+            couch_stats:increment_counter([couch_replicator, workers_started]),
             {ok, Pid} = couch_replicator_worker:start_link(
                 self(), Source, Target, ChangesManager, MaxConns),
             Pid
@@ -347,6 +348,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
     {noreply, State};
 
 handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
+    couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
     couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
     {stop, changes_reader_died, cancel_timer(State)};
 
@@ -354,6 +356,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
     {noreply, State};
 
 handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
+    couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
     couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
     {stop, changes_manager_died, cancel_timer(State)};
 
@@ -361,6 +364,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
     {noreply, State};
 
 handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
+    couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
     couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
     {stop, changes_queue_died, cancel_timer(State)};
 
@@ -385,6 +389,7 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
     false ->
         {stop, {unknown_process_died, Pid, Reason}, State2};
     true ->
+        couch_stats:increment_counter([couch_replicator, worker_deaths]),
         couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
         {stop, {worker_died, Pid, Reason}, State2}
     end;
@@ -453,8 +458,10 @@ handle_cast({db_compacted, DbName},
 handle_cast(checkpoint, State) ->
     case do_checkpoint(State) of
     {ok, NewState} ->
+        couch_stats:increment_counter([couch_replicator, checkpoints, success]),
         {noreply, NewState#rep_state{timer = start_timer(State)}};
     Error ->
+        couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
         {stop, Error, State}
     end;
 
@@ -493,6 +500,7 @@ terminate(Reason, #rep_state{} = State) ->
 
 terminate(shutdown, {error, Class, Error, Stack, InitArgs}) ->
     #rep{id=RepId} = InitArgs,
+    couch_stats:increment_counter([couch_replicator, failed_starts]),
     couch_log:error("~p:~p: Replication failed to start for args ~p: ~p",
              [Class, Error, InitArgs, Stack]),
     case Error of
@@ -521,8 +529,10 @@ do_last_checkpoint(#rep_state{seqs_in_progress = [],
     highest_seq_done = Seq} = State) ->
     case do_checkpoint(State#rep_state{current_through_seq = Seq}) of
     {ok, NewState} ->
+        couch_stats:increment_counter([couch_replicator, checkpoints, success]),
         {stop, normal, cancel_timer(NewState)};
     Error ->
+        couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
         {stop, Error, State}
     end.
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/aafb5f9d/src/couch_replicator_changes_reader.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_changes_reader.erl b/src/couch_replicator_changes_reader.erl
index 623e8f9..b7d18e0 100644
--- a/src/couch_replicator_changes_reader.erl
+++ b/src/couch_replicator_changes_reader.erl
@@ -53,6 +53,9 @@ read_changes(Parent, StartSeq, Db, ChangesQueue, Options, Ts) ->
             LS = get(last_seq),
             read_changes(Parent, LS, Db, ChangesQueue, Options, Ts+1);
         exit:{http_request_failed, _, _, _} = Error ->
+        couch_stats:increment_counter(
+            [couch_replicator, changes_read_failures]
+        ),
         case get(retries_left) of
         N when N > 0 ->
             put(retries_left, N - 1),

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/aafb5f9d/src/couch_replicator_httpc.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index ecd6a3e..985f0c7 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -35,6 +35,7 @@ setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
 
 
 send_req(HttpDb, Params1, Callback) ->
+    couch_stats:increment_counter([couch_replicator, requests]),
     Params2 = ?replace(Params1, qs,
         [{K, ?b2l(iolist_to_binary(V))} || {K, V} <- get_value(qs, Params1, [])]),
     Params = ?replace(Params2, ibrowse_options,
@@ -102,6 +103,7 @@ process_response({ibrowse_req_id, ReqId}, Worker, HttpDb, Params, Callback) ->
 process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
     case list_to_integer(Code) of
     Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) ->
+        couch_stats:increment_counter([couch_replicator, responses, success]),
         EJson = case Body of
         <<>> ->
             null;
@@ -112,6 +114,7 @@ process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
     R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
         do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
     Error ->
+        couch_stats:increment_counter([couch_replicator, responses, failure]),
         maybe_retry({code, Error}, Worker, HttpDb, Params)
     end;
 
@@ -138,9 +141,15 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
         R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
             do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
         Error ->
+            couch_stats:increment_counter(
+                [couch_replicator, stream_responses, failure]
+            ),
             report_error(Worker, HttpDb, Params, {code, Error})
         end;
     {ibrowse_async_response, ReqId, {error, _} = Error} ->
+        couch_stats:increment_counter(
+            [couch_replicator, stream_responses, failure]
+        ),
         maybe_retry(Error, Worker, HttpDb, Params)
     after HttpDb#httpdb.timeout + 500 ->
         % Note: ibrowse should always reply with timeouts, but this doesn't


[17/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Posted by rn...@apache.org.
Rate limit the retrieval of pending changes

This limits updates of the changes_pending field to at most once every
`connection_timeout` milliseconds.

BugzId: 26015


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/06cf6995
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/06cf6995
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/06cf6995

Branch: refs/heads/master
Commit: 06cf69957b1bb0272cc4a0e679275bab8c7a16d1
Parents: f10dec7
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Dec 11 10:08:23 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 15:17:38 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator.erl | 25 +++++++++++++++++++++++--
 1 file changed, 23 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/06cf6995/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index 8041283..82c1360 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -875,7 +875,28 @@ db_monitor(_HttpDb) ->
     nil.
 
 
-get_pending_count(#rep_state{source = #httpdb{} = Db0}=St) ->
+get_pending_count(St) ->
+    Rep = St#rep_state.rep_details,
+    Timeout = get_value(connection_timeout, Rep#rep.options),
+    TimeoutMicro = Timeout * 1000,
+    case get(pending_count_state) of
+        {LastUpdate, PendingCount} ->
+            case timer:now_diff(os:timestamp(), LastUpdate) > TimeoutMicro of
+                true ->
+                    NewPendingCount = get_pending_count_int(St),
+                    put(pending_count_state, {os:timestamp(), NewPendingCount}),
+                    NewPendingCount;
+                false ->
+                    PendingCount
+            end;
+        undefined ->
+            NewPendingCount = get_pending_count_int(St),
+            put(pending_count_state, {os:timestamp(), NewPendingCount}),
+            NewPendingCount
+    end.
+
+
+get_pending_count_int(#rep_state{source = #httpdb{} = Db0}=St) ->
     {_, Seq} = St#rep_state.highest_seq_done,
     Db = Db0#httpdb{retries = 3},
     case (catch couch_replicator_api_wrap:get_pending_count(Db, Seq)) of
@@ -884,7 +905,7 @@ get_pending_count(#rep_state{source = #httpdb{} = Db0}=St) ->
     _ ->
         null
     end;
-get_pending_count(#rep_state{source = Db}=St) ->
+get_pending_count_int(#rep_state{source = Db}=St) ->
     {_, Seq} = St#rep_state.highest_seq_done,
     {ok, Pending} = couch_replicator_api_wrap:get_pending_count(Db, Seq),
     Pending.