Posted to commits@couchdb.apache.org by va...@apache.org on 2020/07/14 16:54:45 UTC

[couchdb] 01/01: WIP 2 - Rebased on latest master

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/fdb-replicator
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit f18ca6c1da75652e1c9a99c7ed31210c6a062f03
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Jul 14 12:53:51 2020 -0400

    WIP 2 - Rebased on latest master
---
 src/couch_replicator/src/couch_replicator.erl      | 168 ++--
 src/couch_replicator/src/couch_replicator.hrl      |  82 +-
 .../src/couch_replicator_acceptor.erl              |  49 ++
 .../src/couch_replicator_api_wrap.erl              | 120 +--
 .../src/couch_replicator_clustering.erl            | 279 -------
 .../src/couch_replicator_db_changes.erl            | 108 ---
 .../src/couch_replicator_doc_processor.erl         | 916 +++++++++++----------
 src/couch_replicator/src/couch_replicator_docs.erl | 678 ++++++++-------
 .../src/couch_replicator_filters.erl               |  42 +-
 src/couch_replicator/src/couch_replicator_ids.erl  |  62 +-
 .../src/couch_replicator_notifier.erl              |  58 --
 .../src/couch_replicator_scheduler_job.erl         | 338 ++++----
 .../src/couch_replicator_scheduler_sup.erl         |   6 +-
 src/couch_replicator/src/couch_replicator_sup.erl  |  23 +-
 .../src/couch_replicator_utils.erl                 |  91 +-
 .../src/couch_replicator_validate_doc.erl          | 119 +++
 .../couch_replicator_error_reporting_tests.erl     | 271 ------
 17 files changed, 1490 insertions(+), 1920 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index b38f31b..69bbb2f 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -14,6 +14,7 @@
 
 -export([
     replicate/2,
+    ensure_rep_db_exists/0,
     replication_states/0,
     job/1,
     doc/3,
@@ -51,80 +52,83 @@
     {ok, {cancelled, binary()}} |
     {error, any()} |
     no_return().
-replicate(PostBody, Ctx) ->
-    {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
-    Rep = Rep0#rep{start_time = os:timestamp()},
-    #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep,
-    case get_value(cancel, Options, false) of
-    true ->
-        CancelRepId = case get_value(id, Options, nil) of
-        nil ->
-            RepId;
-        RepId2 ->
-            RepId2
-        end,
-        case check_authorization(CancelRepId, UserCtx) of
-        ok ->
-            cancel_replication(CancelRepId);
-        not_found ->
-            {error, not_found}
-        end;
-    false ->
-        check_authorization(RepId, UserCtx),
-        {ok, Listener} = rep_result_listener(RepId),
-        Result = do_replication_loop(Rep),
-        couch_replicator_notifier:stop(Listener),
-        Result
-    end.
-
-
--spec do_replication_loop(#rep{}) ->
-    {ok, {continuous, binary()}} | {ok, tuple()} | {error, any()}.
-do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) ->
-    ok = couch_replicator_scheduler:add_job(Rep),
-    case get_value(continuous, Options, false) of
-    true ->
-        {ok, {continuous, ?l2b(BaseId ++ Ext)}};
-    false ->
-        wait_for_result(Id)
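+% A minimal usage sketch (values assumed for illustration):
+%
+%   UserCtx = #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]},
+%   {ok, {continuous, RepId}} = couch_replicator:replicate(PostBody, UserCtx)
+%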
+replicate(PostBody, #user_ctx{name = UserName} = UserCtx) ->
+    {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, UserName),
+    Rep = Rep0#{<<"start_time">> => erlang:system_time()},
+    #{<<"id">> := RepId, <<"options">> := Options} = Rep,
+    case maps:get(<<"cancel">>, Options, false) of
+        true ->
+            CancelRepId = case maps:get(<<"id">>, Options, nil) of
+                nil -> RepId;
+                RepId2 -> RepId2
+            end,
+            case check_authorization(CancelRepId, UserCtx) of
+                ok -> cancel_replication(CancelRepId);
+                not_found -> {error, not_found}
+            end;
+        false ->
+            check_authorization(RepId, UserCtx),
+            ok = couch_replicator_scheduler:add_job(Rep),
+            case maps:get(<<"continuous">>, Options, false) of
+                true -> {ok, {continuous, RepId}};
+                false -> wait_for_result(RepId)
+            end
     end.
 
 
--spec rep_result_listener(rep_id()) -> {ok, pid()}.
-rep_result_listener(RepId) ->
-    ReplyTo = self(),
-    {ok, _Listener} = couch_replicator_notifier:start_link(
-        fun({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
-                ReplyTo ! Ev;
-            (_) ->
-                ok
-        end).
+% This is called from supervisor. Must respect supervisor protocol so
+% it returns `ignore`.
+-spec ensure_rep_db_exists() -> ignore.
+ensure_rep_db_exists() ->
+    couch_jobs:set_type_timeout(?REP_DOCS, ?REP_DOCS_TIMEOUT_MSEC),
+    couch_jobs:set_type_timeout(?REP_JOBS, ?REP_JOBS_TIMEOUT_MSEC),
+    case config:get_boolean("replicator", "create_replicator_db", false) of
+        true ->
+            ok = couch_replicator_docs:ensure_rep_db_exists();
+        false ->
+            ok
+    end,
+    ignore.
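+
+% A supervisor child spec would look roughly like this (a sketch; the real
+% one lives in couch_replicator_sup):
+%
+%   {couch_replicator, {couch_replicator, ensure_rep_db_exists, []},
+%       transient, brutal_kill, worker, [couch_replicator]}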
 
 
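+% Wait for the replication job to reach the `finished` state. Note that
+% couch_jobs:subscribe/2 returns {ok, finished, JobData} right away if the
+% job already finished; otherwise we block in couch_jobs:wait/3.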
 -spec wait_for_result(rep_id()) ->
     {ok, {[_]}} | {error, any()}.
 wait_for_result(RepId) ->
-    receive
-    {finished, RepId, RepResult} ->
-        {ok, RepResult};
-    {error, RepId, Reason} ->
-        {error, Reason}
+    FinishRes = case couch_jobs:subscribe(?REP_JOBS, RepId) of
+        {ok, finished, JobData} ->
+            {ok, JobData};
+        {ok, SubId, _, _} ->
+            case couch_jobs:wait(SubId, finished, infinity) of
+                {?REP_JOBS, RepId, finished, JobData} -> {ok, JobData};
+                timeout -> timeout
+            end;
+        {error, Error} ->
+            {error, Error}
+    end,
+    case FinishRes of
+        {ok, #{<<"finished_result">> := CheckpointHistory}} ->
+            {ok, CheckpointHistory};
+        timeout ->
+            {error, timeout};
+        {error, Error} ->
+            {error, Error}
     end.
 
 
 -spec cancel_replication(rep_id()) ->
     {ok, {cancelled, binary()}} | {error, not_found}.
-cancel_replication({BasedId, Extension} = RepId) ->
-    FullRepId = BasedId ++ Extension,
-    couch_log:notice("Canceling replication '~s' ...", [FullRepId]),
-    case couch_replicator_scheduler:rep_state(RepId) of
-    #rep{} ->
-        ok = couch_replicator_scheduler:remove_job(RepId),
-        couch_log:notice("Replication '~s' cancelled", [FullRepId]),
-        {ok, {cancelled, ?l2b(FullRepId)}};
-    nil ->
-        couch_log:notice("Replication '~s' not found", [FullRepId]),
-        {error, not_found}
+cancel_replication(RepId) when is_binary(RepId) ->
+    couch_log:notice("Canceling replication '~s' ...", [RepId]),
+    case couch_jobs:get_job_data(undefined, ?REP_JOBS, RepId) of
+        {error, not_found} ->
+            {error, not_found};
+        #{<<"rep">> := #{<<"db_name">> := null}} ->
+            couch_jobs:remove(undefined, ?REP_JOBS, RepId),
+            {ok, {cancelled, RepId}};
+        #{<<"rep">> := #{}} ->
+            % Job was started from a replicator doc canceling via _replicate
+            % doesn't quite make sense, instead replicator should be deleted.
+            {error, not_found}
     end.
 
 
@@ -133,11 +137,11 @@ replication_states() ->
     ?REPLICATION_STATES.
 
 
--spec strip_url_creds(binary() | {[_]}) -> binary().
+-spec strip_url_creds(binary() | #{}) -> binary().
 strip_url_creds(Endpoint) ->
     try
-        couch_replicator_docs:parse_rep_db(Endpoint, [], []) of
-            #httpdb{url = Url} ->
+        couch_replicator_docs:parse_rep_db(Endpoint, #{}, #{}) of
+            #{<<"url">> := Url} ->
                 iolist_to_binary(couch_util:url_strip_password(Url))
     catch
         throw:{error, local_endpoints_not_supported} ->
@@ -250,9 +254,8 @@ info_from_doc(RepDb, {Props}) ->
                         end
             end;
         failed ->
-            Info = get_value(<<"_replication_state_reason">>, Props, nil),
-            EJsonInfo = couch_replicator_utils:ejson_state_info(Info),
-            {State0, EJsonInfo, 1, StateTime};
+            Info = get_value(<<"_replication_state_reason">>, Props, null),
+            {State0, Info, 1, StateTime};
         _OtherState ->
             {null, null, 0, null}
     end,
@@ -280,13 +283,13 @@ state_atom(State) when is_atom(State) ->
 
 -spec check_authorization(rep_id(), #user_ctx{}) -> ok | not_found.
 check_authorization(RepId, #user_ctx{name = Name} = Ctx) ->
-    case couch_replicator_scheduler:rep_state(RepId) of
-    #rep{user_ctx = #user_ctx{name = Name}} ->
-        ok;
-    #rep{} ->
-        couch_httpd:verify_is_server_admin(Ctx);
-    nil ->
-        not_found
+    case couch_jobs:get_job_data(undefined, ?REP_JOBS, RepId) of
+        {error, not_found} ->
+            not_found;
+        #{<<"rep">> := #{<<"user">> := Name}} ->
+            ok;
+        #{} ->
+            couch_httpd:verify_is_server_admin(Ctx)
     end.
 
 
@@ -336,23 +339,14 @@ t_replication_not_found() ->
     end).
 
 
-expect_rep_user_ctx(Name, Role) ->
-    meck:expect(couch_replicator_scheduler, rep_state,
-        fun(_Id) ->
-            UserCtx = #user_ctx{name = Name, roles = [Role]},
-            #rep{user_ctx = UserCtx}
-        end).
-
 
 strip_url_creds_test_() ->
      {
-        setup,
-        fun() ->
-            meck:expect(config, get, fun(_, _, Default) -> Default end)
-        end,
-        fun(_) ->
-            meck:unload()
+        foreach,
+        fun () -> meck:expect(config, get,
+            fun(_, _, Default) -> Default end)
         end,
+        fun (_) -> meck:unload() end,
         [
             t_strip_http_basic_creds(),
             t_strip_http_props_creds(),
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl
index 2a5b7c8..d94c3eb 100644
--- a/src/couch_replicator/src/couch_replicator.hrl
+++ b/src/couch_replicator/src/couch_replicator.hrl
@@ -12,32 +12,60 @@
 
 -define(REP_ID_VERSION, 4).
 
--record(rep, {
-    id :: rep_id() | '_' | 'undefined',
-    source :: any() | '_',
-    target :: any() | '_',
-    options :: [_] | '_',
-    user_ctx :: any() | '_',
-    type = db :: atom() | '_',
-    view = nil :: any() | '_',
-    doc_id :: any() | '_',
-    db_name = null :: null | binary() | '_',
-    start_time = {0, 0, 0} :: erlang:timestamp() | '_',
-    stats = couch_replicator_stats:new() :: orddict:orddict() | '_'
-}).
-
--type rep_id() :: {string(), string()}.
+% Couch jobs types and timeouts
+-define(REP_DOCS, <<"repdocs">>).
+-define(REP_JOBS, <<"repjobs">>).
+-define(REP_DOCS_TIMEOUT_MSEC, 17000).
+-define(REP_JOBS_TIMEOUT_MSEC, 33000).
+
+% Some fields from the replication doc
+-define(SOURCE, <<"source">>).
+-define(TARGET, <<"target">>).
+-define(CREATE_TARGET, <<"create_target">>).
+-define(DOC_IDS, <<"doc_ids">>).
+-define(SELECTOR, <<"selector">>).
+-define(FILTER, <<"filter">>).
+-define(QUERY_PARAMS, <<"query_params">>).
+-define(URL, <<"url">>).
+-define(AUTH, <<"auth">>).
+-define(HEADERS, <<"headers">>).
+
+% Replication states
+-define(ST_ERROR, <<"error">>).
+-define(ST_FINISHED, <<"completed">>).
+-define(ST_RUNNING, <<"running">>).
+-define(ST_INITIALIZING, <<"initializing">>).
+-define(ST_FAILED, <<"failed">>).
+-define(ST_PENDING, <<"pending">>).
+-define(ST_SCHEDULED, <<"scheduled">>).
+-define(ST_CRASHING, <<"crashing">>).
+-define(ST_TRIGGERED, <<"triggered">>).
+
+% Some fields from a rep object
+-define(REP_ID, <<"id">>).
+-define(DB_NAME, <<"db_name">>).
+-define(DOC_ID, <<"doc_id">>).
+-define(START_TIME, <<"start_time">>).
+
+% Fields in couch_jobs data objects
+-define(REP, <<"rep">>).
+-define(REP_PARSE_ERROR, <<"rep_parse_error">>).
+-define(STATE, <<"state">>).
+-define(STATE_INFO, <<"state_info">>).
+-define(DOC_STATE, <<"doc_state">>).
+-define(ERROR_COUNT, <<"error_count">>).
+-define(LAST_UPDATED, <<"last_updated">>).
+-define(HISTORY, <<"history">>).
+-define(VER, <<"ver">>).
+
+% Accepted job message tag
+-define(ACCEPTED_JOB, accepted_job).
+
+
+
+-type rep_id() :: binary().
+-type user_name() :: binary() | null.
 -type db_doc_id() :: {binary(), binary() | '_'}.
 -type seconds() :: non_neg_integer().
--type rep_start_result() ::
-    {ok, rep_id()} |
-    ignore |
-    {temporary_error, binary()} |
-    {permanent_failure, binary()}.
-
-
--record(doc_worker_result, {
-    id :: db_doc_id(),
-    wref :: reference(),
-    result :: rep_start_result()
-}).
diff --git a/src/couch_replicator/src/couch_replicator_acceptor.erl b/src/couch_replicator/src/couch_replicator_acceptor.erl
new file mode 100644
index 0000000..bd67d66
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_acceptor.erl
@@ -0,0 +1,49 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_acceptor).
+
+
+-include("couch_replicator.hrl").
+
+
+-export([
+   start/4,
+   stop/1
+]).
+
+
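+% Spawn `Count` acceptor processes linked to the `Parent` gen_server. The
+% result is a map of acceptor pids which the parent keeps in its state.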
+start(_Type, Count, _Parent, _Opts) when Count < 1 ->
+    #{};
+
+start(Type, Count, Parent, Opts) when Count >= 1 ->
+    lists:foldl(fun(_N, Acc) ->
+        Pid = spawn_link(fun() -> accept(Type, Parent, Opts) end),
+        Acc#{Pid => true}
+    end, #{}, lists:seq(1, Count)).
+
+
+stop(#{} = Acceptors) ->
+    maps:map(fun(Pid, true) ->
+        unlink(Pid),
+        exit(Pid, kill)
+    end, Acceptors),
+    ok.
+
+
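+% Wait for a job of the given type to become available, then hand it to the
+% parent as an ?ACCEPTED_JOB cast. {error, not_found} means the accept call
+% timed out without a job, in which case the acceptor exits normally.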
+accept(Type, Parent, Opts) ->
+    case couch_jobs:accept(Type, Opts) of
+        {ok, Job, JobData} ->
+            ok = gen_server:cast(Parent, {?ACCEPTED_JOB, Job, JobData});
+        {error, not_found} ->
+            ok
+    end.
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index a21de42..ecd4c00 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -38,8 +38,8 @@
     open_doc/3,
     open_doc_revs/6,
     changes_since/5,
-    db_uri/1,
-    normalize_db/1
+    db_uri/1,
+    db_from_json/1
     ]).
 
 -import(couch_replicator_httpc, [
@@ -57,21 +57,19 @@
 -define(MAX_URL_LEN, 7000).
 -define(MIN_URL_LEN, 200).
 
-db_uri(#httpdb{url = Url}) ->
+db_uri(#{<<"url">> := Url}) ->
     couch_util:url_strip_password(Url);
 
-db_uri(DbName) when is_binary(DbName) ->
-    ?b2l(DbName);
+db_uri(#httpdb{url = Url}) ->
+    couch_util:url_strip_password(Url).
 
-db_uri(Db) ->
-    db_uri(couch_db:name(Db)).
 
+db_open(#{} = Db) ->
+    db_open(Db, false, []).
 
-db_open(Db) ->
-    db_open(Db, false, []).
 
-db_open(#httpdb{} = Db1, Create, CreateParams) ->
-    {ok, Db} = couch_replicator_httpc:setup(Db1),
+db_open(#{} = Db0, Create, CreateParams) ->
+    {ok, Db} = couch_replicator_httpc:setup(db_from_json(Db0)),
     try
         case Create of
         false ->
@@ -895,23 +893,6 @@ header_value(Key, Headers, Default) ->
     end.
 
 
-% Normalize an #httpdb{} or #db{} record such that it can be used for
-% comparisons. This means remove things like pids and also sort options / props.
-normalize_db(#httpdb{} = HttpDb) ->
-    #httpdb{
-        url = HttpDb#httpdb.url,
-        auth_props = lists:sort(HttpDb#httpdb.auth_props),
-        headers = lists:keysort(1, HttpDb#httpdb.headers),
-        timeout = HttpDb#httpdb.timeout,
-        ibrowse_options = lists:keysort(1, HttpDb#httpdb.ibrowse_options),
-        retries = HttpDb#httpdb.retries,
-        http_connections = HttpDb#httpdb.http_connections
-    };
-
-normalize_db(<<DbName/binary>>) ->
-    DbName.
-
-
 maybe_append_create_query_params(Db, []) ->
     Db;
 
@@ -920,27 +901,72 @@ maybe_append_create_query_params(Db, CreateParams) ->
     Db#httpdb{url = NewUrl}.
 
 
--ifdef(TEST).
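+
+% Convert an endpoint map (as stored in couch_jobs data) back into an
+% #httpdb{} record: binary keys and values become the lists and atoms that
+% ibrowse and the httpc setup expect.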
+db_from_json(#{} = DbMap) ->
+    #{
+        <<"url">> := Url,
+        <<"auth">> := Auth,
+        <<"headers">> := Headers0,
+        <<"ibrowse_options">> := IBrowseOptions0,
+        <<"timeout">> := Timeout,
+        <<"http_connections">> := HttpConnections,
+        <<"retries">> := Retries,
+        <<"proxy_url">> := ProxyUrl0
+    } = DbMap,
+    Headers = maps:fold(fun(K, V, Acc) ->
+        [{binary_to_list(K), binary_to_list(V)} | Acc]
+    end, [], Headers0),
+    IBrowseOptions = maps:fold(fun
+        (<<"proxy_protocol">>, V, Acc) ->
+            [{proxy_protocol, binary_to_existing_atom(V)} | Acc];
+        (<<"socket_options">>, #{} = SockOpts, Acc) ->
+            SockOptsKVs = maps:fold(fun sock_opts_fold/3, [], SockOpts),
+            [{socket_options, SockOptsKVs} | Acc];
+        (<<"ssl_options">>, #{} = SslOpts, Acc) ->
+            SslOptsKVs = maps:fold(fun ssl_opts_fold/3, [], SslOpts),
+            [{ssl_options, SslOptsKVs} | Acc];
+        (K, V, Acc) when is_binary(V) ->
+            [{binary_to_atom(K), binary_to_list(V)} | Acc];
+        (K, V, Acc) ->
+            [{binary_to_atom(K), V} | Acc]
+    end, [], IBrowseOptions0),
+    ProxyUrl = case ProxyUrl0 of
+        null -> undefined;
+        V when is_binary(V) -> binary_to_list(V)
+    end,
+    #httpdb{
+        url = binary_to_list(Url),
+        auth_props = maps:to_list(Auth),
+        headers = Headers,
+        ibrowse_options = IBrowseOptions,
+        timeout = Timeout,
+        http_connections = HttpConnections,
+        retries = Retries,
+        proxy_url = ProxyUrl
+    }.
+
 
--include_lib("eunit/include/eunit.hrl").
 
+% See couch_replicator_docs:ssl_params/1 for how ssl options are parsed, and
+% http://erlang.org/doc/man/ssl.html#type-server_option for the full list of
+% SSL server options.
+%
+ssl_opts_fold(K, V, Acc) when is_boolean(V); is_integer(V) ->
+    [{binary_to_atom(K), V} | Acc];
+
+ssl_opts_fold(K, null, Acc) ->
+    [{binary_to_atom(K), undefined} | Acc];
 
-normalize_http_db_test() ->
-    HttpDb =  #httpdb{
-        url = "http://host/db",
-        auth_props = [{"key", "val"}],
-        headers = [{"k2","v2"}, {"k1","v1"}],
-        timeout = 30000,
-        ibrowse_options = [{k2, v2}, {k1, v1}],
-        retries = 10,
-        http_connections = 20
-    },
-    Expected = HttpDb#httpdb{
-        headers = [{"k1","v1"}, {"k2","v2"}],
-        ibrowse_options = [{k1, v1}, {k2, v2}]
-    },
-    ?assertEqual(Expected, normalize_db(HttpDb)),
-    ?assertEqual(<<"local">>, normalize_db(<<"local">>)).
+ssl_opts_fold(<<"verify">>, V, Acc) ->
+    [{verify, binary_to_existing_atom(V)} | Acc];
 
+ssl_opts_fold(K, V, Acc) when is_binary(V) ->
+    [{binary_to_atom(K), binary_to_list(V)} | Acc].
+
+
+% See ?VALID_SOCK_OPTS in couch_replicator_docs for accepted socket options
+%
+sock_opts_fold(K, V, Acc) when is_binary(V) ->
+    [{binary_to_atom(K), binary_to_atom(V)} | Acc];
 
--endif.
+sock_opts_fold(K, V, Acc) when is_boolean(V); is_integer(V) ->
+    [{binary_to_atom(K), V} | Acc].
diff --git a/src/couch_replicator/src/couch_replicator_clustering.erl b/src/couch_replicator/src/couch_replicator_clustering.erl
deleted file mode 100644
index 18de1e8..0000000
--- a/src/couch_replicator/src/couch_replicator_clustering.erl
+++ /dev/null
@@ -1,279 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-
-% Maintain cluster membership and stability notifications for replications.
-% On changes to cluster membership, broadcast events to `replication` gen_event.
-% Listeners will get `{cluster, stable}` or `{cluster, unstable}` events.
-%
-% Cluster stability is defined as "there have been no nodes added or removed in
-% last `QuietPeriod` seconds". QuietPeriod value is configurable. To ensure a
-% speedier startup, during initialization there is a shorter StartupPeriod
-% in effect (also configurable).
-%
-% This module is also in charge of calculating ownership of replications based
-% on where their _replicator db documents shards live.
-
-
--module(couch_replicator_clustering).
-
--behaviour(gen_server).
--behaviour(config_listener).
--behaviour(mem3_cluster).
-
--export([
-    start_link/0
-]).
-
--export([
-    init/1,
-    terminate/2,
-    handle_call/3,
-    handle_info/2,
-    handle_cast/2,
-    code_change/3
-]).
-
--export([
-    owner/2,
-    is_stable/0,
-    link_cluster_event_listener/3
-]).
-
-% config_listener callbacks
--export([
-    handle_config_change/5,
-    handle_config_terminate/3
-]).
-
-% mem3_cluster callbacks
--export([
-    cluster_stable/1,
-    cluster_unstable/1
-]).
-
--include_lib("couch/include/couch_db.hrl").
--include_lib("mem3/include/mem3.hrl").
-
--define(DEFAULT_QUIET_PERIOD, 60). % seconds
--define(DEFAULT_START_PERIOD, 5). % seconds
--define(RELISTEN_DELAY, 5000).
-
--record(state, {
-    mem3_cluster_pid :: pid(),
-    cluster_stable :: boolean()
-}).
-
-
--spec start_link() -> {ok, pid()} | ignore | {error, term()}.
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-
-% owner/2 function computes ownership for a {DbName, DocId} tuple
-% `unstable` if cluster is considered to be unstable i.e. it has changed
-% recently, or returns node() which of the owner.
-%
--spec owner(Dbname :: binary(), DocId :: binary()) -> node() | unstable.
-owner(<<"shards/", _/binary>> = DbName, DocId) ->
-    case is_stable() of
-        false ->
-            unstable;
-        true ->
-            owner_int(DbName, DocId)
-    end;
-owner(_DbName, _DocId) ->
-    node().
-
-
--spec is_stable() -> true | false.
-is_stable() ->
-    gen_server:call(?MODULE, is_stable).
-
-
--spec link_cluster_event_listener(atom(), atom(), list()) -> pid().
-link_cluster_event_listener(Mod, Fun, Args)
-        when is_atom(Mod), is_atom(Fun), is_list(Args) ->
-    CallbackFun =
-        fun(Event = {cluster, _}) -> erlang:apply(Mod, Fun, Args ++ [Event]);
-           (_) -> ok
-        end,
-    {ok, Pid} = couch_replicator_notifier:start_link(CallbackFun),
-    Pid.
-
-
-% Mem3 cluster callbacks
-
-cluster_unstable(Server) ->
-    ok = gen_server:call(Server, set_unstable),
-    couch_replicator_notifier:notify({cluster, unstable}),
-    couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
-    couch_log:notice("~s : cluster unstable", [?MODULE]),
-    Server.
-
-cluster_stable(Server) ->
-    ok = gen_server:call(Server, set_stable),
-    couch_replicator_notifier:notify({cluster, stable}),
-    couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1),
-    couch_log:notice("~s : cluster stable", [?MODULE]),
-    Server.
-
-
-% gen_server callbacks
-
-init([]) ->
-    ok = config:listen_for_changes(?MODULE, nil),
-    Period = abs(config:get_integer("replicator", "cluster_quiet_period",
-        ?DEFAULT_QUIET_PERIOD)),
-    StartPeriod = abs(config:get_integer("replicator", "cluster_start_period",
-        ?DEFAULT_START_PERIOD)),
-    couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
-    {ok, Mem3Cluster} = mem3_cluster:start_link(?MODULE, self(), StartPeriod,
-        Period),
-    {ok, #state{mem3_cluster_pid = Mem3Cluster, cluster_stable = false}}.
-
-
-terminate(_Reason, _State) ->
-    ok.
-
-
-handle_call(is_stable, _From, #state{cluster_stable = IsStable} = State) ->
-    {reply, IsStable, State};
-
-handle_call(set_stable, _From, State) ->
-    {reply, ok, State#state{cluster_stable = true}};
-
-handle_call(set_unstable, _From, State) ->
-    {reply, ok, State#state{cluster_stable = false}}.
-
-
-handle_cast({set_period, Period}, #state{mem3_cluster_pid = Pid} = State) ->
-    ok = mem3_cluster:set_period(Pid, Period),
-    {noreply, State}.
-
-
-handle_info(restart_config_listener, State) ->
-    ok = config:listen_for_changes(?MODULE, nil),
-    {noreply, State}.
-
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-
-%% Internal functions
-
-
-handle_config_change("replicator", "cluster_quiet_period", V, _, S) ->
-    ok = gen_server:cast(?MODULE, {set_period, list_to_integer(V)}),
-    {ok, S};
-handle_config_change(_, _, _, _, S) ->
-    {ok, S}.
-
-
-handle_config_terminate(_, stop, _) -> ok;
-handle_config_terminate(_S, _R, _St) ->
-    Pid = whereis(?MODULE),
-    erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).
-
-
--spec owner_int(binary(), binary()) -> node().
-owner_int(ShardName, DocId) ->
-    DbName = mem3:dbname(ShardName),
-    Live = [node() | nodes()],
-    Shards = mem3:shards(DbName, DocId),
-    Nodes = [N || #shard{node=N} <- Shards, lists:member(N, Live)],
-    mem3:owner(DbName, DocId, Nodes).
-
-
-
--ifdef(TEST).
-
--include_lib("eunit/include/eunit.hrl").
-
-
-replicator_clustering_test_() ->
-    {
-        setup,
-        fun setup_all/0,
-        fun teardown_all/1,
-        {
-            foreach,
-            fun setup/0,
-            fun teardown/1,
-            [
-                t_stable_callback(),
-                t_unstable_callback()
-            ]
-        }
-    }.
-
-
-t_stable_callback() ->
-    ?_test(begin
-        ?assertEqual(false, is_stable()),
-        cluster_stable(whereis(?MODULE)),
-        ?assertEqual(true, is_stable())
-    end).
-
-
-t_unstable_callback() ->
-    ?_test(begin
-        cluster_stable(whereis(?MODULE)),
-        ?assertEqual(true, is_stable()),
-        cluster_unstable(whereis(?MODULE)),
-        ?assertEqual(false, is_stable())
-    end).
-
-
-setup_all() ->
-    meck:expect(couch_log, notice, 2, ok),
-    meck:expect(config, get, fun(_, _, Default) -> Default end),
-    meck:expect(config, listen_for_changes, 2, ok),
-    meck:expect(couch_stats, update_gauge, 2, ok),
-    meck:expect(couch_replicator_notifier, notify, 1, ok).
-
-
-teardown_all(_) ->
-    meck:unload().
-
-
-setup() ->
-    meck:reset([
-        config,
-        couch_log,
-        couch_stats,
-        couch_replicator_notifier
-    ]),
-    stop_clustering_process(),
-    {ok, Pid} = start_link(),
-    Pid.
-
-
-teardown(Pid) ->
-    stop_clustering_process(Pid).
-
-
-stop_clustering_process() ->
-    stop_clustering_process(whereis(?MODULE)).
-
-
-stop_clustering_process(undefined) ->
-    ok;
-
-stop_clustering_process(Pid) when is_pid(Pid) ->
-    Ref = erlang:monitor(process, Pid),
-    unlink(Pid),
-    exit(Pid, kill),
-    receive {'DOWN', Ref, _, _, _} -> ok end.
-
--endif.
diff --git a/src/couch_replicator/src/couch_replicator_db_changes.erl b/src/couch_replicator/src/couch_replicator_db_changes.erl
deleted file mode 100644
index 92b0222..0000000
--- a/src/couch_replicator/src/couch_replicator_db_changes.erl
+++ /dev/null
@@ -1,108 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_db_changes).
-
--behaviour(gen_server).
-
--export([
-   start_link/0
-]).
-
--export([
-   init/1,
-   terminate/2,
-   handle_call/3,
-   handle_info/2,
-   handle_cast/2,
-   code_change/3
-]).
-
--export([
-   notify_cluster_event/2
-]).
-
--record(state, {
-   event_listener :: pid(),
-   mdb_changes :: pid() | nil
-}).
-
-
--spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
-notify_cluster_event(Server, {cluster, _} = Event) ->
-    gen_server:cast(Server, Event).
-
-
--spec start_link() ->
-    {ok, pid()} | ignore | {error, any()}.
-start_link() ->
-    gen_server:start_link(?MODULE, [], []).
-
-
-init([]) ->
-    EvtPid = couch_replicator_clustering:link_cluster_event_listener(?MODULE,
-        notify_cluster_event, [self()]),
-    State = #state{event_listener = EvtPid, mdb_changes = nil},
-    case couch_replicator_clustering:is_stable() of
-        true ->
-            {ok, restart_mdb_changes(State)};
-        false ->
-            {ok, State}
-    end.
-
-
-terminate(_Reason, _State) ->
-    ok.
-
-
-handle_call(_Msg, _From, State) ->
-    {reply, {error, invalid_call}, State}.
-
-
-handle_cast({cluster, unstable}, State) ->
-    {noreply, stop_mdb_changes(State)};
-
-handle_cast({cluster, stable}, State) ->
-    {noreply, restart_mdb_changes(State)}.
-
-
-handle_info(_Msg, State) ->
-    {noreply, State}.
-
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-
--spec restart_mdb_changes(#state{}) -> #state{}.
-restart_mdb_changes(#state{mdb_changes = nil} = State) ->
-    Suffix = <<"_replicator">>,
-    CallbackMod = couch_replicator_doc_processor,
-    Options = [skip_ddocs],
-    {ok, Pid} = couch_multidb_changes:start_link(Suffix, CallbackMod, nil,
-        Options),
-    couch_stats:increment_counter([couch_replicator, db_scans]),
-    couch_log:notice("Started replicator db changes listener ~p", [Pid]),
-    State#state{mdb_changes = Pid};
-
-restart_mdb_changes(#state{mdb_changes = _Pid} = State) ->
-    restart_mdb_changes(stop_mdb_changes(State)).
-
-
--spec stop_mdb_changes(#state{}) -> #state{}.
-stop_mdb_changes(#state{mdb_changes = nil} = State) ->
-    State;
-stop_mdb_changes(#state{mdb_changes = Pid} = State) ->
-    couch_log:notice("Stopping replicator db changes listener ~p", [Pid]),
-    unlink(Pid),
-    exit(Pid, kill),
-    State#state{mdb_changes = nil}.
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 6778d53..d892339 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -13,7 +13,6 @@
 -module(couch_replicator_doc_processor).
 
 -behaviour(gen_server).
--behaviour(couch_multidb_changes).
 
 -export([
     start_link/0
@@ -29,10 +28,8 @@
 ]).
 
 -export([
-    db_created/2,
-    db_deleted/2,
-    db_found/2,
-    db_change/3
+    during_doc_update/3,
+    after_db_delete/1
 ]).
 
 -export([
@@ -40,8 +37,7 @@
     doc/2,
     doc_lookup/3,
     update_docs/0,
-    get_worker_ref/1,
-    notify_cluster_event/2
+    get_worker_ref/1
 ]).
 
 -include_lib("couch/include/couch_db.hrl").
@@ -59,345 +55,436 @@
 -define(INITIAL_BACKOFF_EXPONENT, 64).
 -define(MIN_FILTER_DELAY_SEC, 60).
 
--type filter_type() ::  nil | view | user | docids | mango.
 -type repstate() :: initializing | error | scheduled.
 
 
--record(rdoc, {
-    id :: db_doc_id() | '_' | {any(), '_'},
-    state :: repstate() | '_',
-    rep :: #rep{} | nil | '_',
-    rid :: rep_id() | nil | '_',
-    filter :: filter_type() | '_',
-    info :: binary() | nil | '_',
-    errcnt :: non_neg_integer() | '_',
-    worker :: reference() | nil | '_',
-    last_updated :: erlang:timestamp() | '_'
-}).
+-define(MAX_ACCEPTORS, 10).
+-define(MAX_JOBS, 500).
+
+% Acceptor timeout defaults (values here are placeholder assumptions for
+% this WIP, used by update_config/1 below)
+-define(ACCEPT_TIMEOUT_SEC, 10).
+-define(ACCEPT_FUDGE_SEC, 10).
 
 
-% couch_multidb_changes API callbacks
-
-db_created(DbName, Server) ->
-    couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
-    Server.
+during_doc_update(#doc{} = Doc, Db, _UpdateType) ->
+    couch_stats:increment_counter([couch_replicator, docs, db_changes]),
+    ok = process_change(Db, Doc).
 
 
-db_deleted(DbName, Server) ->
+after_db_delete(#{name := DbName}) ->
     couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]),
-    ok = gen_server:call(?MODULE, {clean_up_replications, DbName}, infinity),
-    Server.
-
+    remove_replications_by_dbname(DbName).
 
-db_found(DbName, Server) ->
-    couch_stats:increment_counter([couch_replicator, docs, dbs_found]),
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
-    Server.
 
+process_change(_Db, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
+    ok;
 
-db_change(DbName, {ChangeProps} = Change, Server) ->
-    couch_stats:increment_counter([couch_replicator, docs, db_changes]),
-    try
-        ok = process_change(DbName, Change)
+process_change(#{name := DbName} = Db, #doc{deleted = true} = Doc) ->
+    Id = docs_job_id(DbName, Doc#doc.id),
+    ok = remove_replication_by_doc_job_id(Db, Id);
+
+process_change(#{name := DbName} = Db, #doc{} = Doc) ->
+    #doc{id = DocId, body = {Props} = Body} = Doc,
+    {Rep, Error} = try
+        Rep0 = couch_replicator_docs:parse_rep_doc_without_id(Body),
+        DocState = get_json_value(<<"_replication_state">>, Props, null),
+        Rep1 = Rep0#{?DB_NAME => DbName, ?DOC_STATE => DocState},
+        {Rep1, null}
     catch
-    exit:{Error, {gen_server, call, [?MODULE, _, _]}} ->
-        ErrMsg = "~p exited ~p while processing change from db ~p",
-        couch_log:error(ErrMsg, [?MODULE, Error, DbName]);
-    _Tag:Error ->
-        {RepProps} = get_json_value(doc, ChangeProps),
-        DocId = get_json_value(<<"_id">>, RepProps),
-        couch_replicator_docs:update_failed(DbName, DocId, Error)
+        throw:{bad_rep_doc, Reason} ->
+            {null, couch_replicator_utils:rep_error_to_binary(Reason)}
     end,
-    Server.
+    case couch_jobs:get_job_data(Db, ?REP_DOCS, docs_job_id(DbName, DocId)) of
+        {error, not_found} ->
+            add_rep_doc_job(Db, DbName, DocId, Rep, Error);
+        {ok, #{?REP := null, ?REP_PARSE_ERROR := Error}}
+                when Rep =:= null ->
+            % Same error as before occurred, don't bother updating the job
+            ok;
+        {ok, #{?REP := null}} when Rep =:= null ->
+            % An error occurred but it's a different error. Update the job so
+            % the user sees the new error
+            add_rep_doc_job(Db, DbName, DocId, Rep, Error);
+        {ok, #{?REP := OldRep, ?REP_PARSE_ERROR := OldError}} ->
+            case compare_reps(OldRep, Rep) of
+                true ->
+                    % Document was changed but none of the parameters relevant
+                    % to the replication job have changed, so make it a no-op
+                    ok;
+                false ->
+                    add_rep_doc_job(Db, DbName, DocId, Rep, Error)
+            end
+    end.
 
 
--spec get_worker_ref(db_doc_id()) -> reference() | nil.
-get_worker_ref({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
-    case ets:lookup(?MODULE, {DbName, DocId}) of
-        [#rdoc{worker = WRef}] when is_reference(WRef) ->
-            WRef;
-        [#rdoc{worker = nil}] ->
-            nil;
-        [] ->
-            nil
-    end.
+compare_reps(Rep1, Rep2) ->
+    NormRep1 = couch_replicator_utils:normalize_rep(Rep1),
+    NormRep2 = couch_replicator_utils:normalize_rep(Rep2),
+    NormRep1 =:= NormRep2.
 
 
-% Cluster membership change notification callback
--spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
-notify_cluster_event(Server, {cluster, _} = Event) ->
-    gen_server:cast(Server, Event).
-
-
-process_change(DbName, {Change}) ->
-    {RepProps} = JsonRepDoc = get_json_value(doc, Change),
-    DocId = get_json_value(<<"_id">>, RepProps),
-    Owner = couch_replicator_clustering:owner(DbName, DocId),
-    Id = {DbName, DocId},
-    case {Owner, get_json_value(deleted, Change, false)} of
-    {_, true} ->
-        ok = gen_server:call(?MODULE, {removed, Id}, infinity);
-    {unstable, false} ->
-        couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]);
-    {ThisNode, false} when ThisNode =:= node() ->
-        case get_json_value(<<"_replication_state">>, RepProps) of
-        undefined ->
-            ok = process_updated(Id, JsonRepDoc);
-        <<"triggered">> ->
-            maybe_remove_state_fields(DbName, DocId),
-            ok = process_updated(Id, JsonRepDoc);
-        <<"completed">> ->
-            ok = gen_server:call(?MODULE, {completed, Id}, infinity);
-        <<"error">> ->
-            % Handle replications started from older versions of replicator
-            % which wrote transient errors to replication docs
-            maybe_remove_state_fields(DbName, DocId),
-            ok = process_updated(Id, JsonRepDoc);
-        <<"failed">> ->
-            ok
-        end;
-    {Owner, false} ->
-        ok
-    end,
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [],  []).
+
+
+init([]) ->
+    process_flag(trap_exit, true),
+    St = #{
+        acceptors => #{},
+        workers => #{}
+    },
+    {ok, update_config(St), 0}.
+
+
+terminate(_Reason, #{} = St) ->
+    #{workers := Workers, acceptors := Acceptors} = St,
+    lists:foreach(fun(Pid) -> unlink(Pid), exit(Pid, kill) end,
+        maps:keys(Workers)),
+    couch_replicator_acceptor:stop(Acceptors),
     ok.
 
 
-maybe_remove_state_fields(DbName, DocId) ->
-    case update_docs() of
-        true ->
-            ok;
-        false ->
-            couch_replicator_docs:remove_state_fields(DbName, DocId)
-    end.
+handle_call(Msg, _From, #{} = St) ->
+    {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
 
 
-process_updated({DbName, _DocId} = Id, JsonRepDoc) ->
-    % Parsing replication doc (but not calculating the id) could throw an
-    % exception which would indicate this document is malformed. This exception
-    % should propagate to db_change function and will be recorded as permanent
-    % failure in the document. User will have to update the documet to fix the
-    % problem.
-    Rep0 = couch_replicator_docs:parse_rep_doc_without_id(JsonRepDoc),
-    Rep = Rep0#rep{db_name = DbName, start_time = os:timestamp()},
-    Filter = case couch_replicator_filters:parse(Rep#rep.options) of
-    {ok, nil} ->
-        nil;
-    {ok, {user, _FName, _QP}} ->
-        user;
-    {ok, {view, _FName, _QP}} ->
-        view;
-    {ok, {docids, _DocIds}} ->
-        docids;
-    {ok, {mango, _Selector}} ->
-        mango;
-    {error, FilterError} ->
-        throw(FilterError)
-    end,
-    gen_server:call(?MODULE, {updated, Id, Rep, Filter}, infinity).
+handle_cast({?ACCEPTED_JOB, Job, JobData}, #{} = St) ->
+    {noreply, start_worker(Job, JobData, St)};
 
+handle_cast(Msg, #{} = St) ->
+    {stop, {bad_cast, Msg}, St}.
 
-% Doc processor gen_server API and callbacks
 
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [],  []).
+handle_info({'EXIT', Pid, Reason}, #{} = St) ->
+    #{workers := Workers, acceptors := Acceptors} = St,
+    case {maps:is_key(Pid, Acceptors), maps:is_key(Pid, Workers)} of
+        {false, false} -> handle_unknown_pid(Pid, Reason, St);
+        {true, false} -> handle_acceptor_died(Pid, Reason, St);
+        {false, true} -> handle_worker_died(Pid, Reason, St)
+    end;
 
+handle_info(timeout, #{} = St) ->
+    {noreply, maybe_start_acceptors(St)};
 
-init([]) ->
-    ?MODULE = ets:new(?MODULE, [named_table, {keypos, #rdoc.id},
-        {read_concurrency, true}, {write_concurrency, true}]),
-    couch_replicator_clustering:link_cluster_event_listener(?MODULE,
-        notify_cluster_event, [self()]),
-    {ok, nil}.
+handle_info(Msg, St) ->
+    {stop, {bad_info, Msg}, St}.
 
 
-terminate(_Reason, _State) ->
-    ok.
+%% handle_info({'DOWN', _, _, _, #doc_worker_result{id = Id, wref = Ref,
+%%         result = Res}}, State) ->
+%%     ok = worker_returned(Ref, Id, Res),
+%%     {noreply, State};
 
 
-handle_call({updated, Id, Rep, Filter}, _From, State) ->
-    ok = updated_doc(Id, Rep, Filter),
-    {reply, ok, State};
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
 
-handle_call({removed, Id}, _From, State) ->
-    ok = removed_doc(Id),
-    {reply, ok, State};
 
-handle_call({completed, Id}, _From, State) ->
-    true = ets:delete(?MODULE, Id),
-    {reply, ok, State};
+update_config(#{} = St) ->
+    MaxJobs = config:get_integer("replicator",
+        "max_doc_processor_jobs", ?MAX_JOBS),
+    MaxAcceptors = config:get_integer("replicator",
+        "max_doc_processor_acceptors", ?MAX_ACCEPTORS),
+    AcceptTimeoutSec = config:get_integer("replicator",
+        "doc_processor_accept_timeout_sec", ?ACCEPT_TIMEOUT_SEC),
+    AcceptFudgeSec = config:get_integer("replicator",
+        "doc_processor_accept_fudge_sec", ?ACCEPT_FUDGE_SEC),
+    St#{
+        max_jobs => MaxJobs,
+        max_acceptors => MaxAcceptors,
+        accept_timeout_sec => AcceptTimeoutSec,
+        accept_fudge_sec => AcceptFudgeSec
+    }.
 
-handle_call({clean_up_replications, DbName}, _From, State) ->
-    ok = removed_db(DbName),
-    {reply, ok, State}.
 
-handle_cast({cluster, unstable}, State) ->
-    % Ignoring unstable state transition
-    {noreply, State};
+handle_acceptor_died(Pid, normal, #{acceptors := Acceptors} = St1) ->
+    St2 = St1#{acceptors := maps:remove(Pid, Acceptors)},
+    St3 = update_config(St2),
+    St4 = maybe_start_acceptors(St3),
+    {noreply, St4};
+
+handle_acceptor_died(Pid, Reason, #{} = St) ->
+    Msg = "~p : acceptor ~p died with ~p",
+    couch_log:error(Msg, [?MODULE, Pid, Reason]),
+    {stop, {acceptor_pid_exit, Pid, Reason}, St}.
+
+
+handle_worker_died(Pid, normal, #{workers := Workers} = St1) ->
+    St2 = St1#{workers := maps:remove(Pid, Workers)},
+    St3 = maybe_start_acceptors(St2),
+    {noreply, St3};
+
+handle_worker_died(Pid, Reason, #{} = St) ->
+    Msg = "~p : worker ~p died with ~p",
+    couch_log:error(Msg, [?MODULE, Pid, Reason]),
+    {stop, {worker_pid_exit, Pid, Reason}, St}.
+
+
+handle_unknown_pid(Pid, Reason, #{} = St) ->
+    Msg = "~p : unknown pid ~p died with ~p",
+    couch_log:error(Msg, [?MODULE, Pid, Reason]),
+    {stop, {unknown_pid_exit, Pid, Reason}, St}.
+
+
+maybe_start_acceptors(#{} = St1) ->
+    St2 = update_config(St1),
+    #{
+        workers := Workers,
+        acceptors := Acceptors,
+        max_jobs := MaxJobs,
+        max_acceptors := MaxAcceptors
+    } = St2,
+    WCount = map_size(Workers),
+    ACount = map_size(Acceptors),
+    case ACount + WCount < MaxJobs of
+        true -> start_acceptors(MaxAcceptors - ACount, St2);
+        false -> St2
+    end.
 
-handle_cast({cluster, stable}, State) ->
-    % Membership changed recheck all the replication document ownership
-    nil = ets:foldl(fun cluster_membership_foldl/2, nil, ?MODULE),
-    {noreply, State};
 
-handle_cast(Msg, State) ->
-    {stop, {error, unexpected_message, Msg}, State}.
+start_acceptors(N, #{} = St) ->
+    #{
+       acceptors := Acceptors,
+       accept_timeout_sec := TimeoutSec,
+       accept_fudge_sec := FudgeSec
+    } = St,
+    Opts = #{
+        timeout => TimeoutSec,
+        max_sched_time => erlang:system_time(second) + FudgeSec
+    },
+    Pids = couch_replicator_acceptor:start(?REP_DOCS, N, self(), Opts),
+    St#{acceptors := maps:merge(Acceptors, Pids)}.
 
 
-handle_info({'DOWN', _, _, _, #doc_worker_result{id = Id, wref = Ref,
-        result = Res}}, State) ->
-    ok = worker_returned(Ref, Id, Res),
-    {noreply, State};
+start_worker(Job, #{} = JobData, #{workers := Workers} = St) ->
+    Pid = spawn_link(fun() -> worker_fun(Job, JobData) end),
+    St#{workers := Workers#{Pid => true}}.
 
-handle_info(_Msg, State) ->
-    {noreply, State}.
 
+worker_fun(Job, JobData) ->
+    try
+        worker_fun1(Job, JobData)
+    catch
+        throw:halt ->
+            Msg = "~p : replication doc job ~p lock conflict",
+            couch_log:error(Msg, [?MODULE, Job]);
+        throw:{rep_doc_not_current, DbName, DocId} ->
+            Msg = "~p : replication doc ~s:~s is not current",
+            couch_log:error(Msg, [?MODULE, DbName, DocId])
+    end.
 
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
 
+worker_fun1(Job, #{?REP := null} = RepDocData) ->
+    #{
+        ?STATE_INFO := Error,
+        ?DB_NAME := DbName,
+        ?DOC_ID := DocId
+    } = RepDocData,
+    finish_with_permanent_failure(undefined, Job, RepDocData, Error),
+    couch_replicator_docs:update_failed(DbName, DocId, Error);
 
-% Doc processor gen_server private helper functions
 
-% Handle doc update -- add to ets, then start a worker to try to turn it into
-% a replication job. In most cases it will succeed quickly but for filtered
-% replications or if there are duplicates, it could take longer
-% (theoretically indefinitely) until a replication could be started. Before
-% adding replication job, make sure to delete all old jobs associated with
-% same document.
--spec updated_doc(db_doc_id(), #rep{}, filter_type()) -> ok.
-updated_doc(Id, Rep, Filter) ->
-    NormCurRep = couch_replicator_utils:normalize_rep(current_rep(Id)),
-    NormNewRep = couch_replicator_utils:normalize_rep(Rep),
-    case NormCurRep == NormNewRep of
-        false ->
-            removed_doc(Id),
-            Row = #rdoc{
-                id = Id,
-                state = initializing,
-                rep = Rep,
-                rid = nil,
-                filter = Filter,
-                info = nil,
-                errcnt = 0,
-                worker = nil,
-                last_updated = os:timestamp()
-            },
-            true = ets:insert(?MODULE, Row),
-            ok = maybe_start_worker(Id);
-        true ->
-            ok
+worker_fun1(Job, #{?REP := #{}} = RepDocData) ->
+    #{?REP := Rep} = RepDocData,
+    #{?REP_ID := OldRepId, ?DB_NAME := DbName, ?DOC_ID := DocId} = Rep,
+    ok = remove_old_state_fields(RepDocData),
+    try
+        RepWithId = couch_replicator_docs:update_rep_id(Rep),
+        worker_fun2(Job, OldRepId, RepWithId, RepDocData)
+    catch
+        throw:{filter_fetch_error, Error} ->
+            Error1 = io_lib:format("Filter fetch error ~p", [Error]),
+            Error2 = couch_util:to_binary(Error1),
+            finish_with_temporary_error(undefined, Job, RepDocData, Error2),
+            maybe_update_doc_error(OldRepId, DbName, DocId, Error2)
     end.
 
 
-% Return current #rep{} record if any. If replication hasn't been submitted
-% to the scheduler yet, #rep{} record will be in the document processor's
-% ETS table, otherwise query scheduler for the #rep{} record.
--spec current_rep({binary(), binary()}) -> #rep{} | nil.
-current_rep({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
-    case ets:lookup(?MODULE, {DbName, DocId}) of
-        [] ->
-            nil;
-        [#rdoc{state = scheduled, rep = nil, rid = JobId}] ->
-            % When replication is scheduled, #rep{} record which can be quite
-            % large compared to other bits in #rdoc is removed in order to avoid
-            % having to keep 2 copies of it. So have to fetch it from the
-            % scheduler.
-            couch_replicator_scheduler:rep_state(JobId);
-        [#rdoc{rep = Rep}] ->
-            Rep
+
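+% In a single couch_jobs transaction: check that this job still matches the
+% current version of the replication doc, remove a stale job if the
+% replication id changed (e.g. the filter changed), then try to start or
+% skip the replication job.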
+worker_fun2(Job, OldRepId, #{} = Rep, #{} = RepDocData) ->
+    #{?DB_NAME := DbName, ?DOC_ID := DocId} = Rep,
+    Result = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        check_rep_doc_current(JTx, Rep),
+        remove_stale_replication_job(JTx, OldRepId, Rep),
+        maybe_start_replication_job(JTx, Job, Rep, RepDocData)
+    end),
+    case Result of
+        {ok, RepId} ->
+            maybe_update_doc_triggered(RepId, DbName, DocId);
+        ignore ->
+            ok;
+        {error, {permanent_failure, Error}}  ->
+            couch_replicator_docs:update_failed(DbName, DocId, Error);
+        {error, {temporary_error, RepId, Error}} ->
+            maybe_update_doc_error(RepId, DbName, DocId, Error)
     end.
 
 
--spec worker_returned(reference(), db_doc_id(), rep_start_result()) -> ok.
-worker_returned(Ref, Id, {ok, RepId}) ->
-    case ets:lookup(?MODULE, Id) of
-    [#rdoc{worker = Ref} = Row] ->
-        Row0 = Row#rdoc{
-            state = scheduled,
-            errcnt = 0,
-            worker = nil,
-            last_updated = os:timestamp()
-        },
-        NewRow = case Row0 of
-            #rdoc{rid = RepId, filter = user} ->
-                % Filtered replication id didn't change.
-                Row0;
-            #rdoc{rid = nil, filter = user} ->
-                % Calculated new replication id for a filtered replication. Make
-                % sure to schedule another check as filter code could change.
-                % Replication starts could have been failing, so also clear
-                % error count.
-                Row0#rdoc{rid = RepId};
-            #rdoc{rid = OldRepId, filter = user} ->
-                % Replication id of existing replication job with filter has
-                % changed. Remove old replication job from scheduler and
-                % schedule check to check for future changes.
-                ok = couch_replicator_scheduler:remove_job(OldRepId),
-                Msg = io_lib:format("Replication id changed: ~p -> ~p", [
-                    OldRepId, RepId]),
-                Row0#rdoc{rid = RepId, info = couch_util:to_binary(Msg)};
-            #rdoc{rid = nil} ->
-                % Calculated new replication id for non-filtered replication.
-                % Remove replication doc body, after this we won't need it
-                % anymore.
-                Row0#rdoc{rep=nil, rid=RepId, info=nil}
-        end,
-        true = ets:insert(?MODULE, NewRow),
-        ok = maybe_update_doc_triggered(Row#rdoc.rep, RepId),
-        ok = maybe_start_worker(Id);
-    _ ->
-        ok  % doc could have been deleted, ignore
-    end,
-    ok;
+check_rep_doc_current(JTx, #{} = Rep) ->
+    #{?DB_NAME := DbName, ?DOC_ID := DocId, ?VER := Ver} = Rep,
+    case couch_jobs:get_job_data(JTx, ?REP_DOCS, docs_job_id(DbName, DocId)) of
+        {ok, #{?REP := #{?VER := Ver}}} ->
+            ok;
+        {ok, #{?REP := #{?VER := V}}} when Ver =/= V ->
+            throw({rep_doc_not_current, DbName, DocId});
+        {error, not_found} ->
+            throw({rep_doc_not_current, DbName, DocId})
+    end.
 
-worker_returned(_Ref, _Id, ignore) ->
-    ok;
 
-worker_returned(Ref, Id, {temporary_error, Reason}) ->
-    case ets:lookup(?MODULE, Id) of
-    [#rdoc{worker = Ref, errcnt = ErrCnt} = Row] ->
-        NewRow = Row#rdoc{
-            rid = nil,
-            state = error,
-            info = Reason,
-            errcnt = ErrCnt + 1,
-            worker = nil,
-            last_updated = os:timestamp()
-        },
-        true = ets:insert(?MODULE, NewRow),
-        ok = maybe_update_doc_error(NewRow#rdoc.rep, Reason),
-        ok = maybe_start_worker(Id);
-    _ ->
-        ok  % doc could have been deleted, ignore
-    end,
+% A stale replication job is one still running after the filter
+% has been updated and a new replication id was generated.
+%
+remove_stale_replication_job(_, null, #{}) ->
     ok;
 
-worker_returned(Ref, Id, {permanent_failure, _Reason}) ->
-    case ets:lookup(?MODULE, Id) of
-    [#rdoc{worker = Ref}] ->
-        true = ets:delete(?MODULE, Id);
-    _ ->
-        ok  % doc could have been deleted, ignore
-    end,
+remove_stale_replication_job(JTx, OldRepId, #{} = Rep) ->
+    #{?REP_ID := RepId, ?VER := Ver} = Rep,
+    case couch_jobs:get_job_data(JTx, ?REP_JOBS, OldRepId) of
+        {error, not_found} ->
+            ok;
+        {ok, #{?REP := #{?VER := Ver}}} when OldRepId =/= RepId ->
+            couch_jobs:remove(JTx, ?REP_JOBS, OldRepId);
+        {ok, #{}} ->
+            ok
+    end.
+
+
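+% Decide what to do with the newly computed replication id: start a fresh
+% job if there is none, no-op if an equivalent job from this same doc is
+% already running, or fail with a temporary / permanent error if the id is
+% owned by a transient job or by another document.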
+maybe_start_replication_job(JTx, Job, #{} = Rep, #{} = RepDocData) ->
+    #{?REP_ID := RepId, ?DB_NAME := DbName, ?DOC_ID := DocId} = Rep,
+    case couch_jobs:get_job_data(JTx, ?REP_JOBS, RepId) of
+        {error, not_found} ->
+            start_replication_job(JTx, Job, Rep, RepDocData);
+        {ok, #{?REP := #{?DB_NAME := DbName, ?DOC_ID := DocId} = CurRep}} ->
+            case compare_reps(Rep, CurRep) of
+                true ->
+                    dont_start_replication_job(JTx, Job, Rep, RepDocData);
+                false ->
+                    ok = couch_jobs:remove(JTx, ?REP_JOBS, RepId),
+                    start_replication_job(JTx, Job, Rep, RepDocData)
+            end;
+        {ok, #{?REP := #{?DB_NAME := null}}} ->
+            Err1 = io_lib:format("Replication `~s` specified by `~s:~s`"
+                " already running as a transient replication, started via"
+                " `_replicate` API endpoint", [RepId, DbName, DocId]),
+            Err2 = couch_util:to_binary(Err1),
+            ok = finish_with_temporary_error(JTx, Job, RepDocData, Err2),
+            {error, {temporary_error, RepId, Err2}};
+        {ok, #{?REP := #{?DB_NAME := OtherDb, ?DOC_ID := OtherDoc}}} ->
+            Err1 = io_lib:format("Replication `~s` specified by `~s:~s`"
+                " already started by document `~s:~s`", [RepId, DbName,
+                DocId, OtherDb, OtherDoc]),
+            Error2 = couch_util:to_binary(Err1),
+            ok = finish_with_permanent_failure(JTx, Job, RepDocData, Error2),
+            {error, {permanent_failure, Error2}}
+    end.
+
+
+finish_with_temporary_error(JTx, Job, RepDocData, Error) ->
+    #{?ERROR_COUNT := ErrorCount} = RepDocData,
+    ErrorCount1 = ErrorCount + 1,
+    RepDocData1 = RepDocData#{
+        ?STATE := ?ST_ERROR,
+        ?STATE_INFO := Error,
+        ?ERROR_COUNT := ErrorCount1
+    },
+    schedule_error_backoff(JTx, Job, ErrorCount1),
+    case couch_jobs:finish(JTx, Job, RepDocData1) of
+        ok -> ok;
+        {error, halt} -> throw(halt)
+    end.
+
+
+finish_with_permanent_failure(JTx, Job, RepDocData, Error) ->
+    #{?ERROR_COUNT := ErrorCount} = RepDocData,
+    RepDocData1 = RepDocData#{
+        ?STATE := ?ST_FAILED,
+        ?STATE_INFO := Error,
+        ?ERROR_COUNT := ErrorCount + 1
+    },
+    case couch_jobs:finish(JTx, Job, RepDocData1) of
+        ok -> ok;
+        {error, halt} -> throw(halt)
+    end.
+
+
+dont_start_replication_job(JTx, Job, Rep, RepDocData) ->
+    RepDocData1 = RepDocData#{?LAST_UPDATED => erlang:system_time()},
+    ok = schedule_filter_check(JTx, Job, Rep),
+    case couch_jobs:finish(JTx, Job, RepDocData1) of
+        ok -> ignore;
+        {error, halt} -> throw(halt)
+    end.
+
+
+start_replication_job(JTx, Job, #{} = Rep, #{} = RepDocData) ->
+    #{?REP_ID := RepId} = Rep,
+    RepJobData = #{
+        ?REP => Rep,
+        ?STATE => ?ST_PENDING,
+        ?STATE_INFO => null,
+        ?ERROR_COUNT => 0,
+        ?LAST_UPDATED => erlang:system_time(),
+        ?HISTORY => []
+    },
+    ok = couch_jobs:add(JTx, ?REP_JOBS, RepId, RepJobData),
+    RepDocData1 = RepDocData#{
+       ?REP := Rep,
+       ?STATE := ?ST_SCHEDULED,
+       ?STATE_INFO := null,
+       ?ERROR_COUNT := 0,
+       ?LAST_UPDATED => erlang:system_time()
+    },
+    ok = schedule_filter_check(JTx, Job, Rep),
+    case couch_jobs:finish(JTx, Job, RepDocData1) of
+        ok -> {ok, RepId};
+        {error, halt} -> throw(halt)
+    end.
+
+
+schedule_error_backoff(JTx, Job, ErrorCount) ->
+    Exp = min(ErrorCount, ?ERROR_MAX_BACKOFF_EXPONENT),
+    % ErrorCount is the exponent here. The reason 64 is used is to start
+    % with a 64 second (about a minute) max range, so the first backoff
+    % averages about 30 seconds, then a minute, and so on.
+    NowSec = erlang:system_time(second),
+    When = NowSec + rand:uniform(?INITIAL_BACKOFF_EXPONENT bsl Exp),
+    couch_jobs:resubmit(JTx, Job, trunc(When)).
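+
+% For example, with ?ERROR_MAX_BACKOFF_EXPONENT >= 2: after the first error
+% the resubmit delay is uniform over (0, 128] seconds, after the second over
+% (0, 256] seconds, and so on, capped by the maximum exponent.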
+
+
+schedule_filter_check(JTx, Job, #{<<"filter_type">> := <<"user">>} = Rep) ->
+    IntervalSec = filter_check_interval_sec(),
+    NowSec = erlang:system_time(second),
+    When = NowSec + 0.5 * IntervalSec + rand:uniform(IntervalSec),
+    couch_jobs:resubmit(JTx, Job, trunc(When)).
+
+schedule_filter_check(_JTx, _Job, #{}) ->
     ok.
 
 
--spec maybe_update_doc_error(#rep{}, any()) -> ok.
-maybe_update_doc_error(Rep, Reason) ->
+remove_old_state_fields(#{?DOC_STATE := DocState} = RepDocData) when
+        DocState =:= ?ST_TRIGGERED orelse DocState =:= ?ST_ERROR ->
     case update_docs() of
         true ->
-            couch_replicator_docs:update_error(Rep, Reason);
+            ok;
+        false ->
+            #{?DB_NAME := DbName, ?DOC_ID := DocId} = RepDocData,
+            couch_replicator_docs:remove_state_fields(DbName, DocId)
+    end;
+
+remove_old_state_fields(#{}) ->
+    ok.
+
+
+-spec maybe_update_doc_error(binary(), binary(), binary(), any()) -> ok.
+maybe_update_doc_error(RepId, DbName, DocId, Error) ->
+    case update_docs() of
+        true ->
+            couch_replicator_docs:update_error(RepId, DbName, DocId, Error);
         false ->
             ok
     end.
 
 
--spec maybe_update_doc_triggered(#rep{}, rep_id()) -> ok.
-maybe_update_doc_triggered(Rep, RepId) ->
+-spec maybe_update_doc_triggered(binary(), binary(), binary()) -> ok.
+maybe_update_doc_triggered(RepId, DbName, DocId) ->
     case update_docs() of
         true ->
-            couch_replicator_docs:update_triggered(Rep, RepId);
+            couch_replicator_docs:update_triggered(RepId, DbName, DocId);
         false ->
             ok
     end.
@@ -412,75 +499,17 @@ error_backoff(ErrCnt) ->
     couch_rand:uniform(?INITIAL_BACKOFF_EXPONENT bsl Exp).
 
 
--spec filter_backoff() -> seconds().
-filter_backoff() ->
-    Total = ets:info(?MODULE, size),
-    % This value scaled by the number of replications. If the are a lot of them
-    % wait is longer, but not more than a day (?TS_DAY_SEC). If there are just
-    % few, wait is shorter, starting at about 30 seconds. `2 *` is used since
-    % the expected wait would then be 0.5 * Range so it is easier to see the
-    % average wait. `1 +` is used because couch_rand:uniform only
-    % accepts >= 1 values and crashes otherwise.
-    Range = 1 + min(2 * (Total / 10), ?TS_DAY_SEC),
-    ?MIN_FILTER_DELAY_SEC + couch_rand:uniform(round(Range)).
-
-
-% Document removed from db -- clear ets table and remove all scheduled jobs
--spec removed_doc(db_doc_id()) -> ok.
-removed_doc({DbName, DocId} = Id) ->
-    ets:delete(?MODULE, Id),
-    RepIds = couch_replicator_scheduler:find_jobs_by_doc(DbName, DocId),
-    lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds).
-
-
-% Whole db shard is gone -- remove all its ets rows and stop jobs
--spec removed_db(binary()) -> ok.
-removed_db(DbName) ->
-    EtsPat = #rdoc{id = {DbName, '_'}, _ = '_'},
-    ets:match_delete(?MODULE, EtsPat),
-    RepIds = couch_replicator_scheduler:find_jobs_by_dbname(DbName),
-    lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds).
-
-
-% Spawn a worker process which will attempt to calculate a replication id, then
-% start a replication. Returns a process monitor reference. The worker is
-% guaranteed to exit with rep_start_result() type only.
--spec maybe_start_worker(db_doc_id()) -> ok.
-maybe_start_worker(Id) ->
-    case ets:lookup(?MODULE, Id) of
-    [] ->
-        ok;
-    [#rdoc{state = scheduled, filter = Filter}] when Filter =/= user ->
-        ok;
-    [#rdoc{rep = Rep} = Doc] ->
-        % For any replication with a user created filter function, periodically
-        % (every `filter_backoff/0` seconds) to try to see if the user filter
-        % has changed by using a worker to check for changes. When the worker
-        % returns check if replication ID has changed. If it hasn't keep
-        % checking (spawn another worker and so on). If it has stop the job
-        % with the old ID and continue checking.
-        Wait = get_worker_wait(Doc),
-        Ref = make_ref(),
-        true = ets:insert(?MODULE, Doc#rdoc{worker = Ref}),
-        couch_replicator_doc_processor_worker:spawn_worker(Id, Rep, Wait, Ref),
-        ok
-    end.
-
-
--spec get_worker_wait(#rdoc{}) -> seconds().
-get_worker_wait(#rdoc{state = scheduled, filter = user}) ->
-    filter_backoff();
-get_worker_wait(#rdoc{state = error, errcnt = ErrCnt}) ->
-    error_backoff(ErrCnt);
-get_worker_wait(#rdoc{state = initializing}) ->
-    0.
-
-
 -spec update_docs() -> boolean().
 update_docs() ->
     config:get_boolean("replicator", "update_docs", ?DEFAULT_UPDATE_DOCS).
 
 
+-spec filter_check_interval_sec() -> integer().
+filter_check_interval_sec() ->
+    config:get_integer("replicator", "filter_check_interval_sec",
+        ?DEFAULT_FILTER_CHECK_INTERVAL_SEC).
+
+
 % _scheduler/docs HTTP endpoint helpers
 
 -spec docs([atom()]) -> [{[_]}] | [].
@@ -533,6 +562,15 @@ doc_lookup(Db, DocId, HealthThreshold) ->
     end.
 
 
+-spec ejson_state_info(binary() | nil) -> binary() | null.
+ejson_state_info(nil) ->
+    null;
+ejson_state_info(Info) when is_binary(Info) ->
+    Info;
+ejson_state_info(Info) ->
+    couch_replicator_utils:rep_error_to_binary(Info).
+
+
 -spec ejson_rep_id(rep_id() | nil) -> binary() | null.
 ejson_rep_id(nil) ->
     null;
@@ -570,7 +608,7 @@ ejson_doc(#rdoc{state = RepState} = RDoc, _HealthThreshold) ->
         {database, DbName},
         {id, ejson_rep_id(RepId)},
         {state, RepState},
-        {info, couch_replicator_utils:ejson_state_info(StateInfo)},
+        {info, ejson_state_info(StateInfo)},
         {error_count, ErrorCount},
         {node, node()},
         {last_updated, couch_replicator_utils:iso8601(StateTime)},
@@ -585,27 +623,78 @@ ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) ->
     lists:member(State, States).
 
 
--spec cluster_membership_foldl(#rdoc{}, nil) -> nil.
-cluster_membership_foldl(#rdoc{id = {DbName, DocId} = Id, rid = RepId}, nil) ->
-    case couch_replicator_clustering:owner(DbName, DocId) of
-        unstable ->
-            nil;
-        ThisNode when ThisNode =:= node() ->
-            nil;
-        OtherNode ->
-            Msg = "Replication doc ~p:~p with id ~p usurped by node ~p",
-            couch_log:notice(Msg, [DbName, DocId, RepId, OtherNode]),
-            removed_doc(Id),
-            nil
+-spec add_rep_doc_job(any(), binary(), binary(), #{} | null,
+    binary() | null) -> ok.
+add_rep_doc_job(Tx, DbName, DocId, Rep, RepParseError) ->
+    JobId = docs_job_id(DbName, DocId),
+    RepDocData = case Rep of
+        null ->
+            #{
+                ?REP => null,
+                ?DB_NAME => DbName,
+                ?DOC_ID => DocId,
+                ?STATE => ?ST_INITIALIZING,
+                ?STATE_INFO => RepParseError,
+                ?ERROR_COUNT => 0,
+                ?LAST_UPDATED => erlang:system_time()
+            };
+        #{} ->
+            #{
+                ?REP => Rep,
+                ?STATE => ?ST_INITIALIZING,
+                ?ERROR_COUNT => 0,
+                ?LAST_UPDATED => erlang:system_time(),
+                ?STATE_INFO => null
+            }
+    end,
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+       ok = remove_replication_by_doc_job_id(JTx, JobId),
+       ok = couch_jobs:add(JTx, ?REP_DOCS, JobId, RepDocData)
+    end).
+
+
+docs_job_id(DbName, Id) when is_binary(DbName), is_binary(Id) ->
+    <<DbName/binary, "|", Id/binary>>.
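+% Illustrative example: docs_job_id(<<"_replicator">>, <<"mydoc">>) gives
+% <<"_replicator|mydoc">>; remove_replications_by_dbname/1 below relies
+% on this "DbName|DocId" shape when filtering job ids by database.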
+
+
+-spec remove_replication_by_doc_job_id(any(), binary()) -> ok.
+remove_replication_by_doc_job_id(Tx, Id) ->
+    case couch_jobs:get_job_data(Tx, ?REP_DOCS, Id) of
+        {error, not_found} ->
+            ok;
+        {ok, #{?REP := null}} ->
+            couch_jobs:remove(Tx, ?REP_DOCS, Id),
+            ok;
+        {ok, #{?REP := #{?REP_ID := null}}} ->
+            couch_jobs:remove(Tx, ?REP_DOCS, Id),
+            ok;
+        {ok, #{?REP := #{?REP_ID := RepId}}} ->
+            couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+                couch_jobs:remove(JTx, ?REP_JOBS, RepId),
+                couch_jobs:remove(JTx, ?REP_DOCS, Id)
+            end),
+            ok
     end.
 
 
+-spec remove_replications_by_dbname(binary()) -> ok.
+remove_replications_by_dbname(DbName) ->
+    DbNameSize = byte_size(DbName),
+    Filter = fun
+        (<<DbName:DbNameSize/binary, "|", _, _/binary>>) -> true;
+        (_) -> false
+    end,
+    JobsMap = couch_jobs:get_jobs(undefined, ?REP_DOCS, Filter),
+    % Batch these into smaller transactions eventually...
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(undefined), fun(JTx) ->
+        maps:map(fun(Id, _) ->
+            remove_replication_by_doc_job_id(JTx, Id)
+        end, JobsMap)
+    end).
+
+
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").
 
 -define(DB, <<"db">>).
--define(EXIT_DB, <<"exit_db">>).
 -define(DOC1, <<"doc1">>).
 -define(DOC2, <<"doc2">>).
 -define(R1, {"1", ""}).
@@ -614,30 +703,23 @@ cluster_membership_foldl(#rdoc{id = {DbName, DocId} = Id, rid = RepId}, nil) ->
 
 doc_processor_test_() ->
     {
-        setup,
-        fun setup_all/0,
-        fun teardown_all/1,
-        {
-            foreach,
-            fun setup/0,
-            fun teardown/1,
-            [
-                t_bad_change(),
-                t_regular_change(),
-                t_change_with_doc_processor_crash(),
-                t_change_with_existing_job(),
-                t_deleted_change(),
-                t_triggered_change(),
-                t_completed_change(),
-                t_active_replication_completed(),
-                t_error_change(),
-                t_failed_change(),
-                t_change_for_different_node(),
-                t_change_when_cluster_unstable(),
-                t_ejson_docs(),
-                t_cluster_membership_foldl()
-            ]
-        }
+        foreach,
+        fun setup/0,
+        fun teardown/1,
+        [
+            t_bad_change(),
+            t_regular_change(),
+            t_change_with_existing_job(),
+            t_deleted_change(),
+            t_triggered_change(),
+            t_completed_change(),
+            t_active_replication_completed(),
+            t_error_change(),
+            t_failed_change(),
+            t_change_for_different_node(),
+            t_change_when_cluster_unstable(),
+            t_ejson_docs()
+        ]
     }.
 
 
@@ -659,15 +741,6 @@ t_regular_change() ->
     end).
 
 
-% Handle cases where doc processor exits or crashes while processing a change
-t_change_with_doc_processor_crash() ->
-    ?_test(begin
-        mock_existing_jobs_lookup([]),
-        ?assertEqual(acc, db_change(?EXIT_DB, change(), acc)),
-        ?assert(failed_state_not_updated())
-  end).
-
-
 % Regular change, parse to a #rep{} and then add job but there is already
 % a running job with same Id found.
 t_change_with_existing_job() ->
@@ -797,21 +870,6 @@ t_ejson_docs() ->
     end).
 
 
-% Check that when cluster membership changes records from doc processor and job
-% scheduler get removed
-t_cluster_membership_foldl() ->
-   ?_test(begin
-        mock_existing_jobs_lookup([test_rep(?R1)]),
-        ?assertEqual(ok, process_change(?DB, change())),
-        meck:expect(couch_replicator_clustering, owner, 2, different_node),
-        ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
-        gen_server:cast(?MODULE, {cluster, stable}),
-        meck:wait(2, couch_replicator_scheduler, find_jobs_by_doc, 2, 5000),
-        ?assertNot(ets:member(?MODULE, {?DB, ?DOC1})),
-        ?assert(removed_job(?R1))
-   end).
-
-
 get_worker_ref_test_() ->
     {
         setup,
@@ -834,7 +892,7 @@ get_worker_ref_test_() ->
 % Test helper functions
 
 
-setup_all() ->
+setup() ->
     meck:expect(couch_log, info, 2, ok),
     meck:expect(couch_log, notice, 2, ok),
     meck:expect(couch_log, warning, 2, ok),
@@ -844,38 +902,18 @@ setup_all() ->
     meck:expect(couch_replicator_clustering, owner, 2, node()),
     meck:expect(couch_replicator_clustering, link_cluster_event_listener, 3,
         ok),
-    meck:expect(couch_replicator_doc_processor_worker, spawn_worker, fun
-        ({?EXIT_DB, _}, _, _, _) -> exit(kapow);
-        (_, _, _, _) -> pid
-    end),
+    meck:expect(couch_replicator_doc_processor_worker, spawn_worker, 4, pid),
     meck:expect(couch_replicator_scheduler, remove_job, 1, ok),
     meck:expect(couch_replicator_docs, remove_state_fields, 2, ok),
-    meck:expect(couch_replicator_docs, update_failed, 3, ok).
-
-
-teardown_all(_) ->
-    meck:unload().
-
-
-setup() ->
-    meck:reset([
-        config,
-        couch_log,
-        couch_replicator_clustering,
-        couch_replicator_doc_processor_worker,
-        couch_replicator_docs,
-        couch_replicator_scheduler
-    ]),
-    % Set this expectation back to the default for
-    % each test since some tests change it
-    meck:expect(couch_replicator_clustering, owner, 2, node()),
+    meck:expect(couch_replicator_docs, update_failed, 3, ok),
     {ok, Pid} = start_link(),
-    unlink(Pid),
     Pid.
 
 
 teardown(Pid) ->
-    exit(Pid, kill).
+    unlink(Pid),
+    exit(Pid, kill),
+    meck:unload().
 
 
 removed_state_fields() ->
@@ -901,14 +939,10 @@ did_not_spawn_worker() ->
 updated_doc_with_failed_state() ->
     1 == meck:num_calls(couch_replicator_docs, update_failed, '_').
 
-failed_state_not_updated() ->
-    0 == meck:num_calls(couch_replicator_docs, update_failed, '_').
 
 mock_existing_jobs_lookup(ExistingJobs) ->
-    meck:expect(couch_replicator_scheduler, find_jobs_by_doc, fun
-        (?EXIT_DB, ?DOC1) -> [];
-        (?DB, ?DOC1) -> ExistingJobs
-    end).
+    meck:expect(couch_replicator_scheduler, find_jobs_by_doc,
+        fun(?DB, ?DOC1) -> ExistingJobs end).
 
 
 test_rep(Id) ->
@@ -917,7 +951,7 @@ test_rep(Id) ->
 
 change() ->
     {[
-        {<<"id">>, ?DOC1},
+        {?REP_ID, ?DOC1},
         {doc, {[
             {<<"_id">>, ?DOC1},
             {<<"source">>, <<"http://srchost.local/src">>},
@@ -928,7 +962,7 @@ change() ->
 
 change(State) ->
     {[
-        {<<"id">>, ?DOC1},
+        {?REP_ID, ?DOC1},
         {doc, {[
             {<<"_id">>, ?DOC1},
             {<<"source">>, <<"http://srchost.local/src">>},
@@ -940,7 +974,7 @@ change(State) ->
 
 deleted_change() ->
     {[
-        {<<"id">>, ?DOC1},
+        {?REP_ID, ?DOC1},
         {<<"deleted">>, true},
         {doc, {[
             {<<"_id">>, ?DOC1},
@@ -952,7 +986,7 @@ deleted_change() ->
 
 bad_change() ->
     {[
-        {<<"id">>, ?DOC2},
+        {?REP_ID, ?DOC2},
         {doc, {[
             {<<"_id">>, ?DOC2},
             {<<"source">>, <<"src">>}
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index 6190632..ef28b0e 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -20,14 +20,14 @@
     parse_rep_doc_without_id/2,
     before_doc_update/3,
     after_doc_read/2,
+    ensure_rep_db_exists/0,
     ensure_rep_ddoc_exists/1,
-    ensure_cluster_rep_ddoc_exists/1,
     remove_state_fields/2,
     update_doc_completed/3,
     update_failed/3,
     update_rep_id/1,
-    update_triggered/2,
-    update_error/2
+    update_triggered/3,
+    update_error/4
 ]).
 
 
@@ -56,6 +56,23 @@
 -define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
 -define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
 
+-define(DEFAULT_SOCK_OPTS, "[{keepalive, true}, {nodelay, false}]").
+-define(VALID_SOCK_OPTS, [buffer, delay_send, exit_on_close, ipv6_v6only,
+    keepalive, nodelay, recbuf, send_timeout, send_timeout_close, sndbuf,
+    priority, tos, tclass
+]).
+
+-define(CONFIG_DEFAULTS, [
+    {"worker_processes",    "4",                fun list_to_integer/1},
+    {"worker_batch_size",   "500",              fun list_to_integer/1},
+    {"http_connections",    "20",               fun list_to_integer/1},
+    {"connection_timeout",  "30000",            fun list_to_integer/1},
+    {"retries_per_request", "5",                fun list_to_integer/1},
+    {"use_checkpoints",     "true",             fun list_to_existing_atom/1},
+    {"checkpoint_interval", "30000",            fun list_to_integer/1},
+    {"socket_options",      ?DEFAULT_SOCK_OPTS, fun parse_sock_opts/1}
+]).
+
 
 remove_state_fields(DbName, DocId) ->
     update_rep_doc(DbName, DocId, [
@@ -89,28 +106,22 @@ update_failed(DbName, DocId, Error) ->
         failed_state_updates]).
 
 
--spec update_triggered(#rep{}, rep_id()) -> ok.
-update_triggered(Rep, {Base, Ext}) ->
-    #rep{
-        db_name = DbName,
-        doc_id = DocId
-    } = Rep,
+-spec update_triggered(binary(), binary(), binary()) -> ok.
+update_triggered(Id, DbName, DocId) ->
     update_rep_doc(DbName, DocId, [
         {<<"_replication_state">>, <<"triggered">>},
         {<<"_replication_state_reason">>, undefined},
-        {<<"_replication_id">>, iolist_to_binary([Base, Ext])},
+        {<<"_replication_id">>, Id},
         {<<"_replication_stats">>, undefined}]),
     ok.
 
 
--spec update_error(#rep{}, any()) -> ok.
-update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
+-spec update_error(binary(), binary(), binary(), any()) -> ok.
+update_error(RepId0, DbName, DocId, Error) ->
     Reason = error_reason(Error),
-    BinRepId = case RepId of
-        {Base, Ext} ->
-            iolist_to_binary([Base, Ext]);
-        _Other ->
-            null
+    RepId = case RepId0 of
+        Id when is_binary(Id) -> Id;
+        _Other -> null
     end,
     update_rep_doc(DbName, DocId, [
         {<<"_replication_state">>, <<"error">>},
@@ -120,98 +131,19 @@ update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
     ok.
 
 
--spec ensure_rep_ddoc_exists(binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb) ->
-    case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of
-        true ->
-            ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC);
-        false ->
-            ok
+-spec ensure_rep_db_exists() -> ok.
+ensure_rep_db_exists() ->
+    Opts = [?CTX, sys_db],
+    case fabric2_db:create(?REP_DB_NAME, Opts) of
+        {error, file_exists} -> ok;
+        {ok, _Db} -> ok
     end.
 
 
--spec ensure_rep_ddoc_exists(binary(), binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb, DDocId) ->
-    case open_rep_doc(RepDb, DDocId) of
-        {not_found, no_db_file} ->
-            %% database was deleted.
-            ok;
-        {not_found, _Reason} ->
-            DocProps = replication_design_doc_props(DDocId),
-            DDoc = couch_doc:from_json_obj({DocProps}),
-            couch_log:notice("creating replicator ddoc ~p", [RepDb]),
-            {ok, _Rev} = save_rep_doc(RepDb, DDoc);
-        {ok, Doc} ->
-            Latest = replication_design_doc_props(DDocId),
-            {Props0} = couch_doc:to_json_obj(Doc, []),
-            {value, {_, Rev}, Props} = lists:keytake(<<"_rev">>, 1, Props0),
-            case compare_ejson({Props}, {Latest}) of
-                true ->
-                    ok;
-                false ->
-                    LatestWithRev = [{<<"_rev">>, Rev} | Latest],
-                    DDoc = couch_doc:from_json_obj({LatestWithRev}),
-                    couch_log:notice("updating replicator ddoc ~p", [RepDb]),
-                    try
-                        {ok, _} = save_rep_doc(RepDb, DDoc)
-                    catch
-                        throw:conflict ->
-                            %% ignore, we'll retry next time
-                            ok
-                    end
-            end
-    end,
-    ok.
-
-
--spec ensure_cluster_rep_ddoc_exists(binary()) -> ok.
-ensure_cluster_rep_ddoc_exists(RepDb) ->
-    DDocId = ?REP_DESIGN_DOC,
-    [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId),
-    ensure_rep_ddoc_exists(DbShard, DDocId).
-
-
--spec compare_ejson({[_]}, {[_]}) -> boolean().
-compare_ejson(EJson1, EJson2) ->
-    EjsonSorted1 = couch_replicator_filters:ejsort(EJson1),
-    EjsonSorted2 = couch_replicator_filters:ejsort(EJson2),
-    EjsonSorted1 == EjsonSorted2.
-
-
--spec replication_design_doc_props(binary()) -> [_].
-replication_design_doc_props(DDocId) ->
-    [
-        {<<"_id">>, DDocId},
-        {<<"language">>, <<"javascript">>},
-        {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
-    ].
-
-
-% Note: parse_rep_doc can handle filtered replications. During parsing of the
-% replication doc it will make possibly remote http requests to the source
-% database. If failure or parsing of filter docs fails, parse_doc throws a
-% {filter_fetch_error, Error} excation. This exception should be considered
-% transient in respect to the contents of the document itself, since it depends
-% on netowrk availability of the source db and other factors.
--spec parse_rep_doc({[_]}) -> #rep{}.
-parse_rep_doc(RepDoc) ->
-    {ok, Rep} = try
-        parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
-    catch
-        throw:{error, Reason} ->
-            throw({bad_rep_doc, Reason});
-        throw:{filter_fetch_error, Reason} ->
-            throw({filter_fetch_error, Reason});
-        Tag:Err ->
-            throw({bad_rep_doc, to_binary({Tag, Err})})
-    end,
-    Rep.
-
-
--spec parse_rep_doc_without_id({[_]}) -> #rep{}.
+-spec parse_rep_doc_without_id({[_]}) -> #{}.
 parse_rep_doc_without_id(RepDoc) ->
     {ok, Rep} = try
-        parse_rep_doc_without_id(RepDoc, rep_user_ctx(RepDoc))
+        parse_rep_doc_without_id(RepDoc, null)
     catch
         throw:{error, Reason} ->
             throw({bad_rep_doc, Reason});
@@ -221,11 +153,12 @@ parse_rep_doc_without_id(RepDoc) ->
     Rep.
 
 
--spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #rep{}}.
-parse_rep_doc(Doc, UserCtx) ->
-    {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx),
-    Cancel = get_value(cancel, Rep#rep.options, false),
-    Id = get_value(id, Rep#rep.options, nil),
+-spec parse_rep_doc({[_]}, user_name()) -> {ok, #{}}.
+parse_rep_doc({[_]} = Doc, UserName) ->
+    {ok, Rep} = parse_rep_doc_without_id(Doc, UserName),
+    #{<<"options">> := Options} = Rep,
+    Cancel = maps:get(<<"cancel">>, Options, false),
+    Id = maps:get(<<"id">>, Options, nil),
     case {Cancel, Id} of
         {true, nil} ->
             % Cancel request with no id, must parse id out of body contents
@@ -239,47 +172,62 @@ parse_rep_doc(Doc, UserCtx) ->
     end.
 
 
--spec parse_rep_doc_without_id({[_]}, #user_ctx{}) -> {ok, #rep{}}.
-parse_rep_doc_without_id({Props}, UserCtx) ->
-    {SrcProxy, TgtProxy} = parse_proxy_settings(Props),
-    Opts = make_options(Props),
-    case get_value(cancel, Opts, false) andalso
-        (get_value(id, Opts, nil) =/= nil) of
+-spec parse_rep_doc_without_id({[_]} | #{}, user_name()) -> {ok, #{}}.
+parse_rep_doc_without_id({[_]} = EJson, UserName) ->
+    % Normalize all field names to be binaries and turn into a map
+    Map = ?JSON_DECODE(?JSON_ENCODE(EJson)),
+    parse_rep_doc_without_id(Map, UserName);
+
+parse_rep_doc_without_id(#{} = Doc, UserName) ->
+    {SrcProxy, TgtProxy} = parse_proxy_settings(Doc),
+    Opts = make_options(Doc),
+    Cancel = maps:get(<<"cancel">>, Opts, false),
+    Id = maps:get(<<"id">>, Opts, nil),
+    case Cancel andalso Id =/= nil of
     true ->
-        {ok, #rep{options = Opts, user_ctx = UserCtx}};
+        {ok, #{<<"options">> => Opts, <<"user">> => UserName}};
     false ->
-        Source = parse_rep_db(get_value(<<"source">>, Props), SrcProxy, Opts),
-        Target = parse_rep_db(get_value(<<"target">>, Props), TgtProxy, Opts),
+        #{<<"source">> := Source0, <<"target">> := Target0} = Doc,
+        Source = parse_rep_db(Source0, SrcProxy, Opts),
+        Target = parse_rep_db(Target0, TgtProxy, Opts),
-        {Type, View} = case couch_replicator_filters:view_type(Props, Opts) of
+        {Type, View} = case couch_replicator_filters:view_type(Doc, Opts) of
-        {error, Error} ->
-            throw({bad_request, Error});
-        Result ->
-            Result
+            {error, Error} ->
+                throw({bad_request, Error});
+            Result ->
+                Result
         end,
-        Rep = #rep{
-            source = Source,
-            target = Target,
-            options = Opts,
-            user_ctx = UserCtx,
-            type = Type,
-            view = View,
-            doc_id = get_value(<<"_id">>, Props, null)
-        },
-        % Check if can parse filter code, if not throw exception
-        case couch_replicator_filters:parse(Opts) of
-        {error, FilterError} ->
-            throw({error, FilterError});
-        {ok, _Filter} ->
-             ok
+        FilterType = case couch_replicator_filters:parse(Opts) of
+            {ok, nil} -> null;
+            {ok, {user, _FName, _QP}} -> <<"user">>;
+            {ok, {view, _FName, _QP}} -> <<"view">>;
+            {ok, {docids, _DocIds}} -> <<"doc_ids">>;
+            {ok, {mango, _Selector}} -> <<"mango">>;
+            {error, FilterError} -> throw({error, FilterError})
         end,
+        Rep = #{
+            <<"id">> => null,
+            <<"base_id">> => null,
+            ?SOURCE => Source,
+            ?TARGET => Target,
+            <<"options">> => Opts,
+            <<"user">> => UserName,
+            <<"filter_type">> => FilterType,
+            <<"type">> => Type,
+            <<"view">> => View,
+            ?DOC_ID => maps:get(<<"_id">>, Doc, null),
+            ?DB_NAME => null,
+            ?DOC_STATE => null,
+            ?START_TIME => erlang:system_time(),
+            ?VER => fabric2_util:uuid()
+        },
         {ok, Rep}
     end.
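+% The parsed result is a flat map. An illustrative, not exhaustive, shape:
+%   #{<<"id">> => null, <<"base_id">> => null,
+%     ?SOURCE => #{<<"url">> => <<"http://host/db/">>, ...},
+%     <<"options">> => #{...}, <<"filter_type">> => <<"user">>, ...}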
 
 
-parse_proxy_settings(Props) when is_list(Props) ->
-    Proxy = get_value(<<"proxy">>, Props, <<>>),
-    SrcProxy = get_value(<<"source_proxy">>, Props, <<>>),
-    TgtProxy = get_value(<<"target_proxy">>, Props, <<>>),
+parse_proxy_settings(#{} = Doc) ->
+    Proxy = maps:get(<<"proxy">>, Doc, <<>>),
+    SrcProxy = maps:get(<<"source_proxy">>, Doc, <<>>),
+    TgtProxy = maps:get(<<"target_proxy">>, Doc, <<>>),
 
     case Proxy =/= <<>> of
         true when SrcProxy =/= <<>> ->
@@ -289,9 +237,9 @@ parse_proxy_settings(Props) when is_list(Props) ->
             Error = "`proxy` is mutually exclusive with `target_proxy`",
             throw({bad_request, Error});
         true ->
-            {Proxy, Proxy};
+            {parse_proxy_params(Proxy), parse_proxy_params(Proxy)};
         false ->
-            {SrcProxy, TgtProxy}
+            {parse_proxy_params(SrcProxy), parse_proxy_params(TgtProxy)}
     end.
 
 
@@ -299,9 +247,10 @@ parse_proxy_settings(Props) when is_list(Props) ->
 % fetching a filter from the source db, and so it could fail intermetently.
 % In case of a failure to fetch the filter this function will throw a
 %  `{filter_fetch_error, Reason} exception.
-update_rep_id(Rep) ->
-    RepId = couch_replicator_ids:replication_id(Rep),
-    Rep#rep{id = RepId}.
+update_rep_id(#{} = Rep) ->
+    {BaseId, ExtId} = couch_replicator_ids:replication_id(Rep),
+    RepId = erlang:iolist_to_binary([BaseId, ExtId]),
+    Rep#{<<"id">> => RepId, <<"base_id">> = BaseId}.
 
 
 update_rep_doc(RepDbName, RepDocId, KVs) ->
@@ -354,22 +303,21 @@ update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
 
 
 open_rep_doc(DbName, DocId) ->
-    case couch_db:open_int(DbName, [?CTX, sys_db]) of
-        {ok, Db} ->
-            try
-                couch_db:open_doc(Db, DocId, [ejson_body])
-            after
-                couch_db:close(Db)
-            end;
-        Else ->
-            Else
+    try
+        case fabric2_db:open(DbName, [?CTX, sys_db]) of
+            {ok, Db} -> fabric2_db:open_doc(Db, DocId, [ejson_body]);
+            Else -> Else
+        end
+    catch
+        error:database_does_not_exist ->
+            {not_found, database_does_not_exist}
     end.
 
 
 save_rep_doc(DbName, Doc) ->
-    {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
+    {ok, Db} = fabric2_db:open(DbName, [?CTX, sys_db]),
     try
-        couch_db:update_doc(Db, Doc, [])
+        fabric2_db:update_doc(Db, Doc, [])
     catch
         % User can accidently write a VDU which prevents _replicator from
         % updating replication documents. Avoid crashing replicator and thus
@@ -378,54 +326,48 @@ save_rep_doc(DbName, Doc) ->
             Msg = "~p VDU function preventing doc update to ~s ~s ~p",
             couch_log:error(Msg, [?MODULE, DbName, Doc#doc.id, Reason]),
             {ok, forbidden}
-    after
-        couch_db:close(Db)
-    end.
-
-
--spec rep_user_ctx({[_]}) -> #user_ctx{}.
-rep_user_ctx({RepDoc}) ->
-    case get_json_value(<<"user_ctx">>, RepDoc) of
-    undefined ->
-        #user_ctx{};
-    {UserCtx} ->
-        #user_ctx{
-            name = get_json_value(<<"name">>, UserCtx, null),
-            roles = get_json_value(<<"roles">>, UserCtx, [])
-        }
     end.
 
 
--spec parse_rep_db({[_]} | binary(), binary(), [_]) -> #httpd{} | binary().
-parse_rep_db({Props}, Proxy, Options) ->
-    ProxyParams = parse_proxy_params(Proxy),
+-spec parse_rep_db(#{}, #{}, #{}) -> #{}.
+parse_rep_db(#{} = Endpoint, #{} = ProxyParams, #{} = Options) ->
+    ProxyUrl = case ProxyParams of
-        [] -> undefined;
-        _ -> binary_to_list(Proxy)
+       #{<<"proxy_url">> := PUrl} -> PUrl;
+       _ -> null
     end,
-    Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
-    {AuthProps} = get_value(<<"auth">>, Props, {[]}),
-    {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
-    Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
-    DefaultHeaders = (#httpdb{})#httpdb.headers,
-    #httpdb{
-        url = Url,
-        auth_props = AuthProps,
-        headers = lists:ukeymerge(1, Headers, DefaultHeaders),
-        ibrowse_options = lists:keysort(1,
-            [{socket_options, get_value(socket_options, Options)} |
-                ProxyParams ++ ssl_params(Url)]),
-        timeout = get_value(connection_timeout, Options),
-        http_connections = get_value(http_connections, Options),
-        retries = get_value(retries, Options),
-        proxy_url = ProxyURL
-    };
+
+    Url0 = maps:get(<<"url">>, Endpoint),
+    Url = maybe_add_trailing_slash(Url0),
+
+    AuthProps = maps:get(<<"auth">>, Endpoint, #{}),
+
+    Headers0 = maps:get(<<"headers">>, Endpoint, #{}),
+    DefaultHeaders = couch_replicator_utils:get_default_headers(),
+    % For same keys values in second map override those in the first
+    Headers = maps:merge(DefaultHeaders, Headers0),
+
+    SockOpts = maps:get(<<"socket_options">>, Options, #{}),
+    SockAndProxy = maps:merge(SockOpts, ProxyParams),
+
+    SslParams = ssl_params(Url),
+
+    #{
+        <<"url">> => Url,
+        <<"auth_props">> => AuthProps,
+        <<"headers">> => Headers,
+        <<"ibrowse_options">> => maps:merge(SslParams, SockAndProxy),
+        <<"timeout">> => maps:get(<<"timeout">>, Options),
+        <<"http_connections">> => maps:get(<<"http_connections">>, Options),
+        <<"retries">> => maps:get(<<"retries">>, Options)
+        <<"proxy_url">> => ProxyUrl
+    }.
+
 
 parse_rep_db(<<"http://", _/binary>> = Url, Proxy, Options) ->
-    parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
+    parse_rep_db(#{<<"url">> => Url}, Proxy, Options);
 
 parse_rep_db(<<"https://", _/binary>> = Url, Proxy, Options) ->
-    parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
+    parse_rep_db(#{<<"url">> => Url}, Proxy, Options);
 
 parse_rep_db(<<_/binary>>, _Proxy, _Options) ->
     throw({error, local_endpoints_not_supported});
@@ -434,118 +376,99 @@ parse_rep_db(undefined, _Proxy, _Options) ->
     throw({error, <<"Missing replicator database">>}).
 
 
--spec maybe_add_trailing_slash(binary() | list()) -> list().
+-spec maybe_add_trailing_slash(binary()) -> binary().
+maybe_add_trailing_slash(<<>>) ->
+    <<>>;
+
 maybe_add_trailing_slash(Url) when is_binary(Url) ->
-    maybe_add_trailing_slash(?b2l(Url));
-maybe_add_trailing_slash(Url) ->
-    case lists:member($?, Url) of
-        true ->
-            Url;  % skip if there are query params
-        false ->
-            case lists:last(Url) of
-                $/ ->
-                    Url;
-                _ ->
-                    Url ++ "/"
-            end
+    case binary:match(Url, <<"?">>) of
+        nomatch ->
+            case binary:last(Url) of
+                $/ -> Url;
+                _ -> <<Url/binary, "/">>
+            end;
+        _ ->
+            Url  % skip if there are query params
     end.
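+% e.g. <<"http://host/db">> becomes <<"http://host/db/">>, while
+% <<"http://host/db?a=b">> is returned unchanged.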
 
 
--spec make_options([_]) -> [_].
-make_options(Props) ->
-    Options0 = lists:ukeysort(1, convert_options(Props)),
+-spec make_options(#{}) -> #{}.
+make_options(#{} = RepDoc) ->
+    Options0 = maps:fold(fun convert_options/3, #{}, RepDoc),
     Options = check_options(Options0),
-    DefWorkers = config:get("replicator", "worker_processes", "4"),
-    DefBatchSize = config:get("replicator", "worker_batch_size", "500"),
-    DefConns = config:get("replicator", "http_connections", "20"),
-    DefTimeout = config:get("replicator", "connection_timeout", "30000"),
-    DefRetries = config:get("replicator", "retries_per_request", "5"),
-    UseCheckpoints = config:get("replicator", "use_checkpoints", "true"),
-    DefCheckpointInterval = config:get("replicator", "checkpoint_interval",
-        "30000"),
-    {ok, DefSocketOptions} = couch_util:parse_term(
-        config:get("replicator", "socket_options",
-            "[{keepalive, true}, {nodelay, false}]")),
-    lists:ukeymerge(1, Options, lists:keysort(1, [
-        {connection_timeout, list_to_integer(DefTimeout)},
-        {retries, list_to_integer(DefRetries)},
-        {http_connections, list_to_integer(DefConns)},
-        {socket_options, DefSocketOptions},
-        {worker_batch_size, list_to_integer(DefBatchSize)},
-        {worker_processes, list_to_integer(DefWorkers)},
-        {use_checkpoints, list_to_existing_atom(UseCheckpoints)},
-        {checkpoint_interval, list_to_integer(DefCheckpointInterval)}
-    ])).
-
-
--spec convert_options([_]) -> [_].
-convert_options([])->
-    [];
-convert_options([{<<"cancel">>, V} | _R]) when not is_boolean(V)->
+    ConfigOptions = lists:foldl(fun({K, Default, ConversionFun}, Acc) ->
+        V = ConversionFun(config:get("replicator", K, Default)),
+        Acc#{list_to_binary(K) => V}
+    end, #{}, ?CONFIG_DEFAULTS),
+    maps:merge(ConfigOptions, Options).
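+% As a sketch, a doc containing {"continuous": true, "worker_processes": 2}
+% yields options along the lines of #{<<"continuous">> => true,
+% <<"worker_processes">> => 2, ...}, with any key the doc doesn't set
+% filled in from ?CONFIG_DEFAULTS or its "replicator" config override.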
+
+
+-spec convert_options(binary(), any(), #{}) -> #{}.
+convert_options(<<"cancel">>, V, _Acc) when not is_boolean(V)->
     throw({bad_request, <<"parameter `cancel` must be a boolean">>});
-convert_options([{<<"cancel">>, V} | R]) ->
-    [{cancel, V} | convert_options(R)];
-convert_options([{IdOpt, V} | R]) when IdOpt =:= <<"_local_id">>;
+convert_options(<<"cancel">>, V, Acc) ->
+    Acc#{<<"cancel">> => V};
+convert_options(IdOpt, V, Acc) when IdOpt =:= <<"_local_id">>;
         IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> ->
-    [{id, couch_replicator_ids:convert(V)} | convert_options(R)];
-convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V)->
+    Acc#{<<"id">> => couch_replicator_ids:convert(V)};
+convert_options(<<"create_target">>, V, _Acc) when not is_boolean(V)->
     throw({bad_request, <<"parameter `create_target` must be a boolean">>});
-convert_options([{<<"create_target">>, V} | R]) ->
-    [{create_target, V} | convert_options(R)];
-convert_options([{<<"create_target_params">>, V} | _R]) when not is_tuple(V) ->
+convert_options(<<"create_target">>, V, Acc) ->
+    Acc#{<<"create_target">> => V};
+convert_options(<<"create_target_params">>, V, _Acc) when not is_tuple(V) ->
     throw({bad_request,
         <<"parameter `create_target_params` must be a JSON object">>});
-convert_options([{<<"create_target_params">>, V} | R]) ->
-    [{create_target_params, V} | convert_options(R)];
-convert_options([{<<"continuous">>, V} | _R]) when not is_boolean(V)->
+convert_options(<<"create_target_params">>, V, Acc) ->
+    Acc#{<<"create_target_params">> => V};
+convert_options(<<"continuous">>, V, Acc) when not is_boolean(V)->
     throw({bad_request, <<"parameter `continuous` must be a boolean">>});
-convert_options([{<<"continuous">>, V} | R]) ->
-    [{continuous, V} | convert_options(R)];
-convert_options([{<<"filter">>, V} | R]) ->
-    [{filter, V} | convert_options(R)];
-convert_options([{<<"query_params">>, V} | R]) ->
-    [{query_params, V} | convert_options(R)];
-convert_options([{<<"doc_ids">>, null} | R]) ->
-    convert_options(R);
-convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) ->
+convert_options(<<"continuous">>, V, Acc) ->
+    Acc#{<<"continuous">> => V};
+convert_options(<<"filter">>, V, Acc) ->
+    Acc#{<<"filter">> => V};
+convert_options(<<"query_params">>, V, Acc) ->
+    Acc#{<<"query_params">> => V};
+convert_options(<<"doc_ids">>, null, Acc) ->
+    Acc;
+convert_options(<<"doc_ids">>, V, _Acc) when not is_list(V) ->
     throw({bad_request, <<"parameter `doc_ids` must be an array">>});
-convert_options([{<<"doc_ids">>, V} | R]) ->
+convert_options(<<"doc_ids">>, V, Acc) ->
     % Ensure same behaviour as old replicator: accept a list of percent
     % encoded doc IDs.
     DocIds = lists:usort([?l2b(couch_httpd:unquote(Id)) || Id <- V]),
-    [{doc_ids, DocIds} | convert_options(R)];
-convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) ->
+    Acc#{<<"doc_ids">> => DocIds};
+convert_options(<<"selector">>, V, _Acc) when not is_tuple(V) ->
     throw({bad_request, <<"parameter `selector` must be a JSON object">>});
-convert_options([{<<"selector">>, V} | R]) ->
-    [{selector, V} | convert_options(R)];
-convert_options([{<<"worker_processes">>, V} | R]) ->
-    [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"worker_batch_size">>, V} | R]) ->
-    [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"http_connections">>, V} | R]) ->
-    [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"connection_timeout">>, V} | R]) ->
-    [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"retries_per_request">>, V} | R]) ->
-    [{retries, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"socket_options">>, V} | R]) ->
-    {ok, SocketOptions} = couch_util:parse_term(V),
-    [{socket_options, SocketOptions} | convert_options(R)];
-convert_options([{<<"since_seq">>, V} | R]) ->
-    [{since_seq, V} | convert_options(R)];
-convert_options([{<<"use_checkpoints">>, V} | R]) ->
-    [{use_checkpoints, V} | convert_options(R)];
-convert_options([{<<"checkpoint_interval">>, V} | R]) ->
-    [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([_ | R]) -> % skip unknown option
-    convert_options(R).
-
-
--spec check_options([_]) -> [_].
+convert_options(<<"selector">>, V, Acc) ->
+    Acc#{<<"selector">> => V};
+convert_options(<<"worker_processes">>, V, Acc) ->
+    Acc#{<<"worker_processes">> => couch_util:to_integer(V)};
+convert_options(<<"worker_batch_size">>, V, Acc) ->
+    Acc#{<<"worker_batch_size">> => couch_util:to_integer(V)};
+convert_options(<<"http_connections">>, V, Acc) ->
+    Acc#{<<"http_connections">> => couch_util:to_integer(V)};
+convert_options(<<"connection_timeout">>, V, Acc) ->
+    Acc#{<<"connection_timeout">> => couch_util:to_integer(V)};
+convert_options(<<"retries_per_request">>, V, Acc) ->
+    Acc#{<<"retries">> => couch_util:to_integer(V)};
+convert_options(<<"socket_options">>, V, Acc) ->
+    Acc#{<<"socket_options">> => parse_sock_opts(V)};
+convert_options(<<"since_seq">>, V, Acc) ->
+    Acc#{<<"since_seq">> => V};
+convert_options(<<"use_checkpoints">>, V, Acc) when not is_boolean(V)->
+    throw({bad_request, <<"parameter `use_checkpoints` must be a boolean">>});
+convert_options(<<"use_checkpoints">>, V, Acc) ->
+    Acc#{<<"use_checkpoints">> => V};
+convert_options(<<"checkpoint_interval">>, V, Acc) ->
+    Acc#{<<"checkpoint_interval">>, couch_util:to_integer(V)};
+convert_options(_K, _V, Acc) -> % skip unknown option
+    Acc.
+
+
+-spec check_options(#{}) -> #{}.
 check_options(Options) ->
-    DocIds = lists:keyfind(doc_ids, 1, Options),
-    Filter = lists:keyfind(filter, 1, Options),
-    Selector = lists:keyfind(selector, 1, Options),
+    DocIds = maps:is_key(<<"doc_ids">>, Options),
+    Filter = maps:is_key(<<"filter">>, Options),
+    Selector = maps:is_key(<<"selector">>, Options),
     case {DocIds, Filter, Selector} of
         {false, false, false} -> Options;
         {false, false, _} -> Options;
@@ -557,66 +480,113 @@ check_options(Options) ->
     end.
 
 
--spec parse_proxy_params(binary() | [_]) -> [_].
-parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
-    parse_proxy_params(?b2l(ProxyUrl));
-parse_proxy_params([]) ->
-    [];
-parse_proxy_params(ProxyUrl) ->
+parse_sock_opts(V) ->
+    {ok, SocketOptions} = couch_util:parse_term(V),
+    lists:foldl(fun
+        ({K, Val}, Acc) when is_atom(K) ->
+            case lists:member(K, ?VALID_SOCK_OPTS) of
+                true -> Acc#{atom_to_binary(K, utf8) => Val};
+                false -> Acc
+            end;
+        (_, Acc) ->
+            Acc
+    end, #{}, SocketOptions).
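+% For instance, the default ?DEFAULT_SOCK_OPTS string parses to
+% #{<<"keepalive">> => true, <<"nodelay">> => false}; any option not
+% listed in ?VALID_SOCK_OPTS is silently dropped.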
+
+
+-spec parse_proxy_params(binary() | #{}) -> #{}.
+parse_proxy_params(<<>>) ->
+    #{};
+parse_proxy_params(ProxyUrl0) when is_binary(ProxyUrl0)->
+    ProxyUrl = binary_to_list(ProxyUrl0),
     #url{
         host = Host,
         port = Port,
         username = User,
         password = Passwd,
-        protocol = Protocol
+        protocol = Protocol0
     } = ibrowse_lib:parse_url(ProxyUrl),
-    [
-        {proxy_protocol, Protocol},
-        {proxy_host, Host},
-        {proxy_port, Port}
-    ] ++ case is_list(User) andalso is_list(Passwd) of
+    Protocol = atom_to_binary(Protocol0, utf8),
+    case lists:member(Protocol, [<<"http">>, <<"https">>, <<"socks5">>]) of
+        true ->
+            ok;
         false ->
-            [];
+            Error = <<"Unsupported proxy protocol", Protocol/binary>>,
+            throw({bad_request, Error})
+    end,
+    ProxyParams = #{
+        <<"proxy_url">> => ProxyUrl,
+        <<"proxy_protocol">> => Protocol,
+        <<"proxy_host">> => list_to_binary(Host),
+        <<"proxy_port">> => Port
+    },
+    case is_list(User) andalso is_list(Passwd) of
         true ->
-            [{proxy_user, User}, {proxy_password, Passwd}]
-        end.
+            ProxyParams#{
+                <<"proxy_user">> => list_to_binary(User),
+                <<"proxy_password">> => list_to_binary(Passwd)
+            };
+        false ->
+            ProxyParams
+    end.
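+% e.g. parse_proxy_params(<<"http://u:p@proxy.local:8080">>) should yield
+% a map with proxy_url, proxy_protocol, proxy_host and proxy_port entries
+% plus proxy_user and proxy_password; without credentials in the URL the
+% last two are omitted.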
 
 
--spec ssl_params([_]) -> [_].
+-spec ssl_params(binary()) -> #{}.
 ssl_params(Url) ->
-    case ibrowse_lib:parse_url(Url) of
+    case ibrowse_lib:parse_url(binary_to_list(Url)) of
     #url{protocol = https} ->
         Depth = list_to_integer(
             config:get("replicator", "ssl_certificate_max_depth", "3")
         ),
         VerifyCerts = config:get("replicator", "verify_ssl_certificates"),
-        CertFile = config:get("replicator", "cert_file", undefined),
-        KeyFile = config:get("replicator", "key_file", undefined),
-        Password = config:get("replicator", "password", undefined),
-        SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")],
-        SslOpts1 = case CertFile /= undefined andalso KeyFile /= undefined of
+        CertFile = config:get("replicator", "cert_file", null),
+        KeyFile = config:get("replicator", "key_file", null),
+        Password = config:get("replicator", "password", null),
+        VerifySslOptions = ssl_verify_options(VerifyCerts =:= "true"),
+        SslOpts = maps:merge(VerifySslOptions, #{<<"depth">> => Depth}),
+        SslOpts1 = case CertFile /= null andalso KeyFile /= null of
             true ->
-                case Password of
-                    undefined ->
-                        [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts;
+                CertFileOpts = case Password of
+                    null ->
+                        #{
+                            <<"certfile">> => list_to_binary(CertFile),
+                            <<"keyfile">> => list_to_binary(KeyFile)
+                        };
                     _ ->
-                        [{certfile, CertFile}, {keyfile, KeyFile},
-                            {password, Password}] ++ SslOpts
-                end;
-            false -> SslOpts
+                        #{
+                            <<"certfile">> => list_to_binary(CertFile),
+                            <<"keyfile">> => list_to_binary(KeyFile),
+                            <<"password">> => list_to_binary(Password)
+                        }
+                end,
+                maps:merge(SslOpts, CertFileOpts);
+            false ->
+                SslOpts
         end,
-        [{is_ssl, true}, {ssl_options, SslOpts1}];
+        #{<<"is_ssl">> => true, <<"ssl_options">> => SslOpts1};
     #url{protocol = http} ->
-        []
+        #{}
     end.
 
 
--spec ssl_verify_options(true | false) -> [_].
+-spec ssl_verify_options(true | false) -> #{}.
 ssl_verify_options(true) ->
-    CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
-    [{verify, verify_peer}, {cacertfile, CAFile}];
+    case config:get("replicator", "ssl_trusted_certificates_file", undefined) of
+        undefined ->
+            #{
+                <<"verify">> => <<"verify_peer">>,
+                <<"cacertfile">> => null
+            };
+        CAFile when is_list(CAFile) ->
+            #{
+                <<"verify">> => <<"verify_peer">>,
+                <<"cacertfile">> => list_to_binary(CAFile)
+            }
+    end;
+
 ssl_verify_options(false) ->
-    [{verify, verify_none}].
+    #{
+        <<"verify">> => <<"verify_none">>
+    }.
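+% e.g. with verification enabled and ssl_trusted_certificates_file set to
+% a (hypothetical) /etc/ssl/ca.pem, this returns
+% #{<<"verify">> => <<"verify_peer">>,
+%   <<"cacertfile">> => <<"/etc/ssl/ca.pem">>}.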
 
 
 -spec before_doc_update(#doc{}, Db::any(), couch_db:update_type()) -> #doc{}.
@@ -626,8 +596,9 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
     #user_ctx{
        roles = Roles,
        name = Name
-    } = couch_db:get_user_ctx(Db),
-    case lists:member(<<"_replicator">>, Roles) of
+    } = fabric2_db:get_user_ctx(Db),
+    IsReplicator = case lists:member(<<"_replicator">>, Roles),
+    Doc1 = case IsReplicator of
     true ->
         Doc;
     false ->
@@ -637,7 +608,7 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
         Name ->
             Doc;
         Other ->
-            case (catch couch_db:check_is_admin(Db)) of
+            case (catch fabric2_db:check_is_admin(Db)) of
             ok when Other =:= null ->
                 Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
             ok ->
@@ -647,15 +618,25 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
                     " from other users.">>})
             end
         end
-    end.
+    end,
+    case IsReplicator orelse Doc1#doc.deleted of
+        true ->
+            ok;  % If replicator or deleting don't validate doc body
+        false ->
+            % Encode as a map and normalize all field names as binaries
+            BodyStr = couch_util:json_encode(Doc1#doc.body),
+            BodyMap = couch_util:json_decode(BodyStr, [return_maps]),
+            couch_replicator_validate_doc:validate(BodyMap)
+    end,
+    Doc1.
 
 
 -spec after_doc_read(#doc{}, Db::any()) -> #doc{}.
 after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
     Doc;
 after_doc_read(#doc{body = {Body}} = Doc, Db) ->
-    #user_ctx{name = Name} = couch_db:get_user_ctx(Db),
-    case (catch couch_db:check_is_admin(Db)) of
+    #user_ctx{name = Name} = fabric2_db:get_user_ctx(Db),
+    case (catch fabric2_db:check_is_admin(Db)) of
     ok ->
         Doc;
     _ ->
@@ -663,16 +644,15 @@ after_doc_read(#doc{body = {Body}} = Doc, Db) ->
         Name ->
             Doc;
         _Other ->
-            Source = strip_credentials(couch_util:get_value(<<"source">>,
-Body)),
-            Target = strip_credentials(couch_util:get_value(<<"target">>,
-Body)),
+            Source0 = couch_util:get_value(<<"source">>, Body),
+            Target0 = couch_util:get_value(<<"target">>, Body),
+            Source = strip_credentials(Source0),
+            Target = strip_credentials(Target0),
             NewBody0 = ?replace(Body, <<"source">>, Source),
             NewBody = ?replace(NewBody0, <<"target">>, Target),
             #doc{revs = {Pos, [_ | Revs]}} = Doc,
             NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}},
-            NewRevId = couch_db:new_revid(NewDoc),
-            NewDoc#doc{revs = {Pos, [NewRevId | Revs]}}
+            fabric2_db:new_revid(NewDoc)
         end
     end.
 
@@ -791,26 +771,24 @@ check_strip_credentials_test() ->
 
 setup() ->
     DbName = ?tempdb(),
-    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
-    ok = couch_db:close(Db),
-    create_vdu(DbName),
+    {ok, Db} = fabric2_db:create(DbName, [?ADMIN_CTX]),
+    create_vdu(Db),
     DbName.
 
 
 teardown(DbName) when is_binary(DbName) ->
-    couch_server:delete(DbName, [?ADMIN_CTX]),
+    fabric2_db:delete(DbName, [?ADMIN_CTX]),
     ok.
 
 
-create_vdu(DbName) ->
-    couch_util:with_db(DbName, fun(Db) ->
-        VduFun = <<"function(newdoc, olddoc, userctx) {throw({'forbidden':'fail'})}">>,
-        Doc = #doc{
-            id = <<"_design/vdu">>,
-            body = {[{<<"validate_doc_update">>, VduFun}]}
-        },
-        {ok, _} = couch_db:update_docs(Db, [Doc])
-    end).
+create_vdu(Db) ->
+    VduFun = <<"function(newdoc, olddoc, userctx) {throw({'forbidden':'fail'})}">>,
+    Doc = #doc{
+        id = <<"_design/vdu">>,
+        body = {[{<<"validate_doc_update">>, VduFun}]}
+    },
+    {ok, _} = fabric2_db:update_doc(Db, Doc),
+    ok.
 
 
 update_replicator_doc_with_bad_vdu_test_() ->
diff --git a/src/couch_replicator/src/couch_replicator_filters.erl b/src/couch_replicator/src/couch_replicator_filters.erl
index c898000..fe785a2 100644
--- a/src/couch_replicator/src/couch_replicator_filters.erl
+++ b/src/couch_replicator/src/couch_replicator_filters.erl
@@ -27,17 +27,17 @@
 % For `user` filter, i.e. filters specified as user code
 % in source database, this code doesn't fetch the filter
 % code, but only returns the name of the filter.
--spec parse([_]) ->
+-spec parse(#{}) ->
     {ok, nil} |
     {ok, {view, binary(), {[_]}}} |
     {ok, {user, {binary(), binary()}, {[_]}}} |
     {ok, {docids, [_]}} |
     {ok, {mango, {[_]}}} |
     {error, binary()}.
-parse(Options) ->
-    Filter = couch_util:get_value(filter, Options),
-    DocIds = couch_util:get_value(doc_ids, Options),
-    Selector = couch_util:get_value(selector, Options),
+parse(#{} = Options) ->
+    Filter = maps:get(<<"filter">>, Options, undefined),
+    DocIds = maps:get(<<"doc_ids">>, Options, undefined),
+    Selector = maps:get(<<"selector">>, Options, undefined),
     case {Filter, DocIds, Selector} of
         {undefined, undefined, undefined} ->
             {ok, nil};
@@ -88,22 +88,24 @@ fetch(DDocName, FilterName, Source) ->
 
 
 % Get replication type and view (if any) from replication document props
--spec view_type([_], [_]) ->
-    {view, {binary(), binary()}} | {db, nil} | {error, binary()}.
-view_type(Props, Options) ->
-    case couch_util:get_value(<<"filter">>, Props) of
-        <<"_view">> ->
-            {QP}  = couch_util:get_value(query_params, Options, {[]}),
-            ViewParam = couch_util:get_value(<<"view">>, QP),
-            case re:split(ViewParam, <<"/">>) of
-                [DName, ViewName] ->
-                    {view, {<< "_design/", DName/binary >>, ViewName}};
-                _ ->
-                    {error, <<"Invalid `view` parameter.">>}
-            end;
+-spec view_type(#{}, #{}) ->
+    {binary(), #{}} | {error, binary()}.
+view_type(#{<<"filter">> := <<"_view">>}, Options) ->
+    QP = maps:get(<<"query_params">>, Options, #{}),
+    ViewParam = maps:get(<<"view">>, QP, undefined),
+    case re:split(ViewParam, <<"/">>) of
+        [DName, ViewName] ->
+            DDocMap = #{
+                <<"ddoc">> => <<"_design/",DName/binary>>,
+                <<"view">> => ViewName
+            },
+            {<<"view">>, DDocMap};
         _ ->
-            {db, nil}
-    end.
+            {error, <<"Invalid `view` parameter.">>}
+    end;
+
+view_type(#{}, _Options) ->
+    {<<"db">>, #{}}.
 
 
 % Private functions
diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl
index 04e71c3..a3f6220 100644
--- a/src/couch_replicator/src/couch_replicator_ids.erl
+++ b/src/couch_replicator/src/couch_replicator_ids.erl
@@ -30,28 +30,29 @@
 %  {filter_fetch_error, Error} exception.
 %
 
-replication_id(#rep{options = Options} = Rep) ->
+replication_id(#{<<"options">> := Options} = Rep) ->
     BaseId = replication_id(Rep, ?REP_ID_VERSION),
-    {BaseId, maybe_append_options([continuous, create_target], Options)}.
+    UseOpts = [<<"continuous">>, <<"create_target">>]
+    {BaseId, maybe_append_options(UseOpts, Options)}.
 
 
 % Versioned clauses for generating replication IDs.
 % If a change is made to how replications are identified,
 % please add a new clause and increase ?REP_ID_VERSION.
 
-replication_id(#rep{} = Rep, 4) ->
+replication_id(#{<<"source">> := Src, <<"target">> := Tgt} = Rep, 4) ->
     UUID = couch_server:get_uuid(),
-    SrcInfo = get_v4_endpoint(Rep#rep.source),
-    TgtInfo = get_v4_endpoint(Rep#rep.target),
+    SrcInfo = get_v4_endpoint(Src),
+    TgtInfo = get_v4_endpoint(Tgt),
     maybe_append_filters([UUID, SrcInfo, TgtInfo], Rep);
 
-replication_id(#rep{} = Rep, 3) ->
+replication_id(#{<<"source">> := Src0, <<"target">> := Tgt0} = Rep, 3) ->
     UUID = couch_server:get_uuid(),
-    Src = get_rep_endpoint(Rep#rep.source),
-    Tgt = get_rep_endpoint(Rep#rep.target),
+    Src = get_rep_endpoint(Src0),
+    Tgt = get_rep_endpoint(Tgt0),
     maybe_append_filters([UUID, Src, Tgt], Rep);
 
-replication_id(#rep{} = Rep, 2) ->
+replication_id(#{<<"source">> := Src0, <<"target">> := Tgt0} = Rep, 2) ->
     {ok, HostName} = inet:gethostname(),
     Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of
     P when is_number(P) ->
@@ -64,14 +65,14 @@ replication_id(#rep{} = Rep, 2) ->
         % ... mochiweb_socket_server:get(https, port)
         list_to_integer(config:get("httpd", "port", "5984"))
     end,
-    Src = get_rep_endpoint(Rep#rep.source),
-    Tgt = get_rep_endpoint(Rep#rep.target),
+    Src = get_rep_endpoint(Src0),
+    Tgt = get_rep_endpoint(Tgt0),
     maybe_append_filters([HostName, Port, Src, Tgt], Rep);
 
-replication_id(#rep{} = Rep, 1) ->
+replication_id(#{<<"source">> := Src0, <<"target">> := Tgt0} = Rep, 1) ->
     {ok, HostName} = inet:gethostname(),
-    Src = get_rep_endpoint(Rep#rep.source),
-    Tgt = get_rep_endpoint(Rep#rep.target),
+    Src = get_rep_endpoint(Src0),
+    Tgt = get_rep_endpoint(Tgt0),
     maybe_append_filters([HostName, Src, Tgt], Rep).
 
 
@@ -83,15 +84,23 @@ convert(Id0) when is_binary(Id0) ->
     % the URL path. So undo the incorrect parsing here to avoid forcing
     % users to url encode + characters.
     Id = binary:replace(Id0, <<" ">>, <<"+">>, [global]),
-    lists:splitwith(fun(Char) -> Char =/= $+ end, ?b2l(Id));
-convert({BaseId, Ext} = Id) when is_list(BaseId), is_list(Ext) ->
+    case binary:split(Id, <<"+">>) of
+        [BaseId, Ext] -> {BaseId, Ext};
+        [BaseId] -> {BaseId, <<>>}
+    end
+convert({BaseId, Ext}) when is_list(BaseId), is_list(Ext) ->
+    {list_to_binary(BaseId), list_to_binary(Ext)};
+convert({BaseId, Ext} = Id) when is_binary(BaseId), is_binary(Ext) ->
     Id.
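+% Illustrative examples: convert(<<"abc+continuous">>) gives
+% {<<"abc">>, <<"+continuous">>}, and convert(<<"abc continuous">>) is
+% handled identically since the mistakenly URL-decoded space is first
+% turned back into a "+".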
 
 
 % Private functions
 
-maybe_append_filters(Base,
-        #rep{source = Source, options = Options}) ->
+maybe_append_filters(Base, #{} = Rep) ->
+    #{
+        <<"source">> := Source,
+        <<"options">> := Options
+    } = Rep,
     Base2 = Base ++
         case couch_replicator_filters:parse(Options) of
         {ok, nil} ->
@@ -112,7 +121,8 @@ maybe_append_filters(Base,
         {error, FilterParseError} ->
             throw({error, FilterParseError})
         end,
-    couch_util:to_hex(couch_hash:md5_hash(term_to_binary(Base2))).
+    Res = couch_util:to_hex(couch_hash:md5_hash(term_to_binary(Base2))),
+    list_to_binary(Res).
 
 
 maybe_append_options(Options, RepOptions) ->
@@ -127,12 +137,19 @@ maybe_append_options(Options, RepOptions) ->
     end, [], Options).
 
 
-get_rep_endpoint(#httpdb{url=Url, headers=Headers}) ->
+get_rep_endpoint(#{<<"url">> := Url0, <<"headers">> := Headers0}) ->
+    Url = binary_to_list(Url0),
+    % We turn headers into a proplist of string() KVs to calculate
+    % the same replication ID as CouchDB 2.x
+    Headers1 = maps:fold(fun(K, V, Acc) ->
+        [{binary_to_list(K), binary_to_list(V)} | Acc]
+    end, [], Headers0),
+    Headers2 = lists:keysort(1, Headers1),
     DefaultHeaders = (#httpdb{})#httpdb.headers,
-    {remote, Url, Headers -- DefaultHeaders}.
+    {remote, Url, Headers2 -- DefaultHeaders}.
 
 
-get_v4_endpoint(#httpdb{} = HttpDb) ->
+get_v4_endpoint(#{} = HttpDb) ->
     {remote, Url, Headers} = get_rep_endpoint(HttpDb),
     {{UserFromHeaders, _}, HeadersWithoutBasicAuth} =
         couch_replicator_utils:remove_basic_auth_from_headers(Headers),
@@ -141,7 +158,6 @@ get_v4_endpoint(#httpdb{} = HttpDb) ->
     OAuth = undefined, % Keep this to ensure checkpoints don't change
     {remote, User, Host, NonDefaultPort, Path, HeadersWithoutBasicAuth, OAuth}.
 
-
 pick_defined_value(Values) ->
     case [V || V <- Values, V /= undefined] of
         [] ->
diff --git a/src/couch_replicator/src/couch_replicator_notifier.erl b/src/couch_replicator/src/couch_replicator_notifier.erl
deleted file mode 100644
index f7640a3..0000000
--- a/src/couch_replicator/src/couch_replicator_notifier.erl
+++ /dev/null
@@ -1,58 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_notifier).
-
--behaviour(gen_event).
--vsn(1).
-
-% public API
--export([start_link/1, stop/1, notify/1]).
-
-% gen_event callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_event/2, handle_call/2, handle_info/2]).
-
--include_lib("couch/include/couch_db.hrl").
-
-start_link(FunAcc) ->
-    couch_event_sup:start_link(couch_replication,
-        {couch_replicator_notifier, make_ref()}, FunAcc).
-
-notify(Event) ->
-    gen_event:notify(couch_replication, Event).
-
-stop(Pid) ->
-    couch_event_sup:stop(Pid).
-
-
-init(FunAcc) ->
-    {ok, FunAcc}.
-
-terminate(_Reason, _State) ->
-    ok.
-
-handle_event(Event, Fun) when is_function(Fun, 1) ->
-    Fun(Event),
-    {ok, Fun};
-handle_event(Event, {Fun, Acc}) when is_function(Fun, 2) ->
-    Acc2 = Fun(Event, Acc),
-    {ok, {Fun, Acc2}}.
-
-handle_call(_Msg, State) ->
-    {ok, ok, State}.
-
-handle_info(_Msg, State) ->
-    {ok, State}.
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 0b33419..fef96dd 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -15,7 +15,7 @@
 -behaviour(gen_server).
 
 -export([
-   start_link/1
+   start_link/3
 ]).
 
 -export([
@@ -39,17 +39,16 @@
     to_binary/1
 ]).
 
--import(couch_replicator_utils, [
-    pp_rep_id/1
-]).
-
 
 -define(LOWEST_SEQ, 0).
 -define(DEFAULT_CHECKPOINT_INTERVAL, 30000).
 -define(STARTUP_JITTER_DEFAULT, 5000).
 
 -record(rep_state, {
-    rep_details,
+    job,
+    job_data,
+    id,
+    base_id,
     source_name,
     target_name,
     source,
@@ -77,31 +76,32 @@
     use_checkpoints = true,
     checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
     type = db,
-    view = nil
+    view = nil,
+    user = null,
+    options = #{},
+    doc_id = null,
+    db_name = null,
+    start_time
 }).
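+% Note: `job` and `job_data` are assumed to hold the couch_jobs job handle
+% and its associated data map; `id` and `base_id` replace the old
+% #rep.id {BaseId, Ext} tuple.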
 
 
-start_link(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
-    RepChildId = BaseId ++ Ext,
-    Source = couch_replicator_api_wrap:db_uri(Src),
-    Target = couch_replicator_api_wrap:db_uri(Tgt),
-    ServerName = {global, {?MODULE, Rep#rep.id}},
-
-    case gen_server:start_link(ServerName, ?MODULE, Rep, []) of
+start_link(#{} = Job, #{} = JobData) ->
+    case gen_server:start_link(?MODULE, {Job, JobData}, []) of
         {ok, Pid} ->
             {ok, Pid};
         {error, Reason} ->
-            couch_log:warning("failed to start replication `~s` (`~s` -> `~s`)",
-                           [RepChildId, Source, Target]),
+            #{?REP := Rep} = JobData,
+            {<<"id">> := Id, ?SOURCE := Src, ?TARGET := Ttg} = Rep,
+            Source = couch_replicator_api_wrap:db_uri(Src),
+            Target = couch_replicator_api_wrap:db_uri(Tgt),
+            ErrMsg = "failed to start replication `~s` (`~s` -> `~s`)",
+            couch_log:warning(ErrMsg, [RepId, Source, Target]),
             {error, Reason}
     end.
 
 
-init(InitArgs) ->
-    {ok, InitArgs, 0}.
+init({#{} = Job, #{} = JobData}) ->
+    {ok, {Job, JobData}, 0}.
 
 
-do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
+do_init(#{} = Job, #{} = JobData) ->
     process_flag(trap_exit, true),
 
     timer:sleep(startup_jitter()),
@@ -113,8 +113,12 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
         target_name = TargetName,
         start_seq = {_Ts, StartSeq},
         highest_seq_done = {_, HighestSeq},
-        checkpoint_interval = CheckpointInterval
-    } = State = init_state(Rep),
+        checkpoint_interval = CheckpointInterval,
+        user = User,
+        options = Options,
+        doc_id = DocId,
+        db_name = DbName
+    } = State = init_state(Job, JobData),
 
     NumWorkers = get_value(worker_processes, Options),
     BatchSize = get_value(worker_batch_size, Options),
@@ -145,10 +149,10 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
 
     couch_task_status:add_task([
         {type, replication},
-        {user, UserCtx#user_ctx.name},
-        {replication_id, ?l2b(BaseId ++ Ext)},
-        {database, Rep#rep.db_name},
-        {doc_id, Rep#rep.doc_id},
+        {user, User},
+        {replication_id, State#rep_state.id},
+        {database, DbName},
+        {doc_id, DocId},
         {source, ?l2b(SourceName)},
         {target, ?l2b(TargetName)},
         {continuous, get_value(continuous, Options, false)},
@@ -157,16 +161,6 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
     ] ++ rep_stats(State)),
     couch_task_status:set_update_frequency(1000),
 
-    % Until OTP R14B03:
-    %
-    % Restarting a temporary supervised child implies that the original arguments
-    % (#rep{} record) specified in the MFA component of the supervisor
-    % child spec will always be used whenever the child is restarted.
-    % This implies the same replication performance tunning parameters will
-    % always be used. The solution is to delete the child spec (see
-    % cancel_replication/1) and then start the replication again, but this is
-    % unfortunately not immune to race conditions.
-
     log_replication_start(State),
     couch_log:debug("Worker pids are: ~p", [Workers]),
 
@@ -314,9 +308,10 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
         {stop, StopReason, State2}
     end;
 
-handle_info(timeout, InitArgs) ->
-    try do_init(InitArgs) of {ok, State} ->
-        {noreply, State}
+handle_info(timeout, {#{} = Job, #{} = JobData} = InitArgs) ->
+    try do_init(Job, JobData) of
+        {ok, State} ->
+            {noreply, State}
     catch
         exit:{http_request_failed, _, _, max_backoff} ->
             {stop, {shutdown, max_backoff}, {error, InitArgs}};
@@ -331,13 +326,12 @@ handle_info(timeout, InitArgs) ->
     end.
 
 
-terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
-    checkpoint_history = CheckpointHistory} = State) ->
-    terminate_cleanup(State),
-    couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}),
-    doc_update_completed(Rep, rep_stats(State));
+terminate(normal, #rep_state{} = State) ->
+    % Note: when terminating `normal`, the job was already marked as finished.
+    % if that fails then we'd end up in the error terminate clause
+    terminate_cleanup(State).
 
-terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
+terminate(shutdown, #rep_state{id = RepId} = State) ->
     % Replication stopped via _scheduler_sup:terminate_child/1, which can be
     % occur during regular scheduler operation or when job is removed from
     % the scheduler.
@@ -349,58 +343,57 @@ terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
             couch_log:error(LogMsg, [?MODULE, RepId, Error]),
             State
     end,
-    couch_replicator_notifier:notify({stopped, RepId, <<"stopped">>}),
+    finish_couch_job(State1, <<"stopped">>, null),
     terminate_cleanup(State1);
 
-terminate({shutdown, max_backoff}, {error, InitArgs}) ->
-    #rep{id = {BaseId, Ext} = RepId} = InitArgs,
+terminate({shutdown, max_backoff}, {error, {#{} = Job, #{} = JobData}}) ->
+    % Here we handle the case when replication fails during initialization.
+    % That is before the #rep_state{} is even built.
+    #{?REP := #{<<"id">> := RepId}} = JobData,
     couch_stats:increment_counter([couch_replicator, failed_starts]),
-    couch_log:warning("Replication `~s` reached max backoff ", [BaseId ++ Ext]),
-    couch_replicator_notifier:notify({error, RepId, max_backoff});
-
-terminate({shutdown, {error, Error}}, {error, Class, Stack, InitArgs}) ->
-    #rep{
-        id = {BaseId, Ext} = RepId,
-        source = Source0,
-        target = Target0,
-        doc_id = DocId,
-        db_name = DbName
-    } = InitArgs,
+    couch_log:warning("Replication `~s` reached max backoff ", [RepId]),
+    finish_couch_job(Job, JobData, <<"error">>, max_backoff);
+
+terminate({shutdown, {error, Error}}, {error, Class, Stack, {Job, JobData}}) ->
+    % Here we handle the case when replication fails during initialization.
+    #{?REP := Rep} = JobData,
+    #{
+       <<"id">> := Id,
+       ?SOURCE := Source0,
+       ?TARGET := Target0,
+       <<"doc_id">> := DocId,
+       <<"db_name">> := DbName
+    } = Rep,
     Source = couch_replicator_api_wrap:db_uri(Source0),
     Target = couch_replicator_api_wrap:db_uri(Target0),
-    RepIdStr = BaseId ++ Ext,
     Msg = "~p:~p: Replication ~s failed to start ~p -> ~p doc ~p:~p stack:~p",
-    couch_log:error(Msg, [Class, Error, RepIdStr, Source, Target, DbName,
+    couch_log:error(Msg, [Class, Error, RepId, Source, Target, DbName,
         DocId, Stack]),
     couch_stats:increment_counter([couch_replicator, failed_starts]),
-    couch_replicator_notifier:notify({error, RepId, Error});
+    finish_couch_job(Job, JobData, <<"error">>, Error);
 
-terminate({shutdown, max_backoff}, State) ->
+terminate({shutdown, max_backoff}, #rep_state{} = State) ->
     #rep_state{
+        id = RepId,
         source_name = Source,
-        target_name = Target,
-        rep_details = #rep{id = {BaseId, Ext} = RepId}
+        target_name = Target
     } = State,
     couch_log:error("Replication `~s` (`~s` -> `~s`) reached max backoff",
-        [BaseId ++ Ext, Source, Target]),
+        [RepId, Source, Target]),
     terminate_cleanup(State),
-    couch_replicator_notifier:notify({error, RepId, max_backoff});
-
-terminate({shutdown, Reason}, State) ->
-    % Unwrap so when reporting we don't have an extra {shutdown, ...} tuple
-    % wrapped around the message
-    terminate(Reason, State);
+    finish_couch_job(State, <<"error">>, max_backoff);
 
 terminate(Reason, State) ->
-#rep_state{
+    #rep_state{
+        id = RepId,
         source_name = Source,
-        target_name = Target,
-        rep_details = #rep{id = {BaseId, Ext} = RepId}
+        target_name = Target
     } = State,
     couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~s",
-        [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
+        [RepId, Source, Target, to_binary(Reason)]),
     terminate_cleanup(State),
-    couch_replicator_notifier:notify({error, RepId, Reason}).
+    finish_couch_job(State, <<"error">>, Reason).
+
 
 terminate_cleanup(State) ->
     update_task(State),
@@ -414,22 +407,19 @@ code_change(_OldVsn, #rep_state{}=State, _Extra) ->
 
 format_status(_Opt, [_PDict, State]) ->
     #rep_state{
+       id = RepId,
        source = Source,
        target = Target,
-       rep_details = RepDetails,
        start_seq = StartSeq,
        source_seq = SourceSeq,
        committed_seq = CommitedSeq,
        current_through_seq = ThroughSeq,
        highest_seq_done = HighestSeqDone,
-       session_id = SessionId
-    } = state_strip_creds(State),
-    #rep{
-       id = RepId,
-       options = Options,
+       session_id = SessionId,
        doc_id = DocId,
-       db_name = DbName
-    } = RepDetails,
+       db_name = DbName,
+       options = Options
+    } = state_strip_creds(State),
     [
         {rep_id, RepId},
         {source, couch_replicator_api_wrap:db_uri(Source)},
@@ -473,73 +463,108 @@ httpdb_strip_creds(LocalDb) ->
     LocalDb.
 
 
-rep_strip_creds(#rep{source = Source, target = Target} = Rep) ->
-    Rep#rep{
-        source = httpdb_strip_creds(Source),
-        target = httpdb_strip_creds(Target)
-    }.
-
-
-state_strip_creds(#rep_state{rep_details = Rep, source = Source, target = Target} = State) ->
-    % #rep_state contains the source and target at the top level and also
-    % in the nested #rep_details record
+state_strip_creds(#rep_state{source = Source, target = Target} = State) ->
     State#rep_state{
-        rep_details = rep_strip_creds(Rep),
         source = httpdb_strip_creds(Source),
         target = httpdb_strip_creds(Target)
     }.
 
 
-adjust_maxconn(Src = #httpdb{http_connections = 1}, RepId) ->
+adjust_maxconn(Src = #{<<"http_connections">> := 1}, RepId) ->
     Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p",
     couch_log:notice(Msg, [RepId]),
-    Src#httpdb{http_connections = 2};
+    Src#{<<"http_connections">> := 2};
 adjust_maxconn(Src, _RepId) ->
     Src.
 
 
--spec doc_update_triggered(#rep{}) -> ok.
-doc_update_triggered(#rep{db_name = null}) ->
+-spec doc_update_triggered(#rep_state{}) -> ok.
+doc_update_triggered(#rep_state{db_name = null}) ->
     ok;
-doc_update_triggered(#rep{id = RepId, doc_id = DocId} = Rep) ->
+doc_update_triggered(#rep_state{} = State) ->
+    #rep_state{id = Id, doc_id = DocId, db_name = DbName} = State,
     case couch_replicator_doc_processor:update_docs() of
         true ->
-            couch_replicator_docs:update_triggered(Rep, RepId);
+            couch_replicator_docs:update_triggered(Id, DocId, DbName);
         false ->
             ok
     end,
-    couch_log:notice("Document `~s` triggered replication `~s`",
-        [DocId, pp_rep_id(RepId)]),
+    couch_log:notice("Document `~s` triggered replication `~s`", [DocId, Id]),
     ok.
 
 
--spec doc_update_completed(#rep{}, list()) -> ok.
-doc_update_completed(#rep{db_name = null}, _Stats) ->
+-spec doc_update_completed(#rep_state{}) -> ok.
+doc_update_completed(#rep_state{db_name = null}) ->
     ok;
-doc_update_completed(#rep{id = RepId, doc_id = DocId, db_name = DbName,
-    start_time = StartTime}, Stats0) ->
-    Stats = Stats0 ++ [{start_time, couch_replicator_utils:iso8601(StartTime)}],
+doc_update_completed(#rep_state{} = State) ->
+    #rep_state{
+        id = Id,
+        doc_id = DocId,
+        db_name = DbName,
+        start_time = Start,
+        stats = Stats0
+    } = State,
+    Stats = Stats0 ++ [{start_time, couch_replicator_utils:iso8601(Start)}],
     couch_replicator_docs:update_doc_completed(DbName, DocId, Stats),
-    couch_log:notice("Replication `~s` completed (triggered by `~s`)",
-        [pp_rep_id(RepId), DocId]),
+    couch_log:notice("Replication `~s` completed (triggered by `~s:~s`)",
+        [Id, DbName, DocId]),
     ok.
 
 
 do_last_checkpoint(#rep_state{seqs_in_progress = [],
     highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
-    {stop, normal, cancel_timer(State)};
+    History = State#rep_state.checkpoint_history,
+    Result = case finish_couch_job(State, <<"completed">>, History) of
+        ok -> normal;
+        {error, _} = Error -> Error
+    end,
+    {stop, Result, cancel_timer(State)};
 do_last_checkpoint(#rep_state{seqs_in_progress = [],
     highest_seq_done = Seq} = State) ->
     case do_checkpoint(State#rep_state{current_through_seq = Seq}) of
     {ok, NewState} ->
         couch_stats:increment_counter([couch_replicator, checkpoints, success]),
-        {stop, normal, cancel_timer(NewState)};
+        History = NewState#rep_state.checkpoint_history,
+        Result = case finish_couch_job(NewState, <<"completed">>, History) of
+            ok -> normal;
+            {error, _} = Error -> Error
+        end,
+        {stop, Result, cancel_timer(NewState)};
     Error ->
         couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
         {stop, Error, State}
     end.
 
 
+finish_couch_job(#rep_state{} = State, FinishedState, Result) ->
+    #rep_state{job = Job, job_data = JobData} = State,
+    case finish_couch_job(Job, JobData, FinishedState, Result) of
+        ok ->
+            doc_update_completed(State),
+            ok;
+        {error, _} = Error ->
+            Error
+    end.
+
+
+finish_couch_job(#{} = Job, #{} = JobData0, FinishState, Result0) ->
+    #{?REP := #{<<"id">> := RepId}} = JobData0,
+    Result = case Result0 of
+        null -> null;
+        #{} -> Result0;
+        <<_/binary>> -> Result0;
+        Atom when is_atom(Atom) -> atom_to_binary(Atom, utf8);
+        _Other -> couch_replicator_utils:rep_error_to_binary(Result0)
+    end,
+    JobData = JobData0#{
+        ?FINISHED_STATE => FinishState,
+        ?FINISHED_RESULT => Result
+    },
+    case couch_jobs:finish(undefined, Job, JobData) of
+        ok ->
+            ok;
+        {error, Error} ->
+            Msg = "Replication ~s job could not finish. Error:~p",
+            couch_log:error(Msg, [RepId, Error]),
+            {error, Error}
+    end.
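+% Note: couch_jobs:finish/3 is assumed to persist the updated job data and
+% mark the job finished; the `undefined` first argument lets it run in its
+% own transaction.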
+
+
 start_timer(State) ->
     After = State#rep_state.checkpoint_interval,
     case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
@@ -558,21 +583,31 @@ cancel_timer(#rep_state{timer = Timer} = State) ->
     State#rep_state{timer = nil}.
 
 
-init_state(Rep) ->
-    #rep{
-        id = {BaseId, _Ext},
-        source = Src0, target = Tgt,
-        options = Options,
-        type = Type, view = View,
-        start_time = StartTime,
-        stats = ArgStats0
+init_state(#{} = Job, #{?REP := Rep} = JobData) ->
+    #{
+        <<"id">> := Id,
+        <<"base_id">> := BaseId,
+        ?SOURCE := Src0,
+        ?TARGET := Tgt,
+        <<"type">> := Type,
+        <<"view">> := View,
+        <<"start_time">> := StartTime,
+        <<"stats">> := ArgStats0,
+        <<"options">> := OptionsMap,
+        <<"db_name">> := DbName,
+        <<"doc_id">> := DocId,
     } = Rep,
+
+    Options = maps:fold(fun(K, V, Acc) ->
+        [{binary_to_atom(K, utf8), V} | Acc]
+    end, [], OptionsMap),
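+    % e.g. #{<<"worker_processes">> => 4} becomes [{worker_processes, 4}]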
+
     % Adjust minimum number of http source connections to 2 to avoid deadlock
     Src = adjust_maxconn(Src0, BaseId),
     {ok, Source} = couch_replicator_api_wrap:db_open(Src),
-    {CreateTargetParams} = get_value(create_target_params, Options, {[]}),
-    {ok, Target} = couch_replicator_api_wrap:db_open(Tgt,
-        get_value(create_target, Options, false), CreateTargetParams),
+    CreateTgt = get_value(create_target, Options, false),
+    TParams = maps:to_list(get_value(create_target_params, Options, #{})),
+    {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, CreateTgt, TParams),
 
     {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
     {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),
@@ -595,7 +630,10 @@ init_state(Rep) ->
 
     #doc{body={CheckpointHistory}} = SourceLog,
     State = #rep_state{
-        rep_details = Rep,
+        job = Job,
+        job_data = JobData,
+        id = Id,
+        base_id = BaseId,
+        start_time = StartTime,
         source_name = couch_replicator_api_wrap:db_uri(Source),
         target_name = couch_replicator_api_wrap:db_uri(Target),
         source = Source,
@@ -612,25 +650,26 @@ init_state(Rep) ->
         tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
         session_id = couch_uuids:random(),
         source_seq = SourceSeq,
-        use_checkpoints = get_value(use_checkpoints, Options, true),
-        checkpoint_interval = get_value(checkpoint_interval, Options,
-                                        ?DEFAULT_CHECKPOINT_INTERVAL),
+        use_checkpoints = get_value(use_checkpoints, Options),
+        checkpoint_interval = get_value(checkpoint_interval, Options),
         type = Type,
         view = View,
-        stats = Stats
+        stats = Stats,
+        options = Options,
+        doc_id = DocId,
+        db_name = DbName
     },
     State#rep_state{timer = start_timer(State)}.
 
 
-find_and_migrate_logs(DbList, #rep{id = {BaseId, _}} = Rep) ->
+find_and_migrate_logs(DbList, #{<<"base_id">> := BaseId} = Rep) ->
-    LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
-    fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []).
+    LogId = ?l2b(?LOCAL_DOC_PREFIX ++ binary_to_list(BaseId)),
+    fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []).
 
 
 fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
     lists:reverse(Acc);
 
-fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
+fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, #{} = Rep, Acc) ->
     case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of
     {error, <<"not_found">>} when Vsn > 1 ->
         OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1),
@@ -650,8 +689,8 @@ fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
     end.
 
 
-maybe_save_migrated_log(Rep, Db, #doc{} = Doc, OldId) ->
-    case get_value(use_checkpoints, Rep#rep.options, true) of
+maybe_save_migrated_log(#{<<"options">> = Options}, Db, #doc{} = Doc, OldId) ->
+    case maps:get(<<"use_checkpoints">>, Options) of
         true ->
             update_checkpoint(Db, Doc),
             Msg = "Migrated replication checkpoint. Db:~p ~p -> ~p",
@@ -714,7 +753,7 @@ do_checkpoint(State) ->
         src_starttime = SrcInstanceStartTime,
         tgt_starttime = TgtInstanceStartTime,
         stats = Stats,
-        rep_details = #rep{options = Options},
+        options = Options,
         session_id = SessionId
     } = State,
     case commit_to_both(Source, Target) of
@@ -922,9 +961,8 @@ has_session_id(SessionId, [{Props} | Rest]) ->
     end.
 
 
-get_pending_count(St) ->
-    Rep = St#rep_state.rep_details,
-    Timeout = get_value(connection_timeout, Rep#rep.options),
+get_pending_count(#rep_state{options = Options} = St) ->
+    Timeout = get_value(connection_timeout, Options),
     TimeoutMicro = Timeout * 1000,
     case get(pending_count_state) of
         {LastUpdate, PendingCount} ->
@@ -1005,24 +1043,21 @@ replication_start_error(Error) ->
     Error.
 
 
-log_replication_start(#rep_state{rep_details = Rep} = RepState) ->
-    #rep{
-       id = {BaseId, Ext},
-       doc_id = DocId,
-       db_name = DbName,
-       options = Options
-    } = Rep,
-    Id = BaseId ++ Ext,
-    Workers = get_value(worker_processes, Options),
-    BatchSize = get_value(worker_batch_size, Options),
+log_replication_start(#rep_state{} = RepState) ->
     #rep_state{
-       source_name = Source,  % credentials already stripped
-       target_name = Target,  % credentials already stripped
-       session_id = Sid
+        id = Id,
+        doc_id = DocId,
+        db_name = DbName,
+        options = Options,
+        source_name = Source,
+        target_name = Target,
+        session_id = Sid
     } = RepState,
+    Workers = get_value(worker_processes, Options),
+    BatchSize = get_value(worker_batch_size, Options),
     From = case DbName of
-        ShardName when is_binary(ShardName) ->
-            io_lib:format("from doc ~s:~s", [mem3:dbname(ShardName), DocId]);
+        Name when is_binary(Name) ->
+            io_lib:format("from doc ~s:~s", [Name, DocId]);
         _ ->
             "from _replicate endpoint"
     end,
@@ -1055,14 +1090,13 @@ scheduler_job_format_status_test() ->
     Target = <<"http://u:p@h2/d2">>,
     Rep = #rep{
         id = {"base", "+ext"},
-        source = couch_replicator_docs:parse_rep_db(Source, [], []),
-        target = couch_replicator_docs:parse_rep_db(Target, [], []),
+        source = couch_replicator_docs:parse_rep_db(Source, #{}, #{}),
+        target = couch_replicator_docs:parse_rep_db(Target, #{}, #{}),
         options = [{create_target, true}],
         doc_id = <<"mydoc">>,
         db_name = <<"mydb">>
     },
     State = #rep_state{
-        rep_details = Rep,
         source = Rep#rep.source,
         target = Rep#rep.target,
         session_id = <<"a">>,
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
index 8ab55f8..3ea9dff 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
@@ -17,7 +17,7 @@
 %% public api
 -export([
     start_link/0,
-    start_child/1,
+    start_child/2,
     terminate_child/1
 ]).
 
@@ -37,8 +37,8 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 
-start_child(#rep{} = Rep) ->
-    supervisor:start_child(?MODULE, [Rep]).
+start_child(#{} = Job, #{} = Rep) ->
+    supervisor:start_child(?MODULE, [Job, Rep]).
 
 
 terminate_child(Pid) ->
diff --git a/src/couch_replicator/src/couch_replicator_sup.erl b/src/couch_replicator/src/couch_replicator_sup.erl
index cd4512c..b86529f 100644
--- a/src/couch_replicator/src/couch_replicator_sup.erl
+++ b/src/couch_replicator/src/couch_replicator_sup.erl
@@ -20,18 +20,6 @@ start_link() ->
 
 init(_Args) ->
     Children = [
-        {couch_replication_event,
-            {gen_event, start_link, [{local, couch_replication}]},
-            permanent,
-            brutal_kill,
-            worker,
-            dynamic},
-       {couch_replicator_clustering,
-            {couch_replicator_clustering, start_link, []},
-            permanent,
-            brutal_kill,
-            worker,
-            [couch_replicator_clustering]},
        {couch_replicator_connection,
             {couch_replicator_connection, start_link, []},
             permanent,
@@ -62,11 +50,14 @@ init(_Args) ->
             brutal_kill,
             worker,
             [couch_replicator_doc_processor]},
-        {couch_replicator_db_changes,
-            {couch_replicator_db_changes, start_link, []},
-            permanent,
+        {couch_replicator,
+            % This is a simple function call which does not create a process
+            % but returns `ignore`. It is used to make sure each node
+            % has a local `_replicator` database.
+            {couch_replicator, ensure_rep_db_exists, []},
+            transient,
             brutal_kill,
             worker,
-            [couch_multidb_changes]}
+            [couch_replicator]}
     ],
     {ok, {{rest_for_one,10,1}, Children}}.
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index 5f608de..b71ffeb 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -20,12 +20,11 @@
    rep_error_to_binary/1,
    get_json_value/2,
    get_json_value/3,
-   pp_rep_id/1,
    iso8601/1,
    filter_state/3,
    remove_basic_auth_from_headers/1,
    normalize_rep/1,
-   ejson_state_info/1
+   default_headers_map/0
 ]).
 
 
@@ -75,14 +74,6 @@ get_json_value(Key, Props, Default) when is_binary(Key) ->
     end.
 
 
-% pretty-print replication id
--spec pp_rep_id(#rep{} | rep_id()) -> string().
-pp_rep_id(#rep{id = RepId}) ->
-    pp_rep_id(RepId);
-pp_rep_id({Base, Extension}) ->
-    Base ++ Extension.
-
-
 % NV: TODO: this function is not used outside api wrap module
 % consider moving it there during final cleanup
 is_deleted(Change) ->
@@ -103,8 +94,13 @@ parse_rep_doc(Props, UserCtx) ->
     couch_replicator_docs:parse_rep_doc(Props, UserCtx).
 
 
--spec iso8601(erlang:timestamp()) -> binary().
-iso8601({_Mega, _Sec, _Micro} = Timestamp) ->
+-spec iso8601(integer()) -> binary().
+iso8601(Native) when is_integer(Native) ->
+    ErlangSystemTime = erlang:convert_time_unit(Native, native, microsecond),
+    MegaSecs = ErlangSystemTime div 1000000000000,
+    Secs = ErlangSystemTime div 1000000 - MegaSecs * 1000000,
+    MicroSecs = ErlangSystemTime rem 1000000,
+    Timestamp = {MegaSecs, Secs, MicroSecs},
     {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(Timestamp),
     Format = "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ",
     iolist_to_binary(io_lib:format(Format, [Y, Mon, D, H, Min, S])).
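+% Illustrative usage: iso8601(erlang:system_time(native)) returns a binary
+% such as <<"2020-07-14T16:54:45Z">>.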
@@ -158,37 +154,37 @@ decode_basic_creds(Base64) ->
     end.
 
 
-% Normalize a #rep{} record such that it doesn't contain time dependent fields
+% Normalize a rep map such that it doesn't contain time dependent fields
-% pids (like httpc pools), and options / props are sorted. This function would
-% used during comparisons.
+% pids (like httpc pools), and options / props are sorted. This function is
+% used during comparisons.
--spec normalize_rep(#rep{} | nil) -> #rep{} | nil.
-normalize_rep(nil) ->
-    nil;
-
-normalize_rep(#rep{} = Rep)->
-    #rep{
-        source = couch_replicator_api_wrap:normalize_db(Rep#rep.source),
-        target = couch_replicator_api_wrap:normalize_db(Rep#rep.target),
-        options = Rep#rep.options,  % already sorted in make_options/1
-        type = Rep#rep.type,
-        view = Rep#rep.view,
-        doc_id = Rep#rep.doc_id,
-        db_name = Rep#rep.db_name
+-spec normalize_rep(#{} | null) -> #{} | null.
+normalize_rep(null) ->
+    null;
+
+normalize_rep(#{} = Rep)->
+    Ks = [<<"options">>, <<"type">>, <<"view">>, <<"doc_id">>, <<"db_name">>],
+    Rep1 = maps:with(Ks, Rep),
+    #{<<"source">> := Source, <<"target">> := Target} = Rep,
+    Rep1#{
+        <<"source">> => normalize_endpoint(Source),
+        <<"target">> => normalize_endpoint(Target)
     }.
 
 
--spec ejson_state_info(binary() | nil) -> binary() | null.
-ejson_state_info(nil) ->
-    null;
-ejson_state_info(Info) when is_binary(Info) ->
-    {[{<<"error">>, Info}]};
-ejson_state_info([]) ->
-    null;  % Status not set yet => null for compatibility reasons
-ejson_state_info([{_, _} | _] = Info) ->
-    {Info};
-ejson_state_info(Info) ->
-    ErrMsg = couch_replicator_utils:rep_error_to_binary(Info),
-    {[{<<"error">>, ErrMsg}]}.
+normalize_endpoint(<<DbName/binary>>) ->
+    DbName;
+
+normalize_endpoint(#{} = Endpoint) ->
+    Ks = [<<"url">>, <<"auth_props">>, <<"headers">>, <<"timeout">>,
+        <<"ibrowse_options">>, <<"retries">>, <<"http_connections">>
+    ],
+    maps:with(Ks, Endpoint).
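+% Any keys outside `Ks` (for example runtime state such as pids of httpc
+% pools) are dropped, so two maps for the same endpoint compare equal.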
+
+
+default_headers_map() ->
+    lists:foldl(fun({K, V}, Acc) ->
+        Acc#{list_to_binary(K) => list_to_binary(V)}
+    end, #{}, (#httpdb{})#httpdb.headers).
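+% Illustrative result, assuming the #httpdb{} defaults include an Accept
+% header: #{<<"Accept">> => <<"application/json">>, ...}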
 
 
 -ifdef(TEST).
@@ -269,4 +265,23 @@ normalize_rep_test_() ->
         end)
     }.
 
+
+normalize_endpoint_test() ->
+    Endpoint = #{
+        <<"url">> => <<"http://host/db">>,
+        <<"auth_props">> => #{<<"key">> => <<"val">>},
+        <<"headers">> => #{<<"k1">> => <<"v1">>, <<"k2">> => <<"v2">>},
+        <<"timeout">> => 30000,
+        <<"ibrowse_options">> => #{<<"k1">> => <<"v1">>},
+        <<"retries">> => 10,
+        <<"http_connections">> => 20,
+        <<"extra">> => <<"ignored">>
+    },
+    Expected = maps:remove(<<"extra">>, Endpoint),
+    ?assertEqual(Expected, normalize_endpoint(Endpoint)),
+    ?assertEqual(<<"local">>, normalize_endpoint(<<"local">>)).
+
+
 -endif.
diff --git a/src/couch_replicator/src/couch_replicator_validate_doc.erl b/src/couch_replicator/src/couch_replicator_validate_doc.erl
new file mode 100644
index 0000000..da14361
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_validate_doc.erl
@@ -0,0 +1,119 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_validate_doc).
+
+
+-include("couch_replicator.hrl").
+
+
+-export([
+    validate/1
+]).
+
+
+validate(#{<<"_replication_state">> := ?ST_FAILED}) ->
+    % If replication failed, allow updating even a malformed document
+    ok;
+
+validate(#{?SOURCE := _, ?TARGET := _} = Doc) ->
+    maps:fold(fun(K, V, ok) -> validate_field(K, V) end, ok, Doc),
+    validate_mutually_exclusive_filters(Doc);
+
+validate(_) ->
+    fail("Both `source` and `target` fields must exist").
+
+
+validate_field(?SOURCE, V) when is_binary(V) ->
+    ok;
+validate_field(?SOURCE, #{?URL := _} = V) ->
+    maps:fold(fun(K, EV, ok) -> validate_endpoint_field(K, EV) end, ok, V);
+validate_field(?SOURCE, _V) ->
+    fail("`source` must be a string or an object with an `url` field");
+
+validate_field(?TARGET, V) when is_binary(V) ->
+    ok;
+validate_field(?TARGET, #{?URL := _} = V) ->
+    maps:fold(fun(K, EV, ok) -> validate_endpoint_field(K, EV) end, ok, V);
+validate_field(?TARGET, _V) ->
+    fail("`target` must be a string or an object with an `url` field");
+
+validate_field(?CONTINUOUS, V) when is_boolean(V) ->
+    ok;
+validate_field(?CONTINUOUS, _V) ->
+    fail("`continuous` should be a boolean");
+
+validate_field(?CREATE_TARGET, V) when is_boolean(V) ->
+    ok;
+validate_field(?CREATE_TARGET, _V) ->
+    fail("`create_target` should be a boolean");
+
+validate_field(?DOC_IDS, V) when is_list(V) ->
+    lists:foreach(fun(DocId) ->
+        case is_binary(DocId) andalso byte_size(DocId) > 0 of
+            true -> ok;
+            false -> fail("`doc_ids` should be a list of strings")
+        end
+    end, V);
+validate_field(?DOC_IDS, _V) ->
+    fail("`doc_ids` should be a list of strings");
+
+validate_field(?SELECTOR, #{} = _V) ->
+    ok;
+validate_field(?SELECTOR, _V) ->
+    fail("`selector` should be an object");
+
+validate_field(?FILTER, V) when is_binary(V), byte_size(V) > 0 ->
+    ok;
+validate_field(?FILTER, _V) ->
+    fail("`filter` should be a non-empty string");
+
+validate_field(?QUERY_PARAMS, V) when is_map(V) orelse V =:= null ->
+    ok;
+validate_field(?QUERY_PARAMS, _V) ->
+    fail("`query_params` should be an object or `null`");
+
+% Other doc fields (e.g. `_id`, `_rev`, `owner`) are allowed through
+validate_field(_Field, _V) ->
+    ok.
+
+
+validate_endpoint_field(?URL, V) when is_binary(V) ->
+    ok;
+validate_endpoint_field(?URL, _V) ->
+    fail("`url` endpoint field must be a string");
+
+validate_endpoint_field(?AUTH, #{} = _V) ->
+    ok;
+validate_endpoint_field(?AUTH, _V) ->
+    fail("`auth` endpoint field must be an object");
+
+validate_endpoint_field(?HEADERS, #{} = _V) ->
+    ok;
+validate_endpoint_field(?HEADERS, _V) ->
+    fail("`headers` endpoint field must be an object");
+
+% Other endpoint fields (e.g. `timeout`) are allowed through
+validate_endpoint_field(_Field, _V) ->
+    ok.
+
+
+validate_mutually_exclusive_filters(#{} = Doc) ->
+    DocIds = maps:get(?DOC_IDS, Doc, undefined),
+    Selector = maps:get(?SELECTOR, Doc, undefined),
+    Filter = maps:get(?FILTER, Doc, undefined),
+    Defined = [V || V <- [DocIds, Selector, Filter], V =/= undefined],
+    case length(Defined) > 1 of
+        true ->
+            fail("`doc_ids`, `selector` and `filter` are mutually exclusive");
+        false ->
+            ok
+    end.
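+% For example, a doc defining both `"doc_ids"` and `"filter"` would be
+% rejected by the check above.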
+
+
+fail(Msg) when is_list(Msg) ->
+    fail(list_to_binary(Msg));
+
+fail(Msg) when is_binary(Msg) ->
+    throw({forbidden, Msg}).
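+% Throwing `{forbidden, Msg}` is the standard way for CouchDB validation
+% functions to reject a write; clients see it as an HTTP 403 response.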
diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
deleted file mode 100644
index 6b4f95c..0000000
--- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
+++ /dev/null
@@ -1,271 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_error_reporting_tests).
-
--include_lib("couch/include/couch_eunit.hrl").
--include_lib("couch/include/couch_db.hrl").
--include_lib("couch_replicator/src/couch_replicator.hrl").
-
-
-setup_all() ->
-    test_util:start_couch([couch_replicator, chttpd, mem3, fabric]).
-
-
-teardown_all(Ctx) ->
-    ok = test_util:stop_couch(Ctx).
-
-
-setup() ->
-    meck:unload(),
-    Source = setup_db(),
-    Target = setup_db(),
-    {Source, Target}.
-
-
-teardown({Source, Target}) ->
-    meck:unload(),
-    teardown_db(Source),
-    teardown_db(Target),
-    ok.
-
-
-error_reporting_test_() ->
-    {
-        setup,
-        fun setup_all/0,
-        fun teardown_all/1,
-        {
-            foreach,
-            fun setup/0,
-            fun teardown/1,
-            [
-                fun t_fail_bulk_docs/1,
-                fun t_fail_changes_reader/1,
-                fun t_fail_revs_diff/1,
-                fun t_fail_changes_queue/1,
-                fun t_fail_changes_manager/1,
-                fun t_fail_changes_reader_proc/1
-            ]
-        }
-    }.
-
-
-t_fail_bulk_docs({Source, Target}) ->
-    ?_test(begin
-        populate_db(Source, 1, 5),
-        {ok, RepId} = replicate(Source, Target),
-        wait_target_in_sync(Source, Target),
-
-        {ok, Listener} = rep_result_listener(RepId),
-        mock_fail_req("/_bulk_docs", {ok, "403", [], [<<"{\"x\":\"y\"}">>]}),
-        populate_db(Source, 6, 6),
-
-        {error, Result} = wait_rep_result(RepId),
-        ?assertEqual({bulk_docs_failed, 403, {[{<<"x">>, <<"y">>}]}}, Result),
-
-        couch_replicator_notifier:stop(Listener)
-    end).
-
-
-t_fail_changes_reader({Source, Target}) ->
-    ?_test(begin
-        populate_db(Source, 1, 5),
-        {ok, RepId} = replicate(Source, Target),
-        wait_target_in_sync(Source, Target),
-
-        {ok, Listener} = rep_result_listener(RepId),
-        mock_fail_req("/_changes", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}),
-        populate_db(Source, 6, 6),
-
-        {error, Result} = wait_rep_result(RepId),
-        ?assertEqual({changes_req_failed, 418, {[{<<"x">>, <<"y">>}]}}, Result),
-
-        couch_replicator_notifier:stop(Listener)
-    end).
-
-
-t_fail_revs_diff({Source, Target}) ->
-    ?_test(begin
-        populate_db(Source, 1, 5),
-        {ok, RepId} = replicate(Source, Target),
-        wait_target_in_sync(Source, Target),
-
-        {ok, Listener} = rep_result_listener(RepId),
-        mock_fail_req("/_revs_diff", {ok, "407", [], [<<"{\"x\":\"y\"}">>]}),
-        populate_db(Source, 6, 6),
-
-        {error, Result} = wait_rep_result(RepId),
-        ?assertEqual({revs_diff_failed, 407, {[{<<"x">>, <<"y">>}]}}, Result),
-
-        couch_replicator_notifier:stop(Listener)
-    end).
-
-
-t_fail_changes_queue({Source, Target}) ->
-    ?_test(begin
-        populate_db(Source, 1, 5),
-        {ok, RepId} = replicate(Source, Target),
-        wait_target_in_sync(Source, Target),
-
-        RepPid = couch_replicator_test_helper:get_pid(RepId),
-        State = sys:get_state(RepPid),
-        ChangesQueue = element(20, State),
-        ?assert(is_process_alive(ChangesQueue)),
-
-        {ok, Listener} = rep_result_listener(RepId),
-        exit(ChangesQueue, boom),
-
-        {error, Result} = wait_rep_result(RepId),
-        ?assertEqual({changes_queue_died, boom}, Result),
-        couch_replicator_notifier:stop(Listener)
-    end).
-
-
-t_fail_changes_manager({Source, Target}) ->
-    ?_test(begin
-        populate_db(Source, 1, 5),
-        {ok, RepId} = replicate(Source, Target),
-        wait_target_in_sync(Source, Target),
-
-        RepPid = couch_replicator_test_helper:get_pid(RepId),
-        State = sys:get_state(RepPid),
-        ChangesManager = element(21, State),
-        ?assert(is_process_alive(ChangesManager)),
-
-        {ok, Listener} = rep_result_listener(RepId),
-        exit(ChangesManager, bam),
-
-        {error, Result} = wait_rep_result(RepId),
-        ?assertEqual({changes_manager_died, bam}, Result),
-        couch_replicator_notifier:stop(Listener)
-    end).
-
-
-t_fail_changes_reader_proc({Source, Target}) ->
-    ?_test(begin
-        populate_db(Source, 1, 5),
-        {ok, RepId} = replicate(Source, Target),
-        wait_target_in_sync(Source, Target),
-
-        RepPid = couch_replicator_test_helper:get_pid(RepId),
-        State = sys:get_state(RepPid),
-        ChangesReader = element(22, State),
-        ?assert(is_process_alive(ChangesReader)),
-
-        {ok, Listener} = rep_result_listener(RepId),
-        exit(ChangesReader, kapow),
-
-        {error, Result} = wait_rep_result(RepId),
-        ?assertEqual({changes_reader_died, kapow}, Result),
-        couch_replicator_notifier:stop(Listener)
-    end).
-
-
-mock_fail_req(Path, Return) ->
-    meck:expect(ibrowse, send_req_direct,
-        fun(W, Url, Headers, Meth, Body, Opts, TOut) ->
-            Args = [W, Url, Headers, Meth, Body, Opts, TOut],
-            {ok, {_, _, _, _, UPath, _}} = http_uri:parse(Url),
-            case lists:suffix(Path, UPath) of
-                true -> Return;
-                false -> meck:passthrough(Args)
-            end
-        end).
-
-
-rep_result_listener(RepId) ->
-    ReplyTo = self(),
-    {ok, _Listener} = couch_replicator_notifier:start_link(
-        fun({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
-                ReplyTo ! Ev;
-            (_) ->
-                ok
-        end).
-
-
-wait_rep_result(RepId) ->
-    receive
-        {finished, RepId, RepResult} -> {ok, RepResult};
-        {error, RepId, Reason} -> {error, Reason}
-    end.
-
-
-
-setup_db() ->
-    DbName = ?tempdb(),
-    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
-    ok = couch_db:close(Db),
-    DbName.
-
-
-teardown_db(DbName) ->
-    ok = couch_server:delete(DbName, [?ADMIN_CTX]).
-
-
-populate_db(DbName, Start, End) ->
-    {ok, Db} = couch_db:open_int(DbName, []),
-    Docs = lists:foldl(
-        fun(DocIdCounter, Acc) ->
-            Id = integer_to_binary(DocIdCounter),
-            Doc = #doc{id = Id, body = {[]}},
-            [Doc | Acc]
-        end,
-        [], lists:seq(Start, End)),
-    {ok, _} = couch_db:update_docs(Db, Docs, []),
-    ok = couch_db:close(Db).
-
-
-wait_target_in_sync(Source, Target) ->
-    {ok, SourceDb} = couch_db:open_int(Source, []),
-    {ok, SourceInfo} = couch_db:get_db_info(SourceDb),
-    ok = couch_db:close(SourceDb),
-    SourceDocCount = couch_util:get_value(doc_count, SourceInfo),
-    wait_target_in_sync_loop(SourceDocCount, Target, 300).
-
-
-wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
-    erlang:error({assertion_failed, [
-          {module, ?MODULE}, {line, ?LINE},
-          {reason, "Could not get source and target databases in sync"}
-    ]});
-
-wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
-    {ok, Target} = couch_db:open_int(TargetName, []),
-    {ok, TargetInfo} = couch_db:get_db_info(Target),
-    ok = couch_db:close(Target),
-    TargetDocCount = couch_util:get_value(doc_count, TargetInfo),
-    case TargetDocCount == DocCount of
-        true ->
-            true;
-        false ->
-            ok = timer:sleep(500),
-            wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
-    end.
-
-
-replicate(Source, Target) ->
-    SrcUrl = couch_replicator_test_helper:db_url(Source),
-    TgtUrl = couch_replicator_test_helper:db_url(Target),
-    RepObject = {[
-        {<<"source">>, SrcUrl},
-        {<<"target">>, TgtUrl},
-        {<<"continuous">>, true},
-        {<<"worker_processes">>, 1},
-        {<<"retries_per_request">>, 1},
-        % Low connection timeout so _changes feed gets restarted quicker
-        {<<"connection_timeout">>, 3000}
-    ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
-    ok = couch_replicator_scheduler:add_job(Rep),
-    couch_replicator_scheduler:reschedule(),
-    {ok, Rep#rep.id}.