You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2017/03/14 19:25:56 UTC

[01/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Repository: couchdb-couch-replicator
Updated Branches:
  refs/heads/63012-scheduler [created] 27a5eae90


Merge pull request #88 from cloudant/63012-fix-scheduler-docs-filtering-for-users

Fix _scheduler/docs filtering for users

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/26fc4dfd
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/26fc4dfd
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/26fc4dfd

Branch: refs/heads/63012-scheduler
Commit: 26fc4dfdb9b7dab297d794278545355d0343cbef
Parents: 07a1f9b 0d05f77
Author: Nick Vatamaniuc <ni...@users.noreply.github.com>
Authored: Wed Oct 19 15:14:00 2016 -0400
Committer: GitHub <no...@github.com>
Committed: Wed Oct 19 15:14:00 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator_doc_processor.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------



[22/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Implement Compatibility Mode

Compatibility mode enables writing of `triggered` and `error` states to the
replication documents. It is designed for backward compatibility with the
previous replicator behavior.

Compatibility mode only affects writing of states to the documents; the
scheduling and backoff, as well as the new _scheduler/docs and
_scheduler/jobs HTTP endpoints, are not affected by the mode change.

Implementation notes:

 * A main idea is avoiding an update feedback cycle, that is, when
 a document state update triggers a replication job update, which,
 in turn, generates another state update and so on. To avoid this situation a
 semantic short circuit is used: discard all document updates which do not
 change replication parameters. This includes changes to the _replication_* fields
 used during state updates, or custom user fields. To accomplish this shortcut,
 introduce the idea of a "normalized" #rep{} record. That is a replication record
 which only has the fields set that make it possible to compare two instances of
 rep records.

* To make normalization work, some options and other list-based rep fields have
to be sorted. Since options are already sorted, don't sort them again; however,
the doc_ids filter was not sorted, so make sure to sort just that.

* Since the scheduling replicator might force documents to wait a while in the
pending queue before running them, make sure to update the `triggered` state not
only when jobs start successfully but also when they are added to the
replicator the first time. This preserves the rough semantics of the "triggered means
the replicator has noticed my document and turned it into a replication job" idea.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/f41fd677
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/f41fd677
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/f41fd677

Branch: refs/heads/63012-scheduler
Commit: f41fd677beffbb37e87134983c936a0bcc8cc9df
Parents: 2f95128
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Tue Nov 8 20:58:58 2016 -0500
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Tue Nov 8 20:58:58 2016 -0500

----------------------------------------------------------------------
 src/couch_replicator_api_wrap.erl      |  46 ++++++++-
 src/couch_replicator_doc_processor.erl | 144 ++++++++++++++++++++++++----
 src/couch_replicator_docs.erl          |  63 ++++++++++--
 src/couch_replicator_scheduler.erl     |   6 ++
 src/couch_replicator_scheduler_job.erl |   8 +-
 5 files changed, 240 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f41fd677/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index b87b020..7828684 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -38,7 +38,8 @@
     open_doc/3,
     open_doc_revs/6,
     changes_since/5,
-    db_uri/1
+    db_uri/1,
+    normalize_db/1
     ]).
 
 -import(couch_replicator_httpc, [
@@ -989,3 +990,46 @@ header_value(Key, Headers, Default) ->
         _ ->
             Default
     end.
+
+
+% Normalize an #httpdb{} or #db{} record such that it can be used for
+% comparisons. This means remove things like pids and also sort options / props.
+normalize_db(#httpdb{} = HttpDb) ->
+    #httpdb{
+       url = HttpDb#httpdb.url,
+       oauth = HttpDb#httpdb.oauth,
+       headers = lists:keysort(1, HttpDb#httpdb.headers),
+       timeout = HttpDb#httpdb.timeout,
+       ibrowse_options = lists:keysort(1, HttpDb#httpdb.ibrowse_options),
+       retries = HttpDb#httpdb.retries,
+       http_connections = HttpDb#httpdb.http_connections
+    };
+
+normalize_db(<<DbName/binary>>) ->
+    DbName.
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+normalize_http_db_test() ->
+    HttpDb =  #httpdb{
+       url = "http://host/db",
+       oauth = #oauth{},
+       headers = [{"k2","v2"}, {"k1","v1"}],
+       timeout = 30000,
+       ibrowse_options = [{k2, v2}, {k1, v1}],
+       retries = 10,
+       http_connections = 20
+    },
+    Expected = HttpDb#httpdb{
+        headers = [{"k1","v1"}, {"k2","v2"}],
+        ibrowse_options = [{k1, v1}, {k2, v2}]
+    },
+    ?assertEqual(Expected, normalize_db(HttpDb)),
+    ?assertEqual(<<"local">>, normalize_db(<<"local">>)).
+
+
+-endif.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f41fd677/src/couch_replicator_doc_processor.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_doc_processor.erl b/src/couch_replicator_doc_processor.erl
index 733406f..dddde0f 100644
--- a/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator_doc_processor.erl
@@ -15,6 +15,7 @@
 
 -export([start_link/0]).
 -export([docs/1, doc/2]).
+-export([compat_mode/0]).
 
 % multidb changes callback
 -export([db_created/2, db_deleted/2, db_found/2, db_change/3]).
@@ -31,6 +32,7 @@
     get_json_value/3
 ]).
 
+-define(DEFAULT_COMPATIBILITY, false).
 -define(ERROR_MAX_BACKOFF_EXPONENT, 12).  % ~ 1 day on average
 -define(TS_DAY_SEC, 86400).
 
@@ -104,14 +106,14 @@ process_change(DbName, {Change}) ->
         undefined ->
             ok = process_updated(Id, JsonRepDoc);
         <<"triggered">> ->
-            couch_replicator_docs:remove_state_fields(DbName, DocId),
+            maybe_remove_state_fields(DbName, DocId),
             ok = process_updated(Id, JsonRepDoc);
         <<"completed">> ->
             ok = gen_server:call(?MODULE, {completed, Id}, infinity);
         <<"error">> ->
             % Handle replications started from older versions of replicator
             % which wrote transient errors to replication docs
-            couch_replicator_docs:remove_state_fields(DbName, DocId),
+            maybe_remove_state_fields(DbName, DocId),
             ok = process_updated(Id, JsonRepDoc);
         <<"failed">> ->
             ok
@@ -122,6 +124,15 @@ process_change(DbName, {Change}) ->
     ok.
 
 
+maybe_remove_state_fields(DbName, DocId) ->
+    case compat_mode() of
+        true ->
+            ok;
+        false ->
+            couch_replicator_docs:remove_state_fields(DbName, DocId)
+    end.
+
+
 process_updated({DbName, _DocId} = Id, JsonRepDoc) ->
     % Parsing replication doc (but not calculating the id) could throw an
     % exception which would indicate this document is malformed. This exception
