You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/09/15 20:14:08 UTC

[couchdb] 11/16: Update frontend replicator modules

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/fdb-layer
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 5b98e8a6c169449d1a3e362e52e86822ef350ed5
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Fri Aug 28 04:34:21 2020 -0400

    Update frontend replicator modules
    
    The frontend is the part responsible for parsing replication parameters and
    creating replication jobs. Most of that happens in the `couch_replicator`
    module.
    
     - `couch_replicator:replicate/2` is the main API for creating transient
       replication jobs.
    
     - Replication jobs from `_replicator` documents are updated from the
       `couch_replicator:after_*` callbacks. Besides being called on db
       creation, `after_db_create/2` is also called when a database is
       undeleted, and its `add_jobs_from_db/1` helper then attempts to parse
       all of the database's documents.
    
    `couch_replicator` exports the monitoring functions `docs/2`, `doc/2`,
    `jobs/0` and `job/1`. Those get called from the HTTP handlers for
    `_scheduler/docs` and `_scheduler/jobs` respectively.
    
    For hands-on remsh access there are some debugging functions, such as:
    
     - rescan_jobs/0,1 : Simulates a db being re-created, so all the jobs are added
     - reenqueue_jobs/0,1 : Deletes all the jobs from a db, then re-adds them
     - remove_jobs/0 : Removes all the replication jobs
     - get_job_ids/0 : Reads the RepId -> JobId mapping area
---
 src/couch_replicator/src/couch_replicator.erl      | 716 +++++++++++++--------
 src/couch_replicator/src/couch_replicator_epi.erl  |  58 ++
 .../src/couch_replicator_fabric2_plugin.erl        |  36 ++
 3 files changed, 530 insertions(+), 280 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index b38f31b..f34ac7d 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -14,279 +14,484 @@
 
 -export([
     replicate/2,
-    replication_states/0,
+
+    jobs/0,
     job/1,
-    doc/3,
-    active_doc/2,
-    info_from_doc/2,
-    restart_job/1
+    docs/2,
+    doc/2,
+
+    after_db_create/2,
+    after_db_delete/2,
+    after_doc_write/6,
+
+    ensure_rep_db_exists/0,
+
+    rescan_jobs/0,
+    rescan_jobs/1,
+    reenqueue_jobs/0,
+    reenqueue_jobs/1,
+    remove_jobs/0,
+    get_job_ids/0
 ]).
 
+
 -include_lib("couch/include/couch_db.hrl").
 -include("couch_replicator.hrl").
--include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
--include_lib("couch_mrview/include/couch_mrview.hrl").
--include_lib("mem3/include/mem3.hrl").
-
--define(DESIGN_DOC_CREATION_DELAY_MSEC, 1000).
--define(REPLICATION_STATES, [
-    initializing,  % Just added to scheduler
-    error,         % Could not be turned into a replication job
-    running,       % Scheduled and running
-    pending,       % Scheduled and waiting to run
-    crashing,      % Scheduled but crashing, backed off by the scheduler
-    completed,     % Non-continuous (normal) completed replication
-    failed         % Terminal failure, will not be retried anymore
-]).
-
--import(couch_util, [
-    get_value/2,
-    get_value/3
-]).
 
 
 -spec replicate({[_]}, any()) ->
     {ok, {continuous, binary()}} |
