You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/07/16 16:57:35 UTC

[couchdb] 01/01: FDB Replicator WIP

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/fdb-replicator
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 61808e5ee13be0a8e5984e9c16ee3e982d837b83
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Jul 16 12:57:13 2019 -0400

    FDB Replicator WIP
---
 src/couch_replicator/src/couch_replicator.erl      |   4 +-
 src/couch_replicator/src/couch_replicator.hrl      |   6 +-
 .../src/couch_replicator_db_changes.erl            | 108 --------------
 .../src/couch_replicator_doc_processor.erl         | 158 ++++++++++++---------
 src/couch_replicator/src/couch_replicator_docs.erl | 128 ++++++++---------
 src/couch_replicator/src/couch_replicator_sup.erl  |   8 +-
 src/fabric/src/fabric2_db.erl                      |  71 ++++++++-
 src/fabric/src/fabric2_fdb.erl                     |   7 +-
 8 files changed, 234 insertions(+), 256 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index 39141c3..e2d1964 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -83,7 +83,9 @@ replicate(PostBody, Ctx) ->
 % it returns `ignore`.
 -spec ensure_rep_db_exists() -> ignore.
 ensure_rep_db_exists() ->
-    {ok, _Db} = couch_replicator_docs:ensure_rep_db_exists(),
+    ok = couch_replicator_docs:ensure_rep_db_exists(),
+    couch_jobs:set_type_timeout(?REP_DOCS, ?REP_DOCS_TIMEOUT_MSEC),
+    couch_jobs:set_type_timeout(?REP_JOBS, ?REP_JOBS_TIMEOUT_MSEC),
     ignore.
 
 
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl
index 2a5b7c8..8f7a77a 100644
--- a/src/couch_replicator/src/couch_replicator.hrl
+++ b/src/couch_replicator/src/couch_replicator.hrl
@@ -11,6 +11,10 @@
 % the License.
 
 -define(REP_ID_VERSION, 4).
+-define(REP_DOCS, <<"repdocs">>).
+-define(REP_JOBS, <<"repjobs">>).
+-define(REP_DOCS_TIMEOUT_MSEC, 17000).
+-define(REP_JOBS_TIMEOUT_MSEC, 33000).
 
 -record(rep, {
     id :: rep_id() | '_' | 'undefined',
@@ -22,7 +26,7 @@
     view = nil :: any() | '_',
     doc_id :: any() | '_',
     db_name = null :: null | binary() | '_',
-    start_time = {0, 0, 0} :: erlang:timestamp() | '_',
+    start_time = 0 :: integer() | '_',
     stats = couch_replicator_stats:new() :: orddict:orddict() | '_'
 }).
 
