Posted to commits@couchdb.apache.org by va...@apache.org on 2017/04/22 17:54:01 UTC

[couchdb] 07/08: Stitch scheduling replicator together.

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 5b612222a42f653b9396301bc2802f29dd133436
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Fri Nov 25 11:47:56 2016 -0500

    Stitch scheduling replicator together.
    
    Glue together all the scheduling replicator pieces.
    
    Scheduler is the main component. It can run a large number of replication jobs
    by switching between them, stopping and starting some periodically. Jobs
    which fail are backed off exponentially. Normal (non-continuous) jobs will be
    allowed to run to completion to preserve their current semantics.
    
    Scheduler behavior can be configured by these configuration options in the
    `[replicator]` section:
    
     * `max_jobs` : Number of actively running replications. Making this too high
     could cause performance issues. Making it too low could mean replication jobs
     might not have enough time to make progress before getting unscheduled again.
     This parameter can be adjusted at runtime and will take effect during the next
     rescheduling cycle.
    
     * `interval` : Scheduling interval in milliseconds. During each rescheduling
     cycle the scheduler might start or stop up to "max_churn" jobs.
    
     * `max_churn` : Maximum number of replications to start and stop during
     rescheduling. This parameter along with "interval" defines the rate of job
     replacement. During startup, however, a much larger number of jobs (up to
     max_jobs) could be started in a short period of time.
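    
    For example (an illustrative sketch only; the values shown are the defaults),
    these settings could be adjusted at runtime from a remsh via the config
    application:
    
        config:set("replicator", "max_jobs", "500"),
        config:set("replicator", "interval", "60000"),
        config:set("replicator", "max_churn", "20").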
    
    Replication jobs are added to the scheduler by the document processor or from
    the `couch_replicator:replicate/2` function when called from the `_replicate`
    HTTP endpoint handler.
    
    The document processor listens for updates via the couch_multidb_changes
    module, then tries to add replication jobs to the scheduler. Sometimes
    translating a document update to a replication job could fail, either
    permanently (for example, if the document is malformed and missing some
    expected fields) or temporarily, if it is a filtered replication and the
    filter cannot be fetched. A failed filter fetch will be retried with an
    exponential backoff.
    
    couch_replicator_clustering is in charge of monitoring cluster membership
    changes. When membership changes, after a configurable quiet period, a rescan
    will be initiated. The rescan will shuffle replication jobs to make sure each
    replication job is running on only one node.
    
    A new set of stats was added to introspect scheduler and doc processor
    internals.
    
    The top replication supervisor structure is `rest_for_one`. This means if
    a child crashes, all children to the "right" of it will be restarted (if the
    supervisor hierarchy is visualized as an upside-down tree). Clustering,
    connection pool and rate limiter are towards the "left" as they are more
    fundamental; if the clustering child crashes, most other components will be
    restarted. The doc processor and multi-db changes children are towards the
    "right". If they crash, they can be safely restarted without affecting already
    running replications or components like clustering or the connection pool.
    
    Jira: COUCHDB-3324
---
 Makefile                                           |    4 +-
 dev/run                                            |   14 +
 rel/files/eunit.ini                                |    3 +
 rel/overlay/etc/default.ini                        |    8 +-
 src/couch_replicator/README.md                     |  292 +++++
 src/couch_replicator/priv/stats_descriptions.cfg   |   96 ++
 src/couch_replicator/src/couch_replicator.app.src  |   26 +-
 src/couch_replicator/src/couch_replicator.erl      | 1219 +++++---------------
 src/couch_replicator/src/couch_replicator.hrl      |   35 +-
 .../src/couch_replicator_api_wrap.erl              |   50 +-
 .../src/couch_replicator_api_wrap.hrl              |    3 +-
 src/couch_replicator/src/couch_replicator_docs.erl |    4 +-
 .../src/couch_replicator_job_sup.erl               |    7 +-
 .../src/couch_replicator_js_functions.hrl          |    8 +-
 .../src/couch_replicator_manager.erl               | 1034 +----------------
 .../src/couch_replicator_scheduler.erl             |   22 +-
 src/couch_replicator/src/couch_replicator_sup.erl  |   54 +-
 .../src/couch_replicator_worker.erl                |    3 +
 .../test/couch_replicator_compact_tests.erl        |   30 +-
 .../test/couch_replicator_connection_tests.erl     |  241 ++++
 .../test/couch_replicator_httpc_pool_tests.erl     |    2 +-
 .../test/couch_replicator_many_leaves_tests.erl    |   24 +-
 .../test/couch_replicator_modules_load_tests.erl   |   11 +-
 .../test/couch_replicator_proxy_tests.erl          |   69 ++
 .../test/couch_replicator_test_helper.erl          |   22 +-
 .../couch_replicator_use_checkpoints_tests.erl     |   24 +-
 test/javascript/tests/replicator_db_bad_rep_id.js  |    3 +-
 27 files changed, 1240 insertions(+), 2068 deletions(-)

diff --git a/Makefile b/Makefile
index 1ace170..e105054 100644
--- a/Makefile
+++ b/Makefile
@@ -115,7 +115,9 @@ else
 endif
 	# This might help with emfile errors during `make javascript`: ulimit -n 10240
 	@rm -rf dev/lib
-	@dev/run -n 1 -q --with-admin-party-please test/javascript/run $(suites)
+	@dev/run -n 1 -q --with-admin-party-please \
+            -c 'startup_jitter=0' \
+            test/javascript/run $(suites)
 
 
 .PHONY: check-qs
diff --git a/dev/run b/dev/run
index 94f06ef..793e4c4 100755
--- a/dev/run
+++ b/dev/run
@@ -125,6 +125,8 @@ def setup_argparse():
                       help='HAProxy port')
     parser.add_option('--node-number', dest="node_number", type=int, default=1,
                       help='The node number to seed them when creating the node(s)')
+    parser.add_option('-c', '--config-overrides', action="append", default=[],
+                      help='Optional key=val config overrides. Can be repeated')
     return parser.parse_args()
 
 
@@ -143,6 +145,7 @@ def setup_context(opts, args):
             'with_haproxy': opts.with_haproxy,
             'haproxy': opts.haproxy,
             'haproxy_port': opts.haproxy_port,
+            'config_overrides': opts.config_overrides,
             'procs': []}
 
 
@@ -190,6 +193,16 @@ def setup_configs(ctx):
         write_config(ctx, node, env)
 
 
+def apply_config_overrides(ctx, content):
+    for kv_str in ctx['config_overrides']:
+        key, val = kv_str.split('=', 1)
+        key, val = key.strip(), val.strip()
+        match = "[;=]{0,2}%s.*" % key
+        repl = "%s = %s" % (key, val)
+        content = re.sub(match, repl, content)
+    return content
+
+
 def get_ports(idnode):
     assert idnode
     return ((10000 * idnode) + 5984, (10000 * idnode) + 5986)
@@ -211,6 +224,7 @@ def write_config(ctx, node, env):
 
         if base == "default.ini":
             content = hack_default_ini(ctx, node, content)
+            content = apply_config_overrides(ctx, content)
         elif base == "local.ini":
             content = hack_local_ini(ctx, content)
 
diff --git a/rel/files/eunit.ini b/rel/files/eunit.ini
index c706fff..2536a6a 100644
--- a/rel/files/eunit.ini
+++ b/rel/files/eunit.ini
@@ -32,3 +32,6 @@ writer = file
 file = couch.log
 level = info
 
+[replicator]
+; disable jitter to reduce test run times
+startup_jitter = 0
\ No newline at end of file
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 2cb5bca..c758134 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -322,12 +322,8 @@ compression_level = 8 ; from 1 (lowest, fastest) to 9 (highest, slowest), 0 to d
 compressible_types = text/*, application/javascript, application/json, application/xml
 
 [replicator]
-; minimum time between a replicator job restart (milliseconds)
-start_delay = 0
-; random splay time between a replicator job restart (milliseconds)
-start_splay = 0
-; Maximum replicaton retry count can be a non-negative integer or "infinity".
-max_replication_retry_count = 10
+; Random jitter applied on replication job startup (milliseconds)
+;startup_jitter = 5000
 ; More worker processes can give higher network throughput but can also
 ; imply more disk and network IO.
 worker_processes = 4
diff --git a/src/couch_replicator/README.md b/src/couch_replicator/README.md
new file mode 100644
index 0000000..f08ff35
--- /dev/null
+++ b/src/couch_replicator/README.md
@@ -0,0 +1,292 @@
+Developer Oriented Replicator Description
+=========================================
+
+This description of the scheduling replicator's functionality is mainly geared
+to CouchDB developers. It dives a bit into the internals and explains how
+everything is connected together.
+
+A natural place to start is the top application supervisor:
+`couch_replicator_sup`. It's a `rest_for_one` so if a child process terminates,
+the rest of the children in the hierarchy following it are also terminated.
+This structure implies a useful constraint -- children lower in the list can
+safely call their siblings which are higher in the list.
+
+A description of each child:
+
+ * `couch_replication_event`: Starts a gen_event publication bus to handle some
+    replication related events. This is used, for example, to publish cluster
+    membership changes by the `couch_replicator_clustering` process, but it is
+    also used in replication tests to monitor for replication events.
+    Notification is performed via the `couch_replicator_notifier:notify/1`
+    function. It's the first (left-most) child because
+    `couch_replicator_clustering` uses it.
+
+ * `couch_replicator_clustering`: This module maintains cluster membership
+    information for the replication application and provides functions to check
+    ownership of replication jobs. A cluster membership change is published via
+    the `gen_event` event server named `couch_replication_event` as previously
+    covered. Published events are `{cluster, stable}` when cluster membership
+    has stabilized, that is, no node membership changes in a given period, and
+    `{cluster, unstable}` which indicates there was a recent change to the
+    cluster membership and now it's considered unstable. Listeners for cluster
+    membership change include `couch_replicator_doc_processor` and
+    `couch_replicator_db_changes`. When the doc processor gets a `{cluster,
+    stable}` event it will remove all the replication jobs not belonging to the
+    current node. When `couch_replicator_db_changes` gets a `{cluster,
+    stable}` event, it will restart the `couch_multidb_changes` process it
+    controls, which will launch a new scan of all the replicator databases.
+
+ * `couch_replicator_connection`: Maintains a global replication connection
+    pool. It allows reusing connections across replication tasks. The main
+    interface is `acquire/1` and `release/1`. The general idea is that once a
+    connection is established, it is kept around for
+    `replicator.connection_close_interval` milliseconds in case another
+    replication task wants to re-use it. It is worth pointing out how linking
+    and monitoring is handled: workers are linked to the connection pool when
+    they are created. If they crash, the connection pool will receive an 'EXIT'
+    event and clean up after the worker. The connection pool also monitors
+    owners (by monitoring the `Pid` from the `From` argument in the call to
+    `acquire/1`) and cleans up if the owner dies and the pool receives a 'DOWN'
+    message. Another interesting thing is that connection establishment
+    (creation) happens in the owner process so the pool is not blocked on it.
+
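+    A rough usage sketch of the `acquire/1` / `release/1` interface described
+    above; the exact argument and return shapes here are assumptions:
+
+    ```
+    Url = "https://source.example.com/db",
+    {ok, Worker} = couch_replicator_connection:acquire(Url),
+    try
+        do_request(Worker, Url)   % placeholder for the actual HTTP request
+    after
+        couch_replicator_connection:release(Worker)
+    end.
+    ```
+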
+ * `couch_replicator_rate_limiter` : Implements a rate limiter to handle
+    connection throttling from sources or targets where requests return 429
+    error codes. Uses the Additive Increase / Multiplicative Decrease feedback
+    control algorithm to converge on the channel capacity. Implemented using a
+    16-way sharded ETS table to maintain connection state. The table sharding
+    code is split out to the `couch_replicator_rate_limiter_tables` module. The
+    purpose of the module is to maintain and continually estimate sleep
+    intervals for each connection, represented as a `{Method, Url}` pair. The
+    interval is updated accordingly on each call to `failure/1` or `success/1`.
+    For a successful request, a client should call `success/1`. Whenever a 429
+    response is received the client should call `failure/1`. When no failures
+    are happening the code ensures the ETS tables are empty in order to have a
+    lower impact on a running system.
+
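+    A sketch of how a caller might report outcomes; only `success/1` and
+    `failure/1` are part of the interface described above, the rest is
+    illustrative:
+
+    ```
+    Key = {get, "https://source.example.com/db/_changes"},
+    case send_request(Key) of                      % placeholder helper
+        {ok, 429, _Headers, _Body} ->
+            couch_replicator_rate_limiter:failure(Key);
+        {ok, _Code, _Headers, _Body} ->
+            couch_replicator_rate_limiter:success(Key)
+    end.
+    ```
+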
+ * `couch_replicator_scheduler` : This is the core component of the scheduling
+    replicator. Its main task is to switch between replication jobs by
+    stopping some and starting others to ensure all of them make progress.
+    Replication jobs which fail are penalized using an exponential backoff.
+    That is, each consecutive failure will double the time penalty. This frees
+    up system resources for more useful work than just continuously trying to
+    run the same subset of failing jobs.
+
+    The main API function is `add_job/1`. Its argument is an instance of the
+    `#rep{}` record, which could be the result of a document update from a
+    `_replicator` db or the result of a POST to `_replicate` endpoint.
+
+    Each job internally is represented by the `#job{}` record. It contains the
+    original `#rep{}` but also maintains an event history. The history is a
+    sequence of past events for each job. These are timestamped and ordered
+    such that the most recent event is at the head. History length is limited
+    based on the `replicator.max_history` configuration value. The default is
+    20 entries. History event types are:
+
+    * `added` : job was just added to the scheduler. This is the first event.
+    * `started` : job was started. This was an attempt to run the job.
+    * `stopped` : job was stopped by the scheduler.
+    * `crashed` : job has crashed (instead of stopping cleanly).
+
+    The core of the scheduling algorithm is the `reschedule/1` function. This
+    function is called every `replicator.interval` milliseconds (default is
+    60000, i.e. one minute). During each call the scheduler will try to stop
+    some jobs, start some new ones, and will also try to keep the number of
+    running jobs no greater than `replicator.max_jobs` (default 500). So the
+    function does these operations (actual code paste):
+
+    ```
+    Running = running_job_count(),
+    Pending = pending_job_count(),
+    stop_excess_jobs(State, Running),
+    start_pending_jobs(State, Running, Pending),
+    rotate_jobs(State, Running, Pending),
+    update_running_jobs_stats(State#state.stats_pid)
+    ```
+
+    `Running` is the total number of currently running jobs. `Pending` is the
+    total number of jobs waiting to be run. `stop_excess_jobs` will stop any
+    jobs exceeding the `replicator.max_jobs` configured limit. This code takes
+    effect if the user reduces the `max_jobs` configuration value.
+    `start_pending_jobs` will start any jobs if there is more room available.
+    This will take effect on startup or when the user increases the `max_jobs`
+    configuration value. `rotate_jobs` is where all the action happens. The
+    scheduler picks `replicator.max_churn` running jobs to stop and then picks
+    the same number of pending jobs to start. The default value of `max_churn`
+    is 20. So by default every minute, 20 running jobs are stopped, and 20 new
+    pending jobs are started.
+
+    Before moving on it is worth pointing out that the scheduler treats
+    continuous and non-continuous replications differently. Normal
+    (non-continuous) replications, once started, will be allowed to run to
+    completion. That behavior preserves their semantics of replicating a
+    snapshot of the source database to the target. For example, if new
+    documents are added to the source after the replication has started, those
+    updates should not show up on the target database. Stopping and restarting
+    a normal replication would violate that constraint. The only exception to
+    the rule is when the user explicitly reduces the `replicator.max_jobs`
+    configuration value. Even then the scheduler will first attempt to stop as
+    many continuous jobs as possible and only if it has no other choice will it
+    stop normal jobs.
+
+    Keeping that in mind and going back to the scheduling algorithm, the next
+    interesting part is how the scheduler picks which jobs to stop and which
+    ones to start:
+
+    * Stopping: When picking jobs to stop the scheduler will pick the longest
+      running continuous jobs first. The sorting callback function to get the
+      longest running jobs is unsurprisingly called `longest_running/2`. To
+      pick the longest running jobs it looks at the most recent `started`
+      event. After it gets a list sorted by longest running, it simply picks
+      the first few depending on the value of `max_churn` using
+      `lists:sublist/2`. Then those jobs are stopped.
+
+    * Starting: When starting the scheduler will pick the jobs which have been
+      waiting the longest. Surprisingly, in this case it also looks at the
+      `started` timestamp and picks the jobs which have the oldest `started`
+      timestamp. If there are 3 jobs, A[started=10], B[started=7],
+      C[started=9], then B will be picked first, then C then A. This ensures
+      that jobs are not starved, which is a classic scheduling pitfall.
+
+    In the code, the list of pending jobs is picked slightly differently than
+    how the list of running jobs is picked. `pending_jobs/1` uses `ets:foldl`
+    to iterate over all the pending jobs. As it iterates it tries to keep only
+    up to `max_churn` oldest items in the accumulator. The reason this is done
+    is that there could be a very large number of pending jobs, and loading
+    them all into a list (making a copy from ETS) and then sorting it can be
+    quite expensive performance-wise. The tricky part of the iteration happens
+    in `pending_maybe_replace/2`. A `gb_sets` ordered set is used to keep the
+    top-N longest waiting jobs so far. The code has a comment with a helpful
+    example of how this algorithm works.
+
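+    A simplified sketch of that top-N idea (not the actual
+    `pending_maybe_replace/2` code; the comparison key and the `last_started/1`
+    helper are assumptions):
+
+    ```
+    % Keep at most N jobs, always dropping the one with the newest `started`
+    % timestamp, so the oldest (longest waiting) jobs survive the fold.
+    pending_fold(Job, {Set0, N}) ->
+        Set1 = gb_sets:add_element({last_started(Job), Job}, Set0),
+        case gb_sets:size(Set1) > N of
+            true  -> {gb_sets:delete(gb_sets:largest(Set1), Set1), N};
+            false -> {Set1, N}
+        end.
+    ```
+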
+    The last part is how the scheduler treats jobs which keep crashing. If a
+    job is started but then crashes, that job is considered unhealthy. The
+    main idea is to penalize such jobs such that they are forced to wait an
+    exponentially larger amount of time with each consecutive crash. A central
+    part of this algorithm is determining what forms a sequence of consecutive
+    crashes. If a job starts then quickly crashes, and after the next start it
+    crashes again, then that becomes a sequence of 2 consecutive crashes.
+    The penalty is then calculated by the `backoff_micros/1` function, where
+    the consecutive crash count ends up as the exponent. However, for
+    practical concerns there is also a maximum penalty, specified as the
+    equivalent of 10 consecutive crashes. Time-wise it ends up being about 8
+    hours. That means even a job which keeps crashing will still get a chance
+    to retry once every 8 hours.
+
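+    A simplified sketch of that penalty calculation; the base interval below is
+    an assumption chosen only to land near the roughly 8 hour cap mentioned
+    above:
+
+    ```
+    backoff_micros(ConsecutiveCrashes) ->
+        BaseIntervalMicros = 30 * 1000 * 1000,    % assumed 30 second base
+        Exp = min(ConsecutiveCrashes, 10),        % cap at 10 consecutive crashes
+        (1 bsl Exp) * BaseIntervalMicros.         % 2^10 * 30s is about 8.5 hours
+    ```
+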
+    There is a subtlety when calculating consecutive crashes, and that is
+    deciding when the sequence stops, that is, figuring out when a job becomes
+    healthy again. The scheduler considers a job healthy again if it started
+    and hasn't crashed in a while. The "in a while" part is a configuration
+    parameter, `replicator.health_threshold`, defaulting to 2 minutes. This
+    means that if a job has been crashing, for example 5 times in a row, but
+    then on the 6th attempt it starts and runs for more than 2 minutes, it is
+    considered healthy again. The next time it crashes its sequence of
+    consecutive crashes will restart at 1.
+
+ * `couch_replicator_scheduler_sup`: This module is a supervisor for running
+   replication tasks. The most interesting thing about it is perhaps that it is
+   not used to restart children. The scheduler itself handles restarts and
+   error backoffs.
+
+ * `couch_replicator_doc_processor`: The doc processor component is in charge
+   of processing replication document updates, turning them into replication
+   jobs and adding those jobs to the scheduler. Unfortunately the only reason
+   there is even a `couch_replicator_doc_processor` gen_server, instead of
+   replication documents being turned into jobs and inserted into the scheduler
+   directly, is one corner case -- filtered replications using custom (mostly
+   JavaScript) filters. More about this later. It is better to start with how
+   updates flow through the doc processor:
+
+   Document updates come via the `db_change/3` callback from
+   `couch_multidb_changes`, then go to the `process_change/2` function.
+
+   In `process_change/2` a few decisions are made regarding how to proceed. The
+   first is an "ownership" check, that is, a check whether the replication
+   document belongs on the current node. If not, it is ignored. In a cluster,
+   in general there would be N copies of a document change and we only want to
+   run the replication once. Another check is to see if the update has arrived
+   during a time when the cluster is considered "unstable". If so, it is
+   ignored, because soon enough a rescan will be launched and all the documents
+   will be reprocessed anyway. Another noteworthy thing in `process_change/2`
+   is the handling of upgrades from the previous version of the replicator,
+   when transient states were written to the documents. Two such states were
+   `triggered` and `error`. Both of those states are removed from the document,
+   then the update proceeds in the regular fashion. `failed` documents are
+   also ignored here. `failed` is a terminal state which indicates the document
+   was somehow unsuitable to become a replication job (it was malformed or a
+   duplicate). Otherwise the state update proceeds to `process_updated/2`.
+
+   `process_updated/2` is where replication document updates are parsed and
+   translated to `#rep{}` records. The interesting part here is that the
+   replication ID isn't calculated yet. Unsurprisingly the parsing function
+   used is called `parse_rep_doc_without_id/1`. Also note that up until now
+   everything is still running in the context of the `db_change/3` callback.
+   After the replication filter type is determined, the update gets passed to
+   the `couch_replicator_doc_processor` gen_server.
+
+   The `couch_replicator_doc_processor` gen_server's main role is to try to
+   calculate replication IDs for each `#rep{}` record passed to it, then add
+   it as a scheduler job. As noted before, `#rep{}` records parsed up until
+   this point lack a replication ID. The reason is that replication ID
+   calculation includes a hash of the filter code. And because user-defined
+   replication filters live in the source DB, fetching them most likely
+   involves a remote network call, so there is a possibility of blocking and a
+   need to handle various network failures and retries. Because of that, the
+   doc processor dispatches all of that blocking and retrying to a separate
+   `worker` process (the `couch_replicator_doc_processor_worker` module).
+
+   `couch_replicator_doc_processor_worker` is where replication IDs are
+   calculated for each individual doc update. There are two separate modules
+   which contain utilities related to replication ID calculation:
+   `couch_replicator_ids` and `couch_replicator_filters`. The first one
+   contains ID calculation algorithms and the second one knows how to parse and
+   fetch user filters from a remote source DB. One interesting thing about the
+   worker is that it is time-bounded and is guaranteed not to be stuck forever.
+   That's why it spawns an extra process with `spawn_monitor`, just so it can
+   use an `after` clause in a `receive` and bound the maximum time this worker
+   will take.
+
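+   A generic sketch of that time-bounding technique; the function and message
+   shapes are illustrative rather than the worker's actual code:
+
+   ```
+   % Run Fun in a monitored process and give up after Timeout milliseconds.
+   bounded(Fun, Timeout) ->
+       {Pid, Ref} = spawn_monitor(fun() -> exit({shutdown, Fun()}) end),
+       receive
+           {'DOWN', Ref, process, Pid, {shutdown, Result}} ->
+               {ok, Result};
+           {'DOWN', Ref, process, Pid, Reason} ->
+               {error, Reason}
+       after Timeout ->
+           erlang:demonitor(Ref, [flush]),
+           exit(Pid, kill),
+           {error, timeout}
+       end.
+   ```
+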
+   A doc processor worker will either succeed or fail, but never block for too
+   long. Success and failure are returned as exit values. Those are handled in
+   the `worker_returned/3` doc processor clauses. The most common pattern is
+   that a worker is spawned to add a replication job; it does so and returns an
+   `{ok, ReplicationID}` value in `worker_returned`.
+
+   In case of a filtered replication with custom user code there are two cases
+   to consider:
+
+     1. Filter fetching code has failed. In that case worker returns an error.
+        But because the error could be a transient network error, another
+        worker is started to try again. It could fail and return an error
+        again, then another one is started and so on. However each consecutive
+        worker will do an exponential backoff, not unlike the scheduler code.
+        `error_backoff/1` is where the backoff period is calculated.
+        Consecutive errors are held in the `errcnt` field in the ETS table.
+
+     2. Fetching the filter code succeeds, the replication ID is calculated
+        and the job is added to the scheduler. However, because this is a
+        filtered replication, the source database could get an updated filter,
+        which means the replication ID could change again. So a worker is
+        spawned to periodically check the filter and see if it has changed. In
+        other words the doc processor will do the work of checking for
+        filtered replications, get an updated filter and then refresh the
+        replication job (remove the old one and add a new one with a different
+        ID). The filter checking interval is determined by the `filter_backoff`
+        function. An unusual thing about that function is that it calculates
+        the period based on the size of the ETS table. The idea is that for a
+        few replications in a cluster, it's OK to check for filter changes
+        often. But when there are lots of replications running, having each one
+        check its filter often is not a good idea.
+
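+   Both intervals could look roughly like the sketch below; the constants and
+   caps are assumptions, only the growth behavior reflects the description
+   above:
+
+   ```
+   % Consecutive filter fetch errors back off exponentially.
+   error_backoff(ErrCnt) ->
+       timer:seconds(30) * (1 bsl min(ErrCnt, 10)).   % assumed base and cap
+
+   % The filter re-check period grows with the number of tracked docs.
+   filter_backoff(NumDocs) ->
+       Base = timer:minutes(1),                       % assumed base period
+       min(Base + NumDocs * 100, timer:hours(1)).     % assumed upper bound
+   ```
+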
+ * `couch_replicator`: This is an unusual but useful pattern. This child is not
+   an actual process but a one-time call to the
+   `couch_replicator:ensure_rep_db_exists/0` function, executed by the
+   supervisor in the correct order (and monitored for crashes). This ensures
+   the local replicator db exists, then returns `ignore`. This pattern is
+   useful for doing setup-like things at the top level and in the correct order
+   relative to the rest of the children in the supervisor.
+
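+   In general such a child looks like an ordinary child spec whose start
+   function does one-off setup and returns `ignore` (sketch only; the actual
+   spec in `couch_replicator_sup` may differ):
+
+   ```
+   {couch_replicator,
+       {couch_replicator, ensure_rep_db_exists, []},
+       transient,
+       brutal_kill,
+       worker,
+       [couch_replicator]}
+   ```
+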
+ * `couch_replicator_db_changes`: This process specializes and configures
+   `couch_multidb_changes` so that it looks for `_replicator` suffixed shards
+   and makes sure to restart it when node membership changes.
+
+
diff --git a/src/couch_replicator/priv/stats_descriptions.cfg b/src/couch_replicator/priv/stats_descriptions.cfg
index 2564f92..d9efb91 100644
--- a/src/couch_replicator/priv/stats_descriptions.cfg
+++ b/src/couch_replicator/priv/stats_descriptions.cfg
@@ -54,3 +54,99 @@
     {type, counter},
     {desc, <<"number of replicator workers started">>}
 ]}.
+{[couch_replicator, cluster_is_stable], [
+    {type, gauge},
+    {desc, <<"1 if cluster is stable, 0 if unstable">>}
+]}.
+{[couch_replicator, db_scans], [
+    {type, counter},
+    {desc, <<"number of times replicator db scans have been started">>}
+]}.
+{[couch_replicator, docs, dbs_created], [
+    {type, counter},
+    {desc, <<"number of db shard creations seen by replicator doc processor">>}
+]}.
+{[couch_replicator, docs, dbs_deleted], [
+    {type, counter},
+    {desc, <<"number of db shard deletions seen by replicator doc processor">>}
+]}.
+{[couch_replicator, docs, dbs_found], [
+    {type, counter},
+    {desc, <<"number of db shards found by replicator doc processor">>}
+]}.
+{[couch_replicator, docs, db_changes], [
+    {type, counter},
+    {desc, <<"number of db changes processed by replicator doc processor">>}
+]}.
+{[couch_replicator, docs, failed_state_updates], [
+    {type, counter},
+    {desc, <<"number of 'failed' state document updates">>}
+]}.
+{[couch_replicator, docs, completed_state_updates], [
+    {type, counter},
+    {desc, <<"number of 'completed' state document updates">>}
+]}.
+{[couch_replicator, jobs, adds], [
+    {type, counter},
+    {desc, <<"number of jobs added to replicator scheduler">>}
+]}.
+{[couch_replicator, jobs, duplicate_adds], [
+    {type, counter},
+    {desc, <<"number of duplicate jobs added to replicator scheduler">>}
+]}.
+{[couch_replicator, jobs, removes], [
+    {type, counter},
+    {desc, <<"number of jobs removed from replicator scheduler">>}
+]}.
+{[couch_replicator, jobs, starts], [
+    {type, counter},
+    {desc, <<"number of jobs started by replicator scheduler">>}
+]}.
+{[couch_replicator, jobs, stops], [
+    {type, counter},
+    {desc, <<"number of jobs stopped by replicator scheduler">>}
+]}.
+{[couch_replicator, jobs, crashes], [
+    {type, counter},
+    {desc, <<"number of job crashes noticed by replicator scheduler">>}
+]}.
+{[couch_replicator, jobs, running], [
+    {type, gauge},
+    {desc, <<"replicator scheduler running jobs">>}
+]}.
+{[couch_replicator, jobs, pending], [
+    {type, gauge},
+    {desc, <<"replicator scheduler pending jobs">>}
+]}.
+{[couch_replicator, jobs, crashed], [
+    {type, gauge},
+    {desc, <<"replicator scheduler crashed jobs">>}
+]}.
+{[couch_replicator, jobs, total], [
+    {type, gauge},
+    {desc, <<"total number of replicator scheduler jobs">>}
+]}.
+{[couch_replicator, connection, acquires], [
+    {type, counter},
+    {desc, <<"number of times connections are shared">>}
+]}.
+{[couch_replicator, connection, creates], [
+    {type, counter},
+    {desc, <<"number of connections created">>}
+]}.
+{[couch_replicator, connection, releases], [
+    {type, counter},
+    {desc, <<"number of times ownership of a connection is released">>}
+]}.
+{[couch_replicator, connection, owner_crashes], [
+    {type, counter},
+    {desc, <<"number of times a connection owner crashes while owning at least one connection">>}
+]}.
+{[couch_replicator, connection, worker_crashes], [
+    {type, counter},
+    {desc, <<"number of times a worker unexpectedly terminates">>}
+]}.
+{[couch_replicator, connection, closes], [
+    {type, counter},
+    {desc, <<"number of times a worker is gracefully shut down">>}
+]}.
diff --git a/src/couch_replicator/src/couch_replicator.app.src b/src/couch_replicator/src/couch_replicator.app.src
index 4f12195..18dde37 100644
--- a/src/couch_replicator/src/couch_replicator.app.src
+++ b/src/couch_replicator/src/couch_replicator.app.src
@@ -14,24 +14,15 @@
     {description, "CouchDB replicator"},
     {vsn, git},
     {mod, {couch_replicator_app, []}},
-    {modules, [
-        couch_replicator,
-        couch_replicator_api_wrap,
-        couch_replicator_app,
-        couch_replicator_httpc,
-        couch_replicator_httpd,
-        couch_replicator_job_sup,
-        couch_replicator_notifier,
-        couch_replicator_manager,
-        couch_replicator_httpc_pool,
-        couch_replicator_sup,
-        couch_replicator_utils,
-        couch_replicator_worker
-    ]},
     {registered, [
-        couch_replicator,
-        couch_replicator_manager,
-        couch_replicator_job_sup
+        couch_replicator_sup,
+        couch_replicator_rate_limiter,
+        couch_replicator_connection,
+        couch_replication,  % couch_replication_event gen_event
+        couch_replicator_clustering,
+        couch_replicator_scheduler,
+        couch_replicator_scheduler_sup,
+        couch_replicator_doc_processor
     ]},
     {applications, [
         kernel,
@@ -43,4 +34,3 @@
         couch_stats
     ]}
 ]}.
-
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index 7f0c7ee..68de114 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -11,91 +11,67 @@
 % the License.
 
 -module(couch_replicator).
--behaviour(gen_server).
--vsn(1).
 
-% public API
--export([replicate/2]).
-
-% meant to be used only by the replicator database listener
--export([async_replicate/1]).
--export([cancel_replication/1]).
-
-% gen_server callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_call/3, handle_cast/2, handle_info/2]).
--export([format_status/2]).
-
--export([details/1]).
+-export([
+    replicate/2,
+    ensure_rep_db_exists/0,
+    replication_states/0,
+    job/1,
+    doc/3,
+    active_doc/2,
+    info_from_doc/2,
+    restart_job/1
+]).
 
 -include_lib("couch/include/couch_db.hrl").
--include("couch_replicator_api_wrap.hrl").
 -include("couch_replicator.hrl").
-
--define(LOWEST_SEQ, 0).
-
--define(DEFAULT_CHECKPOINT_INTERVAL, 30000).
+-include("couch_replicator_api_wrap.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+-define(DESIGN_DOC_CREATION_DELAY_MSEC, 1000).
+-define(REPLICATION_STATES, [
+    initializing,  % Just added to scheduler
+    error,         % Could not be turned into a replication job
+    running,       % Scheduled and running
+    pending,       % Scheduled and waiting to run
+    crashing,      % Scheduled but crashing, backed off by the scheduler
+    completed,     % Non-continuous (normal) completed replication
+    failed         % Terminal failure, will not be retried anymore
+]).
 
 -import(couch_util, [
     get_value/2,
-    get_value/3,
-    to_binary/1
+    get_value/3
 ]).
 
--import(couch_replicator_utils, [
-    start_db_compaction_notifier/2,
-    stop_db_compaction_notifier/1
-]).
-
--record(rep_state, {
-    rep_details,
-    source_name,
-    target_name,
-    source,
-    target,
-    history,
-    checkpoint_history,
-    start_seq,
-    committed_seq,
-    current_through_seq,
-    seqs_in_progress = [],
-    highest_seq_done = {0, ?LOWEST_SEQ},
-    source_log,
-    target_log,
-    rep_starttime,
-    src_starttime,
-    tgt_starttime,
-    timer, % checkpoint timer
-    changes_queue,
-    changes_manager,
-    changes_reader,
-    workers,
-    stats = couch_replicator_stats:new(),
-    session_id,
-    source_db_compaction_notifier = nil,
-    target_db_compaction_notifier = nil,
-    source_monitor = nil,
-    target_monitor = nil,
-    source_seq = nil,
-    use_checkpoints = true,
-    checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
-    type = db,
-    view = nil
-}).
-
 
+-spec replicate({[_]}, any()) ->
+    {ok, {continuous, binary()}} |
+    {ok, {[_]}} |
+    {ok, {cancelled, binary()}} |
+    {error, any()} |
+    no_return().
 replicate(PostBody, Ctx) ->
-    {ok, #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep} =
-        couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
+    {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
+    Rep = Rep0#rep{start_time = os:timestamp()},
+    #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep,
     case get_value(cancel, Options, false) of
     true ->
-        case get_value(id, Options, nil) of
+        CancelRepId = case get_value(id, Options, nil) of
         nil ->
-            cancel_replication(RepId);
+            RepId;
         RepId2 ->
-            cancel_replication(RepId2, UserCtx)
+            RepId2
+        end,
+        case check_authorization(CancelRepId, UserCtx) of
+        ok ->
+            cancel_replication(CancelRepId);
+        not_found ->
+            {error, not_found}
         end;
     false ->
+        check_authorization(RepId, UserCtx),
         {ok, Listener} = rep_result_listener(RepId),
         Result = do_replication_loop(Rep),
         couch_replicator_notifier:stop(Listener),
@@ -103,75 +79,27 @@ replicate(PostBody, Ctx) ->
     end.
 
 
-do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) ->
-    case async_replicate(Rep) of
-    {ok, _Pid} ->
-        case get_value(continuous, Options, false) of
-        true ->
-            {ok, {continuous, ?l2b(BaseId ++ Ext)}};
-        false ->
-            wait_for_result(Id)
-        end;
-    Error ->
-        Error
-    end.
+% This is called from supervisor. Must respect supervisor protocol so
+% it returns `ignore`.
+-spec ensure_rep_db_exists() -> ignore.
+ensure_rep_db_exists() ->
+    {ok, _Db} = couch_replicator_docs:ensure_rep_db_exists(),
+    ignore.
 
 
-async_replicate(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
-    RepChildId = BaseId ++ Ext,
-    Source = couch_replicator_api_wrap:db_uri(Src),
-    Target = couch_replicator_api_wrap:db_uri(Tgt),
-    Timeout = get_value(connection_timeout, Rep#rep.options),
-    ChildSpec = {
-        RepChildId,
-        {gen_server, start_link, [?MODULE, Rep, [{timeout, Timeout}]]},
-        temporary,
-        250,
-        worker,
-        [?MODULE]
-    },
-    % All these nested cases to attempt starting/restarting a replication child
-    % are ugly and not 100% race condition free. The following patch submission
-    % is a solution:
-    %
-    % http://erlang.2086793.n4.nabble.com/PATCH-supervisor-atomically-delete-child-spec-when-child-terminates-td3226098.html
-    %
-    case supervisor:start_child(couch_replicator_job_sup, ChildSpec) of
-    {ok, Pid} ->
-        couch_log:notice("starting new replication `~s` at ~p (`~s` -> `~s`)",
-            [RepChildId, Pid, Source, Target]),
-        {ok, Pid};
-    {error, already_present} ->
-        case supervisor:restart_child(couch_replicator_job_sup, RepChildId) of
-        {ok, Pid} ->
-            couch_log:notice("restarting replication `~s` at ~p (`~s` -> `~s`)",
-                [RepChildId, Pid, Source, Target]),
-            {ok, Pid};
-        {error, running} ->
-            %% this error occurs if multiple replicators are racing
-            %% each other to start and somebody else won. Just grab
-            %% the Pid by calling start_child again.
-            timer:sleep(50 + random:uniform(100)),
-            async_replicate(Rep);
-        {error, {'EXIT', {badarg,
-            [{erlang, apply, [gen_server, start_link, undefined]} | _]}}} ->
-            % Clause to deal with a change in the supervisor module introduced
-            % in R14B02. For more details consult the thread at:
-            %     http://erlang.org/pipermail/erlang-bugs/2011-March/002273.html
-            _ = supervisor:delete_child(couch_replicator_job_sup, RepChildId),
-            async_replicate(Rep);
-        {error, _} = Error ->
-            Error
-        end;
-    {error, {already_started, Pid}} ->
-        couch_log:notice("replication `~s` already running at ~p (`~s` -> `~s`)",
-            [RepChildId, Pid, Source, Target]),
-        {ok, Pid};
-    {error, {Error, _}} ->
-        {error, Error}
+-spec do_replication_loop(#rep{}) ->
+    {ok, {continuous, binary()}} | {ok, tuple()} | {error, any()}.
+do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) ->
+    ok = couch_replicator_scheduler:add_job(Rep),
+    case get_value(continuous, Options, false) of
+    true ->
+        {ok, {continuous, ?l2b(BaseId ++ Ext)}};
+    false ->
+        wait_for_result(Id)
     end.
 
 
+-spec rep_result_listener(rep_id()) -> {ok, pid()}.
 rep_result_listener(RepId) ->
     ReplyTo = self(),
     {ok, _Listener} = couch_replicator_notifier:start_link(
@@ -182,6 +110,8 @@ rep_result_listener(RepId) ->
         end).
 
 
+-spec wait_for_result(rep_id()) ->
+    {ok, {[_]}} | {error, any()}.
 wait_for_result(RepId) ->
     receive
     {finished, RepId, RepResult} ->
@@ -191,847 +121,276 @@ wait_for_result(RepId) ->
     end.
 
 
-cancel_replication({BaseId, Extension}) ->
-    FullRepId = BaseId ++ Extension,
-    couch_log:notice("Canceling replication `~s`...", [FullRepId]),
-    case supervisor:terminate_child(couch_replicator_job_sup, FullRepId) of
-    ok ->
-        couch_log:notice("Replication `~s` canceled.", [FullRepId]),
-        case supervisor:delete_child(couch_replicator_job_sup, FullRepId) of
-            ok ->
-                {ok, {cancelled, ?l2b(FullRepId)}};
-            {error, not_found} ->
-                {ok, {cancelled, ?l2b(FullRepId)}};
-            Error ->
-                Error
-        end;
-    Error ->
-        couch_log:error("Error canceling replication `~s`: ~p", [FullRepId, Error]),
-        Error
+-spec cancel_replication(rep_id()) ->
+    {ok, {cancelled, binary()}} | {error, not_found}.
+cancel_replication({BasedId, Extension} = RepId) ->
+    FullRepId = BasedId ++ Extension,
+    couch_log:notice("Canceling replication '~s' ...", [FullRepId]),
+    case couch_replicator_scheduler:rep_state(RepId) of
+    #rep{} ->
+        ok = couch_replicator_scheduler:remove_job(RepId),
+        couch_log:notice("Replication '~s' cancelled", [FullRepId]),
+        {ok, {cancelled, ?l2b(FullRepId)}};
+    nil ->
+        couch_log:notice("Replication '~s' not found", [FullRepId]),
+        {error, not_found}
     end.
 
-cancel_replication(RepId, #user_ctx{name = Name, roles = Roles}) ->
-    case lists:member(<<"_admin">>, Roles) of
-    true ->
-        cancel_replication(RepId);
-    false ->
-        case find_replicator(RepId) of
-        {ok, Pid} ->
-            case details(Pid) of
-            {ok, #rep{user_ctx = #user_ctx{name = Name}}} ->
-                cancel_replication(RepId);
-            {ok, _} ->
-                throw({unauthorized,
-                    <<"Can't cancel a replication triggered by another user">>});
-            Error ->
-                Error
-            end;
-        Error ->
-            Error
-        end
-    end.
 
-find_replicator({BaseId, Ext} = _RepId) ->
-    case lists:keysearch(
-        BaseId ++ Ext, 1, supervisor:which_children(couch_replicator_job_sup)) of
-    {value, {_, Pid, _, _}} when is_pid(Pid) ->
-            {ok, Pid};
-    _ ->
-            {error, not_found}
-    end.
-
-details(Pid) ->
-    case (catch gen_server:call(Pid, get_details)) of
-    {ok, Rep} ->
-        {ok, Rep};
-    {'EXIT', {noproc, {gen_server, call, _}}} ->
-        {error, not_found};
-    Error ->
-        throw(Error)
-    end.
-
-init(InitArgs) ->
-    {ok, InitArgs, 0}.
-
-do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
-    process_flag(trap_exit, true),
-
-    random:seed(os:timestamp()),
-
-    #rep_state{
-        source = Source,
-        target = Target,
-        source_name = SourceName,
-        target_name = TargetName,
-        start_seq = {_Ts, StartSeq},
-        committed_seq = {_, CommittedSeq},
-        highest_seq_done = {_, HighestSeq},
-        checkpoint_interval = CheckpointInterval
-    } = State = init_state(Rep),
-
-    NumWorkers = get_value(worker_processes, Options),
-    BatchSize = get_value(worker_batch_size, Options),
-    {ok, ChangesQueue} = couch_work_queue:new([
-        {max_items, BatchSize * NumWorkers * 2},
-        {max_size, 100 * 1024 * NumWorkers}
-    ]),
-    % This starts the _changes reader process. It adds the changes from
-    % the source db to the ChangesQueue.
-    {ok, ChangesReader} = couch_replicator_changes_reader:start_link(
-        StartSeq, Source, ChangesQueue, Options
-    ),
-    % Changes manager - responsible for dequeing batches from the changes queue
-    % and deliver them to the worker processes.
-    ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
-    % This starts the worker processes. They ask the changes queue manager for a
-    % a batch of _changes rows to process -> check which revs are missing in the
-    % target, and for the missing ones, it copies them from the source to the target.
-    MaxConns = get_value(http_connections, Options),
-    Workers = lists:map(
-        fun(_) ->
-            couch_stats:increment_counter([couch_replicator, workers_started]),
-            {ok, Pid} = couch_replicator_worker:start_link(
-                self(), Source, Target, ChangesManager, MaxConns),
-            Pid
-        end,
-        lists:seq(1, NumWorkers)),
+-spec replication_states() -> [atom()].
+replication_states() ->
+    ?REPLICATION_STATES.
 
-    couch_task_status:add_task([
-        {type, replication},
-        {user, UserCtx#user_ctx.name},
-        {replication_id, ?l2b(BaseId ++ Ext)},
-        {database, Rep#rep.db_name},
-        {doc_id, Rep#rep.doc_id},
-        {source, ?l2b(SourceName)},
-        {target, ?l2b(TargetName)},
-        {continuous, get_value(continuous, Options, false)},
-        {revisions_checked, 0},
-        {missing_revisions_found, 0},
-        {docs_read, 0},
-        {docs_written, 0},
-        {changes_pending, get_pending_count(State)},
-        {doc_write_failures, 0},
-        {source_seq, HighestSeq},
-        {checkpointed_source_seq, CommittedSeq},
-        {checkpoint_interval, CheckpointInterval}
-    ]),
-    couch_task_status:set_update_frequency(1000),
 
-    % Until OTP R14B03:
-    %
-    % Restarting a temporary supervised child implies that the original arguments
-    % (#rep{} record) specified in the MFA component of the supervisor
-    % child spec will always be used whenever the child is restarted.
-    % This implies the same replication performance tunning parameters will
-    % always be used. The solution is to delete the child spec (see
-    % cancel_replication/1) and then start the replication again, but this is
-    % unfortunately not immune to race conditions.
-
-    couch_log:notice("Replication `~p` is using:~n"
-        "~c~p worker processes~n"
-        "~ca worker batch size of ~p~n"
-        "~c~p HTTP connections~n"
-        "~ca connection timeout of ~p milliseconds~n"
-        "~c~p retries per request~n"
-        "~csocket options are: ~s~s",
-        [BaseId ++ Ext, $\t, NumWorkers, $\t, BatchSize, $\t,
-            MaxConns, $\t, get_value(connection_timeout, Options),
-            $\t, get_value(retries, Options),
-            $\t, io_lib:format("~p", [get_value(socket_options, Options)]),
-            case StartSeq of
-            ?LOWEST_SEQ ->
-                "";
-            _ ->
-                io_lib:format("~n~csource start sequence ~p", [$\t, StartSeq])
-            end]),
-
-    couch_log:debug("Worker pids are: ~p", [Workers]),
-
-    couch_replicator_manager:replication_started(Rep),
-
-    {ok, State#rep_state{
-            changes_queue = ChangesQueue,
-            changes_manager = ChangesManager,
-            changes_reader = ChangesReader,
-            workers = Workers
-        }
-    }.
-
-adjust_maxconn(Src = #httpdb{http_connections = 1}, RepId) ->
-    Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p",
-    couch_log:notice(Msg, [RepId]),
-    Src#httpdb{http_connections = 2};
-
-adjust_maxconn(Src, _RepId) ->
-    Src.
-
-handle_info(shutdown, St) ->
-    {stop, shutdown, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
-    couch_log:error("Source database is down. Reason: ~p", [Why]),
-    {stop, source_db_down, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
-    couch_log:error("Target database is down. Reason: ~p", [Why]),
-    {stop, target_db_down, St};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
-    {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
-    couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
-    couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
-    {stop, changes_reader_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
-    {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
-    couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
-    couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
-    {stop, changes_manager_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
-    {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
-    couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
-    couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
-    {stop, changes_queue_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
-    case Workers -- [Pid] of
-    Workers ->
-        couch_log:error("unknown pid bit the dust ~p ~n",[Pid]),
-        {noreply, State#rep_state{workers = Workers}};
-        %% not clear why a stop was here before
-        %%{stop, {unknown_process_died, Pid, normal}, State};
-    [] ->
-        catch unlink(State#rep_state.changes_manager),
-        catch exit(State#rep_state.changes_manager, kill),
-        do_last_checkpoint(State);
-    Workers2 ->
-        {noreply, State#rep_state{workers = Workers2}}
-    end;
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
-    State2 = cancel_timer(State),
-    case lists:member(Pid, Workers) of
-    false ->
-        {stop, {unknown_process_died, Pid, Reason}, State2};
-    true ->
-        couch_stats:increment_counter([couch_replicator, worker_deaths]),
-        couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
-        {stop, {worker_died, Pid, Reason}, State2}
-    end;
-
-handle_info(timeout, InitArgs) ->
-    try do_init(InitArgs) of {ok, State} ->
-        {noreply, State}
-    catch Class:Error ->
-        Stack = erlang:get_stacktrace(),
-        {stop, shutdown, {error, Class, Error, Stack, InitArgs}}
+-spec strip_url_creds(binary() | {[_]}) -> binary().
+strip_url_creds(Endpoint) ->
+    case couch_replicator_docs:parse_rep_db(Endpoint, [], []) of
+        #httpdb{url=Url} ->
+            iolist_to_binary(couch_util:url_strip_password(Url));
+        LocalDb when is_binary(LocalDb) ->
+            LocalDb
     end.
 
-handle_call(get_details, _From, #rep_state{rep_details = Rep} = State) ->
-    {reply, {ok, Rep}, State};
-
-handle_call({add_stats, Stats}, From, State) ->
-    gen_server:reply(From, ok),
-    NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
-    {noreply, State#rep_state{stats = NewStats}};
-
-handle_call({report_seq_done, Seq, StatsInc}, From,
-    #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone,
-        current_through_seq = ThroughSeq, stats = Stats} = State) ->
-    gen_server:reply(From, ok),
-    {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
-    [] ->
-        {Seq, []};
-    [Seq | Rest] ->
-        {Seq, Rest};
-    [_ | _] ->
-        {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
-    end,
-    NewHighestDone = lists:max([HighestDone, Seq]),
-    NewThroughSeq = case NewSeqsInProgress of
-    [] ->
-        lists:max([NewThroughSeq0, NewHighestDone]);
-    _ ->
-        NewThroughSeq0
-    end,
-    couch_log:debug("Worker reported seq ~p, through seq was ~p, "
-        "new through seq is ~p, highest seq done was ~p, "
-        "new highest seq done is ~p~n"
-        "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
-        [Seq, ThroughSeq, NewThroughSeq, HighestDone,
-            NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
-    NewState = State#rep_state{
-        stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
-        current_through_seq = NewThroughSeq,
-        seqs_in_progress = NewSeqsInProgress,
-        highest_seq_done = NewHighestDone
-    },
-    update_task(NewState),
-    {noreply, NewState}.
-
-
-handle_cast({db_compacted, DbName},
-    #rep_state{source = #db{name = DbName} = Source} = State) ->
-    {ok, NewSource} = couch_db:reopen(Source),
-    {noreply, State#rep_state{source = NewSource}};
-
-handle_cast({db_compacted, DbName},
-    #rep_state{target = #db{name = DbName} = Target} = State) ->
-    {ok, NewTarget} = couch_db:reopen(Target),
-    {noreply, State#rep_state{target = NewTarget}};
-
-handle_cast(checkpoint, State) ->
-    #rep_state{rep_details = #rep{} = Rep} = State,
-    case couch_replicator_manager:continue(Rep) of
-    {true, _} ->
-        case do_checkpoint(State) of
-        {ok, NewState} ->
-            couch_stats:increment_counter([couch_replicator, checkpoints, success]),
-            {noreply, NewState#rep_state{timer = start_timer(State)}};
-        Error ->
-            couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
-            {stop, Error, State}
-        end;
-    {false, Owner} ->
-        couch_replicator_manager:replication_usurped(Rep, Owner),
-        {stop, shutdown, State}
-    end;
-
-handle_cast({report_seq, Seq},
-    #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
-    NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress),
-    {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}}.
-
-
-code_change(_OldVsn, #rep_state{}=State, _Extra) ->
-    {ok, State}.
-
-
-headers_strip_creds([], Acc) ->
-    lists:reverse(Acc);
-headers_strip_creds([{Key, Value0} | Rest], Acc) ->
-    Value = case string:to_lower(Key) of
-    "authorization" ->
-        "****";
-    _ ->
-        Value0
-    end,
-    headers_strip_creds(Rest, [{Key, Value} | Acc]).
-
 
-httpdb_strip_creds(#httpdb{url = Url, headers = Headers} = HttpDb) ->
-    HttpDb#httpdb{
-        url = couch_util:url_strip_password(Url),
-        headers = headers_strip_creds(Headers, [])
-    };
-httpdb_strip_creds(LocalDb) ->
-    LocalDb.
-
-
-rep_strip_creds(#rep{source = Source, target = Target} = Rep) ->
-    Rep#rep{
-        source = httpdb_strip_creds(Source),
-        target = httpdb_strip_creds(Target)
-    }.
-
-
-state_strip_creds(#rep_state{rep_details = Rep, source = Source, target = Target} = State) ->
-    % #rep_state contains the source and target at the top level and also
-    % in the nested #rep_details record
-    State#rep_state{
-        rep_details = rep_strip_creds(Rep),
-        source = httpdb_strip_creds(Source),
-        target = httpdb_strip_creds(Target)
-    }.
-
-
-terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
-    checkpoint_history = CheckpointHistory} = State) ->
-    terminate_cleanup(State),
-    couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}),
-    couch_replicator_manager:replication_completed(Rep, rep_stats(State));
-
-terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
-    % cancelled replication throught ?MODULE:cancel_replication/1
-    couch_replicator_notifier:notify({error, RepId, <<"cancelled">>}),
-    terminate_cleanup(State);
-
-terminate(shutdown, {error, Class, Error, Stack, InitArgs}) ->
-    #rep{id=RepId} = InitArgs,
-    couch_stats:increment_counter([couch_replicator, failed_starts]),
-    CleanInitArgs = rep_strip_creds(InitArgs),
-    couch_log:error("~p:~p: Replication failed to start for args ~p: ~p",
-             [Class, Error, CleanInitArgs, Stack]),
-    case Error of
-    {unauthorized, DbUri} ->
-        NotifyError = {unauthorized, <<"unauthorized to access or create database ", DbUri/binary>>};
-    {db_not_found, DbUri} ->
-        NotifyError = {db_not_found, <<"could not open ", DbUri/binary>>};
-    _ ->
-        NotifyError = Error
-    end,
-    couch_replicator_notifier:notify({error, RepId, NotifyError}),
-    couch_replicator_manager:replication_error(InitArgs, NotifyError);
-terminate(Reason, State) ->
-    #rep_state{
-        source_name = Source,
-        target_name = Target,
-        rep_details = #rep{id = {BaseId, Ext} = RepId} = Rep
-    } = State,
-    couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~s",
-        [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
-    terminate_cleanup(State),
-    couch_replicator_notifier:notify({error, RepId, Reason}),
-    couch_replicator_manager:replication_error(Rep, Reason).
-
-terminate_cleanup(State) ->
-    update_task(State),
-    stop_db_compaction_notifier(State#rep_state.source_db_compaction_notifier),
-    stop_db_compaction_notifier(State#rep_state.target_db_compaction_notifier),
-    couch_replicator_api_wrap:db_close(State#rep_state.source),
-    couch_replicator_api_wrap:db_close(State#rep_state.target).
-
-
-format_status(_Opt, [_PDict, State]) ->
-    [{data, [{"State", state_strip_creds(State)}]}].
-
-
-do_last_checkpoint(#rep_state{seqs_in_progress = [],
-    highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
-    {stop, normal, cancel_timer(State)};
-do_last_checkpoint(#rep_state{seqs_in_progress = [],
-    highest_seq_done = Seq} = State) ->
-    case do_checkpoint(State#rep_state{current_through_seq = Seq}) of
-    {ok, NewState} ->
-        couch_stats:increment_counter([couch_replicator, checkpoints, success]),
-        {stop, normal, cancel_timer(NewState)};
-    Error ->
-        couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
-        {stop, Error, State}
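+% Get information about a single replication job. The job could be running on
+% any node in the cluster, so query all nodes and return the first match.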
+-spec job(binary()) -> {ok, {[_]}} | {error, not_found}.
+job(JobId0) when is_binary(JobId0) ->
+    JobId = couch_replicator_ids:convert(JobId0),
+    {Res, _Bad} = rpc:multicall(couch_replicator_scheduler, job, [JobId]),
+    case [JobInfo || {ok, JobInfo} <- Res] of
+        [JobInfo| _] ->
+            {ok, JobInfo};
+        [] ->
+            {error, not_found}
     end.
 
 
-start_timer(State) ->
-    After = State#rep_state.checkpoint_interval,
-    case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
-    {ok, Ref} ->
-        Ref;
-    Error ->
-        couch_log:error("Replicator, error scheduling checkpoint:  ~p", [Error]),
-        nil
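+% Restart a replication job given its id. The request is broadcast to all
+% nodes; the first {ok, _} reply is returned.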
+-spec restart_job(binary() | list() | rep_id()) ->
+    {ok, {[_]}} | {error, not_found}.
+restart_job(JobId0) ->
+    JobId = couch_replicator_ids:convert(JobId0),
+    {Res, _} = rpc:multicall(couch_replicator_scheduler, restart_job, [JobId]),
+    case [JobInfo || {ok, JobInfo} <- Res] of
+        [JobInfo| _] ->
+            {ok, JobInfo};
+        [] ->
+            {error, not_found}
     end.
 
 
-cancel_timer(#rep_state{timer = nil} = State) ->
-    State;
-cancel_timer(#rep_state{timer = Timer} = State) ->
-    {ok, cancel} = timer:cancel(Timer),
-    State#rep_state{timer = nil}.
-
-
-init_state(Rep) ->
-    #rep{
-        id = {BaseId, _Ext},
-        source = Src0, target = Tgt,
-        options = Options, user_ctx = UserCtx,
-        type = Type, view = View
-    } = Rep,
-    % Adjust minimum number of http source connections to 2 to avoid deadlock
-    Src = adjust_maxconn(Src0, BaseId),
-    {ok, Source} = couch_replicator_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
-    {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
-        get_value(create_target, Options, false)),
-
-    {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
-    {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),
-
-    [SourceLog, TargetLog] = find_replication_logs([Source, Target], Rep),
-
-    {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
-    StartSeq1 = get_value(since_seq, Options, StartSeq0),
-    StartSeq = {0, StartSeq1},
-
-    SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),
-
-    #doc{body={CheckpointHistory}} = SourceLog,
-    State = #rep_state{
-        rep_details = Rep,
-        source_name = couch_replicator_api_wrap:db_uri(Source),
-        target_name = couch_replicator_api_wrap:db_uri(Target),
-        source = Source,
-        target = Target,
-        history = History,
-        checkpoint_history = {[{<<"no_changes">>, true}| CheckpointHistory]},
-        start_seq = StartSeq,
-        current_through_seq = StartSeq,
-        committed_seq = StartSeq,
-        source_log = SourceLog,
-        target_log = TargetLog,
-        rep_starttime = httpd_util:rfc1123_date(),
-        src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
-        tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
-        session_id = couch_uuids:random(),
-        source_db_compaction_notifier =
-            start_db_compaction_notifier(Source, self()),
-        target_db_compaction_notifier =
-            start_db_compaction_notifier(Target, self()),
-        source_monitor = db_monitor(Source),
-        target_monitor = db_monitor(Target),
-        source_seq = SourceSeq,
-        use_checkpoints = get_value(use_checkpoints, Options, true),
-        checkpoint_interval = get_value(checkpoint_interval, Options,
-                                        ?DEFAULT_CHECKPOINT_INTERVAL),
-        type = Type,
-        view = View
-    },
-    State#rep_state{timer = start_timer(State)}.
-
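+% Fetch the state of a replication document whose job is currently active.
+% The owner node is queried first, then the remaining nodes hosting the
+% database's shards.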
+-spec active_doc(binary(), binary()) -> {ok, {[_]}} | {error, not_found}.
+active_doc(DbName, DocId) ->
+    try
+        Shards = mem3:shards(DbName),
+        Live = [node() | nodes()],
+        Nodes = lists:usort([N || #shard{node=N} <- Shards,
+            lists:member(N, Live)]),
+        Owner = couch_replicator_clustering:owner(DbName, DocId, Nodes),
+        case active_doc_rpc(DbName, DocId, [Owner]) of
+            {ok, DocInfo} ->
+                {ok, DocInfo};
+            {error, not_found} ->
+                active_doc_rpc(DbName, DocId, Nodes -- [Owner])
+        end
+    catch
+        % Might be a local database
+        error:database_does_not_exist ->
+            active_doc_rpc(DbName, DocId, [node()])
+    end.
 
-find_replication_logs(DbList, #rep{id = {BaseId, _}} = Rep) ->
-    LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
-    fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []).
 
+-spec active_doc_rpc(binary(), binary(), [node()]) ->
+    {ok, {[_]}} | {error, not_found}.
+active_doc_rpc(_DbName, _DocId, []) ->
+    {error, not_found};
+active_doc_rpc(DbName, DocId, [Node]) when Node =:= node() ->
+    couch_replicator_doc_processor:doc(DbName, DocId);
+active_doc_rpc(DbName, DocId, Nodes) ->
+    {Res, _Bad} = rpc:multicall(Nodes, couch_replicator_doc_processor, doc,
+        [DbName, DocId]),
+    case [DocInfo || {ok, DocInfo} <- Res] of
+        [DocInfo | _] ->
+            {ok, DocInfo};
+        [] ->
+            {error, not_found}
+    end.
 
-fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
-    lists:reverse(Acc);
 
-fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
-    case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of
-    {error, <<"not_found">>} when Vsn > 1 ->
-        OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1),
-        fold_replication_logs(Dbs, Vsn - 1,
-            ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, Rep, Acc);
-    {error, <<"not_found">>} ->
-        fold_replication_logs(
-            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [#doc{id = NewId} | Acc]);
-    {ok, Doc} when LogId =:= NewId ->
-        fold_replication_logs(
-            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]);
-    {ok, Doc} ->
-        MigratedLog = #doc{id = NewId, body = Doc#doc.body},
-        fold_replication_logs(
-            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [MigratedLog | Acc])
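+% Get replication document info. Prefer the in-memory state kept by the doc
+% processor for active jobs, and fall back to reading the document itself.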
+-spec doc(binary(), binary(), any()) -> {ok, {[_]}} | {error, not_found}.
+doc(RepDb, DocId, UserCtx) ->
+    case active_doc(RepDb, DocId) of
+        {ok, DocInfo} ->
+            {ok, DocInfo};
+        {error, not_found} ->
+            doc_from_db(RepDb, DocId, UserCtx)
     end.
 
 
-spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
-    spawn_link(fun() ->
-        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1)
-    end).
-
-changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
-    receive
-    {get_changes, From} ->
-        case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
-        closed ->
-            From ! {closed, self()};
-        {ok, Changes} ->
-            #doc_info{high_seq = Seq} = lists:last(Changes),
-            ReportSeq = {Ts, Seq},
-            ok = gen_server:cast(Parent, {report_seq, ReportSeq}),
-            From ! {changes, self(), Changes, ReportSeq}
-        end,
-        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1)
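+% Read the replication document from the database and convert it to the
+% external info format.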
+-spec doc_from_db(binary(), binary(), any()) -> {ok, {[_]}} | {error, not_found}.
+doc_from_db(RepDb, DocId, UserCtx) ->
+    case fabric:open_doc(RepDb, DocId, [UserCtx, ejson_body]) of
+        {ok, Doc} ->
+            {ok, info_from_doc(RepDb, couch_doc:to_json_obj(Doc, []))};
+        {not_found, _Reason} ->
+            {error, not_found}
     end.
 
 
-do_checkpoint(#rep_state{use_checkpoints=false} = State) ->
-    NewState = State#rep_state{checkpoint_history = {[{<<"use_checkpoints">>, false}]} },
-    {ok, NewState};
-do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) ->
-    update_task(State),
-    {ok, State};
-do_checkpoint(State) ->
-    #rep_state{
-        source_name=SourceName,
-        target_name=TargetName,
-        source = Source,
-        target = Target,
-        history = OldHistory,
-        start_seq = {_, StartSeq},
-        current_through_seq = {_Ts, NewSeq} = NewTsSeq,
-        source_log = SourceLog,
-        target_log = TargetLog,
-        rep_starttime = ReplicationStartTime,
-        src_starttime = SrcInstanceStartTime,
-        tgt_starttime = TgtInstanceStartTime,
-        stats = Stats,
-        rep_details = #rep{options = Options},
-        session_id = SessionId
-    } = State,
-    case commit_to_both(Source, Target) of
-    {source_error, Reason} ->
-         {checkpoint_commit_failure,
-             <<"Failure on source commit: ", (to_binary(Reason))/binary>>};
-    {target_error, Reason} ->
-         {checkpoint_commit_failure,
-             <<"Failure on target commit: ", (to_binary(Reason))/binary>>};
-    {SrcInstanceStartTime, TgtInstanceStartTime} ->
-        couch_log:notice("recording a checkpoint for `~s` -> `~s` at source update_seq ~p",
-            [SourceName, TargetName, NewSeq]),
-        StartTime = ?l2b(ReplicationStartTime),
-        EndTime = ?l2b(httpd_util:rfc1123_date()),
-        NewHistoryEntry = {[
-            {<<"session_id">>, SessionId},
-            {<<"start_time">>, StartTime},
-            {<<"end_time">>, EndTime},
-            {<<"start_last_seq">>, StartSeq},
-            {<<"end_last_seq">>, NewSeq},
-            {<<"recorded_seq">>, NewSeq},
-            {<<"missing_checked">>, couch_replicator_stats:missing_checked(Stats)},
-            {<<"missing_found">>, couch_replicator_stats:missing_found(Stats)},
-            {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
-            {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
-            {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
-        ]},
-        BaseHistory = [
-            {<<"session_id">>, SessionId},
-            {<<"source_last_seq">>, NewSeq},
-            {<<"replication_id_version">>, ?REP_ID_VERSION}
-        ] ++ case get_value(doc_ids, Options) of
-        undefined ->
-            [];
-        _DocIds ->
-            % backwards compatibility with the result of a replication by
-            % doc IDs in versions 0.11.x and 1.0.x
-            % TODO: deprecate (use same history format, simplify code)
-            [
-                {<<"start_time">>, StartTime},
-                {<<"end_time">>, EndTime},
-                {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
-                {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
-                {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
-            ]
-        end,
-        % limit history to 50 entries
-        NewRepHistory = {
-            BaseHistory ++
-            [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
-        },
-
-        try
-            {SrcRevPos, SrcRevId} = update_checkpoint(
-                Source, SourceLog#doc{body = NewRepHistory}, source),
-            {TgtRevPos, TgtRevId} = update_checkpoint(
-                Target, TargetLog#doc{body = NewRepHistory}, target),
-            NewState = State#rep_state{
-                checkpoint_history = NewRepHistory,
-                committed_seq = NewTsSeq,
-                source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
-                target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
-            },
-            update_task(NewState),
-            {ok, NewState}
-        catch throw:{checkpoint_commit_failure, _} = Failure ->
-            Failure
-        end;
-    {SrcInstanceStartTime, _NewTgtInstanceStartTime} ->
-        {checkpoint_commit_failure, <<"Target database out of sync. "
-            "Try to increase max_dbs_open at the target's server.">>};
-    {_NewSrcInstanceStartTime, TgtInstanceStartTime} ->
-        {checkpoint_commit_failure, <<"Source database out of sync. "
-            "Try to increase max_dbs_open at the source's server.">>};
-    {_NewSrcInstanceStartTime, _NewTgtInstanceStartTime} ->
-        {checkpoint_commit_failure, <<"Source and target databases out of "
-            "sync. Try to increase max_dbs_open at both servers.">>}
+-spec info_from_doc(binary(), {[_]}) -> {[_]}.
+info_from_doc(RepDb, {Props}) ->
+    DocId = get_value(<<"_id">>, Props),
+    Source = get_value(<<"source">>, Props),
+    Target = get_value(<<"target">>, Props),
+    State0 = state_atom(get_value(<<"_replication_state">>, Props, null)),
+    StateTime = get_value(<<"_replication_state_time">>, Props, null),
+    {State1, StateInfo, ErrorCount, StartTime} = case State0 of
+        completed ->
+            {InfoP} = get_value(<<"_replication_stats">>, Props, {[]}),
+            case lists:keytake(<<"start_time">>, 1, InfoP) of
+                {value, {_, Time}, InfoP1} ->
+                    {State0, {InfoP1}, 0, Time};
+                false ->
+                    case lists:keytake(start_time, 1, InfoP) of
+                        {value, {_, Time}, InfoP1} ->
+                            {State0, {InfoP1}, 0, Time};
+                        false ->
+                            {State0, {InfoP}, 0, null}
+                    end
+            end;
+        failed ->
+            Info = get_value(<<"_replication_state_reason">>, Props, null),
+            {State0, Info, 1, StateTime};
+        _OtherState ->
+            {null, null, 0, null}
+    end,
+    {[
+        {doc_id, DocId},
+        {database, RepDb},
+        {id, null},
+        {source, strip_url_creds(Source)},
+        {target, strip_url_creds(Target)},
+        {state, State1},
+        {error_count, ErrorCount},
+        {info, StateInfo},
+        {start_time, StartTime},
+        {last_updated, StateTime}
+    ]}.
+
+
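+% Convert a document _replication_state value to an atom. Only atoms which
+% already exist are accepted (binary_to_existing_atom).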
+state_atom(<<"triggered">>) ->
+    triggered;  % This handles a legacy case where the document wasn't converted yet
+state_atom(State) when is_binary(State) ->
+    erlang:binary_to_existing_atom(State, utf8);
+state_atom(State) when is_atom(State) ->
+    State.
+
+
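+% A user may inspect only their own replications; anyone else must be a
+% server admin, otherwise verify_is_server_admin/1 throws {unauthorized, _}.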
+-spec check_authorization(rep_id(), #user_ctx{}) -> ok | not_found.
+check_authorization(RepId, #user_ctx{name = Name} = Ctx) ->
+    case couch_replicator_scheduler:rep_state(RepId) of
+    #rep{user_ctx = #user_ctx{name = Name}} ->
+        ok;
+    #rep{} ->
+        couch_httpd:verify_is_server_admin(Ctx);
+    nil ->
+        not_found
     end.
 
 
-update_checkpoint(Db, Doc, DbType) ->
-    try
-        update_checkpoint(Db, Doc)
-    catch throw:{checkpoint_commit_failure, Reason} ->
-        throw({checkpoint_commit_failure,
-            <<"Error updating the ", (to_binary(DbType))/binary,
-                " checkpoint document: ", (to_binary(Reason))/binary>>})
-    end.
+-ifdef(TEST).
 
-update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
-    try
-        case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of
-        {ok, PosRevId} ->
-            PosRevId;
-        {error, Reason} ->
-            throw({checkpoint_commit_failure, Reason})
-        end
-    catch throw:conflict ->
-        case (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])) of
-        {ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} ->
-            % This means that we were able to update successfully the
-            % checkpoint doc in a previous attempt but we got a connection
-            % error (timeout for e.g.) before receiving the success response.
-            % Therefore the request was retried and we got a conflict, as the
-            % revision we sent is not the current one.
-            % We confirm this by verifying the doc body we just got is the same
-            % that we have just sent.
-            {Pos, RevId};
-        _ ->
-            throw({checkpoint_commit_failure, conflict})
-        end
-    end.
+-include_lib("eunit/include/eunit.hrl").
 
+authorization_test_() ->
+    {
+        foreach,
+        fun () -> ok end,
+        fun (_) -> meck:unload() end,
+        [
+            t_admin_is_always_authorized(),
+            t_username_must_match(),
+            t_replication_not_found()
+        ]
+    }.
 
-commit_to_both(Source, Target) ->
-    % commit the src async
-    ParentPid = self(),
-    SrcCommitPid = spawn_link(
-        fun() ->
-            Result = (catch couch_replicator_api_wrap:ensure_full_commit(Source)),
-            ParentPid ! {self(), Result}
-        end),
 
-    % commit tgt sync
-    TargetResult = (catch couch_replicator_api_wrap:ensure_full_commit(Target)),
+t_admin_is_always_authorized() ->
+    ?_test(begin
+        expect_rep_user_ctx(<<"someuser">>, <<"_admin">>),
+        UserCtx = #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]},
+        ?assertEqual(ok, check_authorization(<<"RepId">>, UserCtx))
+    end).
 
-    SourceResult = receive
-    {SrcCommitPid, Result} ->
-        unlink(SrcCommitPid),
-        receive {'EXIT', SrcCommitPid, _} -> ok after 0 -> ok end,
-        Result;
-    {'EXIT', SrcCommitPid, Reason} ->
-        {error, Reason}
-    end,
-    case TargetResult of
-    {ok, TargetStartTime} ->
-        case SourceResult of
-        {ok, SourceStartTime} ->
-            {SourceStartTime, TargetStartTime};
-        SourceError ->
-            {source_error, SourceError}
-        end;
-    TargetError ->
-        {target_error, TargetError}
-    end.
 
+t_username_must_match() ->
+    ?_test(begin
+        expect_rep_user_ctx(<<"user">>, <<"somerole">>),
+        UserCtx1 = #user_ctx{name = <<"user">>, roles = [<<"somerole">>]},
+        ?assertEqual(ok, check_authorization(<<"RepId">>, UserCtx1)),
+        UserCtx2 = #user_ctx{name = <<"other">>, roles = [<<"somerole">>]},
+        ?assertThrow({unauthorized, _}, check_authorization(<<"RepId">>,
+            UserCtx2))
+    end).
 
-compare_replication_logs(SrcDoc, TgtDoc) ->
-    #doc{body={RepRecProps}} = SrcDoc,
-    #doc{body={RepRecPropsTgt}} = TgtDoc,
-    case get_value(<<"session_id">>, RepRecProps) ==
-            get_value(<<"session_id">>, RepRecPropsTgt) of
-    true ->
-        % if the records have the same session id,
-        % then we have a valid replication history
-        OldSeqNum = get_value(<<"source_last_seq">>, RepRecProps, ?LOWEST_SEQ),
-        OldHistory = get_value(<<"history">>, RepRecProps, []),
-        {OldSeqNum, OldHistory};
-    false ->
-        SourceHistory = get_value(<<"history">>, RepRecProps, []),
-        TargetHistory = get_value(<<"history">>, RepRecPropsTgt, []),
-        couch_log:notice("Replication records differ. "
-                "Scanning histories to find a common ancestor.", []),
-        couch_log:debug("Record on source:~p~nRecord on target:~p~n",
-                [RepRecProps, RepRecPropsTgt]),
-        compare_rep_history(SourceHistory, TargetHistory)
-    end.
 
-compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
-    couch_log:notice("no common ancestry -- performing full replication", []),
-    {?LOWEST_SEQ, []};
-compare_rep_history([{S} | SourceRest], [{T} | TargetRest] = Target) ->
-    SourceId = get_value(<<"session_id">>, S),
-    case has_session_id(SourceId, Target) of
-    true ->
-        RecordSeqNum = get_value(<<"recorded_seq">>, S, ?LOWEST_SEQ),
-        couch_log:notice("found a common replication record with source_seq ~p",
-            [RecordSeqNum]),
-        {RecordSeqNum, SourceRest};
-    false ->
-        TargetId = get_value(<<"session_id">>, T),
-        case has_session_id(TargetId, SourceRest) of
-        true ->
-            RecordSeqNum = get_value(<<"recorded_seq">>, T, ?LOWEST_SEQ),
-            couch_log:notice("found a common replication record with source_seq ~p",
-                [RecordSeqNum]),
-            {RecordSeqNum, TargetRest};
-        false ->
-            compare_rep_history(SourceRest, TargetRest)
-        end
-    end.
+t_replication_not_found() ->
+    ?_test(begin
+        meck:expect(couch_replicator_scheduler, rep_state, 1, nil),
+        UserCtx1 = #user_ctx{name = <<"user">>, roles = [<<"somerole">>]},
+        ?assertEqual(not_found, check_authorization(<<"RepId">>, UserCtx1)),
+        UserCtx2 = #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]},
+        ?assertEqual(not_found, check_authorization(<<"RepId">>, UserCtx2))
+    end).
 
 
-has_session_id(_SessionId, []) ->
-    false;
-has_session_id(SessionId, [{Props} | Rest]) ->
-    case get_value(<<"session_id">>, Props, nil) of
-    SessionId ->
-        true;
-    _Else ->
-        has_session_id(SessionId, Rest)
-    end.
+expect_rep_user_ctx(Name, Role) ->
+    meck:expect(couch_replicator_scheduler, rep_state,
+        fun(_Id) ->
+            UserCtx = #user_ctx{name = Name, roles = [Role]},
+            #rep{user_ctx = UserCtx}
+        end).
 
 
-db_monitor(#db{} = Db) ->
-    couch_db:monitor(Db);
-db_monitor(_HttpDb) ->
-    nil.
+strip_url_creds_test_() ->
+    {
+        foreach,
+        fun () -> meck:expect(config, get,
+            fun(_, _, Default) -> Default end)
+        end,
+        fun (_) -> meck:unload() end,
+        [
+            t_strip_local_db_creds(),
+            t_strip_http_basic_creds(),
+            t_strip_http_props_creds()
+        ]
+    }.
 
-get_pending_count(St) ->
-    Rep = St#rep_state.rep_details,
-    Timeout = get_value(connection_timeout, Rep#rep.options),
-    TimeoutMicro = Timeout * 1000,
-    case get(pending_count_state) of
-        {LastUpdate, PendingCount} ->
-            case timer:now_diff(os:timestamp(), LastUpdate) > TimeoutMicro of
-                true ->
-                    NewPendingCount = get_pending_count_int(St),
-                    put(pending_count_state, {os:timestamp(), NewPendingCount}),
-                    NewPendingCount;
-                false ->
-                    PendingCount
-            end;
-        undefined ->
-            NewPendingCount = get_pending_count_int(St),
-            put(pending_count_state, {os:timestamp(), NewPendingCount}),
-            NewPendingCount
-    end.
 
+t_strip_local_db_creds() ->
+    ?_test(?assertEqual(<<"localdb">>, strip_url_creds(<<"localdb">>))).
 
-get_pending_count_int(#rep_state{source = #httpdb{} = Db0}=St) ->
-    {_, Seq} = St#rep_state.highest_seq_done,
-    Db = Db0#httpdb{retries = 3},
-    case (catch couch_replicator_api_wrap:get_pending_count(Db, Seq)) of
-    {ok, Pending} ->
-        Pending;
-    _ ->
-        null
-    end;
-get_pending_count_int(#rep_state{source = Db}=St) ->
-    {_, Seq} = St#rep_state.highest_seq_done,
-    {ok, Pending} = couch_replicator_api_wrap:get_pending_count(Db, Seq),
-    Pending.
 
+t_strip_http_basic_creds() ->
+    ?_test(begin
+        Url1 = <<"http://adm:pass@host/db">>,
+        ?assertEqual(<<"http://adm:*****@host/db/">>, strip_url_creds(Url1)),
+        Url2 = <<"https://adm:pass@host/db">>,
+        ?assertEqual(<<"https://adm:*****@host/db/">>, strip_url_creds(Url2)),
+        Url3 = <<"http://adm:pass@host:80/db">>,
+        ?assertEqual(<<"http://adm:*****@host:80/db/">>, strip_url_creds(Url3)),
+        Url4 = <<"http://adm:pass@host/db?a=b&c=d">>,
+        ?assertEqual(<<"http://adm:*****@host/db?a=b&c=d">>,
+            strip_url_creds(Url4))
+    end).
 
-update_task(State) ->
-    #rep_state{
-        current_through_seq = {_, ThroughSeq},
-        highest_seq_done = {_, HighestSeq}
-    } = State,
-    couch_task_status:update(
-        rep_stats(State) ++ [
-        {source_seq, HighestSeq},
-        {through_seq, ThroughSeq}
-    ]).
 
+t_strip_http_props_creds() ->
+    ?_test(begin
+        Props1 = {[{<<"url">>, <<"http://adm:pass@host/db">>}]},
+        ?assertEqual(<<"http://adm:*****@host/db/">>, strip_url_creds(Props1)),
+        Props2 = {[ {<<"url">>, <<"http://host/db">>},
+            {<<"headers">>, {[{<<"Authorization">>, <<"Basic pa55">>}]}}
+        ]},
+        ?assertEqual(<<"http://host/db/">>, strip_url_creds(Props2))
+    end).
 
-rep_stats(State) ->
-    #rep_state{
-        committed_seq = {_, CommittedSeq},
-        stats = Stats
-    } = State,
-    [
-        {revisions_checked, couch_replicator_stats:missing_checked(Stats)},
-        {missing_revisions_found, couch_replicator_stats:missing_found(Stats)},
-        {docs_read, couch_replicator_stats:docs_read(Stats)},
-        {docs_written, couch_replicator_stats:docs_written(Stats)},
-        {changes_pending, get_pending_count(State)},
-        {doc_write_failures, couch_replicator_stats:doc_write_failures(Stats)},
-        {checkpointed_source_seq, CommittedSeq}
-    ].
+-endif.
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl
index d3485c0..ba9a606 100644
--- a/src/couch_replicator/src/couch_replicator.hrl
+++ b/src/couch_replicator/src/couch_replicator.hrl
@@ -13,13 +13,30 @@
 -define(REP_ID_VERSION, 3).
 
 -record(rep, {
-    id,
-    source,
-    target,
-    options,
-    user_ctx,
-    type = db,
-    view = nil,
-    doc_id,
-    db_name = null
+    id :: rep_id() | '_' | 'undefined',
+    source :: any() | '_',
+    target :: any() | '_',
+    options :: [_] | '_',
+    user_ctx :: any() | '_',
+    type = db :: atom() | '_',
+    view = nil :: any() | '_',
+    doc_id :: any() | '_',
+    db_name = null :: null | binary() | '_',
+    start_time = {0, 0, 0} :: erlang:timestamp() | '_'
+}).
+
+-type rep_id() :: {string(), string()}.
+-type db_doc_id() :: {binary(), binary() | '_'}.
+-type seconds() :: non_neg_integer().
+-type rep_start_result() ::
+    {ok, rep_id()} |
+    ignore |
+    {temporary_error, binary()} |
+    {permanent_failure, binary()}.
+
+
+-record(doc_worker_result, {
+    id :: db_doc_id(),
+    wref :: reference(),
+    result :: rep_start_result()
 }).
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index e5f6253..a0d08d7 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -38,7 +38,8 @@
     open_doc/3,
     open_doc_revs/6,
     changes_since/5,
-    db_uri/1
+    db_uri/1,
+    normalize_db/1
     ]).
 
 -import(couch_replicator_httpc, [
@@ -290,6 +291,8 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
             throw(missing_doc);
         {'DOWN', Ref, process, Pid, {{nocatch, {missing_stub,_} = Stub}, _}} ->
             throw(Stub);
+        {'DOWN', Ref, process, Pid, {http_request_failed, _, _, max_backoff}} ->
+            exit(max_backoff);
         {'DOWN', Ref, process, Pid, request_uri_too_long} ->
             NewMaxLen = get_value(max_url_len, Options, ?MAX_URL_LEN) div 2,
             case NewMaxLen < ?MIN_URL_LEN of
@@ -517,6 +520,8 @@ changes_since(#httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb,
                         end)
             end)
     catch
+        exit:{http_request_failed, _, _, max_backoff} ->
+            exit(max_backoff);
         exit:{http_request_failed, _, _, {error, {connection_closed,
                 mid_stream}}} ->
             throw(retry_no_limit);
@@ -985,3 +990,46 @@ header_value(Key, Headers, Default) ->
         _ ->
             Default
     end.
+
+
+% Normalize an #httpdb{} or #db{} record so that it can be used for
+% comparisons. This means removing things like pids and sorting options / props.
+normalize_db(#httpdb{} = HttpDb) ->
+    #httpdb{
+        url = HttpDb#httpdb.url,
+        oauth = HttpDb#httpdb.oauth,
+        headers = lists:keysort(1, HttpDb#httpdb.headers),
+        timeout = HttpDb#httpdb.timeout,
+        ibrowse_options = lists:keysort(1, HttpDb#httpdb.ibrowse_options),
+        retries = HttpDb#httpdb.retries,
+        http_connections = HttpDb#httpdb.http_connections
+    };
+
+normalize_db(<<DbName/binary>>) ->
+    DbName.
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+normalize_http_db_test() ->
+    HttpDb =  #httpdb{
+        url = "http://host/db",
+        oauth = #oauth{},
+        headers = [{"k2","v2"}, {"k1","v1"}],
+        timeout = 30000,
+        ibrowse_options = [{k2, v2}, {k1, v1}],
+        retries = 10,
+        http_connections = 20
+    },
+    Expected = HttpDb#httpdb{
+        headers = [{"k1","v1"}, {"k2","v2"}],
+        ibrowse_options = [{k1, v1}, {k2, v2}]
+    },
+    ?assertEqual(Expected, normalize_db(HttpDb)),
+    ?assertEqual(<<"local">>, normalize_db(<<"local">>)).
+
+
+-endif.
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.hrl b/src/couch_replicator/src/couch_replicator_api_wrap.hrl
index 24e204b..fc94054 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.hrl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.hrl
@@ -25,7 +25,8 @@
     wait = 250,         % milliseconds
     httpc_pool = nil,
     http_connections,
-    backoff = 25
+    first_error_timestamp = nil,
+    proxy_url
 }).
 
 -record(oauth, {
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index cce4ce2..a49d692 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -154,7 +154,7 @@ ensure_rep_ddoc_exists(RepDb, DDocId) ->
         {not_found, _Reason} ->
             DocProps = replication_design_doc_props(DDocId),
             DDoc = couch_doc:from_json_obj({DocProps}),
-            couch_log:notice("creating replicator ddoc", []),
+            couch_log:notice("creating replicator ddoc ~p", [RepDb]),
             {ok, _Rev} = save_rep_doc(RepDb, DDoc);
         {ok, Doc} ->
             Latest = replication_design_doc_props(DDocId),
@@ -166,7 +166,7 @@ ensure_rep_ddoc_exists(RepDb, DDocId) ->
                 false ->
                     LatestWithRev = [{<<"_rev">>, Rev} | Latest],
                     DDoc = couch_doc:from_json_obj({LatestWithRev}),
-                    couch_log:notice("updating replicator ddoc", []),
+                    couch_log:notice("updating replicator ddoc ~p", [RepDb]),
                     try
                         {ok, _} = save_rep_doc(RepDb, DDoc)
                     catch
diff --git a/src/couch_replicator/src/couch_replicator_job_sup.erl b/src/couch_replicator/src/couch_replicator_job_sup.erl
index 3cce46c..9ea65e8 100644
--- a/src/couch_replicator/src/couch_replicator_job_sup.erl
+++ b/src/couch_replicator/src/couch_replicator_job_sup.erl
@@ -11,8 +11,13 @@
 % the License.
 
 -module(couch_replicator_job_sup).
+
 -behaviour(supervisor).
--export([init/1, start_link/0]).
+
+-export([
+    init/1,
+    start_link/0
+]).
 
 start_link() ->
     supervisor:start_link({local,?MODULE}, ?MODULE, []).
diff --git a/src/couch_replicator/src/couch_replicator_js_functions.hrl b/src/couch_replicator/src/couch_replicator_js_functions.hrl
index eba1973..9b11e8a 100644
--- a/src/couch_replicator/src/couch_replicator_js_functions.hrl
+++ b/src/couch_replicator/src/couch_replicator_js_functions.hrl
@@ -53,7 +53,7 @@
         var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0);
         var isAdmin = (userCtx.roles.indexOf('_admin') >= 0);
 
-        if (newDoc._replication_state === 'error') {
+        if (newDoc._replication_state === 'failed') {
             // Skip validation in case when we update the document with the
             // failed state. In this case it might be malformed. However,
             // replicator will not pay attention to failed documents so this
@@ -61,12 +61,6 @@
             return;
         }
 
-        if (oldDoc && !newDoc._deleted && !isReplicator &&
-            (oldDoc._replication_state === 'triggered')) {
-            reportError('Only the replicator can edit replication documents ' +
-                'that are in the triggered state.');
-        }
-
         if (!newDoc._deleted) {
             validateEndpoint(newDoc.source, 'source');
             validateEndpoint(newDoc.target, 'target');
diff --git a/src/couch_replicator/src/couch_replicator_manager.erl b/src/couch_replicator/src/couch_replicator_manager.erl
index 85dd428..afccc0b 100644
--- a/src/couch_replicator/src/couch_replicator_manager.erl
+++ b/src/couch_replicator/src/couch_replicator_manager.erl
@@ -11,1033 +11,19 @@
 % the License.
 
 -module(couch_replicator_manager).
--behaviour(gen_server).
--vsn(3).
--behaviour(config_listener).
 
-% public API
--export([replication_started/1, replication_completed/2, replication_error/2]).
--export([continue/1, replication_usurped/2]).
+% TODO: This is a temporary proxy module for external calls (outside the
+% replicator) into other replicator modules. This is done to avoid juggling
+% multiple repos during development.
 
+% NV: TODO: These functions were moved to couch_replicator_docs but are still
+% called from fabric_doc_update. Keep them here for now; later, update fabric
+% to call couch_replicator_docs instead.
 -export([before_doc_update/2, after_doc_read/2]).
 
-% gen_server callbacks
--export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
--export([code_change/3, terminate/2]).
 
-% changes callbacks
--export([changes_reader/3, changes_reader_cb/3]).
+before_doc_update(Doc, Db) ->
+    couch_replicator_docs:before_doc_update(Doc, Db).
 
-% config_listener callback
--export([handle_config_change/5, handle_config_terminate/3]).
-
--export([handle_db_event/3]).
-
-%% exported but private
--export([start_replication/2]).
-
--include_lib("couch/include/couch_db.hrl").
--include_lib("mem3/include/mem3.hrl").
--include("couch_replicator.hrl").
--include("couch_replicator_js_functions.hrl").
-
--define(DOC_TO_REP, couch_rep_doc_id_to_rep_id).
--define(REP_TO_STATE, couch_rep_id_to_rep_state).
--define(INITIAL_WAIT, 2.5). % seconds
--define(MAX_WAIT, 600).     % seconds
--define(AVG_DELAY_MSEC, 100).
--define(MAX_DELAY_MSEC, 60000).
--define(OWNER, <<"owner">>).
--define(REPLICATOR_DB, <<"_replicator">>).
-
--define(DB_TO_SEQ, db_to_seq).
--define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
-
--define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
-
--define(RELISTEN_DELAY, 5000).
-
--record(rep_state, {
-    rep,
-    starting,
-    retries_left,
-    max_retries,
-    wait = ?INITIAL_WAIT
-}).
-
--import(couch_util, [
-    to_binary/1
-]).
-
--record(state, {
-    event_listener = nil,
-    scan_pid = nil,
-    rep_start_pids = [],
-    max_retries,
-    live = [],
-    epoch = nil
-}).
-
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-
-replication_started(#rep{id = {BaseId, _} = RepId}) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    #rep_state{rep = #rep{db_name = DbName, doc_id = DocId}} ->
-        update_rep_doc(DbName, DocId, [
-            {<<"_replication_state">>, <<"triggered">>},
-            {<<"_replication_state_reason">>, undefined},
-            {<<"_replication_id">>, ?l2b(BaseId)},
-            {<<"_replication_stats">>, undefined}]),
-        ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity),
-        couch_log:notice("Document `~s` from `~s` triggered replication `~s`",
-            [DocId, DbName, pp_rep_id(RepId)])
-    end.
-
-
-replication_completed(#rep{id = RepId}, Stats) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    #rep_state{rep = #rep{db_name = DbName, doc_id = DocId}} ->
-        update_rep_doc(DbName, DocId, [
-            {<<"_replication_state">>, <<"completed">>},
-            {<<"_replication_state_reason">>, undefined},
-            {<<"_replication_stats">>, {Stats}}]),
-        ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
-        couch_log:notice("Replication `~s` finished (triggered by document `~s`"
-            " from `~s`)", [pp_rep_id(RepId), DocId, DbName])
-    end.
-
-
-replication_usurped(#rep{id = RepId}, By) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    #rep_state{rep = #rep{db_name = DbName, doc_id = DocId}} ->
-        ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
-        couch_log:notice("Replication `~s` usurped by ~s (triggered by document"
-            " `~s` from `~s`)", [pp_rep_id(RepId), By, DocId, DbName])
-    end.
-
-
-replication_error(#rep{id = {BaseId, _} = RepId}, Error) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    #rep_state{rep = #rep{db_name = DbName, doc_id = DocId}} ->
-        ok = timer:sleep(jitter(ets:info(?REP_TO_STATE, size))),
-        update_rep_doc(DbName, DocId, [
-            {<<"_replication_state">>, <<"error">>},
-            {<<"_replication_state_reason">>, to_binary(error_reason(Error))},
-            {<<"_replication_id">>, ?l2b(BaseId)}]),
-        ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity)
-    end.
-
-continue(#rep{doc_id = null}) ->
-    {true, no_owner};
-continue(#rep{id = RepId}) ->
-    Owner = gen_server:call(?MODULE, {owner, RepId}, infinity),
-    {node() == Owner, Owner}.
-
-
-handle_config_change("replicator", "max_replication_retry_count", V, _, S) ->
-    ok = gen_server:cast(?MODULE, {set_max_retries, retries_value(V)}),
-    {ok, S};
-handle_config_change(_, _, _, _, S) ->
-    {ok, S}.
-
-handle_config_terminate(_, stop, _) ->
-    ok;
-handle_config_terminate(_Server, _Reason, _State) ->
-    erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener).
-
-init(_) ->
-    process_flag(trap_exit, true),
-    net_kernel:monitor_nodes(true),
-    Live = [node() | nodes()],
-    ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, public]),
-    ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, public]),
-    ?DB_TO_SEQ = ets:new(?DB_TO_SEQ, [named_table, set, public]),
-    Server = self(),
-    ok = config:listen_for_changes(?MODULE, nil),
-    Epoch = make_ref(),
-    ScanPid = spawn_link(fun() -> scan_all_dbs(Server) end),
-    % Automatically start node local changes feed loop
-    ensure_rep_db_exists(?REPLICATOR_DB),
-    Pid = start_changes_reader(?REPLICATOR_DB, 0, Epoch),
-    {ok, #state{
-        event_listener = start_event_listener(),
-        scan_pid = ScanPid,
-        max_retries = retries_value(
-            config:get("replicator", "max_replication_retry_count", "10")),
-        rep_start_pids = [{?REPLICATOR_DB, Pid}],
-        live = Live,
-        epoch = Epoch
-    }}.
-
-handle_call({owner, RepId}, _From, State) ->
-    case rep_state(RepId) of
-    nil ->
-        {reply, nonode, State};
-    #rep_state{rep = #rep{db_name = DbName, doc_id = DocId}} ->
-        {reply, owner(DbName, DocId, State#state.live), State}
-    end;
-
-handle_call({rep_db_update, DbName, {ChangeProps} = Change}, _From, State) ->
-    NewState = try
-        process_update(State, DbName, Change)
-    catch
-    _Tag:Error ->
-        {RepProps} = get_json_value(doc, ChangeProps),
-        DocId = get_json_value(<<"_id">>, RepProps),
-        rep_db_update_error(Error, DbName, DocId),
-        State
-    end,
-    {reply, ok, NewState};
-
-
-handle_call({rep_started, RepId}, _From, State) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    RepState ->
-        NewRepState = RepState#rep_state{
-            starting = false,
-            retries_left = State#state.max_retries,
-            max_retries = State#state.max_retries,
-            wait = ?INITIAL_WAIT
-        },
-        true = ets:insert(?REP_TO_STATE, {RepId, NewRepState})
-    end,
-    {reply, ok, State};
-
-handle_call({rep_complete, RepId}, _From, State) ->
-    true = ets:delete(?REP_TO_STATE, RepId),
-    {reply, ok, State};
-
-handle_call({rep_error, RepId, Error}, _From, State) ->
-    {reply, ok, replication_error(State, RepId, Error)};
-
-% Match changes epoch with the current epoch in the state.
-% New epoch ref is created on a full rescan. Change feeds have to
-% be replayed from the start to determine ownership in the new
-% cluster configuration and epoch is used to match & checkpoint
-% only changes from the current cluster configuration.
-handle_call({rep_db_checkpoint, DbName, EndSeq, Epoch}, _From,
-            #state{epoch = Epoch} = State) ->
-    Entry = case ets:lookup(?DB_TO_SEQ, DbName) of
-        [] ->
-            {DbName, EndSeq, false};
-        [{DbName, _OldSeq, Rescan}] ->
-            {DbName, EndSeq, Rescan}
-    end,
-    true = ets:insert(?DB_TO_SEQ, Entry),
-    {reply, ok, State};
-
-% Ignore checkpoints from previous epoch.
-handle_call({rep_db_checkpoint, _DbName, _EndSeq, _Epoch}, _From, State) ->
-    {reply, ok, State};
-
-handle_call(Msg, From, State) ->
-    couch_log:error("Replication manager received unexpected call ~p from ~p",
-        [Msg, From]),
-    {stop, {error, {unexpected_call, Msg}}, State}.
-
-handle_cast({resume_scan, DbName}, State) ->
-    Pids = State#state.rep_start_pids,
-    NewPids = case lists:keyfind(DbName, 1, Pids) of
-        {DbName, _Pid} ->
-            Entry = case ets:lookup(?DB_TO_SEQ, DbName) of
-                [] ->
-                    {DbName, 0, true};
-                [{DbName, EndSeq, _Rescan}] ->
-                    {DbName, EndSeq, true}
-            end,
-            true = ets:insert(?DB_TO_SEQ, Entry),
-            Pids;
-        false ->
-            Since = case ets:lookup(?DB_TO_SEQ, DbName) of
-                [] -> 0;
-                [{DbName, EndSeq, _Rescan}] -> EndSeq
-            end,
-            true = ets:insert(?DB_TO_SEQ, {DbName, Since, false}),
-            ensure_rep_ddoc_exists(DbName),
-            Pid = start_changes_reader(DbName, Since, State#state.epoch),
-            couch_log:debug("Scanning ~s from update_seq ~p", [DbName, Since]),
-            [{DbName, Pid} | Pids]
-    end,
-    {noreply, State#state{rep_start_pids = NewPids}};
-
-handle_cast({set_max_retries, MaxRetries}, State) ->
-    {noreply, State#state{max_retries = MaxRetries}};
-
-handle_cast(Msg, State) ->
-    couch_log:error("Replication manager received unexpected cast ~p", [Msg]),
-    {stop, {error, {unexpected_cast, Msg}}, State}.
-
-handle_info({nodeup, Node}, State) ->
-    couch_log:notice("Rescanning replicator dbs as ~s came up.", [Node]),
-    Live = lists:usort([Node | State#state.live]),
-    {noreply, rescan(State#state{live=Live})};
-
-handle_info({nodedown, Node}, State) ->
-    couch_log:notice("Rescanning replicator dbs ~s went down.", [Node]),
-    Live = State#state.live -- [Node],
-    {noreply, rescan(State#state{live=Live})};
-
-handle_info({'EXIT', From, normal}, #state{scan_pid = From} = State) ->
-    couch_log:debug("Background scan has completed.", []),
-    {noreply, State#state{scan_pid=nil}};
-
-handle_info({'EXIT', From, Reason}, #state{scan_pid = From} = State) ->
-    couch_log:error("Background scanner died. Reason: ~p", [Reason]),
-    {stop, {scanner_died, Reason}, State};
-
-handle_info({'EXIT', From, Reason}, #state{event_listener = From} = State) ->
-    couch_log:error("Database update notifier died. Reason: ~p", [Reason]),
-    {stop, {db_update_notifier_died, Reason}, State};
-
-handle_info({'EXIT', From, Reason}, #state{rep_start_pids = Pids} = State) ->
-    case lists:keytake(From, 2, Pids) of
-        {value, {DbName, From}, NewPids} ->
-            if Reason == normal -> ok; true ->
-                Fmt = "~s : Known replication or change feed pid ~w died :: ~w",
-                couch_log:error(Fmt, [?MODULE, From, Reason])
-            end,
-            NewState = State#state{rep_start_pids = NewPids},
-            case ets:lookup(?DB_TO_SEQ, DbName) of
-                [{DbName, _EndSeq, true}] ->
-                    handle_cast({resume_scan, DbName}, NewState);
-                _ ->
-                    {noreply, NewState}
-            end;
-        false when Reason == normal ->
-            {noreply, State};
-        false ->
-            Fmt = "~s : Unknown pid ~w died :: ~w",
-            couch_log:error(Fmt, [?MODULE, From, Reason]),
-            {stop, {unexpected_exit, From, Reason}, State}
-    end;
-
-handle_info({'DOWN', _Ref, _, _, _}, State) ->
-    % From a db monitor created by a replication process. Ignore.
-    {noreply, State};
-
-handle_info(shutdown, State) ->
-    {stop, shutdown, State};
-
-handle_info(restart_config_listener, State) ->
-    ok = config:listen_for_changes(?MODULE, nil),
-    {noreply, State};
-
-handle_info(Msg, State) ->
-    couch_log:error("Replication manager received unexpected message ~p", [Msg]),
-    {stop, {unexpected_msg, Msg}, State}.
-
-
-terminate(_Reason, State) ->
-    #state{
-        scan_pid = ScanPid,
-        rep_start_pids = StartPids,
-        event_listener = Listener
-    } = State,
-    stop_all_replications(),
-    lists:foreach(
-        fun({_Tag, Pid}) ->
-            catch unlink(Pid),
-            catch exit(Pid, stop)
-        end,
-        [{scanner, ScanPid} | StartPids]),
-    true = ets:delete(?REP_TO_STATE),
-    true = ets:delete(?DOC_TO_REP),
-    true = ets:delete(?DB_TO_SEQ),
-    couch_event:stop_listener(Listener).
-
-
-code_change(1, State, _Extra) ->
-    {ok, erlang:append_element(State, [node() | nodes()])};
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-
-start_changes_reader(DbName, Since, Epoch) ->
-    spawn_link(?MODULE, changes_reader, [{self(), Epoch}, DbName, Since]).
-
-changes_reader({Server, Epoch}, DbName, Since) ->
-    UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
-    DbOpenOptions = [{user_ctx, UserCtx}, sys_db],
-    {ok, Db} = couch_db:open_int(DbName, DbOpenOptions),
-    ChangesFeedFun = couch_changes:handle_db_changes(
-        #changes_args{
-            include_docs = true,
-            since = Since,
-            feed = "normal",
-            timeout = infinity
-        },
-        {json_req, null},
-        Db
-    ),
-    ChangesFeedFun({fun ?MODULE:changes_reader_cb/3, {Server, DbName, Epoch}}).
-
-changes_reader_cb({change, Change, _}, _, {Server, DbName, Epoch}) ->
-    case has_valid_rep_id(Change) of
-        true ->
-            Msg = {rep_db_update, DbName, Change},
-            ok = gen_server:call(Server, Msg, infinity);
-        false ->
-            ok
-    end,
-    {Server, DbName, Epoch};
-changes_reader_cb({stop, EndSeq}, _, {Server, DbName, Epoch}) ->
-    Msg = {rep_db_checkpoint, DbName, EndSeq, Epoch},
-    ok = gen_server:call(Server, Msg, infinity),
-    {Server, DbName, Epoch};
-changes_reader_cb(_, _, Acc) ->
-    Acc.
-
-has_valid_rep_id({Change}) ->
-    has_valid_rep_id(get_json_value(<<"id">>, Change));
-has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
-    false;
-has_valid_rep_id(_Else) ->
-    true.
-
-
-start_event_listener() ->
-    {ok, Pid} = couch_event:link_listener(
-            ?MODULE, handle_db_event, self(), [all_dbs]
-        ),
-    Pid.
-
-
-handle_db_event(DbName, created, Server) ->
-    case is_replicator_db(DbName) of
-	true ->
-	    ensure_rep_ddoc_exists(DbName);
-	_ ->
-	    ok
-    end,
-    {ok, Server};
-handle_db_event(DbName, updated, Server) ->
-    case is_replicator_db(DbName) of
-        true ->
-	    Msg = {resume_scan, DbName},
-	    ok = gen_server:cast(Server, Msg);
-        _ ->
-            ok
-    end,
-    {ok, Server};
-handle_db_event(DbName, deleted, Server) ->
-    case is_replicator_db(DbName) of
-        true ->
-            clean_up_replications(DbName);
-        _ ->
-            ok
-    end,
-    {ok, Server};
-handle_db_event(_DbName, _Event, Server) ->
-    {ok, Server}.
-
-rescan(#state{scan_pid = nil} = State) ->
-    true = ets:delete_all_objects(?DB_TO_SEQ),
-    Server = self(),
-    Epoch = make_ref(),
-    NewScanPid = spawn_link(fun() -> scan_all_dbs(Server) end),
-    State#state{scan_pid = NewScanPid, epoch = Epoch};
-rescan(#state{scan_pid = ScanPid} = State) ->
-    unlink(ScanPid),
-    exit(ScanPid, exit),
-    rescan(State#state{scan_pid = nil}).
-
-process_update(State, DbName, {Change}) ->
-    {RepProps} = JsonRepDoc = get_json_value(doc, Change),
-    DocId = get_json_value(<<"_id">>, RepProps),
-    case {owner(DbName, DocId, State#state.live), get_json_value(deleted, Change, false)} of
-    {_, true} ->
-        rep_doc_deleted(DbName, DocId),
-        State;
-    {Owner, false} when Owner /= node() ->
-        couch_log:notice("Not starting '~s' from '~s' as owner is ~s.",
-            [DocId, DbName, Owner]),
-        State;
-    {_Owner, false} ->
-        couch_log:notice("Maybe starting '~s' from '~s' as I'm the owner", [DocId, DbName]),
-        case get_json_value(<<"_replication_state">>, RepProps) of
-        undefined ->
-            maybe_start_replication(State, DbName, DocId, JsonRepDoc);
-        <<"triggered">> ->
-            maybe_start_replication(State, DbName, DocId, JsonRepDoc);
-        <<"completed">> ->
-            replication_complete(DbName, DocId),
-            State;
-        <<"error">> ->
-            case ets:lookup(?DOC_TO_REP, {DbName, DocId}) of
-            [] ->
-                maybe_start_replication(State, DbName, DocId, JsonRepDoc);
-            _ ->
-                State
-            end
-        end
-    end.
-
-owner(<<"shards/", _/binary>> = DbName, DocId, Live) ->
-    Nodes = lists:sort([N || #shard{node=N} <- mem3:shards(mem3:dbname(DbName), DocId),
-			     lists:member(N, Live)]),
-    hd(mem3_util:rotate_list({DbName, DocId}, Nodes));
-owner(_DbName, _DocId, _Live) ->
-    node().
-
-rep_db_update_error(Error, DbName, DocId) ->
-    case Error of
-    {bad_rep_doc, Reason} ->
-        ok;
-    _ ->
-        Reason = to_binary(Error)
-    end,
-    couch_log:error("Replication manager, error processing document `~s`"
-        " from `~s`: ~s", [DocId, DbName, Reason]),
-    update_rep_doc(DbName, DocId, [{<<"_replication_state">>, <<"error">>},
-                           {<<"_replication_state_reason">>, Reason}]).
-
-
-rep_user_ctx({RepDoc}) ->
-    case get_json_value(<<"user_ctx">>, RepDoc) of
-    undefined ->
-        #user_ctx{};
-    {UserCtx} ->
-        #user_ctx{
-            name = get_json_value(<<"name">>, UserCtx, null),
-            roles = get_json_value(<<"roles">>, UserCtx, [])
-        }
-    end.
-
-
-maybe_start_replication(State, DbName, DocId, RepDoc) ->
-    #rep{id = {BaseId, _} = RepId} = Rep0 = parse_rep_doc(RepDoc),
-    Rep = Rep0#rep{db_name = DbName},
-    case rep_state(RepId) of
-    nil ->
-        RepState = #rep_state{
-            rep = Rep,
-            starting = true,
-            retries_left = State#state.max_retries,
-            max_retries = State#state.max_retries
-        },
-        true = ets:insert(?REP_TO_STATE, {RepId, RepState}),
-        true = ets:insert(?DOC_TO_REP, {{DbName, DocId}, RepId}),
-        couch_log:notice("Attempting to start replication `~s` (document `~s`"
-            " from `~s`).", [pp_rep_id(RepId), DocId, DbName]),
-        StartDelaySecs = erlang:max(0,
-            config:get_integer("replicator", "start_delay", 10)),
-        StartSplaySecs = erlang:max(1,
-            config:get_integer("replicator", "start_splay", 50)),
-        DelaySecs = StartDelaySecs + random:uniform(StartSplaySecs),
-        couch_log:notice("Delaying replication `~s` start by ~p seconds.",
-            [pp_rep_id(RepId), DelaySecs]),
-        Pid = spawn_link(?MODULE, start_replication, [Rep, DelaySecs]),
-        State#state{
-            rep_start_pids = [{rep_start, Pid} | State#state.rep_start_pids]
-        };
-    #rep_state{rep = #rep{doc_id = DocId}} ->
-        State;
-    #rep_state{starting = false, rep = #rep{db_name = DbName, doc_id = OtherDocId}} ->
-        couch_log:notice("The replication specified by the document `~s` from"
-            " `~s` was already triggered by the document `~s`",
-            [DocId, DbName, OtherDocId]),
-        maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId)),
-        State;
-    #rep_state{starting = true, rep = #rep{db_name = DbName, doc_id = OtherDocId}} ->
-        couch_log:notice("The replication specified by the document `~s` from"
-            " `~s` is already being triggered by the document `~s`",
-            [DocId, DbName, OtherDocId]),
-        maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId)),
-        State
-    end.
-
-
-parse_rep_doc(RepDoc) ->
-    {ok, Rep} = try
-        couch_replicator_utils:parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
-    catch
-    throw:{error, Reason} ->
-        throw({bad_rep_doc, Reason});
-    Tag:Err ->
-        throw({bad_rep_doc, to_binary({Tag, Err})})
-    end,
-    Rep.
-
-
-maybe_tag_rep_doc(DbName, DocId, {RepProps}, RepId) ->
-    case get_json_value(<<"_replication_id">>, RepProps) of
-    RepId ->
-        ok;
-    _ ->
-        update_rep_doc(DbName, DocId, [{<<"_replication_id">>, RepId}])
-    end.
-
-start_replication(Rep, Wait) ->
-    ok = timer:sleep(Wait * 1000),
-    case (catch couch_replicator:async_replicate(Rep)) of
-    {ok, _} ->
-        ok;
-    Error ->
-        replication_error(Rep, Error)
-    end.
-
-replication_complete(DbName, DocId) ->
-    case ets:lookup(?DOC_TO_REP, {DbName, DocId}) of
-    [{{DbName, DocId}, {BaseId, Ext} = RepId}] ->
-        case rep_state(RepId) of
-        nil ->
-            % Prior to OTP R14B02, temporary child specs remain in
-            % in the supervisor after a worker finishes - remove them.
-            % We want to be able to start the same replication but with
-            % eventually different values for parameters that don't
-            % contribute to its ID calculation.
-            case erlang:system_info(otp_release) < "R14B02" of
-            true ->
-                spawn(fun() ->
-                    _ = supervisor:delete_child(couch_replicator_job_sup, BaseId ++ Ext)
-                end);
-            false ->
-                ok
-            end;
-        #rep_state{} ->
-            ok
-        end,
-        true = ets:delete(?DOC_TO_REP, {DbName, DocId});
-    _ ->
-        ok
-    end.
-
-
-rep_doc_deleted(DbName, DocId) ->
-    case ets:lookup(?DOC_TO_REP, {DbName, DocId}) of
-    [{{DbName, DocId}, RepId}] ->
-        couch_replicator:cancel_replication(RepId),
-        true = ets:delete(?REP_TO_STATE, RepId),
-        true = ets:delete(?DOC_TO_REP, {DbName, DocId}),
-        couch_log:notice("Stopped replication `~s` because replication document"
-            " `~s` from `~s` was deleted", [pp_rep_id(RepId), DocId, DbName]);
-    [] ->
-        ok
-    end.
-
-
-replication_error(State, RepId, Error) ->
-    case rep_state(RepId) of
-    nil ->
-        State;
-    RepState ->
-        maybe_retry_replication(RepState, Error, State)
-    end.
-
-maybe_retry_replication(#rep_state{retries_left = 0} = RepState, Error, State) ->
-    #rep_state{
-        rep = #rep{id = RepId, doc_id = DocId, db_name = DbName},
-        max_retries = MaxRetries
-    } = RepState,
-    couch_replicator:cancel_replication(RepId),
-    true = ets:delete(?REP_TO_STATE, RepId),
-    true = ets:delete(?DOC_TO_REP, {DbName, DocId}),
-    couch_log:error("Error in replication `~s` (triggered by document `~s` from"
-        " `~s` ): ~s~nReached maximum retry attempts (~p).", [pp_rep_id(RepId),
-        DocId, DbName, to_binary(error_reason(Error)), MaxRetries]),
-    State;
-
-maybe_retry_replication(RepState, Error, State) ->
-    #rep_state{
-        rep = #rep{id = RepId, doc_id = DocId, db_name = DbName} = Rep
-    } = RepState,
-    #rep_state{wait = Wait} = NewRepState = state_after_error(RepState),
-    true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}),
-    couch_log:error("Error in replication `~s` (triggered by document `~s` from"
-        " `~s` ): ~s~nRestarting replication in ~p seconds.", [pp_rep_id(RepId),
-        DocId, DbName, to_binary(error_reason(Error)), Wait]),
-    Pid = spawn_link(?MODULE, start_replication, [Rep, Wait]),
-    State#state{
-        rep_start_pids = [{rep_start, Pid} | State#state.rep_start_pids]
-    }.
-
-
-stop_all_replications() ->
-    couch_log:notice("Stopping all ongoing replications", []),
-    ets:foldl(
-        fun({_, RepId}, _) ->
-            couch_replicator:cancel_replication(RepId)
-        end,
-        ok, ?DOC_TO_REP),
-    true = ets:delete_all_objects(?REP_TO_STATE),
-    true = ets:delete_all_objects(?DOC_TO_REP),
-    true = ets:delete_all_objects(?DB_TO_SEQ).
-
-clean_up_replications(DbName) ->
-    ets:foldl(
-        fun({{Name, DocId}, RepId}, _) when Name =:= DbName ->
-            couch_replicator:cancel_replication(RepId),
-            ets:delete(?DOC_TO_REP,{Name, DocId}),
-            ets:delete(?REP_TO_STATE, RepId);
-           ({_,_}, _) ->
-            ok
-        end,
-        ok, ?DOC_TO_REP),
-    ets:delete(?DB_TO_SEQ,DbName).
-
-
-update_rep_doc(RepDbName, RepDocId, KVs) ->
-    update_rep_doc(RepDbName, RepDocId, KVs, 1).
-
-update_rep_doc(RepDbName, RepDocId, KVs, Wait) when is_binary(RepDocId) ->
-    try
-        case open_rep_doc(RepDbName, RepDocId) of
-            {ok, LastRepDoc} ->
-                update_rep_doc(RepDbName, LastRepDoc, KVs, Wait * 2);
-            _ ->
-                ok
-        end
-    catch
-        throw:conflict ->
-            Msg = "Conflict when updating replication document `~s`. Retrying.",
-            couch_log:error(Msg, [RepDocId]),
-            ok = timer:sleep(random:uniform(erlang:min(128, Wait)) * 100),
-            update_rep_doc(RepDbName, RepDocId, KVs, Wait * 2)
-    end;
-update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
-    NewRepDocBody = lists:foldl(
-        fun({K, undefined}, Body) ->
-                lists:keydelete(K, 1, Body);
-           ({<<"_replication_state">> = K, State} = KV, Body) ->
-                case get_json_value(K, Body) of
-                State ->
-                    Body;
-                _ ->
-                    Body1 = lists:keystore(K, 1, Body, KV),
-                    lists:keystore(
-                        <<"_replication_state_time">>, 1, Body1,
-                        {<<"_replication_state_time">>, timestamp()})
-                end;
-            ({K, _V} = KV, Body) ->
-                lists:keystore(K, 1, Body, KV)
-        end,
-        RepDocBody, KVs),
-    case NewRepDocBody of
-    RepDocBody ->
-        ok;
-    _ ->
-        % Might not succeed - when the replication doc is deleted right
-        % before this update (not an error, ignore).
-        save_rep_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}})
-    end.
-
-open_rep_doc(DbName, DocId) ->
-    case couch_db:open_int(DbName, [?CTX, sys_db]) of
-        {ok, Db} ->
-            try
-                couch_db:open_doc(Db, DocId, [ejson_body])
-            after
-                couch_db:close(Db)
-            end;
-        Else ->
-            Else
-    end.
-
-save_rep_doc(DbName, Doc) ->
-    {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
-    try
-        couch_db:update_doc(Db, Doc, [])
-    after
-        couch_db:close(Db)
-    end.
-
-% RFC3339 timestamps.
-% Note: doesn't include the time seconds fraction (RFC3339 says it's optional).
-timestamp() ->
-    {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(now()),
-    UTime = erlang:universaltime(),
-    LocalTime = calendar:universal_time_to_local_time(UTime),
-    DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) -
-        calendar:datetime_to_gregorian_seconds(UTime),
-    zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60),
-    iolist_to_binary(
-        io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s",
-            [Year, Month, Day, Hour, Min, Sec,
-                zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])).
-
-zone(Hr, Min) when Hr >= 0, Min >= 0 ->
-    io_lib:format("+~2..0w:~2..0w", [Hr, Min]);
-zone(Hr, Min) ->
-    io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]).
-
-
-ensure_rep_db_exists(<<"shards/", _/binary>>=DbName) ->
-    ensure_rep_ddoc_exists(DbName),
-    ok;
-ensure_rep_db_exists(DbName) ->
-    Db = case couch_db:open_int(DbName, [?CTX, sys_db, nologifmissing]) of
-        {ok, Db0} ->
-            Db0;
-        _Error ->
-            {ok, Db0} = couch_db:create(DbName, [?CTX, sys_db]),
-            Db0
-    end,
-    ensure_rep_ddoc_exists(DbName),
-    {ok, Db}.
-
-ensure_rep_ddoc_exists(RepDb) ->
-    DDocId = <<"_design/_replicator">>,
-    case mem3:belongs(RepDb, DDocId) of
-	true ->
-	    ensure_rep_ddoc_exists(RepDb, DDocId);
-	false ->
-	    ok
-    end.
-
-ensure_rep_ddoc_exists(RepDb, DDocId) ->
-    case open_rep_doc(RepDb, DDocId) of
-        {not_found, no_db_file} ->
-            %% database was deleted.
-            ok;
-        {not_found, _Reason} ->
-            {ok, DDoc} = replication_design_doc(DDocId),
-            couch_log:notice("creating replicator ddoc", []),
-            {ok, _Rev} = save_rep_doc(RepDb, DDoc);
-        {ok, Doc} ->
-            {Props} = couch_doc:to_json_obj(Doc, []),
-            case couch_util:get_value(<<"validate_doc_update">>, Props, []) of
-                ?REP_DB_DOC_VALIDATE_FUN ->
-                    ok;
-                _ ->
-                    Props1 = lists:keyreplace(<<"validate_doc_update">>, 1, Props,
-                         {<<"validate_doc_update">>,
-                        ?REP_DB_DOC_VALIDATE_FUN}),
-                    DDoc = couch_doc:from_json_obj({Props1}),
-                    couch_log:notice("updating replicator ddoc", []),
-                    try
-                        {ok, _} = save_rep_doc(RepDb, DDoc)
-                    catch
-                        throw:conflict ->
-                            %% ignore, we'll retry next time
-                            ok
-                    end
-            end
-    end,
-    ok.
-
-replication_design_doc(DDocId) ->
-    DocProps = [
-        {<<"_id">>, DDocId},
-        {<<"language">>, <<"javascript">>},
-        {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
-   ],
-   {ok, couch_doc:from_json_obj({DocProps})}.
-
-
-% pretty-print replication id
-pp_rep_id(#rep{id = RepId}) ->
-    pp_rep_id(RepId);
-pp_rep_id({Base, Extension}) ->
-    Base ++ Extension.
-
-
-rep_state(RepId) ->
-    case ets:lookup(?REP_TO_STATE, RepId) of
-    [{RepId, RepState}] ->
-        RepState;
-    [] ->
-        nil
-    end.
-
-
-error_reason({error, {Error, Reason}})
-  when is_atom(Error), is_binary(Reason) ->
-    io_lib:format("~s: ~s", [Error, Reason]);
-error_reason({error, Reason}) ->
-    Reason;
-error_reason(Reason) ->
-    Reason.
-
-
-retries_value("infinity") ->
-    infinity;
-retries_value(Value) ->
-    list_to_integer(Value).
-
-
-state_after_error(#rep_state{retries_left = Left, wait = Wait} = State) ->
-    Wait2 = erlang:min(trunc(Wait * 2), ?MAX_WAIT),
-    case Left of
-    infinity ->
-        State#rep_state{wait = Wait2};
-    _ ->
-        State#rep_state{retries_left = Left - 1, wait = Wait2}
-    end.
-
-
-before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
-    Doc;
-before_doc_update(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) ->
-    #user_ctx{roles = Roles, name = Name} = UserCtx,
-    case lists:member(<<"_replicator">>, Roles) of
-    true ->
-        Doc;
-    false ->
-        case couch_util:get_value(?OWNER, Body) of
-        undefined ->
-            Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
-        Name ->
-            Doc;
-        Other ->
-            case (catch couch_db:check_is_admin(Db)) of
-            ok when Other =:= null ->
-                Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
-            ok ->
-                Doc;
-            _ ->
-                throw({forbidden, <<"Can't update replication documents",
-                    " from other users.">>})
-            end
-        end
-    end.
-
-
-after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
-    Doc;
-after_doc_read(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) ->
-    #user_ctx{name = Name} = UserCtx,
-    case (catch couch_db:check_is_admin(Db)) of
-    ok ->
-        Doc;
-    _ ->
-        case couch_util:get_value(?OWNER, Body) of
-        Name ->
-            Doc;
-        _Other ->
-            Source = strip_credentials(couch_util:get_value(<<"source">>,
-Body)),
-            Target = strip_credentials(couch_util:get_value(<<"target">>,
-Body)),
-            NewBody0 = ?replace(Body, <<"source">>, Source),
-            NewBody = ?replace(NewBody0, <<"target">>, Target),
-            #doc{revs = {Pos, [_ | Revs]}} = Doc,
-            NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}},
-            NewRevId = couch_db:new_revid(NewDoc),
-            NewDoc#doc{revs = {Pos, [NewRevId | Revs]}}
-        end
-    end.
-
-
-strip_credentials(undefined) ->
-    undefined;
-strip_credentials(Url) when is_binary(Url) ->
-    re:replace(Url,
-        "http(s)?://(?:[^:]+):[^@]+@(.*)$",
-        "http\\1://\\2",
-        [{return, binary}]);
-strip_credentials({Props}) ->
-    {lists:keydelete(<<"oauth">>, 1, Props)}.
-
-scan_all_dbs(Server) when is_pid(Server) ->
-    {ok, Db} = mem3_util:ensure_exists(
-        config:get("mem3", "shards_db", "_dbs")),
-    ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db, nil),
-    ChangesFun(fun({change, {Change}, _}, _) ->
-        DbName = couch_util:get_value(<<"id">>, Change),
-        case DbName of <<"_design/", _/binary>> -> ok; _Else ->
-            case couch_replicator_utils:is_deleted(Change) of
-            true ->
-                ok;
-            false ->
-                try
-                    [gen_server:cast(Server, {resume_scan, ShardName})
-                        || ShardName <- replicator_shards(DbName)]
-                catch error:database_does_not_exist ->
-                    ok
-                end
-            end
-        end;
-        (_, _) -> ok
-    end),
-    couch_db:close(Db).
-
-
-replicator_shards(DbName) ->
-    case is_replicator_db(DbName) of
-    false ->
-        [];
-    true ->
-        [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)]
-    end.
-
-
-% calculate random delay proportional to the number of replications
-% on current node, in order to prevent a stampede:
-%   - when a source with multiple replication targets fails
-%   - when we restart couch_replication_manager
-jitter(N) ->
-    Range = min(2 * N * ?AVG_DELAY_MSEC, ?MAX_DELAY_MSEC),
-    random:uniform(Range).
-
-is_replicator_db(DbName) ->
-    ?REPLICATOR_DB =:= couch_db:dbname_suffix(DbName).
-
-get_json_value(Key, Props) ->
-    get_json_value(Key, Props, undefined).
-
-get_json_value(Key, Props, Default) when is_atom(Key) ->
-    Ref = make_ref(),
-    case couch_util:get_value(Key, Props, Ref) of
-        Ref ->
-            couch_util:get_value(?l2b(atom_to_list(Key)), Props, Default);
-        Else ->
-            Else
-    end;
-get_json_value(Key, Props, Default) when is_binary(Key) ->
-    Ref = make_ref(),
-    case couch_util:get_value(Key, Props, Ref) of
-        Ref ->
-            couch_util:get_value(list_to_atom(?b2l(Key)), Props, Default);
-        Else ->
-            Else
-    end.
-
-
--ifdef(TEST).
-
--include_lib("couch/include/couch_eunit.hrl").
-
-replicator_shards_test_() ->
-{
-      foreach,
-      fun() -> test_util:start_couch([mem3, fabric]) end,
-      fun(Ctx) -> test_util:stop_couch(Ctx) end,
-      [
-          t_pass_replicator_shard(),
-          t_fail_non_replicator_shard()
-     ]
-}.
-
-
-t_pass_replicator_shard() ->
-    ?_test(begin
-        DbName0 = ?tempdb(),
-        DbName = <<DbName0/binary, "/_replicator">>,
-        ok = fabric:create_db(DbName, [?CTX]),
-        ?assertEqual(8, length(replicator_shards(DbName))),
-        fabric:delete_db(DbName, [?CTX])
-    end).
-
-
-t_fail_non_replicator_shard() ->
-    ?_test(begin
-        DbName = ?tempdb(),
-        ok = fabric:create_db(DbName, [?CTX]),
-        ?assertEqual([], replicator_shards(DbName)),
-        fabric:delete_db(DbName, [?CTX])
-    end).
-
-
--endif.
+after_doc_read(Doc, Db) ->
+    couch_replicator_docs:after_doc_read(Doc, Db).
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index 45d54b1..0739222 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -39,7 +39,8 @@
    job_summary/2,
    health_threshold/0,
    jobs/0,
-   job/1
+   job/1,
+   restart_job/1
 ]).
 
 %% config_listener callbacks
@@ -192,10 +193,24 @@ find_jobs_by_doc(DbName, DocId) ->
     [RepId || [RepId] <- ets:match(?MODULE, MatchSpec)].
 
 
+-spec restart_job(binary() | list() | rep_id()) ->
+    {ok, {[_]}} | {error, not_found}.
+restart_job(JobId) ->
+    case rep_state(JobId) of
+        nil ->
+            {error, not_found};
+        #rep{} = Rep ->
+            ok = remove_job(JobId),
+            ok = add_job(Rep),
+            job(JobId)
+    end.
+
+
 %% gen_server functions
 
 init(_) ->
-    EtsOpts = [named_table, {read_concurrency, true}, {keypos, #job.id}],
+    EtsOpts = [named_table, {keypos, #job.id}, {read_concurrency, true},
+        {write_concurrency, true}],
     ?MODULE = ets:new(?MODULE, EtsOpts),
     ok = config:listen_for_changes(?MODULE, nil),
     Interval = config:get_integer("replicator", "interval",
@@ -258,7 +273,8 @@ handle_cast({set_interval, Interval}, State) when is_integer(Interval),
     couch_log:notice("~p: interval set to ~B", [?MODULE, Interval]),
     {noreply, State#state{interval = Interval}};
 
-handle_cast(_, State) ->
+handle_cast(UnexpectedMsg, State) ->
+    couch_log:error("~p: received unexpected cast ~p", [?MODULE, UnexpectedMsg]),
     {noreply, State}.
 
 
diff --git a/src/couch_replicator/src/couch_replicator_sup.erl b/src/couch_replicator/src/couch_replicator_sup.erl
index 57ad63b..5475e8f 100644
--- a/src/couch_replicator/src/couch_replicator_sup.erl
+++ b/src/couch_replicator/src/couch_replicator_sup.erl
@@ -26,18 +26,56 @@ init(_Args) ->
             brutal_kill,
             worker,
             dynamic},
-        {couch_replicator_manager,
-            {couch_replicator_manager, start_link, []},
+       {couch_replicator_clustering,
+            {couch_replicator_clustering, start_link, []},
             permanent,
             brutal_kill,
             worker,
-            [couch_replicator_manager]},
-        {couch_replicator_job_sup,
-            {couch_replicator_job_sup, start_link, []},
+            [couch_replicator_clustering]},
+       {couch_replicator_connection,
+            {couch_replicator_connection, start_link, []},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_replicator_connection]},
+       {couch_replicator_rate_limiter,
+            {couch_replicator_rate_limiter, start_link, []},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_replicator_rate_limiter]},
+        {couch_replicator_scheduler_sup,
+            {couch_replicator_scheduler_sup, start_link, []},
             permanent,
             infinity,
             supervisor,
-            [couch_replicator_job_sup]}
+            [couch_replicator_scheduler_sup]},
+        {couch_replicator_scheduler,
+            {couch_replicator_scheduler, start_link, []},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_replicator_scheduler]},
+        {couch_replicator_doc_processor,
+            {couch_replicator_doc_processor, start_link, []},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_replicator_doc_processor]},
+        {couch_replicator,
+            % This is a simple function call which does not create a process
+            % but returns `ignore`. It is used to make sure each node
+            % has a local `_replicator` database.
+            {couch_replicator, ensure_rep_db_exists, []},
+            transient,
+            brutal_kill,
+            worker,
+            [couch_replicator]},
+        {couch_replicator_db_changes,
+            {couch_replicator_db_changes, start_link, []},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_multidb_changes]}
     ],
-    {ok, {{one_for_one,10,1}, Children}}.
-
+    {ok, {{rest_for_one,10,1}, Children}}.
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index ee0c455..1907879 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -196,6 +196,9 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
         {noreply, State2}
     end;
 
+handle_info({'EXIT', _Pid, max_backoff}, State) ->
+    {stop, {shutdown, max_backoff}, State};
+
 handle_info({'EXIT', Pid, Reason}, State) ->
    {stop, {process_died, Pid, Reason}, State}.
 
diff --git a/src/couch_replicator/test/couch_replicator_compact_tests.erl b/src/couch_replicator/test/couch_replicator_compact_tests.erl
index 7a5a25a..3e6bb9e 100644
--- a/src/couch_replicator/test/couch_replicator_compact_tests.erl
+++ b/src/couch_replicator/test/couch_replicator_compact_tests.erl
@@ -16,6 +16,11 @@
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_replicator/src/couch_replicator.hrl").
 
+-import(couch_replicator_test_helper, [
+    db_url/1,
+    get_pid/1
+]).
+
 -define(ATTFILE, filename:join([?FIXTURESDIR, "logo.png"])).
 -define(DELAY, 100).
 -define(TIMEOUT, 30000).
@@ -92,7 +97,7 @@ should_run_replication(RepPid, RepId, Source, Target) ->
 should_ensure_replication_still_running(RepPid, RepId, Source, Target) ->
     ?_test(check_active_tasks(RepPid, RepId, Source, Target)).
 
-check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
+check_active_tasks(RepPid, {BaseId, Ext} = RepId, Src, Tgt) ->
     Source = case Src of
         {remote, NameSrc} ->
             <<(db_url(NameSrc))/binary, $/>>;
@@ -107,7 +112,7 @@ check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
     end,
     FullRepId = ?l2b(BaseId ++ Ext),
     Pid = ?l2b(pid_to_list(RepPid)),
-    ok = wait_for_replicator(RepPid),
+    ok = wait_for_replicator(RepId),
     [RepTask] = couch_task_status:all(),
     ?assertEqual(Pid, couch_util:get_value(pid, RepTask)),
     ?assertEqual(FullRepId, couch_util:get_value(replication_id, RepTask)),
@@ -124,16 +129,20 @@ check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
     Pending = couch_util:get_value(changes_pending, RepTask),
     ?assert(is_integer(Pending)).
 
-wait_for_replicator(Pid) ->
+
+rep_details(RepId) ->
+    gen_server:call(get_pid(RepId), get_details).
+
+wait_for_replicator(RepId) ->
     %% since replicator started asynchronously
     %% we need to wait when it would be in couch_task_status
     %% we query replicator:details to ensure that do_init happen
-    ?assertMatch({ok, _}, couch_replicator:details(Pid)),
+    ?assertMatch({ok, _}, rep_details(RepId)),
     ok.
 
 should_cancel_replication(RepId, RepPid) ->
     ?_assertNot(begin
-        {ok, _} = couch_replicator:cancel_replication(RepId),
+        ok = couch_replicator_scheduler:remove_job(RepId),
         is_process_alive(RepPid)
     end).
 
@@ -295,13 +304,6 @@ wait_for_compaction(Type, Db) ->
                                          " database failed with: ", Reason])}]})
     end.
 
-db_url(DbName) ->
-    iolist_to_binary([
-        "http://", config:get("httpd", "bind_address", "127.0.0.1"),
-        ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
-        "/", DbName
-    ]).
-
 replicate({remote, Db}, Target) ->
     replicate(db_url(Db), Target);
 
@@ -315,7 +317,9 @@ replicate(Source, Target) ->
         {<<"continuous">>, true}
     ]},
     {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
-    {ok, Pid} = couch_replicator:async_replicate(Rep),
+    ok = couch_replicator_scheduler:add_job(Rep),
+    couch_replicator_scheduler:reschedule(),
+    Pid = get_pid(Rep#rep.id),
     {ok, Pid, Rep#rep.id}.
 
 
diff --git a/src/couch_replicator/test/couch_replicator_connection_tests.erl b/src/couch_replicator/test/couch_replicator_connection_tests.erl
new file mode 100644
index 0000000..ef3f2b3
--- /dev/null
+++ b/src/couch_replicator/test/couch_replicator_connection_tests.erl
@@ -0,0 +1,241 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_connection_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-define(TIMEOUT, 1000).
+
+
+setup() ->
+    Host = config:get("httpd", "bind_address", "127.0.0.1"),
+    Port = config:get("httpd", "port", "5984"),
+    {Host, Port}.
+
+teardown(_) ->
+    ok.
+
+
+httpc_pool_test_() ->
+    {
+        "replicator connection sharing tests",
+        {
+            setup,
+            fun() -> test_util:start_couch([couch_replicator]) end, fun test_util:stop_couch/1,
+            {
+                foreach,
+                fun setup/0, fun teardown/1,
+                [
+                    fun connections_shared_after_release/1,
+                    fun connections_not_shared_after_owner_death/1,
+                    fun idle_connections_closed/1,
+                    fun test_owner_monitors/1,
+                    fun worker_discards_creds_on_create/1,
+                    fun worker_discards_url_creds_after_request/1,
+                    fun worker_discards_creds_in_headers_after_request/1,
+                    fun worker_discards_proxy_creds_after_request/1
+                ]
+            }
+        }
+    }.
+
+
+connections_shared_after_release({Host, Port}) ->
+    ?_test(begin
+        URL = "http://" ++ Host ++ ":" ++ Port,
+        Self = self(),
+        {ok, Pid} = couch_replicator_connection:acquire(URL),
+        couch_replicator_connection:release(Pid),
+        spawn(fun() ->
+            Self ! couch_replicator_connection:acquire(URL)
+        end),
+        receive
+            {ok, Pid2} ->
+                ?assertEqual(Pid, Pid2)
+        end
+    end).
+
+
+connections_not_shared_after_owner_death({Host, Port}) ->
+    ?_test(begin
+        URL = "http://" ++ Host ++ ":" ++ Port,
+        Self = self(),
+        spawn(fun() ->
+            Self ! couch_replicator_connection:acquire(URL),
+            1/0
+        end),
+        receive
+            {ok, Pid} ->
+                {ok, Pid2} = couch_replicator_connection:acquire(URL),
+                ?assertNotEqual(Pid, Pid2),
+                MRef = monitor(process, Pid),
+                receive {'DOWN', MRef, process, Pid, _Reason} ->
+                    ?assert(not is_process_alive(Pid));
+                    Other -> throw(Other)
+                end
+        end
+    end).
+
+
+idle_connections_closed({Host, Port}) ->
+    ?_test(begin
+        URL = "http://" ++ Host ++ ":" ++ Port,
+        {ok, Pid} = couch_replicator_connection:acquire(URL),
+        couch_replicator_connection ! close_idle_connections,
+        ?assert(ets:member(couch_replicator_connection, Pid)),
+        % block until idle connections have closed
+        sys:get_status(couch_replicator_connection),
+        couch_replicator_connection:release(Pid),
+        couch_replicator_connection ! close_idle_connections,
+        % block until idle connections have closed
+        sys:get_status(couch_replicator_connection),
+        ?assert(not ets:member(couch_replicator_connection, Pid))
+    end).
+
+
+test_owner_monitors({Host, Port}) ->
+    ?_test(begin
+        URL = "http://" ++ Host ++ ":" ++ Port,
+        {ok, Worker0} = couch_replicator_connection:acquire(URL),
+        assert_monitors_equal([{process, self()}]),
+        couch_replicator_connection:release(Worker0),
+        assert_monitors_equal([]),
+        {Workers, Monitors}  = lists:foldl(fun(_, {WAcc, MAcc}) ->
+            {ok, Worker1} = couch_replicator_connection:acquire(URL),
+            MAcc1 = [{process, self()} | MAcc],
+            assert_monitors_equal(MAcc1),
+            {[Worker1 | WAcc], MAcc1}
+        end, {[], []}, lists:seq(1,5)),
+        lists:foldl(fun(Worker2, Acc) ->
+            [_ | NewAcc] = Acc,
+            couch_replicator_connection:release(Worker2),
+            assert_monitors_equal(NewAcc),
+            NewAcc
+        end, Monitors, Workers)
+    end).
+
+
+worker_discards_creds_on_create({Host, Port}) ->
+    ?_test(begin
+        {User, Pass, B64Auth} = user_pass(),
+        URL = "http://" ++ User ++ ":" ++ Pass ++ "@" ++ Host ++ ":" ++ Port,
+        {ok, WPid} = couch_replicator_connection:acquire(URL),
+        Internals = worker_internals(WPid),
+        ?assert(string:str(Internals, B64Auth) =:= 0),
+        ?assert(string:str(Internals, Pass) =:= 0)
+    end).
+
+
+worker_discards_url_creds_after_request({Host, _}) ->
+    ?_test(begin
+       {User, Pass, B64Auth} = user_pass(),
+       {Port, ServerPid} = server(),
+       PortStr = integer_to_list(Port),
+       URL = "http://" ++ User ++ ":" ++ Pass ++ "@" ++ Host ++ ":" ++ PortStr,
+       {ok, WPid} = couch_replicator_connection:acquire(URL),
+       ?assertMatch({ok, "200", _, _}, send_req(WPid, URL, [], [])),
+       Internals = worker_internals(WPid),
+       ?assert(string:str(Internals, B64Auth) =:= 0),
+       ?assert(string:str(Internals, Pass) =:= 0),
+       couch_replicator_connection:release(WPid),
+       unlink(ServerPid),
+       exit(ServerPid, kill)
+    end).
+
+
+worker_discards_creds_in_headers_after_request({Host, _}) ->
+    ?_test(begin
+       {_User, Pass, B64Auth} = user_pass(),
+       {Port, ServerPid} = server(),
+       PortStr = integer_to_list(Port),
+       URL = "http://" ++ Host ++ ":" ++ PortStr,
+       {ok, WPid} = couch_replicator_connection:acquire(URL),
+       Headers = [{"Authorization", "Basic " ++ B64Auth}],
+       ?assertMatch({ok, "200", _, _}, send_req(WPid, URL, Headers, [])),
+       Internals = worker_internals(WPid),
+       ?assert(string:str(Internals, B64Auth) =:= 0),
+       ?assert(string:str(Internals, Pass) =:= 0),
+       couch_replicator_connection:release(WPid),
+       unlink(ServerPid),
+       exit(ServerPid, kill)
+    end).
+
+
+worker_discards_proxy_creds_after_request({Host, _}) ->
+    ?_test(begin
+       {User, Pass, B64Auth} = user_pass(),
+       {Port, ServerPid} = server(),
+       PortStr = integer_to_list(Port),
+       URL = "http://" ++ Host ++ ":" ++ PortStr,
+       {ok, WPid} = couch_replicator_connection:acquire(URL),
+       Opts = [
+           {proxy_host, Host},
+           {proxy_port, Port},
+           {proxy_user, User},
+           {proxy_pass, Pass}
+       ],
+       ?assertMatch({ok, "200", _, _}, send_req(WPid, URL, [], Opts)),
+       Internals = worker_internals(WPid),
+       ?assert(string:str(Internals, B64Auth) =:= 0),
+       ?assert(string:str(Internals, Pass) =:= 0),
+       couch_replicator_connection:release(WPid),
+       unlink(ServerPid),
+       exit(ServerPid, kill)
+    end).
+
+
+send_req(WPid, URL, Headers, Opts) ->
+    ibrowse:send_req_direct(WPid, URL, Headers, get, [], Opts).
+
+
+user_pass() ->
+    User = "specialuser",
+    Pass = "averysecretpassword",
+    B64Auth = ibrowse_lib:encode_base64(User ++ ":" ++ Pass),
+    {User, Pass, B64Auth}.
+
+
+worker_internals(Pid) ->
+    Dict = io_lib:format("~p", [erlang:process_info(Pid, dictionary)]),
+    State = io_lib:format("~p", [sys:get_state(Pid)]),
+    lists:flatten([Dict, State]).
+
+
+server() ->
+    {ok, LSock} = gen_tcp:listen(0, [{recbuf, 256}, {active, false}]),
+    {ok, LPort} = inet:port(LSock),
+    SPid = spawn_link(fun() -> server_responder(LSock) end),
+    {LPort, SPid}.
+
+
+server_responder(LSock) ->
+    {ok, Sock} = gen_tcp:accept(LSock),
+    case gen_tcp:recv(Sock, 0) of
+        {ok, Data} ->
+            % sanity check that all the request data was received
+            ?assert(lists:prefix("GET ", Data)),
+            ?assert(lists:suffix("\r\n\r\n", Data)),
+            Res = ["HTTP/1.1 200 OK", "Content-Length: 0", "\r\n"],
+            ok = gen_tcp:send(Sock, string:join(Res, "\r\n"));
+        Other ->
+            gen_tcp:close(Sock),
+            throw({replication_eunit_tcp_server_crashed, Other})
+    end,
+    server_responder(LSock).
+
+
+assert_monitors_equal(ShouldBe) ->
+    sys:get_status(couch_replicator_connection),
+    {monitors, Monitors} = process_info(whereis(couch_replicator_connection), monitors),
+    ?assertEqual(ShouldBe, Monitors).
diff --git a/src/couch_replicator/test/couch_replicator_httpc_pool_tests.erl b/src/couch_replicator/test/couch_replicator_httpc_pool_tests.erl
index ea36f7f..c4ad4e9 100644
--- a/src/couch_replicator/test/couch_replicator_httpc_pool_tests.erl
+++ b/src/couch_replicator/test/couch_replicator_httpc_pool_tests.erl
@@ -30,7 +30,7 @@ httpc_pool_test_() ->
         "httpc pool tests",
         {
             setup,
-            fun test_util:start_couch/0, fun test_util:stop_couch/1,
+            fun() -> test_util:start_couch([couch_replicator]) end, fun test_util:stop_couch/1,
             {
                 foreach,
                 fun setup/0, fun teardown/1,
diff --git a/src/couch_replicator/test/couch_replicator_many_leaves_tests.erl b/src/couch_replicator/test/couch_replicator_many_leaves_tests.erl
index bde0e2c..a6999bd 100644
--- a/src/couch_replicator/test/couch_replicator_many_leaves_tests.erl
+++ b/src/couch_replicator/test/couch_replicator_many_leaves_tests.erl
@@ -15,6 +15,11 @@
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
+-import(couch_replicator_test_helper, [
+    db_url/1,
+    replicate/2
+]).
+
 -define(DOCS_CONFLICTS, [
     {<<"doc1">>, 10},
     {<<"doc2">>, 100},
@@ -199,22 +204,3 @@ add_attachments(SourceDb, NumAtts,  [{DocId, NumConflicts} | Rest]) ->
     ?assertEqual(length(NewDocs), length(NewRevs)),
     add_attachments(SourceDb, NumAtts, Rest).
 
-db_url(DbName) ->
-    iolist_to_binary([
-        "http://", config:get("httpd", "bind_address", "127.0.0.1"),
-        ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
-        "/", DbName
-    ]).
-
-replicate(Source, Target) ->
-    RepObject = {[
-        {<<"source">>, Source},
-        {<<"target">>, Target}
-    ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
-    {ok, Pid} = couch_replicator:async_replicate(Rep),
-    MonRef = erlang:monitor(process, Pid),
-    receive
-        {'DOWN', MonRef, process, Pid, _} ->
-            ok
-    end.
diff --git a/src/couch_replicator/test/couch_replicator_modules_load_tests.erl b/src/couch_replicator/test/couch_replicator_modules_load_tests.erl
index 96a9346..a552d14 100644
--- a/src/couch_replicator/test/couch_replicator_modules_load_tests.erl
+++ b/src/couch_replicator/test/couch_replicator_modules_load_tests.erl
@@ -28,13 +28,18 @@ should_load_modules() ->
         couch_replicator_httpc,
         couch_replicator_httpd,
         couch_replicator_manager,
+        couch_replicator_scheduler,
+        couch_replicator_scheduler_job,
+        couch_replicator_docs,
+        couch_replicator_clustering,
+        couch_replicator_changes_reader,
+        couch_replicator_ids,
         couch_replicator_notifier,
         couch_replicator,
         couch_replicator_worker,
-        couch_replicator_utils,
-        couch_replicator_job_sup
+        couch_replicator_utils
     ],
     [should_load_module(Mod) || Mod <- Modules].
 
 should_load_module(Mod) ->
-    {atom_to_list(Mod), ?_assertMatch({module, _}, code:load_file(Mod))}.
+    {atom_to_list(Mod), ?_assertMatch({module, _}, code:ensure_loaded(Mod))}.
diff --git a/src/couch_replicator/test/couch_replicator_proxy_tests.erl b/src/couch_replicator/test/couch_replicator_proxy_tests.erl
new file mode 100644
index 0000000..a40e5b1
--- /dev/null
+++ b/src/couch_replicator/test/couch_replicator_proxy_tests.erl
@@ -0,0 +1,69 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_proxy_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch_replicator/src/couch_replicator.hrl").
+-include_lib("couch_replicator/src/couch_replicator_api_wrap.hrl").
+
+
+setup() ->
+    ok.
+
+
+teardown(_) ->
+    ok.
+
+
+replicator_proxy_test_() ->
+    {
+        "replicator proxy tests",
+        {
+            setup,
+            fun() -> test_util:start_couch([couch_replicator]) end, fun test_util:stop_couch/1,
+            {
+                foreach,
+                fun setup/0, fun teardown/1,
+                [
+                    fun parse_rep_doc_without_proxy/1,
+                    fun parse_rep_doc_with_proxy/1
+                ]
+            }
+        }
+    }.
+
+
+parse_rep_doc_without_proxy(_) ->
+    ?_test(begin
+        NoProxyDoc = {[
+            {<<"source">>, <<"http://unproxied.com">>},
+            {<<"target">>, <<"http://otherunproxied.com">>}
+        ]},
+        Rep = couch_replicator_docs:parse_rep_doc(NoProxyDoc),
+        ?assertEqual((Rep#rep.source)#httpdb.proxy_url, undefined),
+        ?assertEqual((Rep#rep.target)#httpdb.proxy_url, undefined)
+    end).
+
+
+parse_rep_doc_with_proxy(_) ->
+    ?_test(begin
+        ProxyURL = <<"http://myproxy.com">>,
+        ProxyDoc = {[
+            {<<"source">>, <<"http://unproxied.com">>},
+            {<<"target">>, <<"http://otherunproxied.com">>},
+            {<<"proxy">>, ProxyURL}
+        ]},
+        Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc),
+        ?assertEqual((Rep#rep.source)#httpdb.proxy_url, binary_to_list(ProxyURL)),
+        ?assertEqual((Rep#rep.target)#httpdb.proxy_url, binary_to_list(ProxyURL))
+    end).
diff --git a/src/couch_replicator/test/couch_replicator_test_helper.erl b/src/couch_replicator/test/couch_replicator_test_helper.erl
index 398b27b..bbca0ae 100644
--- a/src/couch_replicator/test/couch_replicator_test_helper.erl
+++ b/src/couch_replicator/test/couch_replicator_test_helper.erl
@@ -2,8 +2,16 @@
 
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_replicator/src/couch_replicator.hrl").
 
--export([compare_dbs/2, compare_dbs/3, db_url/1, replicate/1, replicate/2]).
+-export([
+    compare_dbs/2,
+    compare_dbs/3,
+    db_url/1,
+    replicate/1,
+    get_pid/1,
+    replicate/2
+]).
 
 
 compare_dbs(Source, Target) ->
@@ -103,6 +111,11 @@ db_url(DbName) ->
         "/", DbName
     ]).
 
+get_pid(RepId) ->
+    Pid = global:whereis_name({couch_replicator_scheduler_job,RepId}),
+    ?assert(is_pid(Pid)),
+    Pid.
+
 replicate(Source, Target) ->
     replicate({[
         {<<"source">>, Source},
@@ -111,9 +124,12 @@ replicate(Source, Target) ->
 
 replicate({[_ | _]} = RepObject) ->
     {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
-    {ok, Pid} = couch_replicator:async_replicate(Rep),
+    ok = couch_replicator_scheduler:add_job(Rep),
+    couch_replicator_scheduler:reschedule(),
+    Pid = get_pid(Rep#rep.id),
     MonRef = erlang:monitor(process, Pid),
     receive
         {'DOWN', MonRef, process, Pid, _} ->
             ok
-    end.
+    end,
+    ok = couch_replicator_scheduler:remove_job(Rep#rep.id).
diff --git a/src/couch_replicator/test/couch_replicator_use_checkpoints_tests.erl b/src/couch_replicator/test/couch_replicator_use_checkpoints_tests.erl
index e04488e..73ea7f1 100644
--- a/src/couch_replicator/test/couch_replicator_use_checkpoints_tests.erl
+++ b/src/couch_replicator/test/couch_replicator_use_checkpoints_tests.erl
@@ -15,6 +15,11 @@
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
+-import(couch_replicator_test_helper, [
+    db_url/1,
+    replicate/1
+]).
+
 -define(DOCS_COUNT, 100).
 -define(TIMEOUT_EUNIT, 30).
 -define(i2l(I), integer_to_list(I)).
@@ -167,23 +172,10 @@ compare_dbs(Source, Target) ->
     ok = couch_db:close(SourceDb),
     ok = couch_db:close(TargetDb).
 
-db_url(DbName) ->
-    iolist_to_binary([
-        "http://", config:get("httpd", "bind_address", "127.0.0.1"),
-        ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
-        "/", DbName
-    ]).
-
 replicate(Source, Target, UseCheckpoints) ->
-    RepObject = {[
+    replicate({[
         {<<"source">>, Source},
         {<<"target">>, Target},
         {<<"use_checkpoints">>, UseCheckpoints}
-    ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
-    {ok, Pid} = couch_replicator:async_replicate(Rep),
-    MonRef = erlang:monitor(process, Pid),
-    receive
-        {'DOWN', MonRef, process, Pid, _} ->
-            ok
-    end.
+    ]}).
+
diff --git a/test/javascript/tests/replicator_db_bad_rep_id.js b/test/javascript/tests/replicator_db_bad_rep_id.js
index 529bbaa..30a1245 100644
--- a/test/javascript/tests/replicator_db_bad_rep_id.js
+++ b/test/javascript/tests/replicator_db_bad_rep_id.js
@@ -53,8 +53,7 @@ couchTests.replicator_db_bad_rep_id = function(debug) {
     T(repDoc1._replication_state === "completed",
       "replication document with bad replication id failed");
     T(typeof repDoc1._replication_state_time === "string");
-    T(typeof repDoc1._replication_id  === "string");
-    T(repDoc1._replication_id !== "1234abc");
+    T(typeof repDoc1._replication_id === "undefined");
   }
 
   /*var server_config = [
