Posted to notifications@couchdb.apache.org by GitBox <gi...@apache.org> on 2020/09/11 19:24:09 UTC

[GitHub] [couchdb] davisp commented on a change in pull request #3015: Replicator implementation for CouchDB 4

davisp commented on a change in pull request #3015:
URL: https://github.com/apache/couchdb/pull/3015#discussion_r487181771



##########
File path: src/fabric/src/fabric2_db.erl
##########
@@ -243,9 +245,16 @@ delete(DbName, Options) ->
 undelete(DbName, TgtDbName, TimeStamp, Options) ->
     case validate_dbname(TgtDbName) of
         ok ->
-            fabric2_fdb:transactional(DbName, Options, fun(TxDb) ->
-                fabric2_fdb:undelete(TxDb, TgtDbName, TimeStamp)
-            end);
+            Resp = fabric2_fdb:transactional(DbName, Options,
+                fun(TxDb) ->
+                    fabric2_fdb:undelete(TxDb, TgtDbName, TimeStamp)
+                end
+            ),
+            if Resp /= ok -> Resp; true ->
+                {ok, Db} = open(TgtDbName, Options),
+                fabric2_db_plugin:after_db_create(TgtDbName, get_uuid(Db)),
+                Resp
+            end;

Review comment:
      Style suggestion, to reinforce that `Resp` is always returned rather than having it be the final expression of both `if` branches:
   
   ```erlang
   if Resp /= ok -> ok; true ->
       {ok, Db} = open(TgtDbName, Options),
       fabric2_db_plugin:after_db_create(TgtDbName, get_uuid(Db))
   end,
   Resp;
   ```
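    
    An equivalent shape using `case`, shown only as an illustration of the same idea (no behavior change intended):
    
    ```erlang
    case Resp of
        ok ->
            {ok, Db} = open(TgtDbName, Options),
            fabric2_db_plugin:after_db_create(TgtDbName, get_uuid(Db));
        _ ->
            ok
    end,
    Resp;
    ```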

##########
File path: src/couch_replicator/src/couch_replicator_docs.erl
##########
@@ -13,306 +13,139 @@
 -module(couch_replicator_docs).
 
 -export([
-    parse_rep_doc/1,
-    parse_rep_doc/2,
-    parse_rep_db/3,
-    parse_rep_doc_without_id/1,
-    parse_rep_doc_without_id/2,
+    remove_state_fields/3,
+    update_completed/4,
+    update_failed/4,
     before_doc_update/3,
-    after_doc_read/2,
-    ensure_rep_ddoc_exists/1,
-    ensure_cluster_rep_ddoc_exists/1,
-    remove_state_fields/2,
-    update_doc_completed/3,
-    update_failed/3,
-    update_rep_id/1,
-    update_triggered/2,
-    update_error/2
+    after_doc_read/2
 ]).
 
 
 -include_lib("couch/include/couch_db.hrl").
--include_lib("ibrowse/include/ibrowse.hrl").
--include_lib("mem3/include/mem3.hrl").
 -include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
 -include("couch_replicator.hrl").
--include("couch_replicator_js_functions.hrl").
-
--import(couch_util, [
-    get_value/2,
-    get_value/3,
-    to_binary/1
-]).
-
--import(couch_replicator_utils, [
-    get_json_value/2,
-    get_json_value/3
-]).
 
 
--define(REP_DB_NAME, <<"_replicator">>).
--define(REP_DESIGN_DOC, <<"_design/_replicator">>).
 -define(OWNER, <<"owner">>).
 -define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
 -define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
 
 
-remove_state_fields(DbName, DocId) ->
-    update_rep_doc(DbName, DocId, [
-        {<<"_replication_state">>, undefined},
-        {<<"_replication_state_time">>, undefined},
-        {<<"_replication_state_reason">>, undefined},
-        {<<"_replication_id">>, undefined},
-        {<<"_replication_stats">>, undefined}]).
+remove_state_fields(null, null, null) ->
+    ok;
 
+remove_state_fields(DbName, DbUUID, DocId) ->
+    update_rep_doc(DbName, DbUUID, DocId, [
+        {?REPLICATION_STATE, undefined},
+        {?REPLICATION_STATE_TIME, undefined},
+        {?REPLICATION_STATE_REASON, undefined},
+        {?REPLICATION_ID, undefined},
+        {?REPLICATION_STATS, undefined}]),

Review comment:
      Style nit, but the closing `]),` should be on its own line. Same for the other similar instances in this module that I won't bother enumerating.
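    
    i.e. something like this for the hunk above (a sketch, same content just reflowed):
    
    ```erlang
    remove_state_fields(DbName, DbUUID, DocId) ->
        update_rep_doc(DbName, DbUUID, DocId, [
            {?REPLICATION_STATE, undefined},
            {?REPLICATION_STATE_TIME, undefined},
            {?REPLICATION_STATE_REASON, undefined},
            {?REPLICATION_ID, undefined},
            {?REPLICATION_STATS, undefined}
        ]),
    ```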

##########
File path: src/couch_replicator/src/couch_replicator_parse.erl
##########
@@ -0,0 +1,534 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_parse).
+
+
+-export([
+    parse_rep_doc/1,
+    parse_transient_rep/2,
+    parse_rep/2,
+    parse_rep_db/3
+]).
+
+
+-include_lib("ibrowse/include/ibrowse.hrl").
+-include("couch_replicator.hrl").
+
+
+-define(DEFAULT_SOCK_OPTS, "[{keepalive, true}, {nodelay, false}]").
+-define(VALID_SOCK_OPTS, [
+    buffer, delay_send, exit_on_close, ipv6_v6only, keepalive, nodelay, recbuf,

Review comment:
      Let's make these one item per line and alphabetized.
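    
    Something like this (same entries as in the full define shown further down in this file, just reformatted):
    
    ```erlang
    -define(VALID_SOCK_OPTS, [
        buffer,
        delay_send,
        exit_on_close,
        ipv6_v6only,
        keepalive,
        nodelay,
        priority,
        recbuf,
        send_timeout,
        send_timout_close,
        sndbuf,
        tclass,
        tos
    ]).
    ```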

##########
File path: src/couch_replicator/src/couch_replicator_job.erl
##########
@@ -0,0 +1,1605 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_job).
+
+
+-behaviour(gen_server).
+
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    format_status/2,
+    code_change/3
+]).
+
+-export([
+    accept/0,
+    health_threshold/0
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
+-include("couch_replicator.hrl").
+
+
+-define(LOWEST_SEQ, 0).
+-define(DEFAULT_CHECKPOINT_INTERVAL, 30000).
+-define(STARTUP_JITTER_DEFAULT, 5000).
+-define(DEFAULT_MIN_BACKOFF_PENALTY_SEC, 32).
+-define(DEFAULT_MAX_BACKOFF_PENALTY_SEC, 2 * 24 * 3600).
+-define(DEFAULT_HEALTH_THRESHOLD_SEC, 2 * 60).
+-define(DEFAULT_MAX_HISTORY, 10).
+-define(DEFAULT_STATS_UPDATE_INTERVAL_SEC, 10).
+
+
+-record(rep_state, {
+    job,
+    job_data,
+    id,
+    base_id,
+    doc_id,
+    db_name,
+    db_uuid,
+    source_name,
+    target_name,
+    source,
+    target,
+    history,
+    checkpoint_history,
+    start_seq,
+    committed_seq,
+    current_through_seq,
+    seqs_in_progress = [],
+    highest_seq_done = {0, ?LOWEST_SEQ},
+    source_log,
+    target_log,
+    rep_starttime,
+    src_starttime,
+    tgt_starttime,
+    checkpoint_timer,
+    stats_timer,
+    changes_queue,
+    changes_manager,
+    changes_reader,
+    workers,
+    stats = couch_replicator_stats:new(),
+    session_id,
+    source_seq = nil,
+    use_checkpoints = true,
+    checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
+    user = null,
+    options = #{}
+}).
+
+
+start_link() ->
+    gen_server:start_link(?MODULE, [], []).
+
+
+init(_) ->
+    process_flag(trap_exit, true),
+    {ok, delayed_init, 0}.
+
+
+terminate(normal, #rep_state{} = State) ->
+    #rep_state{
+        job = Job,
+        job_data = JobData,
+        checkpoint_history = History
+    } = State,
+    ok = complete_job(undefined, Job, JobData, History),
+    close_endpoints(State);
+
+terminate(shutdown, #rep_state{} = State0) ->
+    % Replication stopped by the job server
+    State1 = cancel_timers(State0),
+    State3 = case do_checkpoint(State1) of
+        {ok, State2} ->
+            State2;
+        Error ->
+            Msg = "~p : Failed last checkpoint. Job: ~p Error: ~p",
+            couch_log:error(Msg, [?MODULE, State1#rep_state.id, Error]),
+            State1
+    end,
+    #rep_state{job = Job, job_data = JobData} = State3,
+    ok = reschedule(undefined, Job, JobData),
+    ok = close_endpoints(State3);
+
+terminate({shutdown, Error}, {init_error, Stack}) ->
+    % Termination in init, before the job had initialized
+    case Error of
+        max_backoff -> couch_log:warning("~p job backed off", [?MODULE]);
+        finished -> couch_log:notice("~p job finished in init", [?MODULE]);
+        _ -> couch_log:error("~p job failed ~p ~p", [?MODULE, Error, Stack])
+    end,
+    ok;
+
+terminate({shutdown, finished}, #rep_state{} = State) ->
+    % Job state was already updated and job is marked as finished
+    ok = close_endpoints(State);
+
+terminate({shutdown, halt}, #rep_state{} = State) ->
+    % Job is re-enqueued and possibly already running somewhere else
+    couch_log:error("~p job ~p halted", [?MODULE, State#rep_state.id]),
+    ok = close_endpoints(State);
+
+terminate(Reason0, #rep_state{} = State0) ->
+    State = update_job_state(State0),
+    Reason = case Reason0 of
+        {shutdown, Err} -> Err;
+        _ -> Reason0
+    end,
+    #rep_state{
+        id = RepId,
+        job = Job,
+        job_data = JobData,
+        source_name = Source,
+        target_name = Target
+    } = State,
+    couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~p",
+        [RepId, Source, Target, Reason]),
+    ok = reschedule_on_error(undefined, Job, JobData, Reason),
+    ok = close_endpoints(State).
+
+
+handle_call({add_stats, Stats}, From, State) ->
+    gen_server:reply(From, ok),
+    NewStats = couch_replicator_stats:sum_stats(State#rep_state.stats, Stats),
+    {noreply, State#rep_state{stats = NewStats}};
+
+handle_call({report_seq_done, Seq, StatsInc}, From, #rep_state{} = State) ->
+    #rep_state{
+        seqs_in_progress = SeqsInProgress,
+        highest_seq_done = HighestDone,
+        current_through_seq = ThroughSeq,
+        stats = Stats
+    } = State,
+    gen_server:reply(From, ok),
+    {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
+        [] ->
+            {Seq, []};
+        [Seq | Rest] ->
+            {Seq, Rest};
+        [_ | _] ->
+            {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
+    end,
+    NewHighestDone = lists:max([HighestDone, Seq]),
+    NewThroughSeq = case NewSeqsInProgress of
+        [] ->
+            lists:max([NewThroughSeq0, NewHighestDone]);
+        _ ->
+            NewThroughSeq0
+    end,
+    couch_log:debug("Worker reported seq ~p, through seq was ~p, "
+        "new through seq is ~p, highest seq done was ~p, "
+        "new highest seq done is ~p~n"
+        "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
+        [Seq, ThroughSeq, NewThroughSeq, HighestDone,
+            NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
+    NewState = State#rep_state{
+        stats = couch_replicator_stats:sum_stats(Stats, StatsInc),
+        current_through_seq = NewThroughSeq,
+        seqs_in_progress = NewSeqsInProgress,
+        highest_seq_done = NewHighestDone
+    },
+    {noreply, maybe_update_job_state(NewState)};
+
+handle_call(Msg, _From, St) ->
+    {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast({report_seq, Seq},
+    #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
+    NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress),
+    {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}};
+
+handle_cast(Msg, St) ->
+    {stop, {bad_cast, Msg}, St}.
+
+
+handle_info(timeout, delayed_init) ->
+    try delayed_init() of
+        {ok, State} -> {noreply, State};
+        {stop, Reason, State} -> {stop, Reason, State}
+    catch
+        exit:{shutdown, Exit} when Exit =:= finished orelse Exit =:= halt ->
+            Stack = erlang:get_stacktrace(),
+            {stop, {shutdown, Exit}, {init_error, Stack}};
+        _Tag:Error ->
+            ShutdownReason = {error, replication_start_error(Error)},
+            Stack = erlang:get_stacktrace(),
+            {stop, {shutdown, ShutdownReason}, {init_error, Stack}}
+    end;
+
+handle_info(stats_update, #rep_state{} = State) ->
+    State1 = cancel_stats_timer(State),
+    State2 = update_job_state(State1),
+    {noreply, State2};
+
+handle_info(checkpoint, State0) ->
+    State = cancel_checkpoint_timer(State0),
+    ok = check_user_filter(State),
+    case do_checkpoint(State) of
+        {ok, State1} ->
+            couch_stats:increment_counter([couch_replicator, checkpoints,
+                success]),
+            {noreply, start_checkpoint_timer(State1)};
+        Error ->
+            couch_stats:increment_counter([couch_replicator, checkpoints,
+                failure]),
+            {stop, Error, State}
+    end;
+
+handle_info(shutdown, St) ->
+    {stop, shutdown, St};
+
+handle_info({'EXIT', Pid, max_backoff}, State) ->
+    couch_log:error("Max backoff reached child process ~p", [Pid]),
+    {stop, {shutdown, max_backoff}, State};
+
+handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) ->
+    couch_log:error("Max backoff reached child process ~p", [Pid]),
+    {stop, {shutdown, max_backoff}, State};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
+    {noreply, State};
+
+handle_info({'EXIT', Pid, Reason0}, #rep_state{changes_reader=Pid} = State) ->
+    couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
+    Reason = case Reason0 of
+        {changes_req_failed, _, _} = HttpFail ->
+            HttpFail;
+        {http_request_failed, _, _, {error, {code, Code}}} ->
+            {changes_req_failed, Code};
+        {http_request_failed, _, _, {error, Err}} ->
+            {changes_req_failed, Err};
+        Other ->
+            {changes_reader_died, Other}
+    end,
+    couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
+    {stop, {shutdown, Reason}, cancel_timers(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager=Pid} = State) ->
+    {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager=Pid} = State) ->
+    couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
+    couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
+    {stop, {shutdown, {changes_manager_died, Reason}}, cancel_timers(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
+    {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
+    couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
+    couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
+    {stop, {shutdown, {changes_queue_died, Reason}}, cancel_timers(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
+    case Workers -- [Pid] of
+        Workers ->
+            couch_log:error("unknown pid bit the dust ~p ~n", [Pid]),
+            {noreply, State#rep_state{workers = Workers}};
+            %% not clear why a stop was here before
+            %%{stop, {unknown_process_died, Pid, normal}, State};
+        [] ->
+            catch unlink(State#rep_state.changes_manager),
+            catch exit(State#rep_state.changes_manager, kill),
+            do_last_checkpoint(State);
+        Workers2 ->
+            {noreply, State#rep_state{workers = Workers2}}
+    end;
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
+    State2 = cancel_timers(State),
+    case lists:member(Pid, Workers) of
+        false ->
+            {stop, {unknown_process_died, Pid, Reason}, State2};
+        true ->
+            couch_stats:increment_counter([couch_replicator, worker_deaths]),
+            StopReason = case Reason of
+                {shutdown, _} = Err ->
+                    Err;
+                Other ->
+                    ErrLog = "Worker ~p died with reason: ~p",
+                    couch_log:error(ErrLog, [Pid, Reason]),
+                    {worker_died, Pid, Other}
+            end,
+            {stop, StopReason, State2}
+    end;
+
+handle_info({Ref, ready}, St) when is_reference(Ref) ->
+    LogMsg = "~p : spurious erlfdb future ready message ~p",
+    couch_log:notice(LogMsg, [?MODULE, Ref]),
+    {noreply, St};
+
+handle_info(Msg, St) ->
+    {stop, {bad_info, Msg}, St}.
+
+
+format_status(_Opt, [_PDict, State]) ->
+    #rep_state{
+        id = Id,
+        source = Source,
+        target = Target,
+        start_seq = StartSeq,
+        source_seq = SourceSeq,
+        committed_seq = CommitedSeq,
+        current_through_seq = ThroughSeq,
+        highest_seq_done = HighestSeqDone,
+        session_id = SessionId,
+        doc_id = DocId,
+        db_name = DbName,
+        options = Options
+    } = state_strip_creds(State),
+    [
+        {rep_id, Id},
+        {source, couch_replicator_api_wrap:db_uri(Source)},
+        {target, couch_replicator_api_wrap:db_uri(Target)},
+        {db_name, DbName},
+        {doc_id, DocId},
+        {options, Options},
+        {session_id, SessionId},
+        {start_seq, StartSeq},
+        {source_seq, SourceSeq},
+        {committed_seq, CommitedSeq},
+        {current_through_seq, ThroughSeq},
+        {highest_seq_done, HighestSeqDone}
+    ].
+
+
+code_change(_OldVsn, #rep_state{}=State, _Extra) ->
+    {ok, State}.
+
+
+accept() ->
+    couch_stats:increment_counter([couch_replicator, jobs, accepts]),
+    Now = erlang:system_time(second),
+    case couch_replicator_jobs:accept_job(Now + 5) of
+        {ok, Job, #{?REP := Rep} = JobData} ->
+            Normal = case Rep of
+                #{?OPTIONS := #{} = Options} ->
+                    not maps:get(<<"continuous">>, Options, false);
+                _ ->
+                    true
+            end,
+            couch_replicator_job_server:accepted(self(), Normal),
+            {ok, Job, JobData};
+        {error, not_found} ->
+            timer:sleep(accept_jitter_msec()),
+            ?MODULE:accept()
+    end.
+
+
+% Health threshold is the minimum amount of time an unhealthy (crashing) job
+% should run before it is considered healthy again. HealthThreshold should not
+% be 0, as jobs could start and immediately crash, and it shouldn't be
+% infinity, since then consecutive crashes would accumulate forever even if
+% the job is back to normal.
+health_threshold() ->
+    config:get_integer("replicator", "health_threshold_sec",
+        ?DEFAULT_HEALTH_THRESHOLD_SEC).
+
+
+delayed_init() ->
+    {ok, Job, JobData} = accept(),
+    try do_init(Job, JobData) of
+        State = #rep_state{} -> {ok, State}
+    catch
+        exit:{http_request_failed, _, _, max_backoff} ->
+            Stack = erlang:get_stacktrace(),
+            reschedule_on_error(undefined, Job, JobData, max_backoff),
+            {stop, {shutdown, max_backoff}, {init_error, Stack}};
+        exit:{shutdown, Exit} when Exit =:= finished orelse Exit =:= halt ->
+            Stack = erlang:get_stacktrace(),
+            {stop, {shutdown, Exit}, {init_error, Stack}};
+        _Tag:Error ->
+            Reason = {error, replication_start_error(Error)},
+            Stack = erlang:get_stacktrace(),
+            ErrMsg = "~p : job ~p failed during startup ~p stack:~p",
+            couch_log:error(ErrMsg, [?MODULE, Job, Reason, Stack]),
+            reschedule_on_error(undefined, Job, JobData, Reason),
+            {stop, {shutdown, Reason}, {init_error, Stack}}
+    end.
+
+
+do_init(Job, #{} = JobData) ->
+    couch_stats:increment_counter([couch_replicator, jobs, starts]),
+    % This may make a network request, then may fail and reschedule the job
+    {RepId, BaseId} = get_rep_id(undefined, Job, JobData),
+    #{
+        ?DB_NAME := DbName,
+        ?DB_UUID := DbUUID,
+        ?DOC_ID := DocId
+    } = JobData,
+
+    ok = couch_replicator_docs:remove_state_fields(DbName, DbUUID, DocId),
+
+    % Finish if job is in a failed state already
+    case JobData of
+        #{?STATE := ?ST_FAILED, ?STATE_INFO := Error} ->
+            ok = fail_job(undefined, Job, JobData, Error),
+            exit({shutdown, finished});
+        #{?STATE := St} when is_binary(St), St =/= ?ST_FAILED ->
+            ok
+    end,
+
+    JobsTx = couch_jobs_fdb:get_jtx(),
+    {Job1, JobData1, Owner} = couch_jobs_fdb:tx(JobsTx, fun(JTx) ->
+        init_job_data(JTx, Job, JobData, RepId, BaseId)
+    end),
+
+    % Handle ownership decision here to be outside of the transaction
+    case Owner of
+        owner -> ok;
+        not_owner -> exit({shutdown, finished})
+    end,
+
+    #rep_state{
+        source = Source,
+        target = Target,
+        start_seq = {_Ts, StartSeq},
+        options = Options,
+        doc_id = DocId,
+        db_name = DbName
+    } = State = init_state(Job1, JobData1),
+
+    NumWorkers = maps:get(<<"worker_processes">>, Options),
+    BatchSize = maps:get(<<"worker_batch_size">>, Options),
+    {ok, ChangesQueue} = couch_work_queue:new([
+        {max_items, BatchSize * NumWorkers * 2},
+        {max_size, 100 * 1024 * NumWorkers}
+    ]),
+
+    % This starts the _changes reader process. It adds the changes from the
+    % source db to the ChangesQueue.
+    {ok, ChangesReader} = couch_replicator_changes_reader:start_link(
+        StartSeq, Source, ChangesQueue, Options
+    ),
+
+    % Changes manager - responsible for dequeuing batches from the changes
+    % queue and delivering them to the worker processes.
+    ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
+
+    % This starts the worker processes. They ask the changes queue manager
+    % for a batch of _changes rows to process, check which revs are missing
+    % in the target, and copy the missing ones from the source to the target.
+    MaxConns = maps:get(<<"http_connections">>, Options),
+    Workers = lists:map(fun(_) ->
+        couch_stats:increment_counter([couch_replicator, workers_started]),
+        {ok, Pid} = couch_replicator_worker:start_link(self(), Source, Target,
+            ChangesManager, MaxConns),
+        Pid
+    end, lists:seq(1, NumWorkers)),
+
+    log_replication_start(State),
+
+    State1 = State#rep_state{
+            changes_queue = ChangesQueue,
+            changes_manager = ChangesManager,
+            changes_reader = ChangesReader,
+            workers = Workers
+    },
+
+    update_job_state(State1).
+
+
+init_job_data(#{jtx := true} = JTx, Job, #{?REP_ID := RepId} = JobData, RepId,
+        _BaseId) ->
+    {Job, JobData, check_ownership(JTx, Job, JobData)};
+
+init_job_data(#{jtx := true} = JTx, Job, #{} = JobData, RepId, BaseId) ->
+    #{
+        ?REP := Rep,
+        ?REP_ID := OldRepId,
+        ?DB_UUID := DbUUID,
+        ?DOC_ID := DocId
+    } = JobData,
+    JobId = couch_replicator_ids:job_id(Rep, DbUUID, DocId),
+    Now = erlang:system_time(second),
+    JobData1 = JobData#{
+        ?REP_ID := RepId,
+        ?BASE_ID := BaseId,
+        ?STATE := ?ST_RUNNING,
+        ?STATE_INFO := null,
+        ?LAST_START := Now,
+        ?REP_NODE := erlang:atom_to_binary(node(), utf8),
+        ?REP_PID := list_to_binary(pid_to_list(self())),
+        ?LAST_UPDATED := Now
+    },
+    JobData2 = case is_binary(OldRepId) andalso OldRepId =/= RepId of
+        true ->
+            % Handle Replication ID change
+            ok = couch_replicator_jobs:clear_old_rep_id(JTx, JobId, OldRepId),
+            JobData1#{
+                ?REP_STATS := #{},
+                ?JOB_HISTORY := []
+            };
+        false ->
+            JobData1
+    end,
+    JobData3 = hist_append(?HIST_STARTED, Now, JobData2, undefined),
+    case check_ownership(JTx, Job, JobData3) of
+        owner ->
+            couch_stats:increment_counter([couch_replicator, jobs, starts]),
+            {Job1, JobData4} = update_job_data(JTx, Job, JobData3),
+            {Job1, JobData4, owner};
+        not_owner ->
+            {Job, JobData3, not_owner}
+    end.
+
+
+check_ownership(#{jtx := true} = JTx, Job, JobData) ->

Review comment:
      The ownership issue here covers the case where a _replicator database has more than one document that ends up generating the same replication id, right?
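    
    For example (hypothetical doc ids and endpoints), two docs like these would derive the same replication id, since the id is computed from the source/target/options rather than the document id:
    
    ```erlang
    % Different _id, identical replication parameters -> same replication id.
    Doc1 = #{<<"_id">> => <<"rep_one">>,
             <<"source">> => <<"http://adm:pass@localhost:5984/db_a">>,
             <<"target">> => <<"http://adm:pass@localhost:5984/db_b">>},
    Doc2 = #{<<"_id">> => <<"rep_two">>,
             <<"source">> => <<"http://adm:pass@localhost:5984/db_a">>,
             <<"target">> => <<"http://adm:pass@localhost:5984/db_b">>}.
    ```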

##########
File path: src/couch_replicator/src/couch_replicator_parse.erl
##########
@@ -0,0 +1,534 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_parse).
+
+
+-export([
+    parse_rep_doc/1,
+    parse_transient_rep/2,
+    parse_rep/2,
+    parse_rep_db/3
+]).
+
+
+-include_lib("ibrowse/include/ibrowse.hrl").
+-include("couch_replicator.hrl").
+
+
+-define(DEFAULT_SOCK_OPTS, "[{keepalive, true}, {nodelay, false}]").
+-define(VALID_SOCK_OPTS, [
+    buffer, delay_send, exit_on_close, ipv6_v6only, keepalive, nodelay, recbuf,
+    send_timeout, send_timout_close, sndbuf, priority, tos, tclass
+]).
+-define(VALID_PROXY_PROTOCOLS, [http, https, socks5]).
+-define(CONFIG_DEFAULTS, [
+    {"worker_processes", "4", fun list_to_integer/1},
+    {"worker_batch_size", "500", fun list_to_integer/1},
+    {"http_connections", "20", fun list_to_integer/1},
+    {"connection_timeout", "30000", fun list_to_integer/1},
+    {"retries_per_request", "5", fun list_to_integer/1},
+    {"use_checkpoints", "true", fun list_to_existing_atom/1},
+    {"checkpoint_interval", "30000", fun list_to_integer/1},
+    {"socket_options", ?DEFAULT_SOCK_OPTS, fun parse_sock_opts/1}
+]).
+
+
+-spec parse_rep_doc({[_]}) -> #{}.
+parse_rep_doc(RepDoc) ->
+    {ok, Rep} = try
+        parse_rep(RepDoc, null)
+    catch
+        throw:{error, Reason} ->
+            Stack = erlang:get_stacktrace(),
+            LogErr1 = "~p parse_rep_doc fail ~p ~p",
+            couch_log:error(LogErr1, [?MODULE, Reason, Stack]),
+            throw({bad_rep_doc, Reason});
+        Tag:Err ->
+            Stack = erlang:get_stacktrace(),
+            LogErr2 = "~p parse_rep_doc fail ~p:~p ~p",
+            couch_log:error(LogErr2, [?MODULE, Tag, Err, Stack]),
+            throw({bad_rep_doc, couch_util:to_binary({Tag, Err})})
+    end,
+    Rep.
+
+
+-spec parse_transient_rep({[_]} | #{}, user_name()) -> {ok, #{}}.
+parse_transient_rep({Props} = EJson, UserName) when is_list(Props) ->
+    Str = couch_util:json_encode(EJson),
+    Map = couch_util:json_decode(Str, [return_maps]),
+    parse_transient_rep(Map, UserName);
+
+parse_transient_rep(#{} = Body, UserName) ->
+    {ok, Rep} = try
+        parse_rep(Body, UserName)
+    catch
+        throw:{error, Reason} ->
+            Stack = erlang:get_stacktrace(),
+            LogErr1 = "~p parse_transient_rep fail ~p ~p",
+            couch_log:error(LogErr1, [?MODULE, Reason, Stack]),
+            throw({bad_request, Reason});
+        Tag:Err ->
+            Stack = erlang:get_stacktrace(),
+            LogErr2 = "~p parse_transient_rep fail ~p ~p",
+            couch_log:error(LogErr2, [?MODULE, Tag, Err, Stack]),
+            throw({bad_request, couch_util:to_binary({Tag, Err})})
+    end,
+    #{?OPTIONS := Options} = Rep,
+    Cancel = maps:get(<<"cancel">>, Options, false),
+    Id = maps:get(<<"id">>, Options, nil),
+    case {Cancel, Id} of
+        {true, nil} ->
+            % Cancel request with no id, must parse id out of body contents

Review comment:
      Ohh, is this an "id specified explicitly" vs. "id derived from the replication parameters" difference? If so, never mind.

##########
File path: src/couch_replicator/src/couch_replicator_parse.erl
##########
@@ -0,0 +1,534 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_parse).
+
+
+-export([
+    parse_rep_doc/1,
+    parse_transient_rep/2,
+    parse_rep/2,
+    parse_rep_db/3
+]).
+
+
+-include_lib("ibrowse/include/ibrowse.hrl").
+-include("couch_replicator.hrl").
+
+
+-define(DEFAULT_SOCK_OPTS, "[{keepalive, true}, {nodelay, false}]").
+-define(VALID_SOCK_OPTS, [
+    buffer, delay_send, exit_on_close, ipv6_v6only, keepalive, nodelay, recbuf,
+    send_timeout, send_timout_close, sndbuf, priority, tos, tclass
+]).
+-define(VALID_PROXY_PROTOCOLS, [http, https, socks5]).
+-define(CONFIG_DEFAULTS, [
+    {"worker_processes", "4", fun list_to_integer/1},
+    {"worker_batch_size", "500", fun list_to_integer/1},
+    {"http_connections", "20", fun list_to_integer/1},
+    {"connection_timeout", "30000", fun list_to_integer/1},
+    {"retries_per_request", "5", fun list_to_integer/1},
+    {"use_checkpoints", "true", fun list_to_existing_atom/1},
+    {"checkpoint_interval", "30000", fun list_to_integer/1},
+    {"socket_options", ?DEFAULT_SOCK_OPTS, fun parse_sock_opts/1}
+]).
+
+
+-spec parse_rep_doc({[_]}) -> #{}.
+parse_rep_doc(RepDoc) ->
+    {ok, Rep} = try
+        parse_rep(RepDoc, null)
+    catch
+        throw:{error, Reason} ->
+            Stack = erlang:get_stacktrace(),
+            LogErr1 = "~p parse_rep_doc fail ~p ~p",
+            couch_log:error(LogErr1, [?MODULE, Reason, Stack]),
+            throw({bad_rep_doc, Reason});
+        Tag:Err ->
+            Stack = erlang:get_stacktrace(),
+            LogErr2 = "~p parse_rep_doc fail ~p:~p ~p",
+            couch_log:error(LogErr2, [?MODULE, Tag, Err, Stack]),
+            throw({bad_rep_doc, couch_util:to_binary({Tag, Err})})
+    end,
+    Rep.
+
+
+-spec parse_transient_rep({[_]} | #{}, user_name()) -> {ok, #{}}.
+parse_transient_rep({Props} = EJson, UserName) when is_list(Props) ->
+    Str = couch_util:json_encode(EJson),
+    Map = couch_util:json_decode(Str, [return_maps]),
+    parse_transient_rep(Map, UserName);
+
+parse_transient_rep(#{} = Body, UserName) ->
+    {ok, Rep} = try
+        parse_rep(Body, UserName)
+    catch
+        throw:{error, Reason} ->
+            Stack = erlang:get_stacktrace(),
+            LogErr1 = "~p parse_transient_rep fail ~p ~p",
+            couch_log:error(LogErr1, [?MODULE, Reason, Stack]),
+            throw({bad_request, Reason});
+        Tag:Err ->
+            Stack = erlang:get_stacktrace(),
+            LogErr2 = "~p parse_transient_rep fail ~p ~p",
+            couch_log:error(LogErr2, [?MODULE, Tag, Err, Stack]),
+            throw({bad_request, couch_util:to_binary({Tag, Err})})
+    end,
+    #{?OPTIONS := Options} = Rep,
+    Cancel = maps:get(<<"cancel">>, Options, false),
+    Id = maps:get(<<"id">>, Options, nil),
+    case {Cancel, Id} of
+        {true, nil} ->
+            % Cancel request with no id, must parse id out of body contents

Review comment:
       Where else would the job id come from?

##########
File path: src/couch_replicator/src/couch_replicator_docs.erl
##########
@@ -349,331 +182,37 @@ update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
     _ ->
         % Might not succeed - when the replication doc is deleted right
         % before this update (not an error, ignore).
-        save_rep_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}})
+        save_rep_doc(RepDbName, RepDbUUID, RepDoc#doc{body = {NewRepDocBody}})
     end.
 
 
-open_rep_doc(DbName, DocId) ->
-    case couch_db:open_int(DbName, [?CTX, sys_db]) of
-        {ok, Db} ->
-            try
-                couch_db:open_doc(Db, DocId, [ejson_body])
-            after
-                couch_db:close(Db)
-            end;
-        Else ->
-            Else
+open_rep_doc(DbName, DbUUID, DocId) when is_binary(DbName), is_binary(DbUUID),
+            is_binary(DocId) ->
+    try
+        case fabric2_db:open(DbName, [?CTX, sys_db, {uuid, DbUUID}]) of
+            {ok, Db} -> fabric2_db:open_doc(Db, DocId, [ejson_body]);
+            Else -> Else
+        end
+    catch
+        error:database_does_not_exist ->
+            {not_found, database_does_not_exist}
     end.
 
 
-save_rep_doc(DbName, Doc) ->
-    {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
+save_rep_doc(DbName, DbUUID, Doc) when is_binary(DbName), is_binary(DbUUID) ->
     try
-        couch_db:update_doc(Db, Doc, [])
+        {ok, Db} = fabric2_db:open(DbName, [?CTX, sys_db, {uuid, DbUUID}]),
+        fabric2_db:update_doc(Db, Doc, [])
     catch
+        error:database_does_not_exist ->
+            {not_found, database_does_not_exist};
         % User can accidently write a VDU which prevents _replicator from

Review comment:
      The comment needs to be tweaked to mention the BDU instead of the VDU, I think.
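    
    i.e. just the wording tweak on the visible line (the rest of the comment continues past this hunk):
    
    ```erlang
            % User can accidentally write a BDU which prevents _replicator from
    ```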




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org