diff --git a/src/couch_replicator/src/couch_replicator_db_changes.erl b/src/couch_replicator/src/couch_replicator_db_changes.erl
deleted file mode 100644
index 92b0222..0000000
--- a/src/couch_replicator/src/couch_replicator_db_changes.erl
+++ /dev/null
@@ -1,108 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_db_changes).
-
--behaviour(gen_server).
-
--export([
-   start_link/0
-]).
-
--export([
-   init/1,
-   terminate/2,
-   handle_call/3,
-   handle_info/2,
-   handle_cast/2,
-   code_change/3
-]).
-
--export([
-   notify_cluster_event/2
-]).
-
--record(state, {
-   event_listener :: pid(),
-   mdb_changes :: pid() | nil
-}).
-
-
--spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
-notify_cluster_event(Server, {cluster, _} = Event) ->
-    gen_server:cast(Server, Event).
-
-
--spec start_link() ->
-    {ok, pid()} | ignore | {error, any()}.
-start_link() ->
-    gen_server:start_link(?MODULE, [], []).
-
-
-init([]) ->
-    EvtPid = couch_replicator_clustering:link_cluster_event_listener(?MODULE,
-        notify_cluster_event, [self()]),
-    State = #state{event_listener = EvtPid, mdb_changes = nil},
-    case couch_replicator_clustering:is_stable() of
-        true ->
-            {ok, restart_mdb_changes(State)};
-        false ->
-            {ok, State}
-    end.
-
-
-terminate(_Reason, _State) ->
-    ok.
-
-
-handle_call(_Msg, _From, State) ->
-    {reply, {error, invalid_call}, State}.
-
-
-handle_cast({cluster, unstable}, State) ->
-    {noreply, stop_mdb_changes(State)};
-
-handle_cast({cluster, stable}, State) ->
-    {noreply, restart_mdb_changes(State)}.
-
-
-handle_info(_Msg, State) ->
-    {noreply, State}.
-
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-
--spec restart_mdb_changes(#state{}) -> #state{}.
-restart_mdb_changes(#state{mdb_changes = nil} = State) ->
-    Suffix = <<"_replicator">>,
-    CallbackMod = couch_replicator_doc_processor,
-    Options = [skip_ddocs],
-    {ok, Pid} = couch_multidb_changes:start_link(Suffix, CallbackMod, nil,
-        Options),
-    couch_stats:increment_counter([couch_replicator, db_scans]),
-    couch_log:notice("Started replicator db changes listener ~p", [Pid]),
-    State#state{mdb_changes = Pid};
-
-restart_mdb_changes(#state{mdb_changes = _Pid} = State) ->
-    restart_mdb_changes(stop_mdb_changes(State)).
-
-
--spec stop_mdb_changes(#state{}) -> #state{}.
-stop_mdb_changes(#state{mdb_changes = nil} = State) ->
-    State;
-stop_mdb_changes(#state{mdb_changes = Pid} = State) ->
-    couch_log:notice("Stopping replicator db changes listener ~p", [Pid]),
-    unlink(Pid),
-    exit(Pid, kill),
-    State#state{mdb_changes = nil}.
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 1b43598..174fdda 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -13,7 +13,6 @@
 -module(couch_replicator_doc_processor).
 
 -behaviour(gen_server).
--behaviour(couch_multidb_changes).
 
 -export([
     start_link/0
@@ -29,10 +28,9 @@
 ]).
 
 -export([
-    db_created/2,
-    db_deleted/2,
-    db_found/2,
-    db_change/3
+    during_doc_update/3,
+    after_db_create/1,
+    after_db_delete/1
 ]).
 
 -export([
@@ -76,88 +74,118 @@
 }).
 
 
-% couch_multidb_changes API callbacks
+during_doc_update(#doc{} = Doc, Db, _UpdateType) ->
+    couch_stats:increment_counter([couch_replicator, docs, db_changes]),
+    try
+        process_change(Db, Doc)
+    catch
+        _Tag:Error ->
+            DocId = Doc#doc.id,
+            #{name := DbName} = Db,
+            couch_replicator_docs:update_failed(DbName, DocId, Error)
+    end,
+    ok.
+
 
-db_created(DbName, Server) ->
+after_db_create(#{name := DbName}) ->
     couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
-    Server.
+    couch_replicator_docs:ensure_rep_ddoc_exists(DbName).
 
 
-db_deleted(DbName, Server) ->
+after_db_delete(#{name := DbName}) ->
     couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]),
-    ok = gen_server:call(?MODULE, {clean_up_replications, DbName}, infinity),
-    Server.
+    gen_server:call(?MODULE, {clean_up_replications, DbName}, infinity).
 
 
-db_found(DbName, Server) ->
-    couch_stats:increment_counter([couch_replicator, docs, dbs_found]),
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
-    Server.
+docs_job_id(DbName, Id) when is_binary(DbName), is_binary(Id) ->
+    <<DbName/binary, Id/binary>>.
 
 
-db_change(DbName, {ChangeProps} = Change, Server) ->
-    couch_stats:increment_counter([couch_replicator, docs, db_changes]),
-    try
-        ok = process_change(DbName, Change)
-    catch
-    _Tag:Error ->
-        {RepProps} = get_json_value(doc, ChangeProps),
-        DocId = get_json_value(<<"_id">>, RepProps),
-        couch_replicator_docs:update_failed(DbName, DocId, Error)
-    end,
-    Server.
-
+process_change(_Db, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
+    ok;
 
--spec get_worker_ref(db_doc_id()) -> reference() | nil.
-get_worker_ref({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
-    case ets:lookup(?MODULE, {DbName, DocId}) of
-        [#rdoc{worker = WRef}] when is_reference(WRef) ->
-            WRef;
-        [#rdoc{worker = nil}] ->
-            nil;
-        [] ->
-            nil
+process_change(#{name := DbName} = Db, #doc{deleted = true} = Doc) ->
+    Id = docs_job_id(DbName, Doc#doc.id),
+    case couch_jobs:get_job_data(Db, ?REP_DOCS, Id) of
+        {error, not_found} ->
+            ok;
+        {ok, #{<<"rid">> := null}} ->
+            couch_jobs:remove(Db, ?REP_DOCS, Id),
+            ok;
+        {ok, #{<<"rid">> := RepId}} ->
+            couch_jobs:remove(Db, ?REP_JOBS, RepId),
+            couch_jobs:remove(Db, ?REP_DOCS, Id),
+            ok
     end.
 
 
-% Cluster membership change notification callback
--spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
-notify_cluster_event(Server, {cluster, _} = Event) ->
-    gen_server:cast(Server, Event).
+process_change(#{name := DbName} = Db, #doc{} = Doc) ->
+    #doc{id = DocId, body = {Props} = Body} = Doc,
+    Id = docs_job_id(DbName, DocId),
+    DocState = get_json_value(<<"_replication_state">>, Props),
+    {Rep, RepParseError} = try
+        Rep0 = couch_replicator_docs:parse_rep_doc_without_id(Body),
+        Rep1 = Rep0#{db_name = DbName, start_time = erlang:system_time()},
+        {Rep1, null}
+    catch
+        throw:{bad_rep_doc, Reason} ->
+            {null, Reason}
+    end,
+    RepMap = couch_replicator_docs:rep_to_map(Rep),
+    case couch_jobs:get_job_data(Db, ?REP_DOCS, Id) of
+        {error, not_found} ->
+            RepDocsJob = #{
+                <<"rep_id">> := null,
+                <<"db_name">> := DbName,
+                <<"doc_id">> := Doc#doc.id,
+                <<"rep">> := RepMap,
+                <<"rep_parse_error">> := RepParseError
+            },
+            couch_jobs:add(Db, ?REP_DOCS, RepDocsJob);
+        {ok, #{} = Old} ->
+            % Normalize old rep and new rep and only update job
+            % if changed
+            #{<<"rep">> := OldRep, <<"rep_parse_error">> := OldError} = Old,
+            NOldRep = couch_replicator_util:normalize_rep(OldRep),
+            NRep = couch_replicator_util:normalize_rep(Rep),
+            RepDocsJob = #{
+                <<"rep_id">> := null,
+                <<"db_name">> := DbName,
+                <<"doc_id">> := Doc#doc.id,
+                <<"rep">> := RepMap,
+                <<"rep_parse_error">> := RepParseError
+            }
+    end.
 
 
-process_change(DbName, {Change}) ->
-    {RepProps} = JsonRepDoc = get_json_value(doc, Change),
-    DocId = get_json_value(<<"_id">>, RepProps),
-    Owner = couch_replicator_clustering:owner(DbName, DocId),
+process_change(#{name := DbName} = Db, #doc{} = Doc) ->
+    #doc{id = DocId, body = {Props} = Body, deleted = Deleted} = Doc,
     Id = {DbName, DocId},
-    case {Owner, get_json_value(deleted, Change, false)} of
-    {_, true} ->
-        ok = gen_server:call(?MODULE, {removed, Id}, infinity);
-    {unstable, false} ->
-        couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]);
-    {ThisNode, false} when ThisNode =:= node() ->
-        case get_json_value(<<"_replication_state">>, RepProps) of
-        undefined ->
-            ok = process_updated(Id, JsonRepDoc);
-        <<"triggered">> ->
+    case Deleted of
+        true ->
+            process_deleted_doc(Db, Id);
+        false ->
+            process_updated_doc(Db, Id, Body)
+    end.
+    State = get_json_value(<<"_replication_state">>, Props),
+    case {Deleted, State} of
+        {true, _} ->
+            ok = gen_server:call(?MODULE, {removed, Id}, infinity);
+        {false, undefined} ->
+            ok = process_updated(Id, Body);
+        {false, <<"triggered">>} ->
             maybe_remove_state_fields(DbName, DocId),
-            ok = process_updated(Id, JsonRepDoc);
-        <<"completed">> ->
+            ok = process_updated(Id, Body);
+        {false, <<"completed">>} ->
             ok = gen_server:call(?MODULE, {completed, Id}, infinity);
-        <<"error">> ->
+        {false, <<"error">>} ->
             % Handle replications started from older versions of replicator
             % which wrote transient errors to replication docs
             maybe_remove_state_fields(DbName, DocId),
-            ok = process_updated(Id, JsonRepDoc);
-        <<"failed">> ->
+            ok = process_updated(Id, Body);
+        {false, <<"failed">>} ->
             ok
-        end;
-    {Owner, false} ->
-        ok
-    end,
-    ok.
+    end.
 
 
 maybe_remove_state_fields(DbName, DocId) ->
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index bbf9694..de34b53 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -18,11 +18,11 @@
     parse_rep_db/3,
     parse_rep_doc_without_id/1,
     parse_rep_doc_without_id/2,
+    rep_to_map/1,
     before_doc_update/3,
     after_doc_read/2,
     ensure_rep_db_exists/0,
     ensure_rep_ddoc_exists/1,
-    ensure_cluster_rep_ddoc_exists/1,
     remove_state_fields/2,
     update_doc_completed/3,
     update_failed/3,
@@ -121,34 +121,22 @@ update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
     ok.
 
 
--spec ensure_rep_db_exists() -> {ok, Db::any()}.
+-spec ensure_rep_db_exists() -> ok.
 ensure_rep_db_exists() ->
-    Db = case couch_db:open_int(?REP_DB_NAME, [?CTX, sys_db,
-            nologifmissing]) of
-        {ok, Db0} ->
-            Db0;
-        _Error ->
-            {ok, Db0} = couch_db:create(?REP_DB_NAME, [?CTX, sys_db]),
-            Db0
-    end,
-    ok = ensure_rep_ddoc_exists(?REP_DB_NAME),
-    {ok, Db}.
-
-
--spec ensure_rep_ddoc_exists(binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb) ->
-    case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of
-        true ->
-            ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC);
-        false ->
+    Opts = [?CTX, sys_db, nologifmissing],
+    case fabric2_db:create(?REP_DB_NAME, Opts) of
+        {error, file_exists} ->
+            ok;
+        {ok, _Db} ->
             ok
     end.
 
 
--spec ensure_rep_ddoc_exists(binary(), binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb, DDocId) ->
+-spec ensure_rep_ddoc_exists(binary()) -> ok.
+ensure_rep_ddoc_exists(RepDb) ->
+    DDocId = ?REP_DESIGN_DOC,
     case open_rep_doc(RepDb, DDocId) of
-        {not_found, no_db_file} ->
+        {not_found, database_does_not_exist} ->
             %% database was deleted.
             ok;
         {not_found, _Reason} ->
@@ -179,13 +167,6 @@ ensure_rep_ddoc_exists(RepDb, DDocId) ->
     ok.
 
 
--spec ensure_cluster_rep_ddoc_exists(binary()) -> ok.
-ensure_cluster_rep_ddoc_exists(RepDb) ->
-    DDocId = ?REP_DESIGN_DOC,
-    [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId),
-    ensure_rep_ddoc_exists(DbShard, DDocId).
-
-
 -spec compare_ejson({[_]}, {[_]}) -> boolean().
 compare_ejson(EJson1, EJson2) ->
     EjsonSorted1 = couch_replicator_filters:ejsort(EJson1),
@@ -291,6 +272,26 @@ parse_rep_doc_without_id({Props}, UserCtx) ->
     end.
 
 
+rep_to_map(null) ->
+    null;
+
+rep_to_map(#rep{} = Rep) ->
+    {IdBase, IdExt} = Rep#rep.id,
+    #{
+        <<"id_base">> => IdBase,
+        <<"id_ext">> => IdExt,
+        <<"source">> => Rep#rep.source,
+        <<"target">> => Rep#rep.target,
+        <<"options">> => Rep#rep.options,
+        <<"user_ctx">> => Rep#rep.user_ctx,
+        <<"type">> => Rep#rep.type,
+        <<"view">> => Rep#rep.view,
+        <<"doc_id">> => Rep#rep.doc_id,
+        <<"db_name">> => Rep#rep.db_name,
+        <<"start_time">> = Rep#rep.start_time,
+        <<"stats">> = Rep#rep.stats
+    }.
+
 % Update a #rep{} record with a replication_id. Calculating the id might involve
 % fetching a filter from the source db, and so it could fail intermetently.
 % In case of a failure to fetch the filter this function will throw a
@@ -350,22 +351,21 @@ update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
 
 
 open_rep_doc(DbName, DocId) ->
-    case couch_db:open_int(DbName, [?CTX, sys_db]) of
-        {ok, Db} ->
-            try
-                couch_db:open_doc(Db, DocId, [ejson_body])
-            after
-                couch_db:close(Db)
-            end;
-        Else ->
-            Else
+    try
+        case fabric2_db:open(DbName, [?CTX, sys_db]) of
+            {ok, Db} -> fabric2_db:open_doc(Db, DocId, [ejson_body]);
+            Else -> Else
+        end
+    catch
+        error:database_does_not_exist ->
+            {not_found, database_does_not_exist}
     end.
 
 
 save_rep_doc(DbName, Doc) ->
-    {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
+    {ok, Db} = fabric2_db:open(DbName, [?CTX, sys_db]),
     try
-        couch_db:update_doc(Db, Doc, [])
+        fabric2_db:update_doc(Db, Doc, [])
     catch
         % User can accidently write a VDU which prevents _replicator from
         % updating replication documents. Avoid crashing replicator and thus
@@ -374,8 +374,6 @@ save_rep_doc(DbName, Doc) ->
             Msg = "~p VDU function preventing doc update to ~s ~s ~p",
             couch_log:error(Msg, [?MODULE, DbName, Doc#doc.id, Reason]),
             {ok, forbidden}
-    after
-        couch_db:close(Db)
     end.
 
 
@@ -622,7 +620,7 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
     #user_ctx{
        roles = Roles,
        name = Name
-    } = couch_db:get_user_ctx(Db),
+    } = fabric2_db:get_user_ctx(Db),
     case lists:member(<<"_replicator">>, Roles) of
     true ->
         Doc;
@@ -633,7 +631,7 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
         Name ->
             Doc;
         Other ->
-            case (catch couch_db:check_is_admin(Db)) of
+            case (catch fabric2_db:check_is_admin(Db)) of
             ok when Other =:= null ->
                 Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
             ok ->
@@ -650,8 +648,8 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
 after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
     Doc;
 after_doc_read(#doc{body = {Body}} = Doc, Db) ->
-    #user_ctx{name = Name} = couch_db:get_user_ctx(Db),
-    case (catch couch_db:check_is_admin(Db)) of
+    #user_ctx{name = Name} = fabric2_db:get_user_ctx(Db),
+    case (catch fabric2_db:check_is_admin(Db)) of
     ok ->
         Doc;
     _ ->
@@ -659,16 +657,15 @@ after_doc_read(#doc{body = {Body}} = Doc, Db) ->
         Name ->
             Doc;
         _Other ->
-            Source = strip_credentials(couch_util:get_value(<<"source">>,
-Body)),
-            Target = strip_credentials(couch_util:get_value(<<"target">>,
-Body)),
+            Source0 = couch_util:get_value(<<"source">>, Body),
+            Target0 = couch_util:get_value(<<"target">>, Body),
+            Source = strip_credentials(Source0),
+            Target = strip_credentials(Target0),
             NewBody0 = ?replace(Body, <<"source">>, Source),
             NewBody = ?replace(NewBody0, <<"target">>, Target),
             #doc{revs = {Pos, [_ | Revs]}} = Doc,
             NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}},
-            NewRevId = couch_db:new_revid(NewDoc),
-            NewDoc#doc{revs = {Pos, [NewRevId | Revs]}}
+            fabric2_db:new_revid(NewDoc)
         end
     end.
 
@@ -779,27 +776,24 @@ check_strip_credentials_test() ->
 
 setup() ->
     DbName = ?tempdb(),
-    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
-    ok = couch_db:close(Db),
-    create_vdu(DbName),
+    {ok, Db} = fabric2_db:create(DbName, [?ADMIN_CTX]),
+    create_vdu(Db),
     DbName.
 
 
 teardown(DbName) when is_binary(DbName) ->
-    couch_server:delete(DbName, [?ADMIN_CTX]),
+    fabric2_db:delete(DbName, [?ADMIN_CTX]),
     ok.
 
 
-create_vdu(DbName) ->
-    couch_util:with_db(DbName, fun(Db) ->
-        VduFun = <<"function(newdoc, olddoc, userctx) {throw({'forbidden':'fail'})}">>,
-        Doc = #doc{
-            id = <<"_design/vdu">>,
-            body = {[{<<"validate_doc_update">>, VduFun}]}
-        },
-        {ok, _} = couch_db:update_docs(Db, [Doc]),
-        couch_db:ensure_full_commit(Db)
-    end).
+create_vdu(Db) ->
+    VduFun = <<"function(newdoc, olddoc, userctx) {throw({'forbidden':'fail'})}">>,
+    Doc = #doc{
+        id = <<"_design/vdu">>,
+        body = {[{<<"validate_doc_update">>, VduFun}]}
+    },
+    {ok, _} = fabric2_db:update_doc(Db, [Doc]),
+    ok.
 
 
 update_replicator_doc_with_bad_vdu_test_() ->
diff --git a/src/couch_replicator/src/couch_replicator_sup.erl b/src/couch_replicator/src/couch_replicator_sup.erl
index 5475e8f..8202b1c 100644
--- a/src/couch_replicator/src/couch_replicator_sup.erl
+++ b/src/couch_replicator/src/couch_replicator_sup.erl
@@ -70,12 +70,6 @@ init(_Args) ->
             transient,
             brutal_kill,
             worker,
-            [couch_replicator]},
-        {couch_replicator_db_changes,
-            {couch_replicator_db_changes, start_link, []},
-            permanent,
-            brutal_kill,
-            worker,
-            [couch_multidb_changes]}
+            [couch_replicator]}
     ],
     {ok, {{rest_for_one,10,1}, Children}}.
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 7114903..b4ff8dc 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -29,6 +29,9 @@
     name/1,
     get_after_doc_read_fun/1,
     get_before_doc_update_fun/1,
+    get_during_doc_update_fun/1,
+    get_after_db_create_fun/1,
+    get_after_db_delete_fun/1,
     get_committed_update_seq/1,
     get_compacted_seq/1,
     get_compactor_pid/1,
@@ -153,7 +156,9 @@ create(DbName, Options) ->
         #{} = Db0 ->
             Db1 = maybe_add_sys_db_callbacks(Db0),
             ok = fabric2_server:store(Db1),
-            {ok, Db1#{tx := undefined}};
+            Db2 = Db1#{tx := undefined},
+            ok = apply_after_db_create(Db2),
+            {ok, Db2};
         Error ->
             Error
     end.
@@ -186,6 +191,7 @@ delete(DbName, Options) ->
         fabric2_fdb:delete(TxDb)
     end),
     if Resp /= ok -> Resp; true ->
+        ok = apply_after_db_delete(Db#{tx := undefined}),
         fabric2_server:remove(DbName)
     end.
 
@@ -262,6 +268,19 @@ get_after_doc_read_fun(#{after_doc_read := AfterDocRead}) ->
 get_before_doc_update_fun(#{before_doc_update := BeforeDocUpdate}) ->
     BeforeDocUpdate.
 
+
+get_during_doc_update_fun(#{during_doc_update := DuringDocUpdate}) ->
+    DuringDocUpdate.
+
+
+get_after_db_create_fun(#{after_db_create := AfterDbCreate}) ->
+    AfterDbCreate.
+
+
+get_after_db_delete_fun(#{after_db_delete := AfterDbDelete}) ->
+    AfterDbDelete.
+
+
 get_committed_update_seq(#{} = Db) ->
     get_update_seq(Db).
 
@@ -745,24 +764,33 @@ maybe_add_sys_db_callbacks(Db) ->
     IsGlobalUsersDb = fabric2_util:dbname_ends_with(Db, <<"_users">>),
     IsUsersDb = IsAuthCache orelse IsCfgUsersDb orelse IsGlobalUsersDb,
 
-    {BDU, ADR} = if
+    {BDU, DDU, ADR, ADC, ADD} = if
         IsReplicatorDb ->
             {
                 fun couch_replicator_docs:before_doc_update/3,
-                fun couch_replicator_docs:after_doc_read/2
+                fun couch_replicator_doc_processor:during_doc_update/3,
+                fun couch_replicator_docs:after_doc_read/2,
+                fun couch_replicator_doc_processor:after_db_create/1,
+                fun couch_replicator_doc_processor:after_db_delete/1
             };
         IsUsersDb ->
             {
                 fun fabric2_users_db:before_doc_update/3,
-                fun fabric2_users_db:after_doc_read/2
+                undefined,
+                fun fabric2_users_db:after_doc_read/2,
+                undefined,
+                undefined
             };
         true ->
-            {undefined, undefined}
+            {undefined, undefined, undefined, undefined, undefined}
     end,
 
     Db#{
         before_doc_update := BDU,
-        after_doc_read := ADR
+        during_doc_update := DDU,
+        after_doc_read := ADR,
+        after_db_create := ADC,
+        after_db_delete := ADD
     }.
 
 
@@ -996,6 +1024,33 @@ apply_before_doc_update(Db, Docs, Options) ->
     end.
 
 
+apply_during_doc_update(#{during_doc_update := DDU} = Db, Doc, UpdateType)
+        when is_function(DDU, 3) ->
+    DDU(Doc, Db, UpdateType),
+    ok;
+
+apply_during_doc_update(#{during_doc_update := undefined}, _, _) ->
+    ok.
+
+
+apply_after_db_create(#{after_db_create := ADC} = Db)
+        when is_function(ADC, 1) ->
+    ADC(Db),
+    ok;
+
+apply_after_db_create(#{after_db_create := undefined}) ->
+    ok.
+
+
+apply_after_db_delete(#{after_db_delete := ADD} = Db)
+        when is_function(ADD, 1) ->
+    ADD(Db),
+    ok;
+
+apply_after_db_delete(#{after_db_delete := undefined}) ->
+    ok.
+
+
 update_doc_int(#{} = Db, #doc{} = Doc, Options) ->
     IsLocal = case Doc#doc.id of
         <<?LOCAL_DOC_PREFIX, _/binary>> -> true;
@@ -1172,6 +1227,8 @@ update_doc_interactive(Db, Doc0, Future, _Options) ->
             ToRemove
         ),
 
+    ok = apply_during_doc_update(Db, Doc3, interactive_edit),
+
     {ok, {NewRevPos, NewRev}}.
 
 
@@ -1255,6 +1312,8 @@ update_doc_replicated(Db, Doc0, _Options) ->
             ToRemove
         ),
 
+    ok = apply_during_doc_update(Db, Doc3, replicated_changes),
+
     {ok, []}.
 
 
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 670ce8b..cc513d1 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -162,7 +162,10 @@ create(#{} = Db0, Options) ->
 
         validate_doc_update_funs => [],
         before_doc_update => undefined,
+        during_doc_update => undefined,
         after_doc_read => undefined,
+        after_db_create => undefined,
+        after_db_delete => undefined,
         % All other db things as we add features,
 
         db_options => Options
@@ -199,8 +202,10 @@ open(#{} = Db0, Options) ->
         % bits.
         validate_doc_update_funs => [],
         before_doc_update => undefined,
+        during_doc_update => undefined,
         after_doc_read => undefined,
-
+        after_db_create => undefined,
+        after_db_delete => undefined,
         db_options => Options
     },