You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/07/16 16:57:35 UTC
[couchdb] 01/01: FDB Replicator WIP
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch prototype/fdb-replicator
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 61808e5ee13be0a8e5984e9c16ee3e982d837b83
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Jul 16 12:57:13 2019 -0400
FDB Replicator WIP
---
src/couch_replicator/src/couch_replicator.erl | 4 +-
src/couch_replicator/src/couch_replicator.hrl | 6 +-
.../src/couch_replicator_db_changes.erl | 108 --------------
.../src/couch_replicator_doc_processor.erl | 158 ++++++++++++---------
src/couch_replicator/src/couch_replicator_docs.erl | 128 ++++++++---------
src/couch_replicator/src/couch_replicator_sup.erl | 8 +-
src/fabric/src/fabric2_db.erl | 71 ++++++++-
src/fabric/src/fabric2_fdb.erl | 7 +-
8 files changed, 234 insertions(+), 256 deletions(-)
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index 39141c3..e2d1964 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -83,7 +83,9 @@ replicate(PostBody, Ctx) ->
% it returns `ignore`.
-spec ensure_rep_db_exists() -> ignore.
ensure_rep_db_exists() ->
- {ok, _Db} = couch_replicator_docs:ensure_rep_db_exists(),
+ ok = couch_replicator_docs:ensure_rep_db_exists(),
+ couch_jobs:set_type_timeout(?REP_DOCS, ?REP_DOCS_TIMEOUT_MSEC),
+ couch_jobs:set_type_timeout(?REP_JOBS, ?REP_JOBS_TIMEOUT_MSEC),
ignore.
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl
index 2a5b7c8..8f7a77a 100644
--- a/src/couch_replicator/src/couch_replicator.hrl
+++ b/src/couch_replicator/src/couch_replicator.hrl
@@ -11,6 +11,10 @@
% the License.
-define(REP_ID_VERSION, 4).
+-define(REP_DOCS, <<"repdocs">>).
+-define(REP_JOBS, <<"repjobs">>).
+-define(REP_DOCS_TIMEOUT_MSEC, 17000).
+-define(REP_JOBS_TIMEOUT_MSEC, 33000).
-record(rep, {
id :: rep_id() | '_' | 'undefined',
@@ -22,7 +26,7 @@
view = nil :: any() | '_',
doc_id :: any() | '_',
db_name = null :: null | binary() | '_',
- start_time = {0, 0, 0} :: erlang:timestamp() | '_',
+ start_time = 0 :: integer() | '_',
stats = couch_replicator_stats:new() :: orddict:orddict() | '_'
}).
diff --git a/src/couch_replicator/src/couch_replicator_db_changes.erl b/src/couch_replicator/src/couch_replicator_db_changes.erl
deleted file mode 100644
index 92b0222..0000000
--- a/src/couch_replicator/src/couch_replicator_db_changes.erl
+++ /dev/null
@@ -1,108 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_db_changes).
-
--behaviour(gen_server).
-
--export([
- start_link/0
-]).
-
--export([
- init/1,
- terminate/2,
- handle_call/3,
- handle_info/2,
- handle_cast/2,
- code_change/3
-]).
-
--export([
- notify_cluster_event/2
-]).
-
--record(state, {
- event_listener :: pid(),
- mdb_changes :: pid() | nil
-}).
-
-
--spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
-notify_cluster_event(Server, {cluster, _} = Event) ->
- gen_server:cast(Server, Event).
-
-
--spec start_link() ->
- {ok, pid()} | ignore | {error, any()}.
-start_link() ->
- gen_server:start_link(?MODULE, [], []).
-
-
-init([]) ->
- EvtPid = couch_replicator_clustering:link_cluster_event_listener(?MODULE,
- notify_cluster_event, [self()]),
- State = #state{event_listener = EvtPid, mdb_changes = nil},
- case couch_replicator_clustering:is_stable() of
- true ->
- {ok, restart_mdb_changes(State)};
- false ->
- {ok, State}
- end.
-
-
-terminate(_Reason, _State) ->
- ok.
-
-
-handle_call(_Msg, _From, State) ->
- {reply, {error, invalid_call}, State}.
-
-
-handle_cast({cluster, unstable}, State) ->
- {noreply, stop_mdb_changes(State)};
-
-handle_cast({cluster, stable}, State) ->
- {noreply, restart_mdb_changes(State)}.
-
-
-handle_info(_Msg, State) ->
- {noreply, State}.
-
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-
--spec restart_mdb_changes(#state{}) -> #state{}.
-restart_mdb_changes(#state{mdb_changes = nil} = State) ->
- Suffix = <<"_replicator">>,
- CallbackMod = couch_replicator_doc_processor,
- Options = [skip_ddocs],
- {ok, Pid} = couch_multidb_changes:start_link(Suffix, CallbackMod, nil,
- Options),
- couch_stats:increment_counter([couch_replicator, db_scans]),
- couch_log:notice("Started replicator db changes listener ~p", [Pid]),
- State#state{mdb_changes = Pid};
-
-restart_mdb_changes(#state{mdb_changes = _Pid} = State) ->
- restart_mdb_changes(stop_mdb_changes(State)).
-
-
--spec stop_mdb_changes(#state{}) -> #state{}.
-stop_mdb_changes(#state{mdb_changes = nil} = State) ->
- State;
-stop_mdb_changes(#state{mdb_changes = Pid} = State) ->
- couch_log:notice("Stopping replicator db changes listener ~p", [Pid]),
- unlink(Pid),
- exit(Pid, kill),
- State#state{mdb_changes = nil}.
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 1b43598..174fdda 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -13,7 +13,6 @@
-module(couch_replicator_doc_processor).
-behaviour(gen_server).
--behaviour(couch_multidb_changes).
-export([
start_link/0
@@ -29,10 +28,9 @@
]).
-export([
- db_created/2,
- db_deleted/2,
- db_found/2,
- db_change/3
+ during_doc_update/3,
+ after_db_create/1,
+ after_db_delete/1
]).
-export([
@@ -76,88 +74,118 @@
}).
-% couch_multidb_changes API callbacks
+during_doc_update(#doc{} = Doc, Db, _UpdateType) ->
+ couch_stats:increment_counter([couch_replicator, docs, db_changes]),
+ try
+ process_change(Db, Doc)
+ catch
+ _Tag:Error ->
+ DocId = Doc#doc.id,
+ #{name := DbName} = Db,
+ couch_replicator_docs:update_failed(DbName, DocId, Error)
+ end,
+ ok.
+
-db_created(DbName, Server) ->
+after_db_create(#{name := DbName}) ->
couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
- couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
- Server.
+ couch_replicator_docs:ensure_rep_ddoc_exists(DbName).
-db_deleted(DbName, Server) ->
+after_db_delete(#{name := DbName}) ->
couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]),
- ok = gen_server:call(?MODULE, {clean_up_replications, DbName}, infinity),
- Server.
+ gen_server:call(?MODULE, {clean_up_replications, DbName}, infinity).
-db_found(DbName, Server) ->
- couch_stats:increment_counter([couch_replicator, docs, dbs_found]),
- couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
- Server.
+docs_job_id(DbName, Id) when is_binary(DbName), is_binary(Id) ->
+ <<DbName/binary, Id/binary>>.
-db_change(DbName, {ChangeProps} = Change, Server) ->
- couch_stats:increment_counter([couch_replicator, docs, db_changes]),
- try
- ok = process_change(DbName, Change)
- catch
- _Tag:Error ->
- {RepProps} = get_json_value(doc, ChangeProps),
- DocId = get_json_value(<<"_id">>, RepProps),
- couch_replicator_docs:update_failed(DbName, DocId, Error)
- end,
- Server.
-
+process_change(_Db, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
+ ok;
--spec get_worker_ref(db_doc_id()) -> reference() | nil.
-get_worker_ref({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
- case ets:lookup(?MODULE, {DbName, DocId}) of
- [#rdoc{worker = WRef}] when is_reference(WRef) ->
- WRef;
- [#rdoc{worker = nil}] ->
- nil;
- [] ->
- nil
+process_change(#{name := DbName} = Db, #doc{deleted = true} = Doc) ->
+ Id = docs_job_id(DbName, Doc#doc.id),
+ case couch_jobs:get_job_data(Db, ?REP_DOCS, Id) of
+ {error, not_found} ->
+ ok;
+ {ok, #{<<"rid">> := null}} ->
+ couch_jobs:remove(Db, ?REP_DOCS, Id),
+ ok;
+ {ok, #{<<"rid">> := RepId}} ->
+ couch_jobs:remove(Db, ?REP_JOBS, RepId),
+ couch_jobs:remove(Db, ?REP_DOCS, Id),
+ ok
end.
-% Cluster membership change notification callback
--spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
-notify_cluster_event(Server, {cluster, _} = Event) ->
- gen_server:cast(Server, Event).
+process_change(#{name := DbName} = Db, #doc{} = Doc) ->
+ #doc{id = DocId, body = {Props} = Body} = Doc,
+ Id = docs_job_id(DbName, DocId),
+ DocState = get_json_value(<<"_replication_state">>, Props),
+ {Rep, RepParseError} = try
+ Rep0 = couch_replicator_docs:parse_rep_doc_without_id(Body),
+ Rep1 = Rep0#{db_name = DbName, start_time = erlang:system_time()},
+ {Rep1, null}
+ catch
+ throw:{bad_rep_doc, Reason} ->
+ {null, Reason}
+ end,
+ RepMap = couch_replicator_docs:rep_to_map(Rep),
+ case couch_jobs:get_job_data(Db, ?REP_DOCS, Id) of
+ {error, not_found} ->
+ RepDocsJob = #{
+ <<"rep_id">> := null,
+ <<"db_name">> := DbName,
+ <<"doc_id">> := Doc#doc.id,
+ <<"rep">> := RepMap,
+ <<"rep_parse_error">> := RepParseError
+ },
+ couch_jobs:add(Db, ?REP_DOCS, RepDocsJob);
+ {ok, #{} = Old} ->
+ % Normalize old rep and new rep and only update job
+ % if changed
+ #{<<"rep">> := OldRep, <<"rep_parse_error">> := OldError} = Old,
+ NOldRep = couch_replicator_util:normalize_rep(OldRep),
+ NRep = couch_replicator_util:normalize_rep(Rep),
+ RepDocsJob = #{
+ <<"rep_id">> := null,
+ <<"db_name">> := DbName,
+ <<"doc_id">> := Doc#doc.id,
+ <<"rep">> := RepMap,
+ <<"rep_parse_error">> := RepParseError
+ }
+ end.
-process_change(DbName, {Change}) ->
- {RepProps} = JsonRepDoc = get_json_value(doc, Change),
- DocId = get_json_value(<<"_id">>, RepProps),
- Owner = couch_replicator_clustering:owner(DbName, DocId),
+process_change(#{name := DbName} = Db, #doc{} = Doc) ->
+ #doc{id = DocId, body = {Props} = Body, deleted = Deleted} = Doc,
Id = {DbName, DocId},
- case {Owner, get_json_value(deleted, Change, false)} of
- {_, true} ->
- ok = gen_server:call(?MODULE, {removed, Id}, infinity);
- {unstable, false} ->
- couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]);
- {ThisNode, false} when ThisNode =:= node() ->
- case get_json_value(<<"_replication_state">>, RepProps) of
- undefined ->
- ok = process_updated(Id, JsonRepDoc);
- <<"triggered">> ->
+ case Deleted of
+ true ->
+ process_deleted_doc(Db, Id);
+ false ->
+ process_updated_doc(Db, Id, Body)
+ end.
+ State = get_json_value(<<"_replication_state">>, Props),
+ case {Deleted, State} of
+ {true, _} ->
+ ok = gen_server:call(?MODULE, {removed, Id}, infinity);
+ {false, undefined} ->
+ ok = process_updated(Id, Body);
+ {false, <<"triggered">>} ->
maybe_remove_state_fields(DbName, DocId),
- ok = process_updated(Id, JsonRepDoc);
- <<"completed">> ->
+ ok = process_updated(Id, Body);
+ {false, <<"completed">>} ->
ok = gen_server:call(?MODULE, {completed, Id}, infinity);
- <<"error">> ->
+ {false, <<"error">>} ->
% Handle replications started from older versions of replicator
% which wrote transient errors to replication docs
maybe_remove_state_fields(DbName, DocId),
- ok = process_updated(Id, JsonRepDoc);
- <<"failed">> ->
+ ok = process_updated(Id, Body);
+ {false, <<"failed">>} ->
ok
- end;
- {Owner, false} ->
- ok
- end,
- ok.
+ end.
maybe_remove_state_fields(DbName, DocId) ->
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index bbf9694..de34b53 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -18,11 +18,11 @@
parse_rep_db/3,
parse_rep_doc_without_id/1,
parse_rep_doc_without_id/2,
+ rep_to_map/1,
before_doc_update/3,
after_doc_read/2,
ensure_rep_db_exists/0,
ensure_rep_ddoc_exists/1,
- ensure_cluster_rep_ddoc_exists/1,
remove_state_fields/2,
update_doc_completed/3,
update_failed/3,
@@ -121,34 +121,22 @@ update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
ok.
--spec ensure_rep_db_exists() -> {ok, Db::any()}.
+-spec ensure_rep_db_exists() -> ok.
ensure_rep_db_exists() ->
- Db = case couch_db:open_int(?REP_DB_NAME, [?CTX, sys_db,
- nologifmissing]) of
- {ok, Db0} ->
- Db0;
- _Error ->
- {ok, Db0} = couch_db:create(?REP_DB_NAME, [?CTX, sys_db]),
- Db0
- end,
- ok = ensure_rep_ddoc_exists(?REP_DB_NAME),
- {ok, Db}.
-
-
--spec ensure_rep_ddoc_exists(binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb) ->
- case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of
- true ->
- ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC);
- false ->
+ Opts = [?CTX, sys_db, nologifmissing],
+ case fabric2_db:create(?REP_DB_NAME, Opts) of
+ {error, file_exists} ->
+ ok;
+ {ok, _Db} ->
ok
end.
--spec ensure_rep_ddoc_exists(binary(), binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb, DDocId) ->
+-spec ensure_rep_ddoc_exists(binary()) -> ok.
+ensure_rep_ddoc_exists(RepDb) ->
+ DDocId = ?REP_DESIGN_DOC,
case open_rep_doc(RepDb, DDocId) of
- {not_found, no_db_file} ->
+ {not_found, database_does_not_exist} ->
%% database was deleted.
ok;
{not_found, _Reason} ->
@@ -179,13 +167,6 @@ ensure_rep_ddoc_exists(RepDb, DDocId) ->
ok.
--spec ensure_cluster_rep_ddoc_exists(binary()) -> ok.
-ensure_cluster_rep_ddoc_exists(RepDb) ->
- DDocId = ?REP_DESIGN_DOC,
- [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId),
- ensure_rep_ddoc_exists(DbShard, DDocId).
-
-
-spec compare_ejson({[_]}, {[_]}) -> boolean().
compare_ejson(EJson1, EJson2) ->
EjsonSorted1 = couch_replicator_filters:ejsort(EJson1),
@@ -291,6 +272,26 @@ parse_rep_doc_without_id({Props}, UserCtx) ->
end.
+rep_to_map(null) ->
+ null;
+
+rep_to_map(#rep{} = Rep) ->
+ {IdBase, IdExt} = Rep#rep.id,
+ #{
+ <<"id_base">> => IdBase,
+ <<"id_ext">> => IdExt,
+ <<"source">> => Rep#rep.source,
+ <<"target">> => Rep#rep.target,
+ <<"options">> => Rep#rep.options,
+ <<"user_ctx">> => Rep#rep.user_ctx,
+ <<"type">> => Rep#rep.type,
+ <<"view">> => Rep#rep.view,
+ <<"doc_id">> => Rep#rep.doc_id,
+ <<"db_name">> => Rep#rep.db_name,
+ <<"start_time">> = Rep#rep.start_time,
+ <<"stats">> = Rep#rep.stats
+ }.
+
% Update a #rep{} record with a replication_id. Calculating the id might involve
% fetching a filter from the source db, and so it could fail intermetently.
% In case of a failure to fetch the filter this function will throw a
@@ -350,22 +351,21 @@ update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
open_rep_doc(DbName, DocId) ->
- case couch_db:open_int(DbName, [?CTX, sys_db]) of
- {ok, Db} ->
- try
- couch_db:open_doc(Db, DocId, [ejson_body])
- after
- couch_db:close(Db)
- end;
- Else ->
- Else
+ try
+ case fabric2_db:open(DbName, [?CTX, sys_db]) of
+ {ok, Db} -> fabric2_db:open_doc(Db, DocId, [ejson_body]);
+ Else -> Else
+ end
+ catch
+ error:database_does_not_exist ->
+ {not_found, database_does_not_exist}
end.
save_rep_doc(DbName, Doc) ->
- {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
+ {ok, Db} = fabric2_db:open(DbName, [?CTX, sys_db]),
try
- couch_db:update_doc(Db, Doc, [])
+ fabric2_db:update_doc(Db, Doc, [])
catch
% User can accidently write a VDU which prevents _replicator from
% updating replication documents. Avoid crashing replicator and thus
@@ -374,8 +374,6 @@ save_rep_doc(DbName, Doc) ->
Msg = "~p VDU function preventing doc update to ~s ~s ~p",
couch_log:error(Msg, [?MODULE, DbName, Doc#doc.id, Reason]),
{ok, forbidden}
- after
- couch_db:close(Db)
end.
@@ -622,7 +620,7 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
#user_ctx{
roles = Roles,
name = Name
- } = couch_db:get_user_ctx(Db),
+ } = fabric2_db:get_user_ctx(Db),
case lists:member(<<"_replicator">>, Roles) of
true ->
Doc;
@@ -633,7 +631,7 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
Name ->
Doc;
Other ->
- case (catch couch_db:check_is_admin(Db)) of
+ case (catch fabric2_db:check_is_admin(Db)) of
ok when Other =:= null ->
Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
ok ->
@@ -650,8 +648,8 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
Doc;
after_doc_read(#doc{body = {Body}} = Doc, Db) ->
- #user_ctx{name = Name} = couch_db:get_user_ctx(Db),
- case (catch couch_db:check_is_admin(Db)) of
+ #user_ctx{name = Name} = fabric2_db:get_user_ctx(Db),
+ case (catch fabric2_db:check_is_admin(Db)) of
ok ->
Doc;
_ ->
@@ -659,16 +657,15 @@ after_doc_read(#doc{body = {Body}} = Doc, Db) ->
Name ->
Doc;
_Other ->
- Source = strip_credentials(couch_util:get_value(<<"source">>,
-Body)),
- Target = strip_credentials(couch_util:get_value(<<"target">>,
-Body)),
+ Source0 = couch_util:get_value(<<"source">>, Body),
+ Target0 = couch_util:get_value(<<"target">>, Body),
+ Source = strip_credentials(Source0),
+ Target = strip_credentials(Target0),
NewBody0 = ?replace(Body, <<"source">>, Source),
NewBody = ?replace(NewBody0, <<"target">>, Target),
#doc{revs = {Pos, [_ | Revs]}} = Doc,
NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}},
- NewRevId = couch_db:new_revid(NewDoc),
- NewDoc#doc{revs = {Pos, [NewRevId | Revs]}}
+ fabric2_db:new_revid(NewDoc)
end
end.
@@ -779,27 +776,24 @@ check_strip_credentials_test() ->
setup() ->
DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
- ok = couch_db:close(Db),
- create_vdu(DbName),
+ {ok, Db} = fabric2_db:create(DbName, [?ADMIN_CTX]),
+ create_vdu(Db),
DbName.
teardown(DbName) when is_binary(DbName) ->
- couch_server:delete(DbName, [?ADMIN_CTX]),
+ fabric2_db:delete(DbName, [?ADMIN_CTX]),
ok.
-create_vdu(DbName) ->
- couch_util:with_db(DbName, fun(Db) ->
- VduFun = <<"function(newdoc, olddoc, userctx) {throw({'forbidden':'fail'})}">>,
- Doc = #doc{
- id = <<"_design/vdu">>,
- body = {[{<<"validate_doc_update">>, VduFun}]}
- },
- {ok, _} = couch_db:update_docs(Db, [Doc]),
- couch_db:ensure_full_commit(Db)
- end).
+create_vdu(Db) ->
+ VduFun = <<"function(newdoc, olddoc, userctx) {throw({'forbidden':'fail'})}">>,
+ Doc = #doc{
+ id = <<"_design/vdu">>,
+ body = {[{<<"validate_doc_update">>, VduFun}]}
+ },
+ {ok, _} = fabric2_db:update_doc(Db, [Doc]),
+ ok.
update_replicator_doc_with_bad_vdu_test_() ->
diff --git a/src/couch_replicator/src/couch_replicator_sup.erl b/src/couch_replicator/src/couch_replicator_sup.erl
index 5475e8f..8202b1c 100644
--- a/src/couch_replicator/src/couch_replicator_sup.erl
+++ b/src/couch_replicator/src/couch_replicator_sup.erl
@@ -70,12 +70,6 @@ init(_Args) ->
transient,
brutal_kill,
worker,
- [couch_replicator]},
- {couch_replicator_db_changes,
- {couch_replicator_db_changes, start_link, []},
- permanent,
- brutal_kill,
- worker,
- [couch_multidb_changes]}
+ [couch_replicator]}
],
{ok, {{rest_for_one,10,1}, Children}}.
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 7114903..b4ff8dc 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -29,6 +29,9 @@
name/1,
get_after_doc_read_fun/1,
get_before_doc_update_fun/1,
+ get_during_doc_update_fun/1,
+ get_after_db_create_fun/1,
+ get_after_db_delete_fun/1,
get_committed_update_seq/1,
get_compacted_seq/1,
get_compactor_pid/1,
@@ -153,7 +156,9 @@ create(DbName, Options) ->
#{} = Db0 ->
Db1 = maybe_add_sys_db_callbacks(Db0),
ok = fabric2_server:store(Db1),
- {ok, Db1#{tx := undefined}};
+ Db2 = Db1#{tx := undefined},
+ ok = apply_after_db_create(Db2),
+ {ok, Db2};
Error ->
Error
end.
@@ -186,6 +191,7 @@ delete(DbName, Options) ->
fabric2_fdb:delete(TxDb)
end),
if Resp /= ok -> Resp; true ->
+ ok = apply_after_db_delete(Db#{tx := undefined}),
fabric2_server:remove(DbName)
end.
@@ -262,6 +268,19 @@ get_after_doc_read_fun(#{after_doc_read := AfterDocRead}) ->
get_before_doc_update_fun(#{before_doc_update := BeforeDocUpdate}) ->
BeforeDocUpdate.
+
+get_during_doc_update_fun(#{during_doc_update := DuringDocUpdate}) ->
+ DuringDocUpdate.
+
+
+get_after_db_create_fun(#{after_db_create := AfterDbCreate}) ->
+ AfterDbCreate.
+
+
+get_after_db_delete_fun(#{after_db_delete := AfterDbDelete}) ->
+ AfterDbDelete.
+
+
get_committed_update_seq(#{} = Db) ->
get_update_seq(Db).
@@ -745,24 +764,33 @@ maybe_add_sys_db_callbacks(Db) ->
IsGlobalUsersDb = fabric2_util:dbname_ends_with(Db, <<"_users">>),
IsUsersDb = IsAuthCache orelse IsCfgUsersDb orelse IsGlobalUsersDb,
- {BDU, ADR} = if
+ {BDU, DDU, ADR, ADC, ADD} = if
IsReplicatorDb ->
{
fun couch_replicator_docs:before_doc_update/3,
- fun couch_replicator_docs:after_doc_read/2
+ fun couch_replicator_doc_processor:during_doc_update/3,
+ fun couch_replicator_docs:after_doc_read/2,
+ fun couch_replicator_doc_processor:after_db_create/1,
+ fun couch_replicator_doc_processor:after_db_delete/1
};
IsUsersDb ->
{
fun fabric2_users_db:before_doc_update/3,
- fun fabric2_users_db:after_doc_read/2
+ undefined,
+ fun fabric2_users_db:after_doc_read/2,
+ undefined,
+ undefined
};
true ->
- {undefined, undefined}
+ {undefined, undefined, undefined, undefined, undefined}
end,
Db#{
before_doc_update := BDU,
- after_doc_read := ADR
+ during_doc_update := DDU,
+ after_doc_read := ADR,
+ after_db_create := ADC,
+ after_db_delete := ADD
}.
@@ -996,6 +1024,33 @@ apply_before_doc_update(Db, Docs, Options) ->
end.
+apply_during_doc_update(#{during_doc_update := DDU} = Db, Doc, UpdateType)
+ when is_function(DDU, 3) ->
+ DDU(Doc, Db, UpdateType),
+ ok;
+
+apply_during_doc_update(#{during_doc_update := undefined}, _, _) ->
+ ok.
+
+
+apply_after_db_create(#{after_db_create := ADC} = Db)
+ when is_function(ADC, 1) ->
+ ADC(Db),
+ ok;
+
+apply_after_db_create(#{after_db_create := undefined}) ->
+ ok.
+
+
+apply_after_db_delete(#{after_db_delete := ADD} = Db)
+ when is_function(ADD, 1) ->
+ ADD(Db),
+ ok;
+
+apply_after_db_delete(#{after_db_delete := undefined}) ->
+ ok.
+
+
update_doc_int(#{} = Db, #doc{} = Doc, Options) ->
IsLocal = case Doc#doc.id of
<<?LOCAL_DOC_PREFIX, _/binary>> -> true;
@@ -1172,6 +1227,8 @@ update_doc_interactive(Db, Doc0, Future, _Options) ->
ToRemove
),
+ ok = apply_during_doc_update(Db, Doc3, interactive_edit),
+
{ok, {NewRevPos, NewRev}}.
@@ -1255,6 +1312,8 @@ update_doc_replicated(Db, Doc0, _Options) ->
ToRemove
),
+ ok = apply_during_doc_update(Db, Doc3, replicated_changes),
+
{ok, []}.
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 670ce8b..cc513d1 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -162,7 +162,10 @@ create(#{} = Db0, Options) ->
validate_doc_update_funs => [],
before_doc_update => undefined,
+ during_doc_update => undefined,
after_doc_read => undefined,
+ after_db_create => undefined,
+ after_db_delete => undefined,
% All other db things as we add features,
db_options => Options
@@ -199,8 +202,10 @@ open(#{} = Db0, Options) ->
% bits.
validate_doc_update_funs => [],
before_doc_update => undefined,
+ during_doc_update => undefined,
after_doc_read => undefined,
-
+ after_db_create => undefined,
+ after_db_delete => undefined,
db_options => Options
},