You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/07/16 16:57:34 UTC

[couchdb] branch prototype/fdb-replicator created (now 61808e5)

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a change to branch prototype/fdb-replicator
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


      at 61808e5  FDB Replicator WIP

This branch includes the following new commits:

     new 61808e5  FDB Replicator WIP

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[couchdb] 01/01: FDB Replicator WIP

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/fdb-replicator
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 61808e5ee13be0a8e5984e9c16ee3e982d837b83
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Jul 16 12:57:13 2019 -0400

    FDB Replicator WIP
---
 src/couch_replicator/src/couch_replicator.erl      |   4 +-
 src/couch_replicator/src/couch_replicator.hrl      |   6 +-
 .../src/couch_replicator_db_changes.erl            | 108 --------------
 .../src/couch_replicator_doc_processor.erl         | 158 ++++++++++++---------
 src/couch_replicator/src/couch_replicator_docs.erl | 128 ++++++++---------
 src/couch_replicator/src/couch_replicator_sup.erl  |   8 +-
 src/fabric/src/fabric2_db.erl                      |  71 ++++++++-
 src/fabric/src/fabric2_fdb.erl                     |   7 +-
 8 files changed, 234 insertions(+), 256 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index 39141c3..e2d1964 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -83,7 +83,9 @@ replicate(PostBody, Ctx) ->
 % it returns `ignore`.
 -spec ensure_rep_db_exists() -> ignore.
 ensure_rep_db_exists() ->
-    {ok, _Db} = couch_replicator_docs:ensure_rep_db_exists(),
+    ok = couch_replicator_docs:ensure_rep_db_exists(),
+    couch_jobs:set_type_timeout(?REP_DOCS, ?REP_DOCS_TIMEOUT_MSEC),
+    couch_jobs:set_type_timeout(?REP_JOBS, ?REP_JOBS_TIMEOUT_MSEC),
     ignore.
 
 
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl
index 2a5b7c8..8f7a77a 100644
--- a/src/couch_replicator/src/couch_replicator.hrl
+++ b/src/couch_replicator/src/couch_replicator.hrl
@@ -11,6 +11,10 @@
 % the License.
 
 -define(REP_ID_VERSION, 4).
+-define(REP_DOCS, <<"repdocs">>).
+-define(REP_JOBS, <<"repjobs">>).
+-define(REP_DOCS_TIMEOUT_MSEC, 17000).
+-define(REP_JOBS_TIMEOUT_MSEC, 33000).
 
 -record(rep, {
     id :: rep_id() | '_' | 'undefined',
@@ -22,7 +26,7 @@
     view = nil :: any() | '_',
     doc_id :: any() | '_',
     db_name = null :: null | binary() | '_',
-    start_time = {0, 0, 0} :: erlang:timestamp() | '_',
+    start_time = 0 :: integer() | '_',
     stats = couch_replicator_stats:new() :: orddict:orddict() | '_'
 }).
 
