You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/07/08 22:28:27 UTC

[couchdb] 02/03: WIP 4

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/fdb-replicator-2
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 453787f12a0ac1e6388dd91c6bff1e9af68729fb
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Aug 6 12:58:55 2019 -0400

    WIP 4
    
     * Replaced VDU with a simpler Erlang module
       - Avoid conflict handling, updates etc
     * Don't auto-create _replicator dbs
     * Doc job acceptor - max acceptors vs max jobs
---
 src/couch_replicator/src/couch_replicator.erl      |  11 +-
 src/couch_replicator/src/couch_replicator.hrl      |  65 +++--
 .../src/couch_replicator_acceptor.erl              |  49 ++++
 .../src/couch_replicator_doc_processor.erl         | 286 ++++++++++++++++-----
 src/couch_replicator/src/couch_replicator_docs.erl |  87 ++-----
 .../src/couch_replicator_js_functions.hrl          | 177 -------------
 .../src/couch_replicator_scheduler_job.erl         |  33 +--
 .../src/couch_replicator_validate_doc.erl          | 119 +++++++++
 src/fabric/src/fabric2_db.erl                      |   2 +-
 9 files changed, 474 insertions(+), 355 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index 225a258..262aacc 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -52,8 +52,8 @@
     {ok, {cancelled, binary()}} |
     {error, any()} |
     no_return().