-    {ok, {[_]}} |
+    {ok, #{}} |
     {ok, {cancelled, binary()}} |
     {error, any()} |
     no_return().
-replicate(PostBody, Ctx) ->
-    {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
-    Rep = Rep0#rep{start_time = os:timestamp()},
-    #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep,
-    case get_value(cancel, Options, false) of
-    true ->
-        CancelRepId = case get_value(id, Options, nil) of
-        nil ->
-            RepId;
-        RepId2 ->
-            RepId2
-        end,
-        case check_authorization(CancelRepId, UserCtx) of
-        ok ->
-            cancel_replication(CancelRepId);
-        not_found ->
-            {error, not_found}
-        end;
-    false ->
-        check_authorization(RepId, UserCtx),
-        {ok, Listener} = rep_result_listener(RepId),
-        Result = do_replication_loop(Rep),
-        couch_replicator_notifier:stop(Listener),
-        Result
+replicate(Body, #user_ctx{name = User} = UserCtx) ->
+    {ok, Id, Rep} = couch_replicator_parse:parse_transient_rep(Body, User),
+    #{?OPTIONS := Options} = Rep,
+    JobId = case couch_replicator_jobs:get_job_id(undefined, Id) of
+        {ok, JobId0} -> JobId0;
+        {error, not_found} -> Id  % no mapping: use Id itself as the job id
+    end,
+    case maps:get(<<"cancel">>, Options, false) of
+        true ->
+            case check_authorization(JobId, UserCtx) of
+                ok -> cancel_replication(JobId);
+                not_found -> {error, not_found}
+            end;
+        false ->
+            check_authorization(JobId, UserCtx),  % result ignored; may throw
+            ok = start_transient_job(JobId, Rep),
+            case maps:get(<<"continuous">>, Options, false) of
+                true ->  % continuous: return once the job is running
+                    case couch_replicator_jobs:wait_running(JobId) of
+                        {ok, #{?STATE := ?ST_RUNNING} = JobData} ->
+                            {ok, {continuous, maps:get(?REP_ID, JobData)}};
+                        {ok, #{?STATE := ?ST_FAILED} = JobData} ->
+                            {error, maps:get(?STATE_INFO, JobData)};
+                        {error, Error} ->
+                            {error, Error}
+                    end;
+                false ->  % one-shot: wait for the completed/failed result
+                    case couch_replicator_jobs:wait_result(JobId) of
+                        {ok, #{?STATE := ?ST_COMPLETED} = JobData} ->
+                            {ok, maps:get(?CHECKPOINT_HISTORY, JobData)};
+                        {ok, #{?STATE := ?ST_FAILED} = JobData} ->
+                            {error, maps:get(?STATE_INFO, JobData)};
+                        {error, Error} ->
+                            {error, Error}
+                    end
+            end
     end.
 
 
--spec do_replication_loop(#rep{}) ->
-    {ok, {continuous, binary()}} | {ok, tuple()} | {error, any()}.
-do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) ->
-    ok = couch_replicator_scheduler:add_job(Rep),
-    case get_value(continuous, Options, false) of
-    true ->
-        {ok, {continuous, ?l2b(BaseId ++ Ext)}};
-    false ->
-        wait_for_result(Id)
+jobs() ->
+    FoldFun = fun(_JTx, _JobId, CouchJobsState, JobData, Acc) ->
+        case CouchJobsState of
+            pending -> [job_ejson(JobData) | Acc];
+            running -> [job_ejson(JobData) | Acc];
+            finished -> Acc  % finished jobs are not listed
+        end
+    end,
+    couch_replicator_jobs:fold_jobs(undefined, FoldFun, []).
+
+% Id0 is resolved via the RepId -> JobId mapping, else used as a JobId.
+job(Id0) when is_binary(Id0) ->
+    Id1 = couch_replicator_ids:convert(Id0),
+    JobId = case couch_replicator_jobs:get_job_id(undefined, Id1) of
+        {ok, JobId0} -> JobId0;
+        {error, not_found} -> Id1
+    end,
+    case couch_replicator_jobs:get_job_data(undefined, JobId) of
+        {ok, #{} = JobData} -> {ok, job_ejson(JobData)};
+        {error, not_found} -> {error, not_found}
     end.
 
 
--spec rep_result_listener(rep_id()) -> {ok, pid()}.
-rep_result_listener(RepId) ->
-    ReplyTo = self(),
-    {ok, _Listener} = couch_replicator_notifier:start_link(
-        fun({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
-                ReplyTo ! Ev;
-            (_) ->
-                ok
-        end).
+docs(#{} = Db, States) when is_list(States) ->
+    DbName = fabric2_db:name(Db),
+    FoldFun = fun(_JTx, _JobId, _, JobData, Acc) ->
+        case JobData of
+            #{?DB_NAME := DbName, ?STATE := State} ->  % jobs of this db only
+                case {States, lists:member(State, States)} of
+                    {[], _} ->  [doc_ejson(JobData) | Acc];  % no state filter
+                    {[_ | _], true} ->  [doc_ejson(JobData) | Acc];
+                    {[_ | _], false} -> Acc
+                end;
+            #{} ->
+                Acc
+        end
+    end,
+    couch_replicator_jobs:fold_jobs(undefined, FoldFun, []).
 
 
--spec wait_for_result(rep_id()) ->
-    {ok, {[_]}} | {error, any()}.
-wait_for_result(RepId) ->
-    receive
-    {finished, RepId, RepResult} ->
-        {ok, RepResult};
-    {error, RepId, Reason} ->
-        {error, Reason}
+doc(#{} = Db, DocId) when is_binary(DocId) ->
+    DbUUID = fabric2_db:get_uuid(Db),
+    JobId = couch_replicator_ids:job_id(DbUUID, DocId),
+    case couch_replicator_jobs:get_job_data(undefined, JobId) of
+        {ok, #{} = JobData} -> {ok, doc_ejson(JobData)};
+        {error, not_found} ->  {error, not_found}
     end.
 
 
--spec cancel_replication(rep_id()) ->
-    {ok, {cancelled, binary()}} | {error, not_found}.
-cancel_replication({BasedId, Extension} = RepId) ->
-    FullRepId = BasedId ++ Extension,
-    couch_log:notice("Canceling replication '~s' ...", [FullRepId]),
-    case couch_replicator_scheduler:rep_state(RepId) of
-    #rep{} ->
-        ok = couch_replicator_scheduler:remove_job(RepId),
-        couch_log:notice("Replication '~s' cancelled", [FullRepId]),
-        {ok, {cancelled, ?l2b(FullRepId)}};
-    nil ->
-        couch_log:notice("Replication '~s' not found", [FullRepId]),
-        {error, not_found}
-    end.
+after_db_create(DbName, DbUUID) when ?IS_REP_DB(DbName)->
+    couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
+    try fabric2_db:open(DbName, [{uuid, DbUUID}, ?ADMIN_CTX]) of
+        {ok, Db} ->
+            fabric2_fdb:transactional(Db, fun(TxDb) ->
+                ok = add_jobs_from_db(TxDb)
+            end)
+    catch
+        error:database_does_not_exist ->  % db is gone: nothing to add
+            ok
+    end;
+% Not a replicator db: nothing to do.
+after_db_create(_DbName, _DbUUID) ->
+    ok.
+
+% Remove every job whose ?DB_UUID matches the deleted replicator db.
+after_db_delete(DbName, DbUUID) when ?IS_REP_DB(DbName) ->
+    couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]),
+    FoldFun = fun(JTx, JobId, _, JobData, ok) ->
+        case JobData of
+            #{?DB_UUID := DbUUID} ->
+                ok = couch_replicator_jobs:remove_job(JTx, JobId);
+            #{} ->
+                ok
+        end
+    end,
+    couch_replicator_jobs:fold_jobs(undefined, FoldFun, ok);
+% Not a replicator db: nothing to do.
+after_db_delete(_DbName, _DbUUID) ->
+    ok.
+
+% Process a replicator doc write unless the doc is already completed/failed.
+after_doc_write(#{name := DbName} = Db, #doc{} = Doc, _NewWinner, _OldWinner,
+        _NewRevId, _Seq) when ?IS_REP_DB(DbName) ->
+    couch_stats:increment_counter([couch_replicator, docs, db_changes]),
+    {Props} = Doc#doc.body,
+    case couch_util:get_value(?REPLICATION_STATE, Props) of
+        ?ST_COMPLETED -> ok;
+        ?ST_FAILED -> ok;
+        _ -> process_change(Db, Doc)
+    end;
+% Not a replicator db: ignore the write.
+after_doc_write(_Db, _Doc, _NewWinner, _OldWinner, _NewRevId, _Seq) ->
+    ok.
+
+
+% Called from a supervisor as a child start function, so it must return ignore.
+-spec ensure_rep_db_exists() -> ignore.
+ensure_rep_db_exists() ->
+    couch_replicator_jobs:set_timeout(),
+    case config:get_boolean("replicator", "create_replicator_db", false) of
+        true ->
+            UserCtx = #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]},
+            Opts = [{user_ctx, UserCtx}, sys_db],
+            case fabric2_db:create(?REP_DB_NAME, Opts) of
+                {error, file_exists} -> ok;  % already created
+                {ok, _Db} -> ok
+            end;
+        false ->
+            ok
+    end,
+    ignore.
 
 
--spec replication_states() -> [atom()].
-replication_states() ->
-    ?REPLICATION_STATES.
+% Testing and debug functions
 
+rescan_jobs() ->
+    rescan_jobs(?REP_DB_NAME).
 
--spec strip_url_creds(binary() | {[_]}) -> binary().
-strip_url_creds(Endpoint) ->
-    try
-        couch_replicator_docs:parse_rep_db(Endpoint, [], []) of
-            #httpdb{url = Url} ->
-                iolist_to_binary(couch_util:url_strip_password(Url))
+% Re-add the jobs from DbName's docs, as if the db had just been created.
+rescan_jobs(DbName) when is_binary(DbName), ?IS_REP_DB(DbName) ->
+    try fabric2_db:open(DbName, [?ADMIN_CTX]) of
+        {ok, Db} ->
+            after_db_create(DbName, fabric2_db:get_uuid(Db))
     catch
-        throw:{error, local_endpoints_not_supported} ->
-            Endpoint
+        error:database_does_not_exist ->
+            database_does_not_exist
     end.
 
 
--spec job(binary()) -> {ok, {[_]}} | {error, not_found}.
-job(JobId0) when is_binary(JobId0) ->
-    JobId = couch_replicator_ids:convert(JobId0),
-    {Res, _Bad} = rpc:multicall(couch_replicator_scheduler, job, [JobId]),
-    case [JobInfo || {ok, JobInfo} <- Res] of
-        [JobInfo| _] ->
-            {ok, JobInfo};
-        [] ->
-            {error, not_found}
-    end.
+reenqueue_jobs() ->
+    reenqueue_jobs(?REP_DB_NAME).
 
 
--spec restart_job(binary() | list() | rep_id()) ->
-    {ok, {[_]}} | {error, not_found}.
-restart_job(JobId0) ->
-    JobId = couch_replicator_ids:convert(JobId0),
-    {Res, _} = rpc:multicall(couch_replicator_scheduler, restart_job, [JobId]),
-    case [JobInfo || {ok, JobInfo} <- Res] of
-        [JobInfo| _] ->
-            {ok, JobInfo};
-        [] ->
-            {error, not_found}
+reenqueue_jobs(DbName) when is_binary(DbName), ?IS_REP_DB(DbName) ->
+    try fabric2_db:open(DbName, [?ADMIN_CTX]) of
+        {ok, Db} ->
+            DbUUID = fabric2_db:get_uuid(Db),
+            ok = after_db_delete(DbName, DbUUID),  % drop jobs, then re-add
+            ok = after_db_create(DbName, DbUUID)
+    catch
+        error:database_does_not_exist ->
+            database_does_not_exist
     end.
 
 
--spec active_doc(binary(), binary()) -> {ok, {[_]}} | {error, not_found}.
-active_doc(DbName, DocId) ->
-    try
-        Shards = mem3:shards(DbName),
-        Live = [node() | nodes()],
-        Nodes = lists:usort([N || #shard{node=N} <- Shards,
-            lists:member(N, Live)]),
-        Owner = mem3:owner(DbName, DocId, Nodes),
-        case active_doc_rpc(DbName, DocId, [Owner]) of
-            {ok, DocInfo} ->
-                {ok, DocInfo};
+remove_jobs() ->
+    % When clearing a large number of jobs, remove them in batches: one
+    % transaction per job would take too long, while a single transaction
+    % for all of them could time out.
+    FoldFun = fun
+        (_, JobId, _, _, Acc) when length(Acc) > 250 ->
+            couch_replicator_jobs:remove_jobs(undefined, [JobId | Acc]);
+        (_, JobId, _, _, Acc) ->
+            [JobId | Acc]
+    end,
+    Acc = couch_replicator_jobs:fold_jobs(undefined, FoldFun, []),
+    [] = couch_replicator_jobs:remove_jobs(undefined, Acc),
+    ok.
+
+% Return the RepId -> JobId mapping (a debugging aid).
+get_job_ids() ->
+    couch_replicator_jobs:get_job_ids(undefined).
+
+
+% Private functions
+% Add a job for a transient replication unless an identical one is active.
+-spec start_transient_job(binary(), #{}) -> ok.
+start_transient_job(JobId, #{} = Rep) ->
+    JobData = couch_replicator_jobs:new_job(Rep, null, null, null,
+        ?ST_PENDING, null, null),
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        case couch_replicator_jobs:get_job_data(JTx, JobId) of
+            {ok, #{?REP := OldRep, ?STATE := State}} ->
+                SameRep = couch_replicator_utils:compare_reps(Rep, OldRep),
+                Active = State =:= ?ST_PENDING orelse State =:= ?ST_RUNNING,
+                case SameRep andalso Active of
+                    true ->
+                        % If a job with the same parameters is already pending
+                        % or running, we don't stop it and just ignore the
+                        % request. This is mainly for compatibility, so users
+                        % can idempotently POST the same job without it being
+                        % stopped and restarted (compare_reps/2 decides this).
+                        ok;
+                    false ->
+                        couch_replicator_jobs:add_job(JTx, JobId, JobData)
+                end;
             {error, not_found} ->
-                active_doc_rpc(DbName, DocId, Nodes -- [Owner])
+                ok = couch_replicator_jobs:add_job(JTx, JobId, JobData)
         end
-    catch
-        % Might be a local database
-        error:database_does_not_exist ->
-            active_doc_rpc(DbName, DocId, [node()])
-    end.
+    end).
 
 
--spec active_doc_rpc(binary(), binary(), [node()]) ->
-    {ok, {[_]}} | {error, not_found}.
-active_doc_rpc(_DbName, _DocId, []) ->
-    {error, not_found};
-active_doc_rpc(DbName, DocId, [Node]) when Node =:= node() ->
-    couch_replicator_doc_processor:doc(DbName, DocId);
-active_doc_rpc(DbName, DocId, Nodes) ->
-    {Res, _Bad} = rpc:multicall(Nodes, couch_replicator_doc_processor, doc,
-        [DbName, DocId]),
-    case [DocInfo || {ok, DocInfo} <- Res] of
-        [DocInfo | _] ->
-            {ok, DocInfo};
-        [] ->
-            {error, not_found}
-    end.
+-spec cancel_replication(job_id()) ->
+    {ok, {cancelled, binary()}} | {error, not_found}.
+cancel_replication(JobId) when is_binary(JobId) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        Id = case couch_replicator_jobs:get_job_data(JTx, JobId) of
+            {ok, #{?REP_ID := RepId}} when is_binary(RepId) ->
+                RepId;
+            _ ->
+                JobId
+        end,
+        couch_log:notice("Canceling replication '~s'", [Id]),
+        case couch_replicator_jobs:remove_job(JTx, JobId) of
+            {error, not_found} ->
+                {error, not_found};
+            ok ->
+                {ok, {cancelled, Id}}
+        end
+    end).
 
 
--spec doc(binary(), binary(), any()) -> {ok, {[_]}} | {error, not_found}.
-doc(RepDb, DocId, UserCtx) ->
-    case active_doc(RepDb, DocId) of
-        {ok, DocInfo} ->
-            {ok, DocInfo};
-        {error, not_found} ->
-            doc_from_db(RepDb, DocId, UserCtx)
-    end.
+process_change(_Db, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
+    ok;
 
+process_change(#{} = Db, #doc{deleted = true} = Doc) ->
+    DbUUID = fabric2_db:get_uuid(Db),
+    JobId = couch_replicator_ids:job_id(DbUUID, Doc#doc.id),
+    couch_replicator_jobs:remove_job(undefined, JobId);
 
--spec doc_from_db(binary(), binary(), any()) -> {ok, {[_]}} | {error, not_found}.
-doc_from_db(RepDb, DocId, UserCtx) ->
-    case fabric:open_doc(RepDb, DocId, [UserCtx, ejson_body]) of
-        {ok, Doc} ->
-            {ok, info_from_doc(RepDb, couch_doc:to_json_obj(Doc, []))};
-         {not_found, _Reason} ->
-            {error, not_found}
-    end.
+process_change(#{} = Db, #doc{deleted = false} = Doc) ->
+    #doc{id = DocId, body = {Props} = Body} = Doc,
+    DbName = fabric2_db:name(Db),
+    DbUUID = fabric2_db:get_uuid(Db),
+    {Rep, DocState, Error} = try
+        Rep0 = couch_replicator_parse:parse_rep_doc(Body),
+        DocState0 = couch_util:get_value(?REPLICATION_STATE, Props, null),
+        {Rep0, DocState0, null}
+    catch
+        throw:{bad_rep_doc, Reason} ->
+            {null, null, couch_replicator_utils:rep_error_to_binary(Reason)}
+    end,
+    JobId = couch_replicator_ids:job_id(DbUUID, DocId),
+    JobData = case Rep of
+        null ->
+            couch_replicator_jobs:new_job(Rep, DbName, DbUUID, DocId,
+                ?ST_FAILED, Error, null);
+        #{} ->
+            couch_replicator_jobs:new_job(Rep, DbName, DbUUID, DocId,
+                ?ST_PENDING, null, DocState)
+    end,
 
+    LogMsg = "~p : replication doc update db:~s doc:~s job_id:~s doc_state:~s",
+    couch_log:notice(LogMsg, [?MODULE, DbName, DocId, JobId, DocState]),
+
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Db), fun(JTx) ->
+        case couch_replicator_jobs:get_job_data(JTx, JobId) of
+            {ok, #{?REP := null, ?STATE_INFO := Error}} when Rep =:= null ->
+                % Same error as before occurred, don't bother updating the job
+                ok;
+            {ok, #{}} when Rep =:= null ->
+                % New error, or a valid doc became invalid, so update the job
+                couch_replicator_jobs:add_job(JTx, JobId, JobData);
+            {ok, #{?REP := OldRep, ?STATE := State}} when is_map(Rep) ->
+                SameRep = couch_replicator_utils:compare_reps(Rep, OldRep),
+                Active = State =:= ?ST_PENDING orelse State =:= ?ST_RUNNING,
+                case SameRep andalso Active of
+                    true ->
+                        % Document was changed but none of the parameters
+                        % relevant for the replication job have changed, so
+                        % make it a no-op
+                        ok;
+                    false ->
+                        couch_replicator_jobs:add_job(JTx, JobId, JobData)
+                end;
+            {error, not_found} ->
+                couch_replicator_jobs:add_job(JTx, JobId, JobData)
+        end
 
--spec info_from_doc(binary(), {[_]}) -> {[_]}.
-info_from_doc(RepDb, {Props}) ->
-    DocId = get_value(<<"_id">>, Props),
-    Source = get_value(<<"source">>, Props),
-    Target = get_value(<<"target">>, Props),
-    State0 = state_atom(get_value(<<"_replication_state">>, Props, null)),
-    StateTime = get_value(<<"_replication_state_time">>, Props, null),
-    {State1, StateInfo, ErrorCount, StartTime} = case State0 of
-        completed ->
-            {InfoP} = get_value(<<"_replication_stats">>, Props, {[]}),
-            case lists:keytake(<<"start_time">>, 1, InfoP) of
-                {value, {_, Time}, InfoP1} ->
-                    {State0, {InfoP1}, 0, Time};
-                false ->
-                    case lists:keytake(start_time, 1, InfoP) of
-                        {value, {_, Time}, InfoP1} ->
-                            {State0, {InfoP1}, 0, Time};
-                        false ->
-                            {State0, {InfoP}, 0, null}
-                        end
-            end;
-        failed ->
-            Info = get_value(<<"_replication_state_reason">>, Props, nil),
-            EJsonInfo = couch_replicator_utils:ejson_state_info(Info),
-            {State0, EJsonInfo, 1, StateTime};
-        _OtherState ->
-            {null, null, 0, null}
+    end).
+
+% Run process_change/2 for every document in TxDb.
+-spec add_jobs_from_db(#{}) -> ok.
+add_jobs_from_db(#{} = TxDb) ->
+    FoldFun  = fun
+        ({meta, _Meta}, ok) ->
+            {ok, ok};
+        (complete, ok) ->
+            {ok, ok};
+        ({row, Row}, ok) ->
+            Db = TxDb#{tx := undefined},
+            ok = process_change(Db, get_doc(TxDb, Row)),
+            {ok, ok}
     end,
-    {[
-        {doc_id, DocId},
-        {database, RepDb},
-        {id, null},
-        {source, strip_url_creds(Source)},
-        {target, strip_url_creds(Target)},
-        {state, State1},
-        {error_count, ErrorCount},
-        {info, StateInfo},
-        {start_time, StartTime},
-        {last_updated, StateTime}
-     ]}.
-
-
-state_atom(<<"triggered">>) ->
-    triggered;  % This handles a legacy case were document wasn't converted yet
-state_atom(State) when is_binary(State) ->
-    erlang:binary_to_existing_atom(State, utf8);
-state_atom(State) when is_atom(State) ->
-    State.
+    Opts = [{restart_tx, true}],
+    {ok, ok} = fabric2_db:fold_docs(TxDb, FoldFun, ok, Opts),
+    ok.
+
+% Open the doc for a fold_docs row; crashes unless it exists and isn't deleted.
+-spec get_doc(#{}, list()) -> #doc{}.
+get_doc(TxDb, Row) ->
+    {_, DocId} = lists:keyfind(id, 1, Row),
+    {ok, #doc{deleted = false} = Doc} = fabric2_db:open_doc(TxDb, DocId, []),
+    Doc.
+
+% EJSON for a _replicator doc's job, as returned by docs/2 and doc/2.
+doc_ejson(#{} = JobData) ->
+    #{
+        ?REP := Rep,
+        ?REP_ID := RepId,
+        ?DB_NAME := DbName,
+        ?DOC_ID := DocId,
+        ?STATE := State,
+        ?STATE_INFO := Info0,
+        ?ERROR_COUNT := ErrorCount,
+        ?LAST_UPDATED := LastUpdatedSec,
+        ?REP_NODE := Node,
+        ?REP_PID := Pid,
+        ?REP_STATS := Stats
+    } = JobData,
+
+    {Source, SourceProxy, Target, TargetProxy, StartISO8601} = case Rep of
+        #{?SOURCE := #{<<"url">> := Src, <<"proxy_url">> := SrcProxy},
+          ?TARGET := #{<<"url">> := Tgt, <<"proxy_url">> := TgtProxy},
+          ?START_TIME := StartSec} ->
+            StartISO = couch_replicator_utils:iso8601(StartSec),
+            {Src, SrcProxy, Tgt, TgtProxy, StartISO};
+        null -> {null, null, null, null, null}  % doc failed to parse
+    end,
+    LastUpdatedISO8601 = couch_replicator_utils:iso8601(LastUpdatedSec),
+    Info = case State of  % live stats while pending/running
+        ?ST_RUNNING -> Stats;
+        ?ST_PENDING -> Stats;
+        _Other -> Info0
+    end,
+
+    #{
+        <<"id">> => RepId,
+        <<"database">> => DbName,
+        <<"doc_id">> => DocId,
+        <<"source">> => ejson_url(Source),
+        <<"target">> => ejson_url(Target),
+        <<"source_proxy">> => ejson_url(SourceProxy),
+        <<"target_proxy">> => ejson_url(TargetProxy),
+        <<"state">> => State,
+        <<"info">> => Info,
+        <<"error_count">> => ErrorCount,
+        <<"last_updated">> => LastUpdatedISO8601,
+        <<"start_time">> => StartISO8601,
+        <<"node">> => Node,
+        <<"pid">> => Pid
+    }.
+
+% EJSON for a job, as returned by jobs/0 and job/1; expects ?REP to be a map.
+job_ejson(#{} = JobData) ->
+    #{
+        ?REP := Rep,
+        ?REP_ID := RepId,
+        ?DB_NAME := DbName,
+        ?DOC_ID := DocId,
+        ?STATE := State,
+        ?STATE_INFO := Info0,
+        ?JOB_HISTORY := History,
+        ?REP_STATS := Stats
+    } = JobData,
+
+    #{
+        ?SOURCE := #{<<"url">> := Source},
+        ?TARGET := #{<<"url">> := Target},
+        ?REP_USER := User,
+        ?START_TIME := StartSec
+    } = Rep,
+
+    StartISO8601 = couch_replicator_utils:iso8601(StartSec),
+
+    History1 = lists:map(fun(#{?HIST_TIMESTAMP := Ts} = Evt) ->
+        Evt#{?HIST_TIMESTAMP := couch_replicator_utils:iso8601(Ts)}
+    end, History),
+
+    Info = case State of
+        ?ST_RUNNING -> Stats;
+        ?ST_PENDING -> Stats;
+        _Other -> Info0
+    end,
+
+    #{
+        <<"id">> => RepId,
+        <<"database">> => DbName,
+        <<"doc_id">> => DocId,
+        <<"source">> => ejson_url(Source),
+        <<"target">> => ejson_url(Target),
+        <<"state">> => State,
+        <<"info">> => Info,
+        <<"user">> => User,
+        <<"history">> => History1,
+        <<"start_time">> => StartISO8601
+    }.
+
+% Strip any password from a URL before exposing it; null passes through.
+ejson_url(Url) when is_binary(Url) ->
+    list_to_binary(couch_util:url_strip_password(Url));
+
+ejson_url(null) ->
+    null.
 
 
 -spec check_authorization(rep_id(), #user_ctx{}) -> ok | not_found.
-check_authorization(RepId, #user_ctx{name = Name} = Ctx) ->
-    case couch_replicator_scheduler:rep_state(RepId) of
-    #rep{user_ctx = #user_ctx{name = Name}} ->
-        ok;
-    #rep{} ->
-        couch_httpd:verify_is_server_admin(Ctx);
-    nil ->
-        not_found
+check_authorization(JobId, #user_ctx{} = Ctx) when is_binary(JobId) ->
+    #user_ctx{name = Name} = Ctx,  % NOTE(review): -spec still says rep_id()
+    case couch_replicator_jobs:get_job_data(undefined, JobId) of
+        {error, not_found} ->
+            not_found;
+        {ok, #{?DB_NAME := DbName}} when is_binary(DbName) ->  % a doc job
+            throw({unauthorized, <<"Persistent replication collision">>});
+        {ok, #{?REP := #{?REP_USER := Name}}} ->
+            ok;
+        {ok, #{}} ->
+            couch_httpd:verify_is_server_admin(Ctx)
     end.
 
 
@@ -309,16 +514,16 @@ authorization_test_() ->
 
 t_admin_is_always_authorized() ->
     ?_test(begin
-        expect_rep_user_ctx(<<"someuser">>, <<"_admin">>),
+        expect_job_data({ok, #{?REP => #{?REP_USER => <<"someuser">>}}}),
         UserCtx = #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]},
         ?assertEqual(ok, check_authorization(<<"RepId">>, UserCtx))
     end).
 
 
 t_username_must_match() ->
-     ?_test(begin
-        expect_rep_user_ctx(<<"user">>, <<"somerole">>),
-        UserCtx1 = #user_ctx{name = <<"user">>, roles = [<<"somerole">>]},
+    ?_test(begin
+        expect_job_data({ok, #{?REP => #{?REP_USER => <<"user1">>}}}),
+        UserCtx1 = #user_ctx{name = <<"user1">>, roles = [<<"somerole">>]},
         ?assertEqual(ok, check_authorization(<<"RepId">>, UserCtx1)),
         UserCtx2 = #user_ctx{name = <<"other">>, roles = [<<"somerole">>]},
         ?assertThrow({unauthorized, _}, check_authorization(<<"RepId">>,
@@ -327,8 +532,8 @@ t_username_must_match() ->
 
 
 t_replication_not_found() ->
-     ?_test(begin
-        meck:expect(couch_replicator_scheduler, rep_state, 1, nil),
+    ?_test(begin
+        expect_job_data({error, not_found}),
         UserCtx1 = #user_ctx{name = <<"user">>, roles = [<<"somerole">>]},
         ?assertEqual(not_found, check_authorization(<<"RepId">>, UserCtx1)),
         UserCtx2 = #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]},
@@ -336,57 +541,8 @@ t_replication_not_found() ->
     end).
 
 
-expect_rep_user_ctx(Name, Role) ->
-    meck:expect(couch_replicator_scheduler, rep_state,
-        fun(_Id) ->
-            UserCtx = #user_ctx{name = Name, roles = [Role]},
-            #rep{user_ctx = UserCtx}
-        end).
-
+expect_job_data(JobDataRes) ->  % stub get_job_data/2 to return JobDataRes
+    meck:expect(couch_replicator_jobs, get_job_data, 2, JobDataRes).
 
-strip_url_creds_test_() ->
-     {
-        setup,
-        fun() ->
-            meck:expect(config, get, fun(_, _, Default) -> Default end)
-        end,
-        fun(_) ->
-            meck:unload()
-        end,
-        [
-            t_strip_http_basic_creds(),
-            t_strip_http_props_creds(),
-            t_strip_local_db_creds()
-        ]
-    }.
-
-
-t_strip_local_db_creds() ->
-    ?_test(?assertEqual(<<"localdb">>, strip_url_creds(<<"localdb">>))).
-
-
-t_strip_http_basic_creds() ->
-    ?_test(begin
-        Url1 = <<"http://adm:pass@host/db">>,
-        ?assertEqual(<<"http://adm:*****@host/db/">>, strip_url_creds(Url1)),
-        Url2 = <<"https://adm:pass@host/db">>,
-        ?assertEqual(<<"https://adm:*****@host/db/">>, strip_url_creds(Url2)),
-        Url3 = <<"http://adm:pass@host:80/db">>,
-        ?assertEqual(<<"http://adm:*****@host:80/db/">>, strip_url_creds(Url3)),
-        Url4 = <<"http://adm:pass@host/db?a=b&c=d">>,
-        ?assertEqual(<<"http://adm:*****@host/db?a=b&c=d">>,
-            strip_url_creds(Url4))
-    end).
-
-
-t_strip_http_props_creds() ->
-    ?_test(begin
-        Props1 = {[{<<"url">>, <<"http://adm:pass@host/db">>}]},
-        ?assertEqual(<<"http://adm:*****@host/db/">>, strip_url_creds(Props1)),
-        Props2 = {[ {<<"url">>, <<"http://host/db">>},
-            {<<"headers">>, {[{<<"Authorization">>, <<"Basic pa55">>}]}}
-        ]},
-        ?assertEqual(<<"http://host/db/">>, strip_url_creds(Props2))
-    end).
 
 -endif.
diff --git a/src/couch_replicator/src/couch_replicator_epi.erl b/src/couch_replicator/src/couch_replicator_epi.erl
new file mode 100644
index 0000000..9fb1790
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_epi.erl
@@ -0,0 +1,58 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% couch_epi plugin for couch_replicator; registers its fabric2_db provider.
+-module(couch_replicator_epi).
+
+
+-behaviour(couch_epi_plugin).
+
+
+-export([
+    app/0,
+    providers/0,
+    services/0,
+    data_subscriptions/0,
+    data_providers/0,
+    processes/0,
+    notify/3
+]).
+
+
+app() ->
+    couch_replicator.
+
+% Register couch_replicator_fabric2_plugin for the fabric2_db service.
+providers() ->
+    [
+        {fabric2_db, couch_replicator_fabric2_plugin}
+    ].
+
+% The remaining couch_epi callbacks are no-ops for this plugin.
+services() ->
+    [].
+
+
+data_subscriptions() ->
+    [].
+
+
+data_providers() ->
+    [].
+
+
+processes() ->
+    [].
+
+
+notify(_Key, _Old, _New) ->
+    ok.
diff --git a/src/couch_replicator/src/couch_replicator_fabric2_plugin.erl b/src/couch_replicator/src/couch_replicator_fabric2_plugin.erl
new file mode 100644
index 0000000..7bf6145
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_fabric2_plugin.erl
@@ -0,0 +1,36 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% fabric2_db plugin hooks that forward db and doc events to couch_replicator.
+-module(couch_replicator_fabric2_plugin).
+
+
+-export([
+    after_db_create/2,
+    after_db_delete/2,
+    after_doc_write/6
+]).
+
+% Each hook ignores couch_replicator's result and returns its own args.
+after_db_create(DbName, DbUUID) ->
+    couch_replicator:after_db_create(DbName, DbUUID),
+    [DbName, DbUUID].
+
+
+after_db_delete(DbName, DbUUID) ->
+    couch_replicator:after_db_delete(DbName, DbUUID),
+    [DbName, DbUUID].
+
+
+after_doc_write(Db, Doc, Winner, OldWinner, RevId, Seq)->
+    couch_replicator:after_doc_write(Db, Doc, Winner, OldWinner, RevId, Seq),
+    [Db, Doc, Winner, OldWinner, RevId, Seq].