File path: src/fabric/src/fabric2_db.erl
@@ -243,9 +245,16 @@ delete(DbName, Options) ->
 undelete(DbName, TgtDbName, TimeStamp, Options) ->
     case validate_dbname(TgtDbName) of
         ok ->
-            fabric2_fdb:transactional(DbName, Options, fun(TxDb) ->
-                fabric2_fdb:undelete(TxDb, TgtDbName, TimeStamp)
-            end);
+            Resp = fabric2_fdb:transactional(DbName, Options,
+                fun(TxDb) ->
+                    fabric2_fdb:undelete(TxDb, TgtDbName, TimeStamp)
+                end
+            ),
+            if Resp /= ok -> Resp; true ->
+                {ok, Db} = open(TgtDbName, Options),
+                fabric2_db_plugin:after_db_create(TgtDbName, get_uuid(Db)),
+                Resp
+            end;

Review comment:
       Style suggestion: to reinforce that Resp is always returned, return it once after the `if` (i.e. `end, Resp`) rather than having it be the last expression of both `if` branches.
   if Resp /= ok -> ok; true ->
       {ok, Db} = open(TgtDbName, Options),
       fabric2_db_plugin:after_db_create(TgtDbName, get_uuid(Db))

File path: src/couch_replicator/src/couch_replicator_docs.erl
@@ -13,306 +13,139 @@
-    parse_rep_doc/1,
-    parse_rep_doc/2,
-    parse_rep_db/3,
-    parse_rep_doc_without_id/1,
-    parse_rep_doc_without_id/2,
+    remove_state_fields/3,
+    update_completed/4,
+    update_failed/4,
-    after_doc_read/2,
-    ensure_rep_ddoc_exists/1,
-    ensure_cluster_rep_ddoc_exists/1,
-    remove_state_fields/2,
-    update_doc_completed/3,
-    update_failed/3,
-    update_rep_id/1,
-    update_triggered/2,
-    update_error/2
+    after_doc_read/2
--import(couch_util, [
-    get_value/2,
-    get_value/3,
-    to_binary/1
--import(couch_replicator_utils, [
-    get_json_value/2,
-    get_json_value/3
--define(REP_DB_NAME, <<"_replicator">>).
--define(REP_DESIGN_DOC, <<"_design/_replicator">>).
 -define(OWNER, <<"owner">>).
 -define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
 -define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
-remove_state_fields(DbName, DocId) ->
-    update_rep_doc(DbName, DocId, [
-        {<<"_replication_state">>, undefined},
-        {<<"_replication_state_time">>, undefined},
-        {<<"_replication_state_reason">>, undefined},
-        {<<"_replication_id">>, undefined},
-        {<<"_replication_stats">>, undefined}]).
+remove_state_fields(null, null, null) ->
+    ok;
+remove_state_fields(DbName, DbUUID, DocId) ->
+    update_rep_doc(DbName, DbUUID, DocId, [
+        {?REPLICATION_STATE, undefined},
+        {?REPLICATION_STATE_TIME, undefined},
+        {?REPLICATION_STATE_REASON, undefined},
+        {?REPLICATION_ID, undefined},
+        {?REPLICATION_STATS, undefined}]),

Review comment:
       Style nit, but the closing `]),` should be on its own line. The same applies to the other similar instances in this module, which I won't bother enumerating.

File path: src/couch_replicator/src/couch_replicator_parse.erl
@@ -0,0 +1,534 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+    parse_rep_doc/1,
+    parse_transient_rep/2,
+    parse_rep/2,
+    parse_rep_db/3
+-define(DEFAULT_SOCK_OPTS, "[{keepalive, true}, {nodelay, false}]").
+-define(VALID_SOCK_OPTS, [
+    buffer, delay_send, exit_on_close, ipv6_v6only, keepalive, nodelay, recbuf,

Review comment:
       Let's make these one item per line and alphabetized.

File path: src/couch_replicator/src/couch_replicator_job.erl
@@ -0,0 +1,1605 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+    start_link/0
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    format_status/2,
+    code_change/3
+    accept/0,
+    health_threshold/0
+-define(LOWEST_SEQ, 0).
+-define(DEFAULT_MAX_BACKOFF_PENALTY_SEC, 2 * 24 * 3600).
+-define(DEFAULT_MAX_HISTORY, 10).
+-record(rep_state, {
+    job,
+    job_data,
+    id,
+    base_id,
+    doc_id,
+    db_name,
+    db_uuid,
+    source_name,
+    target_name,
+    source,
+    target,
+    history,
+    checkpoint_history,
+    start_seq,
+    committed_seq,
+    current_through_seq,
+    seqs_in_progress = [],
+    highest_seq_done = {0, ?LOWEST_SEQ},
+    source_log,
+    target_log,
+    rep_starttime,
+    src_starttime,
+    tgt_starttime,
+    checkpoint_timer,
+    stats_timer,
+    changes_queue,
+    changes_manager,
+    changes_reader,
+    workers,
+    stats = couch_replicator_stats:new(),
+    session_id,
+    source_seq = nil,
+    use_checkpoints = true,
+    checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
+    user = null,
+    options = #{}
+%% Starts a new, unregistered replication job gen_server linked to the
+%% caller. No job is accepted here; that happens after init/1 returns.
+start_link() ->
+    InitArgs = [],
+    gen_server:start_link(?MODULE, InitArgs, []).
+%% Returns immediately with a zero timeout, so the potentially slow job
+%% acceptance runs in handle_info(timeout, delayed_init) instead of blocking
+%% the caller of start_link/0. Exits are trapped so that linked processes'
+%% terminations arrive as 'EXIT' messages.
+init(_Args) ->
+    process_flag(trap_exit, true),
+    InitialState = delayed_init,
+    Timeout = 0,
+    {ok, InitialState, Timeout}.
+%% gen_server terminate callback. The clause is selected by the stop reason:
+%% - normal: the job completed; record completion and close the endpoints.
+%% - shutdown: stopped by the job server; try a final checkpoint (logging,
+%%   not failing, if it does not succeed), reschedule the job and close the
+%%   endpoints.
+%% - {shutdown, _} with an {init_error, Stack} state: the job never finished
+%%   initializing; only log.
+%% - {shutdown, finished}: job state was already finalized; close endpoints.
+%% - {shutdown, halt}: job was re-enqueued elsewhere; log and close endpoints.
+%% - anything else: update the job state, log, reschedule the job with the
+%%   (unwrapped) error reason and close the endpoints.
+terminate(normal, #rep_state{} = State) ->
+    #rep_state{
+        job = Job,
+        job_data = JobData,
+        checkpoint_history = History
+    } = State,
+    ok = complete_job(undefined, Job, JobData, History),
+    close_endpoints(State);
+terminate(shutdown, #rep_state{} = State0) ->
+    % Replication stopped by the job server
+    State1 = cancel_timers(State0),
+    State3 = case do_checkpoint(State1) of
+        {ok, State2} ->
+            State2;
+        Error ->
+            % The format string takes three arguments: module, job, error.
+            Msg = "~p : Failed last checkpoint. Job: ~p Error: ~p",
+            couch_log:error(Msg, [?MODULE, State1#rep_state.job, Error]),
+            State1
+    end,
+    #rep_state{job = Job, job_data = JobData} = State3,
+    ok = reschedule(undefined, Job, JobData),
+    ok = close_endpoints(State3);
+terminate({shutdown, Error}, {init_error, Stack}) ->
+    % Termination in init, before the job had initialized
+    case Error of
+        max_backoff -> couch_log:warning("~p job backed off", [?MODULE]);
+        finished -> couch_log:notice("~p job finished in init", [?MODULE]);
+        _ -> couch_log:error("~p job failed ~p ~p", [?MODULE, Error, Stack])
+    end,
+    ok;
+terminate({shutdown, finished}, #rep_state{} = State) ->
+    % Job state was already updated and job is marked as finished
+    ok = close_endpoints(State);
+terminate({shutdown, halt}, #rep_state{} = State) ->
+    % Job is re-enqueued and possibly already running somewhere else
+    couch_log:error("~p job ~p halted", [?MODULE, State#rep_state.job]),
+    ok = close_endpoints(State);
+terminate(Reason0, #rep_state{} = State0) ->
+    State = update_job_state(State0),
+    % Unwrap {shutdown, Err} so the job is rescheduled with the bare error.
+    Reason = case Reason0 of
+        {shutdown, Err} -> Err;
+        _ -> Reason0
+    end,
+    #rep_state{
+        id = RepId,
+        job = Job,
+        job_data = JobData,
+        source_name = Source,
+        target_name = Target
+    } = State,
+    couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~p",
+        [RepId, Source, Target, Reason]),
+    ok = reschedule_on_error(undefined, Job, JobData, Reason),
+    ok = close_endpoints(State).
+%% gen_server handle_call callback.
+%% - {add_stats, Stats}: reply ok right away, then sum Stats into the job's
+%%   stats.
+%% - {report_seq_done, Seq, StatsInc}: a worker (per the debug log below)
+%%   reports that Seq has been processed. Reply ok right away, then update
+%%   current_through_seq, seqs_in_progress and highest_seq_done, sum
+%%   StatsInc into the stats and possibly update the job state.
+%% - any other call stops the server, replying {bad_call, Msg}.
+handle_call({add_stats, Stats}, From, State) ->
+    gen_server:reply(From, ok),
+    NewStats = couch_replicator_stats:sum_stats(State#rep_state.stats, Stats),
+    {noreply, State#rep_state{stats = NewStats}};
+handle_call({report_seq_done, Seq, StatsInc}, From, #rep_state{} = State) ->
+    #rep_state{
+        seqs_in_progress = SeqsInProgress,
+        highest_seq_done = HighestDone,
+        current_through_seq = ThroughSeq,
+        stats = Stats
+    } = State,
+    % Reply before doing the bookkeeping so the caller is not held up.
+    gen_server:reply(From, ok),
+    % SeqsInProgress is kept as an ordset (handle_cast({report_seq, _}, _)
+    % adds to it with ordsets:add_element/2), so its head is the lowest seq
+    % still in flight. The through seq only advances when Seq is that head
+    % (or nothing is in flight); otherwise Seq is just removed from the set.
+    {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
+        [] ->
+            {Seq, []};
+        [Seq | Rest] ->
+            {Seq, Rest};
+        [_ | _] ->
+            {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
+    end,
+    NewHighestDone = lists:max([HighestDone, Seq]),
+    % With nothing left in flight, the through seq may jump forward to the
+    % highest seq reported done so far.
+    NewThroughSeq = case NewSeqsInProgress of
+        [] ->
+            lists:max([NewThroughSeq0, NewHighestDone]);
+        _ ->
+            NewThroughSeq0
+    end,
+    couch_log:debug("Worker reported seq ~p, through seq was ~p, "
+        "new through seq is ~p, highest seq done was ~p, "
+        "new highest seq done is ~p~n"
+        "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
+        [Seq, ThroughSeq, NewThroughSeq, HighestDone,
+            NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
+    NewState = State#rep_state{
+        stats = couch_replicator_stats:sum_stats(Stats, StatsInc),
+        current_through_seq = NewThroughSeq,
+        seqs_in_progress = NewSeqsInProgress,
+        highest_seq_done = NewHighestDone
+    },
+    {noreply, maybe_update_job_state(NewState)};
+handle_call(Msg, _From, St) ->
+    {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+%% gen_server handle_cast callback.
+%% - {report_seq, Seq}: record Seq as in progress. The set is an ordset, so
+%%   it stays sorted and free of duplicates.
+%% - any other cast (or a report_seq before the job state exists) stops the
+%%   server with {bad_cast, Msg}.
+handle_cast({report_seq, Seq}, #rep_state{} = State) ->
+    #rep_state{seqs_in_progress = InProgress} = State,
+    UpdatedState = State#rep_state{
+        seqs_in_progress = ordsets:add_element(Seq, InProgress)
+    },
+    {noreply, UpdatedState};
+handle_cast(Msg, St) ->
+    {stop, {bad_cast, Msg}, St}.
+%% gen_server handle_info callback.
+%% - timeout in the delayed_init state: init/1 returned a 0 timeout, so this
+%%   is where a job is accepted and initialized via delayed_init/0.
+%% - stats_update / checkpoint: periodic job-state updates and checkpoints.
+%%   A failed checkpoint stops the job.
+%% - shutdown: stop with reason shutdown.
+%% - 'EXIT' messages (exits are trapped in init/1): max_backoff from any
+%%   linked process stops the job. Abnormal exits of the changes reader,
+%%   changes manager, changes queue or a worker also stop it. Normal exits
+%%   are tolerated; once the last worker exits normally, the changes manager
+%%   is killed and the final checkpoint is taken.
+%% - {Ref, ready}: stray erlfdb future notifications, logged and ignored.
+%% - any other message stops the server with {bad_info, Msg}.
+handle_info(timeout, delayed_init) ->
+    try delayed_init() of
+        {ok, State} -> {noreply, State};
+        {stop, Reason, State} -> {stop, Reason, State}
+    catch
+        exit:{shutdown, Exit} when Exit =:= finished orelse Exit =:= halt ->
+            Stack = erlang:get_stacktrace(),
+            {stop, {shutdown, Exit}, {init_error, Stack}};
+        _Tag:Error ->
+            % Read the stacktrace first: any exception raised (even a caught
+            % one) inside replication_start_error/1 would otherwise replace it.
+            Stack = erlang:get_stacktrace(),
+            ShutdownReason = {error, replication_start_error(Error)},
+            {stop, {shutdown, ShutdownReason}, {init_error, Stack}}
+    end;
+handle_info(stats_update, #rep_state{} = State) ->
+    State1 = cancel_stats_timer(State),
+    State2 = update_job_state(State1),
+    {noreply, State2};
+handle_info(checkpoint, State0) ->
+    State = cancel_checkpoint_timer(State0),
+    ok = check_user_filter(State),
+    case do_checkpoint(State) of
+        {ok, State1} ->
+            couch_stats:increment_counter([couch_replicator, checkpoints,
+                success]),
+            {noreply, start_checkpoint_timer(State1)};
+        Error ->
+            couch_stats:increment_counter([couch_replicator, checkpoints,
+                failure]),
+            {stop, Error, State}
+    end;
+handle_info(shutdown, St) ->
+    {stop, shutdown, St};
+handle_info({'EXIT', Pid, max_backoff}, State) ->
+    couch_log:error("Max backoff reached child process ~p", [Pid]),
+    {stop, {shutdown, max_backoff}, State};
+handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) ->
+    couch_log:error("Max backoff reached child process ~p", [Pid]),
+    {stop, {shutdown, max_backoff}, State};
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
+    {noreply, State};
+handle_info({'EXIT', Pid, Reason0}, #rep_state{changes_reader=Pid} = State) ->
+    couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
+    % Normalize the reader's exit reason into a compact shutdown reason.
+    Reason = case Reason0 of
+        {changes_req_failed, _, _} = HttpFail ->
+            HttpFail;
+        {http_request_failed, _, _, {error, {code, Code}}} ->
+            {changes_req_failed, Code};
+        {http_request_failed, _, _, {error, Err}} ->
+            {changes_req_failed, Err};
+        Other ->
+            {changes_reader_died, Other}
+    end,
+    couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
+    {stop, {shutdown, Reason}, cancel_timers(State)};
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager=Pid} = State) ->
+    {noreply, State};
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager=Pid} = State) ->
+    couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
+    couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
+    {stop, {shutdown, {changes_manager_died, Reason}}, cancel_timers(State)};
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
+    {noreply, State};
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
+    couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
+    couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
+    {stop, {shutdown, {changes_queue_died, Reason}}, cancel_timers(State)};
+handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
+    case Workers -- [Pid] of
+        Workers ->
+            % Pid is not one of the workers: log it and keep running.
+            couch_log:error("unknown pid bit the dust ~p ~n", [Pid]),
+            {noreply, State#rep_state{workers = Workers}};
+            %% not clear why a stop was here before
+            %%{stop, {unknown_process_died, Pid, normal}, State};
+        [] ->
+            % The last worker finished: stop the changes manager and take
+            % the final checkpoint.
+            catch unlink(State#rep_state.changes_manager),
+            catch exit(State#rep_state.changes_manager, kill),
+            do_last_checkpoint(State);
+        Workers2 ->
+            {noreply, State#rep_state{workers = Workers2}}
+    end;
+handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
+    State2 = cancel_timers(State),
+    case lists:member(Pid, Workers) of
+        false ->
+            {stop, {unknown_process_died, Pid, Reason}, State2};
+        true ->
+            couch_stats:increment_counter([couch_replicator, worker_deaths]),
+            % {shutdown, _} reasons are passed through unchanged; anything
+            % else is logged and wrapped as {worker_died, Pid, Reason}.
+            StopReason = case Reason of
+                {shutdown, _} = Err ->
+                    Err;
+                Other ->
+                    ErrLog = "Worker ~p died with reason: ~p",
+                    couch_log:error(ErrLog, [Pid, Reason]),
+                    {worker_died, Pid, Other}
+            end,
+            {stop, StopReason, State2}
+    end;
+handle_info({Ref, ready}, St) when is_reference(Ref) ->
+    LogMsg = "~p : spurious erlfdb future ready message ~p",
+    couch_log:notice(LogMsg, [?MODULE, Ref]),
+    {noreply, St};
+handle_info(Msg, St) ->
+    {stop, {bad_info, Msg}, St}.
+%% gen_server format_status callback. Passes the state through
+%% state_strip_creds/1 first and then reports a fixed, ordered proplist of
+%% the interesting #rep_state{} fields. Source and target are rendered with
+%% couch_replicator_api_wrap:db_uri/1.
+format_status(_Opt, [_PDict, State]) ->
+    #rep_state{} = Stripped = state_strip_creds(State),
+    [
+        {rep_id, Stripped#rep_state.id},
+        {source, couch_replicator_api_wrap:db_uri(Stripped#rep_state.source)},
+        {target, couch_replicator_api_wrap:db_uri(Stripped#rep_state.target)},
+        {db_name, Stripped#rep_state.db_name},
+        {doc_id, Stripped#rep_state.doc_id},
+        {options, Stripped#rep_state.options},
+        {session_id, Stripped#rep_state.session_id},
+        {start_seq, Stripped#rep_state.start_seq},
+        {source_seq, Stripped#rep_state.source_seq},
+        {committed_seq, Stripped#rep_state.committed_seq},
+        {current_through_seq, Stripped#rep_state.current_through_seq},
+        {highest_seq_done, Stripped#rep_state.highest_seq_done}
+    ].
+%% gen_server code_change callback: no state migration is performed; the
+%% #rep_state{} record is returned as-is.
+code_change(_OldVsn, #rep_state{} = RepState, _Extra) ->
+    {ok, RepState}.
+%% Loops until couch_replicator_jobs:accept_job/1 hands out a job, sleeping
+%% a jittered interval between {error, not_found} attempts. Each attempt
+%% passes "now + 5 seconds" to accept_job/1 (presumably a max scheduled
+%% time; confirm against couch_replicator_jobs). On success it notifies
+%% couch_replicator_job_server, flagging the job as normal unless its
+%% options set "continuous", and returns {ok, Job, JobData}.
+accept() ->
+    couch_stats:increment_counter([couch_replicator, jobs, accepts]),
+    MaxSchedTime = erlang:system_time(second) + 5,
+    case couch_replicator_jobs:accept_job(MaxSchedTime) of
+        {error, not_found} ->
+            timer:sleep(accept_jitter_msec()),
+            ?MODULE:accept();
+        {ok, Job, #{?REP := Rep} = JobData} ->
+            Continuous = case Rep of
+                #{?OPTIONS := #{} = RepOpts} ->
+                    maps:get(<<"continuous">>, RepOpts, false);
+                _ ->
+                    false
+            end,
+            couch_replicator_job_server:accepted(self(), not Continuous),
+            {ok, Job, JobData}
+    end.
+% Health threshold is the minimum amount of time an unhealthy job should run
+% without crashing before it is considered to be healthy again. The health
+% threshold should not be 0, as jobs could start and immediately crash, and it
+% shouldn't be infinity, since then consecutive crashes would accumulate
+% forever even if the job is back to normal.
+health_threshold() ->
+    config:get_integer("replicator", "health_threshold_sec",
+%% Accepts a job (accept/0 loops until one is available) and initializes it
+%% with do_init/2. Returns {ok, #rep_state{}} on success. Otherwise it
+%% returns {stop, {shutdown, Reason}, {init_error, Stack}}, which
+%% handle_info(timeout, delayed_init) hands back to gen_server, so that
+%% terminate/2 sees an {init_error, Stack} state:
+%% - an HTTP request hit max_backoff: reschedule the job with max_backoff.
+%% - exit({shutdown, finished | halt}): stop without rescheduling here.
+%% - anything else: log it, wrap it with replication_start_error/1 and
+%%   reschedule the job with that error.
+delayed_init() ->
+    {ok, Job, JobData} = accept(),
+    try do_init(Job, JobData) of
+        State = #rep_state{} -> {ok, State}
+    catch
+        exit:{http_request_failed, _, _, max_backoff} ->
+            Stack = erlang:get_stacktrace(),
+            reschedule_on_error(undefined, Job, JobData, max_backoff),
+            {stop, {shutdown, max_backoff}, {init_error, Stack}};
+        exit:{shutdown, Exit} when Exit =:= finished orelse Exit =:= halt ->
+            Stack = erlang:get_stacktrace(),
+            {stop, {shutdown, Exit}, {init_error, Stack}};
+        _Tag:Error ->
+            % Read the stacktrace first: any exception raised (even a caught
+            % one) inside replication_start_error/1 would otherwise replace it.
+            Stack = erlang:get_stacktrace(),
+            Reason = {error, replication_start_error(Error)},
+            ErrMsg = "~p : job ~p failed during startup ~p stack:~p",
+            couch_log:error(ErrMsg, [?MODULE, Job, Reason, Stack]),
+            reschedule_on_error(undefined, Job, JobData, Reason),
+            {stop, {shutdown, Reason}, {init_error, Stack}}
+    end.
+%% Initializes an accepted job and starts its replication processes: the
+%% changes work queue, the changes reader, the changes manager and
+%% NumWorkers workers. Returns the result of update_job_state/1 on the
+%% fully populated #rep_state{}. Exits with {shutdown, finished} if the job
+%% data is already in the failed state, or if init_job_data/5 reports
+%% not_owner.
+do_init(Job, #{} = JobData) ->
+    couch_stats:increment_counter([couch_replicator, jobs, starts]),
+    % This may make a network request, then may fail and reschedule the job
+    {RepId, BaseId} = get_rep_id(undefined, Job, JobData),
+    #{
+        ?DB_NAME := DbName,
+        ?DB_UUID := DbUUID,
+        ?DOC_ID := DocId
+    } = JobData,
+    ok = couch_replicator_docs:remove_state_fields(DbName, DbUUID, DocId),
+    % Finish if job is in a failed state already. Any ?STATE value that is
+    % not a binary matches neither clause and raises case_clause.
+    case JobData of
+        #{?STATE := ?ST_FAILED, ?STATE_INFO := Error} ->
+            ok = fail_job(undefined, Job, JobData, Error),
+            exit({shutdown, finished});
+        #{?STATE := St} when is_binary(St), St =/= ?ST_FAILED ->
+            ok
+    end,
+    JobsTx = couch_jobs_fdb:get_jtx(),
+    {Job1, JobData1, Owner} = couch_jobs_fdb:tx(JobsTx, fun(JTx) ->
+        init_job_data(JTx, Job, JobData, RepId, BaseId)
+    end),
+    % Handle ownership decision here to be outside of the transaction
+    case Owner of
+        owner -> ok;
+        not_owner -> exit({shutdown, finished})
+    end,
+    % DocId and DbName are already bound above, so this match also asserts
+    % that init_state/2 agrees with the job data on them.
+    #rep_state{
+        source = Source,
+        target = Target,
+        start_seq = {_Ts, StartSeq},
+        options = Options,
+        doc_id = DocId,
+        db_name = DbName
+    } = State = init_state(Job1, JobData1),
+    NumWorkers = maps:get(<<"worker_processes">>, Options),
+    BatchSize = maps:get(<<"worker_batch_size">>, Options),
+    % Queue limits scale with the worker count: room for two batches per
+    % worker, and 100 * 1024 of max_size per worker (presumably bytes).
+    {ok, ChangesQueue} = couch_work_queue:new([
+        {max_items, BatchSize * NumWorkers * 2},
+        {max_size, 100 * 1024 * NumWorkers}
+    ]),
+    % This starts the _changes reader process. It adds the changes from the
+    % source db to the ChangesQueue.
+    {ok, ChangesReader} = couch_replicator_changes_reader:start_link(
+        StartSeq, Source, ChangesQueue, Options
+    ),
+    % Changes manager - responsible for dequeuing batches from the changes
+    % queue and delivering them to the worker processes.
+    ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
+    % This starts the worker processes. They ask the changes queue manager for
+    % a batch of _changes rows to process, check which revs are missing in
+    % the target, and copy the missing ones from the source to the target.
+    MaxConns = maps:get(<<"http_connections">>, Options),
+    Workers = lists:map(fun(_) ->
+        couch_stats:increment_counter([couch_replicator, workers_started]),
+        {ok, Pid} = couch_replicator_worker:start_link(self(), Source, Target,
+            ChangesManager, MaxConns),
+        Pid
+    end, lists:seq(1, NumWorkers)),
+    log_replication_start(State),
+    State1 = State#rep_state{
+            changes_queue = ChangesQueue,
+            changes_manager = ChangesManager,
+            changes_reader = ChangesReader,
+            workers = Workers
+    },
+    update_job_state(State1).
+%% Runs inside the couch_jobs transaction opened in do_init/2 (note the
+%% #{jtx := true} match). Returns {Job, JobData, owner | not_owner}.
+%% First clause: JobData already carries this RepId (RepId appears twice in
+%% the head, so both must be equal). Nothing is updated; only ownership is
+%% checked.
+init_job_data(#{jtx := true} = JTx, Job, #{?REP_ID := RepId} = JobData, RepId,
+        _BaseId) ->
+    {Job, JobData, check_ownership(JTx, Job, JobData)};
+%% Second clause: the stored replication id differs from RepId. The job
+%% data is marked as running on this node and pid. If a previous binary rep
+%% id is being replaced, that old id is cleared and stats and history are
+%% reset. A "started" history entry is then appended, and the job data is
+%% written only when check_ownership/3 returns owner.
+init_job_data(#{jtx := true} = JTx, Job, #{} = JobData, RepId, BaseId) ->
+    #{
+        ?REP := Rep,
+        ?REP_ID := OldRepId,
+        ?DB_UUID := DbUUID,
+        ?DOC_ID := DocId
+    } = JobData,
+    JobId = couch_replicator_ids:job_id(Rep, DbUUID, DocId),
+    Now = erlang:system_time(second),
+    % `:=` is used, so all of these keys must already exist in JobData.
+    JobData1 = JobData#{
+        ?REP_ID := RepId,
+        ?BASE_ID := BaseId,
+        ?STATE := ?ST_RUNNING,
+        ?STATE_INFO := null,
+        ?LAST_START := Now,
+        ?REP_NODE := erlang:atom_to_binary(node(), utf8),
+        ?REP_PID := list_to_binary(pid_to_list(self())),
+        ?LAST_UPDATED := Now
+    },
+    JobData2 = case is_binary(OldRepId) andalso OldRepId =/= RepId of
+        true ->
+            % Handle Replication ID change
+            ok = couch_replicator_jobs:clear_old_rep_id(JTx, JobId, OldRepId),
+            JobData1#{
+                ?REP_STATS := #{},
+                ?JOB_HISTORY := []
+            };
+        false ->
+            JobData1
+    end,
+    JobData3 = hist_append(?HIST_STARTED, Now, JobData2, undefined),
+    case check_ownership(JTx, Job, JobData3) of
+        owner ->
+            % NOTE(review): do_init/2 already increments
+            % [couch_replicator, jobs, starts] before this transaction runs,
+            % so starts may be counted twice. This code is also inside a
+            % transaction body that may be retried. Confirm intent.
+            couch_stats:increment_counter([couch_replicator, jobs, starts]),
+            {Job1, JobData4} = update_job_data(JTx, Job, JobData3),
+            {Job1, JobData4, owner};
+        not_owner ->
+            {Job, JobData3, not_owner}
+    end.
+check_ownership(#{jtx := true} = JTx, Job, JobData) ->

Review comment:
       The ownership issue here is covering the case when a _replicator database has more than one document that ends up generating the same replication id, right?

File path: src/couch_replicator/src/couch_replicator_parse.erl
@@ -0,0 +1,534 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+    parse_rep_doc/1,
+    parse_transient_rep/2,
+    parse_rep/2,
+    parse_rep_db/3
+-define(DEFAULT_SOCK_OPTS, "[{keepalive, true}, {nodelay, false}]").
+-define(VALID_SOCK_OPTS, [
+    buffer, delay_send, exit_on_close, ipv6_v6only, keepalive, nodelay, recbuf,
+    send_timeout, send_timout_close, sndbuf, priority, tos, tclass
+-define(VALID_PROXY_PROTOCOLS, [http, https, socks5]).
+    {"worker_processes", "4", fun list_to_integer/1},
+    {"worker_batch_size", "500", fun list_to_integer/1},
+    {"http_connections", "20", fun list_to_integer/1},
+    {"connection_timeout", "30000", fun list_to_integer/1},
+    {"retries_per_request", "5", fun list_to_integer/1},
+    {"use_checkpoints", "true", fun list_to_existing_atom/1},
+    {"checkpoint_interval", "30000", fun list_to_integer/1},
+    {"socket_options", ?DEFAULT_SOCK_OPTS, fun parse_sock_opts/1}
+%% Parses a replication document body (EJSON) with no user name (null).
+%% Every failure is logged with its stacktrace and rethrown as
+%% {bad_rep_doc, Reason}. A thrown {error, Reason} keeps its Reason; any
+%% other exception becomes couch_util:to_binary({Class, Term}).
+-spec parse_rep_doc({[_]}) -> #{}.
+parse_rep_doc(RepDoc) ->
+    Result = try
+        parse_rep(RepDoc, null)
+    catch
+        throw:{error, Reason} ->
+            Stack = erlang:get_stacktrace(),
+            Fmt = "~p parse_rep_doc fail ~p ~p",
+            couch_log:error(Fmt, [?MODULE, Reason, Stack]),
+            throw({bad_rep_doc, Reason});
+        Class:Term ->
+            Stack = erlang:get_stacktrace(),
+            Fmt = "~p parse_rep_doc fail ~p:~p ~p",
+            couch_log:error(Fmt, [?MODULE, Class, Term, Stack]),
+            throw({bad_rep_doc, couch_util:to_binary({Class, Term})})
+    end,
+    {ok, Rep} = Result,
+    Rep.
+-spec parse_transient_rep({[_]} | #{}, user_name()) -> {ok, #{}}.
+parse_transient_rep({Props} = EJson, UserName) when is_list(Props) ->
+    Str = couch_util:json_encode(EJson),
+    Map = couch_util:json_decode(Str, [return_maps]),
+    parse_transient_rep(Map, UserName);
+parse_transient_rep(#{} = Body, UserName) ->
+    {ok, Rep} = try
+        parse_rep(Body, UserName)
+    catch
+        throw:{error, Reason} ->
+            Stack = erlang:get_stacktrace(),
+            LogErr1 = "~p parse_transient_rep fail ~p ~p",
+            couch_log:error(LogErr1, [?MODULE, Reason, Stack]),
+            throw({bad_request, Reason});
+        Tag:Err ->
+            Stack = erlang:get_stacktrace(),
+            LogErr2 = "~p parse_transient_rep fail ~p ~p",
+            couch_log:error(LogErr2, [?MODULE, Tag, Err, Stack]),
+            throw({bad_request, couch_util:to_binary({Tag, Err})})
+    end,
+    #{?OPTIONS := Options} = Rep,
+    Cancel = maps:get(<<"cancel">>, Options, false),
+    Id = maps:get(<<"id">>, Options, nil),
+    case {Cancel, Id} of
+        {true, nil} ->
+            % Cancel request with no id, must parse id out of body contents

Review comment:
       Oh, is this an "id specified explicitly" vs. "id derived from replication parameters" difference? If so, never mind.

File path: src/couch_replicator/src/couch_replicator_parse.erl
@@ -0,0 +1,534 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+    parse_rep_doc/1,
+    parse_transient_rep/2,
+    parse_rep/2,
+    parse_rep_db/3
+-define(DEFAULT_SOCK_OPTS, "[{keepalive, true}, {nodelay, false}]").
+-define(VALID_SOCK_OPTS, [
+    buffer, delay_send, exit_on_close, ipv6_v6only, keepalive, nodelay, recbuf,
+    send_timeout, send_timout_close, sndbuf, priority, tos, tclass
+-define(VALID_PROXY_PROTOCOLS, [http, https, socks5]).
+    {"worker_processes", "4", fun list_to_integer/1},
+    {"worker_batch_size", "500", fun list_to_integer/1},
+    {"http_connections", "20", fun list_to_integer/1},
+    {"connection_timeout", "30000", fun list_to_integer/1},
+    {"retries_per_request", "5", fun list_to_integer/1},
+    {"use_checkpoints", "true", fun list_to_existing_atom/1},
+    {"checkpoint_interval", "30000", fun list_to_integer/1},
+    {"socket_options", ?DEFAULT_SOCK_OPTS, fun parse_sock_opts/1}
+%% Parses a replication document body (EJSON) with no user name (null).
+%% Every failure is logged with its stacktrace and rethrown as
+%% {bad_rep_doc, Reason}. A thrown {error, Reason} keeps its Reason; any
+%% other exception becomes couch_util:to_binary({Class, Term}).
+-spec parse_rep_doc({[_]}) -> #{}.
+parse_rep_doc(RepDoc) ->
+    Result = try
+        parse_rep(RepDoc, null)
+    catch
+        throw:{error, Reason} ->
+            Stack = erlang:get_stacktrace(),
+            Fmt = "~p parse_rep_doc fail ~p ~p",
+            couch_log:error(Fmt, [?MODULE, Reason, Stack]),
+            throw({bad_rep_doc, Reason});
+        Class:Term ->
+            Stack = erlang:get_stacktrace(),
+            Fmt = "~p parse_rep_doc fail ~p:~p ~p",
+            couch_log:error(Fmt, [?MODULE, Class, Term, Stack]),
+            throw({bad_rep_doc, couch_util:to_binary({Class, Term})})
+    end,
+    {ok, Rep} = Result,
+    Rep.
+-spec parse_transient_rep({[_]} | #{}, user_name()) -> {ok, #{}}.
+parse_transient_rep({Props} = EJson, UserName) when is_list(Props) ->
+    Str = couch_util:json_encode(EJson),
+    Map = couch_util:json_decode(Str, [return_maps]),
+    parse_transient_rep(Map, UserName);
+parse_transient_rep(#{} = Body, UserName) ->
+    {ok, Rep} = try
+        parse_rep(Body, UserName)
+    catch
+        throw:{error, Reason} ->
+            Stack = erlang:get_stacktrace(),
+            LogErr1 = "~p parse_transient_rep fail ~p ~p",
+            couch_log:error(LogErr1, [?MODULE, Reason, Stack]),
+            throw({bad_request, Reason});
+        Tag:Err ->
+            Stack = erlang:get_stacktrace(),
+            LogErr2 = "~p parse_transient_rep fail ~p ~p",
+            couch_log:error(LogErr2, [?MODULE, Tag, Err, Stack]),
+            throw({bad_request, couch_util:to_binary({Tag, Err})})
+    end,
+    #{?OPTIONS := Options} = Rep,
+    Cancel = maps:get(<<"cancel">>, Options, false),
+    Id = maps:get(<<"id">>, Options, nil),
+    case {Cancel, Id} of
+        {true, nil} ->
+            % Cancel request with no id, must parse id out of body contents

Review comment:
       Where else would the job id come from?

File path: src/couch_replicator/src/couch_replicator_docs.erl
@@ -349,331 +182,37 @@ update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
     _ ->
         % Might not succeed - when the replication doc is deleted right
         % before this update (not an error, ignore).
-        save_rep_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}})
+        save_rep_doc(RepDbName, RepDbUUID, RepDoc#doc{body = {NewRepDocBody}})
-open_rep_doc(DbName, DocId) ->
-    case couch_db:open_int(DbName, [?CTX, sys_db]) of
-        {ok, Db} ->
-            try
-                couch_db:open_doc(Db, DocId, [ejson_body])
-            after
-                couch_db:close(Db)
-            end;
-        Else ->
-            Else
+open_rep_doc(DbName, DbUUID, DocId) when is_binary(DbName), is_binary(DbUUID),
+            is_binary(DocId) ->
+    try
+        case fabric2_db:open(DbName, [?CTX, sys_db, {uuid, DbUUID}]) of
+            {ok, Db} -> fabric2_db:open_doc(Db, DocId, [ejson_body]);
+            Else -> Else
+        end
+    catch
+        error:database_does_not_exist ->
+            {not_found, database_does_not_exist}
-save_rep_doc(DbName, Doc) ->
-    {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
+save_rep_doc(DbName, DbUUID, Doc) when is_binary(DbName), is_binary(DbUUID) ->
-        couch_db:update_doc(Db, Doc, [])
+        {ok, Db} = fabric2_db:open(DbName, [?CTX, sys_db, {uuid, DbUUID}]),
+        fabric2_db:update_doc(Db, Doc, [])
+        error:database_does_not_exist ->
+            {not_found, database_does_not_exist};
         % User can accidently write a VDU which prevents _replicator from

Review comment:
       I think the comment needs to be tweaked to mention the BDU instead of the VDU.