@@ -206,21 +217,68 @@ code_change(_OldVsn, State, _Extra) ->
 % same document.
 -spec updated_doc(db_doc_id(), #rep{}, filter_type()) -> ok.
 updated_doc(Id, Rep, Filter) ->
-    removed_doc(Id),
-    Row = #rdoc{
-        id = Id,
-        state = initializing,
-        rep = Rep,
-        rid = nil,
-        filter = Filter,
-        info = nil,
-        errcnt = 0,
-        worker = nil,
-        last_updated = os:timestamp()
-    },
-    true = ets:insert(?MODULE, Row),
-    ok = maybe_start_worker(Id),
-    ok.
+    case compare_replication_records(current_rep(Id), Rep) of
+        false ->
+            removed_doc(Id),
+            Row = #rdoc{
+                id = Id,
+                state = initializing,
+                rep = Rep,
+                rid = nil,
+                filter = Filter,
+                info = nil,
+                errcnt = 0,
+                worker = nil,
+                last_updated = os:timestamp()
+            },
+            true = ets:insert(?MODULE, Row),
+            ok = maybe_start_worker(Id);
+        true ->
+            ok
+    end.
+
+
+-spec compare_replication_records(#rep{}, #rep{}) -> boolean().
+compare_replication_records(Rep1, Rep2) ->
+    normalize_rep(Rep1) == normalize_rep(Rep2).
+
+
+% Return current #rep{} record if any. If replication hasn't been submitted
+% to the scheduler yet, #rep{} record will be in the document processor's
+% ETS table, otherwise query scheduler for the #rep{} record.
+-spec current_rep({binary(), binary()}) -> #rep{} | nil.
+current_rep({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
+    case ets:lookup(?MODULE, {DbName, DocId}) of
+        [] ->
+            nil;
+        [#rdoc{state = scheduled, rep = nil, rid = JobId}] ->
+            % When replication is scheduled, #rep{} record which can be quite
+            % large compared to other bits in #rdoc is removed in order to avoid
+            % having to keep 2 copies of it. So have to fetch it from the
+            % scheduler.
+            couch_replicator_scheduler:rep_state(JobId);
+        [#rdoc{rep = Rep}] ->
+            Rep
+    end.
+
+
+% Normalize a #rep{} record so that it doesn't contain time-dependent fields or
+% pids (like httpc pools) and its options / props are in a canonical order.
+% It is used when comparing replication records; nil is passed through.
+-spec normalize_rep(#rep{} | nil) -> #rep{} | nil.
+normalize_rep(nil) ->
+    nil;
+
+normalize_rep(#rep{} = Rep)->
+    #rep{
+       source = couch_replicator_api_wrap:normalize_db(Rep#rep.source),
+       target = couch_replicator_api_wrap:normalize_db(Rep#rep.target),
+       options = Rep#rep.options,  % already sorted in make_options/1
+       type = Rep#rep.type,
+       view = Rep#rep.view,
+       doc_id = Rep#rep.doc_id,
+       db_name = Rep#rep.db_name
+    }.
 
 
 -spec worker_returned(reference(), db_doc_id(), rep_start_result()) -> ok.
@@ -255,6 +313,7 @@ worker_returned(Ref, Id, {ok, RepId}) ->
                 Row0#rdoc{rep=nil, rid=RepId, info=nil}
         end,
         true = ets:insert(?MODULE, NewRow),
+        ok = maybe_update_doc_triggered(Row#rdoc.rep, RepId),
         ok = maybe_start_worker(Id);
     _ ->
         ok  % doc could have been deleted, ignore
@@ -273,6 +332,7 @@ worker_returned(Ref, Id, {temporary_error, Reason}) ->
             last_updated = os:timestamp()
         },
         true = ets:insert(?MODULE, NewRow),
+        ok = maybe_update_doc_error(NewRow#rdoc.rep, Reason),
         ok = maybe_start_worker(Id);
     _ ->
         ok  % doc could have been deleted, ignore
@@ -288,6 +348,24 @@ worker_returned(Ref, Id, {permanent_failure, _Reason}) ->
     end,
     ok.
 
+-spec maybe_update_doc_error(#rep{}, any()) -> ok.
+maybe_update_doc_error(Rep, Reason) ->
+    case compat_mode() of
+        true ->
+            couch_replicator_docs:update_error(Rep, Reason);
+        false ->
+            ok
+    end.
+
+-spec maybe_update_doc_triggered(#rep{}, rep_id()) -> ok.
+maybe_update_doc_triggered(Rep, RepId) ->
+    case compat_mode() of
+        true ->
+            couch_replicator_docs:update_triggered(Rep, RepId);
+        false ->
+            ok
+    end.
+
 
 -spec error_backoff(non_neg_integer()) -> seconds().
 error_backoff(ErrCnt) ->
@@ -347,6 +425,10 @@ get_worker_wait(#rdoc{state = error, errcnt = ErrCnt}) ->
 get_worker_wait(#rdoc{state = initializing}) ->
     0.
 
+-spec compat_mode() -> boolean().
+compat_mode() ->
+    config:get_boolean("replicator", "compatibility_mode", ?DEFAULT_COMPATIBILITY).
+
 
 % _scheduler/docs HTTP endpoint helpers
 
@@ -630,6 +712,33 @@ t_ejson_docs() ->
     end).
 
 
+normalize_rep_test_() ->
+    {
+        setup,
+        fun() -> meck:expect(config, get, fun(_, _, Default) -> Default end) end,
+        fun(_) -> meck:unload() end,
+        ?_test(begin
+            EJson1 = {[
+                {<<"source">>, <<"http://host.com/source_db">>},
+                {<<"target">>, <<"local">>},
+                {<<"doc_ids">>, [<<"a">>, <<"c">>, <<"b">>]},
+                {<<"other_field">>, <<"some_value">>}
+            ]},
+            Rep1 = couch_replicator_docs:parse_rep_doc_without_id(EJson1),
+            EJson2 = {[
+                {<<"other_field">>, <<"unrelated">>},
+                {<<"target">>, <<"local">>},
+                {<<"source">>, <<"http://host.com/source_db">>},
+                {<<"doc_ids">>, [<<"c">>, <<"a">>, <<"b">>]},
+                {<<"other_field2">>, <<"unrelated2">>}
+            ]},
+            Rep2 = couch_replicator_docs:parse_rep_doc_without_id(EJson2),
+            ?assertEqual(normalize_rep(Rep1), normalize_rep(Rep2))
+        end)
+    }.
+
+
+
 % Test helper functions
 
 
@@ -639,6 +748,7 @@ setup() ->
     meck:expect(couch_log, warning, 2, ok),
     meck:expect(couch_log, error, 2, ok),
     meck:expect(config, get, fun(_, _, Default) -> Default end),
+    meck:expect(config, listen_for_changes, 2, ok),
     meck:expect(couch_replicator_clustering, owner, 2, node()),
     meck:expect(couch_replicator_doc_processor_worker, spawn_worker, 3, wref),
     meck:expect(couch_replicator_scheduler, remove_job, 1, ok),

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f41fd677/src/couch_replicator_docs.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_docs.erl b/src/couch_replicator_docs.erl
index 75889e9..fb417e4 100644
--- a/src/couch_replicator_docs.erl
+++ b/src/couch_replicator_docs.erl
@@ -23,6 +23,8 @@
     update_failed/4,
     update_rep_id/1
 ]).
+-export([update_triggered/2, update_error/2]).
+
 
 -define(REP_DB_NAME, <<"_replicator">>).
 -define(REP_DESIGN_DOC, <<"_design/_replicator">>).
@@ -57,6 +59,8 @@ remove_state_fields(DbName, DocId) ->
         {<<"_replication_state">>, undefined},
         {<<"_replication_state_time">>, undefined},
         {<<"_replication_state_reason">>, undefined},
+        {<<"_replication_start_time">>, undefined},
+        {<<"_replication_id">>, undefined},
         {<<"_replication_stats">>, undefined}]).
 
 -spec update_doc_completed(binary(), binary(), [_], erlang:timestamp()) -> any().
@@ -72,19 +76,49 @@ update_doc_completed(DbName, DocId, Stats, StartTime) ->
 
 -spec update_failed(binary(), binary(), any(), erlang:timestamp()) -> any().
 update_failed(DbName, DocId, Error, StartTime) ->
-    Reason = case Error of
-        {bad_rep_doc, Reas} ->
-            Reas;
-        _ ->
-            to_binary(Error)
-    end,
+    Reason = error_reason(Error),
     couch_log:error("Error processing replication doc `~s`: ~s", [DocId, Reason]),
     StartTimeBin = couch_replicator_utils:iso8601(StartTime),
     update_rep_doc(DbName, DocId, [
         {<<"_replication_state">>, <<"failed">>},
         {<<"_replication_start_time">>, StartTimeBin},
+        {<<"_replication_stats">>, undefined},
         {<<"_replication_state_reason">>, Reason}]),
-   couch_stats:increment_counter([couch_replicator, docs, failed_state_updates]).
+    couch_stats:increment_counter([couch_replicator, docs, failed_state_updates]).
+
+
+-spec update_triggered(#rep{}, rep_id()) -> ok.
+update_triggered(Rep, {Base, Ext}) ->
+    #rep{
+        db_name = DbName,
+        doc_id = DocId,
+        start_time = StartTime
+    } = Rep,
+    StartTimeBin = couch_replicator_utils:iso8601(StartTime),
+    update_rep_doc(DbName, DocId, [
+            {<<"_replication_state">>, <<"triggered">>},
+            {<<"_replication_state_reason">>, undefined},
+            {<<"_replication_id">>, iolist_to_binary([Base, Ext])},
+            {<<"_replication_start_time">>, StartTimeBin},
+            {<<"_replication_stats">>, undefined}]),
+    ok.
+
+
+-spec update_error(#rep{}, any()) -> ok.
+update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
+    Reason = error_reason(Error),
+    BinRepId = case RepId of
+        {Base, Ext} ->
+            iolist_to_binary([Base, Ext]);
+        _Other ->
+            null
+    end,
+    update_rep_doc(DbName, DocId, [
+            {<<"_replication_state">>, <<"error">>},
+            {<<"_replication_state_reason">>, Reason},
+            {<<"_replication_stats">>, undefined},
+            {<<"_replication_id">>, BinRepId}]),
+    ok.
 
 
 -spec ensure_rep_db_exists() -> {ok, #db{}}.
@@ -458,7 +492,7 @@ convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) ->
 convert_options([{<<"doc_ids">>, V} | R]) ->
     % Ensure same behaviour as old replicator: accept a list of percent
     % encoded doc IDs.
-    DocIds = [?l2b(couch_httpd:unquote(Id)) || Id <- V],
+    DocIds = lists:usort([?l2b(couch_httpd:unquote(Id)) || Id <- V]),
     [{doc_ids, DocIds} | convert_options(R)];
 convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) ->
     throw({bad_request, <<"parameter `selector` must be a JSON object">>});
@@ -626,6 +660,19 @@ strip_credentials({Props}) ->
     {lists:keydelete(<<"oauth">>, 1, Props)}.
 
 
+error_reason({shutdown, Error}) ->
+    error_reason(Error);
+error_reason({bad_rep_doc, Reason}) ->
+    to_binary(Reason);
+error_reason({error, {Error, Reason}})
+  when is_atom(Error), is_binary(Reason) ->
+    to_binary(io_lib:format("~s: ~s", [Error, Reason]));
+error_reason({error, Reason}) ->
+    to_binary(Reason);
+error_reason(Reason) ->
+    to_binary(Reason).
+
+
 
 -ifdef(TEST).
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f41fd677/src/couch_replicator_scheduler.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler.erl b/src/couch_replicator_scheduler.erl
index 0aa4ef6..a772ecf 100644
--- a/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator_scheduler.erl
@@ -309,6 +309,12 @@ handle_crashed_job(#job{rep = #rep{db_name = null}} = Job, Reason, _State) ->
 
 handle_crashed_job(Job, Reason, State) ->
     ok = update_state_crashed(Job, Reason, State),
+    case couch_replicator_doc_processor:compat_mode() of
+        true ->
+            couch_replicator_docs:update_error(Job#job.rep, Reason);
+        false ->
+            ok
+    end,
     case ets:info(?MODULE, size) < State#state.max_jobs of
         true ->
             % Starting pending jobs is an O(TotalJobsCount) operation. Only do

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f41fd677/src/couch_replicator_scheduler_job.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler_job.erl b/src/couch_replicator_scheduler_job.erl
index 9d14a9d..32e6930 100644
--- a/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator_scheduler_job.erl
@@ -486,7 +486,13 @@ format_status(_Opt, [_PDict, State]) ->
 -spec doc_update_triggered(#rep{}) -> ok.
 doc_update_triggered(#rep{db_name = null}) ->
     ok;
-doc_update_triggered(#rep{id = RepId, doc_id = DocId}) ->
+doc_update_triggered(#rep{id = RepId, doc_id = DocId} = Rep) ->
+    case couch_replicator_doc_processor:compat_mode() of
+        true ->
+            couch_replicator_docs:update_triggered(Rep, RepId);
+        false ->
+            ok
+    end,
     couch_log:notice("Document `~s` triggered replication `~s`",
         [DocId, pp_rep_id(RepId)]),
     ok.


[42/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
fixup! Fix whitespace


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/cc9c6c65
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/cc9c6c65
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/cc9c6c65

Branch: refs/heads/63012-scheduler
Commit: cc9c6c65a986a32197a23b3a8bd459dd462baae2
Parents: fb77cbc
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Tue Feb 28 21:47:05 2017 -0500
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Tue Feb 28 21:47:21 2017 -0500

----------------------------------------------------------------------
 src/couch_multidb_changes.erl | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/cc9c6c65/src/couch_multidb_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_multidb_changes.erl b/src/couch_multidb_changes.erl
index af85a78..d5016b6 100644
--- a/src/couch_multidb_changes.erl
+++ b/src/couch_multidb_changes.erl
@@ -674,15 +674,15 @@ t_misc_gen_server_callbacks() ->
 
 scan_dbs_test_() ->
 {
-      foreach,
-      fun() -> test_util:start_couch([mem3, fabric]) end,
-      fun(Ctx) -> test_util:stop_couch(Ctx) end,
-      [
-          t_pass_shard(),
-          t_fail_shard(),
-          t_pass_local(),
-          t_fail_local()
-     ]
+    foreach,
+    fun() -> test_util:start_couch([mem3, fabric]) end,
+    fun(Ctx) -> test_util:stop_couch(Ctx) end,
+    [
+        t_pass_shard(),
+        t_fail_shard(),
+        t_pass_local(),
+        t_fail_local()
+    ]
 }.
 
 
@@ -722,7 +722,7 @@ t_pass_local() ->
 
 
 t_fail_local() ->
- ?_test(begin
+    ?_test(begin
         LocalDb = ?tempdb(),
         {ok, Db} = couch_db:create(LocalDb, [?CTX]),
         ok = couch_db:close(Db),


[06/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Re-create missing design doc if it is not found.

Previously the design document was only created when a new replicator db was
created or found. However, a user can delete the design document at any time.
When they then query the _scheduler/docs endpoint, the query would crash because
the view is not there.

To fix this, when querying the view and it is missing, try to re-create it.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/3bb565ad
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/3bb565ad
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/3bb565ad

Branch: refs/heads/63012-scheduler
Commit: 3bb565ade9799a74a037ea3d822450228e232c37
Parents: b759481
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Thu Oct 20 17:56:05 2016 -0400
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Mon Oct 24 13:59:25 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator.erl      | 11 +++++++++--
 src/couch_replicator_docs.erl | 10 ++++++++++
 2 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/3bb565ad/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index 0bfd9f6..a8d4608 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -164,8 +164,15 @@ stream_terminal_docs_info(Db, Cb, UserAcc, States) ->
     try fabric:query_view(Db, DDoc, View, QueryCb, Acc, Args) of
     {ok, {Db, Cb, UserAcc1, States}} ->
         UserAcc1
-    catch error:database_does_not_exist ->
-        UserAcc
+    catch
+        error:database_does_not_exist ->
+            UserAcc;
+        error:{badmatch, {not_found, Reason}} ->
+            Msg = "Could not find _design/~s ~s view in replicator db ~s : ~p",
+            couch_log:error(Msg, [DDoc, View, Db, Reason]),
+            couch_replicator_docs:ensure_cluster_rep_ddoc_exists(Db),
+            timer:sleep(1000),
+            stream_terminal_docs_info(Db, Cb, UserAcc, States)
     end.
 
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/3bb565ad/src/couch_replicator_docs.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_docs.erl b/src/couch_replicator_docs.erl
index aeb9219..e27aad6 100644
--- a/src/couch_replicator_docs.erl
+++ b/src/couch_replicator_docs.erl
@@ -16,6 +16,7 @@
 -export([parse_rep_doc_without_id/1, parse_rep_doc_without_id/2]).
 -export([before_doc_update/2, after_doc_read/2]).
 -export([ensure_rep_db_exists/0, ensure_rep_ddoc_exists/1]).
+-export([ensure_cluster_rep_ddoc_exists/1]).
 -export([
     remove_state_fields/2,
     update_doc_completed/3,
@@ -28,6 +29,7 @@
 
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("ibrowse/include/ibrowse.hrl").
+-include_lib("mem3/include/mem3.hrl").
 -include("couch_replicator_api_wrap.hrl").
 -include("couch_replicator.hrl").
 
@@ -134,6 +136,14 @@ ensure_rep_ddoc_exists(RepDb, DDocId) ->
     end,
     ok.
 
+
+-spec ensure_cluster_rep_ddoc_exists(binary()) -> ok.
+ensure_cluster_rep_ddoc_exists(RepDb) ->
+    DDocId = ?REP_DESIGN_DOC,
+    [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId),
+    ensure_rep_ddoc_exists(DbShard, DDocId).
+
+
 -spec compare_ejson({[_]}, {[_]}) -> boolean().
 compare_ejson(EJson1, EJson2) ->
     EjsonSorted1 = couch_replicator_filters:ejsort(EJson1),


[28/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Add tests for replication proxies


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/32d0df72
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/32d0df72
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/32d0df72

Branch: refs/heads/63012-scheduler
Commit: 32d0df729d301da9e81a882f86c45992c44588de
Parents: 58ddc26
Author: Benjamin Bastian <be...@gmail.com>
Authored: Mon Nov 21 10:15:03 2016 -0800
Committer: Benjamin Bastian <be...@gmail.com>
Committed: Wed Nov 23 13:49:52 2016 -0800

----------------------------------------------------------------------
 test/couch_replicator_proxy_tests.erl | 69 ++++++++++++++++++++++++++++++
 1 file changed, 69 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/32d0df72/test/couch_replicator_proxy_tests.erl
----------------------------------------------------------------------
diff --git a/test/couch_replicator_proxy_tests.erl b/test/couch_replicator_proxy_tests.erl
new file mode 100644
index 0000000..a40e5b1
--- /dev/null
+++ b/test/couch_replicator_proxy_tests.erl
@@ -0,0 +1,69 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_proxy_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch_replicator/src/couch_replicator.hrl").
+-include_lib("couch_replicator/src/couch_replicator_api_wrap.hrl").
+
+
+setup() ->
+    ok.
+
+
+teardown(_) ->
+    ok.
+
+
+replicator_proxy_test_() ->
+    {
+        "replicator proxy tests",
+        {
+            setup,
+            fun() -> test_util:start_couch([couch_replicator]) end, fun test_util:stop_couch/1,
+            {
+                foreach,
+                fun setup/0, fun teardown/1,
+                [
+                    fun parse_rep_doc_without_proxy/1,
+                    fun parse_rep_doc_with_proxy/1
+                ]
+            }
+        }
+    }.
+
+
+parse_rep_doc_without_proxy(_) ->
+    ?_test(begin
+        NoProxyDoc = {[
+            {<<"source">>, <<"http://unproxied.com">>},
+            {<<"target">>, <<"http://otherunproxied.com">>}
+        ]},
+        Rep = couch_replicator_docs:parse_rep_doc(NoProxyDoc),
+        ?assertEqual((Rep#rep.source)#httpdb.proxy_url, undefined),
+        ?assertEqual((Rep#rep.target)#httpdb.proxy_url, undefined)
+    end).
+
+
+parse_rep_doc_with_proxy(_) ->
+    ?_test(begin
+        ProxyURL = <<"http://myproxy.com">>,
+        ProxyDoc = {[
+            {<<"source">>, <<"http://unproxied.com">>},
+            {<<"target">>, <<"http://otherunproxied.com">>},
+            {<<"proxy">>, ProxyURL}
+        ]},
+        Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc),
+        ?assertEqual((Rep#rep.source)#httpdb.proxy_url, binary_to_list(ProxyURL)),
+        ?assertEqual((Rep#rep.target)#httpdb.proxy_url, binary_to_list(ProxyURL))
+    end).


[24/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Fix indentation


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/849ca9b4
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/849ca9b4
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/849ca9b4

Branch: refs/heads/63012-scheduler
Commit: 849ca9b49f2b4d5665a8a5e6764a9269b2ba36fc
Parents: 36367d0
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Wed Nov 9 17:44:02 2016 -0500
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Wed Nov 9 17:44:02 2016 -0500

----------------------------------------------------------------------
 src/couch_replicator_api_wrap.erl      | 28 ++++++++++++++--------------
 src/couch_replicator_doc_processor.erl | 14 +++++++-------
 src/couch_replicator_docs.erl          | 18 +++++++++---------
 3 files changed, 30 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/849ca9b4/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index 7828684..a0d08d7 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -996,13 +996,13 @@ header_value(Key, Headers, Default) ->
 % comparisons. This means remove things like pids and also sort options / props.
 normalize_db(#httpdb{} = HttpDb) ->
     #httpdb{
-       url = HttpDb#httpdb.url,
-       oauth = HttpDb#httpdb.oauth,
-       headers = lists:keysort(1, HttpDb#httpdb.headers),
-       timeout = HttpDb#httpdb.timeout,
-       ibrowse_options = lists:keysort(1, HttpDb#httpdb.ibrowse_options),
-       retries = HttpDb#httpdb.retries,
-       http_connections = HttpDb#httpdb.http_connections
+        url = HttpDb#httpdb.url,
+        oauth = HttpDb#httpdb.oauth,
+        headers = lists:keysort(1, HttpDb#httpdb.headers),
+        timeout = HttpDb#httpdb.timeout,
+        ibrowse_options = lists:keysort(1, HttpDb#httpdb.ibrowse_options),
+        retries = HttpDb#httpdb.retries,
+        http_connections = HttpDb#httpdb.http_connections
     };
 
 normalize_db(<<DbName/binary>>) ->
@@ -1016,13 +1016,13 @@ normalize_db(<<DbName/binary>>) ->
 
 normalize_http_db_test() ->
     HttpDb =  #httpdb{
-       url = "http://host/db",
-       oauth = #oauth{},
-       headers = [{"k2","v2"}, {"k1","v1"}],
-       timeout = 30000,
-       ibrowse_options = [{k2, v2}, {k1, v1}],
-       retries = 10,
-       http_connections = 20
+        url = "http://host/db",
+        oauth = #oauth{},
+        headers = [{"k2","v2"}, {"k1","v1"}],
+        timeout = 30000,
+        ibrowse_options = [{k2, v2}, {k1, v1}],
+        retries = 10,
+        http_connections = 20
     },
     Expected = HttpDb#httpdb{
         headers = [{"k1","v1"}, {"k2","v2"}],

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/849ca9b4/src/couch_replicator_doc_processor.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_doc_processor.erl b/src/couch_replicator_doc_processor.erl
index 8ee1fe7..917d8c9 100644
--- a/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator_doc_processor.erl
@@ -266,13 +266,13 @@ normalize_rep(nil) ->
 
 normalize_rep(#rep{} = Rep)->
     #rep{
-       source = couch_replicator_api_wrap:normalize_db(Rep#rep.source),
-       target = couch_replicator_api_wrap:normalize_db(Rep#rep.target),
-       options = Rep#rep.options,  % already sorted in make_options/1
-       type = Rep#rep.type,
-       view = Rep#rep.view,
-       doc_id = Rep#rep.doc_id,
-       db_name = Rep#rep.db_name
+        source = couch_replicator_api_wrap:normalize_db(Rep#rep.source),
+        target = couch_replicator_api_wrap:normalize_db(Rep#rep.target),
+        options = Rep#rep.options,  % already sorted in make_options/1
+        type = Rep#rep.type,
+        view = Rep#rep.view,
+        doc_id = Rep#rep.doc_id,
+        db_name = Rep#rep.db_name
     }.
 
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/849ca9b4/src/couch_replicator_docs.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_docs.erl b/src/couch_replicator_docs.erl
index fb417e4..07344bb 100644
--- a/src/couch_replicator_docs.erl
+++ b/src/couch_replicator_docs.erl
@@ -96,11 +96,11 @@ update_triggered(Rep, {Base, Ext}) ->
     } = Rep,
     StartTimeBin = couch_replicator_utils:iso8601(StartTime),
     update_rep_doc(DbName, DocId, [
-            {<<"_replication_state">>, <<"triggered">>},
-            {<<"_replication_state_reason">>, undefined},
-            {<<"_replication_id">>, iolist_to_binary([Base, Ext])},
-            {<<"_replication_start_time">>, StartTimeBin},
-            {<<"_replication_stats">>, undefined}]),
+        {<<"_replication_state">>, <<"triggered">>},
+        {<<"_replication_state_reason">>, undefined},
+        {<<"_replication_id">>, iolist_to_binary([Base, Ext])},
+        {<<"_replication_start_time">>, StartTimeBin},
+        {<<"_replication_stats">>, undefined}]),
     ok.
 
 
@@ -114,10 +114,10 @@ update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
             null
     end,
     update_rep_doc(DbName, DocId, [
-            {<<"_replication_state">>, <<"error">>},
-            {<<"_replication_state_reason">>, Reason},
-            {<<"_replication_stats">>, undefined},
-            {<<"_replication_id">>, BinRepId}]),
+        {<<"_replication_state">>, <<"error">>},
+        {<<"_replication_state_reason">>, Reason},
+        {<<"_replication_stats">>, undefined},
+        {<<"_replication_id">>, BinRepId}]),
     ok.
 
 


[35/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Fix add_job/1 spec for scheduler

add_job/1 no longer returns `{error, already_added}`, so fix the spec to
match.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/73d17368
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/73d17368
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/73d17368

Branch: refs/heads/63012-scheduler
Commit: 73d173687909adf482a2222f22d9e764728eb843
Parents: 32d43df
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Wed Dec 7 20:47:29 2016 -0500
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Wed Jan 11 14:58:33 2017 -0500

----------------------------------------------------------------------
 src/couch_replicator_scheduler.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/73d17368/src/couch_replicator_scheduler.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler.erl b/src/couch_replicator_scheduler.erl
index af2262b..a741a9f 100644
--- a/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator_scheduler.erl
@@ -74,7 +74,7 @@ start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 
--spec add_job(#rep{}) -> ok | {error, already_added}.
+-spec add_job(#rep{}) -> ok.
 add_job(#rep{} = Rep) when Rep#rep.id /= undefined ->
     Job = #job{
         id = Rep#rep.id,


[18/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Simplify replicator doc processor logic

Condense all `worker_returned(Ref, Id, {ok, RepId})` sub-cases into
a single case statement


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/525e83e2
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/525e83e2
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/525e83e2

Branch: refs/heads/63012-scheduler
Commit: 525e83e2910a37d6514249568fdadcadbec2bae9
Parents: 13ea06b
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Thu Nov 3 16:59:45 2016 -0400
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Thu Nov 3 16:59:45 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator_doc_processor.erl | 67 ++++++++++++-----------------
 1 file changed, 28 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/525e83e2/src/couch_replicator_doc_processor.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_doc_processor.erl b/src/couch_replicator_doc_processor.erl
index 06ec743..733406f 100644
--- a/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator_doc_processor.erl
@@ -227,8 +227,34 @@ updated_doc(Id, Rep, Filter) ->
 worker_returned(Ref, Id, {ok, RepId}) ->
     case ets:lookup(?MODULE, Id) of
     [#rdoc{worker = Ref} = Row] ->
-        NewRow = update_docs_row(Row, RepId),
-        true = ets:insert(?MODULE, NewRow#rdoc{last_updated = os:timestamp()}),
+        Row0 = Row#rdoc{
+            state = scheduled,
+            errcnt = 0,
+            worker = nil,
+            last_updated = os:timestamp()
+        },
+        NewRow = case Row0 of
+            #rdoc{rid = RepId, filter = user} ->
+                % Filtered replication id didn't change.
+                Row0;
+            #rdoc{rid = nil, filter = user} ->
+                % Calculated new replication id for a filtered replication. Make sure
+                % to schedule another check as filter code could change. Replications
+                % starts could have been failing, so also clear error count.
+                Row0#rdoc{rid = RepId};
+            #rdoc{rid = OldRepId, filter = user} ->
+                % Replication id of existing replication job with filter has changed.
+                % Remove old replication job from scheduler and schedule check to check
+                % for future changes.
+                ok = couch_replicator_scheduler:remove_job(OldRepId),
+                Msg = io_lib:format("Replication id changed: ~p -> ~p", [OldRepId, RepId]),
+                Row0#rdoc{info = couch_util:to_binary(Msg)};
+            #rdoc{rid = nil} ->
+                % Calculated new replication id for non-filtered replication. Remove
+                % replication doc body, after this we won't needed any more.
+                Row0#rdoc{rep=nil, rid=RepId, info=nil}
+        end,
+        true = ets:insert(?MODULE, NewRow),
         ok = maybe_start_worker(Id);
     _ ->
         ok  % doc could have been deleted, ignore
@@ -263,43 +289,6 @@ worker_returned(Ref, Id, {permanent_failure, _Reason}) ->
     ok.
 
 
-% Filtered replication id didn't change.
--spec update_docs_row(#rdoc{}, rep_id()) -> #rdoc{}.
-update_docs_row(#rdoc{rid = RepId, filter = user} = Row, RepId) ->
-    Row#rdoc{state = scheduled, errcnt = 0, worker = nil};
-
-% Calculated new replication id for a filtered replication. Make sure
-% to schedule another check as filter code could change. Replications starts
-% could have been failing, so also clear error count.
-update_docs_row(#rdoc{rid = nil, filter = user} = Row, RepId) ->
-    Row#rdoc{rid = RepId, state = scheduled, errcnt = 0, worker = nil};
-
-% Replication id of existing replication job with filter has changed.
-% Remove old replication job from scheduler and schedule check to check for
-% future changes.
-update_docs_row(#rdoc{rid = OldRepId, filter = user} = Row, RepId) ->
-    ok = couch_replicator_scheduler:remove_job(OldRepId),
-    Msg = io_lib:format("Replication id changed: ~p -> ~p", [OldRepId, RepId]),
-    Row#rdoc{
-        rid = RepId,
-        state = scheduled,
-        info = couch_util:to_binary(Msg),
-        errcnt = 0,
-        worker = nil
-     };
-
-% Calculated new replication id for non-filtered replication.
- update_docs_row(#rdoc{rid = nil} = Row, RepId) ->
-    Row#rdoc{
-        rep = nil, % remove replication doc body, after this we won't needed any more
-        rid = RepId,
-        state = scheduled,
-        info = nil,
-        errcnt = 0,
-        worker = nil
-     }.
-
-
 -spec error_backoff(non_neg_integer()) -> seconds().
 error_backoff(ErrCnt) ->
     Exp = min(ErrCnt, ?ERROR_MAX_BACKOFF_EXPONENT),


[26/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Rename compat_mode config flag to update_docs

`update_docs` better reflects its functionality. When enabled, the old replicator
API is still closely emulated; however, some new states (such as `failed`) could
be written to the document. Calling it `compat_mode` is thus
misleading.

BugzID: 63012


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/2c2c87ac
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/2c2c87ac
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/2c2c87ac

Branch: refs/heads/63012-scheduler
Commit: 2c2c87accc0f9f3a0242534bade68815e0e68255
Parents: 01fffff
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Wed Nov 23 12:06:45 2016 -0500
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Wed Nov 23 12:40:05 2016 -0500

----------------------------------------------------------------------
 src/couch_replicator_doc_processor.erl | 16 ++++++++--------
 src/couch_replicator_scheduler.erl     |  2 +-
 src/couch_replicator_scheduler_job.erl |  2 +-
 3 files changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/2c2c87ac/src/couch_replicator_doc_processor.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_doc_processor.erl b/src/couch_replicator_doc_processor.erl
index 917d8c9..6851542 100644
--- a/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator_doc_processor.erl
@@ -15,7 +15,7 @@
 
 -export([start_link/0]).
 -export([docs/1, doc/2]).
--export([compat_mode/0]).
+-export([update_docs/0]).
 
 % multidb changes callback
 -export([db_created/2, db_deleted/2, db_found/2, db_change/3]).
@@ -32,7 +32,7 @@
     get_json_value/3
 ]).
 
--define(DEFAULT_COMPATIBILITY, false).
+-define(DEFAULT_UPDATE_DOCS, false).
 -define(ERROR_MAX_BACKOFF_EXPONENT, 12).  % ~ 1 day on average
 -define(TS_DAY_SEC, 86400).
 
@@ -125,7 +125,7 @@ process_change(DbName, {Change}) ->
 
 
 maybe_remove_state_fields(DbName, DocId) ->
-    case compat_mode() of
+    case update_docs() of
         true ->
             ok;
         false ->
@@ -345,7 +345,7 @@ worker_returned(Ref, Id, {permanent_failure, _Reason}) ->
 
 -spec maybe_update_doc_error(#rep{}, any()) -> ok.
 maybe_update_doc_error(Rep, Reason) ->
-    case compat_mode() of
+    case update_docs() of
         true ->
             couch_replicator_docs:update_error(Rep, Reason);
         false ->
@@ -354,7 +354,7 @@ maybe_update_doc_error(Rep, Reason) ->
 
 -spec maybe_update_doc_triggered(#rep{}, rep_id()) -> ok.
 maybe_update_doc_triggered(Rep, RepId) ->
-    case compat_mode() of
+    case update_docs() of
         true ->
             couch_replicator_docs:update_triggered(Rep, RepId);
         false ->
@@ -420,9 +420,9 @@ get_worker_wait(#rdoc{state = error, errcnt = ErrCnt}) ->
 get_worker_wait(#rdoc{state = initializing}) ->
     0.
 
--spec compat_mode() -> boolean().
-compat_mode() ->
-    config:get_boolean("replicator", "compatibility_mode", ?DEFAULT_COMPATIBILITY).
+-spec update_docs() -> boolean().
+update_docs() ->
+    config:get_boolean("replicator", "update_docs", ?DEFAULT_UPDATE_DOCS).
 
 
 % _scheduler/docs HTTP endpoint helpers

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/2c2c87ac/src/couch_replicator_scheduler.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler.erl b/src/couch_replicator_scheduler.erl
index a772ecf..440458e 100644
--- a/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator_scheduler.erl
@@ -309,7 +309,7 @@ handle_crashed_job(#job{rep = #rep{db_name = null}} = Job, Reason, _State) ->
 
 handle_crashed_job(Job, Reason, State) ->
     ok = update_state_crashed(Job, Reason, State),
-    case couch_replicator_doc_processor:compat_mode() of
+    case couch_replicator_doc_processor:update_docs() of
         true ->
             couch_replicator_docs:update_error(Job#job.rep, Reason);
         false ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/2c2c87ac/src/couch_replicator_scheduler_job.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler_job.erl b/src/couch_replicator_scheduler_job.erl
index 32e6930..4dcecb4 100644
--- a/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator_scheduler_job.erl
@@ -487,7 +487,7 @@ format_status(_Opt, [_PDict, State]) ->
 doc_update_triggered(#rep{db_name = null}) ->
     ok;
 doc_update_triggered(#rep{id = RepId, doc_id = DocId} = Rep) ->
-    case couch_replicator_doc_processor:compat_mode() of
+    case couch_replicator_doc_processor:update_docs() of
         true ->
             couch_replicator_docs:update_triggered(Rep, RepId);
         false ->


[50/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Merge pull request #119 from cloudant/63012-add-source-target-continuous-to-terminal-states

Add 'source' and 'target' fields to _scheduler/docs terminal states output

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/27a5eae9
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/27a5eae9
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/27a5eae9

Branch: refs/heads/63012-scheduler
Commit: 27a5eae90adcee81315dc67734a6f4577d450476
Parents: 0b51c1b 419ea1d
Author: Nick Vatamaniuc <ni...@users.noreply.github.com>
Authored: Tue Mar 7 13:02:17 2017 -0500
Committer: GitHub <no...@github.com>
Committed: Tue Mar 7 13:02:17 2017 -0500

----------------------------------------------------------------------
 src/couch_replicator.erl              | 62 ++++++++++++++++++++++++++++--
 src/couch_replicator_docs.erl         |  2 +-
 src/couch_replicator_js_functions.hrl |  8 +++-
 3 files changed, 65 insertions(+), 7 deletions(-)
----------------------------------------------------------------------



[12/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Fix canceling _replicate requests using the full document body

Previously, if 'cancel' was 'true' but no id was specified, the id was not
parsed out of the document body and remained undefined.

Fix this and make all the possible configurations explicit in the parsing case
statement.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/b7d92afb
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/b7d92afb
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/b7d92afb

Branch: refs/heads/63012-scheduler
Commit: b7d92afb8da7c0caa29d8a734f83a1df051ae382
Parents: c4f7b78
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Fri Oct 28 10:45:32 2016 -0400
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Fri Oct 28 10:45:32 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator_docs.erl | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/b7d92afb/src/couch_replicator_docs.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_docs.erl b/src/couch_replicator_docs.erl
index e27aad6..71acc29 100644
--- a/src/couch_replicator_docs.erl
+++ b/src/couch_replicator_docs.erl
@@ -204,10 +204,17 @@ parse_rep_doc_without_id(RepDoc) ->
 -spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #rep{}}.
 parse_rep_doc(Doc, UserCtx) ->
     {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx),
-    case get_value(cancel, Rep#rep.options, false) of
-        true ->
+    Cancel = get_value(cancel, Rep#rep.options, false),
+    Id = get_value(id, Rep#rep.options, nil),
+    case {Cancel, Id} of
+        {true, nil} ->
+            % Cancel request with no id, must parse id out of body contents
+            {ok, update_rep_id(Rep)};
+        {true, Id} ->
+            % Cancel request with an id specified, so do not parse id from body
             {ok, Rep};
-        false ->
+        {false, _Id} ->
+            % Not a cancel request, regular replication doc
             {ok, update_rep_id(Rep)}
     end.
 


[45/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Merge branch 'master' into 63012-scheduler-update-from-upstream

The master branch has 3 functional changes:

"Allow configuring maximum document ID length during replication"
d23025ebd7176f6c307ddf49902cf20b33bd55c4

"Fix crashes when replicator db is deleted"
7a2b2b68bf4afa6d20b56a8ae51361f83981412a

"Make sure to log db as well as doc in replicator logs"
50dcd7d7c5f7ce003e8e2fc84646c1aa9931ebaa

The 1st change merged cleanly and is part of this commit. The other two were
changes to the replicator manager. Those will be separate commits as they have
to be ported manually.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/38c94056
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/38c94056
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/38c94056

Branch: refs/heads/63012-scheduler
Commit: 38c94056d83d3c5330b30b65aea058f1f728700d
Parents: bcd852d 50a88ba
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Fri Mar 3 12:36:32 2017 -0500
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Fri Mar 3 12:37:26 2017 -0500

----------------------------------------------------------------------
 src/couch_replicator_changes_reader.erl     | 24 +++++-
 test/couch_replicator_id_too_long_tests.erl | 94 ++++++++++++++++++++++++
 2 files changed, 115 insertions(+), 3 deletions(-)
----------------------------------------------------------------------



[30/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Merge pull request #101 from cloudant/63012-scheduler-proxy-fix

Share replicator connections by proxy

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/85dece2e
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/85dece2e
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/85dece2e

Branch: refs/heads/63012-scheduler
Commit: 85dece2e7fea6713ca03c00e2a536b85af01f332
Parents: 1413a6f 32d0df7
Author: Benjamin Bastian <be...@gmail.com>
Authored: Wed Nov 23 14:06:04 2016 -0800
Committer: GitHub <no...@github.com>
Committed: Wed Nov 23 14:06:04 2016 -0800

----------------------------------------------------------------------
 src/couch_replicator_api_wrap.hrl     |  3 +-
 src/couch_replicator_connection.erl   |  7 ++-
 src/couch_replicator_docs.erl         | 32 ++++++++------
 src/couch_replicator_httpc.erl        | 14 +++++-
 src/couch_replicator_scheduler.erl    |  9 +++-
 test/couch_replicator_proxy_tests.erl | 69 ++++++++++++++++++++++++++++++
 6 files changed, 115 insertions(+), 19 deletions(-)
----------------------------------------------------------------------



[17/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Merge pull request #97 from cloudant/63012-scheduler-better-timestamp

Add start_time and last_updated fields to _scheduler/ endpoints output 

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/13ea06b3
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/13ea06b3
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/13ea06b3

Branch: refs/heads/63012-scheduler
Commit: 13ea06b32997836ee2a5312b2d8eaf814f562fc0
Parents: 4e9b956 ab198aa
Author: Nick Vatamaniuc <ni...@users.noreply.github.com>
Authored: Thu Nov 3 16:50:44 2016 -0400
Committer: GitHub <no...@github.com>
Committed: Thu Nov 3 16:50:44 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator.erl                      | 33 +++++++++++++----
 src/couch_replicator.hrl                      |  3 +-
 src/couch_replicator_doc_processor.erl        | 43 +++++++++++++++-------
 src/couch_replicator_doc_processor_worker.erl |  5 ++-
 src/couch_replicator_docs.erl                 | 41 ++++++---------------
 src/couch_replicator_js_functions.hrl         | 10 ++++-
 src/couch_replicator_scheduler.erl            | 32 ++++++++--------
 src/couch_replicator_scheduler_job.erl        | 13 ++++---
 src/couch_replicator_utils.erl                |  8 ++++
 9 files changed, 113 insertions(+), 75 deletions(-)
----------------------------------------------------------------------



[08/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
When adding a job, first call maybe_delete on the existing job, then call add

Previously the job was added, then removed. Surprisingly, this sometimes
still worked because the job was also started (if fewer than max_jobs were
running), and during start the job record is inserted into the ETS table.
However, as soon as the max_jobs limit was reached, jobs were no longer added
to the job ETS table.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/fabc14ed
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/fabc14ed
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/fabc14ed

Branch: refs/heads/63012-scheduler
Commit: fabc14ed397df451bc01204fda87e5ef150a4e09
Parents: 7bdaba5
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Mon Oct 24 17:48:55 2016 -0400
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Mon Oct 24 17:48:55 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator_scheduler.erl | 19 +++++++------------
 1 file changed, 7 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/fabc14ed/src/couch_replicator_scheduler.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler.erl b/src/couch_replicator_scheduler.erl
index b43b36c..dadd8b0 100644
--- a/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator_scheduler.erl
@@ -175,18 +175,13 @@ init(_) ->
 
 
 handle_call({add_job, Job}, _From, State) ->
-    case add_job_int(Job) of
-        true ->
-            ok = maybe_remove_job_int(Job#job.id, State),
-            ok = maybe_start_newly_added_job(Job, State),
-            couch_stats:increment_counter([couch_replicator, jobs, adds]),
-            TotalJobs = ets:info(?MODULE, size),
-            couch_stats:update_gauge([couch_replicator, jobs, total], TotalJobs),
-            {reply, ok, State};
-        false ->
-            couch_stats:increment_counter([couch_replicator, jobs, duplicate_adds]),
-            {reply, {error, already_added}, State}
-    end;
+    ok = maybe_remove_job_int(Job#job.id, State),
+    true = add_job_int(Job),
+    ok = maybe_start_newly_added_job(Job, State),
+    couch_stats:increment_counter([couch_replicator, jobs, adds]),
+    TotalJobs = ets:info(?MODULE, size),
+    couch_stats:update_gauge([couch_replicator, jobs, total], TotalJobs),
+    {reply, ok, State};
 
 handle_call({remove_job, Id}, _From, State) ->
     ok = maybe_remove_job_int(Id, State),


[07/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Merge pull request #90 from cloudant/63012-recreate-missing-replicator-design-doc-on-view-fail

Re-create missing design doc if it is not found.

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/7bdaba52
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/7bdaba52
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/7bdaba52

Branch: refs/heads/63012-scheduler
Commit: 7bdaba5272e188ffd8b7a67eda2abd5c5c19085e
Parents: b759481 3bb565a
Author: Nick Vatamaniuc <ni...@users.noreply.github.com>
Authored: Mon Oct 24 14:17:18 2016 -0400
Committer: GitHub <no...@github.com>
Committed: Mon Oct 24 14:17:18 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator.erl      | 11 +++++++++--
 src/couch_replicator_docs.erl | 10 ++++++++++
 2 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[34/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Merge pull request #104 from cloudant/63012-fix-crashing-and-error-job-migration

Fix job migration during membership change

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/32d43df3
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/32d43df3
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/32d43df3

Branch: refs/heads/63012-scheduler
Commit: 32d43df3f1a384e65f99cf8189d4413e7c294ffd
Parents: bc2f053 99a2b90
Author: Nick Vatamaniuc <ni...@users.noreply.github.com>
Authored: Mon Jan 9 14:11:12 2017 -0500
Committer: GitHub <no...@github.com>
Committed: Mon Jan 9 14:11:12 2017 -0500

----------------------------------------------------------------------
 src/couch_replicator_clustering.erl    | 23 ++++++++++-----
 src/couch_replicator_db_changes.erl    | 13 +--------
 src/couch_replicator_doc_processor.erl | 43 ++++++++++++++++++++++++++++-
 src/couch_replicator_scheduler_job.erl | 22 +++++----------
 4 files changed, 66 insertions(+), 35 deletions(-)
----------------------------------------------------------------------



[36/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Update filtered replication id when it changes in doc processor ETS table

When the replication filter changed, the replication id record in the doc
processor ETS table was not updated. This led to the new replication job not
showing up in the _scheduler/docs output.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/f2b3ac5d
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/f2b3ac5d
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/f2b3ac5d

Branch: refs/heads/63012-scheduler
Commit: f2b3ac5d009ea3a185097676316669b84dfaaeb0
Parents: 73d1736
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Wed Dec 7 20:50:14 2016 -0500
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Wed Jan 11 14:58:33 2017 -0500

----------------------------------------------------------------------
 src/couch_replicator_doc_processor.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f2b3ac5d/src/couch_replicator_doc_processor.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_doc_processor.erl b/src/couch_replicator_doc_processor.erl
index 035a7ec..402a72f 100644
--- a/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator_doc_processor.erl
@@ -310,7 +310,7 @@ worker_returned(Ref, Id, {ok, RepId}) ->
                 % for future changes.
                 ok = couch_replicator_scheduler:remove_job(OldRepId),
                 Msg = io_lib:format("Replication id changed: ~p -> ~p", [OldRepId, RepId]),
-                Row0#rdoc{info = couch_util:to_binary(Msg)};
+                Row0#rdoc{rid = RepId, info = couch_util:to_binary(Msg)};
             #rdoc{rid = nil} ->
                 % Calculated new replication id for non-filtered replication. Remove
                 % replication doc body, after this we won't needed any more.


[20/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Fix transient replications authorization

Previously, a non-admin user could replace or cancel a replication job created
by an admin or by another user.

Allow replacement and cancellation only for admins and the user who created the job.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/b68d79e1
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/b68d79e1
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/b68d79e1

Branch: refs/heads/63012-scheduler
Commit: b68d79e118e54c4e9a70132d2f3bee4e77c7d9db
Parents: 1ad72e3
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Wed Nov 2 21:39:36 2016 -0400
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Tue Nov 8 16:41:48 2016 -0500

----------------------------------------------------------------------
 src/couch_replicator.erl | 99 +++++++++++++++++++++++++++++++++----------
 1 file changed, 77 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/b68d79e1/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index 0a159c3..59fb292 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -52,13 +52,20 @@ replicate(PostBody, Ctx) ->
     #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep,
     case get_value(cancel, Options, false) of
     true ->
-        case get_value(id, Options, nil) of
+        CancelRepId = case get_value(id, Options, nil) of
         nil ->
-            cancel_replication(RepId);
+            RepId;
         RepId2 ->
-            cancel_replication(RepId2, UserCtx)
+            RepId2
+        end,
+        case check_authorization(CancelRepId, UserCtx) of
+        ok ->
+            cancel_replication(CancelRepId);
+        not_found ->
+            {error, not_found}
         end;
     false ->
+        check_authorization(RepId, UserCtx),
         {ok, Listener} = rep_result_listener(RepId),
         Result = do_replication_loop(Rep),
         couch_replicator_notifier:stop(Listener),
@@ -131,25 +138,6 @@ cancel_replication({BasedId, Extension} = RepId) ->
     end.
 
 
--spec cancel_replication(rep_id(), #user_ctx{}) ->
-    {ok, {cancelled, binary()}} | {error, not_found}.
-cancel_replication(RepId, #user_ctx{name = Name, roles = Roles}) ->
-    case lists:member(<<"_admin">>, Roles) of
-    true ->
-        cancel_replication(RepId);
-    false ->
-        case couch_replicator_scheduler:rep_state(RepId) of
-        #rep{user_ctx = #user_ctx{name = Name}} ->
-            cancel_replication(RepId);
-        #rep{user_ctx = #user_ctx{name = _Other}} ->
-            throw({unauthorized,
-                <<"Can't cancel a replication triggered by another user">>});
-        nil ->
-            {error, not_found}
-        end
-     end.
-
-
 -spec replication_states() -> [atom()].
 replication_states() ->
     ?REPLICATION_STATES.
@@ -305,3 +293,70 @@ doc_from_db(RepDb, DocId, UserCtx) ->
          {not_found, _Reason} ->
             {error, not_found}
     end.
+
+
+-spec check_authorization(rep_id(), #user_ctx{}) -> ok | not_found.
+check_authorization(RepId, #user_ctx{name = Name} = Ctx) ->
+    case couch_replicator_scheduler:rep_state(RepId) of
+    #rep{user_ctx = #user_ctx{name = Name}} ->
+        ok;
+    #rep{} ->
+        couch_httpd:verify_is_server_admin(Ctx);
+    nil ->
+        not_found
+    end.
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+authorization_test_() ->
+    {
+        foreach,
+        fun () -> ok end,
+        fun (_) -> meck:unload() end,
+        [
+            t_admin_is_always_authorized(),
+            t_username_must_match(),
+            t_replication_not_found()
+        ]
+    }.
+
+
+t_admin_is_always_authorized() ->
+    ?_test(begin
+        expect_rep_user_ctx(<<"someuser">>, <<"_admin">>),
+        UserCtx = #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]},
+        ?assertEqual(ok, check_authorization(<<"RepId">>, UserCtx))
+    end).
+
+
+t_username_must_match() ->
+     ?_test(begin
+        expect_rep_user_ctx(<<"user">>, <<"somerole">>),
+        UserCtx1 = #user_ctx{name = <<"user">>, roles = [<<"somerole">>]},
+        ?assertEqual(ok, check_authorization(<<"RepId">>, UserCtx1)),
+        UserCtx2 = #user_ctx{name = <<"other">>, roles = [<<"somerole">>]},
+        ?assertThrow({unauthorized, _}, check_authorization(<<"RepId">>, UserCtx2))
+    end).
+
+
+t_replication_not_found() ->
+     ?_test(begin
+        meck:expect(couch_replicator_scheduler, rep_state, 1, nil),
+        UserCtx1 = #user_ctx{name = <<"user">>, roles = [<<"somerole">>]},
+        ?assertEqual(not_found, check_authorization(<<"RepId">>, UserCtx1)),
+        UserCtx2 = #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]},
+        ?assertEqual(not_found, check_authorization(<<"RepId">>, UserCtx2))
+    end).
+
+
+expect_rep_user_ctx(Name, Role) ->
+    meck:expect(couch_replicator_scheduler, rep_state,
+        fun(_Id) ->
+            UserCtx = #user_ctx{name = Name, roles = [Role]},
+            #rep{user_ctx = UserCtx}
+        end).
+
+-endif.


[39/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Fix ISO8601 timestamp formatting

Timestamps previously rendered as `2017-01-16T11-38-04Z` are now rendered as `2017-01-16T11:38:04Z`.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/8746e715
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/8746e715
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/8746e715

Branch: refs/heads/63012-scheduler
Commit: 8746e7158d54d21f7fe0c722c22d116ec9a034e1
Parents: 146b700
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Mon Jan 23 10:34:36 2017 -0500
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Mon Jan 23 10:34:36 2017 -0500

----------------------------------------------------------------------
 src/couch_replicator_utils.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/8746e715/src/couch_replicator_utils.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_utils.erl b/src/couch_replicator_utils.erl
index 4ab4f81..3fd98a3 100644
--- a/src/couch_replicator_utils.erl
+++ b/src/couch_replicator_utils.erl
@@ -141,5 +141,5 @@ parse_rep_doc(Props, UserCtx) ->
 -spec iso8601(erlang:timestamp()) -> binary().
 iso8601({_Mega, _Sec, _Micro} = Timestamp) ->
     {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(Timestamp),
-    Format = "~B-~2..0B-~2..0BT~2..0B-~2..0B-~2..0BZ",
+    Format = "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ",
     iolist_to_binary(io_lib:format(Format, [Y, Mon, D, H, Min, S])).


[16/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Add start_time and last_updated fields to _scheduler/ endpoints output

For consistency, also add a `_replication_start_time` field to completed and
failed replication document updates.

`start_time` is defined as the time when the replication was first noticed by
the replicator. For a document-based replication, it is when the document update
event reached the couch replicator and the replication record was parsed from it.
For a _replicate replication, it is the time when the POST request body was
parsed.

`last_updated` is the time of the last state update.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/ab198aae
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/ab198aae
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/ab198aae

Branch: refs/heads/63012-scheduler
Commit: ab198aaeca6da5a3287a507f2e810f8d7c483a91
Parents: 4e9b956
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Wed Nov 2 13:57:37 2016 -0400
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Thu Nov 3 16:37:42 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator.erl                      | 33 +++++++++++++----
 src/couch_replicator.hrl                      |  3 +-
 src/couch_replicator_doc_processor.erl        | 43 +++++++++++++++-------
 src/couch_replicator_doc_processor_worker.erl |  5 ++-
 src/couch_replicator_docs.erl                 | 41 ++++++---------------
 src/couch_replicator_js_functions.hrl         | 10 ++++-
 src/couch_replicator_scheduler.erl            | 32 ++++++++--------
 src/couch_replicator_scheduler_job.erl        | 13 ++++---
 src/couch_replicator_utils.erl                |  8 ++++
 9 files changed, 113 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/ab198aae/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index a8d4608..0a159c3 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -47,8 +47,9 @@
     {ok, {cancelled, binary()}} |
     {error, any()}.
 replicate(PostBody, Ctx) ->
-    {ok, #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep} =
-        couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
+    {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
+    Rep = Rep0#rep{start_time = os:timestamp()},
+    #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep,
     case get_value(cancel, Options, false) of
     true ->
         case get_value(id, Options, nil) of
@@ -207,7 +208,15 @@ handle_replicator_doc_query({row, Props}, {Db, Cb, UserAcc, States}) ->
     DocId = couch_util:get_value(id, Props),
     DocStateBin = couch_util:get_value(key, Props),
     DocState = erlang:binary_to_existing_atom(DocStateBin, utf8),
-    StateInfo = couch_util:get_value(value, Props),
+    MapValue = couch_util:get_value(value, Props),
+    {StartTime, StateTime, StateInfo} = case MapValue of
+        [StartT, StateT, Info] ->
+            {StartT, StateT, Info};
+        _Other ->
+            % Handle the case where the view code was upgraded but new view code
+            % wasn't updated yet (before a _scheduler/docs request was made)
+            {null, null, null}
+    end,
     % Set the error_count to 1 if failed. This is mainly for consistency as
     % jobs from doc_processor and scheduler will have that value set
     ErrorCount = case DocState of failed -> 1; _ -> 0 end,
@@ -219,7 +228,9 @@ handle_replicator_doc_query({row, Props}, {Db, Cb, UserAcc, States}) ->
                 {id, null},
                 {state, DocState},
                 {error_count, ErrorCount},
-                {info, StateInfo}
+                {info, StateInfo},
+                {last_updated, StateTime},
+                {start_time, StartTime}
             ]},
             {ok, {Db, Cb, Cb(EjsonInfo, UserAcc), States}};
         false ->
@@ -268,12 +279,16 @@ doc_from_db(RepDb, DocId, UserCtx) ->
     case fabric:open_doc(RepDb, DocId, [UserCtx, ejson_body]) of
         {ok, Doc} ->
             {Props} = couch_doc:to_json_obj(Doc, []),
-            State = couch_util:get_value(<<"_replication_state">>, Props, null),
+            State = get_value(<<"_replication_state">>, Props, null),
+            StartTime = get_value(<<"_replication_start_time">>, Props, null),
+            StateTime = get_value(<<"_replication_state_time">>, Props, null),
             {StateInfo, ErrorCount} = case State of
                 <<"completed">> ->
-                    {couch_util:get_value(<<"_replication_stats">>, Props, null), 0};
+                    Info = get_value(<<"_replication_stats">>, Props, null),
+                    {Info, 0};
                 <<"failed">> ->
-                    {couch_util:get_value(<<"_replication_state_reason">>, Props, null), 1};
+                    Info = get_value(<<"_replication_state_reason">>, Props, null),
+                    {Info, 1};
                 _OtherState ->
                     {null, 0}
             end,
@@ -283,7 +298,9 @@ doc_from_db(RepDb, DocId, UserCtx) ->
                 {id, null},
                 {state, State},
                 {error_count, ErrorCount},
-                {info, StateInfo}
+                {info, StateInfo},
+                {start_time, StartTime},
+                {last_updated, StateTime}
             ]}};
          {not_found, _Reason} ->
             {error, not_found}

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/ab198aae/src/couch_replicator.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.hrl b/src/couch_replicator.hrl
index f116ee2..339e162 100644
--- a/src/couch_replicator.hrl
+++ b/src/couch_replicator.hrl
@@ -21,7 +21,8 @@
     type = db :: atom() | '_',
     view = nil :: any() | '_',
     doc_id :: any() | '_',
-    db_name = null :: null | binary() | '_'
+    db_name = null :: null | binary() | '_',
+    start_time :: erlang:timestamp() | '_'
 }).
 
 -type rep_id() :: {string(), string()}.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/ab198aae/src/couch_replicator_doc_processor.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_doc_processor.erl b/src/couch_replicator_doc_processor.erl
index eadea5d..06ec743 100644
--- a/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator_doc_processor.erl
@@ -46,7 +46,8 @@
     filter :: filter_type() | '_',
     info :: binary() | nil | '_',
     errcnt :: non_neg_integer() | '_',
-    worker :: reference() | nil | '_'
+    worker :: reference() | nil | '_',
+    last_updated :: erlang:timestamp() | '_'
 }).
 
 
@@ -79,7 +80,8 @@ db_change(DbName, {ChangeProps} = Change, Server) ->
     _Tag:Error ->
         {RepProps} = get_json_value(doc, ChangeProps),
         DocId = get_json_value(<<"_id">>, RepProps),
-        couch_replicator_docs:update_failed(DbName, DocId, Error)
+        Timestamp = os:timestamp(),
+        couch_replicator_docs:update_failed(DbName, DocId, Error, Timestamp)
     end,
     Server.
 
@@ -127,7 +129,7 @@ process_updated({DbName, _DocId} = Id, JsonRepDoc) ->
     % failure in the document. User will have to delete and re-create the document
     % to fix the problem.
     Rep0 = couch_replicator_docs:parse_rep_doc_without_id(JsonRepDoc),
-    Rep = Rep0#rep{db_name = DbName},
+    Rep = Rep0#rep{db_name = DbName, start_time = os:timestamp()},
     Filter = case couch_replicator_filters:parse(Rep#rep.options) of
     {ok, nil} ->
         nil;
@@ -213,7 +215,8 @@ updated_doc(Id, Rep, Filter) ->
         filter = Filter,
         info = nil,
         errcnt = 0,
-        worker = nil
+        worker = nil,
+        last_updated = os:timestamp()
     },
     true = ets:insert(?MODULE, Row),
     ok = maybe_start_worker(Id),
@@ -224,7 +227,8 @@ updated_doc(Id, Rep, Filter) ->
 worker_returned(Ref, Id, {ok, RepId}) ->
     case ets:lookup(?MODULE, Id) of
     [#rdoc{worker = Ref} = Row] ->
-        true = ets:insert(?MODULE, update_docs_row(Row, RepId)),
+        NewRow = update_docs_row(Row, RepId),
+        true = ets:insert(?MODULE, NewRow#rdoc{last_updated = os:timestamp()}),
         ok = maybe_start_worker(Id);
     _ ->
         ok  % doc could have been deleted, ignore
@@ -239,7 +243,8 @@ worker_returned(Ref, Id, {temporary_error, Reason}) ->
             state = error,
             info = Reason,
             errcnt = ErrCnt + 1,
-            worker = nil
+            worker = nil,
+            last_updated = os:timestamp()
         },
         true = ets:insert(?MODULE, NewRow),
         ok = maybe_start_worker(Id);
@@ -433,7 +438,9 @@ ejson_doc(#rdoc{state = RepState} = RDoc, _HealthThreshold) ->
        id = {DbName, DocId},
        info = StateInfo,
        rid = RepId,
-       errcnt = ErrorCount
+       errcnt = ErrorCount,
+       last_updated = StateTime,
+       rep = Rep
     } = RDoc,
     {[
         {doc_id, DocId},
@@ -442,7 +449,9 @@ ejson_doc(#rdoc{state = RepState} = RDoc, _HealthThreshold) ->
         {state, RepState},
         {info, ejson_state_info(StateInfo)},
         {error_count, ErrorCount},
-        {node, node()}
+        {node, node()},
+        {last_updated, couch_replicator_utils:iso8601(StateTime)},
+        {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)}
     ]}.
 
 
@@ -510,7 +519,7 @@ t_regular_change() ->
 % a running job with same Id found.
 t_change_with_existing_job() ->
     ?_test(begin
-        mock_existing_jobs_lookup([#rep{id = ?R2}]),
+        mock_existing_jobs_lookup([test_rep(?R2)]),
         ?assertEqual(ok, process_change(?DB, change())),
         ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
         ?assert(started_worker({?DB, ?DOC1}))
@@ -520,7 +529,7 @@ t_change_with_existing_job() ->
 % Change is a deletion, and job is running, so remove job.
 t_deleted_change() ->
     ?_test(begin
-        mock_existing_jobs_lookup([#rep{id = ?R2}]),
+        mock_existing_jobs_lookup([test_rep(?R2)]),
         ?assertEqual(ok, process_change(?DB, deleted_change())),
         ?assert(removed_job(?R2))
     end).
@@ -615,6 +624,10 @@ t_ejson_docs() ->
         EJsonDocs = docs([]),
         ?assertMatch([{[_|_]}], EJsonDocs),
         [{DocProps}] = EJsonDocs,
+        {value, StateTime, DocProps1} = lists:keytake(last_updated, 1, DocProps),
+        ?assertMatch({last_updated, BinVal1} when is_binary(BinVal1), StateTime),
+        {value, StartTime, DocProps2} = lists:keytake(start_time, 1, DocProps1),
+        ?assertMatch({start_time, BinVal2} when is_binary(BinVal2), StartTime),
         ExpectedProps = [
             {database, ?DB},
             {doc_id, ?DOC1},
@@ -624,7 +637,7 @@ t_ejson_docs() ->
             {node, node()},
             {state, initializing}
         ],
-        ?assertEqual(ExpectedProps, lists:usort(DocProps))
+        ?assertEqual(ExpectedProps, lists:usort(DocProps2))
     end).
 
 
@@ -641,7 +654,7 @@ setup() ->
     meck:expect(couch_replicator_doc_processor_worker, spawn_worker, 3, wref),
     meck:expect(couch_replicator_scheduler, remove_job, 1, ok),
     meck:expect(couch_replicator_docs, remove_state_fields, 2, ok),
-    meck:expect(couch_replicator_docs, update_failed, 3, ok),
+    meck:expect(couch_replicator_docs, update_failed, 4, ok),
     {ok, Pid} = start_link(),
     Pid.
 
@@ -662,7 +675,7 @@ started_worker(Id) ->
 
 
 removed_job(Id) ->
-    meck:called(couch_replicator_scheduler, remove_job, [#rep{id = Id}]).
+    meck:called(couch_replicator_scheduler, remove_job, [test_rep(Id)]).
 
 
 did_not_remove_state_fields() ->
@@ -682,6 +695,10 @@ mock_existing_jobs_lookup(ExistingJobs) ->
             fun(?DB, ?DOC1) -> ExistingJobs end).
 
 
+test_rep(Id) ->
+  #rep{id = Id, start_time = {0, 0, 0}}.
+
+
 change() ->
     {[
         {<<"id">>, ?DOC1},

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/ab198aae/src/couch_replicator_doc_processor_worker.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_doc_processor_worker.erl b/src/couch_replicator_doc_processor_worker.erl
index 6107438..a6bdeef 100644
--- a/src/couch_replicator_doc_processor_worker.erl
+++ b/src/couch_replicator_doc_processor_worker.erl
@@ -78,7 +78,8 @@ maybe_start_replication(Id, RepWithoutId) ->
         {temporary_error, Reason};
     {permanent_failure, Reason} ->
         {DbName, DocId} = Id,
-        couch_replicator_docs:update_failed(DbName, DocId, Reason),
+        StartTime = Rep#rep.start_time,
+        couch_replicator_docs:update_failed(DbName, DocId, Reason, StartTime),
         {permanent_failure, Reason}
     end.
 
@@ -199,7 +200,7 @@ setup() ->
     meck:expect(couch_replicator_scheduler, add_job, 1, ok),
     meck:expect(config, get, fun(_, _, Default) -> Default end),
     meck:expect(couch_server, get_uuid, 0, this_is_snek),
-    meck:expect(couch_replicator_docs, update_failed, 3, ok),
+    meck:expect(couch_replicator_docs, update_failed, 4, ok),
     meck:expect(couch_replicator_scheduler, rep_state, 1, nil),
     ok.
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/ab198aae/src/couch_replicator_docs.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_docs.erl b/src/couch_replicator_docs.erl
index 71acc29..75889e9 100644
--- a/src/couch_replicator_docs.erl
+++ b/src/couch_replicator_docs.erl
@@ -19,8 +19,8 @@
 -export([ensure_cluster_rep_ddoc_exists/1]).
 -export([
     remove_state_fields/2,
-    update_doc_completed/3,
-    update_failed/3,
+    update_doc_completed/4,
+    update_failed/4,
     update_rep_id/1
 ]).
 
@@ -59,17 +59,19 @@ remove_state_fields(DbName, DocId) ->
         {<<"_replication_state_reason">>, undefined},
         {<<"_replication_stats">>, undefined}]).
 
--spec update_doc_completed(binary(), binary(), [_]) -> any().
-update_doc_completed(DbName, DocId, Stats) ->
+-spec update_doc_completed(binary(), binary(), [_], erlang:timestamp()) -> any().
+update_doc_completed(DbName, DocId, Stats, StartTime) ->
+    StartTimeBin = couch_replicator_utils:iso8601(StartTime),
     update_rep_doc(DbName, DocId, [
         {<<"_replication_state">>, <<"completed">>},
         {<<"_replication_state_reason">>, undefined},
+        {<<"_replication_start_time">>,  StartTimeBin},
         {<<"_replication_stats">>, {Stats}}]),
     couch_stats:increment_counter([couch_replicator, docs, completed_state_updates]).
 
 
--spec update_failed(binary(), binary(), any()) -> any().
-update_failed(DbName, DocId, Error) ->
+-spec update_failed(binary(), binary(), any(), erlang:timestamp()) -> any().
+update_failed(DbName, DocId, Error, StartTime) ->
     Reason = case Error of
         {bad_rep_doc, Reas} ->
             Reas;
@@ -77,8 +79,10 @@ update_failed(DbName, DocId, Error) ->
             to_binary(Error)
     end,
     couch_log:error("Error processing replication doc `~s`: ~s", [DocId, Reason]),
+    StartTimeBin = couch_replicator_utils:iso8601(StartTime),
     update_rep_doc(DbName, DocId, [
         {<<"_replication_state">>, <<"failed">>},
+        {<<"_replication_start_time">>, StartTimeBin},
         {<<"_replication_state_reason">>, Reason}]),
    couch_stats:increment_counter([couch_replicator, docs, failed_state_updates]).
 
@@ -295,9 +299,10 @@ update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
                     Body;
                 _ ->
                     Body1 = lists:keystore(K, 1, Body, KV),
+                    Timestamp = couch_replicator_utils:iso8601(os:timestamp()),
                     lists:keystore(
                         <<"_replication_state_time">>, 1, Body1,
-                        {<<"_replication_state_time">>, timestamp()})
+                        {<<"_replication_state_time">>, Timestamp})
                 end;
             ({K, _V} = KV, Body) ->
                 lists:keystore(K, 1, Body, KV)
@@ -329,28 +334,6 @@ save_rep_doc(DbName, Doc) ->
     end.
 
 
-% RFC3339 timestamps.
-% Note: doesn't include the time seconds fraction (RFC3339 says it's optional).
--spec timestamp() -> binary().
-timestamp() ->
-    {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(os:timestamp()),
-    UTime = erlang:universaltime(),
-    LocalTime = calendar:universal_time_to_local_time(UTime),
-    DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) -
-        calendar:datetime_to_gregorian_seconds(UTime),
-    zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60),
-    iolist_to_binary(
-        io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s",
-            [Year, Month, Day, Hour, Min, Sec,
-                zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])).
-
--spec zone(integer(), integer()) -> iolist().
-zone(Hr, Min) when Hr >= 0, Min >= 0 ->
-    io_lib:format("+~2..0w:~2..0w", [Hr, Min]);
-zone(Hr, Min) ->
-    io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]).
-
-
 -spec rep_user_ctx({[_]}) -> #user_ctx{}.
 rep_user_ctx({RepDoc}) ->
     case get_json_value(<<"user_ctx">>, RepDoc) of

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/ab198aae/src/couch_replicator_js_functions.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_js_functions.hrl b/src/couch_replicator_js_functions.hrl
index d70b1ff..3724b19 100644
--- a/src/couch_replicator_js_functions.hrl
+++ b/src/couch_replicator_js_functions.hrl
@@ -176,9 +176,15 @@
     function(doc) {
         state = doc._replication_state;
         if (state === 'failed') {
-            emit('failed', doc._replication_state_reason);
+            start_time = doc._replication_start_time;
+            last_updated = doc._replication_state_time;
+            state_reason = doc._replication_state_reason;
+            emit('failed', [start_time, last_updated, state_reason]);
         } else if (state === 'completed') {
-            emit('completed', doc._replication_stats);
+            start_time = doc._replication_start_time;
+            last_updated = doc._replication_state_time;
+            stats = doc._replication_stats;
+            emit('completed', [start_time, last_updated, stats]);
         }
     }
 ">>).

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/ab198aae/src/couch_replicator_scheduler.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler.erl b/src/couch_replicator_scheduler.erl
index dadd8b0..0aa4ef6 100644
--- a/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator_scheduler.erl
@@ -124,7 +124,9 @@ job_summary(JobId, HealthThreshold) ->
                 {target, iolist_to_binary(ejson_url(Rep#rep.target))},
                 {state, State},
                 {info, Info},
-                {error_count, ErrorCount}
+                {error_count, ErrorCount},
+                {last_updated, last_updated(History)},
+                {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)}
             ];
         {error, not_found} ->
             nil  % Job might have just completed
@@ -748,20 +750,14 @@ job_ejson(Job) ->
     Rep = Job#job.rep,
     Source = ejson_url(Rep#rep.source),
     Target = ejson_url(Rep#rep.target),
-    History = lists:map(fun(Event) ->
-    EventProps  = case Event of
-        {{crashed, Reason}, _When} ->
-            [{type, crashed}, {reason, crash_reason_json(Reason)}];
-        {Type, _When} ->
-            [{type, Type}]
+    History = lists:map(fun({Type, When}) ->
+        EventProps  = case Type of
+            {crashed, Reason} ->
+                [{type, crashed}, {reason, crash_reason_json(Reason)}];
+            Type ->
+                [{type, Type}]
         end,
-        {_Type, {_Mega, _Sec, Micros}=When} = Event,
-        {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(When),
-        ISO8601 = iolist_to_binary(io_lib:format(
-            "~B-~2..0B-~2..0BT~2..0B-~2..0B-~2..0B.~BZ",
-            [Y,Mon,D,H,Min,S,Micros]
-        )),
-        {[{timestamp, ISO8601} | EventProps]}
+        {[{timestamp, couch_replicator_utils:iso8601(When)} | EventProps]}
     end, Job#job.history),
     {BaseID, Ext} = Job#job.id,
     Pid = case Job#job.pid of
@@ -779,7 +775,8 @@ job_ejson(Job) ->
         {user, (Rep#rep.user_ctx)#user_ctx.name},
         {doc_id, Rep#rep.doc_id},
         {history, History},
-        {node, node()}
+        {node, node()},
+        {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)}
     ]}.
 
 
@@ -808,6 +805,11 @@ crash_reason_json(Error) ->
     couch_replicator_utils:rep_error_to_binary(Error).
 
 
+-spec last_updated([_]) -> binary().
+last_updated([{_Type, When} | _]) ->
+    couch_replicator_utils:iso8601(When).
+
+
 -spec is_continuous(#job{}) -> boolean().
 is_continuous(#job{rep = Rep}) ->
     couch_util:get_value(continuous, Rep#rep.options, false).

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/ab198aae/src/couch_replicator_scheduler_job.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler_job.erl b/src/couch_replicator_scheduler_job.erl
index cdedc39..9d14a9d 100644
--- a/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator_scheduler_job.erl
@@ -494,8 +494,9 @@ doc_update_triggered(#rep{id = RepId, doc_id = DocId}) ->
 -spec doc_update_completed(#rep{}, list()) -> ok.
 doc_update_completed(#rep{db_name = null}, _Stats) ->
     ok;
-doc_update_completed(#rep{id = RepId, doc_id = DocId, db_name = DbName}, Stats) ->
-    couch_replicator_docs:update_doc_completed(DbName, DocId, Stats),
+doc_update_completed(#rep{id = RepId, doc_id = DocId, db_name = DbName,
+    start_time = StartTime}, Stats) ->
+    couch_replicator_docs:update_doc_completed(DbName, DocId, Stats, StartTime),
     couch_log:notice("Replication `~s` completed (triggered by `~s`)",
         [pp_rep_id(RepId), DocId]),
     ok.
@@ -538,7 +539,8 @@ init_state(Rep) ->
         id = {BaseId, _Ext},
         source = Src0, target = Tgt,
         options = Options, user_ctx = UserCtx,
-        type = Type, view = View
+        type = Type, view = View,
+        start_time = StartTime
     } = Rep,
     % Adjust minimum number of http source connections to 2 to avoid deadlock
     Src = adjust_maxconn(Src0, BaseId),
@@ -571,7 +573,7 @@ init_state(Rep) ->
         committed_seq = StartSeq,
         source_log = SourceLog,
         target_log = TargetLog,
-        rep_starttime = httpd_util:rfc1123_date(),
+        rep_starttime = StartTime,
         src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
         tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
         session_id = couch_uuids:random(),
@@ -673,7 +675,8 @@ do_checkpoint(State) ->
     {SrcInstanceStartTime, TgtInstanceStartTime} ->
         couch_log:notice("recording a checkpoint for `~s` -> `~s` at source update_seq ~p",
             [SourceName, TargetName, NewSeq]),
-        StartTime = ?l2b(ReplicationStartTime),
+        UniversalStartTime = calendar:now_to_universal_time(ReplicationStartTime),
+        StartTime = ?l2b(httpd_util:rfc1123_date(UniversalStartTime)),
         EndTime = ?l2b(httpd_util:rfc1123_date()),
         NewHistoryEntry = {[
             {<<"session_id">>, SessionId},

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/ab198aae/src/couch_replicator_utils.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_utils.erl b/src/couch_replicator_utils.erl
index e2f5ef8..4ab4f81 100644
--- a/src/couch_replicator_utils.erl
+++ b/src/couch_replicator_utils.erl
@@ -20,6 +20,7 @@
 -export([rep_error_to_binary/1]).
 -export([get_json_value/2, get_json_value/3]).
 -export([pp_rep_id/1]).
+-export([iso8601/1]).
 
 -export([handle_db_event/3]).
 
@@ -135,3 +136,10 @@ sum_stats(S1, S2) ->
 
 parse_rep_doc(Props, UserCtx) ->
     couch_replicator_docs:parse_rep_doc(Props, UserCtx).
+
+
+-spec iso8601(erlang:timestamp()) -> binary().
+iso8601({_Mega, _Sec, _Micro} = Timestamp) ->
+    {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(Timestamp),
+    Format = "~B-~2..0B-~2..0BT~2..0B-~2..0B-~2..0BZ",
+    iolist_to_binary(io_lib:format(Format, [Y, Mon, D, H, Min, S])).


[38/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Merge pull request #105 from cloudant/63012-fix-filtered-replications

63012 fix filtered replications

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/146b700f
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/146b700f
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/146b700f

Branch: refs/heads/63012-scheduler
Commit: 146b700f934e39de9b86d063f225a8ab9b8516b9
Parents: 32d43df 700a929
Author: Nick Vatamaniuc <ni...@users.noreply.github.com>
Authored: Wed Jan 11 15:46:25 2017 -0500
Committer: GitHub <no...@github.com>
Committed: Wed Jan 11 15:46:25 2017 -0500

----------------------------------------------------------------------
 src/couch_replicator.hrl                      |  2 +
 src/couch_replicator_doc_processor.erl        | 50 +++++++++---
 src/couch_replicator_doc_processor_worker.erl | 94 +++++++++++++++-------
 src/couch_replicator_scheduler.erl            |  2 +-
 4 files changed, 109 insertions(+), 39 deletions(-)
----------------------------------------------------------------------



[23/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Remove redundant comparison function


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/36367d09
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/36367d09
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/36367d09

Branch: refs/heads/63012-scheduler
Commit: 36367d094730855c852b1306b9a4e3ea1bcad56d
Parents: f41fd67
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Wed Nov 9 17:42:14 2016 -0500
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Wed Nov 9 17:42:14 2016 -0500

----------------------------------------------------------------------
 src/couch_replicator_doc_processor.erl | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/36367d09/src/couch_replicator_doc_processor.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_doc_processor.erl b/src/couch_replicator_doc_processor.erl
index dddde0f..8ee1fe7 100644
--- a/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator_doc_processor.erl
@@ -217,7 +217,7 @@ code_change(_OldVsn, State, _Extra) ->
 % same document.
 -spec updated_doc(db_doc_id(), #rep{}, filter_type()) -> ok.
 updated_doc(Id, Rep, Filter) ->
-    case compare_replication_records(current_rep(Id), Rep) of
+    case normalize_rep(current_rep(Id)) == normalize_rep(Rep) of
         false ->
             removed_doc(Id),
             Row = #rdoc{
@@ -238,11 +238,6 @@ updated_doc(Id, Rep, Filter) ->
     end.
 
 
--spec compare_replication_records(#rep{}, #rep{}) -> boolean().
-compare_replication_records(Rep1, Rep2) ->
-    normalize_rep(Rep1) == normalize_rep(Rep2).
-
-
 % Return current #rep{} record if any. If replication hasn't been submitted
 % to the scheduler yet, #rep{} record will be in the document processor's
 % ETS table, otherwise query scheduler for the #rep{} record.


[05/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Merge pull request #92 from cloudant/63012-scheduler-individual-jobs-and-docs

Add support for _scheduler/jobs/<jobid> and _scheduler/docs/<docid>

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/b759481c
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/b759481c
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/b759481c

Branch: refs/heads/63012-scheduler
Commit: b759481cd2557561797fe30b4d825cb9ff44c8a5
Parents: a1d7554 f1140e9
Author: Nick Vatamaniuc <ni...@users.noreply.github.com>
Authored: Mon Oct 24 13:58:06 2016 -0400
Committer: GitHub <no...@github.com>
Committed: Mon Oct 24 13:58:06 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator.erl               | 51 +++++++++++++++
 src/couch_replicator_doc_processor.erl | 23 ++++++-
 src/couch_replicator_docs.erl          |  3 +-
 src/couch_replicator_ids.erl           | 13 +++-
 src/couch_replicator_scheduler.erl     | 99 +++++++++++++++++------------
 5 files changed, 143 insertions(+), 46 deletions(-)
----------------------------------------------------------------------



[44/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Merge pull request #111 from cloudant/63012-use-mem3-to-scan-dbs

Use mem3 in couch_multidb_changes to discover _replicator shards

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/bcd852d2
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/bcd852d2
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/bcd852d2

Branch: refs/heads/63012-scheduler
Commit: bcd852d2057585f60867af295b2b7c03bbe1fab9
Parents: c813d0e f52b503
Author: Nick Vatamaniuc <ni...@users.noreply.github.com>
Authored: Fri Mar 3 10:29:51 2017 -0500
Committer: GitHub <no...@github.com>
Committed: Fri Mar 3 10:29:51 2017 -0500

----------------------------------------------------------------------
 .travis.yml                   |   1 +
 src/couch_multidb_changes.erl | 169 ++++++++++++++++++++++++-------------
 2 files changed, 112 insertions(+), 58 deletions(-)
----------------------------------------------------------------------



[32/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Merge pull request #103 from cloudant/63012-fix-proxy-url-info-for-local-endpoint

Fix job summary crash for local endpoints

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/bc2f053b
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/bc2f053b
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/bc2f053b

Branch: refs/heads/63012-scheduler
Commit: bc2f053b2dc6712a2a13fa98e47e78d3a0ff8e31
Parents: 85dece2 cb2777b
Author: Nick Vatamaniuc <ni...@users.noreply.github.com>
Authored: Fri Nov 25 12:10:24 2016 -0500
Committer: GitHub <no...@github.com>
Committed: Fri Nov 25 12:10:24 2016 -0500

----------------------------------------------------------------------
 src/couch_replicator_scheduler.erl | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------



[09/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Merge pull request #93 from cloudant/63012-scheduler-delete-then-add

When adding a job, first call maybe_delete on the existing job, then call add

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/1aa3c3d3
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/1aa3c3d3
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/1aa3c3d3

Branch: refs/heads/63012-scheduler
Commit: 1aa3c3d3cd74bedca38848c711fb9fb4700ab254
Parents: 7bdaba5 fabc14e
Author: Nick Vatamaniuc <ni...@users.noreply.github.com>
Authored: Tue Oct 25 10:41:43 2016 -0400
Committer: GitHub <no...@github.com>
Committed: Tue Oct 25 10:41:43 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator_scheduler.erl | 19 +++++++------------
 1 file changed, 7 insertions(+), 12 deletions(-)
----------------------------------------------------------------------



[13/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Merge pull request #96 from cloudant/63012-cancel-with-full-body

Fix canceling _replicate requests using the full document body

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/9584fcd8
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/9584fcd8
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/9584fcd8

Branch: refs/heads/63012-scheduler
Commit: 9584fcd8f4a46898342fea985a5d61e0c3771fa6
Parents: c4f7b78 b7d92af
Author: Nick Vatamaniuc <ni...@users.noreply.github.com>
Authored: Fri Oct 28 11:21:19 2016 -0400
Committer: GitHub <no...@github.com>
Committed: Fri Oct 28 11:21:19 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator_docs.erl | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------



[47/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Forward-port "Make sure to log db as well as doc in replicator logs."

Forward-port commit: 50dcd7d7c5f7ce003e8e2fc84646c1aa9931ebaa from master.

This commit was made to couch_replicator_manager, so it had to be ported by hand.

Most changes didn't apply for one or more of these reasons:
 * We already do the right thing and log db name
 * There is an improved state inspection API so we simply don't log as much
 * Functionality is completely changed (e.g. we don't stop replications if they
   reach a maximum retry count; instead, the scheduler does an exponential backoff)


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/318e9f8c
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/318e9f8c
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/318e9f8c

Branch: refs/heads/63012-scheduler
Commit: 318e9f8c7d4639b3fc6cb57cf5fad1657db773e8
Parents: 5bebb8a
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Fri Mar 3 14:16:11 2017 -0500
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Fri Mar 3 14:16:11 2017 -0500

----------------------------------------------------------------------
 src/couch_replicator_docs.erl | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/318e9f8c/src/couch_replicator_docs.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_docs.erl b/src/couch_replicator_docs.erl
index 1bc852e..c0a9acd 100644
--- a/src/couch_replicator_docs.erl
+++ b/src/couch_replicator_docs.erl
@@ -77,7 +77,8 @@ update_doc_completed(DbName, DocId, Stats, StartTime) ->
 -spec update_failed(binary(), binary(), any(), erlang:timestamp()) -> any().
 update_failed(DbName, DocId, Error, StartTime) ->
     Reason = error_reason(Error),
-    couch_log:error("Error processing replication doc `~s`: ~s", [DocId, Reason]),
+    couch_log:error("Error processing replication doc `~s` from `~s`: ~s",
+        [DocId, DbName, Reason]),
     StartTimeBin = couch_replicator_utils:iso8601(StartTime),
     update_rep_doc(DbName, DocId, [
         {<<"_replication_state">>, <<"failed">>},


[40/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Merge pull request #106 from cloudant/fix-iso-timestamp

Fix ISO8601 timestamp formatting

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/c813d0ef
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/c813d0ef
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/c813d0ef

Branch: refs/heads/63012-scheduler
Commit: c813d0ef50678e9824c903dbb3df2bbb229c4450
Parents: 146b700 8746e71
Author: Nick Vatamaniuc <ni...@users.noreply.github.com>
Authored: Mon Jan 23 10:53:06 2017 -0500
Committer: GitHub <no...@github.com>
Committed: Mon Jan 23 10:53:06 2017 -0500

----------------------------------------------------------------------
 src/couch_replicator_utils.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------



[25/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Merge pull request #100 from cloudant/63012-compat-mode-2

Implement Compatibility Mode

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/01fffffb
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/01fffffb
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/01fffffb

Branch: refs/heads/63012-scheduler
Commit: 01fffffb71abe71ca78534444c09c6a2d944ae10
Parents: 2f95128 849ca9b
Author: Nick Vatamaniuc <ni...@users.noreply.github.com>
Authored: Thu Nov 10 11:15:25 2016 -0500
Committer: GitHub <no...@github.com>
Committed: Thu Nov 10 11:15:25 2016 -0500

----------------------------------------------------------------------
 src/couch_replicator_api_wrap.erl      |  46 ++++++++-
 src/couch_replicator_doc_processor.erl | 139 ++++++++++++++++++++++++----
 src/couch_replicator_docs.erl          |  63 +++++++++++--
 src/couch_replicator_scheduler.erl     |   6 ++
 src/couch_replicator_scheduler_job.erl |   8 +-
 5 files changed, 235 insertions(+), 27 deletions(-)
----------------------------------------------------------------------



[19/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Merge pull request #99 from cloudant/63012-scheduler-simplify-doc-processor

Simplify replicator doc processor logic

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/1ad72e34
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/1ad72e34
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/1ad72e34

Branch: refs/heads/63012-scheduler
Commit: 1ad72e346c045a649574e27b949d8ba374b7e2c5
Parents: 13ea06b 525e83e
Author: Nick Vatamaniuc <ni...@users.noreply.github.com>
Authored: Thu Nov 3 17:13:30 2016 -0400
Committer: GitHub <no...@github.com>
Committed: Thu Nov 3 17:13:30 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator_doc_processor.erl | 67 ++++++++++++-----------------
 1 file changed, 28 insertions(+), 39 deletions(-)
----------------------------------------------------------------------



[41/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Use mem3 in couch_multidb_changes to discover _replicator shards

This is a forward-port of a corresponding commit in master:

"Use mem3 to discover all _replicator shards in replicator manager"

https://github.com/apache/couchdb-couch-replicator/commit/b281d2bb320ed6e6d8226765315a40637ba91a46

This wasn't a direct merge, as replicator shard discovery and traversal are slightly
different.

`couch_multidb_changes` is more generic and takes a db suffix and callback
module. So `<<"_replicator">>` is not hard-coded in multidb changes module.

`couch_replicator_manager` handles the local `_replicator` db by directly
creating it and launching a changes feed for it. In the scheduling replicator,
creation is separate from monitoring. The logic is handled in the `scan_all_dbs`
function, where the first thing it always does is check whether a local db
matching the suffix is present; if so, a `{resume_scan, DbName}` message is sent
to the main process. Due to the supervisor start order, by the time that code runs
the local replicator db will already have been created.

COUCHDB-3277


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/fb77cbc4
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/fb77cbc4
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/fb77cbc4

Branch: refs/heads/63012-scheduler
Commit: fb77cbc463caa573a51f971243a5cb18ee8b2e9a
Parents: c813d0e
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Wed Jan 25 01:28:18 2017 -0500
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Wed Jan 25 01:28:18 2017 -0500

----------------------------------------------------------------------
 src/couch_multidb_changes.erl | 169 ++++++++++++++++++++++++-------------
 1 file changed, 111 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/fb77cbc4/src/couch_multidb_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_multidb_changes.erl b/src/couch_multidb_changes.erl
index b9c9ad5..af85a78 100644
--- a/src/couch_multidb_changes.erl
+++ b/src/couch_multidb_changes.erl
@@ -21,6 +21,7 @@
 -export([changes_reader/3, changes_reader_cb/3]).
 
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
 
 -define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
 
@@ -249,34 +250,44 @@ changes_reader_cb(_, _, Acc) ->
 
 
 scan_all_dbs(Server, DbSuffix) when is_pid(Server) ->
-    Root = config:get("couchdb", "database_dir", "."),
-    NormRoot = couch_util:normpath(Root),
-    Pat = io_lib:format("~s(\\.[0-9]{10,})?.couch$", [DbSuffix]),
-    filelib:fold_files(Root, lists:flatten(Pat), true,
-        fun(Filename, Acc) ->
-            % shamelessly stolen from couch_server.erl
-            NormFilename = couch_util:normpath(Filename),
-            case NormFilename -- NormRoot of
-                [$/ | RelativeFilename] -> ok;
-                RelativeFilename -> ok
-            end,
-            DbName = ?l2b(filename:rootname(RelativeFilename, ".couch")),
-            Jitter = jitter(Acc),
-            spawn_link(fun() ->
-                timer:sleep(Jitter),
-                gen_server:cast(Server, {resume_scan, DbName})
-            end),
-            Acc + 1
-        end, 1).
+    ok = scan_local_db(Server, DbSuffix),
+    {ok, Db} = mem3_util:ensure_exists(
+        config:get("mem3", "shards_db", "_dbs")),
+    ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db, nil),
+    ChangesFun(fun({change, {Change}, _}, _) ->
+        DbName = couch_util:get_value(<<"id">>, Change),
+        case DbName of <<"_design/", _/binary>> -> ok; _Else ->
+            case couch_replicator_utils:is_deleted(Change) of
+            true ->
+                ok;
+            false ->
+                [gen_server:cast(Server, {resume_scan, ShardName})
+                    || ShardName <- filter_shards(DbName, DbSuffix)],
+                ok
+            end
+        end;
+        (_, _) -> ok
+    end),
+    couch_db:close(Db).
+
+
+filter_shards(DbName, DbSuffix) ->
+    case DbSuffix =:= couch_db:dbname_suffix(DbName) of
+    false ->
+        [];
+    true ->
+        [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)]
+    end.
 
 
-% calculate random delay proportional to the number of replications
-% on current node, in order to prevent a stampede:
-%   - when a source with multiple replication targets fails
-%   - when we restart couch_replication_manager
-jitter(N) ->
-    Range = min(2 * N * ?AVG_DELAY_MSEC, ?MAX_DELAY_MSEC),
-    random:uniform(Range).
+scan_local_db(Server, DbSuffix) when is_pid(Server) ->
+    case couch_db:open_int(DbSuffix, [?CTX, sys_db, nologifmissing]) of
+        {ok, Db} ->
+            gen_server:cast(Server, {resume_scan, DbSuffix}),
+            ok = couch_db:close(Db);
+        _Error ->
+            ok
+    end.
 
 
 is_design_doc({Change}) ->
@@ -336,7 +347,6 @@ couch_multidb_changes_test_() ->
             t_handle_call_resume_scan_no_chfeed_ets_entry(),
             t_start_link(),
             t_start_link_no_ddocs(),
-            t_scanner_finds_shard(),
             t_misc_gen_server_callbacks()
         ]
     }.
@@ -345,7 +355,15 @@ setup() ->
     mock_logs(),
     mock_callback_mod(),
     meck:expect(couch_event, register_all, 1, ok),
-    meck:expect(config, get, fun("couchdb", "database_dir", _) -> ?TEMPDIR end),
+    meck:expect(config, get, ["mem3", "shards_db", '_'], "_dbs"),
+    meck:expect(mem3_util, ensure_exists, 1, {ok, dbs}),
+    ChangesFun = meck:val(fun(_) -> ok end),
+    meck:expect(couch_changes, handle_changes, 4, ChangesFun),
+    meck:expect(couch_db, open_int,
+        fun(?DBNAME, [?CTX, sys_db]) -> {ok, db};
+            (_, _) -> {not_found, no_db_file}
+        end),
+    meck:expect(couch_db, close, 1, ok),
     mock_changes_reader(),
     % create process to stand in for couch_ever_server
     % mocking erlang:monitor doesn't, so give it real process to monitor
@@ -357,8 +375,7 @@ setup() ->
 teardown(EvtPid) ->
     unlink(EvtPid),
     exit(EvtPid, kill),
-    meck:unload(),
-    delete_shard_file(?DBNAME).
+    meck:unload().
 
 
 t_handle_call_change() ->
@@ -648,15 +665,6 @@ t_start_link_no_ddocs() ->
         exit(Pid, kill)
     end).
 
-t_scanner_finds_shard() ->
-    ?_test(begin
-        ok = create_shard_file(?DBNAME),
-        {ok, Pid} = start_link(?SUFFIX, ?MOD, zig, []),
-        ok = meck:wait(?MOD, db_found, [?DBNAME, zig], 2000),
-        unlink(Pid),
-        exit(Pid, kill)
-    end).
-
 t_misc_gen_server_callbacks() ->
     ?_test(begin
         ?assertEqual(ok, terminate(reason, state)),
@@ -664,6 +672,70 @@ t_misc_gen_server_callbacks() ->
     end).
 
 
+scan_dbs_test_() ->
+{
+      foreach,
+      fun() -> test_util:start_couch([mem3, fabric]) end,
+      fun(Ctx) -> test_util:stop_couch(Ctx) end,
+      [
+          t_pass_shard(),
+          t_fail_shard(),
+          t_pass_local(),
+          t_fail_local()
+     ]
+}.
+
+
+t_pass_shard() ->
+    ?_test(begin
+        DbName0 = ?tempdb(),
+        DbSuffix = <<"_replicator">>,
+        DbName = <<DbName0/binary, "/", DbSuffix/binary>>,
+        ok = fabric:create_db(DbName, [?CTX]),
+        ?assertEqual(8, length(filter_shards(DbName, DbSuffix))),
+        fabric:delete_db(DbName, [?CTX])
+    end).
+
+
+t_fail_shard() ->
+    ?_test(begin
+        DbName = ?tempdb(),
+        ok = fabric:create_db(DbName, [?CTX]),
+        ?assertEqual([], filter_shards(DbName, <<"_replicator">>)),
+        fabric:delete_db(DbName, [?CTX])
+    end).
+
+
+t_pass_local() ->
+    ?_test(begin
+        LocalDb = ?tempdb(),
+        {ok, Db} = couch_db:create(LocalDb, [?CTX]),
+        ok = couch_db:close(Db),
+        scan_local_db(self(), LocalDb),
+        receive
+            {'$gen_cast', Msg} ->
+                ?assertEqual(Msg, {resume_scan, LocalDb})
+        after 0 ->
+                ?assert(false)
+        end
+    end).
+
+
+t_fail_local() ->
+ ?_test(begin
+        LocalDb = ?tempdb(),
+        {ok, Db} = couch_db:create(LocalDb, [?CTX]),
+        ok = couch_db:close(Db),
+        scan_local_db(self(), <<"some_other_db">>),
+        receive
+            {'$gen_cast', Msg} ->
+                ?assertNotEqual(Msg, {resume_scan, LocalDb})
+        after 0 ->
+                ?assert(true)
+        end
+    end).
+
+
 % Test helper functions
 
 mock_logs() ->
@@ -699,7 +771,6 @@ kill_mock_change_reader_and_get_its_args(Pid) ->
     end.
 
 mock_changes_reader() ->
-    meck:expect(couch_db, open_int, 2, {ok, db}),
     meck:expect(couch_changes, handle_db_changes,
         fun(_ChArgs, _Req, db) ->
             fun mock_changes_reader_loop/1
@@ -734,24 +805,6 @@ change_row(Id) when is_binary(Id) ->
         {doc, {[{<<"_id">>, Id}, {<<"_rev">>, <<"1-f00">>}]}}
     ]}.
 
-shard_fname(DbName) ->
-    filename:join([?TEMPDIR, binary_to_list(DbName) ++ ".couch"]).
-
-delete_shard_file(DbName) ->
-    Path = shard_fname(DbName),
-    case filelib:is_file(Path) of
-        true ->
-            ok = file:delete(Path);
-        false ->
-            ok
-    end.
-
-create_shard_file(DbName) ->
-    Path = shard_fname(DbName),
-    ok = filelib:ensure_dir(Path),
-    ok = file:write_file(Path, <<>>).
-
-
 handle_call_ok(Msg, State) ->
     ?assertMatch({reply, ok, _}, handle_call(Msg, from, State)).
 


[15/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Merge pull request #95 from cloudant/63012-scheduler-do-not-retry-for-too-long

Make sure jobs do not retry HTTP requests for too long

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/4e9b956a
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/4e9b956a
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/4e9b956a

Branch: refs/heads/63012-scheduler
Commit: 4e9b956a2bfd2c90c579d53b0b9f1102379067e5
Parents: 9584fcd c40b5c2
Author: Nick Vatamaniuc <ni...@users.noreply.github.com>
Authored: Fri Oct 28 12:10:48 2016 -0400
Committer: GitHub <no...@github.com>
Committed: Fri Oct 28 12:10:48 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator_api_wrap.hrl |  3 ++-
 src/couch_replicator_httpc.erl    | 44 +++++++++++++++++++++++++++++-----
 2 files changed, 40 insertions(+), 7 deletions(-)
----------------------------------------------------------------------



[37/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Make sure doc processor workers do not re-add deleted replication jobs

Previously, especially in the case of filtered replications, doc processor workers
could inadvertently re-add a replication job after it was deleted.

After fetching the filter code and computing the replication id, workers
would try to add the replication job to the scheduler. They did that without
checking whether the replication document had already been deleted, or whether
another worker had been spawned in the meantime.

The fix is to create a unique worker reference and pass it to the
worker. Before adding the job, the worker confirms that it is still current and
that the document was not deleted; otherwise, it exits with an `ignore` result.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/700a9295
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/700a9295
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/700a9295

Branch: refs/heads/63012-scheduler
Commit: 700a9295590d789b33c34999c75a4410e22eb43d
Parents: f2b3ac5
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Wed Dec 7 20:51:45 2016 -0500
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Wed Jan 11 15:00:23 2017 -0500

----------------------------------------------------------------------
 src/couch_replicator.hrl                      |  2 +
 src/couch_replicator_doc_processor.erl        | 48 +++++++++--
 src/couch_replicator_doc_processor_worker.erl | 94 +++++++++++++++-------
 3 files changed, 107 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/700a9295/src/couch_replicator.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.hrl b/src/couch_replicator.hrl
index 339e162..b8669e8 100644
--- a/src/couch_replicator.hrl
+++ b/src/couch_replicator.hrl
@@ -30,11 +30,13 @@
 -type seconds() :: non_neg_integer().
 -type rep_start_result() ::
     {ok, rep_id()} |
+    ignore |
     {temporary_error, binary()} |
     {permanent_failure, binary()}.
 
 
 -record(doc_worker_result, {
     id :: db_doc_id(),
+    wref :: reference(),
     result :: rep_start_result()
 }).

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/700a9295/src/couch_replicator_doc_processor.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_doc_processor.erl b/src/couch_replicator_doc_processor.erl
index 402a72f..9c2e2b3 100644
--- a/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator_doc_processor.erl
@@ -16,6 +16,7 @@
 -export([start_link/0]).
 -export([docs/1, doc/2]).
 -export([update_docs/0]).
+-export([get_worker_ref/1]).
 
 % multidb changes callback
 -export([db_created/2, db_deleted/2, db_found/2, db_change/3]).
@@ -88,6 +89,18 @@ db_change(DbName, {ChangeProps} = Change, Server) ->
     Server.
 
 
+-spec get_worker_ref(db_doc_id()) -> reference() | nil.
+get_worker_ref({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
+    case ets:lookup(?MODULE, {DbName, DocId}) of
+        [#rdoc{worker = WRef}] when is_reference(WRef) ->
+            WRef;
+        [#rdoc{worker = nil}] ->
+            nil;
+        [] ->
+            nil
+    end.
+
+
 % Private helpers for multidb changes API, these updates into the doc
 % processor gen_server
 
@@ -203,8 +216,8 @@ handle_cast(Msg, State) ->
     {stop, {error, unexpected_message, Msg}, State}.
 
 
-handle_info({'DOWN', Ref, _, _, #doc_worker_result{id = Id, result = Res}},
-        State) ->
+handle_info({'DOWN', _, _, _, #doc_worker_result{id = Id, wref = Ref,
+        result = Res}}, State) ->
     ok = worker_returned(Ref, Id, Res),
     {noreply, State};
 
@@ -324,6 +337,9 @@ worker_returned(Ref, Id, {ok, RepId}) ->
     end,
     ok;
 
+worker_returned(_Ref, _Id, ignore) ->
+    ok;
+
 worker_returned(Ref, Id, {temporary_error, Reason}) ->
     case ets:lookup(?MODULE, Id) of
     [#rdoc{worker = Ref, errcnt = ErrCnt} = Row] ->
@@ -413,8 +429,9 @@ maybe_start_worker(Id) ->
         ok;
     [#rdoc{rep = Rep} = Doc] ->
         Wait = get_worker_wait(Doc),
-        WRef = couch_replicator_doc_processor_worker:spawn_worker(Id, Rep, Wait),
-        true = ets:insert(?MODULE, Doc#rdoc{worker = WRef}),
+        Ref = make_ref(),
+        true = ets:insert(?MODULE, Doc#rdoc{worker = Ref}),
+        couch_replicator_doc_processor_worker:spawn_worker(Id, Rep, Wait, Ref),
         ok
     end.
 
@@ -773,6 +790,22 @@ normalize_rep_test_() ->
     }.
 
 
+get_worker_ref_test_() ->
+    {
+        setup,
+        fun() -> ets:new(?MODULE, [named_table, public, {keypos, #rdoc.id}]) end,
+        fun(_) -> ets:delete(?MODULE) end,
+        ?_test(begin
+            Id = {<<"db">>, <<"doc">>},
+            ?assertEqual(nil, get_worker_ref(Id)),
+            ets:insert(?MODULE, #rdoc{id = Id, worker = nil}),
+            ?assertEqual(nil, get_worker_ref(Id)),
+            Ref = make_ref(),
+            ets:insert(?MODULE, #rdoc{id = Id, worker = Ref}),
+            ?assertEqual(Ref, get_worker_ref(Id))
+        end)
+    }.
+
 
 % Test helper functions
 
@@ -786,7 +819,7 @@ setup() ->
     meck:expect(config, listen_for_changes, 2, ok),
     meck:expect(couch_replicator_clustering, owner, 2, node()),
     meck:expect(couch_replicator_clustering, link_cluster_event_listener, 1, ok),
-    meck:expect(couch_replicator_doc_processor_worker, spawn_worker, 3, wref),
+    meck:expect(couch_replicator_doc_processor_worker, spawn_worker, 4, pid),
     meck:expect(couch_replicator_scheduler, remove_job, 1, ok),
     meck:expect(couch_replicator_docs, remove_state_fields, 2, ok),
     meck:expect(couch_replicator_docs, update_failed, 4, ok),
@@ -804,9 +837,8 @@ removed_state_fields() ->
     meck:called(couch_replicator_docs, remove_state_fields, [?DB, ?DOC1]).
 
 
-started_worker(Id) ->
-    meck:called(couch_replicator_doc_processor_worker, spawn_worker,
-        [Id, '_', '_']).
+started_worker(_Id) ->
+    1 == meck:num_calls(couch_replicator_doc_processor_worker, spawn_worker, 4).
 
 
 removed_job(Id) ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/700a9295/src/couch_replicator_doc_processor_worker.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_doc_processor_worker.erl b/src/couch_replicator_doc_processor_worker.erl
index a6bdeef..30a6988 100644
--- a/src/couch_replicator_doc_processor_worker.erl
+++ b/src/couch_replicator_doc_processor_worker.erl
@@ -12,7 +12,7 @@
 
 -module(couch_replicator_doc_processor_worker).
 
--export([spawn_worker/3]).
+-export([spawn_worker/4]).
 
 -include("couch_replicator.hrl").
 
@@ -27,19 +27,19 @@
 % a worker will then exit with the #doc_worker_result{} record within
 % ?WORKER_TIMEOUT_MSEC timeout period.A timeout is considered a `temporary_error`.
 % Result will be sent as the `Reason` in the {'DOWN',...} message.
--spec spawn_worker(db_doc_id(), #rep{}, seconds()) -> reference().
-spawn_worker(Id, Rep, WaitSec) ->
-    {_Pid, WRef} = spawn_monitor(fun() -> worker_fun(Id, Rep, WaitSec) end),
-    WRef.
+-spec spawn_worker(db_doc_id(), #rep{}, seconds(), reference()) -> pid().
+spawn_worker(Id, Rep, WaitSec, WRef) ->
+    {Pid, _Ref} = spawn_monitor(fun() -> worker_fun(Id, Rep, WaitSec, WRef) end),
+    Pid.
 
 
 % Private functions
 
--spec worker_fun(db_doc_id(), #rep{}, seconds()) -> no_return().
-worker_fun(Id, Rep, WaitSec) ->
+-spec worker_fun(db_doc_id(), #rep{}, seconds(), reference()) -> no_return().
+worker_fun(Id, Rep, WaitSec, WRef) ->
     timer:sleep(WaitSec * 1000),
     Fun = fun() ->
-        try maybe_start_replication(Id, Rep) of
+        try maybe_start_replication(Id, Rep, WRef) of
             Res ->
                 exit(Res)
         catch
@@ -52,7 +52,7 @@ worker_fun(Id, Rep, WaitSec) ->
     {Pid, Ref} = spawn_monitor(Fun),
     receive
         {'DOWN', Ref, _, Pid, Result} ->
-            exit(#doc_worker_result{id = Id, result = Result})
+            exit(#doc_worker_result{id = Id, wref = WRef, result = Result})
     after ?WORKER_TIMEOUT_MSEC ->
         erlang:demonitor(Ref, [flush]),
         exit(Pid, kill),
@@ -61,7 +61,7 @@ worker_fun(Id, Rep, WaitSec) ->
         Msg = io_lib:format("Replication for db ~p doc ~p failed to start due "
             "to timeout after ~B seconds", [DbName, DocId, TimeoutSec]),
         Result = {temporary_error, couch_util:to_binary(Msg)},
-        exit(#doc_worker_result{id = Id, result = Result})
+        exit(#doc_worker_result{id = Id, wref = WRef, result = Result})
     end.
 
 
@@ -69,9 +69,11 @@ worker_fun(Id, Rep, WaitSec) ->
 % rep_start_result(), also throws {filter_fetch_error, Reason} if cannot fetch filter.
 % It can also block for an indeterminate amount of time while fetching the
 % filter.
-maybe_start_replication(Id, RepWithoutId) ->
+maybe_start_replication(Id, RepWithoutId, WRef) ->
     Rep = couch_replicator_docs:update_rep_id(RepWithoutId),
-    case maybe_add_job_to_scheduler(Id, Rep) of
+    case maybe_add_job_to_scheduler(Id, Rep, WRef) of
+    ignore ->
+        ignore;
     {ok, RepId} ->
         {ok, RepId};
     {temporary_error, Reason} ->
@@ -84,18 +86,23 @@ maybe_start_replication(Id, RepWithoutId) ->
     end.
 
 
--spec maybe_add_job_to_scheduler(db_doc_id(), #rep{}) -> rep_start_result().
-maybe_add_job_to_scheduler({_DbName, DocId}, Rep) ->
+-spec maybe_add_job_to_scheduler(db_doc_id(), #rep{}, reference()) ->
+   rep_start_result().
+maybe_add_job_to_scheduler({DbName, DocId}, Rep, WRef) ->
     RepId = Rep#rep.id,
     case couch_replicator_scheduler:rep_state(RepId) of
     nil ->
-        case couch_replicator_scheduler:add_job(Rep) of
-        ok ->
-           ok;
-        {error, already_added} ->
-            couch_log:warning("replicator scheduler: ~p was already added", [Rep])
-        end,
-        {ok, RepId};
+        % Before adding a job check that this worker is still the current
+        % worker. This is to handle a race condition where a worker which was
+        % sleeping and then checking a replication filter may inadvertently re-add
+        % a replication which was already deleted.
+        case couch_replicator_doc_processor:get_worker_ref({DbName, DocId}) of
+        WRef ->
+            ok = couch_replicator_scheduler:add_job(Rep),
+            {ok, RepId};
+        _NilOrOtherWRef ->
+            ignore
+        end;
     #rep{doc_id = DocId} ->
         {ok, RepId};
     #rep{doc_id = null} ->
@@ -130,7 +137,9 @@ doc_processor_worker_test_() ->
             t_already_running_same_docid(),
             t_already_running_transient(),
             t_already_running_other_db_other_doc(),
-            t_spawn_worker()
+            t_spawn_worker(),
+            t_ignore_if_doc_deleted(),
+            t_ignore_if_worker_ref_does_not_match()
         ]
     }.
 
@@ -140,7 +149,7 @@ t_should_add_job() ->
    ?_test(begin
        Id = {?DB, ?DOC1},
        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
-       ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep)),
+       ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
        ?assert(added_job())
    end).
 
@@ -151,7 +160,7 @@ t_already_running_same_docid() ->
        Id = {?DB, ?DOC1},
        mock_already_running(?DB, ?DOC1),
        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
-       ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep)),
+       ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
        ?assert(did_not_add_job())
    end).
 
@@ -162,7 +171,7 @@ t_already_running_transient() ->
        Id = {?DB, ?DOC1},
        mock_already_running(null, null),
        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
-       ?assertMatch({temporary_error, _}, maybe_start_replication(Id, Rep)),
+       ?assertMatch({temporary_error, _}, maybe_start_replication(Id, Rep, nil)),
        ?assert(did_not_add_job())
    end).
 
@@ -174,7 +183,7 @@ t_already_running_other_db_other_doc() ->
        Id = {?DB, ?DOC1},
        mock_already_running(<<"otherdb">>, <<"otherdoc">>),
        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
-       ?assertMatch({permanent_failure, _}, maybe_start_replication(Id, Rep)),
+       ?assertMatch({permanent_failure, _}, maybe_start_replication(Id, Rep, nil)),
        ?assert(did_not_add_job()),
        1 == meck:num_calls(couch_replicator_docs, update_failed, '_')
    end).
@@ -185,15 +194,41 @@ t_spawn_worker() ->
    ?_test(begin
        Id = {?DB, ?DOC1},
        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
-       Ref = spawn_worker(Id, Rep, 0),
-       Res = receive  {'DOWN', Ref, _, _, Reason} -> Reason
+       WRef = make_ref(),
+       meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, WRef),
+       Pid = spawn_worker(Id, Rep, 0, WRef),
+       Res = receive  {'DOWN', _Ref, process, Pid, Reason} -> Reason
            after 1000 -> timeout end,
-       Expect = #doc_worker_result{id = Id, result = {ok, ?R1}},
+       Expect = #doc_worker_result{id = Id, wref = WRef, result = {ok, ?R1}},
        ?assertEqual(Expect, Res),
        ?assert(added_job())
    end).
 
 
+% Should not add job if by the time worker got to fetching the filter
+% and getting a replication id, replication doc was deleted
+t_ignore_if_doc_deleted() ->
+   ?_test(begin
+       Id = {?DB, ?DOC1},
+       Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+       meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
+       ?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())),
+       ?assertNot(added_job())
+   end).
+
+
+% Should not add job if by the time worker got to fetchign the filter
+% and building a replication id, another worker was spawned.
+t_ignore_if_worker_ref_does_not_match() ->
+    ?_test(begin
+       Id = {?DB, ?DOC1},
+       Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+       meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, make_ref()),
+       ?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())),
+       ?assertNot(added_job())
+   end).
+
+
 % Test helper functions
 
 setup() ->
@@ -202,6 +237,7 @@ setup() ->
     meck:expect(couch_server, get_uuid, 0, this_is_snek),
     meck:expect(couch_replicator_docs, update_failed, 4, ok),
     meck:expect(couch_replicator_scheduler, rep_state, 1, nil),
+    meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
     ok.
 
 


[29/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Share replicator connections by proxy

Prior to this commit, proxied and unproxied replication connections
would be shared based on their source or target. This caused a bug where
a proxied connection could function as an unproxied connection if an
idle unproxied connection was shared with it.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/58ddc265
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/58ddc265
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/58ddc265

Branch: refs/heads/63012-scheduler
Commit: 58ddc26531789320e00d497c4fa3b95dea618d36
Parents: 1413a6f
Author: Benjamin Bastian <be...@gmail.com>
Authored: Mon Nov 21 09:57:30 2016 -0800
Committer: Benjamin Bastian <be...@gmail.com>
Committed: Wed Nov 23 13:49:52 2016 -0800

----------------------------------------------------------------------
 src/couch_replicator_api_wrap.hrl   |  3 ++-
 src/couch_replicator_connection.erl |  7 ++++++-
 src/couch_replicator_docs.erl       | 32 ++++++++++++++++++--------------
 src/couch_replicator_httpc.erl      | 14 ++++++++++++--
 src/couch_replicator_scheduler.erl  |  9 ++++++++-
 5 files changed, 46 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/58ddc265/src/couch_replicator_api_wrap.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.hrl b/src/couch_replicator_api_wrap.hrl
index d15d214..fc94054 100644
--- a/src/couch_replicator_api_wrap.hrl
+++ b/src/couch_replicator_api_wrap.hrl
@@ -25,7 +25,8 @@
     wait = 250,         % milliseconds
     httpc_pool = nil,
     http_connections,
-    first_error_timestamp = nil
+    first_error_timestamp = nil,
+    proxy_url
 }).
 
 -record(oauth, {

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/58ddc265/src/couch_replicator_connection.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_connection.erl b/src/couch_replicator_connection.erl
index b776624..e580663 100644
--- a/src/couch_replicator_connection.erl
+++ b/src/couch_replicator_connection.erl
@@ -55,6 +55,9 @@ init([]) ->
     {ok, #state{close_interval=Interval, timer=Timer}}.
 
 
+acquire(URL) when is_binary(URL) ->
+    acquire(binary_to_list(URL));
+
 acquire(URL) ->
     case gen_server:call(?MODULE, {acquire, URL}) of
         {ok, Worker} ->
@@ -85,7 +88,9 @@ handle_call({acquire, URL}, From, State) ->
                     couch_stats:increment_counter([couch_replicator, connection, acquires]),
                     ets:insert(?MODULE, Worker#connection{mref=monitor(process, Pid)}),
                     {reply, {ok, Worker#connection.worker}, State}
-            end
+            end;
+        {error, invalid_uri} ->
+            {reply, {error, invalid_uri}, State}
     end;
 
 handle_call({create, URL, Worker}, From, State) ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/58ddc265/src/couch_replicator_docs.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_docs.erl b/src/couch_replicator_docs.erl
index 07344bb..204cb39 100644
--- a/src/couch_replicator_docs.erl
+++ b/src/couch_replicator_docs.erl
@@ -259,17 +259,15 @@ parse_rep_doc(Doc, UserCtx) ->
 
 -spec parse_rep_doc_without_id({[_]}, #user_ctx{}) -> {ok, #rep{}}.
 parse_rep_doc_without_id({Props}, UserCtx) ->
-    ProxyParams = parse_proxy_params(get_value(<<"proxy">>, Props, <<>>)),
+    Proxy = get_value(<<"proxy">>, Props, <<>>),
     Opts = make_options(Props),
     case get_value(cancel, Opts, false) andalso
         (get_value(id, Opts, nil) =/= nil) of
     true ->
         {ok, #rep{options = Opts, user_ctx = UserCtx}};
     false ->
-        Source = parse_rep_db(get_value(<<"source">>, Props),
-                              ProxyParams, Opts),
-        Target = parse_rep_db(get_value(<<"target">>, Props),
-                              ProxyParams, Opts),
+        Source = parse_rep_db(get_value(<<"source">>, Props), Proxy, Opts),
+        Target = parse_rep_db(get_value(<<"target">>, Props), Proxy, Opts),
         {Type, View} = case couch_replicator_filters:view_type(Props, Opts) of
         {error, Error} ->
             throw({bad_request, Error});
@@ -380,8 +378,13 @@ rep_user_ctx({RepDoc}) ->
         }
     end.
 
--spec parse_rep_db({[_]} | binary(), [_], [_]) -> #httpd{} | binary().
-parse_rep_db({Props}, ProxyParams, Options) ->
+-spec parse_rep_db({[_]} | binary(), binary(), [_]) -> #httpd{} | binary().
+parse_rep_db({Props}, Proxy, Options) ->
+    ProxyParams = parse_proxy_params(Proxy),
+    ProxyURL = case ProxyParams of
+        [] -> undefined;
+        _ -> binary_to_list(Proxy)
+    end,
     Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
     {AuthProps} = get_value(<<"auth">>, Props, {[]}),
     {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
@@ -414,15 +417,16 @@ parse_rep_db({Props}, ProxyParams, Options) ->
                 ProxyParams ++ ssl_params(Url)]),
         timeout = get_value(connection_timeout, Options),
         http_connections = get_value(http_connections, Options),
-        retries = get_value(retries, Options)
+        retries = get_value(retries, Options),
+        proxy_url = ProxyURL
     };
-parse_rep_db(<<"http://", _/binary>> = Url, ProxyParams, Options) ->
-    parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
-parse_rep_db(<<"https://", _/binary>> = Url, ProxyParams, Options) ->
-    parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
-parse_rep_db(<<DbName/binary>>, _ProxyParams, _Options) ->
+parse_rep_db(<<"http://", _/binary>> = Url, Proxy, Options) ->
+    parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
+parse_rep_db(<<"https://", _/binary>> = Url, Proxy, Options) ->
+    parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
+parse_rep_db(<<DbName/binary>>, _Proxy, _Options) ->
     DbName;
-parse_rep_db(undefined, _ProxyParams, _Options) ->
+parse_rep_db(undefined, _Proxy, _Options) ->
     throw({error, <<"Missing replicator database">>}).
 
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/58ddc265/src/couch_replicator_httpc.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index 5fb7842..58fb0e1 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -40,8 +40,18 @@
 -define(MAX_DISCARDED_MESSAGES, 16).
 
 
-setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
-    {ok, Pid} = couch_replicator_httpc_pool:start_link(Url, [{max_connections, MaxConns}]),
+setup(Db) ->
+    #httpdb{
+        httpc_pool = nil,
+        url = Url,
+        http_connections = MaxConns,
+        proxy_url = ProxyURL
+    } = Db,
+    HttpcURL = case ProxyURL of
+        undefined -> Url;
+        _ when is_list(ProxyURL) -> ProxyURL
+    end,
+    {ok, Pid} = couch_replicator_httpc_pool:start_link(HttpcURL, [{max_connections, MaxConns}]),
     {ok, Db#httpdb{httpc_pool = Pid}}.
 
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/58ddc265/src/couch_replicator_scheduler.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler.erl b/src/couch_replicator_scheduler.erl
index 440458e..f08e829 100644
--- a/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator_scheduler.erl
@@ -119,6 +119,12 @@ job_summary(JobId, HealthThreshold) ->
                 {Pid, ErrorCount} when is_pid(Pid) ->
                      {running, null}
             end,
+            StrippedProxyURL = case (Rep#rep.source)#httpdb.proxy_url of
+                undefined ->
+                    null;
+                ProxyURL when is_list(ProxyURL) ->
+                    list_to_binary(couch_util:url_strip_password(ProxyURL))
+            end,
             [
                 {source, iolist_to_binary(ejson_url(Rep#rep.source))},
                 {target, iolist_to_binary(ejson_url(Rep#rep.target))},
@@ -126,7 +132,8 @@ job_summary(JobId, HealthThreshold) ->
                 {info, Info},
                 {error_count, ErrorCount},
                 {last_updated, last_updated(History)},
-                {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)}
+                {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)},
+                {proxy, StrippedProxyURL}
             ];
         {error, not_found} ->
             nil  % Job might have just completed


[10/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Fix _scheduler/docs filtering by database name.

The mem3:dbname() transformation needs to be done in the showroom endpoint;
doing it too early prevents filtering by user in showroom, so docs end up
"missing" from the query output.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/bcebf2c7
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/bcebf2c7
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/bcebf2c7

Branch: refs/heads/63012-scheduler
Commit: bcebf2c7f63acd6864a3b1bde2bd6577f4e0acad
Parents: 1aa3c3d
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Wed Oct 26 12:04:08 2016 -0400
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Wed Oct 26 12:04:08 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator_doc_processor.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/bcebf2c7/src/couch_replicator_doc_processor.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_doc_processor.erl b/src/couch_replicator_doc_processor.erl
index 0349fa6..eadea5d 100644
--- a/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator_doc_processor.erl
@@ -437,7 +437,7 @@ ejson_doc(#rdoc{state = RepState} = RDoc, _HealthThreshold) ->
     } = RDoc,
     {[
         {doc_id, DocId},
-        {database, mem3:dbname(DbName)},
+        {database, DbName},
         {id, ejson_rep_id(RepId)},
         {state, RepState},
         {info, ejson_state_info(StateInfo)},


[33/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Fix job migration during membership change

Specifically, jobs which are in an error or crashing state now migrate properly.

Previously, jobs migrated only when they checkpointed. Since jobs which are
crashing or trying to fetch their filter code do not checkpoint, they never
migrated and thus could result in duplicate jobs (new jobs would start on the
new node, but the old jobs would not stop on the old node).

This also simplifies the code a bit: the `no_owner` return value was removed
from the job ownership test. Since we do the check in the doc processor, all
jobs should have a proper db and doc.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/99a2b908
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/99a2b908
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/99a2b908

Branch: refs/heads/63012-scheduler
Commit: 99a2b90834cc0ee27c763285c6bced51a4b37cfc
Parents: bc2f053
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Wed Dec 7 11:41:56 2016 -0500
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Wed Dec 7 11:41:56 2016 -0500

----------------------------------------------------------------------
 src/couch_replicator_clustering.erl    | 23 ++++++++++-----
 src/couch_replicator_db_changes.erl    | 13 +--------
 src/couch_replicator_doc_processor.erl | 43 ++++++++++++++++++++++++++++-
 src/couch_replicator_scheduler_job.erl | 22 +++++----------
 4 files changed, 66 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/99a2b908/src/couch_replicator_clustering.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_clustering.erl b/src/couch_replicator_clustering.erl
index 07b103e..4a6abd7 100644
--- a/src/couch_replicator_clustering.erl
+++ b/src/couch_replicator_clustering.erl
@@ -30,6 +30,7 @@
 
 % public API
 -export([start_link/0, owner/2, is_stable/0]).
+-export([link_cluster_event_listener/1]).
 
 % gen_server callbacks
 -export([init/1, handle_call/3, handle_info/2, handle_cast/2,
@@ -60,14 +61,10 @@ start_link() ->
 
 
 % owner/2 function computes ownership for a {DbName, DocId} tuple
-% Returns `no_owner` in case no DocId is null, `unstable` if cluster
-% is considered to be unstable i.e. it has changed recently, or returns
-% node() which is considered to be the owner.
+% `unstable` if cluster is considered to be unstable i.e. it has changed
+% recently, or returns node() which of the owner.
 %
--spec owner(Dbname :: binary(), DocId :: binary() | null) ->
-    node() | no_owner | unstable.
-owner(_DbName, null) ->
-    no_owner;
+-spec owner(Dbname :: binary(), DocId :: binary()) -> node() | unstable.
 owner(<<"shards/", _/binary>> = DbName, DocId) ->
     case is_stable() of
         false ->
@@ -84,6 +81,18 @@ is_stable() ->
     gen_server:call(?MODULE, is_stable).
 
 
+% Convenience function for gen_servers to subscribe to {cluster, stable} and
+% {cluster, unstable} events from couch_replicator clustering module.
+-spec link_cluster_event_listener(pid()) -> pid().
+link_cluster_event_listener(GenServer) when is_pid(GenServer) ->
+    CallbackFun =
+        fun(Event = {cluster, _}) -> gen_server:cast(GenServer, Event);
+           (_) -> ok
+        end,
+    {ok, Pid} = couch_replicator_notifier:start_link(CallbackFun),
+    Pid.
+
+
 % gen_server callbacks
 
 init([]) ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/99a2b908/src/couch_replicator_db_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_db_changes.erl b/src/couch_replicator_db_changes.erl
index 78ec069..924c24f 100644
--- a/src/couch_replicator_db_changes.erl
+++ b/src/couch_replicator_db_changes.erl
@@ -32,7 +32,7 @@ start_link() ->
 
 
 init([]) ->
-    EvtPid = start_link_cluster_event_listener(),
+    EvtPid = couch_replicator_clustering:link_cluster_event_listener(self()),
     State = #state{event_listener = EvtPid, mdb_changes = nil},
     case couch_replicator_clustering:is_stable() of
         true ->
@@ -88,14 +88,3 @@ stop_mdb_changes(#state{mdb_changes = Pid} = State) ->
     unlink(Pid),
     exit(Pid, kill),
     State#state{mdb_changes = nil}.
-
-
--spec start_link_cluster_event_listener() -> pid().
-start_link_cluster_event_listener() ->
-    Server = self(),
-    CallbackFun =
-        fun(Event = {cluster, _}) -> gen_server:cast(Server, Event);
-           (_) -> ok
-        end,
-    {ok, Pid} = couch_replicator_notifier:start_link(CallbackFun),
-    Pid.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/99a2b908/src/couch_replicator_doc_processor.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_doc_processor.erl b/src/couch_replicator_doc_processor.erl
index 6851542..035a7ec 100644
--- a/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator_doc_processor.erl
@@ -166,6 +166,7 @@ start_link() ->
 
 init([]) ->
     ?MODULE = ets:new(?MODULE, [ordered_set, named_table, {keypos, #rdoc.id}]),
+    couch_replicator_clustering:link_cluster_event_listener(self()),
     {ok, nil}.
 
 
@@ -189,6 +190,14 @@ handle_call({clean_up_replications, DbName}, _From, State) ->
     ok = removed_db(DbName),
     {reply, ok, State}.
 
+handle_cast({cluster, unstable}, State) ->
+    % Ignoring unstable state transition
+    {noreply, State};
+
+handle_cast({cluster, stable}, State) ->
+    % Membership changed recheck all the replication document ownership
+    nil = ets:foldl(fun cluster_membership_foldl/2, nil, ?MODULE),
+    {noreply, State};
 
 handle_cast(Msg, State) ->
     {stop, {error, unexpected_message, Msg}, State}.
@@ -530,6 +539,21 @@ ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) ->
     lists:member(State, States).
 
 
+-spec cluster_membership_foldl(#rdoc{}, nil) -> nil.
+cluster_membership_foldl(#rdoc{id = {DbName, DocId} = Id, rid = RepId}, nil) ->
+    case couch_replicator_clustering:owner(DbName, DocId) of
+        unstable ->
+            nil;
+        ThisNode when ThisNode =:= node() ->
+            nil;
+        OtherNode ->
+            Msg = "Replication doc ~p:~p with id ~p usurped by node ~p",
+            couch_log:notice(Msg, [DbName, DocId, RepId, OtherNode]),
+            removed_doc(Id),
+            nil
+    end.
+
+
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").
@@ -558,7 +582,8 @@ doc_processor_test_() ->
             t_failed_change(),
             t_change_for_different_node(),
             t_change_when_cluster_unstable(),
-            t_ejson_docs()
+            t_ejson_docs(),
+            t_cluster_membership_foldl()
         ]
     }.
 
@@ -707,6 +732,21 @@ t_ejson_docs() ->
     end).
 
 
+% Check that when cluster membership changes records from doc processor and job
+% scheduler get removed
+t_cluster_membership_foldl() ->
+   ?_test(begin
+        mock_existing_jobs_lookup([test_rep(?R1)]),
+        ?assertEqual(ok, process_change(?DB, change())),
+        meck:expect(couch_replicator_clustering, owner, 2, different_node),
+        ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
+        gen_server:cast(?MODULE, {cluster, stable}),
+        timer:sleep(100),
+        ?assertNot(ets:member(?MODULE, {?DB, ?DOC1})),
+        ?assert(removed_job(?R1))
+   end).
+
+
 normalize_rep_test_() ->
     {
         setup,
@@ -745,6 +785,7 @@ setup() ->
     meck:expect(config, get, fun(_, _, Default) -> Default end),
     meck:expect(config, listen_for_changes, 2, ok),
     meck:expect(couch_replicator_clustering, owner, 2, node()),
+    meck:expect(couch_replicator_clustering, link_cluster_event_listener, 1, ok),
     meck:expect(couch_replicator_doc_processor_worker, spawn_worker, 3, wref),
     meck:expect(couch_replicator_scheduler, remove_job, 1, ok),
     meck:expect(couch_replicator_docs, remove_state_fields, 2, ok),

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/99a2b908/src/couch_replicator_scheduler_job.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler_job.erl b/src/couch_replicator_scheduler_job.erl
index 4dcecb4..1c9faaf 100644
--- a/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator_scheduler_job.erl
@@ -349,21 +349,13 @@ handle_cast({db_compacted, DbName},
     {noreply, State#rep_state{target = NewTarget}};
 
 handle_cast(checkpoint, State) ->
-    #rep_state{rep_details = #rep{db_name = DbName, doc_id = DocId} = Rep} = State,
-    case couch_replicator_clustering:owner(DbName, DocId) of
-    Owner when Owner =:= node(); Owner =:= no_owner; Owner =:= unstable ->
-        case do_checkpoint(State) of
-        {ok, NewState} ->
-            couch_stats:increment_counter([couch_replicator, checkpoints, success]),
-            {noreply, NewState#rep_state{timer = start_timer(State)}};
-        Error ->
-            couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
-            {stop, Error, State}
-        end;
-    Other when Other =/= node() ->
-        couch_log:notice("Replication `~s` usurped by ~s (triggered by `~s`)",
-            [pp_rep_id(Rep#rep.id), Other, DocId]),
-        {stop, shutdown, State}
+    case do_checkpoint(State) of
+    {ok, NewState} ->
+        couch_stats:increment_counter([couch_replicator, checkpoints, success]),
+        {noreply, NewState#rep_state{timer = start_timer(State)}};
+    Error ->
+        couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
+        {stop, Error, State}
     end;
 
 handle_cast({report_seq, Seq},


[27/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Merge pull request #102 from cloudant/63012-scheduler-update-docs

Rename compat_mode config flag to update_docs

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/1413a6fb
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/1413a6fb
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/1413a6fb

Branch: refs/heads/63012-scheduler
Commit: 1413a6fbbd78288d5a6fef92ff08f59388a8ef19
Parents: 01fffff 2c2c87a
Author: Nick Vatamaniuc <ni...@users.noreply.github.com>
Authored: Wed Nov 23 12:56:58 2016 -0500
Committer: GitHub <no...@github.com>
Committed: Wed Nov 23 12:56:58 2016 -0500

----------------------------------------------------------------------
 src/couch_replicator_doc_processor.erl | 16 ++++++++--------
 src/couch_replicator_scheduler.erl     |  2 +-
 src/couch_replicator_scheduler_job.erl |  2 +-
 3 files changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------



[49/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Add 'source' and 'target' fields to _scheduler/docs terminal states output

Previously, terminal states from _scheduler/docs differed from the other
states, as they did not include source and target data in the summary.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/419ea1d9
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/419ea1d9
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/419ea1d9

Branch: refs/heads/63012-scheduler
Commit: 419ea1d99afc89c7d783e8c9cd428b219e42628d
Parents: 0b51c1b
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Tue Mar 7 02:57:21 2017 -0500
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Tue Mar 7 02:57:21 2017 -0500

----------------------------------------------------------------------
 src/couch_replicator.erl              | 62 ++++++++++++++++++++++++++++--
 src/couch_replicator_docs.erl         |  2 +-
 src/couch_replicator_js_functions.hrl |  8 +++-
 3 files changed, 65 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/419ea1d9/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index 59fb292..f2b0d02 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -19,6 +19,7 @@
 
 -include_lib("couch/include/couch_db.hrl").
 -include("couch_replicator.hrl").
+-include("couch_replicator_api_wrap.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
 -define(REPLICATION_STATES, [
@@ -197,13 +198,13 @@ handle_replicator_doc_query({row, Props}, {Db, Cb, UserAcc, States}) ->
     DocStateBin = couch_util:get_value(key, Props),
     DocState = erlang:binary_to_existing_atom(DocStateBin, utf8),
     MapValue = couch_util:get_value(value, Props),
-    {StartTime, StateTime, StateInfo} = case MapValue of
-        [StartT, StateT, Info] ->
-            {StartT, StateT, Info};
+    {Source, Target, StartTime, StateTime, StateInfo} = case MapValue of
+        [Src, Tgt, StartT, StateT, Info] ->
+            {Src, Tgt, StartT, StateT, Info};
         _Other ->
             % Handle the case where the view code was upgraded but new view code
             % wasn't updated yet (before a _scheduler/docs request was made)
-            {null, null, null}
+            {null, null, null, null, null}
     end,
     % Set the error_count to 1 if failed. This is mainly for consistency as
     % jobs from doc_processor and scheduler will have that value set
@@ -214,6 +215,8 @@ handle_replicator_doc_query({row, Props}, {Db, Cb, UserAcc, States}) ->
                 {doc_id, DocId},
                 {database, Db},
                 {id, null},
+                {source, strip_url_creds(Source)},
+                {target, strip_url_creds(Target)},
                 {state, DocState},
                 {error_count, ErrorCount},
                 {info, StateInfo},
@@ -232,6 +235,16 @@ handle_replicator_doc_query(complete, Acc) ->
     {stop, Acc}.
 
 
+-spec strip_url_creds(binary() | {[_]}) -> binary().
+strip_url_creds(Endpoint) ->
+    case couch_replicator_docs:parse_rep_db(Endpoint, [], []) of
+        #httpdb{url=Url} ->
+            iolist_to_binary(couch_util:url_strip_password(Url));
+        LocalDb when is_binary(LocalDb) ->
+            LocalDb
+    end.
+
+
 -spec filter_replicator_doc_query(atom(), [atom()]) -> boolean().
 filter_replicator_doc_query(_DocState, []) ->
     true;
@@ -267,6 +280,8 @@ doc_from_db(RepDb, DocId, UserCtx) ->
     case fabric:open_doc(RepDb, DocId, [UserCtx, ejson_body]) of
         {ok, Doc} ->
             {Props} = couch_doc:to_json_obj(Doc, []),
+            Source = get_value(<<"source">>, Props),
+            Target = get_value(<<"target">>, Props),
             State = get_value(<<"_replication_state">>, Props, null),
             StartTime = get_value(<<"_replication_start_time">>, Props, null),
             StateTime = get_value(<<"_replication_state_time">>, Props, null),
@@ -284,6 +299,8 @@ doc_from_db(RepDb, DocId, UserCtx) ->
                 {doc_id, DocId},
                 {database, RepDb},
                 {id, null},
+                {source, strip_url_creds(Source)},
+                {target, strip_url_creds(Target)},
                 {state, State},
                 {error_count, ErrorCount},
                 {info, StateInfo},
@@ -359,4 +376,41 @@ expect_rep_user_ctx(Name, Role) ->
             #rep{user_ctx = UserCtx}
         end).
 
+
+strip_url_creds_test_() ->
+     {
+        foreach,
+        fun () -> meck:expect(config, get, fun(_, _, Default) -> Default end) end,
+        fun (_) -> meck:unload() end,
+        [
+            t_strip_local_db_creds(),
+            t_strip_http_basic_creds(),
+            t_strip_http_props_creds()
+        ]
+    }.
+
+
+t_strip_local_db_creds() ->
+    ?_test(?assertEqual(<<"localdb">>, strip_url_creds(<<"localdb">>))).
+
+
+t_strip_http_basic_creds() ->
+    ?_test(begin
+        Url1 = <<"http://adm:pass@host/db">>,
+        ?assertEqual(<<"http://adm:*****@host/db/">>, strip_url_creds(Url1)),
+        Url2 = <<"https://adm:pass@host/db">>,
+        ?assertEqual(<<"https://adm:*****@host/db/">>, strip_url_creds(Url2))
+    end).
+
+
+t_strip_http_props_creds() ->
+    ?_test(begin
+        Props1 = {[{<<"url">>, <<"http://adm:pass@host/db">>}]},
+        ?assertEqual(<<"http://adm:*****@host/db/">>, strip_url_creds(Props1)),
+        Props2 = {[ {<<"url">>, <<"http://host/db">>},
+            {<<"headers">>, {[{<<"Authorization">>, <<"Basic pa55">>}]}}
+        ]},
+        ?assertEqual(<<"http://host/db/">>, strip_url_creds(Props2))
+    end).
+
 -endif.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/419ea1d9/src/couch_replicator_docs.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_docs.erl b/src/couch_replicator_docs.erl
index c0a9acd..5bdfe92 100644
--- a/src/couch_replicator_docs.erl
+++ b/src/couch_replicator_docs.erl
@@ -12,7 +12,7 @@
 
 -module(couch_replicator_docs).
 
--export([parse_rep_doc/1, parse_rep_doc/2]).
+-export([parse_rep_doc/1, parse_rep_doc/2, parse_rep_db/3]).
 -export([parse_rep_doc_without_id/1, parse_rep_doc_without_id/2]).
 -export([before_doc_update/2, after_doc_read/2]).
 -export([ensure_rep_db_exists/0, ensure_rep_ddoc_exists/1]).

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/419ea1d9/src/couch_replicator_js_functions.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_js_functions.hrl b/src/couch_replicator_js_functions.hrl
index 3724b19..dbad050 100644
--- a/src/couch_replicator_js_functions.hrl
+++ b/src/couch_replicator_js_functions.hrl
@@ -176,15 +176,19 @@
     function(doc) {
         state = doc._replication_state;
         if (state === 'failed') {
+            source = doc.source;
+            target = doc.target;
             start_time = doc._replication_start_time;
             last_updated = doc._replication_state_time;
             state_reason = doc._replication_state_reason;
-            emit('failed', [start_time, last_updated, state_reason]);
+            emit('failed', [source, target, start_time, last_updated, state_reason]);
         } else if (state === 'completed') {
+            source = doc.source;
+            target = doc.target;
             start_time = doc._replication_start_time;
             last_updated = doc._replication_state_time;
             stats = doc._replication_stats;
-            emit('completed', [start_time, last_updated, stats]);
+            emit('completed', [source, target, start_time, last_updated, stats]);
         }
     }
 ">>).


[11/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Merge pull request #94 from cloudant/63012-scheduler-make-docs-dbname-work-again

Fix _scheduler/docs filtering by database name.

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/c4f7b784
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/c4f7b784
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/c4f7b784

Branch: refs/heads/63012-scheduler
Commit: c4f7b784214bcab0975afee39547121c606a406c
Parents: 1aa3c3d bcebf2c
Author: Nick Vatamaniuc <ni...@users.noreply.github.com>
Authored: Wed Oct 26 12:28:30 2016 -0400
Committer: GitHub <no...@github.com>
Committed: Wed Oct 26 12:28:30 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator_doc_processor.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------



[31/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Fix job summary crash for local endpoints

Previously, the proxy URL manipulation code assumed all endpoints were remote.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/cb2777b5
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/cb2777b5
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/cb2777b5

Branch: refs/heads/63012-scheduler
Commit: cb2777b50bcc98b8768daf9bad37366c5285667c
Parents: 85dece2
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Fri Nov 25 11:47:56 2016 -0500
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Fri Nov 25 11:47:56 2016 -0500

----------------------------------------------------------------------
 src/couch_replicator_scheduler.erl | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/cb2777b5/src/couch_replicator_scheduler.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler.erl b/src/couch_replicator_scheduler.erl
index f08e829..af2262b 100644
--- a/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator_scheduler.erl
@@ -119,12 +119,6 @@ job_summary(JobId, HealthThreshold) ->
                 {Pid, ErrorCount} when is_pid(Pid) ->
                      {running, null}
             end,
-            StrippedProxyURL = case (Rep#rep.source)#httpdb.proxy_url of
-                undefined ->
-                    null;
-                ProxyURL when is_list(ProxyURL) ->
-                    list_to_binary(couch_util:url_strip_password(ProxyURL))
-            end,
             [
                 {source, iolist_to_binary(ejson_url(Rep#rep.source))},
                 {target, iolist_to_binary(ejson_url(Rep#rep.target))},
@@ -133,13 +127,20 @@ job_summary(JobId, HealthThreshold) ->
                 {error_count, ErrorCount},
                 {last_updated, last_updated(History)},
                 {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)},
-                {proxy, StrippedProxyURL}
+                {proxy, job_proxy_url(Rep#rep.source)}
             ];
         {error, not_found} ->
             nil  % Job might have just completed
     end.
 
 
+job_proxy_url(#httpdb{proxy_url = ProxyUrl}) when is_list(ProxyUrl) ->
+    list_to_binary(couch_util:url_strip_password(ProxyUrl));
+
+job_proxy_url(_Endpoint) ->
+    null.
+
+
 -spec health_threshold() -> non_neg_integer().
 health_threshold() ->
     config:get_integer("replicator", "health_threshold",


[48/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Merge pull request #118 from cloudant/63012-scheduler-update-from-upstream

63012 scheduler update from upstream

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/0b51c1b6
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/0b51c1b6
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/0b51c1b6

Branch: refs/heads/63012-scheduler
Commit: 0b51c1b6a8e686eb57a5abe5072a6fd65856a69c
Parents: bcd852d 318e9f8
Author: Nick Vatamaniuc <ni...@users.noreply.github.com>
Authored: Fri Mar 3 14:58:13 2017 -0500
Committer: GitHub <no...@github.com>
Committed: Fri Mar 3 14:58:13 2017 -0500

----------------------------------------------------------------------
 src/couch_multidb_changes.erl               |  7 +-
 src/couch_replicator_changes_reader.erl     | 24 +++++-
 src/couch_replicator_docs.erl               | 20 +++--
 test/couch_replicator_id_too_long_tests.erl | 94 ++++++++++++++++++++++++
 4 files changed, 135 insertions(+), 10 deletions(-)
----------------------------------------------------------------------



[03/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Merge pull request #91 from cloudant/63012-scheduler-cancel-via-replication-id

Canceling a transient replication via a "replication_id" works again

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/a1d7554a
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/a1d7554a
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/a1d7554a

Branch: refs/heads/63012-scheduler
Commit: a1d7554a88f26b3d3ca9d4d48b7262aba6036648
Parents: 26fc4df c9f0486
Author: Nick Vatamaniuc <ni...@users.noreply.github.com>
Authored: Fri Oct 21 10:53:14 2016 -0400
Committer: GitHub <no...@github.com>
Committed: Fri Oct 21 10:53:14 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator_docs.erl | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[21/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Merge pull request #98 from cloudant/63012-scheduler-authorization-fix

Fix transient replications authorization

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/2f95128e
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/2f95128e
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/2f95128e

Branch: refs/heads/63012-scheduler
Commit: 2f95128efc112f0b62d6b6e500d21d576d46bb0e
Parents: 1ad72e3 b68d79e
Author: Nick Vatamaniuc <ni...@users.noreply.github.com>
Authored: Tue Nov 8 17:33:06 2016 -0500
Committer: GitHub <no...@github.com>
Committed: Tue Nov 8 17:33:06 2016 -0500

----------------------------------------------------------------------
 src/couch_replicator.erl | 99 +++++++++++++++++++++++++++++++++----------
 1 file changed, 77 insertions(+), 22 deletions(-)
----------------------------------------------------------------------



[14/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Make sure jobs do not retry HTTP requests for too long

Replication HTTP requests have their own retry mechanism. If a request fails
it will be individually retried a few times in a row without crashing the
whole replication job. This can help with short / intermittent network failures.

However, longer retries in that part of the code interact unfavorably
with the scheduler, as they make the job appear to run without crashing long
enough for the scheduler to consider it "healthy", when in reality the job might
have been wasting all that time retrying without making any real progress.

To fix, record the first time the job starts crashing in the #httpdb{} record,
which is passed recursively between retry attempts. If a request has been
retrying for close to the current scheduler health threshold value, stop
retrying and crash the whole job. This ensures the scheduler will register the
job as crashing consecutively and will back it off as intended.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/c40b5c21
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/c40b5c21
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/c40b5c21

Branch: refs/heads/63012-scheduler
Commit: c40b5c21760e45ef91df93e73c56327c7605b236
Parents: 9584fcd
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Fri Oct 28 01:32:28 2016 -0400
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Fri Oct 28 11:28:59 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator_api_wrap.hrl |  3 ++-
 src/couch_replicator_httpc.erl    | 44 +++++++++++++++++++++++++++++-----
 2 files changed, 40 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/c40b5c21/src/couch_replicator_api_wrap.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.hrl b/src/couch_replicator_api_wrap.hrl
index eee04da..d15d214 100644
--- a/src/couch_replicator_api_wrap.hrl
+++ b/src/couch_replicator_api_wrap.hrl
@@ -24,7 +24,8 @@
     retries = 10,
     wait = 250,         % milliseconds
     httpc_pool = nil,
-    http_connections
+    http_connections,
+    first_error_timestamp = nil
 }).
 
 -record(oauth, {

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/c40b5c21/src/couch_replicator_httpc.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index 4606e99..5fb7842 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -274,13 +274,45 @@ discard_message(ReqId, Worker, Count) ->
 maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params) ->
     report_error(Worker, HttpDb, Params, {error, Error});
 
-maybe_retry(Error, _Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
+maybe_retry(Error, Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
     Params) ->
-    ok = timer:sleep(Wait),
-    log_retry_error(Params, HttpDb, Wait, Error),
-    Wait2 = erlang:min(Wait * 2, ?MAX_WAIT),
-    NewHttpDb = HttpDb#httpdb{retries = Retries - 1, wait = Wait2},
-    throw({retry, NewHttpDb, Params}).
+    case total_error_time_exceeded(HttpDb) of
+        true ->
+            report_error(Worker, HttpDb, Params, {error, Error});
+        false ->
+            ok = timer:sleep(Wait),
+            log_retry_error(Params, HttpDb, Wait, Error),
+            Wait2 = erlang:min(Wait * 2, ?MAX_WAIT),
+            HttpDb1 = HttpDb#httpdb{retries = Retries - 1, wait = Wait2},
+            HttpDb2 = update_first_error_timestamp(HttpDb1),
+            throw({retry, HttpDb2, Params})
+    end.
+
+
+% When retrying, check to make total time spent retrying a request is below
+% the current scheduler health threshold. The goal is to not exceed the
+% threshold, otherwise the job which keep retrying too long will still be
+% considered healthy.
+total_error_time_exceeded(#httpdb{first_error_timestamp = nil}) ->
+    false;
+
+total_error_time_exceeded(#httpdb{first_error_timestamp = ErrorTimestamp}) ->
+    HealthThresholdSec = couch_replicator_scheduler:health_threshold(),
+    % Theshold value is halved because in the calling code the next step
+    % is a doubling. Not halving here could mean sleeping too long and
+    % exceeding the health threshold.
+    ThresholdUSec = (HealthThresholdSec / 2) * 1000000,
+    timer:now_diff(os:timestamp(), ErrorTimestamp) > ThresholdUSec.
+
+
+% Remember the first time an error occurs. This value is used later to check
+% the total time spend retrying a request. Because retrying is cursive, on
+% successful result #httpdb{} record is reset back to the original value.
+update_first_error_timestamp(#httpdb{first_error_timestamp = nil} = HttpDb) ->
+    HttpDb#httpdb{first_error_timestamp = os:timestamp()};
+
+update_first_error_timestamp(HttpDb) ->
+    HttpDb.
 
 
 log_retry_error(Params, HttpDb, Wait, Error) ->


[02/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Canceling a transient replication via a "replication_id" works again

Canceling a transient replication by POST-ing to _replicate was failing because
it was assumed the request body always had to be well-formed (with source,
target, ...).

However, it is valid to cancel a replication with a body which has just
`"cancel": true` and either `replication_id`, `id` or `_local_id` fields.

Example:

Creating a transient continuous replication
```
http --auth=adm:pass :15984/_replicate \
  source='http://adm:pass@localhost:15984/s' \
  target='http://adm:pass@localhost:15984/r' \
  create_target:='true' \
  continuous:='true'
{
    "_local_id": "14ed9e9f8411232bdcd2387a8f578806+continuous+create_target",
    "ok": true
}
```

Canceling

```
http --auth=adm:pass :15984/_replicate \
  cancel:='true' \
  replication_id='14ed9e9f8411232bdcd2387a8f578806+continuous+create_target'
{
    "_local_id": "14ed9e9f8411232bdcd2387a8f578806+continuous+create_target",
    "ok": true
}
```

Log shows:

```
Canceling replication '14ed9e9f8411232bdcd2387a8f578806+continuous+create_target'
```


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/c9f04862
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/c9f04862
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/c9f04862

Branch: refs/heads/63012-scheduler
Commit: c9f04862f7431845919476421dbaab4468d2de71
Parents: 26fc4df
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Thu Oct 20 23:47:26 2016 -0400
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Thu Oct 20 23:47:26 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator_docs.erl | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/c9f04862/src/couch_replicator_docs.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_docs.erl b/src/couch_replicator_docs.erl
index a3e6b35..777c8a4 100644
--- a/src/couch_replicator_docs.erl
+++ b/src/couch_replicator_docs.erl
@@ -194,7 +194,12 @@ parse_rep_doc_without_id(RepDoc) ->
 -spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #rep{}}.
 parse_rep_doc(Doc, UserCtx) ->
     {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx),
-    {ok, update_rep_id(Rep)}.
+    case get_value(cancel, Rep#rep.options, false) of
+        true ->
+            {ok, Rep};
+        false ->
+            {ok, update_rep_id(Rep)}
+    end.
 
 
 -spec parse_rep_doc_without_id({[_]}, #user_ctx{}) -> {ok, #rep{}}.


[43/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Fix Travis build.

Travis on GitHub currently copies files from this replicator branch into the
upstream replicator source directory, which contains an extra test file. That
test file doesn't pass without upstream changes.

Make sure to clear the replicator directory before copying all the
files into it.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/f52b503d
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/f52b503d
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/f52b503d

Branch: refs/heads/63012-scheduler
Commit: f52b503d6a2f2e069c1b1902260d56b823e81031
Parents: cc9c6c6
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Thu Mar 2 17:42:26 2017 -0500
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Thu Mar 2 17:50:33 2017 -0500

----------------------------------------------------------------------
 .travis.yml | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f52b503d/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 3ef2ac4..e1f7e85 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -33,6 +33,7 @@ before_install:
 before_script:
   - cd couchdb
   - ./configure --disable-docs --disable-fauxton
+  - rm -rf ./src/couch_replicator/*
   - cp -r ../!(couchdb) ./src/couch_replicator
   - make
 


[46/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Forward-port "Fix crashes when replicator db is deleted" from master

This is a port of commit 7a2b2b68bf4afa6d20b56a8ae51361f83981412a from master.

That commit was made to couch_replicator_manager so it had to be ported by
hand.

Most of the logic for replicator document handling is now in the
couch_replicator_docs.erl module. The scanner part is in couch_multidb_changes.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/5bebb8a6
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/5bebb8a6
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/5bebb8a6

Branch: refs/heads/63012-scheduler
Commit: 5bebb8a61a70cc783eedf430b46155409099d11e
Parents: 38c9405
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Fri Mar 3 13:55:13 2017 -0500
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Fri Mar 3 13:55:13 2017 -0500

----------------------------------------------------------------------
 src/couch_multidb_changes.erl |  7 ++++++-
 src/couch_replicator_docs.erl | 17 ++++++++++++-----
 2 files changed, 18 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/5bebb8a6/src/couch_multidb_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_multidb_changes.erl b/src/couch_multidb_changes.erl
index d5016b6..9c0b7cf 100644
--- a/src/couch_multidb_changes.erl
+++ b/src/couch_multidb_changes.erl
@@ -276,7 +276,12 @@ filter_shards(DbName, DbSuffix) ->
     false ->
         [];
     true ->
-        [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)]
+        try
+            [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)]
+        catch
+            error:database_does_not_exist ->
+                []
+        end
     end.
 
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/5bebb8a6/src/couch_replicator_docs.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_docs.erl b/src/couch_replicator_docs.erl
index 204cb39..1bc852e 100644
--- a/src/couch_replicator_docs.erl
+++ b/src/couch_replicator_docs.erl
@@ -147,6 +147,9 @@ ensure_rep_ddoc_exists(RepDb) ->
 -spec ensure_rep_ddoc_exists(binary(), binary()) -> ok.
 ensure_rep_ddoc_exists(RepDb, DDocId) ->
     case open_rep_doc(RepDb, DDocId) of
+        {not_found, no_db_file} ->
+            %% database was deleted.
+            ok;
         {not_found, _Reason} ->
             DocProps = replication_design_doc_props(DDocId),
             DDoc = couch_doc:from_json_obj({DocProps}),
@@ -350,11 +353,15 @@ update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
     end.
 
 open_rep_doc(DbName, DocId) ->
-    {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
-    try
-        couch_db:open_doc(Db, DocId, [ejson_body])
-    after
-        couch_db:close(Db)
+    case couch_db:open_int(DbName, [?CTX, sys_db]) of
+        {ok, Db} ->
+            try
+                couch_db:open_doc(Db, DocId, [ejson_body])
+            after
+                couch_db:close(Db)
+            end;
+        Else ->
+            Else
     end.
 
 save_rep_doc(DbName, Doc) ->


[04/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Posted by va...@apache.org.
Add support for _scheduler/jobs/<jobid> and _scheduler/docs/<docid>


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/f1140e94
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/f1140e94
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/f1140e94

Branch: refs/heads/63012-scheduler
Commit: f1140e941d8e417f4f271f2b3c3f81d4f09a2e67
Parents: a1d7554
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Fri Oct 21 15:44:53 2016 -0400
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Fri Oct 21 15:55:44 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator.erl               | 51 +++++++++++++++
 src/couch_replicator_doc_processor.erl | 23 ++++++-
 src/couch_replicator_docs.erl          |  3 +-
 src/couch_replicator_ids.erl           | 13 +++-
 src/couch_replicator_scheduler.erl     | 99 +++++++++++++++++------------
 5 files changed, 143 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f1140e94/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index b888f82..0bfd9f6 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -15,6 +15,7 @@
 -export([replicate/2, ensure_rep_db_exists/0]).
 -export([stream_active_docs_info/3, stream_terminal_docs_info/4]).
 -export([replication_states/0]).
+-export([job/1, doc/3]).
 
 -include_lib("couch/include/couch_db.hrl").
 -include("couch_replicator.hrl").
@@ -230,3 +231,53 @@ filter_replicator_doc_query(_DocState, []) ->
     true;
 filter_replicator_doc_query(State, States) when is_list(States) ->
     lists:member(State, States).
+
+
+-spec job(binary()) -> {ok, {[_]}} | {error, not_found}.
+job(JobId0) when is_binary(JobId0) ->
+    JobId = couch_replicator_ids:convert(JobId0),
+    {Res, _Bad} = rpc:multicall(couch_replicator_scheduler, job, [JobId]),
+    case [JobInfo || {ok, JobInfo} <- Res] of
+        [JobInfo| _] ->
+            {ok, JobInfo};
+        [] ->
+            {error, not_found}
+    end.
+
+
+-spec doc(binary(), binary(), [_]) -> {ok, {[_]}} | {error, not_found}.
+doc(RepDb, DocId, UserCtx) ->
+    {Res, _Bad} = rpc:multicall(couch_replicator_doc_processor, doc, [RepDb, DocId]),
+    case [DocInfo || {ok, DocInfo} <- Res] of
+        [DocInfo| _] ->
+            {ok, DocInfo};
+        [] ->
+            doc_from_db(RepDb, DocId, UserCtx)
+    end.
+
+
+-spec doc_from_db(binary(), binary(), [_]) -> {ok, {[_]}} | {error, not_found}.
+doc_from_db(RepDb, DocId, UserCtx) ->
+    case fabric:open_doc(RepDb, DocId, [UserCtx, ejson_body]) of
+        {ok, Doc} ->
+            {Props} = couch_doc:to_json_obj(Doc, []),
+            State = couch_util:get_value(<<"_replication_state">>, Props, null),
+            {StateInfo, ErrorCount} = case State of
+                <<"completed">> ->
+                    {couch_util:get_value(<<"_replication_stats">>, Props, null), 0};
+                <<"failed">> ->
+                    {couch_util:get_value(<<"_replication_state_reason">>, Props, null), 1};
+                _OtherState ->
+                    {null, 0}
+            end,
+            {ok, {[
+                {doc_id, DocId},
+                {database, RepDb},
+                {id, null},
+                {state, State},
+                {error_count, ErrorCount},
+                {info, StateInfo}
+            ]}};
+         {not_found, _Reason} ->
+            {error, not_found}
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f1140e94/src/couch_replicator_doc_processor.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_doc_processor.erl b/src/couch_replicator_doc_processor.erl
index 60a3ad1..0349fa6 100644
--- a/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator_doc_processor.erl
@@ -14,7 +14,7 @@
 -behaviour(couch_multidb_changes).
 
 -export([start_link/0]).
--export([docs/1]).
+-export([docs/1, doc/2]).
 
 % multidb changes callback
 -export([db_created/2, db_deleted/2, db_found/2, db_change/3]).
@@ -375,6 +375,27 @@ docs(States) ->
     end, [], ?MODULE).
 
 
+%% Return the ejson status of replication document DocId in database Db
+%% as tracked in this node's ?MODULE ets table, or {error, not_found}.
+%% Each row's #rdoc.id is a {Shard, DocId} pair, so the shard is mapped to
+%% its clustered db name with mem3:dbname/1 before comparing. Exported so
+%% it can be called remotely (doc/3 in the API module uses rpc:multicall).
+-spec doc(binary(), binary()) -> {ok, {[_]}} | {error, not_found}.
+doc(Db, DocId) ->
+    HealthThreshold = couch_replicator_scheduler:health_threshold(),
+    %% The accumulator stays [] until a match is found and is [DocInfo]
+    %% afterwards; the first fun clause then passes it through unchanged.
+    %% NOTE(review): ets:foldl/3 still visits every remaining row, so each
+    %% lookup costs O(table size).
+    Res = ets:foldl(fun(_RDoc, [_] = Acc) -> Acc;
+        (RDoc, []) ->
+            {Shard, RDocId} = RDoc#rdoc.id,
+            case {mem3:dbname(Shard), RDocId} of
+                {Db, DocId} ->
+                    [ejson_doc(RDoc, HealthThreshold)];
+                {_OtherDb, _OtherDocId} ->
+                    []
+            end
+    end, [], ?MODULE),
+    case Res of
+        [DocInfo] ->
+            {ok, DocInfo};
+        [] ->
+            {error, not_found}
+    end.
+
+
 -spec ejson_state_info(binary() | nil) -> binary() | null.
 ejson_state_info(nil) ->
     null;

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f1140e94/src/couch_replicator_docs.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_docs.erl b/src/couch_replicator_docs.erl
index 777c8a4..aeb9219 100644
--- a/src/couch_replicator_docs.erl
+++ b/src/couch_replicator_docs.erl
@@ -438,8 +438,7 @@ convert_options([{<<"cancel">>, V} | R]) ->
     [{cancel, V} | convert_options(R)];
 convert_options([{IdOpt, V} | R]) when IdOpt =:= <<"_local_id">>;
         IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> ->
-    Id = lists:splitwith(fun(X) -> X =/= $+ end, ?b2l(V)),
-    [{id, Id} | convert_options(R)];
+    [{id, couch_replicator_ids:convert(V)} | convert_options(R)];
 convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V)->
     throw({bad_request, <<"parameter `create_target` must be a boolean">>});
 convert_options([{<<"create_target">>, V} | R]) ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f1140e94/src/couch_replicator_ids.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_ids.erl b/src/couch_replicator_ids.erl
index 50760b0..565ed9d 100644
--- a/src/couch_replicator_ids.erl
+++ b/src/couch_replicator_ids.erl
@@ -12,7 +12,7 @@
 
 -module(couch_replicator_ids).
 
--export([replication_id/1, replication_id/2]).
+-export([replication_id/1, replication_id/2, convert/1]).
 
 -include_lib("couch/include/couch_db.hrl").
 -include("couch_replicator_api_wrap.hrl").
@@ -67,6 +67,17 @@ replication_id(#rep{user_ctx = UserCtx} = Rep, 1) ->
     maybe_append_filters([HostName, Src, Tgt], Rep).
 
 
+%% Normalise a replication id into its {BaseId, Extension} pair of strings.
+%% Binaries (and charlists, via the binary clause) are split at the first
+%% $+, which stays at the head of the extension:
+%%   <<"abc+continuous">> -> {"abc", "+continuous"}
+%%   <<"abc">>            -> {"abc", ""}
+%% A tuple of two lists is assumed to be already split and is returned
+%% unchanged.
+-spec convert([_] | binary() | {string(), string()}) -> {string(), string()}.
+convert(Id) when is_list(Id) ->
+    convert(?l2b(Id));
+
+convert(Id) when is_binary(Id) ->
+    lists:splitwith(fun(Char) -> Char =/= $+ end, ?b2l(Id));
+
+convert({BaseId, Ext} = Id) when is_list(BaseId), is_list(Ext) ->
+    Id.
+
+
 % Private functions
 
 maybe_append_filters(Base,

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f1140e94/src/couch_replicator_scheduler.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler.erl b/src/couch_replicator_scheduler.erl
index 3ca417a..b43b36c 100644
--- a/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator_scheduler.erl
@@ -24,7 +24,7 @@
 -export([start_link/0, add_job/1, remove_job/1, reschedule/0]).
 -export([rep_state/1, find_jobs_by_dbname/1, find_jobs_by_doc/2]).
 -export([job_summary/2, health_threshold/0]).
--export([jobs/0]).
+-export([jobs/0, job/1]).
 
 %% gen_server callbacks
 -export([init/1, terminate/2, code_change/3]).
@@ -748,48 +748,63 @@ ejson_url(DbName) when is_binary(DbName) ->
     DbName.
 
 
+%% Render one scheduler #job{} as an ejson object; shared by jobs/0 and
+%% job/1.
+%%
+%% Each history entry becomes {[{timestamp, ISO8601}, {type, Type}]}, with
+%% an extra {reason, ...} for {crashed, Reason} events. Timestamps are UTC
+%% in ISO 8601 form with microsecond precision. The pid is rendered as a
+%% binary, or null when the job has no process (pid = undefined), and the
+%% replication id is its base id and extension concatenated.
+-spec job_ejson(#job{}) -> {[_ | _]}.
+job_ejson(Job) ->
+    Rep = Job#job.rep,
+    Source = ejson_url(Rep#rep.source),
+    Target = ejson_url(Rep#rep.target),
+    History = lists:map(fun(Event) ->
+        EventProps = case Event of
+            {{crashed, Reason}, _When} ->
+                [{type, crashed}, {reason, crash_reason_json(Reason)}];
+            {Type, _When} ->
+                [{type, Type}]
+        end,
+        {_Type, {_Mega, _Sec, Micros} = When} = Event,
+        {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(When),
+        %% ISO 8601 / RFC 3339 separate time fields with ':' (not '-'), and
+        %% the fraction must be zero-padded to 6 digits: 5000us must render
+        %% as ".005000", not ".5000" (which would mean half a second).
+        ISO8601 = iolist_to_binary(io_lib:format(
+            "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0B.~6..0BZ",
+            [Y, Mon, D, H, Min, S, Micros]
+        )),
+        {[{timestamp, ISO8601} | EventProps]}
+    end, Job#job.history),
+    {BaseID, Ext} = Job#job.id,
+    Pid = case Job#job.pid of
+        undefined ->
+            null;
+        P when is_pid(P) ->
+            ?l2b(pid_to_list(P))
+    end,
+    {[
+        {id, iolist_to_binary([BaseID, Ext])},
+        {pid, Pid},
+        {source, iolist_to_binary(Source)},
+        {target, iolist_to_binary(Target)},
+        {database, Rep#rep.db_name},
+        {user, (Rep#rep.user_ctx)#user_ctx.name},
+        {doc_id, Rep#rep.doc_id},
+        {history, History},
+        {node, node()}
+    ]}.
+
+
+%% Return the ejson object of every job in the couch_replicator_scheduler
+%% ets table, each rendered by job_ejson/1.
+%% NOTE(review): the spec says [[tuple()]], but each element is the
+%% {[_ | _]} object produced by job_ejson/1; consider correcting it.
 -spec jobs() -> [[tuple()]].
 jobs() ->
     ets:foldl(fun(Job, Acc) ->
-        Rep = Job#job.rep,
-        Source = ejson_url(Rep#rep.source),
-        Target = ejson_url(Rep#rep.target),
-        History = lists:map(fun(Event) ->
-            EventProps  = case Event of
-                {{crashed, Reason}, _When} ->
-                    [{type, crashed}, {reason, crash_reason_json(Reason)}];
-                {Type, _When} ->
-                    [{type, Type}]
-            end,
-            {_Type, {_Mega, _Sec, Micros}=When} = Event,
-            {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(When),
-            ISO8601 = iolist_to_binary(io_lib:format(
-                "~B-~2..0B-~2..0BT~2..0B-~2..0B-~2..0B.~BZ",
-                [Y,Mon,D,H,Min,S,Micros]
-            )),
-            {[{timestamp, ISO8601} | EventProps]}
-        end, Job#job.history),
-        {BaseID, Ext} = Job#job.id,
-        Pid = case Job#job.pid of
-            undefined ->
-                null;
-            P when is_pid(P) ->
-                ?l2b(pid_to_list(P))
-        end,
-        [{[
-            {id, iolist_to_binary([BaseID, Ext])},
-            {pid, Pid},
-            {source, iolist_to_binary(Source)},
-            {target, iolist_to_binary(Target)},
-            {database, Rep#rep.db_name},
-            {user, (Rep#rep.user_ctx)#user_ctx.name},
-            {doc_id, Rep#rep.doc_id},
-            {history, History},
-            {node, node()}
-        ]} | Acc]
+        [job_ejson(Job) | Acc]
     end, [], couch_replicator_scheduler).
 
 
+%% Return the ejson object (via job_ejson/1) for the scheduler job with id
+%% JobId. Any non-{ok, _} result of job_by_id/1 is returned unchanged.
+%% NOTE(review): the spec assumes that result is {error, not_found};
+%% job_by_id/1 is not visible here, so confirm.
+-spec job(job_id()) -> {ok, {[_ | _]}} | {error, not_found}.
+job(JobId) ->
+    case job_by_id(JobId) of
+        {ok, Job} ->
+            {ok, job_ejson(Job)};
+        Error ->
+            Error
+    end.
+
+
 crash_reason_json({_CrashType, Info}) when is_binary(Info) ->
     Info;
 crash_reason_json(Reason) when is_binary(Reason) ->
@@ -884,7 +899,7 @@ latest_crash_timestamp_test_() ->
 
 
 last_started_test_() ->
-    [?_assertEqual({0, R, 0}, last_started(job(H))) || {R, H} <- [
+    [?_assertEqual({0, R, 0}, last_started(testjob(H))) || {R, H} <- [
          {0, [added()]},
          {0, [crashed(1)]},
          {1, [started(1)]},
@@ -895,9 +910,9 @@ last_started_test_() ->
 
 
 oldest_job_first_test() ->
-    J0 = job([crashed()]),
-    J1 = job([started(1)]),
-    J2 = job([started(2)]),
+    J0 = testjob([crashed()]),
+    J1 = testjob([started(1)]),
+    J2 = testjob([started(2)]),
     Sort = fun(Jobs) -> lists:sort(fun oldest_job_first/2, Jobs) end,
     ?assertEqual([], Sort([])),
     ?assertEqual([J1], Sort([J1])),
@@ -1312,7 +1327,7 @@ oneshot_running(Id) when is_integer(Id) ->
     }.
 
 
+%% Test helper: a #job{} with only its history set. Renamed from job/1 so
+%% it no longer clashes with the newly exported job/1 API function.
-job(Hist) when is_list(Hist) ->
+testjob(Hist) when is_list(Hist) ->
     #job{history = Hist}.