You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/07/08 22:28:25 UTC

[couchdb] branch prototype/fdb-replicator-2 created (now 159f680)

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a change to branch prototype/fdb-replicator-2
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


      at 159f680  WIP 5

This branch includes the following new commits:

     new bab8763  FDB Replicator WIP
     new 453787f  WIP 4
     new 159f680  WIP 5

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[couchdb] 02/03: WIP 4

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/fdb-replicator-2
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 453787f12a0ac1e6388dd91c6bff1e9af68729fb
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Aug 6 12:58:55 2019 -0400

    WIP 4
    
     * Replaced the replicator VDU (JavaScript validate_doc_update function)
       with a simpler Erlang module
       - Avoids design doc conflict handling, updates, etc.
     * Don't auto-create _replicator dbs
     * Doc job acceptor: separate max acceptors and max jobs limits
---
 src/couch_replicator/src/couch_replicator.erl      |  11 +-
 src/couch_replicator/src/couch_replicator.hrl      |  65 +++--
 .../src/couch_replicator_acceptor.erl              |  49 ++++
 .../src/couch_replicator_doc_processor.erl         | 286 ++++++++++++++++-----
 src/couch_replicator/src/couch_replicator_docs.erl |  87 ++-----
 .../src/couch_replicator_js_functions.hrl          | 177 -------------
 .../src/couch_replicator_scheduler_job.erl         |  33 +--
 .../src/couch_replicator_validate_doc.erl          | 119 +++++++++
 src/fabric/src/fabric2_db.erl                      |   2 +-
 9 files changed, 474 insertions(+), 355 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index 225a258..262aacc 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -52,8 +52,8 @@
     {ok, {cancelled, binary()}} |
     {error, any()} |
     no_return().
