Posted to commits@couchdb.apache.org by va...@apache.org on 2022/11/30 05:23:14 UTC

[couchdb] branch main updated: Improve validation of replicator job parameters

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/main by this push:
     new f13ceb46c Improve validation of replicator job parameters
f13ceb46c is described below

commit f13ceb46ce3c120e1960fa47bfda0a606601900e
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Tue Nov 22 11:31:54 2022 -0500

    Improve validation of replicator job parameters
    
    There are two main improvements:
    
      * Replace the auto-inserted replicator VDU (validate_doc_update) with a
        BDU (before_doc_update). The replicator already had a BDU to update the
        `"owner"` field, so plug right into it and validate everything we need
        there. This way, the validation and parsing logic all lives in one
        module. The previously auto-inserted VDU design doc is deleted on
        discovery of each `*_replicator` db.
    
      * Allow constraining endpoint protocols and socket options. Previously,
        users could create replications with arbitrary low-level socket
        options, some of which are dangerous foot-guns. Restrict those options
        to a safer, more useful set (see the allowlist sketch below).
    
    In addition to those improvements, increase test coverage a bit by explicitly
    checking a few more parsing corner cases.
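    
    As a quick illustration, the socket option restriction amounts to an
    allowlist check along the lines of the sketch below. This is a sketch
    only, not the literal patch code; `valid_socket_options/0` and
    `filter_socket_options/1` are hypothetical names, with the allowlist
    parsed from the new `valid_socket_options` setting:
    
        %% Sketch (hypothetical helpers, not the patch's actual functions):
        %% parse the comma-separated allowlist from the config and keep only
        %% the user-supplied socket options whose key appears in it.
        valid_socket_options() ->
            Default = "buffer,keepalive,nodelay,priority,recbuf,sndbuf",
            Str = config:get("replicator", "valid_socket_options", Default),
            [list_to_atom(string:trim(Tok)) || Tok <- string:tokens(Str, ",")].
    
        filter_socket_options(SockOpts) when is_list(SockOpts) ->
            Valid = valid_socket_options(),
            lists:filter(
                fun({Key, _Val}) -> lists:member(Key, Valid);
                   (Key) when is_atom(Key) -> lists:member(Key, Valid)
                end,
                SockOpts
            ).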
    
    Fixes #4273
---
 rel/overlay/etc/default.ini                        |  12 +
 src/chttpd/src/chttpd.erl                          |   4 +
 src/couch_replicator/src/couch_replicator.erl      |  10 +-
 .../src/couch_replicator_doc_processor.erl         |   9 +-
 .../src/couch_replicator_doc_processor_worker.erl  |  14 +-
 src/couch_replicator/src/couch_replicator_docs.erl | 820 +++++----------------
 src/couch_replicator/src/couch_replicator_ids.erl  |   8 +-
 .../src/couch_replicator_js_functions.hrl          | 183 -----
 ...licator_docs.erl => couch_replicator_parse.erl} | 750 +++++++------------
 .../src/couch_replicator_scheduler_job.erl         |  21 +-
 .../src/couch_replicator_utils.erl                 |  10 +-
 .../test/eunit/couch_replicator_compact_tests.erl  |   2 +-
 .../couch_replicator_error_reporting_tests.erl     |   2 +-
 .../test/eunit/couch_replicator_proxy_tests.erl    |  10 +-
 ...ch_replicator_retain_stats_between_job_runs.erl |   2 +-
 .../test/eunit/couch_replicator_test_helper.erl    |   2 +-
 src/docs/src/config/replicator.rst                 |  34 +
 src/fabric/src/fabric_doc_update.erl               |  14 +-
 18 files changed, 560 insertions(+), 1347 deletions(-)

diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 04448aabd..0efc4cb23 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -555,6 +555,18 @@ partitioned||* = true
 ; See the `inet` Erlang module's man page for the full list of options.
 ;socket_options = [{keepalive, true}, {nodelay, false}]
 
+; Valid socket options. Options not in this list are ignored. The full list of
+; options may be found at https://www.erlang.org/doc/man/inet.html#setopts-2.
+;valid_socket_options = buffer,keepalive,nodelay,priority,recbuf,sndbuf
+
+; Valid replication endpoint protocols. Replication jobs with endpoint URLs not
+; in this list will fail to run.
+;valid_endpoint_protocols = http,https
+
+; Valid replication proxy protocols. Replication jobs with proxy URLs not in
+; this list will fail to run.
+;valid_proxy_protocols = http,https,socks5
+
 ; Path to a file containing the user's certificate.
 ;cert_file = /full/path/to/server_cert.pem
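
The three new settings above are comma-separated allowlists: socket options
outside `valid_socket_options` are ignored, while jobs whose endpoint or proxy
URL scheme is not allowed fail to run. A condensed sketch of how such a scheme
check can be expressed (`check_protocol/2` is a hypothetical helper; the real
logic lives in couch_replicator_parse below), reusing ibrowse_lib:parse_url/1
as the rest of this patch does:

    %% Sketch (hypothetical helper, not the patch's actual function):
    %% reject a URL whose scheme is not in the configured allowlist.
    -include_lib("ibrowse/include/ibrowse.hrl").

    check_protocol(Url, ValidProtocols) when is_list(Url) ->
        case ibrowse_lib:parse_url(Url) of
            #url{protocol = Proto} ->
                case lists:member(Proto, ValidProtocols) of
                    true -> ok;
                    false -> throw({error, <<"endpoint has an invalid url">>})
                end;
            {error, _} ->
                throw({error, <<"endpoint has an invalid url">>})
        end.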
 
diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl
index 13e919cb5..c25c18838 100644
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -1096,6 +1096,10 @@ error_info({error, {database_name_too_long, DbName}}) ->
         <<"At least one path segment of `", DbName/binary, "` is too long.">>};
 error_info({doc_validation, Reason}) ->
     {400, <<"doc_validation">>, Reason};
+error_info({error, <<"endpoint has an invalid url">> = Reason}) ->
+    {400, <<"invalid_replication">>, Reason};
+error_info({error, <<"proxy has an invalid url">> = Reason}) ->
+    {400, <<"invalid_replication">>, Reason};
 error_info({missing_stub, Reason}) ->
     {412, <<"missing_stub">>, Reason};
 error_info(request_entity_too_large) ->
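
With the clauses above, a replication request whose endpoint or proxy URL
fails the new validation gets a clean 400 response. Illustratively, the
standard chttpd error body would look like:

    {"error": "invalid_replication", "reason": "endpoint has an invalid url"}
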
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index 39b3903ea..935daaa80 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -25,10 +25,8 @@
 -include_lib("couch/include/couch_db.hrl").
 -include("couch_replicator.hrl").
 -include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
--include_lib("couch_mrview/include/couch_mrview.hrl").
 -include_lib("mem3/include/mem3.hrl").
 
--define(DESIGN_DOC_CREATION_DELAY_MSEC, 1000).
 -define(REPLICATION_STATES, [
     % Just added to scheduler
     initializing,
@@ -58,7 +56,7 @@
     | {error, any()}
     | no_return().
 replicate(PostBody, Ctx) ->
-    {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
+    {ok, Rep0} = couch_replicator_parse:parse_rep_doc(PostBody, Ctx),
     Rep = Rep0#rep{start_time = os:timestamp()},
     #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep,
     case get_value(cancel, Options, false) of
@@ -138,12 +136,16 @@ replication_states() ->
 
 -spec strip_url_creds(binary() | {[_]}) -> binary().
 strip_url_creds(Endpoint) ->
-    try couch_replicator_docs:parse_rep_db(Endpoint, [], []) of
+    try couch_replicator_parse:parse_rep_db(Endpoint, [], []) of
         #httpdb{url = Url} ->
             iolist_to_binary(couch_util:url_strip_password(Url))
     catch
         throw:{error, local_endpoints_not_supported} ->
             Endpoint;
+        throw:{error, _} ->
+            % Avoid exposing any part of the URL in case there is a password in
+            % the malformed endpoint URL
+            null;
         error:_ ->
             % Avoid exposing any part of the URL in case there is a password in
             % the malformed endpoint URL
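
For illustration, with hypothetical endpoints (assuming
couch_util:url_strip_password/1 masks the password with *****):

    strip_url_creds(<<"http://user:pass@db.example.com/db/">>)
    %% => <<"http://user:*****@db.example.com/db/">>

    strip_url_creds(<<"ftp://user:pass@host/db">>)
    %% => null; `ftp` is not an allowed endpoint scheme, the parser throws
    %% {error, _}, and null is returned rather than any part of the URL
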
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 436d7c44d..eb4c02b49 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -44,9 +44,7 @@
     notify_cluster_event/2
 ]).
 
--include_lib("couch/include/couch_db.hrl").
 -include("couch_replicator.hrl").
--include_lib("mem3/include/mem3.hrl").
 
 -import(couch_replicator_utils, [
     get_json_value/2,
@@ -77,9 +75,8 @@
 
 % couch_multidb_changes API callbacks
 
-db_created(DbName, Server) ->
+db_created(_DbName, Server) ->
     couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
     Server.
 
 db_deleted(DbName, Server) ->
@@ -89,7 +86,7 @@ db_deleted(DbName, Server) ->
 
 db_found(DbName, Server) ->
     couch_stats:increment_counter([couch_replicator, docs, dbs_found]),
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
+    couch_replicator_docs:delete_old_rep_ddoc(DbName),
     Server.
 
 db_change(DbName, {ChangeProps} = Change, Server) ->
@@ -169,7 +166,7 @@ process_updated({DbName, _DocId} = Id, JsonRepDoc) ->
     % should propagate to db_change function and will be recorded as permanent
    % failure in the document. User will have to update the document to fix the
     % problem.
-    Rep0 = couch_replicator_docs:parse_rep_doc_without_id(JsonRepDoc),
+    Rep0 = couch_replicator_parse:parse_rep_doc_without_id(JsonRepDoc),
     Rep = Rep0#rep{db_name = DbName, start_time = os:timestamp()},
     Filter =
         case couch_replicator_filters:parse(Rep#rep.options) of
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
index 22c5f8584..b1014ffa5 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
@@ -162,7 +162,7 @@ doc_processor_worker_test_() ->
 t_should_add_job() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
         ?assert(added_job())
     end).
@@ -172,7 +172,7 @@ t_already_running_same_docid() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
         mock_already_running(?DB, ?DOC1),
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
         ?assert(did_not_add_job())
     end).
@@ -182,7 +182,7 @@ t_already_running_transient() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
         mock_already_running(null, null),
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         ?assertMatch(
             {temporary_error, _},
             maybe_start_replication(
@@ -200,7 +200,7 @@ t_already_running_other_db_other_doc() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
         mock_already_running(<<"otherdb">>, <<"otherdoc">>),
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         ?assertMatch(
             {permanent_failure, _},
             maybe_start_replication(
@@ -217,7 +217,7 @@ t_already_running_other_db_other_doc() ->
 t_spawn_worker() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         WRef = make_ref(),
         meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, WRef),
         Pid = spawn_worker(Id, Rep, 0, WRef),
@@ -236,7 +236,7 @@ t_spawn_worker() ->
 t_ignore_if_doc_deleted() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
         ?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())),
         ?assertNot(added_job())
@@ -247,7 +247,7 @@ t_ignore_if_doc_deleted() ->
 t_ignore_if_worker_ref_does_not_match() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         meck:expect(
             couch_replicator_doc_processor,
             get_worker_ref,
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index a60f1a1e1..5fb86c4f5 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -13,15 +13,9 @@
 -module(couch_replicator_docs).
 
 -export([
-    parse_rep_doc/1,
-    parse_rep_doc/2,
-    parse_rep_db/3,
-    parse_rep_doc_without_id/1,
-    parse_rep_doc_without_id/2,
     before_doc_update/3,
     after_doc_read/2,
-    ensure_rep_ddoc_exists/1,
-    ensure_cluster_rep_ddoc_exists/1,
+    delete_old_rep_ddoc/1,
     remove_state_fields/2,
     update_doc_completed/3,
     update_failed/3,
@@ -31,24 +25,10 @@
 ]).
 
 -include_lib("couch/include/couch_db.hrl").
--include_lib("ibrowse/include/ibrowse.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
 -include("couch_replicator.hrl").
--include("couch_replicator_js_functions.hrl").
 
--import(couch_util, [
-    get_value/2,
-    get_value/3,
-    to_binary/1
-]).
-
--import(couch_replicator_utils, [
-    get_json_value/2,
-    get_json_value/3
-]).
-
--define(REP_DB_NAME, <<"_replicator">>).
+% The ID of the now-deleted design doc. On every *_replicator db discovery we try
+% to delete it. At some point in the future, remove this logic altogether.
 -define(REP_DESIGN_DOC, <<"_design/_replicator">>).
 -define(OWNER, <<"owner">>).
 -define(CTX, {user_ctx, #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]}}).
@@ -126,176 +106,32 @@ update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
     ]),
     ok.
 
--spec ensure_rep_ddoc_exists(binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb) ->
+-spec delete_old_rep_ddoc(binary()) -> ok.
+delete_old_rep_ddoc(RepDb) ->
     case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of
-        true ->
-            ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC);
-        false ->
-            ok
+        true -> delete_old_rep_ddoc(RepDb, ?REP_DESIGN_DOC);
+        false -> ok
     end.
 
--spec ensure_rep_ddoc_exists(binary(), binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb, DDocId) ->
+-spec delete_old_rep_ddoc(binary(), binary()) -> ok.
+delete_old_rep_ddoc(RepDb, DDocId) ->
     case open_rep_doc(RepDb, DDocId) of
         {not_found, no_db_file} ->
-            %% database was deleted.
             ok;
         {not_found, _Reason} ->
-            DocProps = replication_design_doc_props(DDocId),
-            DDoc = couch_doc:from_json_obj({DocProps}),
-            couch_log:notice("creating replicator ddoc ~p", [RepDb]),
-            {ok, _Rev} = save_rep_doc(RepDb, DDoc);
+            ok;
         {ok, Doc} ->
-            Latest = replication_design_doc_props(DDocId),
-            {Props0} = couch_doc:to_json_obj(Doc, []),
-            {value, {_, Rev}, Props} = lists:keytake(<<"_rev">>, 1, Props0),
-            case compare_ejson({Props}, {Latest}) of
-                true ->
-                    ok;
-                false ->
-                    LatestWithRev = [{<<"_rev">>, Rev} | Latest],
-                    DDoc = couch_doc:from_json_obj({LatestWithRev}),
-                    couch_log:notice("updating replicator ddoc ~p", [RepDb]),
-                    try
-                        {ok, _} = save_rep_doc(RepDb, DDoc)
-                    catch
-                        throw:conflict ->
-                            %% ignore, we'll retry next time
-                            ok
-                    end
+            DeletedDoc = Doc#doc{deleted = true, body = {[]}},
+            try
+                save_rep_doc(RepDb, DeletedDoc)
+            catch
+                throw:conflict ->
+                    % ignore, we'll retry next time
+                    ok
             end
     end,
     ok.
 
--spec ensure_cluster_rep_ddoc_exists(binary()) -> ok.
-ensure_cluster_rep_ddoc_exists(RepDb) ->
-    DDocId = ?REP_DESIGN_DOC,
-    [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId),
-    ensure_rep_ddoc_exists(DbShard, DDocId).
-
--spec compare_ejson({[_]}, {[_]}) -> boolean().
-compare_ejson(EJson1, EJson2) ->
-    EjsonSorted1 = couch_replicator_filters:ejsort(EJson1),
-    EjsonSorted2 = couch_replicator_filters:ejsort(EJson2),
-    EjsonSorted1 == EjsonSorted2.
-
--spec replication_design_doc_props(binary()) -> [_].
-replication_design_doc_props(DDocId) ->
-    [
-        {<<"_id">>, DDocId},
-        {<<"language">>, <<"javascript">>},
-        {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
-    ].
-
-% Note: parse_rep_doc can handle filtered replications. During parsing of the
-% replication doc it will make possibly remote http requests to the source
-% database. If failure or parsing of filter docs fails, parse_doc throws a
-% {filter_fetch_error, Error} excation. This exception should be considered
-% transient in respect to the contents of the document itself, since it depends
-% on netowrk availability of the source db and other factors.
--spec parse_rep_doc({[_]}) -> #rep{}.
-parse_rep_doc(RepDoc) ->
-    {ok, Rep} =
-        try
-            parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
-        catch
-            throw:{error, Reason} ->
-                throw({bad_rep_doc, Reason});
-            throw:{filter_fetch_error, Reason} ->
-                throw({filter_fetch_error, Reason});
-            Tag:Err ->
-                throw({bad_rep_doc, to_binary({Tag, Err})})
-        end,
-    Rep.
-
--spec parse_rep_doc_without_id({[_]}) -> #rep{}.
-parse_rep_doc_without_id(RepDoc) ->
-    {ok, Rep} =
-        try
-            parse_rep_doc_without_id(RepDoc, rep_user_ctx(RepDoc))
-        catch
-            throw:{error, Reason} ->
-                throw({bad_rep_doc, Reason});
-            Tag:Err ->
-                throw({bad_rep_doc, to_binary({Tag, Err})})
-        end,
-    Rep.
-
--spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #rep{}}.
-parse_rep_doc(Doc, UserCtx) ->
-    {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx),
-    Cancel = get_value(cancel, Rep#rep.options, false),
-    Id = get_value(id, Rep#rep.options, nil),
-    case {Cancel, Id} of
-        {true, nil} ->
-            % Cancel request with no id, must parse id out of body contents
-            {ok, update_rep_id(Rep)};
-        {true, Id} ->
-            % Cancel request with an id specified, so do not parse id from body
-            {ok, Rep};
-        {false, _Id} ->
-            % Not a cancel request, regular replication doc
-            {ok, update_rep_id(Rep)}
-    end.
-
--spec parse_rep_doc_without_id({[_]}, #user_ctx{}) -> {ok, #rep{}}.
-parse_rep_doc_without_id({Props}, UserCtx) ->
-    {SrcProxy, TgtProxy} = parse_proxy_settings(Props),
-    Opts = make_options(Props),
-    case
-        get_value(cancel, Opts, false) andalso
-            (get_value(id, Opts, nil) =/= nil)
-    of
-        true ->
-            {ok, #rep{options = Opts, user_ctx = UserCtx}};
-        false ->
-            Source = parse_rep_db(get_value(<<"source">>, Props), SrcProxy, Opts),
-            Target = parse_rep_db(get_value(<<"target">>, Props), TgtProxy, Opts),
-            {Type, View} =
-                case couch_replicator_filters:view_type(Props, Opts) of
-                    {error, Error} ->
-                        throw({bad_request, Error});
-                    Result ->
-                        Result
-                end,
-            Rep = #rep{
-                source = Source,
-                target = Target,
-                options = Opts,
-                user_ctx = UserCtx,
-                type = Type,
-                view = View,
-                doc_id = get_value(<<"_id">>, Props, null)
-            },
-            % Check if can parse filter code, if not throw exception
-            case couch_replicator_filters:parse(Opts) of
-                {error, FilterError} ->
-                    throw({error, FilterError});
-                {ok, _Filter} ->
-                    ok
-            end,
-            {ok, Rep}
-    end.
-
-parse_proxy_settings(Props) when is_list(Props) ->
-    Proxy = get_value(<<"proxy">>, Props, <<>>),
-    SrcProxy = get_value(<<"source_proxy">>, Props, <<>>),
-    TgtProxy = get_value(<<"target_proxy">>, Props, <<>>),
-
-    case Proxy =/= <<>> of
-        true when SrcProxy =/= <<>> ->
-            Error = "`proxy` is mutually exclusive with `source_proxy`",
-            throw({bad_request, Error});
-        true when TgtProxy =/= <<>> ->
-            Error = "`proxy` is mutually exclusive with `target_proxy`",
-            throw({bad_request, Error});
-        true ->
-            {Proxy, Proxy};
-        false ->
-            {SrcProxy, TgtProxy}
-    end.
-
 % Update a #rep{} record with a replication_id. Calculating the id might involve
 % fetching a filter from the source db, and so it could fail intermittently.
 % In case of a failure to fetch the filter this function will throw a
@@ -386,316 +222,47 @@ save_rep_doc(DbName, Doc) ->
         couch_db:close(Db)
     end.
 
--spec rep_user_ctx({[_]}) -> #user_ctx{}.
-rep_user_ctx({RepDoc}) ->
-    case get_json_value(<<"user_ctx">>, RepDoc) of
-        undefined ->
-            #user_ctx{};
-        {UserCtx} ->
-            #user_ctx{
-                name = get_json_value(<<"name">>, UserCtx, null),
-                roles = get_json_value(<<"roles">>, UserCtx, [])
-            }
-    end.
-
--spec parse_rep_db({[_]} | binary(), [_] | binary(), [_]) -> #httpdb{} | no_return().
-parse_rep_db({Props}, Proxy, Options) ->
-    ProxyParams = parse_proxy_params(Proxy),
-    ProxyURL =
-        case ProxyParams of
-            [] -> undefined;
-            _ -> binary_to_list(Proxy)
-        end,
-    Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
-    {AuthProps} = get_value(<<"auth">>, Props, {[]}),
-    {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
-    Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
-    DefaultHeaders = (#httpdb{})#httpdb.headers,
-    HttpDb = #httpdb{
-        url = Url,
-        auth_props = AuthProps,
-        headers = lists:ukeymerge(1, Headers, DefaultHeaders),
-        ibrowse_options = lists:keysort(
-            1,
-            [
-                {socket_options, get_value(socket_options, Options)}
-                | ProxyParams ++ ssl_params(Url)
-            ]
-        ),
-        timeout = get_value(connection_timeout, Options),
-        http_connections = get_value(http_connections, Options),
-        retries = get_value(retries, Options),
-        proxy_url = ProxyURL
-    },
-    couch_replicator_utils:normalize_basic_auth(HttpDb);
-parse_rep_db(<<"http://", _/binary>> = Url, Proxy, Options) ->
-    parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
-parse_rep_db(<<"https://", _/binary>> = Url, Proxy, Options) ->
-    parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
-parse_rep_db(<<_/binary>>, _Proxy, _Options) ->
-    throw({error, local_endpoints_not_supported});
-parse_rep_db(undefined, _Proxy, _Options) ->
-    throw({error, <<"Missing replicator database">>}).
-
--spec maybe_add_trailing_slash(binary() | list()) -> list().
-maybe_add_trailing_slash(Url) when is_binary(Url) ->
-    maybe_add_trailing_slash(?b2l(Url));
-maybe_add_trailing_slash(Url) ->
-    case lists:member($?, Url) of
-        true ->
-            % skip if there are query params
-            Url;
-        false ->
-            case lists:last(Url) of
-                $/ ->
-                    Url;
-                _ ->
-                    Url ++ "/"
-            end
-    end.
-
--spec make_options([_]) -> [_].
-make_options(Props) ->
-    Options0 = lists:ukeysort(1, convert_options(Props)),
-    Options = check_options(Options0),
-    DefWorkers = config:get_integer("replicator", "worker_processes", 4),
-    DefBatchSize = config:get_integer("replicator", "worker_batch_size", 500),
-    DefConns = config:get_integer("replicator", "http_connections", 20),
-    DefTimeout = config:get_integer("replicator", "connection_timeout", 30000),
-    DefRetries = config:get_integer("replicator", "retries_per_request", 5),
-    UseCheckpoints = config:get_boolean("replicator", "use_checkpoints", true),
-    UseBulkGet = config:get_boolean("replicator", "use_bulk_get", true),
-    DefCheckpointInterval = config:get_integer(
-        "replicator",
-        "checkpoint_interval",
-        30000
-    ),
-    {ok, DefSocketOptions} = couch_util:parse_term(
-        config:get(
-            "replicator",
-            "socket_options",
-            "[{keepalive, true}, {nodelay, false}]"
-        )
-    ),
-    lists:ukeymerge(
-        1,
-        Options,
-        lists:keysort(1, [
-            {connection_timeout, DefTimeout},
-            {retries, DefRetries},
-            {http_connections, DefConns},
-            {socket_options, DefSocketOptions},
-            {worker_batch_size, DefBatchSize},
-            {worker_processes, DefWorkers},
-            {use_checkpoints, UseCheckpoints},
-            {use_bulk_get, UseBulkGet},
-            {checkpoint_interval, DefCheckpointInterval}
-        ])
-    ).
-
--spec convert_options([_]) -> [_].
-convert_options([]) ->
-    [];
-convert_options([{<<"cancel">>, V} | _R]) when not is_boolean(V) ->
-    throw({bad_request, <<"parameter `cancel` must be a boolean">>});
-convert_options([{<<"cancel">>, V} | R]) ->
-    [{cancel, V} | convert_options(R)];
-convert_options([{IdOpt, V} | R]) when
-    IdOpt =:= <<"_local_id">>;
-    IdOpt =:= <<"replication_id">>;
-    IdOpt =:= <<"id">>
-->
-    [{id, couch_replicator_ids:convert(V)} | convert_options(R)];
-convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V) ->
-    throw({bad_request, <<"parameter `create_target` must be a boolean">>});
-convert_options([{<<"create_target">>, V} | R]) ->
-    [{create_target, V} | convert_options(R)];
-convert_options([{<<"create_target_params">>, V} | _R]) when not is_tuple(V) ->
-    throw({bad_request, <<"parameter `create_target_params` must be a JSON object">>});
-convert_options([{<<"create_target_params">>, V} | R]) ->
-    [{create_target_params, V} | convert_options(R)];
-convert_options([{<<"winning_revs_only">>, V} | _R]) when not is_boolean(V) ->
-    throw({bad_request, <<"parameter `winning_revs_only` must be a boolean">>});
-convert_options([{<<"winning_revs_only">>, V} | R]) ->
-    [{winning_revs_only, V} | convert_options(R)];
-convert_options([{<<"continuous">>, V} | _R]) when not is_boolean(V) ->
-    throw({bad_request, <<"parameter `continuous` must be a boolean">>});
-convert_options([{<<"continuous">>, V} | R]) ->
-    [{continuous, V} | convert_options(R)];
-convert_options([{<<"filter">>, V} | R]) ->
-    [{filter, V} | convert_options(R)];
-convert_options([{<<"query_params">>, V} | R]) ->
-    [{query_params, V} | convert_options(R)];
-convert_options([{<<"doc_ids">>, null} | R]) ->
-    convert_options(R);
-convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) ->
-    throw({bad_request, <<"parameter `doc_ids` must be an array">>});
-convert_options([{<<"doc_ids">>, V} | R]) ->
-    % Ensure same behaviour as old replicator: accept a list of percent
-    % encoded doc IDs.
-    DocIds = lists:usort([?l2b(couch_httpd:unquote(Id)) || Id <- V]),
-    [{doc_ids, DocIds} | convert_options(R)];
-convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) ->
-    throw({bad_request, <<"parameter `selector` must be a JSON object">>});
-convert_options([{<<"selector">>, V} | R]) ->
-    [{selector, V} | convert_options(R)];
-convert_options([{<<"worker_processes">>, V} | R]) ->
-    [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"worker_batch_size">>, V} | R]) ->
-    [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"http_connections">>, V} | R]) ->
-    [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"connection_timeout">>, V} | R]) ->
-    [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"retries_per_request">>, V} | R]) ->
-    [{retries, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"socket_options">>, V} | R]) ->
-    {ok, SocketOptions} = couch_util:parse_term(V),
-    [{socket_options, SocketOptions} | convert_options(R)];
-convert_options([{<<"since_seq">>, V} | R]) ->
-    [{since_seq, V} | convert_options(R)];
-convert_options([{<<"use_checkpoints">>, V} | R]) ->
-    [{use_checkpoints, V} | convert_options(R)];
-convert_options([{<<"use_bulk_get">>, V} | _R]) when not is_boolean(V) ->
-    throw({bad_request, <<"parameter `use_bulk_get` must be a boolean">>});
-convert_options([{<<"use_bulk_get">>, V} | R]) ->
-    [{use_bulk_get, V} | convert_options(R)];
-convert_options([{<<"checkpoint_interval">>, V} | R]) ->
-    [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)];
-% skip unknown option
-convert_options([_ | R]) ->
-    convert_options(R).
-
--spec check_options([_]) -> [_].
-check_options(Options) ->
-    DocIds = lists:keyfind(doc_ids, 1, Options),
-    Filter = lists:keyfind(filter, 1, Options),
-    Selector = lists:keyfind(selector, 1, Options),
-    case {DocIds, Filter, Selector} of
-        {false, false, false} -> Options;
-        {false, false, _} -> Options;
-        {false, _, false} -> Options;
-        {_, false, false} -> Options;
-        _ -> throw({bad_request, "`doc_ids`,`filter`,`selector` are mutually exclusive"})
-    end.
-
--spec parse_proxy_params(binary() | [_]) -> [_].
-parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
-    parse_proxy_params(?b2l(ProxyUrl));
-parse_proxy_params([]) ->
-    [];
-parse_proxy_params(ProxyUrl) ->
-    #url{
-        host = Host,
-        port = Port,
-        username = User,
-        password = Passwd,
-        protocol = Protocol
-    } = ibrowse_lib:parse_url(ProxyUrl),
-    Params =
-        [
-            {proxy_host, Host},
-            {proxy_port, Port}
-        ] ++
-            case is_list(User) andalso is_list(Passwd) of
-                false ->
-                    [];
-                true ->
-                    [{proxy_user, User}, {proxy_password, Passwd}]
-            end,
-    case Protocol of
-        socks5 ->
-            [proxy_to_socks5(Param) || Param <- Params];
-        _ ->
-            Params
-    end.
-
--spec proxy_to_socks5({atom(), string()}) -> {atom(), string()}.
-proxy_to_socks5({proxy_host, Val}) ->
-    {socks5_host, Val};
-proxy_to_socks5({proxy_port, Val}) ->
-    {socks5_port, Val};
-proxy_to_socks5({proxy_user, Val}) ->
-    {socks5_user, Val};
-proxy_to_socks5({proxy_password, Val}) ->
-    {socks5_password, Val}.
-
--spec ssl_params([_]) -> [_].
-ssl_params(Url) ->
-    case ibrowse_lib:parse_url(Url) of
-        #url{protocol = https} ->
-            Depth = config:get_integer(
-                "replicator",
-                "ssl_certificate_max_depth",
-                3
-            ),
-            VerifyCerts = config:get_boolean(
-                "replicator",
-                "verify_ssl_certificates",
-                false
-            ),
-            CertFile = config:get("replicator", "cert_file", undefined),
-            KeyFile = config:get("replicator", "key_file", undefined),
-            Password = config:get("replicator", "password", undefined),
-            SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts)],
-            SslOpts1 =
-                case CertFile /= undefined andalso KeyFile /= undefined of
-                    true ->
-                        case Password of
-                            undefined ->
-                                [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts;
-                            _ ->
-                                [
-                                    {certfile, CertFile},
-                                    {keyfile, KeyFile},
-                                    {password, Password}
-                                ] ++ SslOpts
-                        end;
-                    false ->
-                        SslOpts
-                end,
-            [{is_ssl, true}, {ssl_options, SslOpts1}];
-        #url{protocol = http} ->
-            []
-    end.
-
--spec ssl_verify_options(true | false) -> [_].
-ssl_verify_options(true) ->
-    CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
-    [{verify, verify_peer}, {cacertfile, CAFile}];
-ssl_verify_options(false) ->
-    [{verify, verify_none}].
-
 -spec before_doc_update(#doc{}, Db :: any(), couch_db:update_type()) -> #doc{}.
 before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db, _UpdateType) ->
     Doc;
+before_doc_update(#doc{} = Doc, _Db, ?REPLICATED_CHANGES) ->
+    % Skip internal replicator updates
+    Doc;
 before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
-    #user_ctx{
-        roles = Roles,
-        name = Name
-    } = couch_db:get_user_ctx(Db),
-    case lists:member(<<"_replicator">>, Roles) of
+    #user_ctx{roles = Roles, name = Name} = couch_db:get_user_ctx(Db),
+    IsReplicator = lists:member(<<"_replicator">>, Roles),
+    Doc1 =
+        case IsReplicator of
+            true -> Doc;
+            false -> before_doc_update_owner(get_value(?OWNER, Body), Name, Db, Doc)
+        end,
+    IsFailed = get_value(<<"_replication_state">>, Body) =:= <<"failed">>,
+    case IsReplicator orelse Doc1#doc.deleted orelse IsFailed of
         true ->
-            Doc;
+            ok;
         false ->
-            case couch_util:get_value(?OWNER, Body) of
-                undefined ->
-                    Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
-                Name ->
-                    Doc;
-                Other ->
-                    case (catch couch_db:check_is_admin(Db)) of
-                        ok when Other =:= null ->
-                            Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
-                        ok ->
-                            Doc;
-                        _ ->
-                            throw(
-                                {forbidden,
-                                    <<"Can't update replication documents", " from other users.">>}
-                            )
-                    end
+            try
+                couch_replicator_parse:parse_rep_doc_without_id(Doc1#doc.body)
+            catch
+                throw:{bad_rep_doc, Error} ->
+                    throw({forbidden, Error})
             end
+    end,
+    Doc1.
+
+before_doc_update_owner(undefined, Name, _Db, #doc{body = {Body}} = Doc) ->
+    Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
+before_doc_update_owner(Name, Name, _Db, #doc{} = Doc) ->
+    Doc;
+before_doc_update_owner(Other, Name, Db, #doc{body = {Body}} = Doc) ->
+    case (catch couch_db:check_is_admin(Db)) of
+        ok when Other =:= null ->
+            Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
+        ok ->
+            Doc;
+        _ ->
+            Err = <<"Can't update replication documents from other users.">>,
+            throw({forbidden, Err})
     end.
 
 -spec after_doc_read(#doc{}, Db :: any()) -> #doc{}.
@@ -707,22 +274,12 @@ after_doc_read(#doc{body = {Body}} = Doc, Db) ->
         ok ->
             Doc;
         _ ->
-            case couch_util:get_value(?OWNER, Body) of
+            case get_value(?OWNER, Body) of
                 Name ->
                     Doc;
                 _Other ->
-                    Source = strip_credentials(
-                        couch_util:get_value(
-                            <<"source">>,
-                            Body
-                        )
-                    ),
-                    Target = strip_credentials(
-                        couch_util:get_value(
-                            <<"target">>,
-                            Body
-                        )
-                    ),
+                    Source = strip_credentials(get_value(<<"source">>, Body)),
+                    Target = strip_credentials(get_value(<<"target">>, Body)),
                     NewBody0 = ?replace(Body, <<"source">>, Source),
                     NewBody = ?replace(NewBody0, <<"target">>, Target),
                     #doc{revs = {Pos, [_ | Revs]}} = Doc,
@@ -765,89 +322,19 @@ error_reason({error, Reason}) ->
 error_reason(Reason) ->
     to_binary(Reason).
 
+to_binary(Val) ->
+    couch_util:to_binary(Val).
+
+get_value(Key, Props) ->
+    couch_util:get_value(Key, Props).
+
+get_json_value(Key, Obj) ->
+    couch_replicator_utils:get_json_value(Key, Obj).
+
 -ifdef(TEST).
 
 -include_lib("couch/include/couch_eunit.hrl").
 
-check_options_pass_values_test() ->
-    ?assertEqual(check_options([]), []),
-    ?assertEqual(check_options([baz, {other, fiz}]), [baz, {other, fiz}]),
-    ?assertEqual(check_options([{doc_ids, x}]), [{doc_ids, x}]),
-    ?assertEqual(check_options([{filter, x}]), [{filter, x}]),
-    ?assertEqual(check_options([{selector, x}]), [{selector, x}]).
-
-check_options_fail_values_test() ->
-    ?assertThrow(
-        {bad_request, _},
-        check_options([{doc_ids, x}, {filter, y}])
-    ),
-    ?assertThrow(
-        {bad_request, _},
-        check_options([{doc_ids, x}, {selector, y}])
-    ),
-    ?assertThrow(
-        {bad_request, _},
-        check_options([{filter, x}, {selector, y}])
-    ),
-    ?assertThrow(
-        {bad_request, _},
-        check_options([{doc_ids, x}, {selector, y}, {filter, z}])
-    ).
-
-check_convert_options_pass_test() ->
-    ?assertEqual([], convert_options([])),
-    ?assertEqual([], convert_options([{<<"random">>, 42}])),
-    ?assertEqual(
-        [{cancel, true}],
-        convert_options([{<<"cancel">>, true}])
-    ),
-    ?assertEqual(
-        [{create_target, true}],
-        convert_options([{<<"create_target">>, true}])
-    ),
-    ?assertEqual(
-        [{winning_revs_only, true}],
-        convert_options([{<<"winning_revs_only">>, true}])
-    ),
-    ?assertEqual(
-        [{continuous, true}],
-        convert_options([{<<"continuous">>, true}])
-    ),
-    ?assertEqual(
-        [{doc_ids, [<<"id">>]}],
-        convert_options([{<<"doc_ids">>, [<<"id">>]}])
-    ),
-    ?assertEqual(
-        [{selector, {key, value}}],
-        convert_options([{<<"selector">>, {key, value}}])
-    ).
-
-check_convert_options_fail_test() ->
-    ?assertThrow(
-        {bad_request, _},
-        convert_options([{<<"cancel">>, <<"true">>}])
-    ),
-    ?assertThrow(
-        {bad_request, _},
-        convert_options([{<<"create_target">>, <<"true">>}])
-    ),
-    ?assertThrow(
-        {bad_request, _},
-        convert_options([{<<"winning_revs_only">>, <<"foo">>}])
-    ),
-    ?assertThrow(
-        {bad_request, _},
-        convert_options([{<<"continuous">>, <<"true">>}])
-    ),
-    ?assertThrow(
-        {bad_request, _},
-        convert_options([{<<"doc_ids">>, not_a_list}])
-    ),
-    ?assertThrow(
-        {bad_request, _},
-        convert_options([{<<"selector">>, [{key, value}]}])
-    ).
-
 check_strip_credentials_test() ->
     [
         ?assertEqual(Expected, strip_credentials(Body))
@@ -879,48 +366,25 @@ check_strip_credentials_test() ->
         ]
     ].
 
-parse_proxy_params_test() ->
-    ?assertEqual(
-        [
-            {proxy_host, "foo.com"},
-            {proxy_port, 443},
-            {proxy_user, "u"},
-            {proxy_password, "p"}
-        ],
-        parse_proxy_params("https://u:p@foo.com")
-    ),
-    ?assertEqual(
-        [
-            {socks5_host, "foo.com"},
-            {socks5_port, 1080},
-            {socks5_user, "u"},
-            {socks5_password, "p"}
-        ],
-        parse_proxy_params("socks5://u:p@foo.com")
-    ).
-
 setup() ->
-    DbName = ?tempdb(),
+    TmpDbName = ?tempdb(),
+    DbName = <<TmpDbName/binary, "/_replicator">>,
     {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
     ok = couch_db:close(Db),
-    create_vdu(DbName),
     DbName.
 
 teardown(DbName) when is_binary(DbName) ->
     couch_server:delete(DbName, [?ADMIN_CTX]),
     ok.
 
-create_vdu(DbName) ->
+create_old_rep_ddoc(DbName) ->
     couch_util:with_db(DbName, fun(Db) ->
-        VduFun = <<"function(newdoc, olddoc, userctx) {throw({'forbidden':'fail'})}">>,
-        Doc = #doc{
-            id = <<"_design/vdu">>,
-            body = {[{<<"validate_doc_update">>, VduFun}]}
-        },
-        {ok, _} = couch_db:update_docs(Db, [Doc])
+        Doc = #doc{id = ?REP_DESIGN_DOC, body = {[]}},
+        {ok, _} = couch_db:update_docs(Db, [Doc]),
+        ok
     end).
 
-update_replicator_doc_with_bad_vdu_test_() ->
+clean_old_replicator_ddoc_test_() ->
     {
         setup,
         fun test_util:start_couch/0,
@@ -930,43 +394,129 @@ update_replicator_doc_with_bad_vdu_test_() ->
             fun setup/0,
             fun teardown/1,
             [
-                fun t_vdu_does_not_crash_on_save/1
+                ?TDEF_FE(t_clean_old_ddoc),
+                ?TDEF_FE(t_old_ddoc_already_cleaned),
+                ?TDEF_FE(t_ddoc_delete_missing_db)
             ]
         }
     }.
 
-t_vdu_does_not_crash_on_save(DbName) ->
-    ?_test(begin
-        Doc = #doc{id = <<"some_id">>, body = {[{<<"foo">>, 42}]}},
-        ?assertEqual({ok, forbidden}, save_rep_doc(DbName, Doc))
-    end).
+t_clean_old_ddoc(DbName) ->
+    ok = create_old_rep_ddoc(DbName),
+    ?assertMatch({ok, #doc{}}, open_rep_doc(DbName, ?REP_DESIGN_DOC)),
+    delete_old_rep_ddoc(DbName),
+    ?assertEqual({not_found, deleted}, open_rep_doc(DbName, ?REP_DESIGN_DOC)).
+
+t_old_ddoc_already_cleaned(DbName) ->
+    ok = delete_old_rep_ddoc(DbName),
+    ?assertEqual({not_found, missing}, open_rep_doc(DbName, ?REP_DESIGN_DOC)).
 
-local_replication_endpoint_error_test_() ->
+t_ddoc_delete_missing_db(_DbName) ->
+    ok = delete_old_rep_ddoc(<<"someotherdb">>).
+
+replicator_can_update_docs_test_() ->
     {
-        foreach,
-        fun() ->
-            meck:expect(
-                config,
-                get,
-                fun(_, _, Default) -> Default end
-            )
-        end,
-        fun(_) -> meck:unload() end,
-        [
-            t_error_on_local_endpoint()
-        ]
+        setup,
+        fun test_util:start_couch/0,
+        fun test_util:stop_couch/1,
+        {
+            foreach,
+            fun setup/0,
+            fun teardown/1,
+            [
+                ?TDEF_FE(t_remove_state_fields),
+                ?TDEF_FE(t_update_doc_completed),
+                ?TDEF_FE(t_update_failed),
+                ?TDEF_FE(t_update_triggered),
+                ?TDEF_FE(t_update_error)
+            ]
+        }
     }.
 
-t_error_on_local_endpoint() ->
-    ?_test(begin
-        RepDoc =
+t_remove_state_fields(DbName) ->
+    DocId = <<"doc1">>,
+    Doc = #doc{
+        id = DocId,
+        body = {[{<<"_replication_state">>, <<"triggered">>}]}
+    },
+    save_rep_doc(DbName, Doc),
+    remove_state_fields(DbName, DocId),
+    {ok, Doc2} = open_rep_doc(DbName, DocId),
+    ?assertEqual({[]}, Doc2#doc.body).
+
+t_update_doc_completed(DbName) ->
+    DocId = <<"doc1">>,
+    Doc = #doc{
+        id = DocId,
+        body = {[{<<"_replication_state">>, <<"triggered">>}]}
+    },
+    save_rep_doc(DbName, Doc),
+    update_doc_completed(DbName, DocId, [{<<"foo">>, 1}]),
+    {ok, Doc2} = open_rep_doc(DbName, DocId),
+    {Props} = Doc2#doc.body,
+    State = get_value(<<"_replication_state">>, Props),
+    ?assertEqual(<<"completed">>, State),
+    Stats = get_value(<<"_replication_stats">>, Props),
+    ?assertEqual({[{<<"foo">>, 1}]}, Stats).
+
+t_update_failed(DbName) ->
+    DocId = <<"doc1">>,
+    Doc = #doc{
+        id = DocId,
+        body =
             {[
-                {<<"_id">>, <<"someid">>},
-                {<<"source">>, <<"localdb">>},
-                {<<"target">>, <<"http://somehost.local/tgt">>}
-            ]},
-        Expect = local_endpoints_not_supported,
-        ?assertThrow({bad_rep_doc, Expect}, parse_rep_doc_without_id(RepDoc))
-    end).
+                {<<"_replication_state">>, <<"triggered">>},
+                {<<"_replication_stats">>, {[{<<"foo">>, 1}]}}
+            ]}
+    },
+    save_rep_doc(DbName, Doc),
+    Error = {error, {foo, bar}},
+    update_failed(DbName, DocId, Error),
+    {ok, Doc2} = open_rep_doc(DbName, DocId),
+    {Props} = Doc2#doc.body,
+    State = get_value(<<"_replication_state">>, Props),
+    ?assertEqual(<<"failed">>, State),
+    Reason = get_value(<<"_replication_state_reason">>, Props),
+    ?assertEqual(<<"{foo,bar}">>, Reason),
+    % stats should have been cleared
+    Stats = get_value(<<"_replication_stats">>, Props),
+    ?assertEqual(undefined, Stats).
+
+t_update_triggered(DbName) ->
+    DocId = <<"doc1">>,
+    Doc = #doc{
+        id = DocId,
+        body = {[{}]}
+    },
+    save_rep_doc(DbName, Doc),
+    Rep = #rep{db_name = DbName, doc_id = DocId},
+    update_triggered(Rep, {"123", "+continuous"}),
+    {ok, Doc2} = open_rep_doc(DbName, DocId),
+    {Props} = Doc2#doc.body,
+    State = get_value(<<"_replication_state">>, Props),
+    ?assertEqual(<<"triggered">>, State),
+    Stats = get_value(<<"_replication_stats">>, Props),
+    ?assertEqual(undefined, Stats),
+    RepId = get_value(<<"_replication_id">>, Props),
+    ?assertEqual(<<"123+continuous">>, RepId).
+
+t_update_error(DbName) ->
+    DocId = <<"doc1">>,
+    Doc = #doc{
+        id = DocId,
+        body = {[{}]}
+    },
+    save_rep_doc(DbName, Doc),
+    Rep = #rep{db_name = DbName, doc_id = DocId, id = null},
+    Error = {error, foo},
+    update_error(Rep, Error),
+    {ok, Doc2} = open_rep_doc(DbName, DocId),
+    {Props} = Doc2#doc.body,
+    State = get_value(<<"_replication_state">>, Props),
+    ?assertEqual(<<"error">>, State),
+    Stats = get_value(<<"_replication_stats">>, Props),
+    ?assertEqual(undefined, Stats),
+    RepId = get_value(<<"_replication_id">>, Props),
+    ?assertEqual(null, RepId).
 
 -endif.
diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl
index 939070b95..86fe1f26e 100644
--- a/src/couch_replicator/src/couch_replicator_ids.erl
+++ b/src/couch_replicator/src/couch_replicator_ids.erl
@@ -196,10 +196,10 @@ winning_revs_generates_new_id(_) ->
         {<<"source">>, <<"http://foo.example.bar">>},
         {<<"target">>, <<"http://bar.example.foo">>}
     ],
-    Rep1 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc1}),
+    Rep1 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc1}),
     Id1 = replication_id(Rep1),
     RepDoc2 = RepDoc1 ++ [{<<"winning_revs_only">>, true}],
-    Rep2 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc2}),
+    Rep2 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc2}),
     Id2 = replication_id(Rep2),
     ?assertNotEqual(Id1, Id2).
 
@@ -208,10 +208,10 @@ winning_revs_false_same_as_undefined(_) ->
         {<<"source">>, <<"http://foo.example.bar">>},
         {<<"target">>, <<"http://bar.example.foo">>}
     ],
-    Rep1 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc1}),
+    Rep1 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc1}),
     Id1 = replication_id(Rep1),
     RepDoc2 = RepDoc1 ++ [{<<"winning_revs_only">>, false}],
-    Rep2 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc2}),
+    Rep2 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc2}),
     Id2 = replication_id(Rep2),
     ?assertEqual(Id1, Id2).
 
diff --git a/src/couch_replicator/src/couch_replicator_js_functions.hrl b/src/couch_replicator/src/couch_replicator_js_functions.hrl
deleted file mode 100644
index 4f4369075..000000000
--- a/src/couch_replicator/src/couch_replicator_js_functions.hrl
+++ /dev/null
@@ -1,183 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License.  You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--define(REP_DB_DOC_VALIDATE_FUN, <<"
-    function(newDoc, oldDoc, userCtx) {
-        function reportError(error_msg) {
-            log('Error writing document `' + newDoc._id +
-                '\\' to the replicator database: ' + error_msg);
-            throw({forbidden: error_msg});
-        }
-
-        function validateEndpoint(endpoint, fieldName) {
-            if ((typeof endpoint !== 'string') &&
-                ((typeof endpoint !== 'object') || (endpoint === null))) {
-
-                reportError('The `' + fieldName + '\\' property must exist' +
-                    ' and be either a string or an object.');
-            }
-
-            if (typeof endpoint === 'object') {
-                if ((typeof endpoint.url !== 'string') || !endpoint.url) {
-                    reportError('The url property must exist in the `' +
-                        fieldName + '\\' field and must be a non-empty string.');
-                }
-
-                if ((typeof endpoint.auth !== 'undefined') &&
-                    ((typeof endpoint.auth !== 'object') ||
-                        endpoint.auth === null)) {
-
-                    reportError('`' + fieldName +
-                        '.auth\\' must be a non-null object.');
-                }
-
-                if ((typeof endpoint.headers !== 'undefined') &&
-                    ((typeof endpoint.headers !== 'object') ||
-                        endpoint.headers === null)) {
-
-                    reportError('`' + fieldName +
-                        '.headers\\' must be a non-null object.');
-                }
-            }
-        }
-
-        var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0);
-        var isAdmin = (userCtx.roles.indexOf('_admin') >= 0);
-
-        if (isReplicator) {
-            // Always let replicator update the replication document
-            return;
-        }
-
-        if (newDoc._replication_state === 'failed') {
-            // Skip validation in case when we update the document with the
-            // failed state. In this case it might be malformed. However,
-            // replicator will not pay attention to failed documents so this
-            // is safe.
-            return;
-        }
-
-        if (!newDoc._deleted) {
-            validateEndpoint(newDoc.source, 'source');
-            validateEndpoint(newDoc.target, 'target');
-
-            if ((typeof newDoc.create_target !== 'undefined') &&
-                (typeof newDoc.create_target !== 'boolean')) {
-
-                reportError('The `create_target\\' field must be a boolean.');
-            }
-
-            if ((typeof newDoc.winning_revs_only !== 'undefined') &&
-                (typeof newDoc.winning_revs_only !== 'boolean')) {
-
-                reportError('The `winning_revs_only\\' field must be a boolean.');
-            }
-
-            if ((typeof newDoc.continuous !== 'undefined') &&
-                (typeof newDoc.continuous !== 'boolean')) {
-
-                reportError('The `continuous\\' field must be a boolean.');
-            }
-
-            if ((typeof newDoc.doc_ids !== 'undefined') &&
-                !isArray(newDoc.doc_ids)) {
-
-                reportError('The `doc_ids\\' field must be an array of strings.');
-            }
-
-            if ((typeof newDoc.selector !== 'undefined') &&
-                (typeof newDoc.selector !== 'object')) {
-
-                reportError('The `selector\\' field must be an object.');
-            }
-
-            if ((typeof newDoc.filter !== 'undefined') &&
-                ((typeof newDoc.filter !== 'string') || !newDoc.filter)) {
-
-                reportError('The `filter\\' field must be a non-empty string.');
-            }
-
-            if ((typeof newDoc.doc_ids !== 'undefined') &&
-                (typeof newDoc.selector !== 'undefined')) {
-
-                reportError('`doc_ids\\' field is incompatible with `selector\\'.');
-            }
-
-            if ( ((typeof newDoc.doc_ids !== 'undefined') ||
-                  (typeof newDoc.selector !== 'undefined')) &&
-                 (typeof newDoc.filter !== 'undefined') ) {
-
-                reportError('`filter\\' field is incompatible with `selector\\' and `doc_ids\\'.');
-            }
-
-            if ((typeof newDoc.query_params !== 'undefined') &&
-                ((typeof newDoc.query_params !== 'object') ||
-                    newDoc.query_params === null)) {
-
-                reportError('The `query_params\\' field must be an object.');
-            }
-
-            if (newDoc.user_ctx) {
-                var user_ctx = newDoc.user_ctx;
-
-                if ((typeof user_ctx !== 'object') || (user_ctx === null)) {
-                    reportError('The `user_ctx\\' property must be a ' +
-                        'non-null object.');
-                }
-
-                if (!(user_ctx.name === null ||
-                    (typeof user_ctx.name === 'undefined') ||
-                    ((typeof user_ctx.name === 'string') &&
-                        user_ctx.name.length > 0))) {
-
-                    reportError('The `user_ctx.name\\' property must be a ' +
-                        'non-empty string or null.');
-                }
-
-                if (!isAdmin && (user_ctx.name !== userCtx.name)) {
-                    reportError('The given `user_ctx.name\\' is not valid');
-                }
-
-                if (user_ctx.roles && !isArray(user_ctx.roles)) {
-                    reportError('The `user_ctx.roles\\' property must be ' +
-                        'an array of strings.');
-                }
-
-                if (!isAdmin && user_ctx.roles) {
-                    for (var i = 0; i < user_ctx.roles.length; i++) {
-                        var role = user_ctx.roles[i];
-
-                        if (typeof role !== 'string' || role.length === 0) {
-                            reportError('Roles must be non-empty strings.');
-                        }
-                        if (userCtx.roles.indexOf(role) === -1) {
-                            reportError('Invalid role (`' + role +
-                                '\\') in the `user_ctx\\'');
-                        }
-                    }
-                }
-            } else {
-                if (!isAdmin) {
-                    reportError('The `user_ctx\\' property is missing (it is ' +
-                       'optional for admins only).');
-                }
-            }
-        } else {
-            if (!isAdmin) {
-                if (!oldDoc.user_ctx || (oldDoc.user_ctx.name !== userCtx.name)) {
-                    reportError('Replication documents can only be deleted by ' +
-                        'admins or by the users who created them.');
-                }
-            }
-        }
-    }
-">>).
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_parse.erl
similarity index 51%
copy from src/couch_replicator/src/couch_replicator_docs.erl
copy to src/couch_replicator/src/couch_replicator_parse.erl
index a60f1a1e1..2359bbd2f 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_parse.erl
@@ -10,188 +10,53 @@
 % License for the specific language governing permissions and limitations under
 % the License.
 
--module(couch_replicator_docs).
+-module(couch_replicator_parse).
 
 -export([
     parse_rep_doc/1,
     parse_rep_doc/2,
     parse_rep_db/3,
     parse_rep_doc_without_id/1,
-    parse_rep_doc_without_id/2,
-    before_doc_update/3,
-    after_doc_read/2,
-    ensure_rep_ddoc_exists/1,
-    ensure_cluster_rep_ddoc_exists/1,
-    remove_state_fields/2,
-    update_doc_completed/3,
-    update_failed/3,
-    update_rep_id/1,
-    update_triggered/2,
-    update_error/2
+    parse_rep_doc_without_id/2
 ]).
 
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("ibrowse/include/ibrowse.hrl").
--include_lib("mem3/include/mem3.hrl").
 -include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
 -include("couch_replicator.hrl").
--include("couch_replicator_js_functions.hrl").
 
--import(couch_util, [
-    get_value/2,
-    get_value/3,
-    to_binary/1
+-define(DEFAULT_SOCK_OPTS, [{keepalive, true}, {nodelay, false}]).
+-define(VALID_SOCK_OPTS, [
+    buffer,
+    keepalive,
+    nodelay,
+    priority,
+    recbuf,
+    sndbuf
 ]).
+-define(VALID_PROTOCOLS, #{
+    endpoint => [http, https],
+    proxy => [http, https, socks5]
+}).
 
--import(couch_replicator_utils, [
-    get_json_value/2,
-    get_json_value/3
-]).
-
--define(REP_DB_NAME, <<"_replicator">>).
--define(REP_DESIGN_DOC, <<"_design/_replicator">>).
--define(OWNER, <<"owner">>).
--define(CTX, {user_ctx, #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]}}).
--define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
-
-remove_state_fields(DbName, DocId) ->
-    update_rep_doc(DbName, DocId, [
-        {<<"_replication_state">>, undefined},
-        {<<"_replication_state_time">>, undefined},
-        {<<"_replication_state_reason">>, undefined},
-        {<<"_replication_id">>, undefined},
-        {<<"_replication_stats">>, undefined}
-    ]).
-
--spec update_doc_completed(binary(), binary(), [_]) -> any().
-update_doc_completed(DbName, DocId, Stats) ->
-    update_rep_doc(DbName, DocId, [
-        {<<"_replication_state">>, <<"completed">>},
-        {<<"_replication_state_reason">>, undefined},
-        {<<"_replication_stats">>, {Stats}}
-    ]),
-    couch_stats:increment_counter([
-        couch_replicator,
-        docs,
-        completed_state_updates
-    ]).
-
--spec update_failed(binary(), binary(), any()) -> any().
-update_failed(DbName, DocId, Error) ->
-    Reason = error_reason(Error),
-    couch_log:error(
-        "Error processing replication doc `~s` from `~s`: ~s",
-        [DocId, DbName, Reason]
-    ),
-    update_rep_doc(DbName, DocId, [
-        {<<"_replication_state">>, <<"failed">>},
-        {<<"_replication_stats">>, undefined},
-        {<<"_replication_state_reason">>, Reason}
-    ]),
-    couch_stats:increment_counter([
-        couch_replicator,
-        docs,
-        failed_state_updates
-    ]).
-
--spec update_triggered(#rep{}, rep_id()) -> ok.
-update_triggered(Rep, {Base, Ext}) ->
-    #rep{
-        db_name = DbName,
-        doc_id = DocId
-    } = Rep,
-    update_rep_doc(DbName, DocId, [
-        {<<"_replication_state">>, <<"triggered">>},
-        {<<"_replication_state_reason">>, undefined},
-        {<<"_replication_id">>, iolist_to_binary([Base, Ext])},
-        {<<"_replication_stats">>, undefined}
-    ]),
-    ok.
-
--spec update_error(#rep{}, any()) -> ok.
-update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
-    Reason = error_reason(Error),
-    BinRepId =
-        case RepId of
-            {Base, Ext} ->
-                iolist_to_binary([Base, Ext]);
-            _Other ->
-                null
-        end,
-    update_rep_doc(DbName, DocId, [
-        {<<"_replication_state">>, <<"error">>},
-        {<<"_replication_state_reason">>, Reason},
-        {<<"_replication_stats">>, undefined},
-        {<<"_replication_id">>, BinRepId}
-    ]),
-    ok.
-
--spec ensure_rep_ddoc_exists(binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb) ->
-    case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of
-        true ->
-            ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC);
-        false ->
-            ok
-    end.
-
--spec ensure_rep_ddoc_exists(binary(), binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb, DDocId) ->
-    case open_rep_doc(RepDb, DDocId) of
-        {not_found, no_db_file} ->
-            %% database was deleted.
-            ok;
-        {not_found, _Reason} ->
-            DocProps = replication_design_doc_props(DDocId),
-            DDoc = couch_doc:from_json_obj({DocProps}),
-            couch_log:notice("creating replicator ddoc ~p", [RepDb]),
-            {ok, _Rev} = save_rep_doc(RepDb, DDoc);
-        {ok, Doc} ->
-            Latest = replication_design_doc_props(DDocId),
-            {Props0} = couch_doc:to_json_obj(Doc, []),
-            {value, {_, Rev}, Props} = lists:keytake(<<"_rev">>, 1, Props0),
-            case compare_ejson({Props}, {Latest}) of
-                true ->
-                    ok;
-                false ->
-                    LatestWithRev = [{<<"_rev">>, Rev} | Latest],
-                    DDoc = couch_doc:from_json_obj({LatestWithRev}),
-                    couch_log:notice("updating replicator ddoc ~p", [RepDb]),
-                    try
-                        {ok, _} = save_rep_doc(RepDb, DDoc)
-                    catch
-                        throw:conflict ->
-                            %% ignore, we'll retry next time
-                            ok
-                    end
-            end
-    end,
-    ok.
-
--spec ensure_cluster_rep_ddoc_exists(binary()) -> ok.
-ensure_cluster_rep_ddoc_exists(RepDb) ->
-    DDocId = ?REP_DESIGN_DOC,
-    [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId),
-    ensure_rep_ddoc_exists(DbShard, DDocId).
-
--spec compare_ejson({[_]}, {[_]}) -> boolean().
-compare_ejson(EJson1, EJson2) ->
-    EjsonSorted1 = couch_replicator_filters:ejsort(EJson1),
-    EjsonSorted2 = couch_replicator_filters:ejsort(EJson2),
-    EjsonSorted1 == EjsonSorted2.
-
--spec replication_design_doc_props(binary()) -> [_].
-replication_design_doc_props(DDocId) ->
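+% Config-driven defaults for per-job options. make_options/1 merges any
+% user-supplied options over these.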
+%% erlfmt-ignore
+default_options() ->
     [
-        {<<"_id">>, DDocId},
-        {<<"language">>, <<"javascript">>},
-        {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
+        {connection_timeout,  cfg_int("connection_timeout", 30000)},
+        {retries,             cfg_int("retries_per_request", 5)},
+        {http_connections,    cfg_int("http_connections", 20)},
+        {worker_batch_size,   cfg_int("worker_batch_size", 500)},
+        {worker_processes,    cfg_int("worker_processes", 4)},
+        {checkpoint_interval, cfg_int("checkpoint_interval", 30000)},
+        {use_checkpoints,     cfg_boolean("use_checkpoints", true)},
+        {use_bulk_get,        cfg_boolean("use_bulk_get", true)},
+        {socket_options,      cfg_sock_opts()}
     ].
 
 % Note: parse_rep_doc can handle filtered replications. During parsing of the
 % replication doc it may make remote http requests to the source
 % database. If fetching or parsing of filter docs fails, parse_rep_doc throws a
-% {filter_fetch_error, Error} excation. This exception should be considered
+% {filter_fetch_error, Error} exception. This exception should be considered
 % transient with respect to the contents of the document itself, since it
 % depends on network availability of the source db and other factors.
 -spec parse_rep_doc({[_]}) -> #rep{}.
@@ -304,88 +169,6 @@ update_rep_id(Rep) ->
     RepId = couch_replicator_ids:replication_id(Rep),
     Rep#rep{id = RepId}.
 
-update_rep_doc(RepDbName, RepDocId, KVs) ->
-    update_rep_doc(RepDbName, RepDocId, KVs, 1).
-
-update_rep_doc(RepDbName, RepDocId, KVs, Wait) when is_binary(RepDocId) ->
-    try
-        case open_rep_doc(RepDbName, RepDocId) of
-            {ok, LastRepDoc} ->
-                update_rep_doc(RepDbName, LastRepDoc, KVs, Wait * 2);
-            _ ->
-                ok
-        end
-    catch
-        throw:conflict ->
-            Msg = "Conflict when updating replication doc `~s`. Retrying.",
-            couch_log:error(Msg, [RepDocId]),
-            ok = timer:sleep(couch_rand:uniform(erlang:min(128, Wait)) * 100),
-            update_rep_doc(RepDbName, RepDocId, KVs, Wait * 2)
-    end;
-update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
-    NewRepDocBody = lists:foldl(
-        fun
-            ({K, undefined}, Body) ->
-                lists:keydelete(K, 1, Body);
-            ({<<"_replication_state">> = K, State} = KV, Body) ->
-                case get_json_value(K, Body) of
-                    State ->
-                        Body;
-                    _ ->
-                        Body1 = lists:keystore(K, 1, Body, KV),
-                        Timestamp = couch_replicator_utils:iso8601(os:timestamp()),
-                        lists:keystore(
-                            <<"_replication_state_time">>,
-                            1,
-                            Body1,
-                            {<<"_replication_state_time">>, Timestamp}
-                        )
-                end;
-            ({K, _V} = KV, Body) ->
-                lists:keystore(K, 1, Body, KV)
-        end,
-        RepDocBody,
-        KVs
-    ),
-    case NewRepDocBody of
-        RepDocBody ->
-            ok;
-        _ ->
-            % Might not succeed - when the replication doc is deleted right
-            % before this update (not an error, ignore).
-            save_rep_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}})
-    end.
-
-open_rep_doc(DbName, DocId) ->
-    ioq:maybe_set_io_priority({system, DbName}),
-    case couch_db:open_int(DbName, [?CTX, sys_db]) of
-        {ok, Db} ->
-            try
-                couch_db:open_doc(Db, DocId, [ejson_body])
-            after
-                couch_db:close(Db)
-            end;
-        Else ->
-            Else
-    end.
-
-save_rep_doc(DbName, Doc) ->
-    ioq:maybe_set_io_priority({system, DbName}),
-    {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
-    try
-        couch_db:update_doc(Db, Doc, [])
-    catch
-        % User can accidentally write a VDU which prevents _replicator from
-        % updating replication documents. Avoid crashing replicator and thus
-        % preventing all other replication jobs on the node from running.
-        throw:{forbidden, Reason} ->
-            Msg = "~p VDU function preventing doc update to ~s ~s ~p",
-            couch_log:error(Msg, [?MODULE, DbName, Doc#doc.id, Reason]),
-            {ok, forbidden}
-    after
-        couch_db:close(Db)
-    end.
-
 -spec rep_user_ctx({[_]}) -> #user_ctx{}.
 rep_user_ctx({RepDoc}) ->
     case get_json_value(<<"user_ctx">>, RepDoc) of
@@ -400,13 +183,15 @@ rep_user_ctx({RepDoc}) ->
 
 -spec parse_rep_db({[_]} | binary(), [_] | binary(), [_]) -> #httpdb{} | no_return().
 parse_rep_db({Props}, Proxy, Options) ->
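+    % Validate the endpoint url protocol up front so that misconfigured jobs
+    % fail early with a clear error.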
+    Url0 = get_value(<<"url">>, Props),
+    ok = check_url(Url0, endpoint),
+    Url = maybe_add_trailing_slash(Url0),
     ProxyParams = parse_proxy_params(Proxy),
     ProxyURL =
         case ProxyParams of
             [] -> undefined;
             _ -> binary_to_list(Proxy)
         end,
-    Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
     {AuthProps} = get_value(<<"auth">>, Props, {[]}),
     {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
     Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
@@ -437,6 +222,32 @@ parse_rep_db(<<_/binary>>, _Proxy, _Options) ->
 parse_rep_db(undefined, _Proxy, _Options) ->
     throw({error, <<"Missing replicator database">>}).
 
+check_url(<<_/binary>> = Url, Type) when is_atom(Type) ->
+    case ibrowse_lib:parse_url(?b2l(Url)) of
+        #url{protocol = Protocol} ->
+            check_protocol(Protocol, Type);
+        {error, _} ->
+            BinType = atom_to_binary(Type),
+            throw({error, <<BinType/binary, " has an invalid url">>})
+    end;
+check_url(_, Type) when is_atom(Type) ->
+    BinType = atom_to_binary(Type),
+    throw({error, <<BinType/binary, " has an invalid url">>}).
+
+check_protocol(Protocol, Type) ->
+    CfgName = "valid_" ++ atom_to_list(Type) ++ "_protocols",
+    Allowed = cfg_atoms(CfgName, maps:get(Type, ?VALID_PROTOCOLS)),
+    check_protocol(Protocol, Type, Allowed).
+
+check_protocol(Protocol, Type, Allowed) ->
+    case lists:member(Protocol, Allowed) of
+        true ->
+            ok;
+        false ->
+            BinType = atom_to_binary(Type),
+            throw({error, <<BinType/binary, " has an invalid url">>})
+    end.
+
 -spec maybe_add_trailing_slash(binary() | list()) -> list().
 maybe_add_trailing_slash(Url) when is_binary(Url) ->
     maybe_add_trailing_slash(?b2l(Url));
@@ -458,40 +269,44 @@ maybe_add_trailing_slash(Url) ->
 make_options(Props) ->
     Options0 = lists:ukeysort(1, convert_options(Props)),
     Options = check_options(Options0),
-    DefWorkers = config:get_integer("replicator", "worker_processes", 4),
-    DefBatchSize = config:get_integer("replicator", "worker_batch_size", 500),
-    DefConns = config:get_integer("replicator", "http_connections", 20),
-    DefTimeout = config:get_integer("replicator", "connection_timeout", 30000),
-    DefRetries = config:get_integer("replicator", "retries_per_request", 5),
-    UseCheckpoints = config:get_boolean("replicator", "use_checkpoints", true),
-    UseBulkGet = config:get_boolean("replicator", "use_bulk_get", true),
-    DefCheckpointInterval = config:get_integer(
-        "replicator",
-        "checkpoint_interval",
-        30000
-    ),
-    {ok, DefSocketOptions} = couch_util:parse_term(
-        config:get(
-            "replicator",
-            "socket_options",
-            "[{keepalive, true}, {nodelay, false}]"
-        )
-    ),
-    lists:ukeymerge(
-        1,
-        Options,
-        lists:keysort(1, [
-            {connection_timeout, DefTimeout},
-            {retries, DefRetries},
-            {http_connections, DefConns},
-            {socket_options, DefSocketOptions},
-            {worker_batch_size, DefBatchSize},
-            {worker_processes, DefWorkers},
-            {use_checkpoints, UseCheckpoints},
-            {use_bulk_get, UseBulkGet},
-            {checkpoint_interval, DefCheckpointInterval}
-        ])
-    ).
+    Defaults = lists:keysort(1, default_options()),
+    lists:ukeymerge(1, Options, Defaults).
+
+cfg_int(Var, Default) ->
+    config:get_integer("replicator", Var, Default).
+
+cfg_boolean(Var, Default) ->
+    config:get_boolean("replicator", Var, Default).
+
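+% Parse a comma separated config value (e.g. "http,https") into a list of atoms.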
+cfg_atoms(Cfg, Default) ->
+    case cfg(Cfg) of
+        undefined ->
+            Default;
+        V when is_list(V) ->
+            [list_to_atom(string:strip(S)) || S <- string:split(V, ",", all)]
+    end.
+
+cfg_sock_opts() ->
+    CfgTerm = cfg("socket_options"),
+    parse_sock_opts(CfgTerm, ?DEFAULT_SOCK_OPTS, ?VALID_SOCK_OPTS).
+
+cfg(Var) ->
+    config:get("replicator", Var).
+
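+% With the default allow-list, for instance, "[{priority, 3}, {potato, true}]"
+% is filtered down to [{priority, 3}]; unparsable terms yield [].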
+parse_sock_opts(undefined, Defaults, _) ->
+    Defaults;
+parse_sock_opts(Term, _Defaults, ValidOpts) ->
+    SocketOptions =
+        case couch_util:parse_term(Term) of
+            {ok, Opts} -> Opts;
+            {error, _Error} -> []
+        end,
+    Fun = fun({K, _}) -> lists:member(K, ValidOpts) end,
+    lists:filter(Fun, SocketOptions).
+
+sock_opts(CfgTerm) ->
+    ValidOpts = cfg_atoms("valid_socket_options", ?VALID_SOCK_OPTS),
+    parse_sock_opts(CfgTerm, ?DEFAULT_SOCK_OPTS, ValidOpts).
 
 -spec convert_options([_]) -> [_].
 convert_options([]) ->
@@ -524,6 +339,8 @@ convert_options([{<<"continuous">>, V} | R]) ->
     [{continuous, V} | convert_options(R)];
 convert_options([{<<"filter">>, V} | R]) ->
     [{filter, V} | convert_options(R)];
+convert_options([{<<"query_params">>, V} | _R]) when not is_tuple(V) ->
+    throw({bad_request, <<"parameter `query_params` must be an object">>});
 convert_options([{<<"query_params">>, V} | R]) ->
     [{query_params, V} | convert_options(R)];
 convert_options([{<<"doc_ids">>, null} | R]) ->
@@ -550,8 +367,7 @@ convert_options([{<<"connection_timeout">>, V} | R]) ->
 convert_options([{<<"retries_per_request">>, V} | R]) ->
     [{retries, couch_util:to_integer(V)} | convert_options(R)];
 convert_options([{<<"socket_options">>, V} | R]) ->
-    {ok, SocketOptions} = couch_util:parse_term(V),
-    [{socket_options, SocketOptions} | convert_options(R)];
+    [{socket_options, sock_opts(V)} | convert_options(R)];
 convert_options([{<<"since_seq">>, V} | R]) ->
     [{since_seq, V} | convert_options(R)];
 convert_options([{<<"use_checkpoints">>, V} | R]) ->
@@ -584,7 +400,7 @@ parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
     parse_proxy_params(?b2l(ProxyUrl));
 parse_proxy_params([]) ->
     [];
-parse_proxy_params(ProxyUrl) ->
+parse_proxy_params(ProxyUrl) when is_list(ProxyUrl) ->
     #url{
         host = Host,
         port = Port,
@@ -603,12 +419,15 @@ parse_proxy_params(ProxyUrl) ->
                 true ->
                     [{proxy_user, User}, {proxy_password, Passwd}]
             end,
+    ok = check_protocol(Protocol, proxy),
     case Protocol of
         socks5 ->
             [proxy_to_socks5(Param) || Param <- Params];
         _ ->
             Params
-    end.
+    end;
+parse_proxy_params(_) ->
+    throw({error, <<"Invalid proxy url">>}).
 
 -spec proxy_to_socks5({atom(), string()}) -> {atom(), string()}.
 proxy_to_socks5({proxy_host, Val}) ->
@@ -624,19 +443,11 @@ proxy_to_socks5({proxy_password, Val}) ->
 ssl_params(Url) ->
     case ibrowse_lib:parse_url(Url) of
         #url{protocol = https} ->
-            Depth = config:get_integer(
-                "replicator",
-                "ssl_certificate_max_depth",
-                3
-            ),
-            VerifyCerts = config:get_boolean(
-                "replicator",
-                "verify_ssl_certificates",
-                false
-            ),
-            CertFile = config:get("replicator", "cert_file", undefined),
-            KeyFile = config:get("replicator", "key_file", undefined),
-            Password = config:get("replicator", "password", undefined),
+            Depth = cfg_int("ssl_certificate_max_depth", 3),
+            VerifyCerts = cfg_boolean("verify_ssl_certificates", false),
+            CertFile = cfg("cert_file"),
+            KeyFile = cfg("key_file"),
+            Password = cfg("password"),
             SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts)],
             SslOpts1 =
                 case CertFile /= undefined andalso KeyFile /= undefined of
@@ -661,109 +472,25 @@ ssl_params(Url) ->
 
 -spec ssl_verify_options(true | false) -> [_].
 ssl_verify_options(true) ->
-    CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
+    CAFile = cfg("ssl_trusted_certificates_file"),
     [{verify, verify_peer}, {cacertfile, CAFile}];
 ssl_verify_options(false) ->
     [{verify, verify_none}].
 
--spec before_doc_update(#doc{}, Db :: any(), couch_db:update_type()) -> #doc{}.
-before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db, _UpdateType) ->
-    Doc;
-before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
-    #user_ctx{
-        roles = Roles,
-        name = Name
-    } = couch_db:get_user_ctx(Db),
-    case lists:member(<<"_replicator">>, Roles) of
-        true ->
-            Doc;
-        false ->
-            case couch_util:get_value(?OWNER, Body) of
-                undefined ->
-                    Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
-                Name ->
-                    Doc;
-                Other ->
-                    case (catch couch_db:check_is_admin(Db)) of
-                        ok when Other =:= null ->
-                            Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
-                        ok ->
-                            Doc;
-                        _ ->
-                            throw(
-                                {forbidden,
-                                    <<"Can't update replication documents", " from other users.">>}
-                            )
-                    end
-            end
-    end.
+get_value(Key, Props) ->
+    couch_util:get_value(Key, Props).
 
--spec after_doc_read(#doc{}, Db :: any()) -> #doc{}.
-after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
-    Doc;
-after_doc_read(#doc{body = {Body}} = Doc, Db) ->
-    #user_ctx{name = Name} = couch_db:get_user_ctx(Db),
-    case (catch couch_db:check_is_admin(Db)) of
-        ok ->
-            Doc;
-        _ ->
-            case couch_util:get_value(?OWNER, Body) of
-                Name ->
-                    Doc;
-                _Other ->
-                    Source = strip_credentials(
-                        couch_util:get_value(
-                            <<"source">>,
-                            Body
-                        )
-                    ),
-                    Target = strip_credentials(
-                        couch_util:get_value(
-                            <<"target">>,
-                            Body
-                        )
-                    ),
-                    NewBody0 = ?replace(Body, <<"source">>, Source),
-                    NewBody = ?replace(NewBody0, <<"target">>, Target),
-                    #doc{revs = {Pos, [_ | Revs]}} = Doc,
-                    NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}},
-                    NewRevId = couch_db:new_revid(NewDoc),
-                    NewDoc#doc{revs = {Pos, [NewRevId | Revs]}}
-            end
-    end.
+get_value(Key, Props, Default) ->
+    couch_util:get_value(Key, Props, Default).
 
--spec strip_credentials
-    (undefined) -> undefined;
-    (binary()) -> binary();
-    ({[_]}) -> {[_]}.
-strip_credentials(undefined) ->
-    undefined;
-strip_credentials(Url) when is_binary(Url) ->
-    re:replace(
-        Url,
-        "http(s)?://(?:[^:]+):[^@]+@(.*)$",
-        "http\\1://\\2",
-        [{return, binary}]
-    );
-strip_credentials({Props0}) ->
-    Props1 = lists:keydelete(<<"headers">>, 1, Props0),
-    % Strip "auth" just like headers, for replication plugins it can be a place
-    % to stash credential that are not necessarily in headers
-    Props2 = lists:keydelete(<<"auth">>, 1, Props1),
-    {Props2}.
-
-error_reason({shutdown, Error}) ->
-    error_reason(Error);
-error_reason({bad_rep_doc, Reason}) ->
-    to_binary(Reason);
-error_reason({error, {Error, Reason}}) when
-    is_atom(Error), is_binary(Reason)
-->
-    to_binary(io_lib:format("~s: ~s", [Error, Reason]));
-error_reason({error, Reason}) ->
-    to_binary(Reason);
-error_reason(Reason) ->
-    to_binary(Reason).
+to_binary(Val) ->
+    couch_util:to_binary(Val).
+
+get_json_value(Key, Obj) ->
+    couch_replicator_utils:get_json_value(Key, Obj).
+
+get_json_value(Key, Obj, Default) ->
+    couch_replicator_utils:get_json_value(Key, Obj, Default).
 
 -ifdef(TEST).
 
@@ -820,6 +547,10 @@ check_convert_options_pass_test() ->
     ?assertEqual(
         [{selector, {key, value}}],
         convert_options([{<<"selector">>, {key, value}}])
+    ),
+    ?assertEqual(
+        [{query_params, {[{<<"x">>, 1}]}}],
+        convert_options([{<<"query_params">>, {[{<<"x">>, 1}]}}])
     ).
 
 check_convert_options_fail_test() ->
@@ -846,40 +577,43 @@ check_convert_options_fail_test() ->
     ?assertThrow(
         {bad_request, _},
         convert_options([{<<"selector">>, [{key, value}]}])
+    ),
+    ?assertThrow(
+        {bad_request, _},
+        convert_options([{<<"query_params">>, 42}])
     ).
 
-check_strip_credentials_test() ->
-    [
-        ?assertEqual(Expected, strip_credentials(Body))
-     || {Expected, Body} <- [
-            {
-                undefined,
-                undefined
-            },
-            {
-                <<"https://remote_server/database">>,
-                <<"https://foo:bar@remote_server/database">>
-            },
-            {
-                {[{<<"_id">>, <<"foo">>}]},
-                {[{<<"_id">>, <<"foo">>}, {<<"headers">>, <<"bar">>}]}
-            },
-            {
-                {[{<<"_id">>, <<"foo">>}, {<<"other">>, <<"bar">>}]},
-                {[{<<"_id">>, <<"foo">>}, {<<"other">>, <<"bar">>}]}
-            },
-            {
-                {[{<<"_id">>, <<"foo">>}]},
-                {[{<<"_id">>, <<"foo">>}, {<<"headers">>, <<"baz">>}]}
-            },
-            {
-                {[{<<"_id">>, <<"foo">>}]},
-                {[{<<"_id">>, <<"foo">>}, {<<"auth">>, <<"pluginsecret">>}]}
-            }
+local_replication_endpoint_error_test_() ->
+    {
+        foreach,
+        fun meck_config/0,
+        fun(_) -> meck:unload() end,
+        [
+            ?TDEF_FE(t_error_on_local_endpoint),
+            ?TDEF_FE(t_proxy_params_default),
+            ?TDEF_FE(t_parse_db_string),
+            ?TDEF_FE(t_parse_db_url),
+            ?TDEF_FE(t_parse_db_invalid_protocol),
+            ?TDEF_FE(t_parse_proxy_invalid_protocol),
+            ?TDEF_FE(t_parse_sock_opts),
+            ?TDEF_FE(t_parse_sock_opts_invalid)
         ]
-    ].
+    }.
+
+meck_config() ->
+    meck:expect(config, get, fun(_, _, Default) -> Default end).
 
-parse_proxy_params_test() ->
+t_error_on_local_endpoint(_) ->
+    RepDoc =
+        {[
+            {<<"_id">>, <<"someid">>},
+            {<<"source">>, <<"localdb">>},
+            {<<"target">>, <<"http://somehost.local/tgt">>}
+        ]},
+    Expect = local_endpoints_not_supported,
+    ?assertThrow({bad_rep_doc, Expect}, parse_rep_doc_without_id(RepDoc)).
+
+t_proxy_params_default(_) ->
     ?assertEqual(
         [
             {proxy_host, "foo.com"},
@@ -899,74 +633,118 @@ parse_proxy_params_test() ->
         parse_proxy_params("socks5://u:p@foo.com")
     ).
 
-setup() ->
-    DbName = ?tempdb(),
-    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
-    ok = couch_db:close(Db),
-    create_vdu(DbName),
-    DbName.
-
-teardown(DbName) when is_binary(DbName) ->
-    couch_server:delete(DbName, [?ADMIN_CTX]),
-    ok.
-
-create_vdu(DbName) ->
-    couch_util:with_db(DbName, fun(Db) ->
-        VduFun = <<"function(newdoc, olddoc, userctx) {throw({'forbidden':'fail'})}">>,
-        Doc = #doc{
-            id = <<"_design/vdu">>,
-            body = {[{<<"validate_doc_update">>, VduFun}]}
+t_parse_db_string(_) ->
+    ?assertMatch(
+        #httpdb{
+            url = "http://a/",
+            proxy_url = "http://x"
         },
-        {ok, _} = couch_db:update_docs(Db, [Doc])
-    end).
-
-update_replicator_doc_with_bad_vdu_test_() ->
-    {
-        setup,
-        fun test_util:start_couch/0,
-        fun test_util:stop_couch/1,
-        {
-            foreach,
-            fun setup/0,
-            fun teardown/1,
-            [
-                fun t_vdu_does_not_crash_on_save/1
-            ]
-        }
-    }.
+        parse_rep_db(<<"http://a">>, <<"http://x">>, [])
+    ),
+    ?assertMatch(
+        #httpdb{
+            url = "https://a/",
+            proxy_url = "https://x"
+        },
+        parse_rep_db(<<"https://a">>, <<"https://x">>, [])
+    ),
+    ?assertThrow({error, _}, parse_rep_db(<<"abc">>, <<"foo">>, [])),
+    ?assertThrow({error, _}, parse_rep_db(undefined, <<"foo">>, [])).
+
+t_parse_db_url(_) ->
+    ?assertMatch(
+        #httpdb{
+            url = "http://a?foo",
+            proxy_url = "http://x"
+        },
+        parse_rep_db({[{<<"url">>, <<"http://a?foo">>}]}, <<"http://x">>, [])
+    ),
+    ?assertMatch(
+        #httpdb{
+            url = "https://a/",
+            proxy_url = "https://x"
+        },
+        parse_rep_db({[{<<"url">>, <<"https://a">>}]}, <<"https://x">>, [])
+    ),
+    PUrl = <<"http://x">>,
+    ?assertThrow({error, _}, parse_rep_db({[{<<"url">>, <<"abc">>}]}, PUrl, [])),
+    ?assertThrow({error, _}, parse_rep_db({[{<<"url">>, <<"httpx://a">>}]}, PUrl, [])),
+    ?assertThrow({error, _}, parse_rep_db({[]}, PUrl, [])).
+
+t_parse_db_invalid_protocol(_) ->
+    MeckFun = fun
+        ("replicator", "valid_endpoint_protocols", _) -> "https";
+        (_, _, Default) -> Default
+    end,
+    meck:expect(config, get, MeckFun),
+    PUrl = <<"http://x">>,
+    ?assertMatch(
+        #httpdb{
+            url = "https://a/",
+            proxy_url = "http://x"
+        },
+        parse_rep_db({[{<<"url">>, <<"https://a">>}]}, PUrl, [])
+    ),
+    ?assertThrow({error, _}, parse_rep_db({[{<<"url">>, <<"http://a">>}]}, PUrl, [])).
 
-t_vdu_does_not_crash_on_save(DbName) ->
-    ?_test(begin
-        Doc = #doc{id = <<"some_id">>, body = {[{<<"foo">>, 42}]}},
-        ?assertEqual({ok, forbidden}, save_rep_doc(DbName, Doc))
-    end).
+t_parse_proxy_invalid_protocol(_) ->
+    MeckFun = fun
+        ("replicator", "valid_proxy_protocols", _) -> "socks5";
+        (_, _, Default) -> Default
+    end,
+    meck:expect(config, get, MeckFun),
+    Url = <<"http://a">>,
+    ?assertMatch(
+        #httpdb{
+            url = "https://a/",
+            proxy_url = "socks5://x"
+        },
+        parse_rep_db({[{<<"url">>, <<"https://a">>}]}, <<"socks5://x">>, [])
+    ),
+    ?assertThrow({error, _}, parse_rep_db({[{<<"url">>, Url}]}, <<"http://x">>, [])).
 
-local_replication_endpoint_error_test_() ->
-    {
-        foreach,
-        fun() ->
-            meck:expect(
-                config,
-                get,
-                fun(_, _, Default) -> Default end
-            )
-        end,
-        fun(_) -> meck:unload() end,
+t_parse_sock_opts(_) ->
+    Allowed = "priority, sndbuf",
+    MeckFun = fun
+        ("replicator", "valid_socket_options", _) -> Allowed;
+        (_, _, Default) -> Default
+    end,
+    meck:expect(config, get, MeckFun),
+    RepDoc =
+        {[
+            {<<"source">>, <<"http://a">>},
+            {<<"target">>, <<"http://b/">>},
+            {<<"socket_options">>, <<"[{priority, 3}, {potato, true}, {sndbuf, 10000}]">>}
+        ]},
+    Rep = parse_rep_doc_without_id(RepDoc),
+    ?assertMatch(
+        #rep{
+            source = #httpdb{url = "http://a/"},
+            target = #httpdb{url = "http://b/"},
+            options = [{_, _} | _]
+        },
+        Rep
+    ),
+    Options = Rep#rep.options,
+    ?assertEqual(
         [
-            t_error_on_local_endpoint()
-        ]
-    }.
-
-t_error_on_local_endpoint() ->
-    ?_test(begin
-        RepDoc =
-            {[
-                {<<"_id">>, <<"someid">>},
-                {<<"source">>, <<"localdb">>},
-                {<<"target">>, <<"http://somehost.local/tgt">>}
+            {checkpoint_interval, 30000},
+            {connection_timeout, 30000},
+            {http_connections, 20},
+            {retries, 5},
+            {socket_options, [
+                {priority, 3},
+                {sndbuf, 10000}
             ]},
-        Expect = local_endpoints_not_supported,
-        ?assertThrow({bad_rep_doc, Expect}, parse_rep_doc_without_id(RepDoc))
-    end).
+            {use_bulk_get, true},
+            {use_checkpoints, true},
+            {worker_batch_size, 500},
+            {worker_processes, 4}
+        ],
+        Options
+    ).
+
+t_parse_sock_opts_invalid(_) ->
+    ?assertEqual([], parse_sock_opts(<<"<}garbage]][">>, [], [])).
 
 -endif.
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 1ba933a5e..1d3e70c5a 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -1118,7 +1118,7 @@ log_replication_start(#rep_state{rep_details = Rep} = RepState) ->
 
 -ifdef(TEST).
 
--include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
 
 replication_start_error_test() ->
     ?assertEqual(
@@ -1144,13 +1144,26 @@ replication_start_error_test() ->
         replication_start_error({http_request_failed, "GET", "http://x/y", {error, {code, 503}}})
     ).
 
-scheduler_job_format_status_test() ->
+format_status_test_() ->
+    {
+        foreach,
+        fun meck_config/0,
+        fun(_) -> meck:unload() end,
+        [
+            ?TDEF_FE(t_scheduler_job_format_status)
+        ]
+    }.
+
+meck_config() ->
+    meck:expect(config, get, fun(_, _, Default) -> Default end).
+
+t_scheduler_job_format_status(_) ->
     Source = <<"http://u:p@h1/d1">>,
     Target = <<"http://u:p@h2/d2">>,
     Rep = #rep{
         id = {"base", "+ext"},
-        source = couch_replicator_docs:parse_rep_db(Source, [], []),
-        target = couch_replicator_docs:parse_rep_db(Target, [], []),
+        source = couch_replicator_parse:parse_rep_db(Source, [], []),
+        target = couch_replicator_parse:parse_rep_db(Target, [], []),
         options = [{create_target, true}],
         doc_id = <<"mydoc">>,
         db_name = <<"mydb">>
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index e4a2cd12f..40d188516 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -13,7 +13,6 @@
 -module(couch_replicator_utils).
 
 -export([
-    parse_rep_doc/2,
     replication_id/2,
     sum_stats/2,
     is_deleted/1,
@@ -95,9 +94,6 @@ replication_id(Rep, Version) ->
 sum_stats(S1, S2) ->
     couch_replicator_stats:sum_stats(S1, S2).
 
-parse_rep_doc(Props, UserCtx) ->
-    couch_replicator_docs:parse_rep_doc(Props, UserCtx).
-
 -spec iso8601(erlang:timestamp()) -> binary().
 iso8601({_Mega, _Sec, _Micro} = Timestamp) ->
     {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(Timestamp),
@@ -282,7 +278,7 @@ seq_encode(Seq) ->
 
 -ifdef(TEST).
 
--include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
 
 remove_basic_auth_from_headers_test_() ->
     [
@@ -351,7 +347,7 @@ normalize_rep_test_() ->
                     {<<"doc_ids">>, [<<"a">>, <<"c">>, <<"b">>]},
                     {<<"other_field">>, <<"some_value">>}
                 ]},
-            Rep1 = couch_replicator_docs:parse_rep_doc_without_id(EJson1),
+            Rep1 = couch_replicator_parse:parse_rep_doc_without_id(EJson1),
             EJson2 =
                 {[
                     {<<"other_field">>, <<"unrelated">>},
@@ -360,7 +356,7 @@ normalize_rep_test_() ->
                     {<<"doc_ids">>, [<<"c">>, <<"a">>, <<"b">>]},
                     {<<"other_field2">>, <<"unrelated2">>}
                 ]},
-            Rep2 = couch_replicator_docs:parse_rep_doc_without_id(EJson2),
+            Rep2 = couch_replicator_parse:parse_rep_doc_without_id(EJson2),
             ?assertEqual(normalize_rep(Rep1), normalize_rep(Rep2))
         end)
     }.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
index 2d5ef96b1..df8074f1f 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
@@ -304,7 +304,7 @@ replicate(Source, Target) ->
             {<<"target">>, db_url(Target)},
             {<<"continuous">>, true}
         ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+    {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
     ok = couch_replicator_scheduler:add_job(Rep),
     couch_replicator_scheduler:reschedule(),
     Pid = couch_replicator_test_helper:get_pid(Rep#rep.id),
diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
index 6bdb4ecb2..7ba6bc69d 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
@@ -226,7 +226,7 @@ replicate(Source, Target) ->
             % Low connection timeout so _changes feed gets restarted quicker
             {<<"connection_timeout">>, 3000}
         ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+    {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
     ok = couch_replicator_scheduler:add_job(Rep),
     couch_replicator_scheduler:reschedule(),
     {ok, Rep#rep.id}.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
index 3468cda73..758c44f2b 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
@@ -50,7 +50,7 @@ parse_rep_doc_without_proxy(_) ->
             {<<"source">>, <<"http://unproxied.com">>},
             {<<"target">>, <<"http://otherunproxied.com">>}
         ]},
-    Rep = couch_replicator_docs:parse_rep_doc(NoProxyDoc),
+    Rep = couch_replicator_parse:parse_rep_doc(NoProxyDoc),
     ?assertEqual((Rep#rep.source)#httpdb.proxy_url, undefined),
     ?assertEqual((Rep#rep.target)#httpdb.proxy_url, undefined).
 
@@ -62,7 +62,7 @@ parse_rep_doc_with_proxy(_) ->
             {<<"target">>, <<"http://otherunproxied.com">>},
             {<<"proxy">>, ProxyURL}
         ]},
-    Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc),
+    Rep = couch_replicator_parse:parse_rep_doc(ProxyDoc),
     ?assertEqual((Rep#rep.source)#httpdb.proxy_url, binary_to_list(ProxyURL)),
     ?assertEqual((Rep#rep.target)#httpdb.proxy_url, binary_to_list(ProxyURL)).
 
@@ -76,7 +76,7 @@ parse_rep_source_target_proxy(_) ->
             {<<"source_proxy">>, SrcProxyURL},
             {<<"target_proxy">>, TgtProxyURL}
         ]},
-    Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc),
+    Rep = couch_replicator_parse:parse_rep_doc(ProxyDoc),
     ?assertEqual(
         (Rep#rep.source)#httpdb.proxy_url,
         binary_to_list(SrcProxyURL)
@@ -96,7 +96,7 @@ mutually_exclusive_proxy_and_source_proxy(_) ->
         ]},
     ?assertThrow(
         {bad_rep_doc, _},
-        couch_replicator_docs:parse_rep_doc(ProxyDoc)
+        couch_replicator_parse:parse_rep_doc(ProxyDoc)
     ).
 
 mutually_exclusive_proxy_and_target_proxy(_) ->
@@ -109,5 +109,5 @@ mutually_exclusive_proxy_and_target_proxy(_) ->
         ]},
     ?assertThrow(
         {bad_rep_doc, _},
-        couch_replicator_docs:parse_rep_doc(ProxyDoc)
+        couch_replicator_parse:parse_rep_doc(ProxyDoc)
     ).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
index 04c665af5..f413e5cf4 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
@@ -214,7 +214,7 @@ replicate(Source, Target) ->
             {<<"target">>, db_url(Target)},
             {<<"continuous">>, true}
         ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+    {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
     ok = couch_replicator_scheduler:add_job(Rep),
     couch_replicator_scheduler:reschedule(),
     Pid = couch_replicator_test_helper:get_pid(Rep#rep.id),
diff --git a/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl b/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
index f30bdb1cd..f862527f4 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
@@ -173,7 +173,7 @@ replicate(Source, Target) ->
     ).
 
 replicate({[_ | _]} = RepObject) ->
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+    {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
     ok = couch_replicator_scheduler:add_job(Rep),
     couch_replicator_scheduler:reschedule(),
     Pid = get_pid(Rep#rep.id),
diff --git a/src/docs/src/config/replicator.rst b/src/docs/src/config/replicator.rst
index 092711450..63caca88e 100644
--- a/src/docs/src/config/replicator.rst
+++ b/src/docs/src/config/replicator.rst
@@ -152,6 +152,40 @@ Replicator Database Configuration
 
         .. _inet: http://www.erlang.org/doc/man/inet.html#setopts-2
 
+    .. config:option:: valid_socket_options :: Erlang socket options
+
+        .. versionadded:: 3.3
+
+        Valid socket options. Options not in this list are ignored. Most of
+        these options are low-level, and setting some of them may lead to
+        unintended or unpredictable behavior. See the `inet`_ Erlang docs for
+        the full list of options::
+
+            [replicator]
+            valid_socket_options = buffer,keepalive,nodelay,priority,recbuf,sndbuf
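+
+        For example, with the allow-list above, a replication job specifying
+        ``[{priority, 3}, {potato, true}]`` in its ``socket_options`` will
+        have the unrecognized ``potato`` option dropped.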
+
+        .. _inet: http://www.erlang.org/doc/man/inet.html#setopts-2
+
+    .. config:option:: valid_endpoint_protocols :: Replicator endpoint protocols
+
+        .. versionadded:: 3.3
+
+        Valid replication endpoint protocols. Replication jobs with endpoint
+        urls not in this list will fail to run::
+
+            [replicator]
+            valid_endpoint_protocols = http,https
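+
+        Jobs with a disallowed endpoint protocol are rejected with an
+        ``endpoint has an invalid url`` error.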
+
+    .. config:option:: valid_proxy_protocols :: Replicator proxy protocols
+
+        .. versionadded:: 3.3
+
+        Valid replication proxy protocols. Replication jobs with proxy
+        urls not in this list will fail to run::
+
+            [replicator]
+            valid_proxy_protocols = http,https,socks5
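+
+        Note that ``socks5`` is allowed for proxies by default, but not for
+        replication endpoints (see ``valid_endpoint_protocols`` above).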
+
     .. config:option:: checkpoint_interval :: Replication checkpoint interval
 
         .. versionadded:: 1.6
diff --git a/src/fabric/src/fabric_doc_update.erl b/src/fabric/src/fabric_doc_update.erl
index 5a60dcb32..b7b9e5972 100644
--- a/src/fabric/src/fabric_doc_update.erl
+++ b/src/fabric/src/fabric_doc_update.erl
@@ -114,19 +114,29 @@ handle_message({request_entity_too_large, Entity}, _, _) ->
     throw({request_entity_too_large, Entity}).
 
 before_doc_update(DbName, Docs, Opts) ->
+    % Use the same pattern as in couch_db:validate_doc_update/3. If the document
+    % was already checked during the interactive edit, we don't want to spend
+    % time in the internal replicator revalidating everything.
+    UpdateType =
+        case get(io_priority) of
+            {internal_repl, _} ->
+                ?REPLICATED_CHANGES;
+            _ ->
+                ?INTERACTIVE_EDIT
+        end,
     case {fabric_util:is_replicator_db(DbName), fabric_util:is_users_db(DbName)} of
         {true, _} ->
             %% cluster db is expensive to create so we only do it if we have to
             Db = fabric_util:open_cluster_db(DbName, Opts),
             [
-                couch_replicator_docs:before_doc_update(Doc, Db, ?REPLICATED_CHANGES)
+                couch_replicator_docs:before_doc_update(Doc, Db, UpdateType)
              || Doc <- Docs
             ];
         {_, true} ->
             %% cluster db is expensive to create so we only do it if we have to
             Db = fabric_util:open_cluster_db(DbName, Opts),
             [
-                couch_users_db:before_doc_update(Doc, Db, ?INTERACTIVE_EDIT)
+                couch_users_db:before_doc_update(Doc, Db, UpdateType)
              || Doc <- Docs
             ];
         _ ->