diff --git a/src/couch_replicator/src/couch_replicator_db_changes.erl b/src/couch_replicator/src/couch_replicator_db_changes.erl
deleted file mode 100644
index 92b0222..0000000
--- a/src/couch_replicator/src/couch_replicator_db_changes.erl
+++ /dev/null
@@ -1,108 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_db_changes).
-
--behaviour(gen_server).
-
--export([
-   start_link/0
-]).
-
--export([
-   init/1,
-   terminate/2,
-   handle_call/3,
-   handle_info/2,
-   handle_cast/2,
-   code_change/3
-]).
-
--export([
-   notify_cluster_event/2
-]).
-
--record(state, {
-   event_listener :: pid(),
-   mdb_changes :: pid() | nil
-}).
-
-
--spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
-notify_cluster_event(Server, {cluster, _} = Event) ->
-    gen_server:cast(Server, Event).
-
-
--spec start_link() ->
-    {ok, pid()} | ignore | {error, any()}.
-start_link() ->
-    gen_server:start_link(?MODULE, [], []).
-
-
-init([]) ->
-    EvtPid = couch_replicator_clustering:link_cluster_event_listener(?MODULE,
-        notify_cluster_event, [self()]),
-    State = #state{event_listener = EvtPid, mdb_changes = nil},
-    case couch_replicator_clustering:is_stable() of
-        true ->
-            {ok, restart_mdb_changes(State)};
-        false ->
-            {ok, State}
-    end.
-
-
-terminate(_Reason, _State) ->
-    ok.
-
-
-handle_call(_Msg, _From, State) ->
-    {reply, {error, invalid_call}, State}.
-
-
-handle_cast({cluster, unstable}, State) ->
-    {noreply, stop_mdb_changes(State)};
-
-handle_cast({cluster, stable}, State) ->
-    {noreply, restart_mdb_changes(State)}.
-
-
-handle_info(_Msg, State) ->
-    {noreply, State}.
-
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-
--spec restart_mdb_changes(#state{}) -> #state{}.
-restart_mdb_changes(#state{mdb_changes = nil} = State) ->
-    Suffix = <<"_replicator">>,
-    CallbackMod = couch_replicator_doc_processor,
-    Options = [skip_ddocs],
-    {ok, Pid} = couch_multidb_changes:start_link(Suffix, CallbackMod, nil,
-        Options),
-    couch_stats:increment_counter([couch_replicator, db_scans]),
-    couch_log:notice("Started replicator db changes listener ~p", [Pid]),
-    State#state{mdb_changes = Pid};
-
-restart_mdb_changes(#state{mdb_changes = _Pid} = State) ->
-    restart_mdb_changes(stop_mdb_changes(State)).
-
-
--spec stop_mdb_changes(#state{}) -> #state{}.
-stop_mdb_changes(#state{mdb_changes = nil} = State) ->
-    State;
-stop_mdb_changes(#state{mdb_changes = Pid} = State) ->
-    couch_log:notice("Stopping replicator db changes listener ~p", [Pid]),
-    unlink(Pid),
-    exit(Pid, kill),
-    State#state{mdb_changes = nil}.
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 1b43598..174fdda 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -13,7 +13,6 @@
 -module(couch_replicator_doc_processor).
 
 -behaviour(gen_server).
--behaviour(couch_multidb_changes).
 
 -export([
     start_link/0
@@ -29,10 +28,9 @@
 ]).
 
 -export([
-    db_created/2,
-    db_deleted/2,
-    db_found/2,
-    db_change/3
+    during_doc_update/3,
+    after_db_create/1,
+    after_db_delete/1
 ]).
 
 -export([
@@ -76,88 +74,118 @@
 }).
 
 
-% couch_multidb_changes API callbacks
+during_doc_update(#doc{} = Doc, Db, _UpdateType) ->
+    couch_stats:increment_counter([couch_replicator, docs, db_changes]),
+    try
+        process_change(Db, Doc)
+    catch
+        _Tag:Error ->
+            DocId = Doc#doc.id,
+            #{name := DbName} = Db,
+            couch_replicator_docs:update_failed(DbName, DocId, Error)
+    end,
+    ok.
+
 
-db_created(DbName, Server) ->
+after_db_create(#{name := DbName}) ->
     couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
-    Server.
+    couch_replicator_docs:ensure_rep_ddoc_exists(DbName).
 
 
-db_deleted(DbName, Server) ->
+after_db_delete(#{name := DbName}) ->
     couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]),
-    ok = gen_server:call(?MODULE, {clean_up_replications, DbName}, infinity),
-    Server.
+    gen_server:call(?MODULE, {clean_up_replications, DbName}, infinity).
 
 
-db_found(DbName, Server) ->
-    couch_stats:increment_counter([couch_replicator, docs, dbs_found]),
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
-    Server.
+docs_job_id(DbName, Id) when is_binary(DbName), is_binary(Id) ->
+    <<DbName/binary, Id/binary>>.
 
 
-db_change(DbName, {ChangeProps} = Change, Server) ->
-    couch_stats:increment_counter([couch_replicator, docs, db_changes]),
-    try
-        ok = process_change(DbName, Change)
-    catch
-    _Tag:Error ->
-        {RepProps} = get_json_value(doc, ChangeProps),
-        DocId = get_json_value(<<"_id">>, RepProps),
-        couch_replicator_docs:update_failed(DbName, DocId, Error)
-    end,
-    Server.
-
+process_change(_Db, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
+    ok;
 
--spec get_worker_ref(db_doc_id()) -> reference() | nil.
-get_worker_ref({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
-    case ets:lookup(?MODULE, {DbName, DocId}) of
-        [#rdoc{worker = WRef}] when is_reference(WRef) ->
-            WRef;
-        [#rdoc{worker = nil}] ->
-            nil;
-        [] ->
-            nil
+process_change(#{name := DbName} = Db, #doc{deleted = true} = Doc) ->
+    Id = docs_job_id(DbName, Doc#doc.id),  % deleted doc: drop its jobs
+    case couch_jobs:get_job_data(Db, ?REP_DOCS, Id) of
+        {error, not_found} ->
+            ok;  % no job data stored for this doc
+        {ok, #{<<"rep_id">> := null}} ->  % key written as <<"rep_id">>
+            couch_jobs:remove(Db, ?REP_DOCS, Id),
+            ok;
+        {ok, #{<<"rep_id">> := RepId}} ->
+            couch_jobs:remove(Db, ?REP_JOBS, RepId),
+            couch_jobs:remove(Db, ?REP_DOCS, Id),
+            ok
     end.
 
 
-% Cluster membership change notification callback
--spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
-notify_cluster_event(Server, {cluster, _} = Event) ->
-    gen_server:cast(Server, Event).
+process_change(#{name := DbName} = Db, #doc{} = Doc) ->
+    #doc{id = DocId, body = {Props} = Body} = Doc,
+    Id = docs_job_id(DbName, DocId),
+    DocState = get_json_value(<<"_replication_state">>, Props),
+    {Rep, RepParseError} = try
+        Rep0 = couch_replicator_docs:parse_rep_doc_without_id(Body),
+        Rep1 = Rep0#rep{db_name = DbName, start_time = erlang:system_time()},
+        {Rep1, null}
+    catch
+        throw:{bad_rep_doc, Reason} ->
+            {null, Reason}  % keep the parse error instead of a rep
+    end,
+    RepMap = couch_replicator_docs:rep_to_map(Rep),
+    case couch_jobs:get_job_data(Db, ?REP_DOCS, Id) of
+        {error, not_found} ->
+            RepDocsJob = #{
+                <<"rep_id">> => null,
+                <<"db_name">> => DbName,
+                <<"doc_id">> => Doc#doc.id,
+                <<"rep">> => RepMap,
+                <<"rep_parse_error">> => RepParseError
+            },
+            couch_jobs:add(Db, ?REP_DOCS, RepDocsJob);
+        {ok, #{} = Old} ->
+            % Normalize old rep and new rep and only update job
+            % if changed
+            #{<<"rep">> := OldRep, <<"rep_parse_error">> := OldError} = Old,
+            NOldRep = couch_replicator_util:normalize_rep(OldRep),
+            NRep = couch_replicator_util:normalize_rep(Rep),
+            RepDocsJob = #{
+                <<"rep_id">> => null,
+                <<"db_name">> => DbName,
+                <<"doc_id">> => Doc#doc.id,
+                <<"rep">> => RepMap,
+                <<"rep_parse_error">> => RepParseError
+            }  % NOTE(review): job is built but never compared/stored (WIP)
+    end.
 
 
-process_change(DbName, {Change}) ->
-    {RepProps} = JsonRepDoc = get_json_value(doc, Change),
-    DocId = get_json_value(<<"_id">>, RepProps),
-    Owner = couch_replicator_clustering:owner(DbName, DocId),
+process_change(#{name := DbName} = Db, #doc{} = Doc) ->
+    #doc{id = DocId, body = {Props} = Body, deleted = Deleted} = Doc,
     Id = {DbName, DocId},
-    case {Owner, get_json_value(deleted, Change, false)} of
-    {_, true} ->
-        ok = gen_server:call(?MODULE, {removed, Id}, infinity);
-    {unstable, false} ->
-        couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]);
-    {ThisNode, false} when ThisNode =:= node() ->
-        case get_json_value(<<"_replication_state">>, RepProps) of
-        undefined ->
-            ok = process_updated(Id, JsonRepDoc);
-        <<"triggered">> ->
+    case Deleted of
+        true ->
+            process_deleted_doc(Db, Id);
+        false ->
+            process_updated_doc(Db, Id, Body)
+    end.
+    State = get_json_value(<<"_replication_state">>, Props),
+    case {Deleted, State} of
+        {true, _} ->
+            ok = gen_server:call(?MODULE, {removed, Id}, infinity);
+        {false, undefined} ->
+            ok = process_updated(Id, Body);
+        {false, <<"triggered">>} ->
             maybe_remove_state_fields(DbName, DocId),
-            ok = process_updated(Id, JsonRepDoc);
-        <<"completed">> ->
+            ok = process_updated(Id, Body);
+        {false, <<"completed">>} ->
             ok = gen_server:call(?MODULE, {completed, Id}, infinity);
-        <<"error">> ->
+        {false, <<"error">>} ->
             % Handle replications started from older versions of replicator
             % which wrote transient errors to replication docs
             maybe_remove_state_fields(DbName, DocId),
-            ok = process_updated(Id, JsonRepDoc);
-        <<"failed">> ->
+            ok = process_updated(Id, Body);
+        {false, <<"failed">>} ->
             ok
-        end;
-    {Owner, false} ->
-        ok
-    end,
-    ok.
+    end.
 
 
 maybe_remove_state_fields(DbName, DocId) ->
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index bbf9694..de34b53 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -18,11 +18,11 @@
     parse_rep_db/3,
     parse_rep_doc_without_id/1,
     parse_rep_doc_without_id/2,
+    rep_to_map/1,
     before_doc_update/3,
     after_doc_read/2,
     ensure_rep_db_exists/0,
     ensure_rep_ddoc_exists/1,
-    ensure_cluster_rep_ddoc_exists/1,
     remove_state_fields/2,
     update_doc_completed/3,
     update_failed/3,
@@ -121,34 +121,22 @@ update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
     ok.
 
 
--spec ensure_rep_db_exists() -> {ok, Db::any()}.
+-spec ensure_rep_db_exists() -> ok.
 ensure_rep_db_exists() ->
-    Db = case couch_db:open_int(?REP_DB_NAME, [?CTX, sys_db,
-            nologifmissing]) of
-        {ok, Db0} ->
-            Db0;
-        _Error ->
-            {ok, Db0} = couch_db:create(?REP_DB_NAME, [?CTX, sys_db]),
-            Db0
-    end,
-    ok = ensure_rep_ddoc_exists(?REP_DB_NAME),
-    {ok, Db}.
-
-
--spec ensure_rep_ddoc_exists(binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb) ->
-    case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of
-        true ->
-            ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC);
-        false ->
+    Opts = [?CTX, sys_db, nologifmissing],
+    case fabric2_db:create(?REP_DB_NAME, Opts) of
+        {error, file_exists} ->
+            ok;
+        {ok, _Db} ->
             ok
     end.
 
 
--spec ensure_rep_ddoc_exists(binary(), binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb, DDocId) ->
+-spec ensure_rep_ddoc_exists(binary()) -> ok.
+ensure_rep_ddoc_exists(RepDb) ->
+    DDocId = ?REP_DESIGN_DOC,
     case open_rep_doc(RepDb, DDocId) of
-        {not_found, no_db_file} ->
+        {not_found, database_does_not_exist} ->
             %% database was deleted.
             ok;
         {not_found, _Reason} ->
@@ -179,13 +167,6 @@ ensure_rep_ddoc_exists(RepDb, DDocId) ->
     ok.
 
 
--spec ensure_cluster_rep_ddoc_exists(binary()) -> ok.
-ensure_cluster_rep_ddoc_exists(RepDb) ->
-    DDocId = ?REP_DESIGN_DOC,
-    [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId),
-    ensure_rep_ddoc_exists(DbShard, DDocId).
-
-
 -spec compare_ejson({[_]}, {[_]}) -> boolean().
 compare_ejson(EJson1, EJson2) ->
     EjsonSorted1 = couch_replicator_filters:ejsort(EJson1),
@@ -291,6 +272,26 @@ parse_rep_doc_without_id({Props}, UserCtx) ->
     end.
 
 
+rep_to_map(null) ->
+    null;
+% Flatten a #rep{} record into a binary-keyed map; null passes through as is.
+rep_to_map(#rep{} = Rep) ->
+    {IdBase, IdExt} = Rep#rep.id,  % NOTE(review): badmatch if id is undefined
+    #{
+        <<"id_base">> => IdBase,
+        <<"id_ext">> => IdExt,
+        <<"source">> => Rep#rep.source,
+        <<"target">> => Rep#rep.target,
+        <<"options">> => Rep#rep.options,
+        <<"user_ctx">> => Rep#rep.user_ctx,
+        <<"type">> => Rep#rep.type,
+        <<"view">> => Rep#rep.view,
+        <<"doc_id">> => Rep#rep.doc_id,
+        <<"db_name">> => Rep#rep.db_name,
+        <<"start_time">> => Rep#rep.start_time,
+        <<"stats">> => Rep#rep.stats
+    }.
+
 % Update a #rep{} record with a replication_id. Calculating the id might involve
 % fetching a filter from the source db, and so it could fail intermetently.
 % In case of a failure to fetch the filter this function will throw a
@@ -350,22 +351,21 @@ update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
 
 
 open_rep_doc(DbName, DocId) ->
-    case couch_db:open_int(DbName, [?CTX, sys_db]) of
-        {ok, Db} ->
-            try
-                couch_db:open_doc(Db, DocId, [ejson_body])
-            after
-                couch_db:close(Db)
-            end;
-        Else ->
-            Else
+    try
+        case fabric2_db:open(DbName, [?CTX, sys_db]) of
+            {ok, Db} -> fabric2_db:open_doc(Db, DocId, [ejson_body]);
+            Else -> Else
+        end
+    catch
+        error:database_does_not_exist ->
+            {not_found, database_does_not_exist}
     end.
 
 
 save_rep_doc(DbName, Doc) ->
-    {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
+    {ok, Db} = fabric2_db:open(DbName, [?CTX, sys_db]),
     try
-        couch_db:update_doc(Db, Doc, [])
+        fabric2_db:update_doc(Db, Doc, [])
     catch
         % User can accidently write a VDU which prevents _replicator from
         % updating replication documents. Avoid crashing replicator and thus
@@ -374,8 +374,6 @@ save_rep_doc(DbName, Doc) ->
             Msg = "~p VDU function preventing doc update to ~s ~s ~p",
             couch_log:error(Msg, [?MODULE, DbName, Doc#doc.id, Reason]),
             {ok, forbidden}
-    after
-        couch_db:close(Db)
     end.
 
 
@@ -622,7 +620,7 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
     #user_ctx{
        roles = Roles,
        name = Name
-    } = couch_db:get_user_ctx(Db),
+    } = fabric2_db:get_user_ctx(Db),
     case lists:member(<<"_replicator">>, Roles) of
     true ->
         Doc;
@@ -633,7 +631,7 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
         Name ->
             Doc;
         Other ->
-            case (catch couch_db:check_is_admin(Db)) of
+            case (catch fabric2_db:check_is_admin(Db)) of
             ok when Other =:= null ->
                 Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
             ok ->
@@ -650,8 +648,8 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
 after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
     Doc;
 after_doc_read(#doc{body = {Body}} = Doc, Db) ->
-    #user_ctx{name = Name} = couch_db:get_user_ctx(Db),
-    case (catch couch_db:check_is_admin(Db)) of
+    #user_ctx{name = Name} = fabric2_db:get_user_ctx(Db),
+    case (catch fabric2_db:check_is_admin(Db)) of
     ok ->
         Doc;
     _ ->
@@ -659,16 +657,15 @@ after_doc_read(#doc{body = {Body}} = Doc, Db) ->
         Name ->
             Doc;
         _Other ->
-            Source = strip_credentials(couch_util:get_value(<<"source">>,
-Body)),
-            Target = strip_credentials(couch_util:get_value(<<"target">>,
-Body)),
+            Source0 = couch_util:get_value(<<"source">>, Body),
+            Target0 = couch_util:get_value(<<"target">>, Body),
+            Source = strip_credentials(Source0),
+            Target = strip_credentials(Target0),
             NewBody0 = ?replace(Body, <<"source">>, Source),
             NewBody = ?replace(NewBody0, <<"target">>, Target),
             #doc{revs = {Pos, [_ | Revs]}} = Doc,
             NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}},
-            NewRevId = couch_db:new_revid(NewDoc),
-            NewDoc#doc{revs = {Pos, [NewRevId | Revs]}}
+            fabric2_db:new_revid(NewDoc)
         end
     end.
 
@@ -779,27 +776,24 @@ check_strip_credentials_test() ->
 
 setup() ->
     DbName = ?tempdb(),
-    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
-    ok = couch_db:close(Db),
-    create_vdu(DbName),
+    {ok, Db} = fabric2_db:create(DbName, [?ADMIN_CTX]),
+    create_vdu(Db),
     DbName.
 
 
 teardown(DbName) when is_binary(DbName) ->
-    couch_server:delete(DbName, [?ADMIN_CTX]),
+    fabric2_db:delete(DbName, [?ADMIN_CTX]),
     ok.
 
 
-create_vdu(DbName) ->
-    couch_util:with_db(DbName, fun(Db) ->
-        VduFun = <<"function(newdoc, olddoc, userctx) {throw({'forbidden':'fail'})}">>,
-        Doc = #doc{
-            id = <<"_design/vdu">>,
-            body = {[{<<"validate_doc_update">>, VduFun}]}
-        },
-        {ok, _} = couch_db:update_docs(Db, [Doc]),
-        couch_db:ensure_full_commit(Db)
-    end).
+create_vdu(Db) ->  % test helper: installs a VDU that rejects every update
+    VduFun = <<"function(newdoc, olddoc, userctx) {throw({'forbidden':'fail'})}">>,
+    Doc = #doc{
+        id = <<"_design/vdu">>,
+        body = {[{<<"validate_doc_update">>, VduFun}]}
+    },
+    {ok, _} = fabric2_db:update_doc(Db, Doc, []),  % single #doc{}, not a list
+    ok.
 
 
 update_replicator_doc_with_bad_vdu_test_() ->
diff --git a/src/couch_replicator/src/couch_replicator_sup.erl b/src/couch_replicator/src/couch_replicator_sup.erl
index 5475e8f..8202b1c 100644
--- a/src/couch_replicator/src/couch_replicator_sup.erl
+++ b/src/couch_replicator/src/couch_replicator_sup.erl
@@ -70,12 +70,6 @@ init(_Args) ->
             transient,
             brutal_kill,
             worker,
-            [couch_replicator]},
-        {couch_replicator_db_changes,
-            {couch_replicator_db_changes, start_link, []},
-            permanent,
-            brutal_kill,
-            worker,
-            [couch_multidb_changes]}
+            [couch_replicator]}
     ],
     {ok, {{rest_for_one,10,1}, Children}}.
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 7114903..b4ff8dc 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -29,6 +29,9 @@
     name/1,
     get_after_doc_read_fun/1,
     get_before_doc_update_fun/1,
+    get_during_doc_update_fun/1,
+    get_after_db_create_fun/1,
+    get_after_db_delete_fun/1,
     get_committed_update_seq/1,
     get_compacted_seq/1,
     get_compactor_pid/1,
@@ -153,7 +156,9 @@ create(DbName, Options) ->
         #{} = Db0 ->
             Db1 = maybe_add_sys_db_callbacks(Db0),
             ok = fabric2_server:store(Db1),
-            {ok, Db1#{tx := undefined}};
+            Db2 = Db1#{tx := undefined},
+            ok = apply_after_db_create(Db2),
+            {ok, Db2};
         Error ->
             Error
     end.
@@ -186,6 +191,7 @@ delete(DbName, Options) ->
         fabric2_fdb:delete(TxDb)
     end),
     if Resp /= ok -> Resp; true ->
+        ok = apply_after_db_delete(Db#{tx := undefined}),
         fabric2_server:remove(DbName)
     end.
 
@@ -262,6 +268,19 @@ get_after_doc_read_fun(#{after_doc_read := AfterDocRead}) ->
 get_before_doc_update_fun(#{before_doc_update := BeforeDocUpdate}) ->
     BeforeDocUpdate.
 
+
+get_during_doc_update_fun(#{during_doc_update := DuringDocUpdate}) ->
+    DuringDocUpdate.
+
+
+get_after_db_create_fun(#{after_db_create := AfterDbCreate}) ->
+    AfterDbCreate.
+
+
+get_after_db_delete_fun(#{after_db_delete := AfterDbDelete}) ->
+    AfterDbDelete.
+
+
 get_committed_update_seq(#{} = Db) ->
     get_update_seq(Db).
 
@@ -745,24 +764,33 @@ maybe_add_sys_db_callbacks(Db) ->
     IsGlobalUsersDb = fabric2_util:dbname_ends_with(Db, <<"_users">>),
     IsUsersDb = IsAuthCache orelse IsCfgUsersDb orelse IsGlobalUsersDb,
 
-    {BDU, ADR} = if
+    {BDU, DDU, ADR, ADC, ADD} = if
         IsReplicatorDb ->
             {
                 fun couch_replicator_docs:before_doc_update/3,
-                fun couch_replicator_docs:after_doc_read/2
+                fun couch_replicator_doc_processor:during_doc_update/3,
+                fun couch_replicator_docs:after_doc_read/2,
+                fun couch_replicator_doc_processor:after_db_create/1,
+                fun couch_replicator_doc_processor:after_db_delete/1
             };
         IsUsersDb ->
             {
                 fun fabric2_users_db:before_doc_update/3,
-                fun fabric2_users_db:after_doc_read/2
+                undefined,
+                fun fabric2_users_db:after_doc_read/2,
+                undefined,
+                undefined
             };
         true ->
-            {undefined, undefined}
+            {undefined, undefined, undefined, undefined, undefined}
     end,
 
     Db#{
         before_doc_update := BDU,
-        after_doc_read := ADR
+        during_doc_update := DDU,
+        after_doc_read := ADR,
+        after_db_create := ADC,
+        after_db_delete := ADD
     }.
 
 
@@ -996,6 +1024,33 @@ apply_before_doc_update(Db, Docs, Options) ->
     end.
 
 
+apply_during_doc_update(#{during_doc_update := DDU} = Db, Doc, UpdateType)
+        when is_function(DDU, 3) ->
+    DDU(Doc, Db, UpdateType),
+    ok;
+
+apply_during_doc_update(#{during_doc_update := undefined}, _, _) ->
+    ok.
+
+
+apply_after_db_create(#{after_db_create := ADC} = Db)
+        when is_function(ADC, 1) ->
+    ADC(Db),
+    ok;
+
+apply_after_db_create(#{after_db_create := undefined}) ->
+    ok.
+
+
+apply_after_db_delete(#{after_db_delete := ADD} = Db)
+        when is_function(ADD, 1) ->
+    ADD(Db),
+    ok;
+
+apply_after_db_delete(#{after_db_delete := undefined}) ->
+    ok.
+
+
 update_doc_int(#{} = Db, #doc{} = Doc, Options) ->
     IsLocal = case Doc#doc.id of
         <<?LOCAL_DOC_PREFIX, _/binary>> -> true;
@@ -1172,6 +1227,8 @@ update_doc_interactive(Db, Doc0, Future, _Options) ->
             ToRemove
         ),
 
+    ok = apply_during_doc_update(Db, Doc3, interactive_edit),
+
     {ok, {NewRevPos, NewRev}}.
 
 
@@ -1255,6 +1312,8 @@ update_doc_replicated(Db, Doc0, _Options) ->
             ToRemove
         ),
 
+    ok = apply_during_doc_update(Db, Doc3, replicated_changes),
+
     {ok, []}.
 
 
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 670ce8b..cc513d1 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -162,7 +162,10 @@ create(#{} = Db0, Options) ->
 
         validate_doc_update_funs => [],
         before_doc_update => undefined,
+        during_doc_update => undefined,
         after_doc_read => undefined,
+        after_db_create => undefined,
+        after_db_delete => undefined,
         % All other db things as we add features,
 
         db_options => Options
@@ -199,8 +202,10 @@ open(#{} = Db0, Options) ->
         % bits.
         validate_doc_update_funs => [],
         before_doc_update => undefined,
+        during_doc_update => undefined,
         after_doc_read => undefined,
-
+        after_db_create => undefined,
+        after_db_delete => undefined,
         db_options => Options
     },