-replicate(PostBody, UserCtx) ->
-    {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, UserCtx),
+replicate(PostBody, #user_ctx{name = UserName}) ->
+    {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, UserName),
     Rep = Rep0#{<<"start_time">> => erlang:system_time()},
     #{<<"id">> := RepId, <<"options">> := Options} = Rep,
     case maps:get(<<"cancel">>, Options, false) of
@@ -80,9 +80,14 @@ replicate(PostBody, UserCtx) ->
 % it returns `ignore`.
 -spec ensure_rep_db_exists() -> ignore.
 ensure_rep_db_exists() ->
-    ok = couch_replicator_docs:ensure_rep_db_exists(),
     couch_jobs:set_type_timeout(?REP_DOCS, ?REP_DOCS_TIMEOUT_MSEC),
     couch_jobs:set_type_timeout(?REP_JOBS, ?REP_JOBS_TIMEOUT_MSEC),
+    % Creating the replicator db is opt-in via the [replicator]
+    % create_replicator_db setting, which defaults to false.
+    case config:get_boolean("replicator", "create_replicator_db", false) of
+        false -> ok;
+        true -> ok = couch_replicator_docs:ensure_rep_db_exists()
+    end,
     ignore.
 
 
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl
index 6584078..050fcb4 100644
--- a/src/couch_replicator/src/couch_replicator.hrl
+++ b/src/couch_replicator/src/couch_replicator.hrl
@@ -11,38 +11,53 @@
 % the License.
 
 -define(REP_ID_VERSION, 4).
+
+% Couch jobs types and timeouts
 -define(REP_DOCS, <<"repdocs">>).
 -define(REP_JOBS, <<"repjobs">>).
 -define(REP_DOCS_TIMEOUT_MSEC, 17000).
 -define(REP_JOBS_TIMEOUT_MSEC, 33000).
 
--record(rep, {
-    id :: rep_id() | '_' | 'undefined',
-    source :: any() | '_',
-    target :: any() | '_',
-    options :: [_] | '_',
-    user_ctx :: any() | '_',
-    type = db :: atom() | '_',
-    view = nil :: any() | '_',
-    doc_id :: any() | '_',
-    db_name = null :: null | binary() | '_',
-    start_time = 0 :: integer() | '_',
-    stats = couch_replicator_stats:new() :: orddict:orddict() | '_'
-}).
+% Some fields from the replication doc
+-define(SOURCE, <<"source">>).
+-define(TARGET, <<"target">>).
+-define(CREATE_TARGET, <<"create_target">>).
+-define(DOC_IDS, <<"doc_ids">>).
+-define(SELECTOR, <<"selector">>).
+-define(FILTER, <<"filter">>).
+-define(QUERY_PARAMS, <<"query_params">>).
+-define(URL, <<"url">>).
+-define(AUTH, <<"auth">>).
+-define(HEADERS, <<"headers">>).
+
+% Replication states
+-define(ST_ERROR, <<"error">>).
+-define(ST_FINISHED, <<"completed">>).
+-define(ST_RUNNING, <<"running">>).
+-define(ST_INITIALIZING, <<"initializing">>).
+-define(ST_FAILED, <<"failed">>).
+-define(ST_PENDING, <<"pending">>).
+-define(ST_SCHEDULED, <<"scheduled">>).
+-define(ST_CRASHING, <<"crashing">>).
+-define(ST_TRIGGERED, <<"triggered">>).
+
+% Fields of couch job data objects
+-define(REP, <<"rep">>).
+-define(REP_PARSE_ERROR, <<"rep_parse_error">>).
+-define(STATE, <<"state">>).
+-define(STATE_INFO, <<"state_info">>).
+-define(DOC_STATE, <<"doc_state">>).
+-define(DB_NAME, <<"db_name">>).
+-define(DOC_ID, <<"doc_id">>).
+-define(ERROR_COUNT, <<"error_count">>).
+-define(LAST_UPDATED, <<"last_updated">>).
+
+% Accepted job message tag
+-define(ACCEPTED_JOB, accepted_job).
+
+
 
 -type rep_id() :: binary().
 -type user_name() :: binary() | null.
 -type db_doc_id() :: {binary(), binary() | '_'}.
 -type seconds() :: non_neg_integer().
--type rep_start_result() ::
-    {ok, rep_id()} |
-    ignore |
-    {temporary_error, binary()} |
-    {permanent_failure, binary()}.
-
-
--record(doc_worker_result, {
-    id :: db_doc_id(),
-    wref :: reference(),
-    result :: rep_start_result()
-}).
diff --git a/src/couch_replicator/src/couch_replicator_acceptor.erl b/src/couch_replicator/src/couch_replicator_acceptor.erl
new file mode 100644
index 0000000..bd67d66
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_acceptor.erl
@@ -0,0 +1,49 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_acceptor).
+% Linked couch_jobs acceptor processes which cast accepted jobs to a parent gen_server.
+
+-include("couch_replicator.hrl").
+
+-export([
+    start/4,
+    stop/1
+]).
+
+% Start Count acceptors for job Type. Returns a #{Pid => true} map; empty if Count < 1.
+start(_Type, Count, _Parent, _Opts) when Count < 1 ->
+    #{};
+
+start(Type, Count, Parent, Opts) when Count >= 1 ->
+    lists:foldl(fun(_N, Acc) ->
+        Pid = spawn_link(fun() -> accept(Type, Parent, Opts) end),
+        Acc#{Pid => true}
+    end, #{}, lists:seq(1, Count)).
+
+% Unlink and kill every acceptor in a map returned by start/4.
+stop(#{} = Acceptors) ->
+    lists:foreach(fun(Pid) ->
+        unlink(Pid),
+        exit(Pid, kill)
+    end, maps:keys(Acceptors)),
+    ok.
+
+% Accept one Type job and cast {?ACCEPTED_JOB, Job, JobData} to Parent. Also
+% returns ok (so the process exits normally) when couch_jobs:accept/2 finds no job.
+accept(Type, Parent, Opts) ->
+    case couch_jobs:accept(Type, Opts) of
+        {ok, Job, JobData} ->
+            ok = gen_server:cast(Parent, {?ACCEPTED_JOB, Job, JobData});
+        {error, not_found} ->
+            ok
+    end.
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 6f44814..9899bef 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -29,7 +29,6 @@
 
 -export([
     during_doc_update/3,
-    after_db_create/1,
     after_db_delete/1
 ]).
 
@@ -56,31 +55,17 @@
 -define(INITIAL_BACKOFF_EXPONENT, 64).
 -define(MIN_FILTER_DELAY_SEC, 60).
 
--type filter_type() ::  nil | view | user | docids | mango.
 -type repstate() :: initializing | error | scheduled.
 
 
--record(rdoc, {
-    id :: db_doc_id() | '_' | {any(), '_'},
-    state :: repstate() | '_',
-    rep :: #rep{} | nil | '_',
-    rid :: rep_id() | nil | '_',
-    filter :: filter_type() | '_',
-    info :: binary() | nil | '_',
-    errcnt :: non_neg_integer() | '_',
-    worker :: reference() | nil | '_',
-    last_updated :: erlang:timestamp() | '_'
-}).
+-define(MAX_ACCEPTORS, 10).
+-define(MAX_JOBS, 500).
 
 
 during_doc_update(#doc{} = Doc, Db, _UpdateType) ->
     couch_stats:increment_counter([couch_replicator, docs, db_changes]),
     ok = process_change(Db, Doc).
 
-after_db_create(#{name := DbName}) ->
-    couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName).
-
 
 after_db_delete(#{name := DbName}) ->
     couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]),
@@ -112,16 +97,16 @@ process_change(#{name := DbName} = Db, #doc{} = Doc) ->
     DocState = get_json_value(<<"_replication_state">>, Props, null),
     case couch_jobs:get_job_data(Db, ?REP_DOCS, docs_job_id(DbName, DocId)) of
         {error, not_found} ->
-            update_replication_job(Db, DbName, DocId, Rep, RepError, DocState);
-        {ok, #{<<"rep">> := null, <<"rep_parse_error">> := RepError}}
+            add_rep_doc_job(Db, DbName, DocId, Rep, RepError, DocState);
+        {ok, #{?REP := null, ?REP_PARSE_ERROR := RepError}}
                 when Rep =:= null ->
             % Same error as before occurred, don't bother updating the job
             ok;
-        {ok, #{<<"rep">> := null}} when Rep =:= null ->
+        {ok, #{?REP := null}} when Rep =:= null ->
             % Error occured but it's a different error. Update the job so user
             % sees the new error
-            update_replication_job(Db, DbName, DocId, Rep, RepError, DocState);
-        {ok, #{<<"rep">> := OldRep, <<"rep_parse_error">> := OldError}} ->
+            add_rep_doc_job(Db, DbName, DocId, Rep, RepError, DocState);
+        {ok, #{?REP := OldRep, ?REP_PARSE_ERROR := OldError}} ->
             NormOldRep = couch_replicator_util:normalize_rep(OldRep),
             NormRep = couch_replicator_util:normalize_rep(Rep),
             case NormOldRep == NormRep of
@@ -130,12 +115,14 @@ process_change(#{name := DbName} = Db, #doc{} = Doc) ->
                     % for the replication job have changed, so make it a no-op
                     ok;
                 false ->
-                    update_replication_job(Db, DbName, DocId, Rep, RepError,
+                    add_rep_doc_job(Db, DbName, DocId, Rep, RepError,
                         DocState)
             end
     end.
 
 
+
+
 rep_docs_job_execute(#{} = Job, #{<<"rep">> := null} = JobData) ->
     #{
         <<"rep_parse_error">> := Error,
@@ -206,55 +193,228 @@ process_updated({DbName, _DocId} = Id, JsonRepDoc) ->
     gen_server:call(?MODULE, {updated, Id, Rep, Filter}, infinity).
 
 
-% Doc processor gen_server API and callbacks
-
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [],  []).
 
 
 init([]) ->
-    ?MODULE = ets:new(?MODULE, [named_table, {keypos, #rdoc.id},
-        {read_concurrency, true}, {write_concurrency, true}]),
-    {ok, nil}.
+    process_flag(trap_exit, true),
+    % Acceptor and worker pids are kept in #{Pid => true} maps. Exits are
+    % trapped so their 'EXIT' messages reach handle_info/2; the 0 timeout
+    % makes handle_info/2 start the first batch of acceptors.
+    St = #{acceptors => #{}, workers => #{}},
+    {ok, update_config(St), 0}.
 
 
-terminate(_Reason, _State) ->
+terminate(_Reason, #{} = St) ->
+    #{workers := Workers, acceptors := Acceptors} = St,
+    lists:foreach(fun(Pid) -> unlink(Pid), exit(Pid, kill) end, maps:keys(Workers)),
+    couch_replicator_acceptor:stop(Acceptors),
     ok.
 
 
-handle_call({updated, Id, Rep, Filter}, _From, State) ->
-    ok = updated_doc(Id, Rep, Filter),
-    {reply, ok, State};
+handle_call(Msg, _From, #{} = St) ->
+    {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
 
-handle_call({removed, Id}, _From, State) ->
-    ok = removed_doc(Id),
-    {reply, ok, State};
+handle_cast({?ACCEPTED_JOB, Job, JobData}, #{} = St) ->
+    {noreply, start_worker(Job, JobData, St)};
 
-handle_call({completed, Id}, _From, State) ->
-    true = ets:delete(?MODULE, Id),
-    {reply, ok, State};
+handle_cast(Msg, #{} = St) ->
+    {stop, {bad_cast, Msg}, St}.
 
-handle_call({clean_up_replications, DbName}, _From, State) ->
-    ok = removed_db(DbName),
-    {reply, ok, State}.
 
-handle_cast(Msg, State) ->
-    {stop, {error, unexpected_message, Msg}, State}.
+handle_info({'EXIT', Pid, Reason}, #{} = St) ->
+    #{workers := Workers, acceptors := Acceptors} = St,
+    case {maps:is_key(Pid, Acceptors), maps:is_key(Pid, Workers)} of
+        {false, false} -> handle_unknown_pid(Pid, Reason, St);
+        {true, false} -> handle_acceptor_died(Pid, Reason, St);
+        {false, true} -> handle_worker_died(Pid, Reason, St)
+    end;
+
+handle_info(timeout, #{} = St) ->
+    {noreply, maybe_start_acceptors(St)};
 
+handle_info(Msg, St) ->
+    {stop, {bad_info, Msg}, St}.
 
-handle_info({'DOWN', _, _, _, #doc_worker_result{id = Id, wref = Ref,
-        result = Res}}, State) ->
-    ok = worker_returned(Ref, Id, Res),
-    {noreply, State};
 
-handle_info(_Msg, State) ->
-    {noreply, State}.
+% Workers and acceptors are linked (exits trapped in init/1) and reported via
+% the 'EXIT' clause above; the old 'DOWN'/#doc_worker_result{} protocol is
+% gone. The 0 timeout from init/1 starts the first acceptors, and any other
+% message stops the server with {bad_info, Msg}.
 
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 
+update_config(#{} = St) ->
+    % Limits are re-read from the [replicator] config section on every call.
+    MaxJobs = config:get_integer("replicator", "max_doc_processor_jobs", ?MAX_JOBS),
+    MaxAcceptors = config:get_integer("replicator",
+        "max_doc_processor_acceptors", ?MAX_ACCEPTORS),
+    AcceptTimeoutSec = config:get_integer("replicator",
+        "doc_processor_accept_timeout_sec", ?ACCEPT_TIMEOUT_SEC),
+    AcceptFudgeSec = config:get_integer("replicator",
+        "doc_processor_accept_fudge_sec", ?ACCEPT_FUDGE_SEC),
+    St#{
+        max_jobs => MaxJobs,
+        max_acceptors => MaxAcceptors,
+        accept_timeout_sec => AcceptTimeoutSec,
+        accept_fudge_sec => AcceptFudgeSec
+    }.
+
+
+handle_acceptor_died(Pid, normal, #{acceptors := Acceptors} = St1) ->
+    % Acceptors exit normally after handing off a job (or finding none).
+    St2 = St1#{acceptors := maps:remove(Pid, Acceptors)},
+    {noreply, maybe_start_acceptors(St2)};
+
+handle_acceptor_died(Pid, Reason, #{} = St) ->
+    Msg = "~p : acceptor ~p died with ~p",
+    couch_log:error(Msg, [?MODULE, Pid, Reason]),
+    {stop, {acceptor_pid_exit, Pid, Reason}, St}.
+
+
+handle_worker_died(Pid, normal, #{workers := Workers} = St1) ->
+    St2 = St1#{workers := maps:remove(Pid, Workers)},
+    St3 = maybe_start_acceptors(St2),
+    {noreply, St3};
+
+handle_worker_died(Pid, Reason, #{} = St) ->
+    Msg = "~p : worker ~p died with ~p",
+    couch_log:error(Msg, [?MODULE, Pid, Reason]),
+    {stop, {worker_pid_exit, Pid, Reason}, St}.
+
+
+% Any other linked process exiting is unexpected, so stop the server.
+handle_unknown_pid(Pid, Reason, #{} = St) ->
+    Msg = "~p : unknown pid ~p died with ~p",
+    couch_log:error(Msg, [?MODULE, Pid, Reason]),
+    {stop, {unknown_pid_exit, Pid, Reason}, St}.
+
+% Top up acceptors: at most max_acceptors, and acceptors + workers =< max_jobs.
+maybe_start_acceptors(#{} = St1) ->
+    St2 = update_config(St1),
+    #{
+        workers := Workers,
+        acceptors := Acceptors,
+        max_jobs := MaxJobs,
+        max_acceptors := MaxAcceptors
+    } = St2,
+    ACount = map_size(Acceptors),
+    Free = min(MaxAcceptors - ACount, MaxJobs - ACount - map_size(Workers)),
+    case Free > 0 of
+        true -> start_acceptors(Free, St2);
+        false -> St2
+    end.
+
+% Spawn N acceptors for ?REP_DOCS jobs and add their pids to the acceptors map.
+start_acceptors(N, #{} = St) ->
+    #{
+        acceptors := Acceptors,
+        accept_timeout_sec := TimeoutSec,
+        accept_fudge_sec := FudgeSec
+    } = St,
+    Opts = #{
+        timeout => TimeoutSec,
+        max_sched_time => erlang:system_time(second) + FudgeSec
+    },
+    Pids = couch_replicator_acceptor:start(?REP_DOCS, N, self(), Opts),
+    St#{acceptors := maps:merge(Acceptors, Pids)}.
+
+% Spawn a linked worker for an accepted ?REP_DOCS job and track its pid.
+start_worker(Job, #{} = JobData, #{workers := Workers} = St) ->
+    Parent = self(),
+    Pid = spawn_link(fun() -> rep_doc_job_worker(Job, JobData, Parent) end),
+    St#{workers := Workers#{Pid => true}}.
+
+% Worker for one accepted ?REP_DOCS job; spawned (linked) by start_worker/3.
+rep_doc_job_worker(Job, #{?REP := null} = RepDocData, Parent) ->
+    #{
+        ?REP_PARSE_ERROR := Error,
+        ?DB_NAME := DbName,
+        ?DOC_ID := DocId
+        ?ERROR_COUNT := ErrorCount,
+    } = RepDocData,
+    RepDocData1 = RepDocData#{
+        ?LAST_UPDATED := erlang:system_time(),
+        ?ERROR_COUNT := ErrorCount + 1,
+        ?FINISHED_STATE := ?FAILED,
+        ?FINISHED_RESULT := Error,
+    },
+    case couch_jobs:finish(undefined, Job, RepDocData1) of
+        ok ->
+            couch_replicator_docs:update_failed(DbName, DocId, Error),
+            St;
+        {error, JobError} ->
+            Msg = "~p : replication ~s job could not finish. JobError:~p",
+            couch_log:error(Msg, [?MODULE, RepId, JobError]),
+            St
+    end;
+% NOTE(review): unbound here: St, RepId (clause above); Rep, Tx (clause below).
+rep_doc_job_worker(Job, #{?REP := #{}} = RepDocData, Parent) ->
+    #{
+        ?REP := Rep1,
+        ?DOC_STATE := DocState,
+    } = RepDocData,
+    ok = remove_old_state_fields(Rep, DocState),
+    % Calculate replication ID. In most cases this is fast and easy
+    % but for filtered replications with user JS filters this
+    % means making a remote connection to the source db.
+    Rep2 = #{<<"id">> := RepId} = couch_replicator_docs:update_rep_id(Rep1),
+    try
+        couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+            maybe_start_replication_job(JTx, Rep)
+        end)
+    catch
+        throw:{job_start_error, halt} ->
+            Msg = "~p : replication doc job ~s lock conflict",
+            couch_log:error(Msg, [?MODULE, RepId]),
+            exit(normal);
+        throw:{job_start_error, Error} ->
+            Msg = "~p : replication job start failed ~slock conflict",
+            couch_log:error(Msg, [?MODULE, RepId])
+    end.
+% NOTE(review): WIP - Job, RepDocData, RepId and Rep2 used below are not bound
+% here, and its two case expressions are never closed with `end`.
+maybe_start_replication_job(JTx, Rep) ->
+     case couch_jobs:get_job_data(JTx, ?REP_JOBS, RepId) of
+        {error, not_found} ->
+            RepJobData = #{
+                ?REP := Rep2,
+                ?STATE := ST_PENDING,
+                ?STATE_INFO := null,
+            },
+            ok = couch_jobs:add(JTx, ?REP_JOBS, RepJobData),
+            RepDocData1 = RepDocData#{
+                ?REP := Rep2,
+                ?STATE := ST_SCHEDULED,
+                ?STATE_INFO := null,
+                ?ERROR_COUNT := 0,
+                ?LAST_UPDATED => erlang:system_info()
+            },
+            case couch_jobs:finish(JTx, Job, RepDocData1) of
+                ok -> ok;
+                Error -> throw({job_start_error, Error})
+            % NOTE(review): missing `end`s (and a final `.`) before the next function.
+remove_old_state_fields(#{} = Rep, ?TRIGGERED) ->
+    case update_docs() of
+        true -> ok;
+        false -> couch_replicator_docs:remove_state_fields(DbName, DocId)
+    end;
+% NOTE(review): DbName and DocId are unbound in remove_old_state_fields/2.
+remove_old_state_fields(#{} = Rep, ?ERROR) ->
+    case update_docs() of
+        true -> ok;
+        false -> couch_replicator_docs:remove_state_fields(DbName, DocId)
+    end;
+% NOTE(review): next clause is misnamed; ?TRIGGERED/?ERROR are not in couch_replicator.hrl.
+maybe_remove_old_state_fields(#{} = _Rep, _) ->
+    ok.
+
+
 % Doc processor gen_server private helper functions
 
 % Handle doc update -- add to ets, then start a worker to try to turn it into
@@ -592,20 +752,24 @@ ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) ->
     lists:member(State, States).
 
 
--spec update_replication(any(), binary(), binary(), #{} | null,
+-spec add_rep_doc_job(any(), binary(), binary(), #{} | null,
     binary() | null, binary() | null) -> ok.
-update_replication_job(Tx, DbName, DocId, Rep, RepParseError, DocState) ->
+add_rep_doc_job(Tx, DbName, DocId, Rep, RepParseError, DocState) ->
     JobId = docs_job_id(DbName, DocId),
     ok = remove_replication_by_doc_job_id(Db, JobId),
-    RepDocsJob = #{
-        <<"rep_id">> := null,
-        <<"db_name">> := DbName,
-        <<"doc_id">> := DocId,
-        <<"rep">> := Rep,
-        <<"rep_parse_error">> := RepParseError,
-        <<"doc_state">> := DocState
+    % NOTE(review): Db passed to remove_replication_by_doc_job_id/2 is unbound; use Tx.
+    RepDocData = #{
+        ?REP => Rep,
+        ?REP_PARSE_ERROR => RepParseError,
+        ?DOC_STATE => DocState,
+        ?DB_NAME => DbName,
+        ?DOC_ID => DocId,
+        ?STATE => ?ST_INITIALIZING,
+        ?ERROR_COUNT => 0,
+        ?LAST_UPDATED => erlang:system_time(),
+        ?STATE_INFO => null
     },
-    ok = couch_jobs:add(Tx, ?REP_DOCS, RepDocsJob).
+    ok = couch_jobs:add(Tx, ?REP_DOCS, JobId, RepDocData).
 
 
 docs_job_id(DbName, Id) when is_binary(DbName), is_binary(Id) ->
@@ -617,10 +781,10 @@ remove_replication_by_doc_job_id(Tx, Id) ->
     case couch_jobs:get_job_data(Tx, ?REP_DOCS, Id) of
         {error, not_found} ->
             ok;
-        {ok, #{<<"rep_id">> := null}} ->
+        {ok, #{?REP := #{<<"id">> := null}}} ->
             couch_jobs:remove(Tx, ?REP_DOCS, Id),
             ok;
-        {ok, #{<<"rep_id">> := RepId}} ->
+        {ok, #{?REP := #{<<"id">> := RepId}}} ->
             couch_jobs:remove(Tx, ?REP_JOBS, RepId),
             couch_jobs:remove(Tx, ?REP_DOCS, Id),
             ok
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index cdc6a10..eaf8161 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -138,70 +138,17 @@ update_error(#rep{} = Rep, Error) ->
 
 -spec ensure_rep_db_exists() -> ok.
 ensure_rep_db_exists() ->
-    Opts = [?CTX, sys_db, nologifmissing],
-    case fabric2_db:create(?REP_DB_NAME, Opts) of
-        {error, file_exists} ->
-            ok;
-        {ok, _Db} ->
-            ok
+    % Create ?REP_DB_NAME as a system db; an already existing db counts as success.
+    case fabric2_db:create(?REP_DB_NAME, [?CTX, sys_db]) of
+        {error, file_exists} -> ok;
+        {ok, _Db} -> ok
     end.
 
 
--spec ensure_rep_ddoc_exists(binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb) ->
-    DDocId = ?REP_DESIGN_DOC,
-    case open_rep_doc(RepDb, DDocId) of
-        {not_found, database_does_not_exist} ->
-            %% database was deleted.
-            ok;
-        {not_found, _Reason} ->
-            DocProps = replication_design_doc_props(DDocId),
-            DDoc = couch_doc:from_json_obj({DocProps}),
-            couch_log:notice("creating replicator ddoc ~p", [RepDb]),
-            {ok, _Rev} = save_rep_doc(RepDb, DDoc);
-        {ok, Doc} ->
-            Latest = replication_design_doc_props(DDocId),
-            {Props0} = couch_doc:to_json_obj(Doc, []),
-            {value, {_, Rev}, Props} = lists:keytake(<<"_rev">>, 1, Props0),
-            case compare_ejson({Props}, {Latest}) of
-                true ->
-                    ok;
-                false ->
-                    LatestWithRev = [{<<"_rev">>, Rev} | Latest],
-                    DDoc = couch_doc:from_json_obj({LatestWithRev}),
-                    couch_log:notice("updating replicator ddoc ~p", [RepDb]),
-                    try
-                        {ok, _} = save_rep_doc(RepDb, DDoc)
-                    catch
-                        throw:conflict ->
-                            %% ignore, we'll retry next time
-                            ok
-                    end
-            end
-    end,
-    ok.
-
-
--spec compare_ejson({[_]}, {[_]}) -> boolean().
-compare_ejson(EJson1, EJson2) ->
-    EjsonSorted1 = couch_replicator_filters:ejsort(EJson1),
-    EjsonSorted2 = couch_replicator_filters:ejsort(EJson2),
-    EjsonSorted1 == EjsonSorted2.
-
-
--spec replication_design_doc_props(binary()) -> [_].
-replication_design_doc_props(DDocId) ->
-    [
-        {<<"_id">>, DDocId},
-        {<<"language">>, <<"javascript">>},
-        {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
-    ].
-
-
 -spec parse_rep_doc_without_id({[_]}) -> #{}.
 parse_rep_doc_without_id(RepDoc) ->
     {ok, Rep} = try
-        parse_rep_doc_without_id(RepDoc, rep_user_name(RepDoc))
+        parse_rep_doc_without_id(RepDoc, null)
     catch
         throw:{error, Reason} ->
             throw({bad_rep_doc, Reason});
@@ -358,14 +305,6 @@ save_rep_doc(DbName, Doc) ->
     end.
 
 
--spec rep_user_name({[_]}) -> binary() | null.
-rep_user_name({RepDoc}) ->
-    case get_json_value(<<"user_ctx">>, RepDoc) of
-        undefined -> null;
-        {UserCtx} -> get_json_value(<<"name">>, UserCtx, null)
-    end.
-
-
 -spec parse_rep_db(#{}, #{}, #{}) -> #{}.
 parse_rep_db(#{} = Endpoint, #{} = ProxyParams, #{} = Options) ->
     ProxyURL = case ProxyParams of
@@ -634,7 +573,8 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
        roles = Roles,
        name = Name
     } = fabric2_db:get_user_ctx(Db),
-    case lists:member(<<"_replicator">>, Roles) of
+    IsReplicator = lists:member(<<"_replicator">>, Roles),
+    Doc1 = case IsReplicator of
     true ->
         Doc;
     false ->
@@ -649,12 +589,21 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
                 Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
             ok ->
                 Doc;
-            _ ->
+            false ->
                 throw({forbidden, <<"Can't update replication documents",
                     " from other users.">>})
             end
         end
-    end.
+    end,
+    case IsReplicator orelse Doc1#doc.deleted of
+        true ->
+            ok;  % Skip body validation for the replicator role and deletions
+        false ->
+            % Encode as a map and normalize all field names as binaries
+            BMap = ?JSON_DECODE(?JSON_ENCODE(Doc1#doc.body)),
+            couch_replicator_validate_doc:validate(BMap)
+    end,
+    Doc1.
 
 
 -spec after_doc_read(#doc{}, Db::any()) -> #doc{}.
diff --git a/src/couch_replicator/src/couch_replicator_js_functions.hrl b/src/couch_replicator/src/couch_replicator_js_functions.hrl
deleted file mode 100644
index d410433..0000000
--- a/src/couch_replicator/src/couch_replicator_js_functions.hrl
+++ /dev/null
@@ -1,177 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License.  You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--define(REP_DB_DOC_VALIDATE_FUN, <<"
-    function(newDoc, oldDoc, userCtx) {
-        function reportError(error_msg) {
-            log('Error writing document `' + newDoc._id +
-                '\\' to the replicator database: ' + error_msg);
-            throw({forbidden: error_msg});
-        }
-
-        function validateEndpoint(endpoint, fieldName) {
-            if ((typeof endpoint !== 'string') &&
-                ((typeof endpoint !== 'object') || (endpoint === null))) {
-
-                reportError('The `' + fieldName + '\\' property must exist' +
-                    ' and be either a string or an object.');
-            }
-
-            if (typeof endpoint === 'object') {
-                if ((typeof endpoint.url !== 'string') || !endpoint.url) {
-                    reportError('The url property must exist in the `' +
-                        fieldName + '\\' field and must be a non-empty string.');
-                }
-
-                if ((typeof endpoint.auth !== 'undefined') &&
-                    ((typeof endpoint.auth !== 'object') ||
-                        endpoint.auth === null)) {
-
-                    reportError('`' + fieldName +
-                        '.auth\\' must be a non-null object.');
-                }
-
-                if ((typeof endpoint.headers !== 'undefined') &&
-                    ((typeof endpoint.headers !== 'object') ||
-                        endpoint.headers === null)) {
-
-                    reportError('`' + fieldName +
-                        '.headers\\' must be a non-null object.');
-                }
-            }
-        }
-
-        var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0);
-        var isAdmin = (userCtx.roles.indexOf('_admin') >= 0);
-
-        if (isReplicator) {
-            // Always let replicator update the replication document
-            return;
-        }
-
-        if (newDoc._replication_state === 'failed') {
-            // Skip validation in case when we update the document with the
-            // failed state. In this case it might be malformed. However,
-            // replicator will not pay attention to failed documents so this
-            // is safe.
-            return;
-        }
-
-        if (!newDoc._deleted) {
-            validateEndpoint(newDoc.source, 'source');
-            validateEndpoint(newDoc.target, 'target');
-
-            if ((typeof newDoc.create_target !== 'undefined') &&
-                (typeof newDoc.create_target !== 'boolean')) {
-
-                reportError('The `create_target\\' field must be a boolean.');
-            }
-
-            if ((typeof newDoc.continuous !== 'undefined') &&
-                (typeof newDoc.continuous !== 'boolean')) {
-
-                reportError('The `continuous\\' field must be a boolean.');
-            }
-
-            if ((typeof newDoc.doc_ids !== 'undefined') &&
-                !isArray(newDoc.doc_ids)) {
-
-                reportError('The `doc_ids\\' field must be an array of strings.');
-            }
-
-            if ((typeof newDoc.selector !== 'undefined') &&
-                (typeof newDoc.selector !== 'object')) {
-
-                reportError('The `selector\\' field must be an object.');
-            }
-
-            if ((typeof newDoc.filter !== 'undefined') &&
-                ((typeof newDoc.filter !== 'string') || !newDoc.filter)) {
-
-                reportError('The `filter\\' field must be a non-empty string.');
-            }
-
-            if ((typeof newDoc.doc_ids !== 'undefined') &&
-                (typeof newDoc.selector !== 'undefined')) {
-
-                reportError('`doc_ids\\' field is incompatible with `selector\\'.');
-            }
-
-            if ( ((typeof newDoc.doc_ids !== 'undefined') ||
-                  (typeof newDoc.selector !== 'undefined')) &&
-                 (typeof newDoc.filter !== 'undefined') ) {
-
-                reportError('`filter\\' field is incompatible with `selector\\' and `doc_ids\\'.');
-            }
-
-            if ((typeof newDoc.query_params !== 'undefined') &&
-                ((typeof newDoc.query_params !== 'object') ||
-                    newDoc.query_params === null)) {
-
-                reportError('The `query_params\\' field must be an object.');
-            }
-
-            if (newDoc.user_ctx) {
-                var user_ctx = newDoc.user_ctx;
-
-                if ((typeof user_ctx !== 'object') || (user_ctx === null)) {
-                    reportError('The `user_ctx\\' property must be a ' +
-                        'non-null object.');
-                }
-
-                if (!(user_ctx.name === null ||
-                    (typeof user_ctx.name === 'undefined') ||
-                    ((typeof user_ctx.name === 'string') &&
-                        user_ctx.name.length > 0))) {
-
-                    reportError('The `user_ctx.name\\' property must be a ' +
-                        'non-empty string or null.');
-                }
-
-                if (!isAdmin && (user_ctx.name !== userCtx.name)) {
-                    reportError('The given `user_ctx.name\\' is not valid');
-                }
-
-                if (user_ctx.roles && !isArray(user_ctx.roles)) {
-                    reportError('The `user_ctx.roles\\' property must be ' +
-                        'an array of strings.');
-                }
-
-                if (!isAdmin && user_ctx.roles) {
-                    for (var i = 0; i < user_ctx.roles.length; i++) {
-                        var role = user_ctx.roles[i];
-
-                        if (typeof role !== 'string' || role.length === 0) {
-                            reportError('Roles must be non-empty strings.');
-                        }
-                        if (userCtx.roles.indexOf(role) === -1) {
-                            reportError('Invalid role (`' + role +
-                                '\\') in the `user_ctx\\'');
-                        }
-                    }
-                }
-            } else {
-                if (!isAdmin) {
-                    reportError('The `user_ctx\\' property is missing (it is ' +
-                       'optional for admins only).');
-                }
-            }
-        } else {
-            if (!isAdmin) {
-                if (!oldDoc.user_ctx || (oldDoc.user_ctx.name !== userCtx.name)) {
-                    reportError('Replication documents can only be deleted by ' +
-                        'admins or by the users who created them.');
-                }
-            }
-        }
-    }
-">>).
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index e8ddc84..0e1c3d9 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -87,8 +87,8 @@ start_link(#{] = Job, #{} = JobData) ->
         {ok, Pid} ->
             {ok, Pid};
         {error, Reason} ->
-            #{<<"rep">> := Rep} = JobData,
-            {<<"id">> := Id, <<"source">> := Src, <<"target">> := Ttg} = Rep,
+            #{?REP := Rep} = JobData,
+            {<<"id">> := Id, ?SOURCE := Src, ?TARGET := Ttg} = Rep,
             Source = couch_replicator_api_wrap:db_uri(Src),
             Target = couch_replicator_api_wrap:db_uri(Tgt),
             ErrMsg = "failed to start replication `~s` (`~s` -> `~s`)",
@@ -332,18 +332,18 @@ terminate(shutdown, #rep_state{id = RepId} = State) ->
 terminate({shutdown, max_backoff}, {error, {#{} = Job, #{} = JobData}}) ->
     % Here we handle the case when replication fails during initialization.
     % That is before the #rep_state{} is even built.
-    #{<<"rep">> := #{<<"id">> := RepId}} = JobData,
+    #{?REP := #{<<"id">> := RepId}} = JobData,
     couch_stats:increment_counter([couch_replicator, failed_starts]),
     couch_log:warning("Replication `~s` reached max backoff ", [RepId]),
     finish_couch_job(Job, JobData, <<"error">>, max_backoff);
 
 terminate({shutdown, {error, Error}}, {error, Class, Stack, {Job, JobData}}) ->
     % Here we handle the case when replication fails during initialization.
-    #{<<"rep">> := Rep} = JobData,
+    #{?REP := Rep} = JobData,
     #{
        <<"id">> := Id,
-       <<"source">> := Source0,
-       <<"target">> := Target0,
+       ?SOURCE := Source0,
+       ?TARGET := Target0,
        <<"doc_id">> := DocId,
        <<"db_name">> := DbName
     } = Rep,
@@ -525,7 +525,7 @@ finish_couch_job(#rep_state{} = State, FinishedState, Result) ->
 
 
 finish_couch_job(#{} = Job, #{} = JobData, FinishState, Result0) ->
-    #{<<"rep">> := #{<<"id">> := RepId}} = JobData,
+    #{?REP := #{<<"id">> := RepId}} = JobData,
     case Result of
         null -> null;
         #{} -> Result0;
@@ -534,8 +534,8 @@ finish_couch_job(#{} = Job, #{} = JobData, FinishState, Result0) ->
         Other -> couch_replicator_utils:rep_error_to_binary(Result0)
     end,
     JobData= JobData0#{
-        <<"finished_state">> => FinishState,
-        <<"finished_result">> => Result
+        ?FINISHED_STATE => FinishState,
+        ?FINISHED_RESULT => Result
     },
     case couch_jobs:finish(undefined, Job, JobData) of
         ok ->
@@ -566,18 +566,17 @@ cancel_timer(#rep_state{timer = Timer} = State) ->
     State#rep_state{timer = nil}.
 
 
-init_state(#{} = Job, #{<<"rep">> =: Rep}} = JobData) ->
+init_state(#{} = Job, #{?REP =: Rep}} = JobData) ->
     #{
         <<"id">> := Id,
         <<"base_id">> := BaseId,
-        <<"source">> := Src0,
-        <<"target">> := Tgt,
+        ?SOURCE := Src0,
+        ?TARGET := Tgt,
         <<"type">> := Type,
         <<"view">> := View,
         <<"start_time">> := StartTime,
         <<"stats">> := Stats,
         <<"options">> := OptionsMap,
-        <<"user_ctx">> := UserCtx,
         <<"db_name">> := DbName,
         <<"doc_id">> := DocId,
     } = Rep,
@@ -589,13 +588,9 @@ init_state(#{} = Job, #{<<"rep">> =: Rep}} = JobData) ->
     % Adjust minimum number of http source connections to 2 to avoid deadlock
     Src = adjust_maxconn(Src0, BaseId),
     {ok, Source} = couch_replicator_api_wrap:db_open(Src),
-    {CreateTargetParams} = get_value(create_target_params, Options, {[]}),
-    {ok, Target} = couch_replicator_api_wrap:db_open(Tgt,
-
     CreateTgt = get_value(create_target, Options, false),
-    CreateParams = maps:to_list(get_value(create_target_params, Options, #{}),
-    {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, UserCtx, CreateTgt,
-        CreateParams),
+    TParams = maps:to_list(get_value(create_target_params, Options, #{}),
+    {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, CreateTgt, TParams),
 
     {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
     {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),
diff --git a/src/couch_replicator/src/couch_replicator_validate_doc.erl b/src/couch_replicator/src/couch_replicator_validate_doc.erl
new file mode 100644
index 0000000..da14361
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_validate_doc.erl
@@ -0,0 +1,138 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_validate_doc).
+
+
+-include("couch_replicator.hrl").
+
+
+-export([
+    validate/1
+]).
+
+
+% Validate a replication document body (a decoded JSON map) before it is
+% written to a _replicator db. Returns `ok`, or throws `{forbidden, Msg}`
+% where `Msg` is a binary describing the first problem found.
+validate(#{<<"_replication_state">> := ?ST_FAILED}) ->
+    % If replication failed, allow updating even a malformed document
+    ok;
+
+validate(#{?SOURCE := _, ?TARGET := _} = Doc) ->
+    % validate_field/2 either returns ok or throws, so the accumulator stays ok
+    ok = maps:fold(fun(K, V, ok) -> validate_field(K, V) end, ok, Doc),
+    validate_mutually_exclusive_filters(Doc);
+
+validate(_) ->
+    fail("Both `source` and `target` fields must exist").
+
+
+% Validate a single top-level field. Fields without a dedicated clause
+% (`_id`, `_rev`, `user_ctx`, ...) are accepted by the final clause.
+validate_field(?SOURCE, V) when is_binary(V) ->
+    ok;
+validate_field(?SOURCE, #{?URL := _} = V) ->
+    validate_endpoint(V);
+validate_field(?SOURCE, _V) ->
+    fail("`source` must be a string or an object with an `url` field");
+
+validate_field(?TARGET, V) when is_binary(V) ->
+    ok;
+validate_field(?TARGET, #{?URL := _} = V) ->
+    validate_endpoint(V);
+validate_field(?TARGET, _V) ->
+    fail("`target` must be a string or an object with an `url` field");
+
+validate_field(?CONTINUOUS, V) when is_boolean(V) ->
+    ok;
+validate_field(?CONTINUOUS, _V) ->
+    fail("`continuous` should be a boolean");
+
+validate_field(?CREATE_TARGET, V) when is_boolean(V) ->
+    ok;
+validate_field(?CREATE_TARGET, _V) ->
+    fail("`create_target` should be a boolean");
+
+validate_field(?DOC_IDS, V) when is_list(V) ->
+    lists:foreach(fun(DocId) ->
+        case is_binary(DocId) andalso byte_size(DocId) > 0 of
+            true -> ok;
+            false -> fail("`doc_ids` should be a list of strings")
+        end
+    end, V);
+validate_field(?DOC_IDS, _V) ->
+    fail("`doc_ids` should be a list of strings");
+
+validate_field(?SELECTOR, #{}) ->
+    ok;
+validate_field(?SELECTOR, _V) ->
+    fail("`selector` should be an object");
+
+validate_field(?FILTER, V) when is_binary(V), byte_size(V) > 0 ->
+    ok;
+validate_field(?FILTER, _V) ->
+    fail("`filter` should be a non-empty string");
+
+validate_field(?QUERY_PARAMS, V) when is_map(V) orelse V =:= null ->
+    ok;
+validate_field(?QUERY_PARAMS, _V) ->
+    fail("`query_params` should be an object or `null`");
+
+validate_field(_Field, _V) ->
+    ok.
+
+
+% Validate each field of an endpoint given as an object (`source` or
+% `target`). Fields without a dedicated clause are accepted.
+validate_endpoint(#{} = Endpoint) ->
+    ok = maps:fold(fun(K, V, ok) -> validate_endpoint_field(K, V) end, ok,
+        Endpoint).
+
+
+validate_endpoint_field(?URL, V) when is_binary(V) ->
+    ok;
+validate_endpoint_field(?URL, _V) ->
+    fail("`url` endpoint field must be a string");
+
+validate_endpoint_field(?AUTH, #{}) ->
+    ok;
+validate_endpoint_field(?AUTH, _V) ->
+    fail("`auth` endpoint field must be an object");
+
+validate_endpoint_field(?HEADERS, #{}) ->
+    ok;
+validate_endpoint_field(?HEADERS, _V) ->
+    fail("`headers` endpoint field must be an object");
+
+validate_endpoint_field(_Field, _V) ->
+    ok.
+
+
+% `doc_ids`, `selector` and `filter` are mutually exclusive: throw if more
+% than one of them is present in the document.
+validate_mutually_exclusive_filters(#{} = Doc) ->
+    Present = [F || F <- [?DOC_IDS, ?SELECTOR, ?FILTER], maps:is_key(F, Doc)],
+    case Present of
+        [_, _ | _] ->
+            fail("`doc_ids`, `selector` and `filter` are mutually exclusive");
+        _ ->
+            ok
+    end.
+
+
+% Abort validation by throwing {forbidden, Msg} with Msg as a binary.
+fail(Msg) when is_list(Msg) ->
+    fail(list_to_binary(Msg));
+
+fail(Msg) when is_binary(Msg) ->
+    throw({forbidden, Msg}).
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 9ec9f2b..ea32531 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -787,7 +787,7 @@ maybe_add_sys_db_callbacks(Db) ->
                 fun couch_replicator_docs:before_doc_update/3,
                 fun couch_replicator_doc_processor:during_doc_update/3,
                 fun couch_replicator_docs:after_doc_read/2,
-                fun couch_replicator_doc_processor:after_db_create/1,
+                undefined,
                 fun couch_replicator_doc_processor:after_db_delete/1
             };
         IsUsersDb ->


[couchdb] 01/03: FDB Replicator WIP

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/fdb-replicator-2
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit bab8763c57ff6ca962bbb5f06e1dba39f452dcb5
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Jul 16 12:57:13 2019 -0400

    FDB Replicator WIP
---
 src/chttpd/src/chttpd.erl                          |  61 ---
 src/chttpd/src/chttpd_misc.erl                     |  48 +-
 src/chttpd/test/eunit/chttpd_handlers_tests.erl    |   3 +-
 src/couch_jobs/src/couch_jobs.erl                  |  17 +
 src/couch_replicator/src/couch_replicator.erl      | 145 +++--
 src/couch_replicator/src/couch_replicator.hrl      |   9 +-
 .../src/couch_replicator_api_wrap.erl              | 123 +++--
 .../src/couch_replicator_clustering.erl            | 248 ---------
 .../src/couch_replicator_db_changes.erl            | 108 ----
 .../src/couch_replicator_doc_processor.erl         | 251 +++++----
 src/couch_replicator/src/couch_replicator_docs.erl | 607 +++++++++++----------
 .../src/couch_replicator_filters.erl               |  32 +-
 src/couch_replicator/src/couch_replicator_ids.erl  |  62 ++-
 .../src/couch_replicator_notifier.erl              |  58 --
 .../src/couch_replicator_scheduler_job.erl         | 354 ++++++------
 .../src/couch_replicator_scheduler_sup.erl         |   6 +-
 src/couch_replicator/src/couch_replicator_sup.erl  |  20 +-
 .../src/couch_replicator_utils.erl                 |  82 ++-
 .../test/eunit/couch_replicator_proxy_tests.erl    |   4 +-
 src/fabric/src/fabric2_db.erl                      |  71 ++-
 src/fabric/src/fabric2_fdb.erl                     |   7 +-
 21 files changed, 990 insertions(+), 1326 deletions(-)

diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl
index 4d32c03..7326bca 100644
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -403,20 +403,6 @@ maybe_log(_HttpReq, #httpd_resp{should_log = false}) ->
     ok.
 
 
-%% HACK: replication currently handles two forms of input, #db{} style
-%% and #http_db style. We need a third that makes use of fabric. #db{}
-%% works fine for replicating the dbs and nodes database because they
-%% aren't sharded. So for now when a local db is specified as the source or
-%% the target, it's hacked to make it a full url and treated as a remote.
-possibly_hack(#httpd{path_parts=[<<"_replicate">>]}=Req) ->
-    {Props0} = chttpd:json_body_obj(Req),
-    Props1 = fix_uri(Req, Props0, <<"source">>),
-    Props2 = fix_uri(Req, Props1, <<"target">>),
-    put(post_body, {Props2}),
-    Req;
-possibly_hack(Req) ->
-    Req.
-
 check_request_uri_length(Uri) ->
     check_request_uri_length(Uri, config:get("httpd", "max_uri_length")).
 
@@ -439,53 +425,6 @@ check_url_encoding([$% | _]) ->
 check_url_encoding([_ | Rest]) ->
     check_url_encoding(Rest).
 
-fix_uri(Req, Props, Type) ->
-    case replication_uri(Type, Props) of
-    undefined ->
-        Props;
-    Uri0 ->
-        case is_http(Uri0) of
-        true ->
-            Props;
-        false ->
-            Uri = make_uri(Req, quote(Uri0)),
-            [{Type,Uri}|proplists:delete(Type,Props)]
-        end
-    end.
-
-replication_uri(Type, PostProps) ->
-    case couch_util:get_value(Type, PostProps) of
-    {Props} ->
-        couch_util:get_value(<<"url">>, Props);
-    Else ->
-        Else
-    end.
-
-is_http(<<"http://", _/binary>>) ->
-    true;
-is_http(<<"https://", _/binary>>) ->
-    true;
-is_http(_) ->
-    false.
-
-make_uri(Req, Raw) ->
-    Port = integer_to_list(mochiweb_socket_server:get(chttpd, port)),
-    Url = list_to_binary(["http://", config:get("httpd", "bind_address"),
-                          ":", Port, "/", Raw]),
-    Headers = [
-        {<<"authorization">>, ?l2b(header_value(Req,"authorization",""))},
-        {<<"cookie">>, ?l2b(extract_cookie(Req))}
-    ],
-    {[{<<"url">>,Url}, {<<"headers">>,{Headers}}]}.
-
-extract_cookie(#httpd{mochi_req = MochiReq}) ->
-    case MochiReq:get_cookie_value("AuthSession") of
-        undefined ->
-            "";
-        AuthSession ->
-            "AuthSession=" ++ AuthSession
-    end.
-%%% end hack
 
 set_auth_handlers() ->
     AuthenticationDefault =  "{chttpd_auth, cookie_authentication_handler},
diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl
index 11d2c5b..d832ade 100644
--- a/src/chttpd/src/chttpd_misc.erl
+++ b/src/chttpd/src/chttpd_misc.erl
@@ -203,8 +203,8 @@ handle_task_status_req(Req) ->
 handle_replicate_req(#httpd{method='POST', user_ctx=Ctx} = Req) ->
     chttpd:validate_ctype(Req, "application/json"),
     %% see HACK in chttpd.erl about replication
-    PostBody = get(post_body),
-    case replicate(PostBody, Ctx) of
+    PostBody = chttpd:json_body_obj(Req),
+    case couch_replicator:replicate(PostBody, Ctx) of
         {ok, {continuous, RepId}} ->
             send_json(Req, 202, {[{ok, true}, {<<"_local_id">>, RepId}]});
         {ok, {cancelled, RepId}} ->
@@ -223,50 +223,6 @@ handle_replicate_req(#httpd{method='POST', user_ctx=Ctx} = Req) ->
 handle_replicate_req(Req) ->
     send_method_not_allowed(Req, "POST").
 
-replicate({Props} = PostBody, Ctx) ->
-    case couch_util:get_value(<<"cancel">>, Props) of
-    true ->
-        cancel_replication(PostBody, Ctx);
-    _ ->
-        Node = choose_node([
-            couch_util:get_value(<<"source">>, Props),
-            couch_util:get_value(<<"target">>, Props)
-        ]),
-        case rpc:call(Node, couch_replicator, replicate, [PostBody, Ctx]) of
-        {badrpc, Reason} ->
-            erlang:error(Reason);
-        Res ->
-            Res
-        end
-    end.
-
-cancel_replication(PostBody, Ctx) ->
-    {Res, _Bad} = rpc:multicall(couch_replicator, replicate, [PostBody, Ctx]),
-    case [X || {ok, {cancelled, _}} = X <- Res] of
-    [Success|_] ->
-        % Report success if at least one node canceled the replication
-        Success;
-    [] ->
-        case lists:usort(Res) of
-        [UniqueReply] ->
-            % Report a universally agreed-upon reply
-            UniqueReply;
-        [] ->
-            {error, badrpc};
-        Else ->
-            % Unclear what to do here -- pick the first error?
-            % Except try ignoring any {error, not_found} responses
-            % because we'll always get two of those
-            hd(Else -- [{error, not_found}])
-        end
-    end.
-
-choose_node(Key) when is_binary(Key) ->
-    Checksum = erlang:crc32(Key),
-    Nodes = lists:sort([node()|erlang:nodes()]),
-    lists:nth(1 + Checksum rem length(Nodes), Nodes);
-choose_node(Key) ->
-    choose_node(term_to_binary(Key)).
 
 handle_reload_query_servers_req(#httpd{method='POST'}=Req) ->
     chttpd:validate_ctype(Req, "application/json"),
diff --git a/src/chttpd/test/eunit/chttpd_handlers_tests.erl b/src/chttpd/test/eunit/chttpd_handlers_tests.erl
index f3e8f5d..5ae80d0 100644
--- a/src/chttpd/test/eunit/chttpd_handlers_tests.erl
+++ b/src/chttpd/test/eunit/chttpd_handlers_tests.erl
@@ -70,7 +70,8 @@ request_replicate(Url, Body) ->
     Headers = [{"Content-Type", "application/json"}],
     Handler = {chttpd_misc, handle_replicate_req},
     request(post, Url, Headers, Body, Handler, fun(Req) ->
-        chttpd:send_json(Req, 200, get(post_body))
+        PostBody = chttpd:json_body_obj(Req),
+        chttpd:send_json(Req, 200, PostBody)
     end).
 
 request(Method, Url, Headers, Body, {M, F}, MockFun) ->
diff --git a/src/couch_jobs/src/couch_jobs.erl b/src/couch_jobs/src/couch_jobs.erl
index d469ed4..393f256 100644
--- a/src/couch_jobs/src/couch_jobs.erl
+++ b/src/couch_jobs/src/couch_jobs.erl
@@ -19,6 +19,8 @@
     remove/3,
     get_job_data/3,
     get_job_state/3,
+    get_jobs/2,
+    get_jobs/3,
 
     % Job processing
     accept/1,
@@ -103,6 +105,21 @@ get_job_state(Tx, Type, JobId) when is_binary(JobId) ->
     end).
 
 
+-spec get_jobs(jtx(), job_type()) -> #{}.
+get_jobs(Tx, Type) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+        couch_jobs_fdb:get_jobs(JTx, Type)
+    end).
+
+% Like get_jobs/2, with a 1-arity Filter passed to couch_jobs_fdb:get_jobs/3.
+-spec get_jobs(jtx(), job_type(), fun((any()) -> any())) -> #{}.
+get_jobs(Tx, Type, Filter) when is_function(Filter, 1) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+        couch_jobs_fdb:get_jobs(JTx, Type, Filter)
+    end).
+
+
+
 %% Job processor API
 
 -spec accept(job_type()) -> {ok, job(), job_data()} | {error, any()}.
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index e4fa31c..225a258 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -52,30 +52,27 @@
     {ok, {cancelled, binary()}} |
     {error, any()} |
     no_return().
-replicate(PostBody, Ctx) ->
-    {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
-    Rep = Rep0#rep{start_time = os:timestamp()},
-    #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep,
-    case get_value(cancel, Options, false) of
-    true ->
-        CancelRepId = case get_value(id, Options, nil) of
-        nil ->
-            RepId;
-        RepId2 ->
-            RepId2
-        end,
-        case check_authorization(CancelRepId, UserCtx) of
-        ok ->
-            cancel_replication(CancelRepId);
-        not_found ->
-            {error, not_found}
-        end;
-    false ->
-        check_authorization(RepId, UserCtx),
-        {ok, Listener} = rep_result_listener(RepId),
-        Result = do_replication_loop(Rep),
-        couch_replicator_notifier:stop(Listener),
-        Result
+replicate(PostBody, UserCtx) ->
+    {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, UserCtx),
+    Rep = Rep0#{<<"start_time">> => erlang:system_time()},
+    #{<<"id">> := RepId, <<"options">> := Options} = Rep,
+    case maps:get(<<"cancel">>, Options, false) of
+        true ->
+            CancelRepId = case maps:get(<<"id">>, Options, nil) of % explicit id wins
+                nil -> RepId;
+                RepId2 -> RepId2
+            end,
+            case check_authorization(CancelRepId, UserCtx) of
+                ok -> cancel_replication(CancelRepId);
+                not_found -> {error, not_found}
+            end;
+        false ->
+            check_authorization(RepId, UserCtx),
+            ok = couch_replicator_scheduler:add_job(Rep),
+            case maps:get(<<"continuous">>, Options, false) of % else wait for result
+                true -> {ok, {continuous, RepId}};
+                false -> wait_for_result(RepId)
+            end
     end.
 
 
@@ -83,57 +80,50 @@ replicate(PostBody, Ctx) ->
 % it returns `ignore`.
 -spec ensure_rep_db_exists() -> ignore.
 ensure_rep_db_exists() ->
-    {ok, _Db} = couch_replicator_docs:ensure_rep_db_exists(),
+    ok = couch_replicator_docs:ensure_rep_db_exists(),
+    couch_jobs:set_type_timeout(?REP_DOCS, ?REP_DOCS_TIMEOUT_MSEC),
+    couch_jobs:set_type_timeout(?REP_JOBS, ?REP_JOBS_TIMEOUT_MSEC),
     ignore.
 
 
--spec do_replication_loop(#rep{}) ->
-    {ok, {continuous, binary()}} | {ok, tuple()} | {error, any()}.
-do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) ->
-    ok = couch_replicator_scheduler:add_job(Rep),
-    case get_value(continuous, Options, false) of
-    true ->
-        {ok, {continuous, ?l2b(BaseId ++ Ext)}};
-    false ->
-        wait_for_result(Id)
-    end.
-
-
--spec rep_result_listener(rep_id()) -> {ok, pid()}.
-rep_result_listener(RepId) ->
-    ReplyTo = self(),
-    {ok, _Listener} = couch_replicator_notifier:start_link(
-        fun({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
-                ReplyTo ! Ev;
-            (_) ->
-                ok
-        end).
-
-
 -spec wait_for_result(rep_id()) ->
     {ok, {[_]}} | {error, any()}.
 wait_for_result(RepId) ->
-    receive
-    {finished, RepId, RepResult} ->
-        {ok, RepResult};
-    {error, RepId, Reason} ->
-        {error, Reason}
+    FinishRes = case couch_jobs:subscribe(?REP_JOBS, RepId) of
+        {ok, finished, JobData} ->
+            {ok, JobData};
+        {ok, SubId, _, _} ->
+            case couch_jobs:wait(SubId, finished, infinity) of
+                {?REP_JOBS, RepId, finished, JobData} -> {ok, JobData};
+                timeout -> timeout
+            end;
+        {error, SubError} -> % distinct name: `Error` is matched again below
+            {error, SubError}
+    end,
+    case FinishRes of
+        {ok, #{<<"finished_result">> := CheckpointHistory}} ->
+            {ok, CheckpointHistory};
+        timeout ->
+            {error, timeout};
+        {error, Error} ->
+            {error, Error}
     end.
 
 
 -spec cancel_replication(rep_id()) ->
     {ok, {cancelled, binary()}} | {error, not_found}.
-cancel_replication({BasedId, Extension} = RepId) ->
-    FullRepId = BasedId ++ Extension,
-    couch_log:notice("Canceling replication '~s' ...", [FullRepId]),
-    case couch_replicator_scheduler:rep_state(RepId) of
-    #rep{} ->
-        ok = couch_replicator_scheduler:remove_job(RepId),
-        couch_log:notice("Replication '~s' cancelled", [FullRepId]),
-        {ok, {cancelled, ?l2b(FullRepId)}};
-    nil ->
-        couch_log:notice("Replication '~s' not found", [FullRepId]),
-        {error, not_found}
+cancel_replication(RepId) when is_binary(RepId) ->
+    couch_log:notice("Canceling replication '~s' ...", [RepId]),
+    case couch_jobs:get_job_data(undefined, ?REP_JOBS, RepId) of
+        {error, not_found} ->
+            {error, not_found};
+        {ok, #{<<"rep">> := #{<<"db_name">> := null}}} ->
+            couch_jobs:remove(undefined, ?REP_JOBS, RepId),
+            {ok, {cancelled, RepId}};
+        {ok, #{<<"rep">> := #{}}} ->
+            % Job was started from a replicator doc; canceling it via
+            % _replicate doesn't make sense, delete the replicator doc instead.
+            {error, not_found}
     end.
 
 
@@ -142,10 +132,10 @@ replication_states() ->
     ?REPLICATION_STATES.
 
 
--spec strip_url_creds(binary() | {[_]}) -> binary().
+-spec strip_url_creds(binary() | #{}) -> binary().
 strip_url_creds(Endpoint) ->
-    case couch_replicator_docs:parse_rep_db(Endpoint, [], []) of
-        #httpdb{url=Url} ->
+    case couch_replicator_docs:parse_rep_db(Endpoint, #{}, #{}) of
+        #{<<"url">> := Url} ->
             iolist_to_binary(couch_util:url_strip_password(Url));
         LocalDb when is_binary(LocalDb) ->
             LocalDb
@@ -286,13 +276,13 @@ state_atom(State) when is_atom(State) ->
 
 -spec check_authorization(rep_id(), #user_ctx{}) -> ok | not_found.
 check_authorization(RepId, #user_ctx{name = Name} = Ctx) ->
-    case couch_replicator_scheduler:rep_state(RepId) of
-    #rep{user_ctx = #user_ctx{name = Name}} ->
-        ok;
-    #rep{} ->
-        couch_httpd:verify_is_server_admin(Ctx);
-    nil ->
-        not_found
+    case couch_jobs:get_job_data(undefined, ?REP_JOBS, RepId) of
+        {error, not_found} ->
+            not_found;
+        {ok, #{<<"rep">> := #{<<"user">> := Name}}} -> % owner is the caller
+            ok;
+        {ok, #{}} ->
+            couch_httpd:verify_is_server_admin(Ctx)
     end.
 
 
@@ -342,13 +332,6 @@ t_replication_not_found() ->
     end).
 
 
-expect_rep_user_ctx(Name, Role) ->
-    meck:expect(couch_replicator_scheduler, rep_state,
-        fun(_Id) ->
-            UserCtx = #user_ctx{name = Name, roles = [Role]},
-            #rep{user_ctx = UserCtx}
-        end).
-
 
 strip_url_creds_test_() ->
      {
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl
index 2a5b7c8..6584078 100644
--- a/src/couch_replicator/src/couch_replicator.hrl
+++ b/src/couch_replicator/src/couch_replicator.hrl
@@ -11,6 +11,10 @@
 % the License.
 
 -define(REP_ID_VERSION, 4).
+-define(REP_DOCS, <<"repdocs">>).
+-define(REP_JOBS, <<"repjobs">>).
+-define(REP_DOCS_TIMEOUT_MSEC, 17000).
+-define(REP_JOBS_TIMEOUT_MSEC, 33000).
 
 -record(rep, {
     id :: rep_id() | '_' | 'undefined',
@@ -22,11 +26,12 @@
     view = nil :: any() | '_',
     doc_id :: any() | '_',
     db_name = null :: null | binary() | '_',
-    start_time = {0, 0, 0} :: erlang:timestamp() | '_',
+    start_time = 0 :: integer() | '_',
     stats = couch_replicator_stats:new() :: orddict:orddict() | '_'
 }).
 
--type rep_id() :: {string(), string()}.
+-type rep_id() :: binary().
+-type user_name() :: binary() | null.
 -type db_doc_id() :: {binary(), binary() | '_'}.
 -type seconds() :: non_neg_integer().
 -type rep_start_result() ::
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index ab1de7d..5c03632 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -38,8 +38,8 @@
     open_doc/3,
     open_doc_revs/6,
     changes_since/5,
-    db_uri/1,
-    normalize_db/1
+    db_uri/1,
+    db_from_json/1
     ]).
 
 -import(couch_replicator_httpc, [
@@ -57,21 +57,19 @@
 -define(MAX_URL_LEN, 7000).
 -define(MIN_URL_LEN, 200).
 
-db_uri(#httpdb{url = Url}) ->
+db_uri(#{<<"url">> := Url}) ->
     couch_util:url_strip_password(Url);
 
-db_uri(DbName) when is_binary(DbName) ->
-    ?b2l(DbName);
+db_uri(#httpdb{url = Url}) ->
+    couch_util:url_strip_password(Url).
 
-db_uri(Db) ->
-    db_uri(couch_db:name(Db)).
 
+db_open(#{} = Db) -> % open without creating the target, no create params
+    db_open(Db, false, []).
 
-db_open(Db) ->
-    db_open(Db, false, []).
 
-db_open(#httpdb{} = Db1, Create, CreateParams) ->
-    {ok, Db} = couch_replicator_httpc:setup(Db1),
+db_open(#{} = Db0, Create, CreateParams) ->
+    {ok, Db} = couch_replicator_httpc:setup(db_from_json(Db0)),
     try
         case Create of
         false ->
@@ -149,6 +147,7 @@ get_pending_count(#httpdb{} = Db, Seq) ->
         {ok, couch_util:get_value(<<"pending">>, Props, null)}
     end).
 
+
 get_view_info(#httpdb{} = Db, DDocId, ViewName) ->
     Path = io_lib:format("~s/_view/~s/_info", [DDocId, ViewName]),
     send_req(Db, [{path, Path}],
@@ -285,7 +284,6 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
             open_doc_revs(RetryDb, Id, Revs, Options, Fun, Acc)
     end.
 
-
 error_reason({http_request_failed, "GET", _Url, {error, timeout}}) ->
     timeout;
 error_reason({http_request_failed, "GET", _Url, {error, {_, req_timedout}}}) ->
@@ -356,7 +354,6 @@ update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) ->
                 end
         end).
 
-
 update_docs(Db, DocList, Options) ->
     update_docs(Db, DocList, Options, interactive_edit).
 
@@ -889,23 +886,6 @@ header_value(Key, Headers, Default) ->
     end.
 
 
-% Normalize an #httpdb{} or #db{} record such that it can be used for
-% comparisons. This means remove things like pids and also sort options / props.
-normalize_db(#httpdb{} = HttpDb) ->
-    #httpdb{
-        url = HttpDb#httpdb.url,
-        auth_props = lists:sort(HttpDb#httpdb.auth_props),
-        headers = lists:keysort(1, HttpDb#httpdb.headers),
-        timeout = HttpDb#httpdb.timeout,
-        ibrowse_options = lists:keysort(1, HttpDb#httpdb.ibrowse_options),
-        retries = HttpDb#httpdb.retries,
-        http_connections = HttpDb#httpdb.http_connections
-    };
-
-normalize_db(<<DbName/binary>>) ->
-    DbName.
-
-
 maybe_append_create_query_params(Db, []) ->
     Db;
 
@@ -914,27 +894,72 @@ maybe_append_create_query_params(Db, CreateParams) ->
     Db#httpdb{url = NewUrl}.
 
 
--ifdef(TEST).
+db_from_json(#{} = DbMap) -> % decoded endpoint map -> #httpdb{}
+    #{
+        <<"url">> := Url,
+        <<"auth">> := Auth,
+        <<"headers">> := Headers0,
+        <<"ibrowse_options">> := IBrowseOptions0,
+        <<"timeout">> := Timeout,
+        <<"http_connections">> := HttpConnections,
+        <<"retries">> := Retries,
+        <<"proxy_url">> := ProxyUrl0
+    } = DbMap,
+    Headers = maps:fold(fun(K, V, Acc) ->
+        [{binary_to_list(K), binary_to_list(V)} | Acc]
+    end, [], Headers0), % header names and values back to strings
+    IBrowseOptions = maps:fold(fun
+        (<<"proxy_protocol">>, V, Acc) ->
+            [{proxy_protocol, binary_to_existing_atom(V, utf8)} | Acc];
+        (<<"socket_options">>, #{} = SockOpts, Acc) ->
+            SockOptsKVs = maps:fold(fun sock_opts_fold/3, [], SockOpts),
+            [{socket_options, SockOptsKVs} | Acc];
+        (<<"ssl_options">>, #{} = SslOpts, Acc) ->
+            SslOptsKVs = maps:fold(fun ssl_opts_fold/3, [], SslOpts),
+            [{ssl_options, SslOptsKVs} | Acc];
+        (K, V, Acc) when is_binary(V) -> % existing atoms only: no atom leak
+            [{binary_to_existing_atom(K, utf8), binary_to_list(V)} | Acc];
+        (K, V, Acc) ->
+            [{binary_to_existing_atom(K, utf8), V} | Acc]
+    end, [], IBrowseOptions0),
+    ProxyUrl = case ProxyUrl0 of % null means no proxy configured
+        null -> undefined;
+        P when is_binary(P) -> binary_to_list(P)
+    end,
+    #httpdb{
+        url = binary_to_list(Url),
+        auth_props = maps:to_list(Auth),
+        headers = Headers,
+        ibrowse_options = IBrowseOptions,
+        timeout = Timeout,
+        http_connections = HttpConnections,
+        retries = Retries,
+        proxy_url = ProxyUrl
+    }.
+
+
 
--include_lib("eunit/include/eunit.hrl").
+% See couch_replicator_docs:ssl_params/1 for ssl parsed options
+% and http://erlang.org/doc/man/ssl.html#type-server_option
+% all latest SSL server options
+%
+ssl_opts_fold(K, V, Acc) when is_boolean(V); is_integer(V) ->
+    [{binary_to_existing_atom(K, utf8), V} | Acc];
+
+ssl_opts_fold(K, null, Acc) ->
+    [{binary_to_existing_atom(K, utf8), undefined} | Acc];
 
+ssl_opts_fold(<<"verify">>, V, Acc) -> % e.g. verify_peer / verify_none
+    [{verify, binary_to_existing_atom(V, utf8)} | Acc];
 
-normalize_http_db_test() ->
-    HttpDb =  #httpdb{
-        url = "http://host/db",
-        auth_props = [{"key", "val"}],
-        headers = [{"k2","v2"}, {"k1","v1"}],
-        timeout = 30000,
-        ibrowse_options = [{k2, v2}, {k1, v1}],
-        retries = 10,
-        http_connections = 20
-    },
-    Expected = HttpDb#httpdb{
-        headers = [{"k1","v1"}, {"k2","v2"}],
-        ibrowse_options = [{k1, v1}, {k2, v2}]
-    },
-    ?assertEqual(Expected, normalize_db(HttpDb)),
-    ?assertEqual(<<"local">>, normalize_db(<<"local">>)).
+ssl_opts_fold(K, V, Acc) when is_binary(V) -> % JSON strings decode as binaries
+    [{binary_to_existing_atom(K, utf8), binary_to_list(V)} | Acc].
 
 
--endif.
+% See ?VALID_SOCK_OPTS in couch_replicator_docs for accepted socket options
+%
+sock_opts_fold(K, V, Acc) when is_binary(V) -> % JSON strings decode as binaries
+    [{binary_to_existing_atom(K, utf8), binary_to_existing_atom(V, utf8)} | Acc];
+
+sock_opts_fold(K, V, Acc) when is_boolean(V); is_integer(V) ->
+    [{binary_to_existing_atom(K, utf8), V} | Acc].
diff --git a/src/couch_replicator/src/couch_replicator_clustering.erl b/src/couch_replicator/src/couch_replicator_clustering.erl
deleted file mode 100644
index a7f7573..0000000
--- a/src/couch_replicator/src/couch_replicator_clustering.erl
+++ /dev/null
@@ -1,248 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-
-% Maintain cluster membership and stability notifications for replications.
-% On changes to cluster membership, broadcast events to `replication` gen_event.
-% Listeners will get `{cluster, stable}` or `{cluster, unstable}` events.
-%
-% Cluster stability is defined as "there have been no nodes added or removed in
-% last `QuietPeriod` seconds". QuietPeriod value is configurable. To ensure a
-% speedier startup, during initialization there is a shorter StartupPeriod
-% in effect (also configurable).
-%
-% This module is also in charge of calculating ownership of replications based
-% on where their _replicator db documents shards live.
-
-
--module(couch_replicator_clustering).
-
--behaviour(gen_server).
--behaviour(config_listener).
--behaviour(mem3_cluster).
-
--export([
-    start_link/0
-]).
-
--export([
-    init/1,
-    terminate/2,
-    handle_call/3,
-    handle_info/2,
-    handle_cast/2,
-    code_change/3
-]).
-
--export([
-    owner/2,
-    is_stable/0,
-    link_cluster_event_listener/3
-]).
-
-% config_listener callbacks
--export([
-    handle_config_change/5,
-    handle_config_terminate/3
-]).
-
-% mem3_cluster callbacks
--export([
-    cluster_stable/1,
-    cluster_unstable/1
-]).
-
--include_lib("couch/include/couch_db.hrl").
--include_lib("mem3/include/mem3.hrl").
-
--define(DEFAULT_QUIET_PERIOD, 60). % seconds
--define(DEFAULT_START_PERIOD, 5). % seconds
--define(RELISTEN_DELAY, 5000).
-
--record(state, {
-    mem3_cluster_pid :: pid(),
-    cluster_stable :: boolean()
-}).
-
-
--spec start_link() -> {ok, pid()} | ignore | {error, term()}.
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-
-% owner/2 function computes ownership for a {DbName, DocId} tuple
-% `unstable` if cluster is considered to be unstable i.e. it has changed
-% recently, or returns node() which of the owner.
-%
--spec owner(Dbname :: binary(), DocId :: binary()) -> node() | unstable.
-owner(<<"shards/", _/binary>> = DbName, DocId) ->
-    case is_stable() of
-        false ->
-            unstable;
-        true ->
-            owner_int(DbName, DocId)
-    end;
-owner(_DbName, _DocId) ->
-    node().
-
-
--spec is_stable() -> true | false.
-is_stable() ->
-    gen_server:call(?MODULE, is_stable).
-
-
--spec link_cluster_event_listener(atom(), atom(), list()) -> pid().
-link_cluster_event_listener(Mod, Fun, Args)
-        when is_atom(Mod), is_atom(Fun), is_list(Args) ->
-    CallbackFun =
-        fun(Event = {cluster, _}) -> erlang:apply(Mod, Fun, Args ++ [Event]);
-           (_) -> ok
-        end,
-    {ok, Pid} = couch_replicator_notifier:start_link(CallbackFun),
-    Pid.
-
-
-% Mem3 cluster callbacks
-
-cluster_unstable(Server) ->
-    ok = gen_server:call(Server, set_unstable),
-    couch_replicator_notifier:notify({cluster, unstable}),
-    couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
-    couch_log:notice("~s : cluster unstable", [?MODULE]),
-    Server.
-
-cluster_stable(Server) ->
-    ok = gen_server:call(Server, set_stable),
-    couch_replicator_notifier:notify({cluster, stable}),
-    couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1),
-    couch_log:notice("~s : cluster stable", [?MODULE]),
-    Server.
-
-
-% gen_server callbacks
-
-init([]) ->
-    ok = config:listen_for_changes(?MODULE, nil),
-    Period = abs(config:get_integer("replicator", "cluster_quiet_period",
-        ?DEFAULT_QUIET_PERIOD)),
-    StartPeriod = abs(config:get_integer("replicator", "cluster_start_period",
-        ?DEFAULT_START_PERIOD)),
-    couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
-    {ok, Mem3Cluster} = mem3_cluster:start_link(?MODULE, self(), StartPeriod,
-        Period),
-    {ok, #state{mem3_cluster_pid = Mem3Cluster, cluster_stable = false}}.
-
-
-terminate(_Reason, _State) ->
-    ok.
-
-
-handle_call(is_stable, _From, #state{cluster_stable = IsStable} = State) ->
-    {reply, IsStable, State};
-
-handle_call(set_stable, _From, State) ->
-    {reply, ok, State#state{cluster_stable = true}};
-
-handle_call(set_unstable, _From, State) ->
-    {reply, ok, State#state{cluster_stable = false}}.
-
-
-handle_cast({set_period, Period}, #state{mem3_cluster_pid = Pid} = State) ->
-    ok = mem3_cluster:set_period(Pid, Period),
-    {noreply, State}.
-
-
-handle_info(restart_config_listener, State) ->
-    ok = config:listen_for_changes(?MODULE, nil),
-    {noreply, State}.
-
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-
-%% Internal functions
-
-
-handle_config_change("replicator", "cluster_quiet_period", V, _, S) ->
-    ok = gen_server:cast(?MODULE, {set_period, list_to_integer(V)}),
-    {ok, S};
-handle_config_change(_, _, _, _, S) ->
-    {ok, S}.
-
-
-handle_config_terminate(_, stop, _) -> ok;
-handle_config_terminate(_S, _R, _St) ->
-    Pid = whereis(?MODULE),
-    erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).
-
-
--spec owner_int(binary(), binary()) -> node().
-owner_int(ShardName, DocId) ->
-    DbName = mem3:dbname(ShardName),
-    Live = [node() | nodes()],
-    Shards = mem3:shards(DbName, DocId),
-    Nodes = [N || #shard{node=N} <- Shards, lists:member(N, Live)],
-    mem3:owner(DbName, DocId, Nodes).
-
-
-
--ifdef(TEST).
-
--include_lib("eunit/include/eunit.hrl").
-
-
-replicator_clustering_test_() ->
-    {
-        foreach,
-        fun setup/0,
-        fun teardown/1,
-        [
-            t_stable_callback(),
-            t_unstable_callback()
-        ]
-    }.
-
-
-t_stable_callback() ->
-    ?_test(begin
-        ?assertEqual(false, is_stable()),
-        cluster_stable(whereis(?MODULE)),
-        ?assertEqual(true, is_stable())
-    end).
-
-
-t_unstable_callback() ->
-    ?_test(begin
-        cluster_stable(whereis(?MODULE)),
-        ?assertEqual(true, is_stable()),
-        cluster_unstable(whereis(?MODULE)),
-        ?assertEqual(false, is_stable())
-    end).
-
-
-setup() ->
-    meck:expect(couch_log, notice, 2, ok),
-    meck:expect(config, get, fun(_, _, Default) -> Default end),
-    meck:expect(config, listen_for_changes, 2, ok),
-    meck:expect(couch_stats, update_gauge, 2, ok),
-    meck:expect(couch_replicator_notifier, notify, 1, ok),
-    {ok, Pid} = start_link(),
-    Pid.
-
-
-teardown(Pid) ->
-    unlink(Pid),
-    exit(Pid, kill),
-    meck:unload().
-
--endif.
diff --git a/src/couch_replicator/src/couch_replicator_db_changes.erl b/src/couch_replicator/src/couch_replicator_db_changes.erl
deleted file mode 100644
index 92b0222..0000000
--- a/src/couch_replicator/src/couch_replicator_db_changes.erl
+++ /dev/null
@@ -1,108 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_db_changes).
-
--behaviour(gen_server).
-
--export([
-   start_link/0
-]).
-
--export([
-   init/1,
-   terminate/2,
-   handle_call/3,
-   handle_info/2,
-   handle_cast/2,
-   code_change/3
-]).
-
--export([
-   notify_cluster_event/2
-]).
-
--record(state, {
-   event_listener :: pid(),
-   mdb_changes :: pid() | nil
-}).
-
-
--spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
-notify_cluster_event(Server, {cluster, _} = Event) ->
-    gen_server:cast(Server, Event).
-
-
--spec start_link() ->
-    {ok, pid()} | ignore | {error, any()}.
-start_link() ->
-    gen_server:start_link(?MODULE, [], []).
-
-
-init([]) ->
-    EvtPid = couch_replicator_clustering:link_cluster_event_listener(?MODULE,
-        notify_cluster_event, [self()]),
-    State = #state{event_listener = EvtPid, mdb_changes = nil},
-    case couch_replicator_clustering:is_stable() of
-        true ->
-            {ok, restart_mdb_changes(State)};
-        false ->
-            {ok, State}
-    end.
-
-
-terminate(_Reason, _State) ->
-    ok.
-
-
-handle_call(_Msg, _From, State) ->
-    {reply, {error, invalid_call}, State}.
-
-
-handle_cast({cluster, unstable}, State) ->
-    {noreply, stop_mdb_changes(State)};
-
-handle_cast({cluster, stable}, State) ->
-    {noreply, restart_mdb_changes(State)}.
-
-
-handle_info(_Msg, State) ->
-    {noreply, State}.
-
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-
--spec restart_mdb_changes(#state{}) -> #state{}.
-restart_mdb_changes(#state{mdb_changes = nil} = State) ->
-    Suffix = <<"_replicator">>,
-    CallbackMod = couch_replicator_doc_processor,
-    Options = [skip_ddocs],
-    {ok, Pid} = couch_multidb_changes:start_link(Suffix, CallbackMod, nil,
-        Options),
-    couch_stats:increment_counter([couch_replicator, db_scans]),
-    couch_log:notice("Started replicator db changes listener ~p", [Pid]),
-    State#state{mdb_changes = Pid};
-
-restart_mdb_changes(#state{mdb_changes = _Pid} = State) ->
-    restart_mdb_changes(stop_mdb_changes(State)).
-
-
--spec stop_mdb_changes(#state{}) -> #state{}.
-stop_mdb_changes(#state{mdb_changes = nil} = State) ->
-    State;
-stop_mdb_changes(#state{mdb_changes = Pid} = State) ->
-    couch_log:notice("Stopping replicator db changes listener ~p", [Pid]),
-    unlink(Pid),
-    exit(Pid, kill),
-    State#state{mdb_changes = nil}.
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 772037d..6f44814 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -13,7 +13,6 @@
 -module(couch_replicator_doc_processor).
 
 -behaviour(gen_server).
--behaviour(couch_multidb_changes).
 
 -export([
     start_link/0
@@ -29,10 +28,9 @@
 ]).
 
 -export([
-    db_created/2,
-    db_deleted/2,
-    db_found/2,
-    db_change/3
+    during_doc_update/3,
+    after_db_create/1,
+    after_db_delete/1
 ]).
 
 -export([
@@ -40,8 +38,7 @@
     doc/2,
     doc_lookup/3,
     update_docs/0,
-    get_worker_ref/1,
-    notify_cluster_event/2
+    get_worker_ref/1
 ]).
 
 -include_lib("couch/include/couch_db.hrl").
@@ -76,88 +73,103 @@
 }).
 
 
-% couch_multidb_changes API callbacks
+during_doc_update(#doc{} = Doc, Db, _UpdateType) ->
+    couch_stats:increment_counter([couch_replicator, docs, db_changes]),
+    ok = process_change(Db, Doc).
 
-db_created(DbName, Server) ->
+after_db_create(#{name := DbName}) ->
     couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
-    Server.
+    couch_replicator_docs:ensure_rep_ddoc_exists(DbName).
 
 
-db_deleted(DbName, Server) ->
+after_db_delete(#{name := DbName}) ->
     couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]),
-    ok = gen_server:call(?MODULE, {clean_up_replications, DbName}, infinity),
-    Server.
-
+    remove_replications_by_dbname(DbName).
 
-db_found(DbName, Server) ->
-    couch_stats:increment_counter([couch_replicator, docs, dbs_found]),
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
-    Server.
 
+process_change(_Db, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
+    ok;
 
-db_change(DbName, {ChangeProps} = Change, Server) ->
-    couch_stats:increment_counter([couch_replicator, docs, db_changes]),
-    try
-        ok = process_change(DbName, Change)
+process_change(#{name := DbName} = Db, #doc{deleted = true} = Doc) ->
+    Id = docs_job_id(DbName, Doc#doc.id),
+    ok = remove_replication_by_doc_job_id(Db, Id);
+
+process_change(#{name := DbName} = Db, #doc{} = Doc) ->
+    #doc{id = DocId, body = {Props} = Body} = Doc,
+    {Rep, RepError} = try
+        Rep0 = couch_replicator_docs:parse_rep_doc_without_id(Body),
+        Rep1 = Rep0#{
+            <<"db_name">> => DbName,
+            <<"start_time">> => erlang:system_time()
+        },
+        {Rep1, null}
     catch
-    _Tag:Error ->
-        {RepProps} = get_json_value(doc, ChangeProps),
-        DocId = get_json_value(<<"_id">>, RepProps),
-        couch_replicator_docs:update_failed(DbName, DocId, Error)
+        throw:{bad_rep_doc, Reason} ->
+            {null, couch_replicator_utils:rep_error_to_binary(Reason)}
     end,
-    Server.
-
-
--spec get_worker_ref(db_doc_id()) -> reference() | nil.
-get_worker_ref({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
-    case ets:lookup(?MODULE, {DbName, DocId}) of
-        [#rdoc{worker = WRef}] when is_reference(WRef) ->
-            WRef;
-        [#rdoc{worker = nil}] ->
-            nil;
-        [] ->
-            nil
+    % Track the doc state so it can be cleared if update_docs is turned off
+    DocState = get_json_value(<<"_replication_state">>, Props, null),
+    case couch_jobs:get_job_data(Db, ?REP_DOCS, docs_job_id(DbName, DocId)) of
+        {error, not_found} ->
+            update_replication_job(Db, DbName, DocId, Rep, RepError, DocState);
+        {ok, #{<<"rep">> := null, <<"rep_parse_error">> := RepError}}
+                when Rep =:= null ->
+            % Same error as before occurred, don't bother updating the job
+            ok;
+        {ok, #{<<"rep">> := null}} when Rep =:= null ->
+            % A different parse error occurred; update so the user sees it
+            update_replication_job(Db, DbName, DocId, Rep, RepError, DocState);
+        {ok, #{<<"rep">> := OldRep}} when OldRep =/= null, Rep =/= null ->
+            NormOldRep = couch_replicator_utils:normalize_rep(OldRep),
+            NormRep = couch_replicator_utils:normalize_rep(Rep),
+            case NormOldRep == NormRep of
+                true ->
+                    % No replication-relevant parameters changed: a no-op
+                    ok;
+                false ->
+                    update_replication_job(Db, DbName, DocId, Rep, RepError,
+                        DocState)
+            end;
+        {ok, #{}} -> % doc switched between parseable and unparseable
+            update_replication_job(Db, DbName, DocId, Rep, RepError, DocState)
     end.
 
 
-% Cluster membership change notification callback
--spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
-notify_cluster_event(Server, {cluster, _} = Event) ->
-    gen_server:cast(Server, Event).
-
-
-process_change(DbName, {Change}) ->
-    {RepProps} = JsonRepDoc = get_json_value(doc, Change),
-    DocId = get_json_value(<<"_id">>, RepProps),
-    Owner = couch_replicator_clustering:owner(DbName, DocId),
-    Id = {DbName, DocId},
-    case {Owner, get_json_value(deleted, Change, false)} of
-    {_, true} ->
-        ok = gen_server:call(?MODULE, {removed, Id}, infinity);
-    {unstable, false} ->
-        couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]);
-    {ThisNode, false} when ThisNode =:= node() ->
-        case get_json_value(<<"_replication_state">>, RepProps) of
-        undefined ->
-            ok = process_updated(Id, JsonRepDoc);
-        <<"triggered">> ->
-            maybe_remove_state_fields(DbName, DocId),
-            ok = process_updated(Id, JsonRepDoc);
-        <<"completed">> ->
-            ok = gen_server:call(?MODULE, {completed, Id}, infinity);
-        <<"error">> ->
-            % Handle replications started from older versions of replicator
-            % which wrote transient errors to replication docs
-            maybe_remove_state_fields(DbName, DocId),
-            ok = process_updated(Id, JsonRepDoc);
-        <<"failed">> ->
-            ok
-        end;
-    {Owner, false} ->
-        ok
+rep_docs_job_execute(#{} = Job, #{<<"rep">> := null} = JobData) ->
+    #{
+        <<"rep_parse_error">> := Error,
+        <<"db_name">> := DbName,
+        <<"doc_id">> := DocId
+    } = JobData,
+    JobData1 = JobData#{
+        <<"finished_state">> => <<"failed">>,
+        <<"finished_result">> => Error
+    },
+    case couch_jobs:finish(undefined, Job, JobData1) of
+        ok ->
+            couch_replicator_docs:update_failed(DbName, DocId, Error),
+            ok;
+        {error, JobError} ->
+            Msg = "Replication ~s job could not finish. JobError:~p",
+            couch_log:error(Msg, [docs_job_id(DbName, DocId), JobError]),
+            {error, JobError}
+    end;
+
+rep_docs_job_execute(#{} = _Job, #{} = JobData) ->
+    #{<<"rep">> := Rep, <<"doc_state">> := DocState} = JobData,
+    #{<<"db_name">> := DbName, <<"doc_id">> := DocId} = JobData,
+    case lists:member(DocState, [<<"triggered">>, <<"error">>]) of
+        true -> maybe_remove_state_fields(DbName, DocId);
+        false -> ok
     end,
-    ok.
+    % TODO: completed jobs should finish right away. Otherwise compute the
+    % replication id, then add or update the replication job; if the job
+    % has a filter keep checking if the filter changes.
+    % update_rep_id/1 may fetch the filter from the source db and throw
+    % {filter_fetch_error, Reason}.
+    _Rep1 = couch_replicator_docs:update_rep_id(Rep),
+    ok.
 
 
 maybe_remove_state_fields(DbName, DocId) ->
@@ -203,8 +215,6 @@ start_link() ->
 init([]) ->
     ?MODULE = ets:new(?MODULE, [named_table, {keypos, #rdoc.id},
         {read_concurrency, true}, {write_concurrency, true}]),
-    couch_replicator_clustering:link_cluster_event_listener(?MODULE,
-        notify_cluster_event, [self()]),
     {ok, nil}.
 
 
@@ -228,15 +238,6 @@ handle_call({clean_up_replications, DbName}, _From, State) ->
     ok = removed_db(DbName),
     {reply, ok, State}.
 
-handle_cast({cluster, unstable}, State) ->
-    % Ignoring unstable state transition
-    {noreply, State};
-
-handle_cast({cluster, stable}, State) ->
-    % Membership changed recheck all the replication document ownership
-    nil = ets:foldl(fun cluster_membership_foldl/2, nil, ?MODULE),
-    {noreply, State};
-
 handle_cast(Msg, State) ->
     {stop, {error, unexpected_message, Msg}, State}.
 
@@ -591,21 +592,57 @@ ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) ->
     lists:member(State, States).
 
 
--spec cluster_membership_foldl(#rdoc{}, nil) -> nil.
-cluster_membership_foldl(#rdoc{id = {DbName, DocId} = Id, rid = RepId}, nil) ->
-    case couch_replicator_clustering:owner(DbName, DocId) of
-        unstable ->
-            nil;
-        ThisNode when ThisNode =:= node() ->
-            nil;
-        OtherNode ->
-            Msg = "Replication doc ~p:~p with id ~p usurped by node ~p",
-            couch_log:notice(Msg, [DbName, DocId, RepId, OtherNode]),
-            removed_doc(Id),
-            nil
+-spec update_replication_job(any(), binary(), binary(), map() | null,
+    binary() | null, binary() | null) -> ok.
+update_replication_job(Tx, DbName, DocId, Rep, RepParseError, DocState) ->
+    JobId = docs_job_id(DbName, DocId),
+    ok = remove_replication_by_doc_job_id(Tx, JobId),
+    RepDocsJob = #{
+        <<"rep_id">> => null,
+        <<"db_name">> => DbName,
+        <<"doc_id">> => DocId,
+        <<"rep">> => Rep,
+        <<"rep_parse_error">> => RepParseError,
+        <<"doc_state">> => DocState
+    },
+    ok = couch_jobs:add(Tx, ?REP_DOCS, JobId, RepDocsJob).
+
+
+docs_job_id(DbName, DocId) when is_binary(DbName), is_binary(DocId) ->
+    <<DbName/binary, $|, DocId/binary>>.
+
+
+-spec remove_replication_by_doc_job_id(any(), binary()) -> ok.
+remove_replication_by_doc_job_id(Tx, Id) ->
+    case couch_jobs:get_job_data(Tx, ?REP_DOCS, Id) of
+        {error, not_found} ->
+            ok;
+        {ok, #{<<"rep_id">> := null}} ->
+            couch_jobs:remove(Tx, ?REP_DOCS, Id),
+            ok;
+        {ok, #{<<"rep_id">> := RepId}} ->
+            couch_jobs:remove(Tx, ?REP_JOBS, RepId),
+            couch_jobs:remove(Tx, ?REP_DOCS, Id),
+            ok
     end.
 
 
+-spec remove_replications_by_dbname(binary()) -> ok.
+remove_replications_by_dbname(DbName) when is_binary(DbName) ->
+    Prefix = <<DbName/binary, "|">>,
+    PrefixSize = byte_size(Prefix),
+    % Match job ids "DbName|DocId". Variables in fun heads shadow outer ones,
+    % so the prefix is compared in the clause body rather than the pattern.
+    Filter = fun(<<P:PrefixSize/binary, _, _/binary>>) -> P =:= Prefix;
+        (_) -> false end,
+    JobsMap = couch_jobs:get_jobs(undefined, ?REP_DOCS, Filter),
+    % Batch these into smaller transactions eventually...
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(undefined), fun(JTx) ->
+        lists:foreach(fun(Id) -> remove_replication_by_doc_job_id(JTx, Id) end,
+            maps:keys(JobsMap))
+    end).
+
+
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").
@@ -634,8 +671,7 @@ doc_processor_test_() ->
             t_failed_change(),
             t_change_for_different_node(),
             t_change_when_cluster_unstable(),
-            t_ejson_docs(),
-            t_cluster_membership_foldl()
+            t_ejson_docs()
         ]
     }.
 
@@ -787,21 +823,6 @@ t_ejson_docs() ->
     end).
 
 
-% Check that when cluster membership changes records from doc processor and job
-% scheduler get removed
-t_cluster_membership_foldl() ->
-   ?_test(begin
-        mock_existing_jobs_lookup([test_rep(?R1)]),
-        ?assertEqual(ok, process_change(?DB, change())),
-        meck:expect(couch_replicator_clustering, owner, 2, different_node),
-        ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
-        gen_server:cast(?MODULE, {cluster, stable}),
-        meck:wait(2, couch_replicator_scheduler, find_jobs_by_doc, 2, 5000),
-        ?assertNot(ets:member(?MODULE, {?DB, ?DOC1})),
-        ?assert(removed_job(?R1))
-   end).
-
-
 get_worker_ref_test_() ->
     {
         setup,
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index c07caa1..cdc6a10 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -22,12 +22,11 @@
     after_doc_read/2,
     ensure_rep_db_exists/0,
     ensure_rep_ddoc_exists/1,
-    ensure_cluster_rep_ddoc_exists/1,
     remove_state_fields/2,
     update_doc_completed/3,
     update_failed/3,
     update_rep_id/1,
-    update_triggered/2,
+    update_triggered/3,
     update_error/2
 ]).
 
@@ -57,6 +56,23 @@
 -define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
 -define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
 
+-define(DEFAULT_SOCK_OPTS, "[{keepalive, true}, {nodelay, false}]").
+-define(VALID_SOCK_OPTS, [buffer, delay_send, exit_on_close, ipv6_v6only,
+    keepalive, nodelay, recbuf, send_timeout, send_timeout_close, sndbuf,
+    priority, tos, tclass
+]).
+
+-define(CONFIG_DEFAULTS, [
+    {"worker_processes",    "4",                fun list_to_integer/1},
+    {"worker_batch_size",   "500",              fun list_to_integer/1},
+    {"http_connections",    "20",               fun list_to_integer/1},
+    {"connection_timeout",  "30000",            fun list_to_integer/1},
+    {"retries_per_request", "5",                fun list_to_integer/1},
+    {"use_checkpoints",     "true",             fun list_to_existing_atom/1},
+    {"checkpoint_interval", "30000",            fun list_to_integer/1},
+    {"socket_options",      ?DEFAULT_SOCK_OPTS, fun parse_sock_opts/1}
+]).
+
 
 remove_state_fields(DbName, DocId) ->
     update_rep_doc(DbName, DocId, [
@@ -90,28 +106,27 @@ update_failed(DbName, DocId, Error) ->
         failed_state_updates]).
 
 
--spec update_triggered(#rep{}, rep_id()) -> ok.
-update_triggered(Rep, {Base, Ext}) ->
-    #rep{
-        db_name = DbName,
-        doc_id = DocId
-    } = Rep,
+-spec update_triggered(binary(), binary(), binary()) -> ok.
+update_triggered(Id, DocId, DbName) ->
     update_rep_doc(DbName, DocId, [
         {<<"_replication_state">>, <<"triggered">>},
         {<<"_replication_state_reason">>, undefined},
-        {<<"_replication_id">>, iolist_to_binary([Base, Ext])},
+        {<<"_replication_id">>, Id},
         {<<"_replication_stats">>, undefined}]),
     ok.
 
 
--spec update_error(#rep{}, any()) -> ok.
-update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
+-spec update_error(map(), any()) -> ok.
+update_error(#{} = Rep, Error) ->
+    #{
+        <<"id">> := RepId,
+        <<"db_name">> := DbName,
+        <<"doc_id">> := DocId
+    } = Rep,
     Reason = error_reason(Error),
     BinRepId = case RepId of
-        {Base, Ext} ->
-            iolist_to_binary([Base, Ext]);
-        _Other ->
-            null
+        Id when is_binary(Id) -> Id;
+        _Other -> null
     end,
     update_rep_doc(DbName, DocId, [
         {<<"_replication_state">>, <<"error">>},
@@ -121,34 +136,22 @@ update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
     ok.
 
 
--spec ensure_rep_db_exists() -> {ok, Db::any()}.
+-spec ensure_rep_db_exists() -> ok.
 ensure_rep_db_exists() ->
-    Db = case couch_db:open_int(?REP_DB_NAME, [?CTX, sys_db,
-            nologifmissing]) of
-        {ok, Db0} ->
-            Db0;
-        _Error ->
-            {ok, Db0} = couch_db:create(?REP_DB_NAME, [?CTX, sys_db]),
-            Db0
-    end,
-    ok = ensure_rep_ddoc_exists(?REP_DB_NAME),
-    {ok, Db}.
-
-
--spec ensure_rep_ddoc_exists(binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb) ->
-    case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of
-        true ->
-            ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC);
-        false ->
+    Opts = [?CTX, sys_db, nologifmissing],
+    case fabric2_db:create(?REP_DB_NAME, Opts) of
+        {error, file_exists} ->
+            ok;
+        {ok, _Db} ->
             ok
     end.
 
 
--spec ensure_rep_ddoc_exists(binary(), binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb, DDocId) ->
+-spec ensure_rep_ddoc_exists(binary()) -> ok.
+ensure_rep_ddoc_exists(RepDb) ->
+    DDocId = ?REP_DESIGN_DOC,
     case open_rep_doc(RepDb, DDocId) of
-        {not_found, no_db_file} ->
+        {not_found, database_does_not_exist} ->
             %% database was deleted.
             ok;
         {not_found, _Reason} ->
@@ -179,13 +182,6 @@ ensure_rep_ddoc_exists(RepDb, DDocId) ->
     ok.
 
 
--spec ensure_cluster_rep_ddoc_exists(binary()) -> ok.
-ensure_cluster_rep_ddoc_exists(RepDb) ->
-    DDocId = ?REP_DESIGN_DOC,
-    [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId),
-    ensure_rep_ddoc_exists(DbShard, DDocId).
-
-
 -spec compare_ejson({[_]}, {[_]}) -> boolean().
 compare_ejson(EJson1, EJson2) ->
     EjsonSorted1 = couch_replicator_filters:ejsort(EJson1),
@@ -202,31 +198,10 @@ replication_design_doc_props(DDocId) ->
     ].
 
 
-% Note: parse_rep_doc can handle filtered replications. During parsing of the
-% replication doc it will make possibly remote http requests to the source
-% database. If failure or parsing of filter docs fails, parse_doc throws a
-% {filter_fetch_error, Error} excation. This exception should be considered
-% transient in respect to the contents of the document itself, since it depends
-% on netowrk availability of the source db and other factors.
--spec parse_rep_doc({[_]}) -> #rep{}.
-parse_rep_doc(RepDoc) ->
-    {ok, Rep} = try
-        parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
-    catch
-        throw:{error, Reason} ->
-            throw({bad_rep_doc, Reason});
-        throw:{filter_fetch_error, Reason} ->
-            throw({filter_fetch_error, Reason});
-        Tag:Err ->
-            throw({bad_rep_doc, to_binary({Tag, Err})})
-    end,
-    Rep.
-
-
--spec parse_rep_doc_without_id({[_]}) -> #rep{}.
+-spec parse_rep_doc_without_id({[_]}) -> map().
 parse_rep_doc_without_id(RepDoc) ->
     {ok, Rep} = try
-        parse_rep_doc_without_id(RepDoc, rep_user_ctx(RepDoc))
+        parse_rep_doc_without_id(RepDoc, rep_user_name(RepDoc))
     catch
         throw:{error, Reason} ->
             throw({bad_rep_doc, Reason});
@@ -236,11 +211,12 @@ parse_rep_doc_without_id(RepDoc) ->
     Rep.
 
 
--spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #rep{}}.
-parse_rep_doc(Doc, UserCtx) ->
-    {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx),
-    Cancel = get_value(cancel, Rep#rep.options, false),
-    Id = get_value(id, Rep#rep.options, nil),
+-spec parse_rep_doc({[_]}, user_name()) -> {ok, map()}.
+parse_rep_doc({Props} = Doc, UserName) when is_list(Props) ->
+    {ok, Rep} = parse_rep_doc_without_id(Doc, UserName),
+    #{<<"options">> := Options} = Rep,
+    Cancel = maps:get(<<"cancel">>, Options, false),
+    Id = maps:get(<<"id">>, Options, nil),
     case {Cancel, Id} of
         {true, nil} ->
             % Cancel request with no id, must parse id out of body contents
@@ -254,38 +230,43 @@ parse_rep_doc(Doc, UserCtx) ->
     end.
 
 
--spec parse_rep_doc_without_id({[_]}, #user_ctx{}) -> {ok, #rep{}}.
-parse_rep_doc_without_id({Props}, UserCtx) ->
-    Proxy = get_value(<<"proxy">>, Props, <<>>),
-    Opts = make_options(Props),
-    case get_value(cancel, Opts, false) andalso
-        (get_value(id, Opts, nil) =/= nil) of
+-spec parse_rep_doc_without_id({[_]} | map(), user_name()) -> {ok, map()}.
+parse_rep_doc_without_id({Props} = EJson, UserName) when is_list(Props) ->
+    % Normalize all field names to binaries and decode into a map; plain
+    % ?JSON_DECODE returns EJSON, which would loop back into this clause
+    Map = jiffy:decode(?JSON_ENCODE(EJson), [return_maps]),
+    parse_rep_doc_without_id(Map, UserName);
+
+parse_rep_doc_without_id(#{} = Doc, UserName) ->
+    Proxy = parse_proxy_params(maps:get(<<"proxy">>, Doc, <<>>)),
+    Opts = make_options(Doc),
+    Cancel = maps:get(<<"cancel">>, Opts, false),
+    Id = maps:get(<<"id">>, Opts, nil),
+    case Cancel andalso Id =/= nil of
     true ->
-        {ok, #rep{options = Opts, user_ctx = UserCtx}};
+        {ok, #{<<"options">> => Opts, <<"user">> => UserName}};
     false ->
-        Source = parse_rep_db(get_value(<<"source">>, Props), Proxy, Opts),
-        Target = parse_rep_db(get_value(<<"target">>, Props), Proxy, Opts),
-        {Type, View} = case couch_replicator_filters:view_type(Props, Opts) of
-        {error, Error} ->
-            throw({bad_request, Error});
-        Result ->
-            Result
+        #{<<"source">> := Source0, <<"target">> := Target0} = Doc,
+        Source = parse_rep_db(Source0, Proxy, Opts),
+        Target = parse_rep_db(Target0, Proxy, Opts),
+        {Type, View} = case couch_replicator_filters:view_type(Doc, Opts) of
+            {error, Error} -> throw({bad_request, Error});
+            Result -> Result
         end,
-        Rep = #rep{
-            source = Source,
-            target = Target,
-            options = Opts,
-            user_ctx = UserCtx,
-            type = Type,
-            view = View,
-            doc_id = get_value(<<"_id">>, Props, null)
+        Rep = #{
+            <<"id">> => null,
+            <<"base_id">> => null,
+            <<"source">> => Source,
+            <<"target">> => Target,
+            <<"options">> => Opts,
+            <<"user">> => UserName,
+            <<"type">> => Type,
+            <<"view">> => View,
+            <<"doc_id">> => maps:get(<<"_id">>, Doc, null)
         },
         % Check if can parse filter code, if not throw exception
         case couch_replicator_filters:parse(Opts) of
-        {error, FilterError} ->
-            throw({error, FilterError});
-        {ok, _Filter} ->
-             ok
+            {error, FilterError} -> throw({error, FilterError});
+            {ok, _Filter} -> ok
         end,
         {ok, Rep}
     end.
@@ -295,9 +276,10 @@ parse_rep_doc_without_id({Props}, UserCtx) ->
 % fetching a filter from the source db, and so it could fail intermetently.
 % In case of a failure to fetch the filter this function will throw a
 %  `{filter_fetch_error, Reason} exception.
-update_rep_id(Rep) ->
-    RepId = couch_replicator_ids:replication_id(Rep),
-    Rep#rep{id = RepId}.
+update_rep_id(#{} = Rep) ->
+    {BaseId, ExtId} = couch_replicator_ids:replication_id(Rep),
+    RepId = erlang:iolist_to_binary([BaseId, ExtId]),
+    Rep#{<<"id">> => RepId, <<"base_id">> => BaseId}.
 
 
 update_rep_doc(RepDbName, RepDocId, KVs) ->
@@ -350,22 +332,21 @@ update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
 
 
 open_rep_doc(DbName, DocId) ->
-    case couch_db:open_int(DbName, [?CTX, sys_db]) of
-        {ok, Db} ->
-            try
-                couch_db:open_doc(Db, DocId, [ejson_body])
-            after
-                couch_db:close(Db)
-            end;
-        Else ->
-            Else
+    try  % open DbName and read DocId with an EJSON body
+        case fabric2_db:open(DbName, [?CTX, sys_db]) of
+            {ok, Db} -> fabric2_db:open_doc(Db, DocId, [ejson_body]);
+            Else -> Else  % any other open result is passed through unchanged
+        end
+    catch
+        error:database_does_not_exist ->  % a missing db is raised as an error; report it as not_found
+            {not_found, database_does_not_exist}
     end.
 
 
 save_rep_doc(DbName, Doc) ->
-    {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
+    {ok, Db} = fabric2_db:open(DbName, [?CTX, sys_db]),
     try
-        couch_db:update_doc(Db, Doc, [])
+        fabric2_db:update_doc(Db, Doc, [])
     catch
         % User can accidently write a VDU which prevents _replicator from
         % updating replication documents. Avoid crashing replicator and thus
@@ -374,54 +355,56 @@ save_rep_doc(DbName, Doc) ->
             Msg = "~p VDU function preventing doc update to ~s ~s ~p",
             couch_log:error(Msg, [?MODULE, DbName, Doc#doc.id, Reason]),
             {ok, forbidden}
-    after
-        couch_db:close(Db)
     end.
 
 
--spec rep_user_ctx({[_]}) -> #user_ctx{}.
-rep_user_ctx({RepDoc}) ->
+-spec rep_user_name({[_]}) -> binary() | null.  % "name" from the doc's "user_ctx" object, else null
+rep_user_name({RepDoc}) ->
     case get_json_value(<<"user_ctx">>, RepDoc) of
-    undefined ->
-        #user_ctx{};
-    {UserCtx} ->
-        #user_ctx{
-            name = get_json_value(<<"name">>, UserCtx, null),
-            roles = get_json_value(<<"roles">>, UserCtx, [])
-        }
+        undefined -> null;  % no user_ctx value in the doc
+        {UserCtx} -> get_json_value(<<"name">>, UserCtx, null)  % roles are not extracted
     end.
 
 
--spec parse_rep_db({[_]} | binary(), binary(), [_]) -> #httpd{} | binary().
-parse_rep_db({Props}, Proxy, Options) ->
-    ProxyParams = parse_proxy_params(Proxy),
+-spec parse_rep_db(map() | binary() | undefined, map(), map()) -> map().
+parse_rep_db(#{} = Endpoint, #{} = ProxyParams, #{} = Options) ->
     ProxyURL = case ProxyParams of
-        [] -> undefined;
-        _ -> binary_to_list(Proxy)
+        #{<<"proxy_url">> := PUrl} -> PUrl;
+        _ -> null
     end,
-    Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
-    {AuthProps} = get_value(<<"auth">>, Props, {[]}),
-    {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
-    Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
-    DefaultHeaders = (#httpdb{})#httpdb.headers,
-    #httpdb{
-        url = Url,
-        auth_props = AuthProps,
-        headers = lists:ukeymerge(1, Headers, DefaultHeaders),
-        ibrowse_options = lists:keysort(1,
-            [{socket_options, get_value(socket_options, Options)} |
-                ProxyParams ++ ssl_params(Url)]),
-        timeout = get_value(connection_timeout, Options),
-        http_connections = get_value(http_connections, Options),
-        retries = get_value(retries, Options),
-        proxy_url = ProxyURL
-    };
+
+    Url0 = maps:get(<<"url">>, Endpoint),
+    Url = maybe_add_trailing_slash(Url0),
+
+    AuthProps = maps:get(<<"auth">>, Endpoint, #{}),
+
+    Headers0 = maps:get(<<"headers">>, Endpoint, #{}),
+    DefaultHeaders = couch_replicator_utils:get_default_headers(),
+    % For same keys values in second map override those in the first
+    Headers = maps:merge(DefaultHeaders, Headers0),
+
+    SockOpts = maps:get(<<"socket_options">>, Options, #{}),
+    SockAndProxy = maps:merge(SockOpts, ProxyParams),
+
+    SslParams = ssl_params(Url),
+
+    #{
+        <<"url">> => Url,
+        <<"auth_props">> => AuthProps,
+        <<"headers">> => Headers,
+        <<"ibrowse_options">> => maps:merge(SslParams, SockAndProxy),
+        <<"timeout">> => maps:get(<<"connection_timeout">>, Options),  % key written by convert_options/3
+        <<"http_connections">> => maps:get(<<"http_connections">>, Options),
+        <<"retries">> => maps:get(<<"retries">>, Options),
+        <<"proxy_url">> => ProxyURL
+    };  % ';' not '.': the URL/local/undefined clauses below belong to this function
+
 
 parse_rep_db(<<"http://", _/binary>> = Url, Proxy, Options) ->
-    parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
+    parse_rep_db(#{<<"url">> => Url}, Proxy, Options);
 
 parse_rep_db(<<"https://", _/binary>> = Url, Proxy, Options) ->
-    parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
+    parse_rep_db(#{<<"url">> => Url}, Proxy, Options);
 
 parse_rep_db(<<_/binary>>, _Proxy, _Options) ->
     throw({error, <<"Local endpoints not supported since CouchDB 3.x">>});
@@ -430,118 +413,99 @@ parse_rep_db(undefined, _Proxy, _Options) ->
     throw({error, <<"Missing replicator database">>}).
 
 
--spec maybe_add_trailing_slash(binary() | list()) -> list().
+-spec maybe_add_trailing_slash(binary()) -> binary().
+maybe_add_trailing_slash(<<>>) ->  % binary:last/1 raises badarg on an empty binary
+    <<>>;
+
 maybe_add_trailing_slash(Url) when is_binary(Url) ->
-    maybe_add_trailing_slash(?b2l(Url));
-maybe_add_trailing_slash(Url) ->
-    case lists:member($?, Url) of
-        true ->
-            Url;  % skip if there are query params
-        false ->
-            case lists:last(Url) of
-                $/ ->
-                    Url;
-                _ ->
-                    Url ++ "/"
-            end
+    case binary:match(Url, <<"?">>) of
+        nomatch ->
+            case binary:last(Url) of
+                $/ -> Url;
+                _ -> <<Url/binary, "/">>
+            end;
+        _ -> Url  % skip if there are query params
     end.
 
 
--spec make_options([_]) -> [_].
-make_options(Props) ->
-    Options0 = lists:ukeysort(1, convert_options(Props)),
+-spec make_options(map()) -> map().
+make_options(#{} = RepDoc) ->
+    Options0 = maps:fold(fun convert_options/3, #{}, RepDoc),  % validate/convert each doc field
     Options = check_options(Options0),
-    DefWorkers = config:get("replicator", "worker_processes", "4"),
-    DefBatchSize = config:get("replicator", "worker_batch_size", "500"),
-    DefConns = config:get("replicator", "http_connections", "20"),
-    DefTimeout = config:get("replicator", "connection_timeout", "30000"),
-    DefRetries = config:get("replicator", "retries_per_request", "5"),
-    UseCheckpoints = config:get("replicator", "use_checkpoints", "true"),
-    DefCheckpointInterval = config:get("replicator", "checkpoint_interval",
-        "30000"),
-    {ok, DefSocketOptions} = couch_util:parse_term(
-        config:get("replicator", "socket_options",
-            "[{keepalive, true}, {nodelay, false}]")),
-    lists:ukeymerge(1, Options, lists:keysort(1, [
-        {connection_timeout, list_to_integer(DefTimeout)},
-        {retries, list_to_integer(DefRetries)},
-        {http_connections, list_to_integer(DefConns)},
-        {socket_options, DefSocketOptions},
-        {worker_batch_size, list_to_integer(DefBatchSize)},
-        {worker_processes, list_to_integer(DefWorkers)},
-        {use_checkpoints, list_to_existing_atom(UseCheckpoints)},
-        {checkpoint_interval, list_to_integer(DefCheckpointInterval)}
-    ])).
-
-
--spec convert_options([_]) -> [_].
-convert_options([])->
-    [];
-convert_options([{<<"cancel">>, V} | _R]) when not is_boolean(V)->
+    ConfigOptions = lists:foldl(fun({K, Default, ConversionFun}, Acc) ->
+        V = ConversionFun(config:get("replicator", K, Default)),
+        Acc#{list_to_binary(K) => V}
+    end, #{}, ?CONFIG_DEFAULTS),
+    maps:merge(ConfigOptions, Options).  % doc options override config defaults
+
+
+-spec convert_options(binary(), any(), map()) -> map().  % maps:fold/3 callback: one doc field into Acc
+convert_options(<<"cancel">>, V, _Acc) when not is_boolean(V)->
     throw({bad_request, <<"parameter `cancel` must be a boolean">>});
-convert_options([{<<"cancel">>, V} | R]) ->
-    [{cancel, V} | convert_options(R)];
-convert_options([{IdOpt, V} | R]) when IdOpt =:= <<"_local_id">>;
+convert_options(<<"cancel">>, V, Acc) ->
+    Acc#{<<"cancel">> => V};
+convert_options(IdOpt, V, Acc) when IdOpt =:= <<"_local_id">>;
         IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> ->
-    [{id, couch_replicator_ids:convert(V)} | convert_options(R)];
-convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V)->
+    Acc#{<<"id">> => couch_replicator_ids:convert(V)};
+convert_options(<<"create_target">>, V, _Acc) when not is_boolean(V)->
     throw({bad_request, <<"parameter `create_target` must be a boolean">>});
-convert_options([{<<"create_target">>, V} | R]) ->
-    [{create_target, V} | convert_options(R)];
-convert_options([{<<"create_target_params">>, V} | _R]) when not is_tuple(V) ->
+convert_options(<<"create_target">>, V, Acc) ->
+    Acc#{<<"create_target">> => V};
+convert_options(<<"create_target_params">>, V, _Acc) when not is_tuple(V), not is_map(V) ->
     throw({bad_request,
         <<"parameter `create_target_params` must be a JSON object">>});
-convert_options([{<<"create_target_params">>, V} | R]) ->
-    [{create_target_params, V} | convert_options(R)];
-convert_options([{<<"continuous">>, V} | _R]) when not is_boolean(V)->
+convert_options(<<"create_target_params">>, V, Acc) ->
+    Acc#{<<"create_target_params">> => V};
+convert_options(<<"continuous">>, V, _Acc) when not is_boolean(V)->
     throw({bad_request, <<"parameter `continuous` must be a boolean">>});
-convert_options([{<<"continuous">>, V} | R]) ->
-    [{continuous, V} | convert_options(R)];
-convert_options([{<<"filter">>, V} | R]) ->
-    [{filter, V} | convert_options(R)];
-convert_options([{<<"query_params">>, V} | R]) ->
-    [{query_params, V} | convert_options(R)];
-convert_options([{<<"doc_ids">>, null} | R]) ->
-    convert_options(R);
-convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) ->
+convert_options(<<"continuous">>, V, Acc) ->
+    Acc#{<<"continuous">> => V};
+convert_options(<<"filter">>, V, Acc) ->
+    Acc#{<<"filter">> => V};
+convert_options(<<"query_params">>, V, Acc) ->
+    Acc#{<<"query_params">> => V};
+convert_options(<<"doc_ids">>, null, Acc) ->
+    Acc;
+convert_options(<<"doc_ids">>, V, _Acc) when not is_list(V) ->
     throw({bad_request, <<"parameter `doc_ids` must be an array">>});
-convert_options([{<<"doc_ids">>, V} | R]) ->
+convert_options(<<"doc_ids">>, V, Acc) ->
     % Ensure same behaviour as old replicator: accept a list of percent
     % encoded doc IDs.
     DocIds = lists:usort([?l2b(couch_httpd:unquote(Id)) || Id <- V]),
-    [{doc_ids, DocIds} | convert_options(R)];
-convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) ->
+    Acc#{<<"doc_ids">> => DocIds};
+convert_options(<<"selector">>, V, _Acc) when not is_tuple(V), not is_map(V) ->
     throw({bad_request, <<"parameter `selector` must be a JSON object">>});
-convert_options([{<<"selector">>, V} | R]) ->
-    [{selector, V} | convert_options(R)];
-convert_options([{<<"worker_processes">>, V} | R]) ->
-    [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"worker_batch_size">>, V} | R]) ->
-    [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"http_connections">>, V} | R]) ->
-    [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"connection_timeout">>, V} | R]) ->
-    [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"retries_per_request">>, V} | R]) ->
-    [{retries, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"socket_options">>, V} | R]) ->
-    {ok, SocketOptions} = couch_util:parse_term(V),
-    [{socket_options, SocketOptions} | convert_options(R)];
-convert_options([{<<"since_seq">>, V} | R]) ->
-    [{since_seq, V} | convert_options(R)];
-convert_options([{<<"use_checkpoints">>, V} | R]) ->
-    [{use_checkpoints, V} | convert_options(R)];
-convert_options([{<<"checkpoint_interval">>, V} | R]) ->
-    [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([_ | R]) -> % skip unknown option
-    convert_options(R).
-
-
--spec check_options([_]) -> [_].
+convert_options(<<"selector">>, V, Acc) ->
+    Acc#{<<"selector">> => V};
+convert_options(<<"worker_processes">>, V, Acc) ->
+    Acc#{<<"worker_processes">> => couch_util:to_integer(V)};
+convert_options(<<"worker_batch_size">>, V, Acc) ->
+    Acc#{<<"worker_batch_size">> => couch_util:to_integer(V)};
+convert_options(<<"http_connections">>, V, Acc) ->
+    Acc#{<<"http_connections">> => couch_util:to_integer(V)};
+convert_options(<<"connection_timeout">>, V, Acc) ->
+    Acc#{<<"connection_timeout">> => couch_util:to_integer(V)};
+convert_options(<<"retries_per_request">>, V, Acc) ->
+    Acc#{<<"retries">> => couch_util:to_integer(V)};
+convert_options(<<"socket_options">>, V, Acc) ->
+    Acc#{<<"socket_options">> => parse_sock_opts(V)};
+convert_options(<<"since_seq">>, V, Acc) ->
+    Acc#{<<"since_seq">> => V};
+convert_options(<<"use_checkpoints">>, V, _Acc) when not is_boolean(V)->
+    throw({bad_request, <<"parameter `use_checkpoints` must be a boolean">>});
+convert_options(<<"use_checkpoints">>, V, Acc) ->
+    Acc#{<<"use_checkpoints">> => V};
+convert_options(<<"checkpoint_interval">>, V, Acc) ->
+    Acc#{<<"checkpoint_interval">> => couch_util:to_integer(V)};
+convert_options(_K, _V, Acc) -> % skip unknown option
+    Acc.
+
+
+-spec check_options(#{}) -> #{}.
 check_options(Options) ->
-    DocIds = lists:keyfind(doc_ids, 1, Options),
-    Filter = lists:keyfind(filter, 1, Options),
-    Selector = lists:keyfind(selector, 1, Options),
+    DocIds = maps:is_key(<<"doc_ids">>, Options),
+    Filter = maps:is_key(<<"filter">>, Options),
+    Selector = maps:is_key(<<"selector">>, Options),
     case {DocIds, Filter, Selector} of
         {false, false, false} -> Options;
         {false, false, _} -> Options;
@@ -553,66 +517,113 @@ check_options(Options) ->
     end.
 
 
--spec parse_proxy_params(binary() | [_]) -> [_].
-parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
-    parse_proxy_params(?b2l(ProxyUrl));
-parse_proxy_params([]) ->
-    [];
-parse_proxy_params(ProxyUrl) ->
+parse_sock_opts(V) ->
+    {ok, SocketOptions} = couch_util:parse_term(V),
+    lists:foldl(fun
+        ({K, Val}, Acc) when is_atom(K) ->
+            case lists:member(K, ?VALID_SOCKET_OPTIONS) of  % drop options not on the allow-list
+                true -> Acc#{atom_to_binary(K, utf8) => Val};
+                false -> Acc
+            end;
+        (_, Acc) ->
+            Acc
+    end, #{}, SocketOptions).
+
+
+-spec parse_proxy_params(binary()) -> map().
+parse_proxy_params(<<>>) ->
+    #{};
+parse_proxy_params(ProxyUrl0) when is_binary(ProxyUrl0) ->
+    ProxyUrl = binary_to_list(ProxyUrl0),
     #url{
         host = Host,
         port = Port,
         username = User,
         password = Passwd,
-        protocol = Protocol
+        protocol = Protocol0
     } = ibrowse_lib:parse_url(ProxyUrl),
-    [
-        {proxy_protocol, Protocol},
-        {proxy_host, Host},
-        {proxy_port, Port}
-    ] ++ case is_list(User) andalso is_list(Passwd) of
+    Protocol = atom_to_binary(Protocol0, utf8),
+    case lists:member(Protocol, [<<"http">>, <<"https">>, <<"socks5">>]) of
+        true ->
+            ok;
         false ->
-            [];
+            Error = <<"Unsupported proxy protocol: ", Protocol/binary>>,
+            throw({bad_request, Error})
+    end,
+    ProxyParams = #{
+        <<"proxy_url">> => ProxyUrl0,  % binary, like every other value in this map
+        <<"proxy_protocol">> => Protocol,
+        <<"proxy_host">> => list_to_binary(Host),
+        <<"proxy_port">> => Port
+    },
+    case is_list(User) andalso is_list(Passwd) of
         true ->
-            [{proxy_user, User}, {proxy_password, Passwd}]
-        end.
+            ProxyParams#{
+                <<"proxy_user">> => list_to_binary(User),
+                <<"proxy_password">> => list_to_binary(Passwd)
+            };
+        false ->
+            ProxyParams
+    end.
 
 
--spec ssl_params([_]) -> [_].
+-spec ssl_params(binary()) -> map().
 ssl_params(Url) ->
-    case ibrowse_lib:parse_url(Url) of
+    case ibrowse_lib:parse_url(binary_to_list(Url)) of
     #url{protocol = https} ->
         Depth = list_to_integer(
             config:get("replicator", "ssl_certificate_max_depth", "3")
         ),
         VerifyCerts = config:get("replicator", "verify_ssl_certificates"),
-        CertFile = config:get("replicator", "cert_file", undefined),
-        KeyFile = config:get("replicator", "key_file", undefined),
-        Password = config:get("replicator", "password", undefined),
-        SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")],
-        SslOpts1 = case CertFile /= undefined andalso KeyFile /= undefined of
+        CertFile = config:get("replicator", "cert_file", null),
+        KeyFile = config:get("replicator", "key_file", null),
+        Password = config:get("replicator", "password", null),
+        VerifySslOptions = ssl_verify_options(VerifyCerts =:= "true"),
+        SslOpts = maps:merge(VerifySslOptions, #{<<"depth">> => Depth}),
+        SslOpts1 = case CertFile /= null andalso KeyFile /= null of
             true ->
-                case Password of
-                    undefined ->
-                        [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts;
+                CertFileOpts = case Password of
+                    null ->
+                        #{
+                            <<"certfile">> => list_to_binary(CertFile),
+                            <<"keyfile">> => list_to_binary(KeyFile)
+                        };
                     _ ->
-                        [{certfile, CertFile}, {keyfile, KeyFile},
-                            {password, Password}] ++ SslOpts
-                end;
-            false -> SslOpts
+                        #{
+                            <<"certfile">> => list_to_binary(CertFile),
+                            <<"keyfile">> => list_to_binary(KeyFile),
+                            <<"password">> => list_to_binary(Password)
+                        }
+                end,
+                maps:merge(SslOpts, CertFileOpts);  % cert/key settings added on top of verify/depth
+            false ->
+                SslOpts
         end,
-        [{is_ssl, true}, {ssl_options, SslOpts1}];
+        #{<<"is_ssl">> => true, <<"ssl_options">> => SslOpts1};
     #url{protocol = http} ->
-        []
+        #{}
     end.
 
 
 -spec ssl_verify_options(true | false) -> [_].
 ssl_verify_options(true) ->
-    CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
-    [{verify, verify_peer}, {cacertfile, CAFile}];
+    case config:get("replicator", "ssl_trusted_certificates_file", undefined) of
+        undefined ->
+            #{
+                <<"verify">> => <<"verify_peer">>,
+                <<"cacertfile">> => null
+            };
+        CAFile when is_list(CAFile) ->
+            #{
+                <<"verify">> => <<"verify_peer">>,
+                <<"cacertfile">> => list_to_binary(CAFile)
+            }
+    end;
+
 ssl_verify_options(false) ->
-    [{verify, verify_none}].
+    #{
+        <<"verify">> => <<"verify_none">>
+    }.
 
 
 -spec before_doc_update(#doc{}, Db::any(), couch_db:update_type()) -> #doc{}.
@@ -622,7 +633,7 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
     #user_ctx{
        roles = Roles,
        name = Name
-    } = couch_db:get_user_ctx(Db),
+    } = fabric2_db:get_user_ctx(Db),
     case lists:member(<<"_replicator">>, Roles) of
     true ->
         Doc;
@@ -633,7 +644,7 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
         Name ->
             Doc;
         Other ->
-            case (catch couch_db:check_is_admin(Db)) of
+            case (catch fabric2_db:check_is_admin(Db)) of
             ok when Other =:= null ->
                 Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
             ok ->
@@ -650,8 +661,8 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
 after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
     Doc;
 after_doc_read(#doc{body = {Body}} = Doc, Db) ->
-    #user_ctx{name = Name} = couch_db:get_user_ctx(Db),
-    case (catch couch_db:check_is_admin(Db)) of
+    #user_ctx{name = Name} = fabric2_db:get_user_ctx(Db),
+    case (catch fabric2_db:check_is_admin(Db)) of
     ok ->
         Doc;
     _ ->
@@ -659,16 +670,15 @@ after_doc_read(#doc{body = {Body}} = Doc, Db) ->
         Name ->
             Doc;
         _Other ->
-            Source = strip_credentials(couch_util:get_value(<<"source">>,
-Body)),
-            Target = strip_credentials(couch_util:get_value(<<"target">>,
-Body)),
+            Source0 = couch_util:get_value(<<"source">>, Body),
+            Target0 = couch_util:get_value(<<"target">>, Body),
+            Source = strip_credentials(Source0),
+            Target = strip_credentials(Target0),
             NewBody0 = ?replace(Body, <<"source">>, Source),
             NewBody = ?replace(NewBody0, <<"target">>, Target),
             #doc{revs = {Pos, [_ | Revs]}} = Doc,
             NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}},
-            NewRevId = couch_db:new_revid(NewDoc),
-            NewDoc#doc{revs = {Pos, [NewRevId | Revs]}}
+            fabric2_db:new_revid(NewDoc)
         end
     end.
 
@@ -779,27 +789,24 @@ check_strip_credentials_test() ->
 
 setup() ->
     DbName = ?tempdb(),
-    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
-    ok = couch_db:close(Db),
-    create_vdu(DbName),
+    {ok, Db} = fabric2_db:create(DbName, [?ADMIN_CTX]),  % create the test db with ?ADMIN_CTX
+    create_vdu(Db),  % add a design doc whose VDU throws forbidden on every write
     DbName.
 
 
 teardown(DbName) when is_binary(DbName) ->
-    couch_server:delete(DbName, [?ADMIN_CTX]),
+    fabric2_db:delete(DbName, [?ADMIN_CTX]),  % result is not checked; teardown always returns ok
     ok.
 
 
-create_vdu(DbName) ->
-    couch_util:with_db(DbName, fun(Db) ->
-        VduFun = <<"function(newdoc, olddoc, userctx) {throw({'forbidden':'fail'})}">>,
-        Doc = #doc{
-            id = <<"_design/vdu">>,
-            body = {[{<<"validate_doc_update">>, VduFun}]}
-        },
-        {ok, _} = couch_db:update_docs(Db, [Doc]),
-        couch_db:ensure_full_commit(Db)
-    end).
+create_vdu(Db) ->
+    VduFun = <<"function(newdoc, olddoc, userctx) {throw({'forbidden':'fail'})}">>,
+    Doc = #doc{
+        id = <<"_design/vdu">>,
+        body = {[{<<"validate_doc_update">>, VduFun}]}
+    },
+    {ok, _} = fabric2_db:update_doc(Db, Doc, []),  % single #doc{}, as in save_rep_doc/2
+    ok.
 
 
 update_replicator_doc_with_bad_vdu_test_() ->
diff --git a/src/couch_replicator/src/couch_replicator_filters.erl b/src/couch_replicator/src/couch_replicator_filters.erl
index c898000..b14ea34 100644
--- a/src/couch_replicator/src/couch_replicator_filters.erl
+++ b/src/couch_replicator/src/couch_replicator_filters.erl
@@ -88,22 +88,24 @@ fetch(DDocName, FilterName, Source) ->
 
 
 % Get replication type and view (if any) from replication document props
--spec view_type([_], [_]) ->
-    {view, {binary(), binary()}} | {db, nil} | {error, binary()}.
-view_type(Props, Options) ->
-    case couch_util:get_value(<<"filter">>, Props) of
-        <<"_view">> ->
-            {QP}  = couch_util:get_value(query_params, Options, {[]}),
-            ViewParam = couch_util:get_value(<<"view">>, QP),
-            case re:split(ViewParam, <<"/">>) of
-                [DName, ViewName] ->
-                    {view, {<< "_design/", DName/binary >>, ViewName}};
-                _ ->
-                    {error, <<"Invalid `view` parameter.">>}
-            end;
+-spec view_type(map(), [_]) ->
+    {binary(), map()} | {error, binary()}.
+view_type(#{<<"filter">> := <<"_view">>}, Options) ->
+    {QP} = couch_util:get_value(query_params, Options, {[]}),
+    ViewParam = couch_util:get_value(<<"view">>, QP),
+    case re:split(ViewParam, <<"/">>) of
+        [DName, ViewName] ->
+            DDocMap = #{
+                <<"ddoc">> => <<"_design/", DName/binary>>,
+                <<"view">> => ViewName
+            },
+            {<<"view">>, DDocMap};
         _ ->
-            {db, nil}
-    end.
+            {error, <<"Invalid `view` parameter.">>}
+    end;
+
+view_type(#{}, _Options) ->  % no "_view" filter: any options list (not just one element)
+    {<<"db">>, #{}}.
 
 
 % Private functions
diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl
index 04e71c3..a3f6220 100644
--- a/src/couch_replicator/src/couch_replicator_ids.erl
+++ b/src/couch_replicator/src/couch_replicator_ids.erl
@@ -30,28 +30,29 @@
 %  {filter_fetch_error, Error} exception.
 %
 
-replication_id(#rep{options = Options} = Rep) ->
+replication_id(#{<<"options">> := Options} = Rep) ->
     BaseId = replication_id(Rep, ?REP_ID_VERSION),
-    {BaseId, maybe_append_options([continuous, create_target], Options)}.
+    UseOpts = [<<"continuous">>, <<"create_target">>],  % options folded into the id extension
+    {BaseId, maybe_append_options(UseOpts, Options)}.
 
 
 % Versioned clauses for generating replication IDs.
 % If a change is made to how replications are identified,
 % please add a new clause and increase ?REP_ID_VERSION.
 
-replication_id(#rep{} = Rep, 4) ->
+replication_id(#{<<"source">> := Src, <<"target">> := Tgt} = Rep, 4) ->
     UUID = couch_server:get_uuid(),
-    SrcInfo = get_v4_endpoint(Rep#rep.source),
-    TgtInfo = get_v4_endpoint(Rep#rep.target),
+    SrcInfo = get_v4_endpoint(Src),
+    TgtInfo = get_v4_endpoint(Tgt),
     maybe_append_filters([UUID, SrcInfo, TgtInfo], Rep);
 
-replication_id(#rep{} = Rep, 3) ->
+replication_id(#{<<"source">> := Src0, <<"target">> := Tgt0} = Rep, 3) ->
     UUID = couch_server:get_uuid(),
-    Src = get_rep_endpoint(Rep#rep.source),
-    Tgt = get_rep_endpoint(Rep#rep.target),
+    Src = get_rep_endpoint(Src0),
+    Tgt = get_rep_endpoint(Tgt0),
     maybe_append_filters([UUID, Src, Tgt], Rep);
 
-replication_id(#rep{} = Rep, 2) ->
+replication_id(#{<<"source">> := Src0, <<"target">> := Tgt0} = Rep, 2) ->
     {ok, HostName} = inet:gethostname(),
     Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of
     P when is_number(P) ->
@@ -64,14 +65,14 @@ replication_id(#rep{} = Rep, 2) ->
         % ... mochiweb_socket_server:get(https, port)
         list_to_integer(config:get("httpd", "port", "5984"))
     end,
-    Src = get_rep_endpoint(Rep#rep.source),
-    Tgt = get_rep_endpoint(Rep#rep.target),
+    Src = get_rep_endpoint(Src0),
+    Tgt = get_rep_endpoint(Tgt0),
     maybe_append_filters([HostName, Port, Src, Tgt], Rep);
 
-replication_id(#rep{} = Rep, 1) ->
+replication_id(#{<<"source">> := Src0, <<"target">> := Tgt0} = Rep, 1) ->
     {ok, HostName} = inet:gethostname(),
-    Src = get_rep_endpoint(Rep#rep.source),
-    Tgt = get_rep_endpoint(Rep#rep.target),
+    Src = get_rep_endpoint(Src0),
+    Tgt = get_rep_endpoint(Tgt0),
     maybe_append_filters([HostName, Src, Tgt], Rep).
 
 
@@ -83,15 +84,23 @@ convert(Id0) when is_binary(Id0) ->
     % the URL path. So undo the incorrect parsing here to avoid forcing
     % users to url encode + characters.
     Id = binary:replace(Id0, <<" ">>, <<"+">>, [global]),
-    lists:splitwith(fun(Char) -> Char =/= $+ end, ?b2l(Id));
-convert({BaseId, Ext} = Id) when is_list(BaseId), is_list(Ext) ->
+    case binary:split(Id, <<"+">>) of
+        [BaseId, Ext] -> {BaseId, <<"+", Ext/binary>>};  % keep leading "+" like splitwith did
+        [BaseId] -> {BaseId, <<>>}
+    end;
+convert({BaseId, Ext}) when is_list(BaseId), is_list(Ext) ->
+    {list_to_binary(BaseId), list_to_binary(Ext)};
+convert({BaseId, Ext} = Id) when is_binary(BaseId), is_binary(Ext) ->
     Id.
 
 
 % Private functions
 
-maybe_append_filters(Base,
-        #rep{source = Source, options = Options}) ->
+maybe_append_filters(Base, #{} = Rep) ->
+    #{
+        <<"source">> := Source,
+        <<"options">> := Options
+    } = Rep,
     Base2 = Base ++
         case couch_replicator_filters:parse(Options) of
         {ok, nil} ->
@@ -112,7 +121,8 @@ maybe_append_filters(Base,
         {error, FilterParseError} ->
             throw({error, FilterParseError})
         end,
-    couch_util:to_hex(couch_hash:md5_hash(term_to_binary(Base2))).
+    Res = couch_util:to_hex(couch_hash:md5_hash(term_to_binary(Base2))),
+    list_to_binary(Res).
 
 
 maybe_append_options(Options, RepOptions) ->
@@ -127,12 +137,19 @@ maybe_append_options(Options, RepOptions) ->
     end, [], Options).
 
 
-get_rep_endpoint(#httpdb{url=Url, headers=Headers}) ->
+get_rep_endpoint(#{<<"url">> := Url0, <<"headers">> := Headers0}) ->
+    Url = binary_to_list(Url0),
+    % We turn headers into a proplist of string() KVs to calculate
+    % the same replication ID as CouchDB 2.x
+    Headers1 = maps:fold(fun(K, V, Acc) ->
+        [{binary_to_list(K), binary_to_list(V)} | Acc]
+    end, [], Headers0),
+    Headers2 = lists:keysort(1, Headers1),
     DefaultHeaders = (#httpdb{})#httpdb.headers,
-    {remote, Url, Headers -- DefaultHeaders}.
+    {remote, Url, Headers2 -- DefaultHeaders}.
 
 
-get_v4_endpoint(#httpdb{} = HttpDb) ->
+get_v4_endpoint(#{} = HttpDb) ->
     {remote, Url, Headers} = get_rep_endpoint(HttpDb),
     {{UserFromHeaders, _}, HeadersWithoutBasicAuth} =
         couch_replicator_utils:remove_basic_auth_from_headers(Headers),
@@ -141,7 +158,6 @@ get_v4_endpoint(#httpdb{} = HttpDb) ->
     OAuth = undefined, % Keep this to ensure checkpoints don't change
     {remote, User, Host, NonDefaultPort, Path, HeadersWithoutBasicAuth, OAuth}.
 
-
 pick_defined_value(Values) ->
     case [V || V <- Values, V /= undefined] of
         [] ->
diff --git a/src/couch_replicator/src/couch_replicator_notifier.erl b/src/couch_replicator/src/couch_replicator_notifier.erl
deleted file mode 100644
index f7640a3..0000000
--- a/src/couch_replicator/src/couch_replicator_notifier.erl
+++ /dev/null
@@ -1,58 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_notifier).
-
--behaviour(gen_event).
--vsn(1).
-
-% public API
--export([start_link/1, stop/1, notify/1]).
-
-% gen_event callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_event/2, handle_call/2, handle_info/2]).
-
--include_lib("couch/include/couch_db.hrl").
-
-start_link(FunAcc) ->
-    couch_event_sup:start_link(couch_replication,
-        {couch_replicator_notifier, make_ref()}, FunAcc).
-
-notify(Event) ->
-    gen_event:notify(couch_replication, Event).
-
-stop(Pid) ->
-    couch_event_sup:stop(Pid).
-
-
-init(FunAcc) ->
-    {ok, FunAcc}.
-
-terminate(_Reason, _State) ->
-    ok.
-
-handle_event(Event, Fun) when is_function(Fun, 1) ->
-    Fun(Event),
-    {ok, Fun};
-handle_event(Event, {Fun, Acc}) when is_function(Fun, 2) ->
-    Acc2 = Fun(Event, Acc),
-    {ok, {Fun, Acc2}}.
-
-handle_call(_Msg, State) ->
-    {ok, ok, State}.
-
-handle_info(_Msg, State) ->
-    {ok, State}.
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 565a2bd..e8ddc84 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -15,7 +15,7 @@
 -behaviour(gen_server).
 
 -export([
-   start_link/1
+   start_link/2
 ]).
 
 -export([
@@ -39,17 +39,16 @@
     to_binary/1
 ]).
 
--import(couch_replicator_utils, [
-    pp_rep_id/1
-]).
-
 
 -define(LOWEST_SEQ, 0).
 -define(DEFAULT_CHECKPOINT_INTERVAL, 30000).
 -define(STARTUP_JITTER_DEFAULT, 5000).
 
 -record(rep_state, {
-    rep_details,
+    job,
+    job_data,
+    id,
+    base_id,
     source_name,
     target_name,
     source,
@@ -73,37 +72,36 @@
     workers,
     stats = couch_replicator_stats:new(),
     session_id,
-    source_monitor = nil,
-    target_monitor = nil,
     source_seq = nil,
     use_checkpoints = true,
     checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
     type = db,
-    view = nil
+    view = nil,
+    user = null, doc_id = null, db_name = null,
+    options = #{}
 }).
 
 
-start_link(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
-    RepChildId = BaseId ++ Ext,
-    Source = couch_replicator_api_wrap:db_uri(Src),
-    Target = couch_replicator_api_wrap:db_uri(Tgt),
-    ServerName = {global, {?MODULE, Rep#rep.id}},
-
-    case gen_server:start_link(ServerName, ?MODULE, Rep, []) of
+start_link(#{} = Job, #{} = JobData) ->
+    case gen_server:start_link(?MODULE, {Job, JobData}, []) of
         {ok, Pid} ->
             {ok, Pid};
         {error, Reason} ->
-            couch_log:warning("failed to start replication `~s` (`~s` -> `~s`)",
-                           [RepChildId, Source, Target]),
+            #{<<"rep">> := Rep} = JobData,
+            #{<<"id">> := RepId, <<"source">> := Src, <<"target">> := Tgt} = Rep,
+            Source = couch_replicator_api_wrap:db_uri(Src),
+            Target = couch_replicator_api_wrap:db_uri(Tgt),
+            ErrMsg = "failed to start replication `~s` (`~s` -> `~s`)",
+            couch_log:warning(ErrMsg, [RepId, Source, Target]),
             {error, Reason}
     end.
 
 
-init(InitArgs) ->
-    {ok, InitArgs, 0}.
+init({#{} = Job, #{} = JobData}) ->
+    {ok, {Job, JobData}, 0}.
 
 
-do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
+do_init(#{} = Job, #{} = JobData) ->
     process_flag(trap_exit, true),
 
     timer:sleep(startup_jitter()),
@@ -115,8 +113,12 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
         target_name = TargetName,
         start_seq = {_Ts, StartSeq},
         highest_seq_done = {_, HighestSeq},
-        checkpoint_interval = CheckpointInterval
-    } = State = init_state(Rep),
+        checkpoint_interval = CheckpointInterval,
+        user = User,
+        options = Options,
+        doc_id = DocId,
+        db_name = DbName
+    } = State = init_state(Job, JobData),
 
     NumWorkers = get_value(worker_processes, Options),
     BatchSize = get_value(worker_batch_size, Options),
@@ -147,10 +149,10 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
 
     couch_task_status:add_task([
         {type, replication},
-        {user, UserCtx#user_ctx.name},
-        {replication_id, ?l2b(BaseId ++ Ext)},
-        {database, Rep#rep.db_name},
-        {doc_id, Rep#rep.doc_id},
+        {user, User},
+        {replication_id, State#rep_state.id},
+        {database, DbName},
+        {doc_id, DocId},
         {source, ?l2b(SourceName)},
         {target, ?l2b(TargetName)},
         {continuous, get_value(continuous, Options, false)},
@@ -159,16 +161,6 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
     ] ++ rep_stats(State)),
     couch_task_status:set_update_frequency(1000),
 
-    % Until OTP R14B03:
-    %
-    % Restarting a temporary supervised child implies that the original arguments
-    % (#rep{} record) specified in the MFA component of the supervisor
-    % child spec will always be used whenever the child is restarted.
-    % This implies the same replication performance tunning parameters will
-    % always be used. The solution is to delete the child spec (see
-    % cancel_replication/1) and then start the replication again, but this is
-    % unfortunately not immune to race conditions.
-
     log_replication_start(State),
     couch_log:debug("Worker pids are: ~p", [Workers]),
 
@@ -222,7 +214,6 @@ handle_call({report_seq_done, Seq, StatsInc}, From,
     update_task(NewState),
     {noreply, NewState}.
 
-
 handle_cast(checkpoint, State) ->
     case do_checkpoint(State) of
     {ok, NewState} ->
@@ -242,14 +233,6 @@ handle_cast({report_seq, Seq},
 handle_info(shutdown, St) ->
     {stop, shutdown, St};
 
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
-    couch_log:error("Source database is down. Reason: ~p", [Why]),
-    {stop, source_db_down, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
-    couch_log:error("Target database is down. Reason: ~p", [Why]),
-    {stop, target_db_down, St};
-
 handle_info({'EXIT', Pid, max_backoff}, State) ->
     couch_log:error("Max backoff reached child process ~p", [Pid]),
     {stop, {shutdown, max_backoff}, State};
@@ -308,9 +291,10 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
         {stop, {worker_died, Pid, Reason}, State2}
     end;
 
-handle_info(timeout, InitArgs) ->
-    try do_init(InitArgs) of {ok, State} ->
-        {noreply, State}
+handle_info(timeout, {#{} = Job, #{} = JobData} = InitArgs) ->
+    try do_init(Job, JobData) of
+        {ok, State} ->
+            {noreply, State}
     catch
         exit:{http_request_failed, _, _, max_backoff} ->
             {stop, {shutdown, max_backoff}, {error, InitArgs}};
@@ -325,13 +309,12 @@ handle_info(timeout, InitArgs) ->
     end.
 
 
-terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
-    checkpoint_history = CheckpointHistory} = State) ->
-    terminate_cleanup(State),
-    couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}),
-    doc_update_completed(Rep, rep_stats(State));
+terminate(normal, #rep_state{} = State) ->
+    % Note: when terminating `normal`, the job was already marked as finished.
+    % If that fails, we end up in one of the error terminate clauses instead.
+    terminate_cleanup(State);
 
-terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
+terminate(shutdown, #rep_state{id = RepId} = State) ->
     % Replication stopped via _scheduler_sup:terminate_child/1, which can be
     % occur during regular scheduler operation or when job is removed from
     % the scheduler.
@@ -343,53 +326,57 @@ terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
             couch_log:error(LogMsg, [?MODULE, RepId, Error]),
             State
     end,
-    couch_replicator_notifier:notify({stopped, RepId, <<"stopped">>}),
+    finish_couch_job(State1, <<"stopped">>, null),
     terminate_cleanup(State1);
 
-terminate({shutdown, max_backoff}, {error, InitArgs}) ->
-    #rep{id = {BaseId, Ext} = RepId} = InitArgs,
+terminate({shutdown, max_backoff}, {error, {#{} = Job, #{} = JobData}}) ->
+    % Here we handle the case when replication fails during initialization.
+    % That is before the #rep_state{} is even built.
+    #{<<"rep">> := #{<<"id">> := RepId}} = JobData,
     couch_stats:increment_counter([couch_replicator, failed_starts]),
-    couch_log:warning("Replication `~s` reached max backoff ", [BaseId ++ Ext]),
-    couch_replicator_notifier:notify({error, RepId, max_backoff});
-
-terminate({shutdown, {error, Error}}, {error, Class, Stack, InitArgs}) ->
-    #rep{
-        id = {BaseId, Ext} = RepId,
-        source = Source0,
-        target = Target0,
-        doc_id = DocId,
-        db_name = DbName
-    } = InitArgs,
+    couch_log:warning("Replication `~s` reached max backoff ", [RepId]),
+    finish_couch_job(Job, JobData, <<"error">>, max_backoff);
+
+terminate({shutdown, {error, Error}}, {error, Class, Stack, {Job, JobData}}) ->
+    % Here we handle the case when replication fails during initialization.
+    #{<<"rep">> := Rep} = JobData,
+    #{
+        <<"id">> := RepId,
+        <<"source">> := Source0,
+        <<"target">> := Target0,
+        <<"doc_id">> := DocId,
+        <<"db_name">> := DbName
+    } = Rep,
     Source = couch_replicator_api_wrap:db_uri(Source0),
     Target = couch_replicator_api_wrap:db_uri(Target0),
-    RepIdStr = BaseId ++ Ext,
     Msg = "~p:~p: Replication ~s failed to start ~p -> ~p doc ~p:~p stack:~p",
-    couch_log:error(Msg, [Class, Error, RepIdStr, Source, Target, DbName,
+    couch_log:error(Msg, [Class, Error, RepId, Source, Target, DbName,
         DocId, Stack]),
     couch_stats:increment_counter([couch_replicator, failed_starts]),
-    couch_replicator_notifier:notify({error, RepId, Error});
+    finish_couch_job(Job, JobData, <<"error">>, Error);
 
-terminate({shutdown, max_backoff}, State) ->
+terminate({shutdown, max_backoff}, #rep_state{} = State) ->
     #rep_state{
+        id = RepId,
         source_name = Source,
         target_name = Target,
-        rep_details = #rep{id = {BaseId, Ext} = RepId}
     } = State,
     couch_log:error("Replication `~s` (`~s` -> `~s`) reached max backoff",
-        [BaseId ++ Ext, Source, Target]),
+        [RepId, Source, Target]),
     terminate_cleanup(State),
-    couch_replicator_notifier:notify({error, RepId, max_backoff});
+    finish_couch_job(State, <<"error">>, max_backoff);
 
 terminate(Reason, State) ->
-#rep_state{
+    #rep_state{
+        id = RepId,
         source_name = Source,
         target_name = Target,
-        rep_details = #rep{id = {BaseId, Ext} = RepId}
     } = State,
     couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~s",
-        [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
+        [RepId, Source, Target, to_binary(Reason)]),
     terminate_cleanup(State),
-    couch_replicator_notifier:notify({error, RepId, Reason}).
+    finish_couch_job(State, <<"error">>, Reason).
+
 
 terminate_cleanup(State) ->
     update_task(State),
@@ -403,22 +390,19 @@ code_change(_OldVsn, #rep_state{}=State, _Extra) ->
 
 format_status(_Opt, [_PDict, State]) ->
     #rep_state{
+       id = RepId,
        source = Source,
        target = Target,
-       rep_details = RepDetails,
        start_seq = StartSeq,
        source_seq = SourceSeq,
        committed_seq = CommitedSeq,
        current_through_seq = ThroughSeq,
        highest_seq_done = HighestSeqDone,
-       session_id = SessionId
-    } = state_strip_creds(State),
-    #rep{
-       id = RepId,
-       options = Options,
+       session_id = SessionId,
        doc_id = DocId,
-       db_name = DbName
-    } = RepDetails,
+       db_name = DbName,
+       options = Options
+    } = state_strip_creds(State),
     [
         {rep_id, RepId},
         {source, couch_replicator_api_wrap:db_uri(Source)},
@@ -462,73 +446,108 @@ httpdb_strip_creds(LocalDb) ->
     LocalDb.
 
 
-rep_strip_creds(#rep{source = Source, target = Target} = Rep) ->
-    Rep#rep{
-        source = httpdb_strip_creds(Source),
-        target = httpdb_strip_creds(Target)
-    }.
-
-
-state_strip_creds(#rep_state{rep_details = Rep, source = Source, target = Target} = State) ->
-    % #rep_state contains the source and target at the top level and also
-    % in the nested #rep_details record
+state_strip_creds(#rep_state{source = Source, target = Target} = State) ->
     State#rep_state{
-        rep_details = rep_strip_creds(Rep),
         source = httpdb_strip_creds(Source),
         target = httpdb_strip_creds(Target)
     }.
 
 
-adjust_maxconn(Src = #httpdb{http_connections = 1}, RepId) ->
+adjust_maxconn(Src = #{<<"http_connections">> := 1}, RepId) ->
     Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p",
     couch_log:notice(Msg, [RepId]),
-    Src#httpdb{http_connections = 2};
+    Src#{<<"http_connections">> := 2};
 adjust_maxconn(Src, _RepId) ->
     Src.
 
 
--spec doc_update_triggered(#rep{}) -> ok.
-doc_update_triggered(#rep{db_name = null}) ->
+-spec doc_update_triggered(#rep_state{}) -> ok.
+doc_update_triggered(#rep_state{db_name = null}) ->
     ok;
-doc_update_triggered(#rep{id = RepId, doc_id = DocId} = Rep) ->
+doc_update_triggered(#rep_state{} = State) ->
+    #rep_state{id = Id, doc_id = DocId, db_name = DbName} = State,
     case couch_replicator_doc_processor:update_docs() of
         true ->
-            couch_replicator_docs:update_triggered(Rep, RepId);
+            couch_replicator_docs:update_triggered(Id, DocId, DbName);
         false ->
             ok
     end,
-    couch_log:notice("Document `~s` triggered replication `~s`",
-        [DocId, pp_rep_id(RepId)]),
+    couch_log:notice("Document `~s` triggered replication `~s`", [DocId, Id]),
     ok.
 
 
--spec doc_update_completed(#rep{}, list()) -> ok.
-doc_update_completed(#rep{db_name = null}, _Stats) ->
+-spec doc_update_completed(#rep_state{}) -> ok.
+doc_update_completed(#rep_state{db_name = null}) ->
     ok;
-doc_update_completed(#rep{id = RepId, doc_id = DocId, db_name = DbName,
-    start_time = StartTime}, Stats0) ->
-    Stats = Stats0 ++ [{start_time, couch_replicator_utils:iso8601(StartTime)}],
+doc_update_completed(#rep_state{} = State) ->
+    #rep_state{
+        id = Id,
+        doc_id = DocId,
+        db_name = DbName,
+        start_time = Start,
+        stats = Stats0
+    } = State,
+    Stats = Stats0 ++ [{start_time, couch_replicator_utils:iso8601(Start)}],
     couch_replicator_docs:update_doc_completed(DbName, DocId, Stats),
-    couch_log:notice("Replication `~s` completed (triggered by `~s`)",
-        [pp_rep_id(RepId), DocId]),
+    couch_log:notice("Replication `~s` completed (triggered by `~s:~s`)",
+        [Id, DbName, DocId]),
     ok.
 
 
 do_last_checkpoint(#rep_state{seqs_in_progress = [],
     highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
-    {stop, normal, cancel_timer(State)};
+    History = State#rep_state.checkpoint_history,
+    Result = case finish_couch_job(State, <<"completed">>, History) of
+        ok -> normal;
+        {error, _} = Error -> Error
+    end,
+    {stop, Result, cancel_timer(State)};
 do_last_checkpoint(#rep_state{seqs_in_progress = [],
     highest_seq_done = Seq} = State) ->
     case do_checkpoint(State#rep_state{current_through_seq = Seq}) of
     {ok, NewState} ->
         couch_stats:increment_counter([couch_replicator, checkpoints, success]),
-        {stop, normal, cancel_timer(NewState)};
+        History = NewState#rep_state.checkpoint_history,
+        Result = case finish_couch_job(NewState, <<"completed">>, History) of
+            ok -> normal;
+            {error, _} = Error -> Error
+        end,
+        {stop, Result, cancel_timer(NewState)};
     Error ->
         couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
         {stop, Error, State}
     end.
 
 
+finish_couch_job(#rep_state{} = State, FinishState, Result) ->
+    #rep_state{job = Job, job_data = JobData} = State,
+    case finish_couch_job(Job, JobData, FinishState, Result) of
+        ok when FinishState =:= <<"completed">> -> doc_update_completed(State);
+        Other -> Other
+    end.
+
+
+finish_couch_job(#{} = Job, #{} = JobData0, FinishState, Result0) ->
+    #{<<"rep">> := #{<<"id">> := RepId}} = JobData0,
+    Result = case Result0 of
+        null -> null;
+        #{} -> Result0;
+        <<_/binary>> -> Result0;
+        Atom when is_atom(Atom) -> atom_to_binary(Atom, utf8);
+        _ -> couch_replicator_utils:rep_error_to_binary(Result0)
+    end,
+    JobData = JobData0#{
+        <<"finished_state">> => FinishState,
+        <<"finished_result">> => Result
+    },
+    case couch_jobs:finish(undefined, Job, JobData) of
+        ok -> ok;
+        {error, Error} ->
+            couch_log:error("Replication ~s job could not finish. Error:~p", [RepId, Error]),
+            {error, Error}
+    end.
+
+
 start_timer(State) ->
     After = State#rep_state.checkpoint_interval,
     case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
@@ -547,21 +566,36 @@ cancel_timer(#rep_state{timer = Timer} = State) ->
     State#rep_state{timer = nil}.
 
 
-init_state(Rep) ->
-    #rep{
-        id = {BaseId, _Ext},
-        source = Src0, target = Tgt,
-        options = Options,
-        type = Type, view = View,
-        start_time = StartTime,
-        stats = Stats
+init_state(#{} = Job, #{<<"rep">> := Rep} = JobData) ->
+    #{
+        <<"id">> := Id,
+        <<"base_id">> := BaseId,
+        <<"source">> := Src0,
+        <<"target">> := Tgt,
+        <<"type">> := Type,
+        <<"view">> := View,
+        <<"start_time">> := StartTime,
+        <<"stats">> := Stats,
+        <<"options">> := OptionsMap,
+        <<"user_ctx">> := UserCtx,
+        <<"db_name">> := DbName,
+        <<"doc_id">> := DocId
     } = Rep,
+
+    Options = maps:fold(fun(K, V, Acc) ->
+        [{binary_to_atom(K, utf8), V} | Acc]
+    end, [], OptionsMap),
+
     % Adjust minimum number of http source connections to 2 to avoid deadlock
     Src = adjust_maxconn(Src0, BaseId),
     {ok, Source} = couch_replicator_api_wrap:db_open(Src),
     {CreateTargetParams} = get_value(create_target_params, Options, {[]}),
     {ok, Target} = couch_replicator_api_wrap:db_open(Tgt,
-        get_value(create_target, Options, false), CreateTargetParams),
+
+    CreateTgt = get_value(create_target, Options, false),
+    CreateParams = maps:to_list(get_value(create_target_params, Options, #{}),
+    {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, UserCtx, CreateTgt,
+        CreateParams),
 
     {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
     {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),
@@ -576,7 +610,10 @@ init_state(Rep) ->
 
     #doc{body={CheckpointHistory}} = SourceLog,
     State = #rep_state{
-        rep_details = Rep,
+        job = Job,
+        job_data = JobData,
+        id = Id,
+        base_id = BaseId,
         source_name = couch_replicator_api_wrap:db_uri(Source),
         target_name = couch_replicator_api_wrap:db_uri(Target),
         source = Source,
@@ -592,28 +629,27 @@ init_state(Rep) ->
         src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
         tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
         session_id = couch_uuids:random(),
-        source_monitor = db_monitor(Source),
-        target_monitor = db_monitor(Target),
         source_seq = SourceSeq,
-        use_checkpoints = get_value(use_checkpoints, Options, true),
-        checkpoint_interval = get_value(checkpoint_interval, Options,
-                                        ?DEFAULT_CHECKPOINT_INTERVAL),
+        use_checkpoints = get_value(use_checkpoints, Options),
+        checkpoint_interval = get_value(checkpoint_interval, Options),
         type = Type,
         view = View,
-        stats = Stats
+        stats = Stats,
+        options = Options,
+        doc_id = DocId, db_name = DbName
     },
     State#rep_state{timer = start_timer(State)}.
 
 
-find_and_migrate_logs(DbList, #rep{id = {BaseId, _}} = Rep) ->
+find_and_migrate_logs(DbList, #{<<"base_id">> := BaseId} = Rep) ->
     LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
     fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []).
 
 
 fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
     lists:reverse(Acc);
 
-fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
+fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, #{} = Rep, Acc) ->
     case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of
     {error, <<"not_found">>} when Vsn > 1 ->
         OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1),
@@ -633,8 +669,8 @@ fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
     end.
 
 
-maybe_save_migrated_log(Rep, Db, #doc{} = Doc, OldId) ->
-    case get_value(use_checkpoints, Rep#rep.options, true) of
+maybe_save_migrated_log(#{<<"options">> := Options}, Db, #doc{} = Doc, OldId) ->
+    case maps:get(<<"use_checkpoints">>, Options, true) of
         true ->
             update_checkpoint(Db, Doc),
             Msg = "Migrated replication checkpoint. Db:~p ~p -> ~p",
@@ -697,7 +733,7 @@ do_checkpoint(State) ->
         src_starttime = SrcInstanceStartTime,
         tgt_starttime = TgtInstanceStartTime,
         stats = Stats,
-        rep_details = #rep{options = Options},
+        options = Options,
         session_id = SessionId
     } = State,
     case commit_to_both(Source, Target) of
@@ -906,14 +942,13 @@ has_session_id(SessionId, [{Props} | Rest]) ->
 
 
 db_monitor(#httpdb{}) ->
-	nil;
+    nil;
 db_monitor(Db) ->
-	couch_db:monitor(Db).
+    couch_db:monitor(Db).
 
 
-get_pending_count(St) ->
-    Rep = St#rep_state.rep_details,
-    Timeout = get_value(connection_timeout, Rep#rep.options),
+get_pending_count(#rep_state{options = Options} = St) ->
+    Timeout = get_value(connection_timeout, Options),
     TimeoutMicro = Timeout * 1000,
     case get(pending_count_state) of
         {LastUpdate, PendingCount} ->
@@ -960,8 +995,7 @@ update_task(State) ->
     ]).
 
 
-update_scheduler_job_stats(#rep_state{rep_details = Rep, stats = Stats}) ->
-    JobId = Rep#rep.id,
+update_scheduler_job_stats(#rep_state{id = JobId, stats = Stats}) ->
     couch_replicator_scheduler:update_job_stats(JobId, Stats).
 
 
@@ -998,24 +1032,21 @@ replication_start_error(Error) ->
     Error.
 
 
-log_replication_start(#rep_state{rep_details = Rep} = RepState) ->
-    #rep{
-       id = {BaseId, Ext},
-       doc_id = DocId,
-       db_name = DbName,
-       options = Options
-    } = Rep,
-    Id = BaseId ++ Ext,
-    Workers = get_value(worker_processes, Options),
-    BatchSize = get_value(worker_batch_size, Options),
+log_replication_start(#rep_state{} = RepState) ->
     #rep_state{
-       source_name = Source,  % credentials already stripped
-       target_name = Target,  % credentials already stripped
-       session_id = Sid
+        id = Id,
+        doc_id = DocId,
+        db_name = DbName,
+        options = Options,
+        source_name = Source,  % credentials already stripped
+        target_name = Target,  % credentials already stripped
+        session_id = Sid
     } = RepState,
+    Workers = get_value(worker_processes, Options),
+    BatchSize = get_value(worker_batch_size, Options),
     From = case DbName of
-        ShardName when is_binary(ShardName) ->
-            io_lib:format("from doc ~s:~s", [mem3:dbname(ShardName), DocId]);
+        Name when is_binary(Name) ->
+            io_lib:format("from doc ~s:~s", [Name, DocId]);
         _ ->
             "from _replicate endpoint"
     end,
@@ -1048,14 +1079,13 @@ scheduler_job_format_status_test() ->
     Target = <<"http://u:p@h2/d2">>,
     Rep = #rep{
         id = {"base", "+ext"},
-        source = couch_replicator_docs:parse_rep_db(Source, [], []),
-        target = couch_replicator_docs:parse_rep_db(Target, [], []),
+        source = couch_replicator_docs:parse_rep_db(Source, #{}, #{}),
+        target = couch_replicator_docs:parse_rep_db(Target, #{}, #{}),
         options = [{create_target, true}],
         doc_id = <<"mydoc">>,
         db_name = <<"mydb">>
     },
     State = #rep_state{
-        rep_details = Rep,
         source = Rep#rep.source,
         target = Rep#rep.target,
         session_id = <<"a">>,
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
index 8ab55f8..3ea9dff 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
@@ -17,7 +17,7 @@
 %% public api
 -export([
     start_link/0,
-    start_child/1,
+    start_child/2,
     terminate_child/1
 ]).
 
@@ -37,8 +37,8 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 
-start_child(#rep{} = Rep) ->
-    supervisor:start_child(?MODULE, [Rep]).
+start_child(#{} = Job, #{} = Rep) ->
+    supervisor:start_child(?MODULE, [Job, Rep]).
 
 
 terminate_child(Pid) ->
diff --git a/src/couch_replicator/src/couch_replicator_sup.erl b/src/couch_replicator/src/couch_replicator_sup.erl
index 5475e8f..b86529f 100644
--- a/src/couch_replicator/src/couch_replicator_sup.erl
+++ b/src/couch_replicator/src/couch_replicator_sup.erl
@@ -20,18 +20,6 @@ start_link() ->
 
 init(_Args) ->
     Children = [
-        {couch_replication_event,
-            {gen_event, start_link, [{local, couch_replication}]},
-            permanent,
-            brutal_kill,
-            worker,
-            dynamic},
-       {couch_replicator_clustering,
-            {couch_replicator_clustering, start_link, []},
-            permanent,
-            brutal_kill,
-            worker,
-            [couch_replicator_clustering]},
        {couch_replicator_connection,
             {couch_replicator_connection, start_link, []},
             permanent,
@@ -70,12 +58,6 @@ init(_Args) ->
             transient,
             brutal_kill,
             worker,
-            [couch_replicator]},
-        {couch_replicator_db_changes,
-            {couch_replicator_db_changes, start_link, []},
-            permanent,
-            brutal_kill,
-            worker,
-            [couch_multidb_changes]}
+            [couch_replicator]}
     ],
     {ok, {{rest_for_one,10,1}, Children}}.
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index ccf2413..b71ffeb 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -20,11 +20,11 @@
    rep_error_to_binary/1,
    get_json_value/2,
    get_json_value/3,
-   pp_rep_id/1,
    iso8601/1,
    filter_state/3,
    remove_basic_auth_from_headers/1,
-   normalize_rep/1
+   normalize_rep/1,
+   default_headers_map/0
 ]).
 
 
@@ -74,14 +74,6 @@ get_json_value(Key, Props, Default) when is_binary(Key) ->
     end.
 
 
-% pretty-print replication id
--spec pp_rep_id(#rep{} | rep_id()) -> string().
-pp_rep_id(#rep{id = RepId}) ->
-    pp_rep_id(RepId);
-pp_rep_id({Base, Extension}) ->
-    Base ++ Extension.
-
-
 % NV: TODO: this function is not used outside api wrap module
 % consider moving it there during final cleanup
 is_deleted(Change) ->
@@ -102,8 +94,13 @@ parse_rep_doc(Props, UserCtx) ->
     couch_replicator_docs:parse_rep_doc(Props, UserCtx).
 
 
--spec iso8601(erlang:timestamp()) -> binary().
-iso8601({_Mega, _Sec, _Micro} = Timestamp) ->
+-spec iso8601(integer()) -> binary().
+iso8601(Native) when is_integer(Native) ->
+    ErlangSystemTime = erlang:convert_time_unit(Native, native, microsecond),
+    MegaSecs = ErlangSystemTime div 1000000000000,
+    Secs = ErlangSystemTime div 1000000 - MegaSecs * 1000000,
+    MicroSecs = ErlangSystemTime rem 1000000,
+    Timestamp = {MegaSecs, Secs, MicroSecs},
     {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(Timestamp),
     Format = "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ",
     iolist_to_binary(io_lib:format(Format, [Y, Mon, D, H, Min, S])).
@@ -157,25 +154,39 @@ decode_basic_creds(Base64) ->
     end.
 
 
-% Normalize a #rep{} record such that it doesn't contain time dependent fields
+% Normalize a rep map such that it doesn't contain time dependent fields
 % pids (like httpc pools), and options / props are sorted. This function would
 % used during comparisons.
--spec normalize_rep(#rep{} | nil) -> #rep{} | nil.
-normalize_rep(nil) ->
-    nil;
-
-normalize_rep(#rep{} = Rep)->
-    #rep{
-        source = couch_replicator_api_wrap:normalize_db(Rep#rep.source),
-        target = couch_replicator_api_wrap:normalize_db(Rep#rep.target),
-        options = Rep#rep.options,  % already sorted in make_options/1
-        type = Rep#rep.type,
-        view = Rep#rep.view,
-        doc_id = Rep#rep.doc_id,
-        db_name = Rep#rep.db_name
+-spec normalize_rep(#{} | null) -> #{} | null.
+normalize_rep(null) ->
+    null;
+
+normalize_rep(#{} = Rep)->
+    Ks = [<<"options">>, <<"type">>, <<"view">>, <<"doc_id">>, <<"db_name">>],
+    Rep1 = maps:with(Ks, Rep),
+    #{<<"source">> := Source, <<"target">> := Target} = Rep,
+    Rep1#{
+        <<"source">> => normalize_endpoint(Source),
+        <<"target">> => normalize_endpoint(Target)
     }.
 
 
+normalize_endpoint(<<DbName/binary>>) ->
+    DbName;
+
+normalize_endpoint(#{} = Endpoint) ->
+    Ks = [<<"url">>, <<"auth_props">>, <<"headers">>, <<"timeout">>,
+        <<"ibrowse_options">>, <<"retries">>, <<"http_connections">>
+    ],
+    maps:with(Ks, Endpoint).
+
+
+default_headers_map() ->
+    lists:foldl(fun({K, V}, Acc) ->
+        Acc#{list_to_binary(K) => list_to_binary(V)}
+    end, #{}, (#httpdb{})#httpdb.headers).
+
+
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").
@@ -254,4 +265,23 @@ normalize_rep_test_() ->
         end)
     }.
 
+
+normalize_endpoint_test() ->
+    Endpoint = #{
+        <<"url">> => <<"http://host/db">>,
+        <<"auth_props">> => #{<<"key">> => <<"val">>},
+        <<"headers">> => #{<<"k1">> => <<"v1">>},
+        <<"timeout">> => 30000,
+        <<"ibrowse_options">> => #{},
+        <<"retries">> => 10,
+        <<"http_connections">> => 20,
+        <<"proxy_url">> => <<"http://proxy:5984">>
+    },
+    % Keys outside the normalized set (e.g. proxy_url) are dropped.
+    Expected = maps:remove(<<"proxy_url">>, Endpoint),
+    ?assertEqual(Expected, normalize_endpoint(Endpoint)),
+    ?assertEqual(#{}, normalize_endpoint(#{<<"proxy_url">> => <<"x">>})),
+    ?assertEqual(<<"local">>, normalize_endpoint(<<"local">>)).
+
+
 -endif.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
index 4f545bc..5fb922a 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
@@ -49,7 +49,7 @@ parse_rep_doc_without_proxy(_) ->
             {<<"source">>, <<"http://unproxied.com">>},
             {<<"target">>, <<"http://otherunproxied.com">>}
         ]},
-        Rep = couch_replicator_docs:parse_rep_doc(NoProxyDoc),
+        Rep = couch_replicator_docs:parse_rep_doc_without_id(NoProxyDoc),
         ?assertEqual((Rep#rep.source)#httpdb.proxy_url, undefined),
         ?assertEqual((Rep#rep.target)#httpdb.proxy_url, undefined)
     end).
@@ -63,7 +63,7 @@ parse_rep_doc_with_proxy(_) ->
             {<<"target">>, <<"http://otherunproxied.com">>},
             {<<"proxy">>, ProxyURL}
         ]},
-        Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc),
+        Rep = couch_replicator_docs:parse_rep_doc_without_id(ProxyDoc),
         ?assertEqual((Rep#rep.source)#httpdb.proxy_url, binary_to_list(ProxyURL)),
         ?assertEqual((Rep#rep.target)#httpdb.proxy_url, binary_to_list(ProxyURL))
     end).
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index c926da9..9ec9f2b 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -29,6 +29,9 @@
     name/1,
     get_after_doc_read_fun/1,
     get_before_doc_update_fun/1,
+    get_during_doc_update_fun/1,
+    get_after_db_create_fun/1,
+    get_after_db_delete_fun/1,
     get_committed_update_seq/1,
     get_compacted_seq/1,
     get_compactor_pid/1,
@@ -155,7 +158,9 @@ create(DbName, Options) ->
         #{} = Db0 ->
             Db1 = maybe_add_sys_db_callbacks(Db0),
             ok = fabric2_server:store(Db1),
-            {ok, Db1#{tx := undefined}};
+            Db2 = Db1#{tx := undefined},
+            ok = apply_after_db_create(Db2),
+            {ok, Db2};
         Error ->
             Error
     end.
@@ -188,6 +193,7 @@ delete(DbName, Options) ->
         fabric2_fdb:delete(TxDb)
     end),
     if Resp /= ok -> Resp; true ->
+        ok = apply_after_db_delete(Db#{tx := undefined}),
         fabric2_server:remove(DbName)
     end.
 
@@ -264,6 +270,19 @@ get_after_doc_read_fun(#{after_doc_read := AfterDocRead}) ->
 get_before_doc_update_fun(#{before_doc_update := BeforeDocUpdate}) ->
     BeforeDocUpdate.
 
+
+get_during_doc_update_fun(#{during_doc_update := DuringDocUpdate}) ->
+    DuringDocUpdate.
+
+
+get_after_db_create_fun(#{after_db_create := AfterDbCreate}) ->
+    AfterDbCreate.
+
+
+get_after_db_delete_fun(#{after_db_delete := AfterDbDelete}) ->
+    AfterDbDelete.
+
+
 get_committed_update_seq(#{} = Db) ->
     get_update_seq(Db).
 
@@ -762,24 +781,33 @@ maybe_add_sys_db_callbacks(Db) ->
     IsReplicatorDb = is_replicator_db(Db),
     IsUsersDb = is_users_db(Db),
 
-    {BDU, ADR} = if
+    {BDU, DDU, ADR, ADC, ADD} = if
         IsReplicatorDb ->
             {
                 fun couch_replicator_docs:before_doc_update/3,
-                fun couch_replicator_docs:after_doc_read/2
+                fun couch_replicator_doc_processor:during_doc_update/3,
+                fun couch_replicator_docs:after_doc_read/2,
+                fun couch_replicator_doc_processor:after_db_create/1,
+                fun couch_replicator_doc_processor:after_db_delete/1
             };
         IsUsersDb ->
             {
                 fun fabric2_users_db:before_doc_update/3,
-                fun fabric2_users_db:after_doc_read/2
+                undefined,
+                fun fabric2_users_db:after_doc_read/2,
+                undefined,
+                undefined
             };
         true ->
-            {undefined, undefined}
+            {undefined, undefined, undefined, undefined, undefined}
     end,
 
     Db#{
         before_doc_update := BDU,
-        after_doc_read := ADR
+        during_doc_update := DDU,
+        after_doc_read := ADR,
+        after_db_create := ADC,
+        after_db_delete := ADD
     }.
 
 
@@ -1042,6 +1070,33 @@ apply_before_doc_update(Db, Docs, Options) ->
     end.
 
 
+apply_during_doc_update(#{during_doc_update := DDU} = Db, Doc, UpdateType)
+        when is_function(DDU, 3) ->
+    DDU(Doc, Db, UpdateType),
+    ok;
+
+apply_during_doc_update(#{during_doc_update := undefined}, _, _) ->
+    ok.
+
+
+apply_after_db_create(#{after_db_create := ADC} = Db)
+        when is_function(ADC, 1) ->
+    ADC(Db),
+    ok;
+
+apply_after_db_create(#{after_db_create := undefined}) ->
+    ok.
+
+
+apply_after_db_delete(#{after_db_delete := ADD} = Db)
+        when is_function(ADD, 1) ->
+    ADD(Db),
+    ok;
+
+apply_after_db_delete(#{after_db_delete := undefined}) ->
+    ok.
+
+
 update_doc_int(#{} = Db, #doc{} = Doc, Options) ->
     IsLocal = case Doc#doc.id of
         <<?LOCAL_DOC_PREFIX, _/binary>> -> true;
@@ -1218,6 +1273,8 @@ update_doc_interactive(Db, Doc0, Future, _Options) ->
             ToRemove
         ),
 
+    ok = apply_during_doc_update(Db, Doc3, interactive_edit),
+
     {ok, {NewRevPos, NewRev}}.
 
 
@@ -1301,6 +1358,8 @@ update_doc_replicated(Db, Doc0, _Options) ->
             ToRemove
         ),
 
+    ok = apply_during_doc_update(Db, Doc3, replicated_changes),
+
     {ok, []}.
 
 
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 71cb68f..6387afd 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -162,7 +162,10 @@ create(#{} = Db0, Options) ->
 
         validate_doc_update_funs => [],
         before_doc_update => undefined,
+        during_doc_update => undefined,
         after_doc_read => undefined,
+        after_db_create => undefined,
+        after_db_delete => undefined,
         % All other db things as we add features,
 
         db_options => Options
@@ -199,8 +202,10 @@ open(#{} = Db0, Options) ->
         % bits.
         validate_doc_update_funs => [],
         before_doc_update => undefined,
+        during_doc_update => undefined,
         after_doc_read => undefined,
-
+        after_db_create => undefined,
+        after_db_delete => undefined,
         db_options => Options
     },
 


[couchdb] 03/03: WIP 5

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/fdb-replicator-2
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 159f680ba66e373cddaf45c1d64264183db8d38a
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Mon Aug 12 11:18:34 2019 -0400

    WIP 5
    
      * streamlined doc jobs worker (moved halt handling to the top)
      * remove invalid tests
      * updated couch_replicator_docs to take explicit db and doc params
---
 src/couch_replicator/src/couch_replicator.hrl      |   8 +
 .../src/couch_replicator_doc_processor.erl         | 613 +++++++++------------
 .../src/couch_replicator_doc_processor_worker.erl  | 284 ----------
 src/couch_replicator/src/couch_replicator_docs.erl |  37 +-
 .../src/couch_replicator_filters.erl               |  10 +-
 5 files changed, 281 insertions(+), 671 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl
index 050fcb4..d94c3eb 100644
--- a/src/couch_replicator/src/couch_replicator.hrl
+++ b/src/couch_replicator/src/couch_replicator.hrl
@@ -41,6 +41,12 @@
 -define(ST_CRASHING, <<"crashing">>).
 -define(ST_TRIGGERED, <<"triggered">>).
 
+% Some fields from a rep object
+-define(REP_ID, <<"id">>).
+-define(DB_NAME, <<"db_name">>).
+-define(DOC_ID, <<"doc_id">>).
+-define(START_TIME, <<"start_time">>).
+
 % Fields couch job data objects
 -define(REP, <<"rep">>).
 -define(REP_PARSE_ERROR, <<"rep_parse_error">>).
@@ -51,6 +57,8 @@
 -define(DOC_ID, <<"doc_id">>).
 -define(ERROR_COUNT, <<"error_count">>).
 -define(LAST_UPDATED, <<"last_updated">>).
+-define(HISTORY, <<"history">>).
+-define(VER, <<"ver">>).
 
 % Accepted job message tag
 -define(ACCEPTED_JOB, accepted_job).
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 9899bef..d892339 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -81,116 +81,42 @@ process_change(#{name := DbName} = Db, #doc{deleted = true} = Doc) ->
 
 process_change(#{name := DbName} = Db, #doc{} = Doc) ->
     #doc{id = DocId, body = {Props} = Body} = Doc,
-    {Rep, RepError} = try
+    {Rep, Error} = try
         Rep0 = couch_replicator_docs:parse_rep_doc_without_id(Body),
-        Rep1 = Rep0#{
-            <<"db_name">> => DbName,
-            <<"start_time">> => erlang:system_time()
-        },
+        DocState = get_json_value(<<"_replication_state">>, Props, null),
+        Rep1 = Rep0#{?DB_NAME := DbName, ?DOC_STATE := DocState},
         {Rep1, null}
     catch
         throw:{bad_rep_doc, Reason} ->
             {null, couch_replicator_utils:rep_error_to_binary(Reason)}
     end,
-    % We keep track of the doc's state in order to clear it if update_docs
-    % is toggled from true to false
-    DocState = get_json_value(<<"_replication_state">>, Props, null),
     case couch_jobs:get_job_data(Db, ?REP_DOCS, docs_job_id(DbName, DocId)) of
         {error, not_found} ->
-            add_rep_doc_job(Db, DbName, DocId, Rep, RepError, DocState);
-        {ok, #{?REP := null, ?REP_PARSE_ERROR := RepError}}
+            add_rep_doc_job(Db, DbName, DocId, Rep, Error);
+        {ok, #{?REP := null, ?REP_PARSE_ERROR := Error}}
                 when Rep =:= null ->
             % Same error as before occurred, don't bother updating the job
             ok;
         {ok, #{?REP := null}} when Rep =:= null ->
             % Error occured but it's a different error. Update the job so user
             % sees the new error
-            add_rep_doc_job(Db, DbName, DocId, Rep, RepError, DocState);
+            add_rep_doc_job(Db, DbName, DocId, Rep, Error);
         {ok, #{?REP := OldRep, ?REP_PARSE_ERROR := OldError}} ->
-            NormOldRep = couch_replicator_util:normalize_rep(OldRep),
-            NormRep = couch_replicator_util:normalize_rep(Rep),
-            case NormOldRep == NormRep of
+            case compare_reps(OldRep, Rep) of
                 true ->
                     % Document was changed but none of the parameters relevent
                     % for the replication job have changed, so make it a no-op
                     ok;
                 false ->
-                    add_rep_doc_job(Db, DbName, DocId, Rep, RepError,
-                        DocState)
+                    add_rep_doc_job(Db, DbName, DocId, Rep, Error)
             end
     end.
 
 
-
-
-rep_docs_job_execute(#{} = Job, #{<<"rep">> := null} = JobData) ->
-    #{
-        <<"rep_parse_error">> := Error,
-        <<"db_name">> := DbName,
-        <<"doc_id">> := DocId,
-    } = JobData,
-    JobData1 = JobData#{
-        <<"finished_state">> := <<"failed">>,
-        <<"finished_result">> := Error
-    }
-    case couch_jobs:finish(undefined, Job, JobData1) of
-        ok ->
-            couch_replicator_docs:update_failed(DbName, DocId, Error),
-            ok;
-        {error, JobError} ->
-            Msg = "Replication ~s job could not finish. JobError:~p",
-            couch_log:error(Msg, [RepId, JobError]),
-            {error, JobError}
-    end;
-
-rep_docs_job_execute(#{} = Job, #{} = JobData) ->
-    #{<<"rep">> := Rep, <<"doc_state">> := DocState} = JobData,
-    case lists:member(DocState, [<<"triggered">>, <<"error">>]) of
-        true -> maybe_remove_state_fields(DbName, DocId),
-        false -> ok
-    end,
-    % completed jobs should finish right away
-
-    % otherwise start computing the replication id
-
-    Rep1 = update_replication_id(Rep),
-
-    % when done add or update the replicaton job
-    % if jobs has a filter keep checking if filter changes
-
-
-maybe_remove_state_fields(DbName, DocId) ->
-    case update_docs() of
-        true ->
-            ok;
-        false ->
-            couch_replicator_docs:remove_state_fields(DbName, DocId)
-    end.
-
-
-process_updated({DbName, _DocId} = Id, JsonRepDoc) ->
-    % Parsing replication doc (but not calculating the id) could throw an
-    % exception which would indicate this document is malformed. This exception
-    % should propagate to db_change function and will be recorded as permanent
-    % failure in the document. User will have to update the documet to fix the
-    % problem.
-    Rep0 = couch_replicator_docs:parse_rep_doc_without_id(JsonRepDoc),
-    Rep = Rep0#rep{db_name = DbName, start_time = os:timestamp()},
-    Filter = case couch_replicator_filters:parse(Rep#rep.options) of
-    {ok, nil} ->
-        nil;
-    {ok, {user, _FName, _QP}} ->
-        user;
-    {ok, {view, _FName, _QP}} ->
-        view;
-    {ok, {docids, _DocIds}} ->
-        docids;
-    {ok, {mango, _Selector}} ->
-        mango;
-    {error, FilterError} ->
-        throw(FilterError)
-    end,
-    gen_server:call(?MODULE, {updated, Id, Rep, Filter}, infinity).
+compare_reps(Rep1, Rep2) ->
+    NormRep1 = couch_replicator_util:normalize_rep(Rep1),
+    NormRep2 = couch_replicator_util:normalize_rep(Rep2),
+    NormRep1 =:= NormRep2.
 
 
 start_link() ->
@@ -218,7 +144,7 @@ handle_call(Msg, _From, #{} = St) ->
 
 
 handle_cast({?ACCEPTED_JOB, Job, JobData}, #{} = St) ->
-    {noreply, execute_rep_doc_job(Job, JobData, St)};
+    {noreply, spawn_worker(Job, JobData, St)};
 
 handle_cast(Msg, #{} = St) ->
     {stop, {bad_cast, Msg}, St}.
@@ -326,236 +252,239 @@ start_acceptors(N, #st{} = St) ->
 
 
 start_worker(Job, #{} = JobData, #{workers := Workers} = St) ->
-    Parent  = self(),
-    Pid = spawn_link(fun() -> rep_doc_job_worker(Job, JobData, Parent) end),
+    Pid = spawn_link(fun() -> worker_fun(Job, JobData) end),
     St#{workers := Workers#{Pid => true}}.
 
 
-rep_doc_job_worker(Job, #{?REP := null} = RepDocData, Parent) ->
+worker_fun(Job, JobData) ->
+    try
+        worker_fun1(Job, JobData)
+    catch
+        throw:halt ->
+            Msg = "~p : replication doc job ~p lock conflict",
+            couch_log:error(Msg, [?MODULE, Job]);
+        throw:{rep_doc_not_current, DbName, DocId} ->
+            Msg = "~p : replication doc ~s:~s is not current",
+            couch_log:error(Msg, [?MODULE, DbName, DocID]),
+    end.
+
+
+worker_fun1(Job, #{?REP := null} = RepDocData) ->
     #{
-        ?REP_PARSE_ERROR := Error,
+        ?STATE_INFO := Error,
         ?DB_NAME := DbName,
         ?DOC_ID := DocId
-        ?ERROR_COUNT := ErrorCount,
     } = RepDocData,
-    RepDocData1 = RepDocData#{
-        ?LAST_UPDATED := erlang:system_time(),
-        ?ERROR_COUNT := ErrorCount + 1,
-        ?FINISHED_STATE := ?FAILED,
-        ?FINISHED_RESULT := Error,
-    },
-    case couch_jobs:finish(undefined, Job, RepDocData1) of
-        ok ->
-            couch_replicator_docs:update_failed(DbName, DocId, Error),
-            St;
-        {error, JobError} ->
-            Msg = "~p : replication ~s job could not finish. JobError:~p",
-            couch_log:error(Msg, [?MODULE, RepId, JobError]),
-            St
-    end;
+    finish_with_permanent_failure(undefined, Job, RepDocData, Error),
+    couch_replicator_docs:update_failed(DbName, DocId, Error);
 
-rep_doc_job_worker(Job, #{?REP := #{}} = RepDocData, Parent) ->
-    #{
-        ?REP := Rep1,
-        ?DOC_STATE := DocState,
-    } = RepDocData,
-    ok = remove_old_state_fields(Rep, DocState),
-    % Calculate replication ID. In most case this is fast and easy
-    % but for filtered replications with user JS filters this
-    % means making a remote connection to the source db.
-    Rep2 = #{<<"id">> := RepId} = couch_replicator_docs:update_rep_id(Rep1),
+
+worker_fun1(Job, #{?REP := #{}} = RepDocData) ->
+    #{?REP := Rep} = RepDocData,
+    #{?REP_ID := OldRepId, ?DB_NAME := DbName, ?DOC_ID := DocId} = Rep,
+    ok = remove_old_state_fields(RepDocData),
     try
-        couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
-            maybe_start_replication_job(JTx, Rep)
-        end)
+        RepWithId = couch_replicator_docs:update_rep_id(Rep),
+        worker_fun2(Job, OldRepId, RepWithId, RepDocData)
     catch
-        throw:{job_start_error, halt} ->
-            Msg = "~p : replication doc job ~s lock conflict",
-            couch_log:error(Msg, [?MODULE, RepId]),
-            exit(normal);
-        throw:{job_start_error, Error} ->
-            Msg = "~p : replication job start failed ~slock conflict",
-            couch_log:error(Msg, [?MODULE, RepId])
+        throw:{filter_fetch_error, Error} ->
+            Error1 = io_lib:format("Filter fetch error ~p", [Error]),
+            Error2 = couch_util:to_binary(Error1),
+            finish_with_temporary_error(undefined, Job, RepDocData, Error2),
+            maybe_update_doc_error(OldRepId, DbName, DocId, Error2)
     end.
 
 
-maybe_start_replication_job(JTx, Rep) ->
-     case couch_jobs:get_job_data(JTx, ?REP_JOBS, RepId) of
-        {error, not_found} ->
-            RepJobData = #{
-                ?REP := Rep2,
-                ?STATE := ST_PENDING,
-                ?STATE_INFO := null,
-            },
-            ok = couch_jobs:add(JTx, ?REP_JOBS, RepJobData),
-            RepDocData1 = RepDocData#{
-                ?REP := Rep2,
-                ?STATE := ST_SCHEDULED,
-                ?STATE_INFO := null,
-                ?ERROR_COUNT := 0,
-                ?LAST_UPDATED => erlang:system_info()
-            },
-            case couch_jobs:finish(JTx, Job, RepDocData1) of
-                ok -> ok;
-                Error -> throw({job_start_error, Error})
-
-remove_old_state_fields(#{} = Rep, ?TRIGGERED) ->
-    case update_docs() of
-        true -> ok;
-        false -> couch_replicator_docs:remove_state_fields(DbName, DocId)
-    end;
 
-remove_old_state_fields(#{} = Rep, ?ERROR) ->
-    case update_docs() of
-        true -> ok;
-        false -> couch_replicator_docs:remove_state_fields(DbName, DocId)
-    end;
+worker_fun2(Job, OldRepId, #{} = Rep, #{} = RepDocData) ->
+    Result = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+        check_rep_doc_current(JTx, Rep),
+        remove_stale_replication_job(JTx, OldRepId, Rep),
+        maybe_start_replication_job(JTx, Job, Rep, RepDocData)
+    end),
+    case Result of
+    {ok, RepId} ->
+            maybe_update_doc_triggered(DbName, DocId, RepId);
+        ignore ->
+            ok;
+        {error, {permanent_failure, Error}}  ->
+            couch_replicator_docs:update_failed(DbName, DocId, Error);
+        {error, {temporary_error, RepId, Error}} ->
+            maybe_update_doc_error(RepId, DbName, DocId, Error)
+    end.
 
-maybe_remove_old_state_fields(#{} = _Rep, _) ->
-    ok.
 
+check_rep_doc_current(JTx, #{} = Rep) ->
+    #{?DB_NAME := DbName, ?DOC_ID := DocId, ?VER := Ver} = Rep,
+    case couch_jobs:get_job_data(JTx, ?REP_DOCS, doc_job_id(DbName, DocId)) of
+        {ok, #{?REP := #{?VER: = Ver}}} ->
+            ok;
+        {ok, #{?REP := #{?VER := V}}} when Ver =/= V ->
+            throw({rep_doc_not_current, DbName, DocId});
+        {error, not_found} ->
+            throw({rep_doc_not_current, DbName, DocId});
+    end.
 
-% Doc processor gen_server private helper functions
 
-% Handle doc update -- add to ets, then start a worker to try to turn it into
-% a replication job. In most cases it will succeed quickly but for filtered
-% replications or if there are duplicates, it could take longer
-% (theoretically indefinitely) until a replication could be started. Before
-% adding replication job, make sure to delete all old jobs associated with
-% same document.
--spec updated_doc(db_doc_id(), #rep{}, filter_type()) -> ok.
-updated_doc(Id, Rep, Filter) ->
-    NormCurRep = couch_replicator_utils:normalize_rep(current_rep(Id)),
-    NormNewRep = couch_replicator_utils:normalize_rep(Rep),
-    case NormCurRep == NormNewRep of
-        false ->
-            removed_doc(Id),
-            Row = #rdoc{
-                id = Id,
-                state = initializing,
-                rep = Rep,
-                rid = nil,
-                filter = Filter,
-                info = nil,
-                errcnt = 0,
-                worker = nil,
-                last_updated = os:timestamp()
-            },
-            true = ets:insert(?MODULE, Row),
-            ok = maybe_start_worker(Id);
-        true ->
+% A stale replication job is one still running after the filter
+% has been updated and a new replication id was generated.
+%
+remove_stale_replication_job(_, null, #{}) ->
+    ok;
+
+remove_stale_replication_job(JTx, OldRepId, #{} = Rep) ->
+    #{?REP_ID := RepId, ?VER := Ver} = Rep,
+    case couch_jobs:get_job_data(JTx, ?REP_JOBS, OldRepId) of
+        {error, not_found} ->
+            ok;
+        {ok, #{?REP := {?VER := Ver}} when OldRep =/= RepId ->
+            couch_jobs:remove(JTx, ?REP_JOBS, OldRepId)
+        {ok, #{}} ->
             ok
     end.
 
 
-% Return current #rep{} record if any. If replication hasn't been submitted
-% to the scheduler yet, #rep{} record will be in the document processor's
-% ETS table, otherwise query scheduler for the #rep{} record.
--spec current_rep({binary(), binary()}) -> #rep{} | nil.
-current_rep({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
-    case ets:lookup(?MODULE, {DbName, DocId}) of
-        [] ->
-            nil;
-        [#rdoc{state = scheduled, rep = nil, rid = JobId}] ->
-            % When replication is scheduled, #rep{} record which can be quite
-            % large compared to other bits in #rdoc is removed in order to avoid
-            % having to keep 2 copies of it. So have to fetch it from the
-            % scheduler.
-            couch_replicator_scheduler:rep_state(JobId);
-        [#rdoc{rep = Rep}] ->
-            Rep
+maybe_start_replication_job(JTx, Job, #{} = Rep, #{} = RepDocData) ->
+    {#?REP_ID := RepId, ?DB_NAME := DbName, ?DOC_ID := DocId} = Rep,
+    case couch_jobs:get_job_data(JTx, ?REP_JOBS, RepId) of
+        {error, not_found} ->
+            start_replication_job(JTx, Job, Rep, RepDocData);
+        {ok, #{?REP := {?DB_NAME := DbName, ?DOC_ID := DocId}} = CurRep} ->
+            case compare_reps(Rep, CurRep) of
+                true ->
+                    dont_start_replication_job(JTx, Job, Rep, RepDocData);
+                false ->
+                    ok = couch_jobs:remove(JTx, ?REP_JOBS, RepId),
+                    start_replication_job(JTx, Job, Rep, RepDocData)
+            end;
+        {ok, #{?REP := {?DB_NAME := null}}} ->
+            Err1 = io_lib:format("Replication `~s` specified by `~s:~s`"
+                " already running as a transient replication, started via"
+                " `_replicate` API endpoint", [RepId, DbName, DocId]),
+            Err2 = couch_util:to_binary(Err1),
+            ok = finish_with_temporary_error(JTx, Job, RepDocData, Err2),
+            {error, {temporary_error, RepId, Error2}};
+        {ok, #{?REP := {?DB_NAME := OtherDb, ?DOC_ID := OtherDoc}}} ->
+            Err1 = io_lib:format("Replication `~s` specified by `~s:~s`"
+                " already started by document `~s:~s`", [RepId, DocId,
+                DbName, OtherDb, OtherDoc],
+            Error2 = couch_util:to_binary(Err1),
+            ok = finish_with_permanent_failure(JTx, Job, RepDocData, Error),
+            {error, {permanent_failure, Error2}}
     end.
 
 
--spec worker_returned(reference(), db_doc_id(), rep_start_result()) -> ok.
-worker_returned(Ref, Id, {ok, RepId}) ->
-    case ets:lookup(?MODULE, Id) of
-    [#rdoc{worker = Ref} = Row] ->
-        Row0 = Row#rdoc{
-            state = scheduled,
-            errcnt = 0,
-            worker = nil,
-            last_updated = os:timestamp()
-        },
-        NewRow = case Row0 of
-            #rdoc{rid = RepId, filter = user} ->
-                % Filtered replication id didn't change.
-                Row0;
-            #rdoc{rid = nil, filter = user} ->
-                % Calculated new replication id for a filtered replication. Make
-                % sure to schedule another check as filter code could change.
-                % Replication starts could have been failing, so also clear
-                % error count.
-                Row0#rdoc{rid = RepId};
-            #rdoc{rid = OldRepId, filter = user} ->
-                % Replication id of existing replication job with filter has
-                % changed. Remove old replication job from scheduler and
-                % schedule check to check for future changes.
-                ok = couch_replicator_scheduler:remove_job(OldRepId),
-                Msg = io_lib:format("Replication id changed: ~p -> ~p", [
-                    OldRepId, RepId]),
-                Row0#rdoc{rid = RepId, info = couch_util:to_binary(Msg)};
-            #rdoc{rid = nil} ->
-                % Calculated new replication id for non-filtered replication.
-                % Remove replication doc body, after this we won't need it
-                % anymore.
-                Row0#rdoc{rep=nil, rid=RepId, info=nil}
-        end,
-        true = ets:insert(?MODULE, NewRow),
-        ok = maybe_update_doc_triggered(Row#rdoc.rep, RepId),
-        ok = maybe_start_worker(Id);
-    _ ->
-        ok  % doc could have been deleted, ignore
-    end,
-    ok;
+finish_with_temporary_error(JTx, Job, RepDocData, Error) ->
+    #{?ERROR_COUNT := ErrorCount} = RepDocData,
+    ErrorCount1 = ErrorCount + 1,
+    RepDocData1 = RepDocData#{
+        ?STATE := ?ST_ERROR,
+        ?STATE_INFO := Error,
+        ?ERROR_COUNT := ErrorCount1,
+    } = RepDocData,
+    schedule_error_backoff(JTx, Job, ErrorCount1),
+    case couch_jobs:finish(JTx, Job, RepDocData1) of
+        ok -> ok;
+        {error, halt} -> throw(halt)
+    end.
 
-worker_returned(_Ref, _Id, ignore) ->
-    ok;
 
-worker_returned(Ref, Id, {temporary_error, Reason}) ->
-    case ets:lookup(?MODULE, Id) of
-    [#rdoc{worker = Ref, errcnt = ErrCnt} = Row] ->
-        NewRow = Row#rdoc{
-            rid = nil,
-            state = error,
-            info = Reason,
-            errcnt = ErrCnt + 1,
-            worker = nil,
-            last_updated = os:timestamp()
-        },
-        true = ets:insert(?MODULE, NewRow),
-        ok = maybe_update_doc_error(NewRow#rdoc.rep, Reason),
-        ok = maybe_start_worker(Id);
-    _ ->
-        ok  % doc could have been deleted, ignore
-    end,
-    ok;
+finish_with_permanent_failure(JTx, Job, RepDocData, Error) ->
+    #{?ERROR_COUNT := ErrorCount} = RepDocData,
+    RepDocData1 = RepDocData#{
+        ?STATE := ?ST_FAILED,
+        ?STATE_INFO := Error,
+        ?ERROR_COUNT := ErrorCount + 1,
+    } = RepDocData,
+    case couch_jobs:finish(JTx, Job, RepDocData1) of
+        ok -> ok;
+        {error, halt} -> throw(halt)
+    end.
 
-worker_returned(Ref, Id, {permanent_failure, _Reason}) ->
-    case ets:lookup(?MODULE, Id) of
-    [#rdoc{worker = Ref}] ->
-        true = ets:delete(?MODULE, Id);
-    _ ->
-        ok  % doc could have been deleted, ignore
-    end,
+
+dont_start_replication_job(JTx, Job, Rep, RepDocData) ->
+    RepDocData1 = RepDocData#{?LAST_UPDATED => erlang:system_time()},
+    ok = schedule_filter_check(JTx, Job, Rep),
+    case couch_jobs:finish(JTx, Job, RepDocData1) of
+        ok -> ignore;
+        {error, halt} -> throw(halt)
+    end.
+
+
+start_replication_job(JTx, Job, #{} = Rep, #{} = RepDocData) ->
+    #{?REP_ID := RepId} = Rep,
+    RepJobData = #{
+        ?REP => Rep,
+        ?STATE => ?ST_PENDING,
+        ?STATE_INFO => null,
+        ?ERROR_COUNT => 0,
+        ?LAST_UPDATED => erlang:system_time(),
+        ?HISTORY => []
+    },
+    ok = couch_jobs:add(JTx, ?REP_JOBS, RepId, RepJobData),
+    RepDocData1 = RepDocData#{
+       ?REP := Rep,
+       ?STATE := ?ST_SCHEDULED,
+       ?STATE_INFO := null,
+       ?ERROR_COUNT := 0,
+       ?LAST_UPDATED => erlang:system_time()
+    },
+    ok = schedule_filter_check(JTx, Job, Rep),
+    case couch_jobs:finish(JTx, Job, RepDocData1) of
+        ok -> {ok, RepId};
+        {error, halt} -> throw(halt)
+    end.
+
+
+schedule_error_backoff(JTx, Job, ErrorCount) ->
+    Exp = min(ErrCnt, ?ERROR_MAX_BACKOFF_EXPONENT),
+    % ErrCnt is the exponent here. The reason 64 is used is to start at
+    % 64 (about a minute) max range. Then first backoff would be 30 sec
+    % on average. Then 1 minute and so on.
+    NowSec = erlang:system_time(second),
+    When = NowSec + rand:uniform(?INITIAL_BACKOFF_EXPONENT bsl Exp).
+    couch_jobs:resubmit(JTx, Job, trunc(When)).
+
+
+schedule_filter_check(JTx, Job, #{<<"filter_type">> := <<"user">>} = Rep) ->
+    IntervalSec = filter_check_interval_sec(),
+    NowSec = erlang:system_time(second),
+    When = NowSec + 0.5 * IntervalSec + rand:uniform(IntervalSec),
+    couch_jobs:resubmit(JTx, Job, trunc(When)).
+
+schedule_filter_check(_JTx, _Job, #{}) ->
     ok.
 
 
--spec maybe_update_doc_error(#rep{}, any()) -> ok.
-maybe_update_doc_error(Rep, Reason) ->
+remove_old_state_fields(#{?DOC_STATE := DocState} = RepDocData) when
+        DocState =:= ?TRIGGERED orelse DocState =:= ?ERROR ->
     case update_docs() of
         true ->
-            couch_replicator_docs:update_error(Rep, Reason);
+            ok;
+        false ->
+            #{?DB_NAME := DbName, ?DOC_ID := DocId} = RepDocData,
+            couch_replicator_docs:remove_state_fields(DbName, DocId)
+    end;
+
+remove_old_state_fields(#{}) ->
+    ok.
+
+
+-spec maybe_update_doc_error(binary(), binary(), binary(), any()) -> ok.
+maybe_update_doc_error(RepId, DbName, DocId, Error) ->
+    case update_docs() of
+        true ->
+            couch_replicator_docs:update_error(RepId, DbName, DocId, Error);
         false ->
             ok
     end.
 
 
--spec maybe_update_doc_triggered(#rep{}, rep_id()) -> ok.
-maybe_update_doc_triggered(Rep, RepId) ->
+-spec maybe_update_doc_triggered(#{}, rep_id()) -> ok.
+maybe_update_doc_triggered(RepId, DbName, DocId) ->
     case update_docs() of
         true ->
-            couch_replicator_docs:update_triggered(Rep, RepId);
+            couch_replicator_docs:update_triggered(RepId, DbName, DocId);
         false ->
             ok
     end.
@@ -570,75 +499,17 @@ error_backoff(ErrCnt) ->
     couch_rand:uniform(?INITIAL_BACKOFF_EXPONENT bsl Exp).
 
 
--spec filter_backoff() -> seconds().
-filter_backoff() ->
-    Total = ets:info(?MODULE, size),
-    % This value scaled by the number of replications. If the are a lot of them
-    % wait is longer, but not more than a day (?TS_DAY_SEC). If there are just
-    % few, wait is shorter, starting at about 30 seconds. `2 *` is used since
-    % the expected wait would then be 0.5 * Range so it is easier to see the
-    % average wait. `1 +` is used because couch_rand:uniform only
-    % accepts >= 1 values and crashes otherwise.
-    Range = 1 + min(2 * (Total / 10), ?TS_DAY_SEC),
-    ?MIN_FILTER_DELAY_SEC + couch_rand:uniform(round(Range)).
-
-
-% Document removed from db -- clear ets table and remove all scheduled jobs
--spec removed_doc(db_doc_id()) -> ok.
-removed_doc({DbName, DocId} = Id) ->
-    ets:delete(?MODULE, Id),
-    RepIds = couch_replicator_scheduler:find_jobs_by_doc(DbName, DocId),
-    lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds).
-
-
-% Whole db shard is gone -- remove all its ets rows and stop jobs
--spec removed_db(binary()) -> ok.
-removed_db(DbName) ->
-    EtsPat = #rdoc{id = {DbName, '_'}, _ = '_'},
-    ets:match_delete(?MODULE, EtsPat),
-    RepIds = couch_replicator_scheduler:find_jobs_by_dbname(DbName),
-    lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds).
-
-
-% Spawn a worker process which will attempt to calculate a replication id, then
-% start a replication. Returns a process monitor reference. The worker is
-% guaranteed to exit with rep_start_result() type only.
--spec maybe_start_worker(db_doc_id()) -> ok.
-maybe_start_worker(Id) ->
-    case ets:lookup(?MODULE, Id) of
-    [] ->
-        ok;
-    [#rdoc{state = scheduled, filter = Filter}] when Filter =/= user ->
-        ok;
-    [#rdoc{rep = Rep} = Doc] ->
-        % For any replication with a user created filter function, periodically
-        % (every `filter_backoff/0` seconds) to try to see if the user filter
-        % has changed by using a worker to check for changes. When the worker
-        % returns check if replication ID has changed. If it hasn't keep
-        % checking (spawn another worker and so on). If it has stop the job
-        % with the old ID and continue checking.
-        Wait = get_worker_wait(Doc),
-        Ref = make_ref(),
-        true = ets:insert(?MODULE, Doc#rdoc{worker = Ref}),
-        couch_replicator_doc_processor_worker:spawn_worker(Id, Rep, Wait, Ref),
-        ok
-    end.
-
-
--spec get_worker_wait(#rdoc{}) -> seconds().
-get_worker_wait(#rdoc{state = scheduled, filter = user}) ->
-    filter_backoff();
-get_worker_wait(#rdoc{state = error, errcnt = ErrCnt}) ->
-    error_backoff(ErrCnt);
-get_worker_wait(#rdoc{state = initializing}) ->
-    0.
-
-
 -spec update_docs() -> boolean().
 update_docs() ->
     config:get_boolean("replicator", "update_docs", ?DEFAULT_UPDATE_DOCS).
 
 
+-spec filter_check_interval_sec() -> integer().
+filter_check_interval_sec() ->
+    config:get_integer("replicator", "filter_check_interval_sec",
+        ?DEFAULT_FILTER_CHECK_INTERVAL_SEC).
+
+
 % _scheduler/docs HTTP endpoint helpers
 
 -spec docs([atom()]) -> [{[_]}] | [].
@@ -753,23 +624,33 @@ ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) ->
 
 
 -spec add_rep_doc_job(any(), binary(), binary(), #{} | null,
-    binary() | null, binary() | null) -> ok.
-add_rep_doc_job(Tx, DbName, DocId, Rep, RepParseError, DocState) ->
+    binary() | null) -> ok.
+add_rep_doc_job(Tx, DbName, DocId, Rep, RepParseError) ->
     JobId = docs_job_id(DbName, DocId),
-    ok = remove_replication_by_doc_job_id(Db, JobId),
-    RepDocData = #{
-        ?REP => Rep,
-        ?REP_PARSE_ERROR => RepParseError,
-        ?DOC_STATE => DocState,
-        ?DB_NAME => DbName,
-        ?DOC_ID => DocId,
-        ?STATE => ST_INITIALIZING,
-        ?ERROR_COUNT => 0,
-        ?LAST_UPDATED => erlang:system_time(),
-        ?STATE => null,
-        ?STATE_INFO => null.
-    },
-    ok = couch_jobs:add(Tx, ?REP_DOCS, RepDocData).
+    RepDocData = case Rep of
+        null ->
+            #{
+                ?REP => null,
+                ?DB_NAME => DbName,
+                ?DOC_ID => DocId,
+                ?STATE => ?ST_INITIALIZING,
+                ?STATE_INFO => RepParseError
+                ?ERROR_COUNT => 0,
+                ?LAST_UPDATED => erlang:system_time()
+            };
+        #{} ->
+            #{
+                ?REP => Rep,
+                ?STATE => ?ST_INITIALIZING,
+                ?ERROR_COUNT => 0,
+                ?LAST_UPDATED => erlang:system_time(),
+                ?STATE_INFO => null
+            }
+    end,
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+       ok = remove_replication_by_doc_job_id(JTx, JobId),
+       ok = couch_jobs:add(JTx, ?REP_DOCS, RepDocData)
+    end).
 
 
 docs_job_id(DbName, Id) when is_binary(DbName), is_binary(Id) ->
@@ -781,12 +662,19 @@ remove_replication_by_doc_job_id(Tx, Id) ->
     case couch_jobs:get_job_data(Tx, ?REP_DOCS, Id) of
         {error, not_found} ->
             ok;
-        {ok, #{?REP := {<<"id">> :=  null}}} ->
+        {ok, #{?REP := null}} ->
+            % ?REP is null for docs that failed to parse; there is no
+            % replication id, so only the doc job itself is removed.
+            couch_jobs:remove(Tx, ?REP_DOCS, Id),
+            ok;
+        {ok, #{?REP := #{?REP_ID := null}}} ->
             couch_jobs:remove(Tx, ?REP_DOCS, Id),
             ok;
-        {ok, #{?REP := {<<"id">> := RepId}}} ->
-            couch_jobs:remove(Tx, ?REP_JOBS, RepId),
-            couch_jobs:remove(Tx, ?REP_DOCS, Id),
+        {ok, #{?REP := #{?REP_ID := RepId}}} ->
+            couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+                couch_jobs:remove(JTx, ?REP_JOBS, RepId),
+                couch_jobs:remove(JTx, ?REP_DOCS, Id)
+            end),
             ok
     end.
 
@@ -1068,7 +951,7 @@ test_rep(Id) ->
 
 change() ->
     {[
-        {<<"id">>, ?DOC1},
+        {?REP_ID, ?DOC1},
         {doc, {[
             {<<"_id">>, ?DOC1},
             {<<"source">>, <<"http://srchost.local/src">>},
@@ -1079,7 +962,7 @@ change() ->
 
 change(State) ->
     {[
-        {<<"id">>, ?DOC1},
+        {?REP_ID, ?DOC1},
         {doc, {[
             {<<"_id">>, ?DOC1},
             {<<"source">>, <<"http://srchost.local/src">>},
@@ -1091,7 +974,7 @@ change(State) ->
 
 deleted_change() ->
     {[
-        {<<"id">>, ?DOC1},
+        {?REP_ID, ?DOC1},
         {<<"deleted">>, true},
         {doc, {[
             {<<"_id">>, ?DOC1},
@@ -1103,7 +986,7 @@ deleted_change() ->
 
 bad_change() ->
     {[
-        {<<"id">>, ?DOC2},
+        {?REP_ID, ?DOC2},
         {doc, {[
             {<<"_id">>, ?DOC2},
             {<<"source">>, <<"src">>}
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
deleted file mode 100644
index a4c8293..0000000
--- a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
+++ /dev/null
@@ -1,284 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_doc_processor_worker).
-
--export([
-    spawn_worker/4
-]).
-
--include("couch_replicator.hrl").
-
--import(couch_replicator_utils, [
-    pp_rep_id/1
-]).
-
-% 61 seconds here because request usually have 10, 15, 30 second
-% timeouts set.  We'd want the worker to get a chance to make a few
-% requests (maybe one failing one and a retry) and then fail with its
-% own error (timeout, network error), which would be more specific and
-% informative, before it simply gets killed because of the timeout
-% here. That is, if all fails and the worker is actually blocked then
-% 61 sec is a safety net to brutally kill the worker so doesn't end up
-% hung forever.
--define(WORKER_TIMEOUT_MSEC, 61000).
-
-
-% Spawn a worker which attempts to calculate replication id then add a
-% replication job to scheduler. This function create a monitor to the worker
-% a worker will then exit with the #doc_worker_result{} record within
-% ?WORKER_TIMEOUT_MSEC timeout period.A timeout is considered a
-%`temporary_error`. Result will be sent as the `Reason` in the {'DOWN',...}
-% message.
--spec spawn_worker(db_doc_id(), #rep{}, seconds(), reference()) -> pid().
-spawn_worker(Id, Rep, WaitSec, WRef) ->
-    {Pid, _Ref} = spawn_monitor(fun() ->
-        worker_fun(Id, Rep, WaitSec, WRef)
-    end),
-    Pid.
-
-
-% Private functions
-
--spec worker_fun(db_doc_id(), #rep{}, seconds(), reference()) -> no_return().
-worker_fun(Id, Rep, WaitSec, WRef) ->
-    timer:sleep(WaitSec * 1000),
-    Fun = fun() ->
-        try maybe_start_replication(Id, Rep, WRef) of
-            Res ->
-                exit(Res)
-        catch
-            throw:{filter_fetch_error, Reason} ->
-                exit({temporary_error, Reason});
-            _Tag:Reason ->
-                exit({temporary_error, Reason})
-        end
-    end,
-    {Pid, Ref} = spawn_monitor(Fun),
-    receive
-        {'DOWN', Ref, _, Pid, Result} ->
-            exit(#doc_worker_result{id = Id, wref = WRef, result = Result})
-    after ?WORKER_TIMEOUT_MSEC ->
-        erlang:demonitor(Ref, [flush]),
-        exit(Pid, kill),
-        {DbName, DocId} = Id,
-        TimeoutSec = round(?WORKER_TIMEOUT_MSEC / 1000),
-        Msg = io_lib:format("Replication for db ~p doc ~p failed to start due "
-            "to timeout after ~B seconds", [DbName, DocId, TimeoutSec]),
-        Result = {temporary_error, couch_util:to_binary(Msg)},
-        exit(#doc_worker_result{id = Id, wref = WRef, result = Result})
-    end.
-
-
-% Try to start a replication. Used by a worker. This function should return
-% rep_start_result(), also throws {filter_fetch_error, Reason} if cannot fetch
-% filter.It can also block for an indeterminate amount of time while fetching
-% filter.
-maybe_start_replication(Id, RepWithoutId, WRef) ->
-    Rep = couch_replicator_docs:update_rep_id(RepWithoutId),
-    case maybe_add_job_to_scheduler(Id, Rep, WRef) of
-    ignore ->
-        ignore;
-    {ok, RepId} ->
-        {ok, RepId};
-    {temporary_error, Reason} ->
-        {temporary_error, Reason};
-    {permanent_failure, Reason} ->
-        {DbName, DocId} = Id,
-        couch_replicator_docs:update_failed(DbName, DocId, Reason),
-        {permanent_failure, Reason}
-    end.
-
-
--spec maybe_add_job_to_scheduler(db_doc_id(), #rep{}, reference()) ->
-   rep_start_result().
-maybe_add_job_to_scheduler({DbName, DocId}, Rep, WRef) ->
-    RepId = Rep#rep.id,
-    case couch_replicator_scheduler:rep_state(RepId) of
-    nil ->
-        % Before adding a job check that this worker is still the current
-        % worker. This is to handle a race condition where a worker which was
-        % sleeping and then checking a replication filter may inadvertently
-        % re-add a replication which was already deleted.
-        case couch_replicator_doc_processor:get_worker_ref({DbName, DocId}) of
-        WRef ->
-            ok = couch_replicator_scheduler:add_job(Rep),
-            {ok, RepId};
-        _NilOrOtherWRef ->
-            ignore
-        end;
-    #rep{doc_id = DocId} ->
-        {ok, RepId};
-    #rep{doc_id = null} ->
-        Msg = io_lib:format("Replication `~s` specified by document `~s`"
-            " already running as a transient replication, started via"
-            " `_replicate` API endpoint", [pp_rep_id(RepId), DocId]),
-        {temporary_error, couch_util:to_binary(Msg)};
-    #rep{db_name = OtherDb, doc_id = OtherDocId} ->
-        Msg = io_lib:format("Replication `~s` specified by document `~s`"
-            " already started, triggered by document `~s` from db `~s`",
-            [pp_rep_id(RepId), DocId, OtherDocId, mem3:dbname(OtherDb)]),
-        {permanent_failure, couch_util:to_binary(Msg)}
-    end.
-
-
--ifdef(TEST).
-
--include_lib("eunit/include/eunit.hrl").
-
--define(DB, <<"db">>).
--define(DOC1, <<"doc1">>).
--define(R1, {"ad08e05057046eabe898a2572bbfb573", ""}).
-
-
-doc_processor_worker_test_() ->
-    {
-        foreach,
-        fun setup/0,
-        fun teardown/1,
-        [
-            t_should_add_job(),
-            t_already_running_same_docid(),
-            t_already_running_transient(),
-            t_already_running_other_db_other_doc(),
-            t_spawn_worker(),
-            t_ignore_if_doc_deleted(),
-            t_ignore_if_worker_ref_does_not_match()
-        ]
-    }.
-
-
-% Replication is already running, with same doc id. Ignore change.
-t_should_add_job() ->
-   ?_test(begin
-       Id = {?DB, ?DOC1},
-       Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
-       ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
-       ?assert(added_job())
-   end).
-
-
-% Replication is already running, with same doc id. Ignore change.
-t_already_running_same_docid() ->
-   ?_test(begin
-       Id = {?DB, ?DOC1},
-       mock_already_running(?DB, ?DOC1),
-       Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
-       ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
-       ?assert(did_not_add_job())
-   end).
-
-
-% There is a transient replication with same replication id running. Ignore.
-t_already_running_transient() ->
-   ?_test(begin
-       Id = {?DB, ?DOC1},
-       mock_already_running(null, null),
-       Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
-       ?assertMatch({temporary_error, _}, maybe_start_replication(Id, Rep,
-           nil)),
-       ?assert(did_not_add_job())
-   end).
-
-
-% There is a duplicate replication potentially from a different db and doc.
-% Write permanent failure to doc.
-t_already_running_other_db_other_doc() ->
-   ?_test(begin
-       Id = {?DB, ?DOC1},
-       mock_already_running(<<"otherdb">>, <<"otherdoc">>),
-       Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
-       ?assertMatch({permanent_failure, _}, maybe_start_replication(Id, Rep,
-           nil)),
-       ?assert(did_not_add_job()),
-       1 == meck:num_calls(couch_replicator_docs, update_failed, '_')
-   end).
-
-
-% Should spawn worker
-t_spawn_worker() ->
-   ?_test(begin
-       Id = {?DB, ?DOC1},
-       Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
-       WRef = make_ref(),
-       meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, WRef),
-       Pid = spawn_worker(Id, Rep, 0, WRef),
-       Res = receive  {'DOWN', _Ref, process, Pid, Reason} -> Reason
-           after 1000 -> timeout end,
-       Expect = #doc_worker_result{id = Id, wref = WRef, result = {ok, ?R1}},
-       ?assertEqual(Expect, Res),
-       ?assert(added_job())
-   end).
-
-
-% Should not add job if by the time worker got to fetching the filter
-% and getting a replication id, replication doc was deleted
-t_ignore_if_doc_deleted() ->
-   ?_test(begin
-       Id = {?DB, ?DOC1},
-       Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
-       meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
-       ?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())),
-       ?assertNot(added_job())
-   end).
-
-
-% Should not add job if by the time worker got to fetchign the filter
-% and building a replication id, another worker was spawned.
-t_ignore_if_worker_ref_does_not_match() ->
-    ?_test(begin
-       Id = {?DB, ?DOC1},
-       Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
-       meck:expect(couch_replicator_doc_processor, get_worker_ref, 1,
-           make_ref()),
-       ?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())),
-       ?assertNot(added_job())
-   end).
-
-
-% Test helper functions
-
-setup() ->
-    meck:expect(couch_replicator_scheduler, add_job, 1, ok),
-    meck:expect(config, get, fun(_, _, Default) -> Default end),
-    meck:expect(couch_server, get_uuid, 0, this_is_snek),
-    meck:expect(couch_replicator_docs, update_failed, 3, ok),
-    meck:expect(couch_replicator_scheduler, rep_state, 1, nil),
-    meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
-    ok.
-
-
-teardown(_) ->
-    meck:unload().
-
-
-mock_already_running(DbName, DocId) ->
-    meck:expect(couch_replicator_scheduler, rep_state,
-         fun(RepId) -> #rep{id = RepId, doc_id = DocId, db_name = DbName} end).
-
-
-added_job() ->
-    1 == meck:num_calls(couch_replicator_scheduler, add_job, '_').
-
-
-did_not_add_job() ->
-    0 == meck:num_calls(couch_replicator_scheduler, add_job, '_').
-
-
-change() ->
-    {[
-         {<<"_id">>, ?DOC1},
-         {<<"source">>, <<"http://srchost.local/src">>},
-         {<<"target">>, <<"http://tgthost.local/tgt">>}
-     ]}.
-
--endif.
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index eaf8161..d235bf6 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -27,7 +27,7 @@
     update_failed/3,
     update_rep_id/1,
     update_triggered/3,
-    update_error/2
+    update_error/4
 ]).
 
 
@@ -116,13 +116,8 @@ update_triggered(Id, DocId, DbName) ->
     ok.
 
 
--spec update_error(#{}, any()) -> ok.
-update_error(#rep{} = Rep, Error) ->
-    #{
-        <<"id">> := RepId0,
-        <<"db_name">> := DbName,
-        <<"doc_id">> := DocId,
-    } = Rep,
+-spec update_error(binary(), binary(), binary(), any()) -> ok.
+update_error(RepId0, DbName, DocId, Error) ->
     Reason = error_reason(Error),
     RepId = case RepId0 of
         Id when is_binary(Id) -> Id;
@@ -199,22 +194,32 @@ parse_rep_doc_without_id(#{} = Doc, UserName) ->
             {error, Error} -> throw({bad_request, Error});
             Result -> Result
         end,
+        % Fail early on unparsable filter options and record which kind of
+        % filter (if any) this replication uses.
+        FilterType = case couch_replicator_filters:parse(Opts) of
+            {ok, nil} -> null;
+            {ok, {user, _FName, _QP}} -> <<"user">>;
+            {ok, {view, _FName, _QP}} -> <<"view">>;
+            {ok, {docids, _DocIds}} -> <<"doc_ids">>;
+            {ok, {mango, _Selector}} -> <<"mango">>;
+            {error, FilterError} -> throw({error, FilterError})
+        end,
         Rep = #{
-            <<"id">> => null,
+            ?REP_ID => null,
             <<"base_id">> => null,
-            <<"source">> => Source,
-            <<"target">> => Target,
+            ?SOURCE => Source,
+            ?TARGET => Target,
             <<"options">> => Opts,
             <<"user">> => UserName,
+            <<"filter_type">> => FilterType,
             <<"type">> => Type,
             <<"view">> => View,
-            <<"doc_id">> => maps:get(<<"_id">>, Doc, null)
+            ?DOC_ID => maps:get(<<"_id">>, Doc, null),
+            ?DB_NAME => null,
+            ?DOC_STATE => null,
+            ?START_TIME => erlang:system_time(),
+            ?VER => fabric2_util:uuid()
         },
-        % Check if can parse filter code, if not throw exception
-        case couch_replicator_filters:parse(Opts) of
-            {error, FilterError} -> throw({error, FilterError});
-            {ok, _Filter} -> ok
-        end,
         {ok, Rep}
     end.
 
diff --git a/src/couch_replicator/src/couch_replicator_filters.erl b/src/couch_replicator/src/couch_replicator_filters.erl
index b14ea34..fe785a2 100644
--- a/src/couch_replicator/src/couch_replicator_filters.erl
+++ b/src/couch_replicator/src/couch_replicator_filters.erl
@@ -27,17 +27,17 @@
 % For `user` filter, i.e. filters specified as user code
 % in source database, this code doesn't fetch the filter
 % code, but only returns the name of the filter.
--spec parse([_]) ->
+-spec parse(#{}) ->
     {ok, nil} |
     {ok, {view, binary(), {[_]}}} |
     {ok, {user, {binary(), binary()}, {[_]}}} |
     {ok, {docids, [_]}} |
     {ok, {mango, {[_]}}} |
     {error, binary()}.
-parse(Options) ->
-    Filter = couch_util:get_value(filter, Options),
-    DocIds = couch_util:get_value(doc_ids, Options),
-    Selector = couch_util:get_value(selector, Options),
+parse(#{} = Options) ->
+    Filter = maps:get(<<"filter">>, Options, undefined),
+    DocIds = maps:get(<<"doc_ids">>, Options, undefined),
+    Selector = maps:get(<<"selector">>, Options, undefined),
     case {Filter, DocIds, Selector} of
         {undefined, undefined, undefined} ->
             {ok, nil};