You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/07/08 22:28:27 UTC
[couchdb] 02/03: WIP 4
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch prototype/fdb-replicator-2
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 453787f12a0ac1e6388dd91c6bff1e9af68729fb
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Aug 6 12:58:55 2019 -0400
WIP 4
* Replaced VDU with a simpler Erlang module
- Avoid conflict handling, updates etc
* Don't auto-create _replicator dbs
* Doc job acceptor - max acceptors vs max jobs
---
src/couch_replicator/src/couch_replicator.erl | 11 +-
src/couch_replicator/src/couch_replicator.hrl | 65 +++--
.../src/couch_replicator_acceptor.erl | 49 ++++
.../src/couch_replicator_doc_processor.erl | 286 ++++++++++++++++-----
src/couch_replicator/src/couch_replicator_docs.erl | 87 ++-----
.../src/couch_replicator_js_functions.hrl | 177 -------------
.../src/couch_replicator_scheduler_job.erl | 33 +--
.../src/couch_replicator_validate_doc.erl | 119 +++++++++
src/fabric/src/fabric2_db.erl | 2 +-
9 files changed, 474 insertions(+), 355 deletions(-)
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index 225a258..262aacc 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -52,8 +52,8 @@
{ok, {cancelled, binary()}} |
{error, any()} |
no_return().
-replicate(PostBody, UserCtx) ->
- {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, UserCtx),
+replicate(PostBody, #user_ctx{name = UserName}) ->
+ {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, UserName),
Rep = Rep0#{<<"start_time">> => erlang:system_time()},
#{<<"id">> := RepId, <<"options">> := Options} = Rep,
case maps:get(<<"cancel">>, Options, false) of
@@ -80,9 +80,14 @@ replicate(PostBody, UserCtx) ->
% it returns `ignore`.
-spec ensure_rep_db_exists() -> ignore.
ensure_rep_db_exists() ->
- ok = couch_replicator_docs:ensure_rep_db_exists(),
couch_jobs:set_type_timeout(?REP_DOCS, ?REP_DOCS_TIMEOUT_MSEC),
couch_jobs:set_type_timeout(?REP_JOBS, ?REP_JOBS_TIMEOUT_MSEC),
+    % Only create the replicator db when the [replicator]
+    % create_replicator_db setting is true (it defaults to false).
+    CreateRepDb = config:get_boolean("replicator", "create_replicator_db",
+        false),
+    if
+        CreateRepDb -> ok = couch_replicator_docs:ensure_rep_db_exists();
+        true -> ok
+    end,
ignore.
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl
index 6584078..050fcb4 100644
--- a/src/couch_replicator/src/couch_replicator.hrl
+++ b/src/couch_replicator/src/couch_replicator.hrl
@@ -11,38 +11,53 @@
% the License.
-define(REP_ID_VERSION, 4).
+
+% Couch jobs types and timeouts
-define(REP_DOCS, <<"repdocs">>).
-define(REP_JOBS, <<"repjobs">>).
-define(REP_DOCS_TIMEOUT_MSEC, 17000).
-define(REP_JOBS_TIMEOUT_MSEC, 33000).
--record(rep, {
- id :: rep_id() | '_' | 'undefined',
- source :: any() | '_',
- target :: any() | '_',
- options :: [_] | '_',
- user_ctx :: any() | '_',
- type = db :: atom() | '_',
- view = nil :: any() | '_',
- doc_id :: any() | '_',
- db_name = null :: null | binary() | '_',
- start_time = 0 :: integer() | '_',
- stats = couch_replicator_stats:new() :: orddict:orddict() | '_'
-}).
+% Some fields from the replication doc
+-define(SOURCE, <<"source">>).
+-define(TARGET, <<"target">>).
+-define(CREATE_TARGET, <<"create_target">>).
+-define(DOC_IDS, <<"doc_ids">>).
+-define(SELECTOR, <<"selector">>).
+-define(FILTER, <<"filter">>).
+-define(QUERY_PARAMS, <<"query_params">>).
+-define(URL, <<"url">>).
+-define(AUTH, <<"auth">>).
+-define(HEADERS, <<"headers">>).
+
+% Replication states. Each macro is defined exactly once: the Erlang
+% preprocessor rejects redefining a macro, even with the same value.
+-define(ST_ERROR, <<"error">>).
+-define(ST_FINISHED, <<"completed">>).
+-define(ST_RUNNING, <<"running">>).
+-define(ST_INITIALIZING, <<"initializing">>).
+-define(ST_FAILED, <<"failed">>).
+-define(ST_PENDING, <<"pending">>).
+-define(ST_CRASHING, <<"crashing">>).
+-define(ST_TRIGGERED, <<"triggered">>).
+
+% Fields of couch job data objects
+-define(REP, <<"rep">>).
+-define(REP_PARSE_ERROR, <<"rep_parse_error">>).
+-define(STATE, <<"state">>).
+-define(STATE_INFO, <<"state_info">>).
+-define(DOC_STATE, <<"doc_state">>).
+-define(DB_NAME, <<"db_name">>).
+-define(DOC_ID, <<"doc_id">>).
+-define(ERROR_COUNT, <<"error_count">>).
+-define(LAST_UPDATED, <<"last_updated">>).
+% Outcome fields written when a couch job is finished
+% (see finish_couch_job/4 in couch_replicator_scheduler_job)
+-define(FINISHED_STATE, <<"finished_state">>).
+-define(FINISHED_RESULT, <<"finished_result">>).
+
+% Accepted job message tag
+-define(ACCEPTED_JOB, accepted_job).
+
+
-type rep_id() :: binary().
-type user_name() :: binary() | null.
-type db_doc_id() :: {binary(), binary() | '_'}.
-type seconds() :: non_neg_integer().
--type rep_start_result() ::
- {ok, rep_id()} |
- ignore |
- {temporary_error, binary()} |
- {permanent_failure, binary()}.
-
-
--record(doc_worker_result, {
- id :: db_doc_id(),
- wref :: reference(),
- result :: rep_start_result()
-}).
diff --git a/src/couch_replicator/src/couch_replicator_acceptor.erl b/src/couch_replicator/src/couch_replicator_acceptor.erl
new file mode 100644
index 0000000..bd67d66
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_acceptor.erl
@@ -0,0 +1,49 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_acceptor).
+
+
+-include("couch_replicator.hrl").
+
+
+-export([
+ start/4,
+ stop/1
+]).
+
+
+% Spawn `Count` linked acceptor processes for couch_jobs `Type`. Each one
+% runs accept/3, which forwards an accepted job to `Parent` as a
+% {?ACCEPTED_JOB, Job, JobData} gen_server cast.
+% Returns a map of the spawned pids (every value is `true`); a
+% non-positive `Count` spawns nothing and returns an empty map.
+start(_Type, Count, _Parent, _Opts) when Count < 1 ->
+    #{};
+
+start(Type, Count, Parent, Opts) when Count >= 1 ->
+    lists:foldl(fun(_N, Acc) ->
+        Pid = spawn_link(fun() -> accept(Type, Parent, Opts) end),
+        Acc#{Pid => true}
+    end, #{}, lists:seq(1, Count)).
+
+
+% Unlink and kill every acceptor pid in the map returned by start/4.
+stop(#{} = Acceptors) ->
+    maps:fold(fun(Pid, true, ok) ->
+        unlink(Pid),
+        exit(Pid, kill),
+        ok
+    end, ok, Acceptors).
+
+
+% Acceptor process body: call couch_jobs:accept/2 once; if a job is
+% returned, cast it to `Parent`. The process ends when this returns.
+accept(Type, Parent, Opts) ->
+    case couch_jobs:accept(Type, Opts) of
+        {error, not_found} ->
+            ok;
+        {ok, Job, JobData} ->
+            Msg = {?ACCEPTED_JOB, Job, JobData},
+            ok = gen_server:cast(Parent, Msg)
+    end.
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 6f44814..9899bef 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -29,7 +29,6 @@
-export([
during_doc_update/3,
- after_db_create/1,
after_db_delete/1
]).
@@ -56,31 +55,17 @@
-define(INITIAL_BACKOFF_EXPONENT, 64).
-define(MIN_FILTER_DELAY_SEC, 60).
--type filter_type() :: nil | view | user | docids | mango.
-type repstate() :: initializing | error | scheduled.
--record(rdoc, {
- id :: db_doc_id() | '_' | {any(), '_'},
- state :: repstate() | '_',
- rep :: #rep{} | nil | '_',
- rid :: rep_id() | nil | '_',
- filter :: filter_type() | '_',
- info :: binary() | nil | '_',
- errcnt :: non_neg_integer() | '_',
- worker :: reference() | nil | '_',
- last_updated :: erlang:timestamp() | '_'
-}).
+-define(MAX_ACCEPTORS, 10).
+-define(MAX_JOBS, 500).
during_doc_update(#doc{} = Doc, Db, _UpdateType) ->
couch_stats:increment_counter([couch_replicator, docs, db_changes]),
ok = process_change(Db, Doc).
-after_db_create(#{name := DbName}) ->
- couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
- couch_replicator_docs:ensure_rep_ddoc_exists(DbName).
-
after_db_delete(#{name := DbName}) ->
couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]),
@@ -112,16 +97,16 @@ process_change(#{name := DbName} = Db, #doc{} = Doc) ->
DocState = get_json_value(<<"_replication_state">>, Props, null),
case couch_jobs:get_job_data(Db, ?REP_DOCS, docs_job_id(DbName, DocId)) of
{error, not_found} ->
- update_replication_job(Db, DbName, DocId, Rep, RepError, DocState);
- {ok, #{<<"rep">> := null, <<"rep_parse_error">> := RepError}}
+ add_rep_doc_job(Db, DbName, DocId, Rep, RepError, DocState);
+ {ok, #{?REP := null, ?REP_PARSE_ERROR := RepError}}
when Rep =:= null ->
% Same error as before occurred, don't bother updating the job
ok;
- {ok, #{<<"rep">> := null}} when Rep =:= null ->
+ {ok, #{?REP := null}} when Rep =:= null ->
% Error occured but it's a different error. Update the job so user
% sees the new error
- update_replication_job(Db, DbName, DocId, Rep, RepError, DocState);
- {ok, #{<<"rep">> := OldRep, <<"rep_parse_error">> := OldError}} ->
+ add_rep_doc_job(Db, DbName, DocId, Rep, RepError, DocState);
+ {ok, #{?REP := OldRep, ?REP_PARSE_ERROR := OldError}} ->
NormOldRep = couch_replicator_util:normalize_rep(OldRep),
NormRep = couch_replicator_util:normalize_rep(Rep),
case NormOldRep == NormRep of
@@ -130,12 +115,14 @@ process_change(#{name := DbName} = Db, #doc{} = Doc) ->
% for the replication job have changed, so make it a no-op
ok;
false ->
- update_replication_job(Db, DbName, DocId, Rep, RepError,
+ add_rep_doc_job(Db, DbName, DocId, Rep, RepError,
DocState)
end
end.
+
+
rep_docs_job_execute(#{} = Job, #{<<"rep">> := null} = JobData) ->
#{
<<"rep_parse_error">> := Error,
@@ -206,55 +193,228 @@ process_updated({DbName, _DocId} = Id, JsonRepDoc) ->
gen_server:call(?MODULE, {updated, Id, Rep, Filter}, infinity).
-% Doc processor gen_server API and callbacks
-
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
- ?MODULE = ets:new(?MODULE, [named_table, {keypos, #rdoc.id},
- {read_concurrency, true}, {write_concurrency, true}]),
- {ok, nil}.
+    % Trap exits so that linked acceptors and workers report their deaths
+    % as 'EXIT' messages to handle_info/2.
+    process_flag(trap_exit, true),
+    St = #{
+        acceptors => #{},
+        workers => #{}
+    },
+    % The 0 timeout delivers `timeout` to handle_info/2 right after init,
+    % which starts the first batch of acceptors.
+    {ok, update_config(St), 0}.
-terminate(_Reason, _State) ->
+terminate(_Reason, #{} = St) ->
+    #{workers := Workers, acceptors := Acceptors} = St,
+    % Workers are linked: unlink before killing each one so the kill does
+    % not come back to us as an 'EXIT' message.
+    lists:foreach(fun(WPid) ->
+        unlink(WPid),
+        exit(WPid, kill)
+    end, maps:keys(Workers)),
+    couch_replicator_acceptor:stop(Acceptors),
ok.
-handle_call({updated, Id, Rep, Filter}, _From, State) ->
- ok = updated_doc(Id, Rep, Filter),
- {reply, ok, State};
+% Every call stops the server with {bad_call, Msg}.
+% NOTE(review): process_updated/2 still sends {updated, ...} calls, which
+% will now stop the doc processor.
+handle_call(Msg, _From, #{} = St) ->
+    {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
-handle_call({removed, Id}, _From, State) ->
- ok = removed_doc(Id),
- {reply, ok, State};
+% Cast by an acceptor (see couch_replicator_acceptor) once it gets a job.
+handle_cast({?ACCEPTED_JOB, Job, JobData}, #{} = St) ->
+    {noreply, start_worker(Job, JobData, St)};
-handle_call({completed, Id}, _From, State) ->
- true = ets:delete(?MODULE, Id),
- {reply, ok, State};
+handle_cast(Msg, #{} = St) ->
+    {stop, {bad_cast, Msg}, St}.
-handle_call({clean_up_replications, DbName}, _From, State) ->
- ok = removed_db(DbName),
- {reply, ok, State}.
-handle_cast(Msg, State) ->
- {stop, {error, unexpected_message, Msg}, State}.
+% Exits of linked acceptors and workers are dispatched to their handlers.
+handle_info({'EXIT', Pid, Reason}, #{} = St) ->
+    #{workers := Workers, acceptors := Acceptors} = St,
+    case {maps:is_key(Pid, Acceptors), maps:is_key(Pid, Workers)} of
+        {false, false} -> handle_unknown_pid(Pid, Reason, St);
+        {true, false} -> handle_acceptor_died(Pid, Reason, St);
+        {false, true} -> handle_worker_died(Pid, Reason, St)
+    end;
+
+handle_info(timeout, #{} = St) ->
+    {noreply, maybe_start_acceptors(St)};
+handle_info(Msg, St) ->
+    {stop, {bad_info, Msg}, St}.
-handle_info({'DOWN', _, _, _, #doc_worker_result{id = Id, wref = Ref,
- result = Res}}, State) ->
- ok = worker_returned(Ref, Id, Res),
- {noreply, State};
-handle_info(_Msg, State) ->
- {noreply, State}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+% Re-read the doc processor tunables from the "replicator" config section
+% into the state map.
+% NOTE(review): ?ACCEPT_TIMEOUT_SEC and ?ACCEPT_FUDGE_SEC are not defined in
+% the parts of this module shown in this patch; make sure they exist.
+update_config(#{} = St) ->
+    MaxJobs = config:get_integer("replicator",
+        "max_doc_processor_jobs", ?MAX_JOBS),
+    MaxAcceptors = config:get_integer("replicator",
+        "max_doc_processor_acceptors", ?MAX_ACCEPTORS),
+    AcceptTimeoutSec = config:get_integer("replicator",
+        "doc_processor_accept_timeout_sec", ?ACCEPT_TIMEOUT_SEC),
+    AcceptFudgeSec = config:get_integer("replicator",
+        "doc_processor_accept_fudge_sec", ?ACCEPT_FUDGE_SEC),
+    St#{
+        max_jobs => MaxJobs,
+        max_acceptors => MaxAcceptors,
+        accept_timeout_sec => AcceptTimeoutSec,
+        accept_fudge_sec => AcceptFudgeSec
+    }.
+
+
+% An acceptor exiting `normal` is dropped and the acceptor pool is topped
+% up again (maybe_start_acceptors/1 re-reads the config itself). Any other
+% exit reason is logged and stops the doc processor.
+handle_acceptor_died(Pid, normal, #{acceptors := Acceptors} = St1) ->
+    St2 = St1#{acceptors := maps:remove(Pid, Acceptors)},
+    {noreply, maybe_start_acceptors(St2)};
+
+handle_acceptor_died(Pid, Reason, #{} = St) ->
+    Msg = "~p : acceptor ~p died with ~p",
+    couch_log:error(Msg, [?MODULE, Pid, Reason]),
+    {stop, {acceptor_pid_exit, Pid, Reason}, St}.
+
+
+% A worker exiting `normal` frees its slot, which may allow more acceptors.
+% Any other exit reason is logged and stops the doc processor.
+handle_worker_died(Pid, normal, #{workers := Workers} = St1) ->
+    St2 = St1#{workers := maps:remove(Pid, Workers)},
+    {noreply, maybe_start_acceptors(St2)};
+
+handle_worker_died(Pid, Reason, #{} = St) ->
+    Msg = "~p : worker ~p died with ~p",
+    couch_log:error(Msg, [?MODULE, Pid, Reason]),
+    {stop, {worker_pid_exit, Pid, Reason}, St}.
+
+
+% An 'EXIT' from a pid that is neither an acceptor nor a worker is
+% unexpected: log it and stop.
+handle_unknown_pid(Pid, Reason, #{} = St) ->
+    Msg = "~p : unknown pid ~p died with ~p",
+    couch_log:error(Msg, [?MODULE, Pid, Reason]),
+    {stop, {unknown_pid_exit, Pid, Reason}, St}.
+
+
+% Top up the acceptor pool. At most `max_acceptors` acceptors wait for jobs
+% at once, and acceptors plus running workers never exceed `max_jobs`.
+maybe_start_acceptors(#{} = St1) ->
+    St2 = update_config(St1),
+    #{
+        workers := Workers,
+        acceptors := Acceptors,
+        max_jobs := MaxJobs,
+        max_acceptors := MaxAcceptors
+    } = St2,
+    WCount = map_size(Workers),
+    ACount = map_size(Acceptors),
+    ToStart = min(MaxAcceptors - ACount, MaxJobs - WCount - ACount),
+    case ToStart > 0 of
+        true -> start_acceptors(ToStart, St2);
+        false -> St2
+    end.
+
+
+% Spawn N ?REP_DOCS acceptors linked to this process and record their pids.
+start_acceptors(N, #{} = St) ->
+    #{
+        acceptors := Acceptors,
+        accept_timeout_sec := TimeoutSec,
+        accept_fudge_sec := FudgeSec
+    } = St,
+    % NOTE(review): confirm the unit couch_jobs:accept/2 expects for
+    % `timeout`; the configured seconds are passed through unconverted.
+    Opts = #{
+        timeout => TimeoutSec,
+        max_sched_time => erlang:system_time(second) + FudgeSec
+    },
+    Pids = couch_replicator_acceptor:start(?REP_DOCS, N, self(), Opts),
+    St#{acceptors := maps:merge(Acceptors, Pids)}.
+
+
+% Run an accepted rep doc job in a linked process and track its pid.
+start_worker(Job, #{} = JobData, #{workers := Workers} = St) ->
+    Self = self(),
+    WorkerFun = fun() -> rep_doc_job_worker(Job, JobData, Self) end,
+    Pid = spawn_link(WorkerFun),
+    St#{workers := maps:put(Pid, true, Workers)}.
+
+
+% Worker body for one accepted ?REP_DOCS job (spawned by start_worker/3).
+% This clause handles a doc whose replication could not be parsed
+% (`?REP` is null): it finishes the couch job with the parse error and,
+% if that succeeds, calls couch_replicator_docs:update_failed/3.
+% NOTE(review): does not compile as written. The map pattern lacks a comma
+% after `?DOC_ID := DocId` and both maps have trailing commas; `St` and
+% `RepId` are unbound; ?FINISHED_STATE, ?FINISHED_RESULT and ?FAILED are
+% not defined in the couch_replicator.hrl shown in this patch; and `:=` on
+% the finished_* keys raises badkey unless they already exist
+% (add_rep_doc_job/6 does not set them). `Parent` is unused.
+rep_doc_job_worker(Job, #{?REP := null} = RepDocData, Parent) ->
+ #{
+ ?REP_PARSE_ERROR := Error,
+ ?DB_NAME := DbName,
+ ?DOC_ID := DocId
+ ?ERROR_COUNT := ErrorCount,
+ } = RepDocData,
+ RepDocData1 = RepDocData#{
+ ?LAST_UPDATED := erlang:system_time(),
+ ?ERROR_COUNT := ErrorCount + 1,
+ ?FINISHED_STATE := ?FAILED,
+ ?FINISHED_RESULT := Error,
+ },
+ case couch_jobs:finish(undefined, Job, RepDocData1) of
+ ok ->
+ couch_replicator_docs:update_failed(DbName, DocId, Error),
+ St;
+ {error, JobError} ->
+ Msg = "~p : replication ~s job could not finish. JobError:~p",
+ couch_log:error(Msg, [?MODULE, RepId, JobError]),
+ St
+ end;
+
+% This clause handles a parsed doc: it calls remove_old_state_fields/2,
+% computes the replication id, then runs maybe_start_replication_job/2
+% inside couch_jobs_fdb:tx/2.
+% NOTE(review): `Rep` is unbound (only `Rep1`/`Rep2` are bound) and `Tx` is
+% unbound; the `?DOC_STATE := DocState,` pattern has a trailing comma;
+% `Rep2` is computed but `Rep` is passed on; `Error` in the second catch
+% clause is unused and its "~slock conflict" message looks copy-pasted.
+rep_doc_job_worker(Job, #{?REP := #{}} = RepDocData, Parent) ->
+ #{
+ ?REP := Rep1,
+ ?DOC_STATE := DocState,
+ } = RepDocData,
+ ok = remove_old_state_fields(Rep, DocState),
+ % Calculate replication ID. In most cases this is fast and easy
+ % but for filtered replications with user JS filters this
+ % means making a remote connection to the source db.
+ Rep2 = #{<<"id">> := RepId} = couch_replicator_docs:update_rep_id(Rep1),
+ try
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+ maybe_start_replication_job(JTx, Rep)
+ end)
+ catch
+ throw:{job_start_error, halt} ->
+ Msg = "~p : replication doc job ~s lock conflict",
+ couch_log:error(Msg, [?MODULE, RepId]),
+ exit(normal);
+ throw:{job_start_error, Error} ->
+ Msg = "~p : replication job start failed ~slock conflict",
+ couch_log:error(Msg, [?MODULE, RepId])
+ end.
+
+
+% NOTE(review): unfinished in this commit. Neither `case` is closed, and
+% `RepId`, `Rep2`, `RepDocData` and `Job` are unbound here. `ST_PENDING`
+% and `ST_SCHEDULED` lack the `?` prefix, and the map construction uses
+% `:=` (only valid when updating). couch_jobs:add/3 appears to omit the job
+% id that get_job_data/3 is queried with. erlang:system_info/0 does not
+% exist (erlang:system_time/0 was probably meant).
+maybe_start_replication_job(JTx, Rep) ->
+ case couch_jobs:get_job_data(JTx, ?REP_JOBS, RepId) of
+ {error, not_found} ->
+ RepJobData = #{
+ ?REP := Rep2,
+ ?STATE := ST_PENDING,
+ ?STATE_INFO := null,
+ },
+ ok = couch_jobs:add(JTx, ?REP_JOBS, RepJobData),
+ RepDocData1 = RepDocData#{
+ ?REP := Rep2,
+ ?STATE := ST_SCHEDULED,
+ ?STATE_INFO := null,
+ ?ERROR_COUNT := 0,
+ ?LAST_UPDATED => erlang:system_info()
+ },
+ case couch_jobs:finish(JTx, Job, RepDocData1) of
+ ok -> ok;
+ Error -> throw({job_start_error, Error})
+
+% For docs last seen in the `triggered` or `error` state, call
+% couch_replicator_docs:remove_state_fields/2 unless update_docs() is true.
+% Any other doc state is a no-op.
+% The doc's db name and id are read from the replication map under
+% ?DB_NAME / ?DOC_ID, the same keys couch_replicator_scheduler_job reads
+% from it. NOTE(review): confirm the parsed rep always carries them.
+remove_old_state_fields(#{} = Rep, DocState) when
+        DocState =:= ?ST_TRIGGERED; DocState =:= ?ST_ERROR ->
+    case update_docs() of
+        true ->
+            ok;
+        false ->
+            #{?DB_NAME := DbName, ?DOC_ID := DocId} = Rep,
+            couch_replicator_docs:remove_state_fields(DbName, DocId)
+    end;
+
+remove_old_state_fields(#{} = _Rep, _DocState) ->
+    ok.
+
+
% Doc processor gen_server private helper functions
% Handle doc update -- add to ets, then start a worker to try to turn it into
@@ -592,20 +752,24 @@ ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) ->
lists:member(State, States).
--spec update_replication(any(), binary(), binary(), #{} | null,
+-spec add_rep_doc_job(any(), binary(), binary(), #{} | null,
binary() | null, binary() | null) -> ok.
-update_replication_job(Tx, DbName, DocId, Rep, RepParseError, DocState) ->
+% Create the ?REP_DOCS couch job for replication doc DbName/DocId, after
+% removing any existing doc job (and its replication job) for that doc.
+% The job id is docs_job_id(DbName, DocId), the id process_change/2 uses
+% to look the job up again with couch_jobs:get_job_data/3.
+add_rep_doc_job(Tx, DbName, DocId, Rep, RepParseError, DocState) ->
JobId = docs_job_id(DbName, DocId),
- ok = remove_replication_by_doc_job_id(Db, JobId),
+    ok = remove_replication_by_doc_job_id(Tx, JobId),
- RepDocsJob = #{
- <<"rep_id">> := null,
- <<"db_name">> := DbName,
- <<"doc_id">> := DocId,
- <<"rep">> := Rep,
- <<"rep_parse_error">> := RepParseError,
- <<"doc_state">> := DocState
+    RepDocData = #{
+        ?REP => Rep,
+        ?REP_PARSE_ERROR => RepParseError,
+        ?DOC_STATE => DocState,
+        ?DB_NAME => DbName,
+        ?DOC_ID => DocId,
+        ?STATE => ?ST_INITIALIZING,
+        ?STATE_INFO => null,
+        ?ERROR_COUNT => 0,
+        ?LAST_UPDATED => erlang:system_time()
},
- ok = couch_jobs:add(Tx, ?REP_DOCS, RepDocsJob).
+    ok = couch_jobs:add(Tx, ?REP_DOCS, JobId, RepDocData).
docs_job_id(DbName, Id) when is_binary(DbName), is_binary(Id) ->
@@ -617,10 +781,10 @@ remove_replication_by_doc_job_id(Tx, Id) ->
case couch_jobs:get_job_data(Tx, ?REP_DOCS, Id) of
{error, not_found} ->
ok;
- {ok, #{<<"rep_id">> := null}} ->
+ {ok, #{?REP := {<<"id">> := null}}} ->
couch_jobs:remove(Tx, ?REP_DOCS, Id),
ok;
- {ok, #{<<"rep_id">> := RepId}} ->
+ {ok, #{?REP := {<<"id">> := RepId}}} ->
couch_jobs:remove(Tx, ?REP_JOBS, RepId),
couch_jobs:remove(Tx, ?REP_DOCS, Id),
ok
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index cdc6a10..eaf8161 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -138,70 +138,17 @@ update_error(#rep{} = Rep, Error) ->
-spec ensure_rep_db_exists() -> ok.
ensure_rep_db_exists() ->
- Opts = [?CTX, sys_db, nologifmissing],
- case fabric2_db:create(?REP_DB_NAME, Opts) of
- {error, file_exists} ->
- ok;
- {ok, _Db} ->
- ok
+    Opts = [?CTX, sys_db],
+    % Creating the replicator db is idempotent: an existing db is not an
+    % error.
+    case fabric2_db:create(?REP_DB_NAME, Opts) of
+        {error, file_exists} -> ok;
+        {ok, _Db} -> ok
end.
--spec ensure_rep_ddoc_exists(binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb) ->
- DDocId = ?REP_DESIGN_DOC,
- case open_rep_doc(RepDb, DDocId) of
- {not_found, database_does_not_exist} ->
- %% database was deleted.
- ok;
- {not_found, _Reason} ->
- DocProps = replication_design_doc_props(DDocId),
- DDoc = couch_doc:from_json_obj({DocProps}),
- couch_log:notice("creating replicator ddoc ~p", [RepDb]),
- {ok, _Rev} = save_rep_doc(RepDb, DDoc);
- {ok, Doc} ->
- Latest = replication_design_doc_props(DDocId),
- {Props0} = couch_doc:to_json_obj(Doc, []),
- {value, {_, Rev}, Props} = lists:keytake(<<"_rev">>, 1, Props0),
- case compare_ejson({Props}, {Latest}) of
- true ->
- ok;
- false ->
- LatestWithRev = [{<<"_rev">>, Rev} | Latest],
- DDoc = couch_doc:from_json_obj({LatestWithRev}),
- couch_log:notice("updating replicator ddoc ~p", [RepDb]),
- try
- {ok, _} = save_rep_doc(RepDb, DDoc)
- catch
- throw:conflict ->
- %% ignore, we'll retry next time
- ok
- end
- end
- end,
- ok.
-
-
--spec compare_ejson({[_]}, {[_]}) -> boolean().
-compare_ejson(EJson1, EJson2) ->
- EjsonSorted1 = couch_replicator_filters:ejsort(EJson1),
- EjsonSorted2 = couch_replicator_filters:ejsort(EJson2),
- EjsonSorted1 == EjsonSorted2.
-
-
--spec replication_design_doc_props(binary()) -> [_].
-replication_design_doc_props(DDocId) ->
- [
- {<<"_id">>, DDocId},
- {<<"language">>, <<"javascript">>},
- {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
- ].
-
-
-spec parse_rep_doc_without_id({[_]}) -> #{}.
parse_rep_doc_without_id(RepDoc) ->
{ok, Rep} = try
- parse_rep_doc_without_id(RepDoc, rep_user_name(RepDoc))
+ parse_rep_doc_without_id(RepDoc, null)
catch
throw:{error, Reason} ->
throw({bad_rep_doc, Reason});
@@ -358,14 +305,6 @@ save_rep_doc(DbName, Doc) ->
end.
--spec rep_user_name({[_]}) -> binary() | null.
-rep_user_name({RepDoc}) ->
- case get_json_value(<<"user_ctx">>, RepDoc) of
- undefined -> null;
- {UserCtx} -> get_json_value(<<"name">>, UserCtx, null)
- end.
-
-
-spec parse_rep_db(#{}, #{}, #{}) -> #{}.
parse_rep_db(#{} = Endpoint, #{} = ProxyParams, #{} = Options) ->
ProxyURL = case ProxyParams of
@@ -634,7 +573,8 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
roles = Roles,
name = Name
} = fabric2_db:get_user_ctx(Db),
- case lists:member(<<"_replicator">>, Roles) of
+ IsReplicator = case lists:member(<<"_replicator">>, Roles),
+ Doc1 = case IsReplicator of
true ->
Doc;
false ->
@@ -649,12 +589,21 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
ok ->
Doc;
- _ ->
+ false ->
throw({forbidden, <<"Can't update replication documents",
" from other users.">>})
end
end
- end.
+ end,
+ case IsReplicator orelse Doc1#doc.deleted of
+ true ->
+ ok; % If replicator or deleting don't validate doc body
+ false ->
+ % Encode as a map and normalize all field names as binaries
+ BMap = ?JSON_DECODE(?JSON_ENCODE(Doc1#doc.body)),
+ couch_replicator_validate_doc:validate(BMap)
+ end,
+ Doc1.
-spec after_doc_read(#doc{}, Db::any()) -> #doc{}.
diff --git a/src/couch_replicator/src/couch_replicator_js_functions.hrl b/src/couch_replicator/src/couch_replicator_js_functions.hrl
deleted file mode 100644
index d410433..0000000
--- a/src/couch_replicator/src/couch_replicator_js_functions.hrl
+++ /dev/null
@@ -1,177 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--define(REP_DB_DOC_VALIDATE_FUN, <<"
- function(newDoc, oldDoc, userCtx) {
- function reportError(error_msg) {
- log('Error writing document `' + newDoc._id +
- '\\' to the replicator database: ' + error_msg);
- throw({forbidden: error_msg});
- }
-
- function validateEndpoint(endpoint, fieldName) {
- if ((typeof endpoint !== 'string') &&
- ((typeof endpoint !== 'object') || (endpoint === null))) {
-
- reportError('The `' + fieldName + '\\' property must exist' +
- ' and be either a string or an object.');
- }
-
- if (typeof endpoint === 'object') {
- if ((typeof endpoint.url !== 'string') || !endpoint.url) {
- reportError('The url property must exist in the `' +
- fieldName + '\\' field and must be a non-empty string.');
- }
-
- if ((typeof endpoint.auth !== 'undefined') &&
- ((typeof endpoint.auth !== 'object') ||
- endpoint.auth === null)) {
-
- reportError('`' + fieldName +
- '.auth\\' must be a non-null object.');
- }
-
- if ((typeof endpoint.headers !== 'undefined') &&
- ((typeof endpoint.headers !== 'object') ||
- endpoint.headers === null)) {
-
- reportError('`' + fieldName +
- '.headers\\' must be a non-null object.');
- }
- }
- }
-
- var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0);
- var isAdmin = (userCtx.roles.indexOf('_admin') >= 0);
-
- if (isReplicator) {
- // Always let replicator update the replication document
- return;
- }
-
- if (newDoc._replication_state === 'failed') {
- // Skip validation in case when we update the document with the
- // failed state. In this case it might be malformed. However,
- // replicator will not pay attention to failed documents so this
- // is safe.
- return;
- }
-
- if (!newDoc._deleted) {
- validateEndpoint(newDoc.source, 'source');
- validateEndpoint(newDoc.target, 'target');
-
- if ((typeof newDoc.create_target !== 'undefined') &&
- (typeof newDoc.create_target !== 'boolean')) {
-
- reportError('The `create_target\\' field must be a boolean.');
- }
-
- if ((typeof newDoc.continuous !== 'undefined') &&
- (typeof newDoc.continuous !== 'boolean')) {
-
- reportError('The `continuous\\' field must be a boolean.');
- }
-
- if ((typeof newDoc.doc_ids !== 'undefined') &&
- !isArray(newDoc.doc_ids)) {
-
- reportError('The `doc_ids\\' field must be an array of strings.');
- }
-
- if ((typeof newDoc.selector !== 'undefined') &&
- (typeof newDoc.selector !== 'object')) {
-
- reportError('The `selector\\' field must be an object.');
- }
-
- if ((typeof newDoc.filter !== 'undefined') &&
- ((typeof newDoc.filter !== 'string') || !newDoc.filter)) {
-
- reportError('The `filter\\' field must be a non-empty string.');
- }
-
- if ((typeof newDoc.doc_ids !== 'undefined') &&
- (typeof newDoc.selector !== 'undefined')) {
-
- reportError('`doc_ids\\' field is incompatible with `selector\\'.');
- }
-
- if ( ((typeof newDoc.doc_ids !== 'undefined') ||
- (typeof newDoc.selector !== 'undefined')) &&
- (typeof newDoc.filter !== 'undefined') ) {
-
- reportError('`filter\\' field is incompatible with `selector\\' and `doc_ids\\'.');
- }
-
- if ((typeof newDoc.query_params !== 'undefined') &&
- ((typeof newDoc.query_params !== 'object') ||
- newDoc.query_params === null)) {
-
- reportError('The `query_params\\' field must be an object.');
- }
-
- if (newDoc.user_ctx) {
- var user_ctx = newDoc.user_ctx;
-
- if ((typeof user_ctx !== 'object') || (user_ctx === null)) {
- reportError('The `user_ctx\\' property must be a ' +
- 'non-null object.');
- }
-
- if (!(user_ctx.name === null ||
- (typeof user_ctx.name === 'undefined') ||
- ((typeof user_ctx.name === 'string') &&
- user_ctx.name.length > 0))) {
-
- reportError('The `user_ctx.name\\' property must be a ' +
- 'non-empty string or null.');
- }
-
- if (!isAdmin && (user_ctx.name !== userCtx.name)) {
- reportError('The given `user_ctx.name\\' is not valid');
- }
-
- if (user_ctx.roles && !isArray(user_ctx.roles)) {
- reportError('The `user_ctx.roles\\' property must be ' +
- 'an array of strings.');
- }
-
- if (!isAdmin && user_ctx.roles) {
- for (var i = 0; i < user_ctx.roles.length; i++) {
- var role = user_ctx.roles[i];
-
- if (typeof role !== 'string' || role.length === 0) {
- reportError('Roles must be non-empty strings.');
- }
- if (userCtx.roles.indexOf(role) === -1) {
- reportError('Invalid role (`' + role +
- '\\') in the `user_ctx\\'');
- }
- }
- }
- } else {
- if (!isAdmin) {
- reportError('The `user_ctx\\' property is missing (it is ' +
- 'optional for admins only).');
- }
- }
- } else {
- if (!isAdmin) {
- if (!oldDoc.user_ctx || (oldDoc.user_ctx.name !== userCtx.name)) {
- reportError('Replication documents can only be deleted by ' +
- 'admins or by the users who created them.');
- }
- }
- }
- }
-">>).
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index e8ddc84..0e1c3d9 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -87,8 +87,8 @@ start_link(#{] = Job, #{} = JobData) ->
{ok, Pid} ->
{ok, Pid};
{error, Reason} ->
- #{<<"rep">> := Rep} = JobData,
- {<<"id">> := Id, <<"source">> := Src, <<"target">> := Ttg} = Rep,
+ #{?REP := Rep} = JobData,
+ {<<"id">> := Id, ?SOURCE := Src, ?TARGET := Ttg} = Rep,
Source = couch_replicator_api_wrap:db_uri(Src),
Target = couch_replicator_api_wrap:db_uri(Tgt),
ErrMsg = "failed to start replication `~s` (`~s` -> `~s`)",
@@ -332,18 +332,18 @@ terminate(shutdown, #rep_state{id = RepId} = State) ->
terminate({shutdown, max_backoff}, {error, {#{} = Job, #{} = JobData}}) ->
% Here we handle the case when replication fails during initialization.
% That is before the #rep_state{} is even built.
- #{<<"rep">> := #{<<"id">> := RepId}} = JobData,
+ #{?REP := #{<<"id">> := RepId}} = JobData,
couch_stats:increment_counter([couch_replicator, failed_starts]),
couch_log:warning("Replication `~s` reached max backoff ", [RepId]),
finish_couch_job(Job, JobData, <<"error">>, max_backoff);
terminate({shutdown, {error, Error}}, {error, Class, Stack, {Job, JobData}}) ->
% Here we handle the case when replication fails during initialization.
- #{<<"rep">> := Rep} = JobData,
+ #{?REP := Rep} = JobData,
#{
<<"id">> := Id,
- <<"source">> := Source0,
- <<"target">> := Target0,
+ ?SOURCE := Source0,
+ ?TARGET := Target0,
<<"doc_id">> := DocId,
<<"db_name">> := DbName
} = Rep,
@@ -525,7 +525,7 @@ finish_couch_job(#rep_state{} = State, FinishedState, Result) ->
finish_couch_job(#{} = Job, #{} = JobData, FinishState, Result0) ->
- #{<<"rep">> := #{<<"id">> := RepId}} = JobData,
+ #{?REP := #{<<"id">> := RepId}} = JobData,
case Result of
null -> null;
#{} -> Result0;
@@ -534,8 +534,8 @@ finish_couch_job(#{} = Job, #{} = JobData, FinishState, Result0) ->
Other -> couch_replicator_utils:rep_error_to_binary(Result0)
end,
JobData= JobData0#{
- <<"finished_state">> => FinishState,
- <<"finished_result">> => Result
+ ?FINISHED_STATE => FinishState,
+ ?FINISHED_RESULT => Result
},
case couch_jobs:finish(undefined, Job, JobData) of
ok ->
@@ -566,18 +566,17 @@ cancel_timer(#rep_state{timer = Timer} = State) ->
State#rep_state{timer = nil}.
-init_state(#{} = Job, #{<<"rep">> =: Rep}} = JobData) ->
+init_state(#{} = Job, #{?REP =: Rep}} = JobData) ->
#{
<<"id">> := Id,
<<"base_id">> := BaseId,
- <<"source">> := Src0,
- <<"target">> := Tgt,
+ ?SOURCE := Src0,
+ ?TARGET := Tgt,
<<"type">> := Type,
<<"view">> := View,
<<"start_time">> := StartTime,
<<"stats">> := Stats,
<<"options">> := OptionsMap,
- <<"user_ctx">> := UserCtx,
<<"db_name">> := DbName,
<<"doc_id">> := DocId,
} = Rep,
@@ -589,13 +588,9 @@ init_state(#{} = Job, #{<<"rep">> =: Rep}} = JobData) ->
% Adjust minimum number of http source connections to 2 to avoid deadlock
Src = adjust_maxconn(Src0, BaseId),
{ok, Source} = couch_replicator_api_wrap:db_open(Src),
- {CreateTargetParams} = get_value(create_target_params, Options, {[]}),
- {ok, Target} = couch_replicator_api_wrap:db_open(Tgt,
-
CreateTgt = get_value(create_target, Options, false),
- CreateParams = maps:to_list(get_value(create_target_params, Options, #{}),
- {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, UserCtx, CreateTgt,
- CreateParams),
+ TParams = maps:to_list(get_value(create_target_params, Options, #{}),
+ {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, CreateTgt, TParams),
{ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
{ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),
diff --git a/src/couch_replicator/src/couch_replicator_validate_doc.erl b/src/couch_replicator/src/couch_replicator_validate_doc.erl
new file mode 100644
index 0000000..da14361
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_validate_doc.erl
@@ -0,0 +1,119 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_validate_doc).
+
+
+-include("couch_replicator.hrl").
+
+
+-export([
+    validate/1
+]).
+
+
+% Validate a replication document body, given as a map with binary keys
+% (before_doc_update/3 passes it JSON-decoded). Returns `ok`, or throws
+% {forbidden, Reason} via fail/1.
+validate(#{<<"_replication_state">> := ?ST_FAILED}) ->
+    % If replication failed, allow updating even a malformed document
+    ok;
+
+validate(#{?SOURCE := _, ?TARGET := _} = Doc) ->
+    lists:foreach(fun({Field, Value}) ->
+        validate_field(Field, Value)
+    end, maps:to_list(Doc)),
+    validate_mutually_exclusive_filter(Doc);
+
+validate(_) ->
+    fail("Both `source` and `target` fields must exist").
+
+
+% Validate one top-level field of a replication doc. Fields not checked
+% here (e.g. `_id`, `_rev`, `_replication_state`) are accepted as-is.
+validate_field(?SOURCE, V) when is_binary(V) ->
+    ok;
+validate_field(?SOURCE, #{?URL := _} = V) ->
+    validate_endpoint(V);
+validate_field(?SOURCE, _V) ->
+    fail("`source` must be a string or an object with an `url` field");
+
+validate_field(?TARGET, V) when is_binary(V) ->
+    ok;
+validate_field(?TARGET, #{?URL := _} = V) ->
+    validate_endpoint(V);
+validate_field(?TARGET, _V) ->
+    fail("`target` must be a string or an object with an `url` field");
+
+% couch_replicator.hrl defines no CONTINUOUS macro, so match the key
+% literally.
+validate_field(<<"continuous">>, V) when is_boolean(V) ->
+    ok;
+validate_field(<<"continuous">>, _V) ->
+    fail("`continuous` should be a boolean");
+
+validate_field(?CREATE_TARGET, V) when is_boolean(V) ->
+    ok;
+validate_field(?CREATE_TARGET, _V) ->
+    fail("`create_target` should be a boolean");
+
+validate_field(?DOC_IDS, V) when is_list(V) ->
+    lists:foreach(fun
+        (DocId) when is_binary(DocId), byte_size(DocId) > 0 -> ok;
+        (_) -> fail("`doc_ids` should be a list of strings")
+    end, V);
+validate_field(?DOC_IDS, _V) ->
+    fail("`doc_ids` should be a list of strings");
+
+validate_field(?SELECTOR, #{} = _V) ->
+    ok;
+validate_field(?SELECTOR, _V) ->
+    fail("`selector` should be an object");
+
+validate_field(?FILTER, V) when is_binary(V), byte_size(V) > 0 ->
+    ok;
+validate_field(?FILTER, _V) ->
+    fail("`filter` should be a non-empty string");
+
+validate_field(?QUERY_PARAMS, V) when is_map(V) orelse V =:= null ->
+    ok;
+validate_field(?QUERY_PARAMS, _V) ->
+    fail("`query_params` should be an object or `null`");
+
+validate_field(_Field, _V) ->
+    ok.
+
+
+% Check every field of an endpoint (`source`/`target`) object with
+% validate_endpoint_field/2. Returns `ok` or throws via fail/1.
+validate_endpoint(#{} = Endpoint) ->
+    lists:foreach(fun({Field, Value}) ->
+        validate_endpoint_field(Field, Value)
+    end, maps:to_list(Endpoint)).
+
+
+% Validate one field of an endpoint (`source`/`target`) object. `url` must
+% be a non-empty string, `auth` and `headers` must be objects; any other
+% endpoint field is accepted as-is.
+validate_endpoint_field(?URL, V) when is_binary(V), byte_size(V) > 0 ->
+    ok;
+validate_endpoint_field(?URL, _V) ->
+    fail("`url` endpoint field must be a non-empty string");
+
+validate_endpoint_field(?AUTH, #{} = _V) ->
+    ok;
+validate_endpoint_field(?AUTH, _V) ->
+    fail("`auth` endpoint field must be an object");
+
+validate_endpoint_field(?HEADERS, #{} = _V) ->
+    ok;
+validate_endpoint_field(?HEADERS, _V) ->
+    fail("`headers` endpoint field must be an object");
+
+validate_endpoint_field(_Field, _V) ->
+    ok.
+
+
+% At most one of `doc_ids`, `selector` and `filter` may be set; a key
+% counts as set when its value is not the atom `undefined`.
+validate_mutually_exclusive_filter(#{} = Doc) ->
+    IsSet = fun(Key) -> maps:get(Key, Doc, undefined) =/= undefined end,
+    case lists:filter(IsSet, [?DOC_IDS, ?SELECTOR, ?FILTER]) of
+        [_, _ | _] ->
+            fail("`doc_ids`, `selector` and `filter` are mutually exclusive");
+        _ ->
+            ok
+    end.
+
+
+% Abort validation by throwing {forbidden, Msg}, with Msg as a binary.
+fail(Msg) when is_binary(Msg) ->
+    throw({forbidden, Msg});
+
+fail(Msg) when is_list(Msg) ->
+    throw({forbidden, list_to_binary(Msg)}).
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 9ec9f2b..ea32531 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -787,7 +787,7 @@ maybe_add_sys_db_callbacks(Db) ->
fun couch_replicator_docs:before_doc_update/3,
fun couch_replicator_doc_processor:during_doc_update/3,
fun couch_replicator_docs:after_doc_read/2,
- fun couch_replicator_doc_processor:after_db_create/1,
+ undefined,
fun couch_replicator_doc_processor:after_db_delete/1
};
IsUsersDb ->