-replicate(PostBody, UserCtx) ->
-    {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, UserCtx),
+replicate(PostBody, #user_ctx{name = UserName}) ->
+    {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, UserName),
     Rep = Rep0#{<<"start_time">> => erlang:system_time()},
     #{<<"id">> := RepId, <<"options">> := Options} = Rep,
     case maps:get(<<"cancel">>, Options, false) of
@@ -80,9 +80,14 @@ replicate(PostBody, UserCtx) ->
 % it returns `ignore`.
 -spec ensure_rep_db_exists() -> ignore.
 ensure_rep_db_exists() ->
-    ok = couch_replicator_docs:ensure_rep_db_exists(),
     couch_jobs:set_type_timeout(?REP_DOCS, ?REP_DOCS_TIMEOUT_MSEC),
     couch_jobs:set_type_timeout(?REP_JOBS, ?REP_JOBS_TIMEOUT_MSEC),
+    case config:get_boolean("replicator", "create_replicator_db", false) of
+        true ->
+            ok = couch_replicator_docs:ensure_rep_db_exists();
+        false ->
+            ok
+    end,
     ignore.
 
 
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl
index 6584078..050fcb4 100644
--- a/src/couch_replicator/src/couch_replicator.hrl
+++ b/src/couch_replicator/src/couch_replicator.hrl
@@ -11,38 +11,53 @@
 % the License.
 
 -define(REP_ID_VERSION, 4).
+
+% Couch jobs types and timeouts
 -define(REP_DOCS, <<"repdocs">>).
 -define(REP_JOBS, <<"repjobs">>).
 -define(REP_DOCS_TIMEOUT_MSEC, 17000).
 -define(REP_JOBS_TIMEOUT_MSEC, 33000).
 
--record(rep, {
-    id :: rep_id() | '_' | 'undefined',
-    source :: any() | '_',
-    target :: any() | '_',
-    options :: [_] | '_',
-    user_ctx :: any() | '_',
-    type = db :: atom() | '_',
-    view = nil :: any() | '_',
-    doc_id :: any() | '_',
-    db_name = null :: null | binary() | '_',
-    start_time = 0 :: integer() | '_',
-    stats = couch_replicator_stats:new() :: orddict:orddict() | '_'
-}).
+% Some fields from the replication doc
+-define(SOURCE, <<"source">>).
+-define(TARGET, <<"target">>).
+-define(CREATE_TARGET, <<"create_target">>).
+-define(DOC_IDS, <<"doc_ids">>).
+-define(SELECTOR, <<"selector">>).
+-define(FILTER, <<"filter">>).
+-define(QUERY_PARAMS, <<"query_params">>).
+-define(URL, <<"url">>).
+-define(AUTH, <<"auth">>).
+-define(HEADERS, <<"headers">>).
+
+% Replication states. Each macro is defined exactly once: the Erlang
+% preprocessor rejects a second -define of an already defined macro name.
+-define(ST_ERROR, <<"error">>).
+-define(ST_FINISHED, <<"completed">>).
+-define(ST_RUNNING, <<"running">>).
+-define(ST_INITIALIZING, <<"initializing">>).
+-define(ST_FAILED, <<"failed">>).
+-define(ST_PENDING, <<"pending">>).
+-define(ST_CRASHING, <<"crashing">>).
+-define(ST_TRIGGERED, <<"triggered">>).
+
+% Fields of couch job data objects
+-define(REP, <<"rep">>).
+-define(REP_PARSE_ERROR, <<"rep_parse_error">>).
+-define(STATE, <<"state">>).
+-define(STATE_INFO, <<"state_info">>).
+-define(DOC_STATE, <<"doc_state">>).
+-define(DB_NAME, <<"db_name">>).
+-define(DOC_ID, <<"doc_id">>).
+-define(ERROR_COUNT, <<"error_count">>).
+-define(LAST_UPDATED, <<"last_updated">>).
+
+% Accepted job message tag
+-define(ACCEPTED_JOB, accepted_job).
+
+
 
 -type rep_id() :: binary().
 -type user_name() :: binary() | null.
 -type db_doc_id() :: {binary(), binary() | '_'}.
 -type seconds() :: non_neg_integer().
--type rep_start_result() ::
-    {ok, rep_id()} |
-    ignore |
-    {temporary_error, binary()} |
-    {permanent_failure, binary()}.
-
-
--record(doc_worker_result, {
-    id :: db_doc_id(),
-    wref :: reference(),
-    result :: rep_start_result()
-}).
diff --git a/src/couch_replicator/src/couch_replicator_acceptor.erl b/src/couch_replicator/src/couch_replicator_acceptor.erl
new file mode 100644
index 0000000..bd67d66
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_acceptor.erl
@@ -0,0 +1,49 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_acceptor).
+
+
+-include("couch_replicator.hrl").
+
+
+-export([
+   start/4,
+   stop/1
+]).
+
+
+% Spawn `Count` linked acceptor processes for jobs of `Type` and return them
+% as a map of Pid => true. Each acceptor runs accept/3 with `Parent` and
+% `Opts`. A `Count` below 1 starts nothing and returns an empty map.
+start(_Type, Count, _Parent, _Opts) when Count < 1 ->
+    #{};
+
+% The guard is `>= 1` so that a request for exactly one acceptor is handled,
+% and the sequence is bounded by `Count` (the number that was asked for).
+start(Type, Count, Parent, Opts) when Count >= 1 ->
+    lists:foldl(fun(_N, Acc) ->
+        Pid = spawn_link(fun() -> accept(Type, Parent, Opts) end),
+        Acc#{Pid => true}
+    end, #{}, lists:seq(1, Count)).
+
+
+% Unlink from and kill every acceptor in the given Pid => true map.
+% Always returns ok.
+stop(#{} = Acceptors) ->
+    KillFun = fun(Pid, true, ok) ->
+        unlink(Pid),
+        exit(Pid, kill),
+        ok
+    end,
+    maps:fold(KillFun, ok, Acceptors).
+
+
+% Body of an acceptor process: make one couch_jobs:accept/2 call for `Type`
+% and, when a job is accepted, cast it to `Parent` tagged with ?ACCEPTED_JOB.
+accept(Type, Parent, Opts) ->
+    Accepted = couch_jobs:accept(Type, Opts),
+    case Accepted of
+        {error, not_found} ->
+            % Nothing was accepted; finish without notifying the parent
+            ok;
+        {ok, Job, JobData} ->
+            Msg = {?ACCEPTED_JOB, Job, JobData},
+            ok = gen_server:cast(Parent, Msg)
+    end.
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 6f44814..9899bef 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -29,7 +29,6 @@
 
 -export([
     during_doc_update/3,
-    after_db_create/1,
     after_db_delete/1
 ]).
 
@@ -56,31 +55,17 @@
 -define(INITIAL_BACKOFF_EXPONENT, 64).
 -define(MIN_FILTER_DELAY_SEC, 60).
 
--type filter_type() ::  nil | view | user | docids | mango.
 -type repstate() :: initializing | error | scheduled.
 
 
--record(rdoc, {
-    id :: db_doc_id() | '_' | {any(), '_'},
-    state :: repstate() | '_',
-    rep :: #rep{} | nil | '_',
-    rid :: rep_id() | nil | '_',
-    filter :: filter_type() | '_',
-    info :: binary() | nil | '_',
-    errcnt :: non_neg_integer() | '_',
-    worker :: reference() | nil | '_',
-    last_updated :: erlang:timestamp() | '_'
-}).
+-define(MAX_ACCEPTORS, 10).
+-define(MAX_JOBS, 500).
 
 
 during_doc_update(#doc{} = Doc, Db, _UpdateType) ->
     couch_stats:increment_counter([couch_replicator, docs, db_changes]),
     ok = process_change(Db, Doc).
 
-after_db_create(#{name := DbName}) ->
-    couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName).
-
 
 after_db_delete(#{name := DbName}) ->
     couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]),
@@ -112,16 +97,16 @@ process_change(#{name := DbName} = Db, #doc{} = Doc) ->
     DocState = get_json_value(<<"_replication_state">>, Props, null),
     case couch_jobs:get_job_data(Db, ?REP_DOCS, docs_job_id(DbName, DocId)) of
         {error, not_found} ->
-            update_replication_job(Db, DbName, DocId, Rep, RepError, DocState);
-        {ok, #{<<"rep">> := null, <<"rep_parse_error">> := RepError}}
+            add_rep_doc_job(Db, DbName, DocId, Rep, RepError, DocState);
+        {ok, #{?REP := null, ?REP_PARSE_ERROR := RepError}}
                 when Rep =:= null ->
             % Same error as before occurred, don't bother updating the job
             ok;
-        {ok, #{<<"rep">> := null}} when Rep =:= null ->
+        {ok, #{?REP := null}} when Rep =:= null ->
             % Error occured but it's a different error. Update the job so user
             % sees the new error
-            update_replication_job(Db, DbName, DocId, Rep, RepError, DocState);
-        {ok, #{<<"rep">> := OldRep, <<"rep_parse_error">> := OldError}} ->
+            add_rep_doc_job(Db, DbName, DocId, Rep, RepError, DocState);
+        {ok, #{?REP := OldRep, ?REP_PARSE_ERROR := OldError}} ->
             NormOldRep = couch_replicator_util:normalize_rep(OldRep),
             NormRep = couch_replicator_util:normalize_rep(Rep),
             case NormOldRep == NormRep of
@@ -130,12 +115,14 @@ process_change(#{name := DbName} = Db, #doc{} = Doc) ->
                     % for the replication job have changed, so make it a no-op
                     ok;
                 false ->
-                    update_replication_job(Db, DbName, DocId, Rep, RepError,
+                    add_rep_doc_job(Db, DbName, DocId, Rep, RepError,
                         DocState)
             end
     end.
 
 
+
+
 rep_docs_job_execute(#{} = Job, #{<<"rep">> := null} = JobData) ->
     #{
         <<"rep_parse_error">> := Error,
@@ -206,55 +193,228 @@ process_updated({DbName, _DocId} = Id, JsonRepDoc) ->
     gen_server:call(?MODULE, {updated, Id, Rep, Filter}, infinity).
 
 
-% Doc processor gen_server API and callbacks
-
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [],  []).
 
 
 init([]) ->
-    ?MODULE = ets:new(?MODULE, [named_table, {keypos, #rdoc.id},
-        {read_concurrency, true}, {write_concurrency, true}]),
-    {ok, nil}.
+    process_flag(trap_exit, true),
+    St = #{
+        acceptors => #{},
+        workers => #{}
+    },
+    {ok, update_config(St), 0}.
 
 
-terminate(_Reason, _State) ->
+% gen_server terminate callback: unlink from and kill every worker and
+% acceptor process tracked in the state map.
+terminate(_Reason, #{workers := Workers, acceptors := Acceptors}) ->
+    % Workers is a map of Pid => true (see start_worker/3), so walk its keys.
+    % Unlink before killing so no 'EXIT' message is generated for this
+    % process, mirroring couch_replicator_acceptor:stop/1.
+    lists:foreach(fun(Pid) ->
+        unlink(Pid),
+        exit(Pid, kill)
+    end, maps:keys(Workers)),
+    couch_replicator_acceptor:stop(Acceptors),
     ok.
 
 
-handle_call({updated, Id, Rep, Filter}, _From, State) ->
-    ok = updated_doc(Id, Rep, Filter),
-    {reply, ok, State};
+handle_call(Msg, _From, #{} = St) ->
+    {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
 
-handle_call({removed, Id}, _From, State) ->
-    ok = removed_doc(Id),
-    {reply, ok, State};
+handle_cast({?ACCEPTED_JOB, Job, JobData}, #{} = St) ->
+    {noreply, execute_rep_doc_job(Job, JobData, St)};
 
-handle_call({completed, Id}, _From, State) ->
-    true = ets:delete(?MODULE, Id),
-    {reply, ok, State};
+handle_cast(Msg, #{} = St) ->
+    {stop, {bad_cast, Msg}, St}.
 
-handle_call({clean_up_replications, DbName}, _From, State) ->
-    ok = removed_db(DbName),
-    {reply, ok, State}.
 
-handle_cast(Msg, State) ->
-    {stop, {error, unexpected_message, Msg}, State}.
+% Exit of a linked process (init/1 enables trap_exit): work out whether the
+% pid is a tracked acceptor, a tracked worker or neither, and dispatch to the
+% matching handler.
+handle_info({'EXIT', Pid, Reason}, #{} = St) ->
+    #{workers := Workers, acceptors := Acceptors} = St,
+    IsAcceptor = maps:is_key(Pid, Acceptors),
+    IsWorker = maps:is_key(Pid, Workers),
+    case {IsAcceptor, IsWorker} of
+        {true, false} -> handle_acceptor_died(Pid, Reason, St);
+        {false, true} -> handle_worker_died(Pid, Reason, St);
+        {false, false} -> handle_unknown_pid(Pid, Reason, St)
+    end;
+
+% init/1 returns a timeout of 0, so this fires right after startup
+handle_info(timeout, #{} = St) ->
+    NewSt = maybe_start_acceptors(St),
+    {noreply, NewSt};
+
+% Anything else is unexpected and stops the server
+handle_info(Msg, St) ->
+    {stop, {bad_info, Msg}, St}.
 
-handle_info({'DOWN', _, _, _, #doc_worker_result{id = Id, wref = Ref,
-        result = Res}}, State) ->
-    ok = worker_returned(Ref, Id, Res),
-    {noreply, State};
 
-handle_info(_Msg, State) ->
-    {noreply, State}.
+%% handle_info({'DOWN', _, _, _, #doc_worker_result{id = Id, wref = Ref,
+%%         result = Res}}, State) ->
+%%     ok = worker_returned(Ref, Id, Res),
+%%     {noreply, State};
 
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 
+% Read the doc processor settings from the "replicator" config section and
+% store them in the state map, using the compile-time defaults as fallbacks.
+% Returns the updated state map.
+% NOTE(review): ?ACCEPT_TIMEOUT_SEC and ?ACCEPT_FUDGE_SEC are not defined in
+% the parts of this module visible in the patch -- confirm they exist.
+update_config(#{} = St) ->
+    MaxJobs = config:get_integer("replicator",
+        "max_doc_processor_jobs", ?MAX_JOBS),
+    MaxAcceptors = config:get_integer("replicator",
+        "max_doc_processor_acceptors", ?MAX_ACCEPTORS),
+    AcceptTimeoutSec = config:get_integer("replicator",
+        "doc_processor_accept_timeout_sec", ?ACCEPT_TIMEOUT_SEC),
+    AcceptFudgeSec = config:get_integer("replicator",
+        "doc_processor_accept_fudge_sec", ?ACCEPT_FUDGE_SEC),
+    % No comma after the last association: a trailing comma inside a map
+    % expression is a syntax error.
+    St#{
+        max_jobs => MaxJobs,
+        max_acceptors => MaxAcceptors,
+        accept_timeout_sec => AcceptTimeoutSec,
+        accept_fudge_sec => AcceptFudgeSec
+    }.
+
+
+% An acceptor exited with reason `normal`: drop it from the acceptors map,
+% refresh the config and start replacement acceptors if there is room.
+handle_acceptor_died(Pid, normal, #{acceptors := Acceptors} = St1) ->
+    St2 = St1#{acceptors := maps:remove(Pid, Acceptors)},
+    St3 = update_config(St2),
+    St4 = maybe_start_acceptors(St3),
+    {noreply, St4};
+
+% Any other exit reason: log it and stop the server with that reason.
+handle_acceptor_died(Pid, Reason, #{} = St) ->
+    Msg = "~p : acceptor ~p died with ~p",
+    couch_log:error(Msg, [?MODULE, Pid, Reason]),
+    {stop, {acceptor_pid_exit, Pid, Reason}, St}.
+
+
+% A worker exited with reason `normal`: drop it from the workers map and
+% start more acceptors if the freed slot allows it.
+handle_worker_died(Pid, normal, #{workers := Workers} = St1) ->
+    St2 = St1#{workers := maps:remove(Pid, Workers)},
+    St3 = maybe_start_acceptors(St2),
+    {noreply, St3};
+
+% Any other exit reason: log it and stop the server with that reason.
+handle_worker_died(Pid, Reason, #{} = St) ->
+    Msg = "~p : worker ~p died with ~p",
+    couch_log:error(Msg, [?MODULE, Pid, Reason]),
+    {stop, {worker_pid_exit, Pid, Reason}, St}.
+
+
+% A linked process that is neither a tracked acceptor nor a tracked worker
+% exited: log it and stop the server.
+handle_unknown_pid(Pid, Reason, #{} = St) ->
+    Msg = "~p : unknown pid ~p died with ~p",
+    couch_log:error(Msg, [?MODULE, Pid, Reason]),
+    {stop, {unknown_pid_exit, Pid, Reason}, St}.
+
+
+% Refresh the config, then start more acceptors when the combined number of
+% acceptors and workers is below max_jobs. The number requested is the gap
+% between max_acceptors and the acceptors already running. Returns the
+% (possibly updated) state map.
+maybe_start_acceptors(#{} = St1) ->
+    St2 = update_config(St1),
+    #{
+        workers := Workers,
+        acceptors := Acceptors,
+        max_jobs := MaxJobs,
+        max_acceptors := MaxAcceptors
+    } = St2,
+    WCount = map_size(Workers),
+    ACount = map_size(Acceptors),
+    case ACount + WCount < MaxJobs of
+        true -> start_acceptors(MaxAcceptors - ACount, St2);
+        false -> St2
+    end.
+
+
+% Start `N` acceptors for ?REP_DOCS jobs, with this process as their parent,
+% and merge the new pids into the acceptors map of the state.
+start_acceptors(N, #{} = St) ->
+    #{
+       acceptors := Acceptors,
+       accept_timeout_sec := TimeoutSec,
+       accept_fudge_sec := FudgeSec
+    } = St,
+    % Options handed through to couch_jobs:accept/2 by the acceptor
+    Opts = #{
+        timeout => TimeoutSec,
+        max_sched_time => erlang:system_time(second) + FudgeSec
+    },
+    Pids = couch_replicator_acceptor:start(?REP_DOCS, N, self(), Opts),
+    St#{acceptors := maps:merge(Acceptors, Pids)}.
+
+
+% Spawn a linked worker running rep_doc_job_worker/3 for the given job and
+% record its pid in the workers map of the state.
+start_worker(Job, #{} = JobData, #{workers := Workers} = St) ->
+    Self = self(),
+    WorkerFun = fun() -> rep_doc_job_worker(Job, JobData, Self) end,
+    WorkerPid = spawn_link(WorkerFun),
+    St#{workers := maps:put(WorkerPid, true, Workers)}.
+
+
+% Worker body for a replication doc job whose ?REP entry is null, i.e. only
+% a parse error was recorded for the document. It bumps the error count,
+% finishes the couch job and marks the document as failed.
+% NOTE(review): WIP -- this clause does not compile as written:
+%   * a comma is missing after `?DOC_ID := DocId` and there are trailing
+%     commas before the closing `}` of both maps;
+%   * `St` and `RepId` are used but never bound in this clause;
+%   * ?FINISHED_STATE, ?FAILED and ?FINISHED_RESULT are not among the macros
+%     added to couch_replicator.hrl by this patch;
+%   * `Parent` is unused.
+rep_doc_job_worker(Job, #{?REP := null} = RepDocData, Parent) ->
+    #{
+        ?REP_PARSE_ERROR := Error,
+        ?DB_NAME := DbName,
+        ?DOC_ID := DocId
+        ?ERROR_COUNT := ErrorCount,
+    } = RepDocData,
+    RepDocData1 = RepDocData#{
+        ?LAST_UPDATED := erlang:system_time(),
+        ?ERROR_COUNT := ErrorCount + 1,
+        ?FINISHED_STATE := ?FAILED,
+        ?FINISHED_RESULT := Error,
+    },
+    case couch_jobs:finish(undefined, Job, RepDocData1) of
+        ok ->
+            couch_replicator_docs:update_failed(DbName, DocId, Error),
+            St;
+        {error, JobError} ->
+            Msg = "~p : replication ~s job could not finish. JobError:~p",
+            couch_log:error(Msg, [?MODULE, RepId, JobError]),
+            St
+    end;
+
+% Worker body for a replication doc job that carries a parsed replication
+% map: clears old state fields, computes the replication id and then tries
+% to start the replication job inside a couch_jobs transaction.
+% NOTE(review): WIP -- this clause does not compile as written:
+%   * trailing comma before `}` in the RepDocData pattern;
+%   * `Rep` and `Tx` are used but never bound (only Rep1 and Rep2 are);
+%   * `Error` in the second catch clause is bound but unused, and its log
+%     message ("~slock conflict") looks like a copy-paste leftover;
+%   * `Parent` is unused.
+rep_doc_job_worker(Job, #{?REP := #{}} = RepDocData, Parent) ->
+    #{
+        ?REP := Rep1,
+        ?DOC_STATE := DocState,
+    } = RepDocData,
+    ok = remove_old_state_fields(Rep, DocState),
+    % Calculate the replication ID. In most cases this is fast and easy,
+    % but for filtered replications with user JS filters this
+    % means making a remote connection to the source db.
+    Rep2 = #{<<"id">> := RepId} = couch_replicator_docs:update_rep_id(Rep1),
+    try
+        couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+            maybe_start_replication_job(JTx, Rep)
+        end)
+    catch
+        throw:{job_start_error, halt} ->
+            Msg = "~p : replication doc job ~s lock conflict",
+            couch_log:error(Msg, [?MODULE, RepId]),
+            exit(normal);
+        throw:{job_start_error, Error} ->
+            Msg = "~p : replication job start failed ~slock conflict",
+            couch_log:error(Msg, [?MODULE, RepId])
+    end.
+
+
+maybe_start_replication_job(JTx, Rep) ->
+     case couch_jobs:get_job_data(JTx, ?REP_JOBS, RepId) of
+        {error, not_found} ->
+            RepJobData = #{
+                ?REP := Rep2,
+                ?STATE := ST_PENDING,
+                ?STATE_INFO := null,
+            },
+            ok = couch_jobs:add(JTx, ?REP_JOBS, RepJobData),
+            RepDocData1 = RepDocData#{
+                ?REP := Rep2,
+                ?STATE := ST_SCHEDULED,
+                ?STATE_INFO := null,
+                ?ERROR_COUNT := 0,
+                ?LAST_UPDATED => erlang:system_info()
+            },
+            case couch_jobs:finish(JTx, Job, RepDocData1) of
+                ok -> ok;
+                Error -> throw({job_start_error, Error})
+
+% Remove stale state fields from a replication document whose recorded doc
+% state is "triggered" or "error", unless update_docs() is true. For any
+% other doc state there is nothing to do. All clauses share one name so that
+% they form a single function, matching the call in rep_doc_job_worker/3.
+% NOTE(review): assumes the Rep map carries the ?DB_NAME and ?DOC_ID keys
+% (the scheduler job reads <<"db_name">> and <<"doc_id">> from it) -- confirm.
+remove_old_state_fields(#{} = Rep, DocState) when
+        DocState =:= ?ST_TRIGGERED; DocState =:= ?ST_ERROR ->
+    case update_docs() of
+        true ->
+            ok;
+        false ->
+            #{?DB_NAME := DbName, ?DOC_ID := DocId} = Rep,
+            couch_replicator_docs:remove_state_fields(DbName, DocId)
+    end;
+
+remove_old_state_fields(#{} = _Rep, _) ->
+    ok.
+
+
 % Doc processor gen_server private helper functions
 
 % Handle doc update -- add to ets, then start a worker to try to turn it into
@@ -592,20 +752,24 @@ ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) ->
     lists:member(State, States).
 
 
--spec update_replication(any(), binary(), binary(), #{} | null,
+-spec add_rep_doc_job(any(), binary(), binary(), #{} | null,
     binary() | null, binary() | null) -> ok.
-update_replication_job(Tx, DbName, DocId, Rep, RepParseError, DocState) ->
+add_rep_doc_job(Tx, DbName, DocId, Rep, RepParseError, DocState) ->
     JobId = docs_job_id(DbName, DocId),
     ok = remove_replication_by_doc_job_id(Db, JobId),
-    RepDocsJob = #{
-        <<"rep_id">> := null,
-        <<"db_name">> := DbName,
-        <<"doc_id">> := DocId,
-        <<"rep">> := Rep,
-        <<"rep_parse_error">> := RepParseError,
-        <<"doc_state">> := DocState
+    RepDocData = #{
+        ?REP => Rep,
+        ?REP_PARSE_ERROR => RepParseError,
+        ?DOC_STATE => DocState,
+        ?DB_NAME => DbName,
+        ?DOC_ID => DocId,
+        ?STATE => ST_INITIALIZING,
+        ?ERROR_COUNT => 0,
+        ?LAST_UPDATED => erlang:system_time(),
+        ?STATE => null,
+        ?STATE_INFO => null.
     },
-    ok = couch_jobs:add(Tx, ?REP_DOCS, RepDocsJob).
+    ok = couch_jobs:add(Tx, ?REP_DOCS, RepDocData).
 
 
 docs_job_id(DbName, Id) when is_binary(DbName), is_binary(Id) ->
@@ -617,10 +781,10 @@ remove_replication_by_doc_job_id(Tx, Id) ->
     case couch_jobs:get_job_data(Tx, ?REP_DOCS, Id) of
         {error, not_found} ->
             ok;
-        {ok, #{<<"rep_id">> := null}} ->
+        {ok, #{?REP := {<<"id">> :=  null}}} ->
             couch_jobs:remove(Tx, ?REP_DOCS, Id),
             ok;
-        {ok, #{<<"rep_id">> := RepId}} ->
+        {ok, #{?REP := {<<"id">> := RepId}}} ->
             couch_jobs:remove(Tx, ?REP_JOBS, RepId),
             couch_jobs:remove(Tx, ?REP_DOCS, Id),
             ok
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index cdc6a10..eaf8161 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -138,70 +138,17 @@ update_error(#rep{} = Rep, Error) ->
 
 -spec ensure_rep_db_exists() -> ok.
 ensure_rep_db_exists() ->
-    Opts = [?CTX, sys_db, nologifmissing],
-    case fabric2_db:create(?REP_DB_NAME, Opts) of
-        {error, file_exists} ->
-            ok;
-        {ok, _Db} ->
-            ok
+    Opts = [?CTX, sys_db],
+    case fabric2_db:create(?REP_DB_NAME, [?CTX, sys_db]) of
+        {error, file_exists} -> ok;
+        {ok, _Db} -> ok
     end.
 
 
--spec ensure_rep_ddoc_exists(binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb) ->
-    DDocId = ?REP_DESIGN_DOC,
-    case open_rep_doc(RepDb, DDocId) of
-        {not_found, database_does_not_exist} ->
-            %% database was deleted.
-            ok;
-        {not_found, _Reason} ->
-            DocProps = replication_design_doc_props(DDocId),
-            DDoc = couch_doc:from_json_obj({DocProps}),
-            couch_log:notice("creating replicator ddoc ~p", [RepDb]),
-            {ok, _Rev} = save_rep_doc(RepDb, DDoc);
-        {ok, Doc} ->
-            Latest = replication_design_doc_props(DDocId),
-            {Props0} = couch_doc:to_json_obj(Doc, []),
-            {value, {_, Rev}, Props} = lists:keytake(<<"_rev">>, 1, Props0),
-            case compare_ejson({Props}, {Latest}) of
-                true ->
-                    ok;
-                false ->
-                    LatestWithRev = [{<<"_rev">>, Rev} | Latest],
-                    DDoc = couch_doc:from_json_obj({LatestWithRev}),
-                    couch_log:notice("updating replicator ddoc ~p", [RepDb]),
-                    try
-                        {ok, _} = save_rep_doc(RepDb, DDoc)
-                    catch
-                        throw:conflict ->
-                            %% ignore, we'll retry next time
-                            ok
-                    end
-            end
-    end,
-    ok.
-
-
--spec compare_ejson({[_]}, {[_]}) -> boolean().
-compare_ejson(EJson1, EJson2) ->
-    EjsonSorted1 = couch_replicator_filters:ejsort(EJson1),
-    EjsonSorted2 = couch_replicator_filters:ejsort(EJson2),
-    EjsonSorted1 == EjsonSorted2.
-
-
--spec replication_design_doc_props(binary()) -> [_].
-replication_design_doc_props(DDocId) ->
-    [
-        {<<"_id">>, DDocId},
-        {<<"language">>, <<"javascript">>},
-        {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
-    ].
-
-
 -spec parse_rep_doc_without_id({[_]}) -> #{}.
 parse_rep_doc_without_id(RepDoc) ->
     {ok, Rep} = try
-        parse_rep_doc_without_id(RepDoc, rep_user_name(RepDoc))
+        parse_rep_doc_without_id(RepDoc, null)
     catch
         throw:{error, Reason} ->
             throw({bad_rep_doc, Reason});
@@ -358,14 +305,6 @@ save_rep_doc(DbName, Doc) ->
     end.
 
 
--spec rep_user_name({[_]}) -> binary() | null.
-rep_user_name({RepDoc}) ->
-    case get_json_value(<<"user_ctx">>, RepDoc) of
-        undefined -> null;
-        {UserCtx} -> get_json_value(<<"name">>, UserCtx, null)
-    end.
-
-
 -spec parse_rep_db(#{}, #{}, #{}) -> #{}.
 parse_rep_db(#{} = Endpoint, #{} = ProxyParams, #{} = Options) ->
     ProxyURL = case ProxyParams of
@@ -634,7 +573,8 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
        roles = Roles,
        name = Name
     } = fabric2_db:get_user_ctx(Db),
-    case lists:member(<<"_replicator">>, Roles) of
+    IsReplicator = case lists:member(<<"_replicator">>, Roles),
+    Doc1 = case IsReplicator of
     true ->
         Doc;
     false ->
@@ -649,12 +589,21 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
                 Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
             ok ->
                 Doc;
-            _ ->
+            false ->
                 throw({forbidden, <<"Can't update replication documents",
                     " from other users.">>})
             end
         end
-    end.
+    end,
+    case IsReplicator orelse Doc1#doc.deleted of
+        true ->
+            ok;  % If replicator or deleting don't validate doc body
+        false ->
+            % Encode as a map and normalize all field names as binaries
+            BMap = ?JSON_DECODE(?JSON_ENCODE(Doc1#doc.body)),
+            couch_replicator_validate_doc:validate(BMap)
+    end,
+    Doc1.
 
 
 -spec after_doc_read(#doc{}, Db::any()) -> #doc{}.
diff --git a/src/couch_replicator/src/couch_replicator_js_functions.hrl b/src/couch_replicator/src/couch_replicator_js_functions.hrl
deleted file mode 100644
index d410433..0000000
--- a/src/couch_replicator/src/couch_replicator_js_functions.hrl
+++ /dev/null
@@ -1,177 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License.  You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--define(REP_DB_DOC_VALIDATE_FUN, <<"
-    function(newDoc, oldDoc, userCtx) {
-        function reportError(error_msg) {
-            log('Error writing document `' + newDoc._id +
-                '\\' to the replicator database: ' + error_msg);
-            throw({forbidden: error_msg});
-        }
-
-        function validateEndpoint(endpoint, fieldName) {
-            if ((typeof endpoint !== 'string') &&
-                ((typeof endpoint !== 'object') || (endpoint === null))) {
-
-                reportError('The `' + fieldName + '\\' property must exist' +
-                    ' and be either a string or an object.');
-            }
-
-            if (typeof endpoint === 'object') {
-                if ((typeof endpoint.url !== 'string') || !endpoint.url) {
-                    reportError('The url property must exist in the `' +
-                        fieldName + '\\' field and must be a non-empty string.');
-                }
-
-                if ((typeof endpoint.auth !== 'undefined') &&
-                    ((typeof endpoint.auth !== 'object') ||
-                        endpoint.auth === null)) {
-
-                    reportError('`' + fieldName +
-                        '.auth\\' must be a non-null object.');
-                }
-
-                if ((typeof endpoint.headers !== 'undefined') &&
-                    ((typeof endpoint.headers !== 'object') ||
-                        endpoint.headers === null)) {
-
-                    reportError('`' + fieldName +
-                        '.headers\\' must be a non-null object.');
-                }
-            }
-        }
-
-        var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0);
-        var isAdmin = (userCtx.roles.indexOf('_admin') >= 0);
-
-        if (isReplicator) {
-            // Always let replicator update the replication document
-            return;
-        }
-
-        if (newDoc._replication_state === 'failed') {
-            // Skip validation in case when we update the document with the
-            // failed state. In this case it might be malformed. However,
-            // replicator will not pay attention to failed documents so this
-            // is safe.
-            return;
-        }
-
-        if (!newDoc._deleted) {
-            validateEndpoint(newDoc.source, 'source');
-            validateEndpoint(newDoc.target, 'target');
-
-            if ((typeof newDoc.create_target !== 'undefined') &&
-                (typeof newDoc.create_target !== 'boolean')) {
-
-                reportError('The `create_target\\' field must be a boolean.');
-            }
-
-            if ((typeof newDoc.continuous !== 'undefined') &&
-                (typeof newDoc.continuous !== 'boolean')) {
-
-                reportError('The `continuous\\' field must be a boolean.');
-            }
-
-            if ((typeof newDoc.doc_ids !== 'undefined') &&
-                !isArray(newDoc.doc_ids)) {
-
-                reportError('The `doc_ids\\' field must be an array of strings.');
-            }
-
-            if ((typeof newDoc.selector !== 'undefined') &&
-                (typeof newDoc.selector !== 'object')) {
-
-                reportError('The `selector\\' field must be an object.');
-            }
-
-            if ((typeof newDoc.filter !== 'undefined') &&
-                ((typeof newDoc.filter !== 'string') || !newDoc.filter)) {
-
-                reportError('The `filter\\' field must be a non-empty string.');
-            }
-
-            if ((typeof newDoc.doc_ids !== 'undefined') &&
-                (typeof newDoc.selector !== 'undefined')) {
-
-                reportError('`doc_ids\\' field is incompatible with `selector\\'.');
-            }
-
-            if ( ((typeof newDoc.doc_ids !== 'undefined') ||
-                  (typeof newDoc.selector !== 'undefined')) &&
-                 (typeof newDoc.filter !== 'undefined') ) {
-
-                reportError('`filter\\' field is incompatible with `selector\\' and `doc_ids\\'.');
-            }
-
-            if ((typeof newDoc.query_params !== 'undefined') &&
-                ((typeof newDoc.query_params !== 'object') ||
-                    newDoc.query_params === null)) {
-
-                reportError('The `query_params\\' field must be an object.');
-            }
-
-            if (newDoc.user_ctx) {
-                var user_ctx = newDoc.user_ctx;
-
-                if ((typeof user_ctx !== 'object') || (user_ctx === null)) {
-                    reportError('The `user_ctx\\' property must be a ' +
-                        'non-null object.');
-                }
-
-                if (!(user_ctx.name === null ||
-                    (typeof user_ctx.name === 'undefined') ||
-                    ((typeof user_ctx.name === 'string') &&
-                        user_ctx.name.length > 0))) {
-
-                    reportError('The `user_ctx.name\\' property must be a ' +
-                        'non-empty string or null.');
-                }
-
-                if (!isAdmin && (user_ctx.name !== userCtx.name)) {
-                    reportError('The given `user_ctx.name\\' is not valid');
-                }
-
-                if (user_ctx.roles && !isArray(user_ctx.roles)) {
-                    reportError('The `user_ctx.roles\\' property must be ' +
-                        'an array of strings.');
-                }
-
-                if (!isAdmin && user_ctx.roles) {
-                    for (var i = 0; i < user_ctx.roles.length; i++) {
-                        var role = user_ctx.roles[i];
-
-                        if (typeof role !== 'string' || role.length === 0) {
-                            reportError('Roles must be non-empty strings.');
-                        }
-                        if (userCtx.roles.indexOf(role) === -1) {
-                            reportError('Invalid role (`' + role +
-                                '\\') in the `user_ctx\\'');
-                        }
-                    }
-                }
-            } else {
-                if (!isAdmin) {
-                    reportError('The `user_ctx\\' property is missing (it is ' +
-                       'optional for admins only).');
-                }
-            }
-        } else {
-            if (!isAdmin) {
-                if (!oldDoc.user_ctx || (oldDoc.user_ctx.name !== userCtx.name)) {
-                    reportError('Replication documents can only be deleted by ' +
-                        'admins or by the users who created them.');
-                }
-            }
-        }
-    }
-">>).
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index e8ddc84..0e1c3d9 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -87,8 +87,8 @@ start_link(#{] = Job, #{} = JobData) ->
         {ok, Pid} ->
             {ok, Pid};
         {error, Reason} ->
-            #{<<"rep">> := Rep} = JobData,
-            {<<"id">> := Id, <<"source">> := Src, <<"target">> := Ttg} = Rep,
+            #{?REP := Rep} = JobData,
+            {<<"id">> := Id, ?SOURCE := Src, ?TARGET := Ttg} = Rep,
             Source = couch_replicator_api_wrap:db_uri(Src),
             Target = couch_replicator_api_wrap:db_uri(Tgt),
             ErrMsg = "failed to start replication `~s` (`~s` -> `~s`)",
@@ -332,18 +332,18 @@ terminate(shutdown, #rep_state{id = RepId} = State) ->
 terminate({shutdown, max_backoff}, {error, {#{} = Job, #{} = JobData}}) ->
     % Here we handle the case when replication fails during initialization.
     % That is before the #rep_state{} is even built.
-    #{<<"rep">> := #{<<"id">> := RepId}} = JobData,
+    #{?REP := #{<<"id">> := RepId}} = JobData,
     couch_stats:increment_counter([couch_replicator, failed_starts]),
     couch_log:warning("Replication `~s` reached max backoff ", [RepId]),
     finish_couch_job(Job, JobData, <<"error">>, max_backoff);
 
 terminate({shutdown, {error, Error}}, {error, Class, Stack, {Job, JobData}}) ->
     % Here we handle the case when replication fails during initialization.
-    #{<<"rep">> := Rep} = JobData,
+    #{?REP := Rep} = JobData,
     #{
        <<"id">> := Id,
-       <<"source">> := Source0,
-       <<"target">> := Target0,
+       ?SOURCE := Source0,
+       ?TARGET := Target0,
        <<"doc_id">> := DocId,
        <<"db_name">> := DbName
     } = Rep,
@@ -525,7 +525,7 @@ finish_couch_job(#rep_state{} = State, FinishedState, Result) ->
 
 
 finish_couch_job(#{} = Job, #{} = JobData, FinishState, Result0) ->
-    #{<<"rep">> := #{<<"id">> := RepId}} = JobData,
+    #{?REP := #{<<"id">> := RepId}} = JobData,
     case Result of
         null -> null;
         #{} -> Result0;
@@ -534,8 +534,8 @@ finish_couch_job(#{} = Job, #{} = JobData, FinishState, Result0) ->
         Other -> couch_replicator_utils:rep_error_to_binary(Result0)
     end,
     JobData= JobData0#{
-        <<"finished_state">> => FinishState,
-        <<"finished_result">> => Result
+        ?FINISHED_STATE => FinishState,
+        ?FINISHED_RESULT => Result
     },
     case couch_jobs:finish(undefined, Job, JobData) of
         ok ->
@@ -566,18 +566,17 @@ cancel_timer(#rep_state{timer = Timer} = State) ->
     State#rep_state{timer = nil}.
 
 
-init_state(#{} = Job, #{<<"rep">> =: Rep}} = JobData) ->
+init_state(#{} = Job, #{?REP =: Rep}} = JobData) ->
     #{
         <<"id">> := Id,
         <<"base_id">> := BaseId,
-        <<"source">> := Src0,
-        <<"target">> := Tgt,
+        ?SOURCE := Src0,
+        ?TARGET := Tgt,
         <<"type">> := Type,
         <<"view">> := View,
         <<"start_time">> := StartTime,
         <<"stats">> := Stats,
         <<"options">> := OptionsMap,
-        <<"user_ctx">> := UserCtx,
         <<"db_name">> := DbName,
         <<"doc_id">> := DocId,
     } = Rep,
@@ -589,13 +588,9 @@ init_state(#{} = Job, #{<<"rep">> =: Rep}} = JobData) ->
     % Adjust minimum number of http source connections to 2 to avoid deadlock
     Src = adjust_maxconn(Src0, BaseId),
     {ok, Source} = couch_replicator_api_wrap:db_open(Src),
-    {CreateTargetParams} = get_value(create_target_params, Options, {[]}),
-    {ok, Target} = couch_replicator_api_wrap:db_open(Tgt,
-
     CreateTgt = get_value(create_target, Options, false),
-    CreateParams = maps:to_list(get_value(create_target_params, Options, #{}),
-    {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, UserCtx, CreateTgt,
-        CreateParams),
+    TParams = maps:to_list(get_value(create_target_params, Options, #{}),
+    {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, CreateTgt, TParams),
 
     {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
     {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),
diff --git a/src/couch_replicator/src/couch_replicator_validate_doc.erl b/src/couch_replicator/src/couch_replicator_validate_doc.erl
new file mode 100644
index 0000000..da14361
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_validate_doc.erl
@@ -0,0 +1,119 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_validate_doc).
+
+
+-include("couch_replicator.hrl").
+
+
+-export([
+    validate/1
+]).
+
+
+% Validate a replication document body given as a map. Returns `ok`, or throws
+% `{forbidden, Msg}` (Msg is a binary) for the first invalid field encountered.
+validate(#{<<"_replication_state">> := ?ST_FAILED}) ->
+    % If replication failed, allow updating even a malformed document
+    ok;
+validate(#{?SOURCE := _, ?TARGET := _} = Doc) ->
+    ok = validate_fields(fun validate_field/2, Doc),
+    validate_mutually_exclusive_filters(Doc);
+validate(_) ->
+    fail("Both `source` and `target` fields must exist").
+
+
+% Apply Validator(Key, Value) to every entry of Map. Validators either return
+% `ok` or throw, so the accumulator is always `ok`.
+validate_fields(Validator, #{} = Map) ->
+    maps:fold(fun(K, V, ok) -> Validator(K, V) end, ok, Map).
+
+
+% Check one top-level field of the document.
+validate_field(?SOURCE, V) when is_binary(V) ->
+    ok;
+validate_field(?SOURCE, #{?URL := _} = V) ->
+    validate_fields(fun validate_endpoint_field/2, V);
+validate_field(?SOURCE, _V) ->
+    fail("`source` must be a string or an object with an `url` field");
+validate_field(?TARGET, V) when is_binary(V) ->
+    ok;
+validate_field(?TARGET, #{?URL := _} = V) ->
+    validate_fields(fun validate_endpoint_field/2, V);
+validate_field(?TARGET, _V) ->
+    fail("`target` must be a string or an object with an `url` field");
+validate_field(?CONTINUOUS, V) when is_boolean(V) ->
+    ok;
+validate_field(?CONTINUOUS, _V) ->
+    fail("`continuous` should be a boolean");
+validate_field(?CREATE_TARGET, V) when is_boolean(V) ->
+    ok;
+validate_field(?CREATE_TARGET, _V) ->
+    fail("`create_target` should be a boolean");
+validate_field(?DOC_IDS, V) when is_list(V) ->
+    lists:foreach(fun
+        (<<_, _/binary>>) -> ok;
+        (_) -> fail("`doc_ids` should be a list of strings")
+    end, V);
+validate_field(?DOC_IDS, _V) ->
+    fail("`doc_ids` should be a list of strings");
+validate_field(?SELECTOR, #{}) ->
+    ok;
+validate_field(?SELECTOR, _V) ->
+    fail("`selector` should be an object");
+validate_field(?FILTER, <<_, _/binary>>) ->
+    ok;
+validate_field(?FILTER, _V) ->
+    fail("`filter` should be a non-empty string");
+validate_field(?QUERY_PARAMS, V) when is_map(V) orelse V =:= null ->
+    ok;
+validate_field(?QUERY_PARAMS, _V) ->
+    fail("`query_params` should be an object or `null`");
+validate_field(_Field, _V) ->
+    % Any field without a rule above is accepted as is
+    ok.
+
+
+% Check one field of a `source` or `target` endpoint object.
+validate_endpoint_field(?URL, V) when is_binary(V) ->
+    ok;
+validate_endpoint_field(?URL, _V) ->
+    fail("`url` endpoint field must be a string");
+validate_endpoint_field(?AUTH, #{}) ->
+    ok;
+validate_endpoint_field(?AUTH, _V) ->
+    fail("`auth` endpoint field must be an object");
+validate_endpoint_field(?HEADERS, #{}) ->
+    ok;
+validate_endpoint_field(?HEADERS, _V) ->
+    fail("`headers` endpoint field must be an object");
+validate_endpoint_field(_Field, _V) ->
+    % Any endpoint field without a rule above is accepted as is
+    ok.
+
+
+% At most one of `doc_ids`, `selector` and `filter` may be present.
+validate_mutually_exclusive_filters(#{} = Doc) ->
+    case maps:with([?DOC_IDS, ?SELECTOR, ?FILTER], Doc) of
+        Defined when map_size(Defined) > 1 ->
+            fail("`doc_ids`, `selector` and `filter` are mutually exclusive");
+        _ ->
+            ok
+    end.
+
+
+% Reject the document by throwing `{forbidden, Msg}`, with Msg as a binary.
+fail(Msg) when is_list(Msg) ->
+    fail(list_to_binary(Msg));
+fail(Msg) when is_binary(Msg) ->
+    throw({forbidden, Msg}).
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 9ec9f2b..ea32531 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -787,7 +787,7 @@ maybe_add_sys_db_callbacks(Db) ->
                 fun couch_replicator_docs:before_doc_update/3,
                 fun couch_replicator_doc_processor:during_doc_update/3,
                 fun couch_replicator_docs:after_doc_read/2,
-                fun couch_replicator_doc_processor:after_db_create/1,
+                undefined,
                 fun couch_replicator_doc_processor:after_db_delete/1
             };
         IsUsersDb ->