You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2022/11/22 16:33:59 UTC

[couchdb] branch improve-replicator-doc-parsing created (now c75ced0f8)

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a change to branch improve-replicator-doc-parsing
in repository https://gitbox.apache.org/repos/asf/couchdb.git


      at c75ced0f8 [wip] Replace the auto-inserted replicator VDU with a BDU

This branch includes the following new commits:

     new c75ced0f8 [wip] Replace the auto-inserted replicator VDU with a BDU

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[couchdb] 01/01: [wip] Replace the auto-inserted replicator VDU with a BDU

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch improve-replicator-doc-parsing
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit c75ced0f8baf85ec3caad3bf9c92092d4a592c26
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Tue Nov 22 11:31:54 2022 -0500

    [wip] Replace the auto-inserted replicator VDU with a BDU
    
    Switch out the VDU (validate_doc_update) with a BDU (before_doc_update)
    check. Couch replicator already had a BDU to update the `"owner"` field, so
    we can plug right into it and validate everything we need there. This way
    we'll have only a single copy of the validation and parsing code.
---
 src/couch_replicator/src/couch_replicator.erl      |   6 +-
 .../src/couch_replicator_doc_processor.erl         |   9 +-
 .../src/couch_replicator_doc_processor_worker.erl  |  14 +-
 src/couch_replicator/src/couch_replicator_docs.erl | 564 +++------------------
 src/couch_replicator/src/couch_replicator_ids.erl  |   8 +-
 .../src/couch_replicator_js_functions.hrl          | 183 -------
 ...licator_docs.erl => couch_replicator_parse.erl} | 357 +------------
 .../src/couch_replicator_scheduler_job.erl         |   4 +-
 .../src/couch_replicator_utils.erl                 |   8 +-
 .../test/eunit/couch_replicator_compact_tests.erl  |   2 +-
 .../couch_replicator_error_reporting_tests.erl     |   2 +-
 .../test/eunit/couch_replicator_proxy_tests.erl    |  10 +-
 ...ch_replicator_retain_stats_between_job_runs.erl |   2 +-
 .../test/eunit/couch_replicator_test_helper.erl    |   2 +-
 14 files changed, 110 insertions(+), 1061 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index 39b3903ea..3bb177968 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -25,10 +25,8 @@
 -include_lib("couch/include/couch_db.hrl").
 -include("couch_replicator.hrl").
 -include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
--include_lib("couch_mrview/include/couch_mrview.hrl").
 -include_lib("mem3/include/mem3.hrl").
 
--define(DESIGN_DOC_CREATION_DELAY_MSEC, 1000).
 -define(REPLICATION_STATES, [
     % Just added to scheduler
     initializing,
@@ -58,7 +56,7 @@
     | {error, any()}
     | no_return().
 replicate(PostBody, Ctx) ->
-    {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
+    {ok, Rep0} = couch_replicator_parse:parse_rep_doc(PostBody, Ctx),
     Rep = Rep0#rep{start_time = os:timestamp()},
     #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep,
     case get_value(cancel, Options, false) of
@@ -138,7 +136,7 @@ replication_states() ->
 
 -spec strip_url_creds(binary() | {[_]}) -> binary().
 strip_url_creds(Endpoint) ->
-    try couch_replicator_docs:parse_rep_db(Endpoint, [], []) of
+    try couch_replicator_parse:parse_rep_db(Endpoint, [], []) of
         #httpdb{url = Url} ->
             iolist_to_binary(couch_util:url_strip_password(Url))
     catch
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 436d7c44d..eb4c02b49 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -44,9 +44,7 @@
     notify_cluster_event/2
 ]).
 
--include_lib("couch/include/couch_db.hrl").
 -include("couch_replicator.hrl").
--include_lib("mem3/include/mem3.hrl").
 
 -import(couch_replicator_utils, [
     get_json_value/2,
@@ -77,9 +75,8 @@
 
 % couch_multidb_changes API callbacks
 
-db_created(DbName, Server) ->
+db_created(_DbName, Server) ->
     couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
     Server.
 
 db_deleted(DbName, Server) ->
@@ -89,7 +86,7 @@ db_deleted(DbName, Server) ->
 
 db_found(DbName, Server) ->
     couch_stats:increment_counter([couch_replicator, docs, dbs_found]),
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
+    couch_replicator_docs:delete_old_rep_ddoc(DbName),
     Server.
 
 db_change(DbName, {ChangeProps} = Change, Server) ->
@@ -169,7 +166,7 @@ process_updated({DbName, _DocId} = Id, JsonRepDoc) ->
     % should propagate to db_change function and will be recorded as permanent
     % failure in the document. User will have to update the documet to fix the
     % problem.
-    Rep0 = couch_replicator_docs:parse_rep_doc_without_id(JsonRepDoc),
+    Rep0 = couch_replicator_parse:parse_rep_doc_without_id(JsonRepDoc),
     Rep = Rep0#rep{db_name = DbName, start_time = os:timestamp()},
     Filter =
         case couch_replicator_filters:parse(Rep#rep.options) of
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
index 22c5f8584..b1014ffa5 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
@@ -162,7 +162,7 @@ doc_processor_worker_test_() ->
 t_should_add_job() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
         ?assert(added_job())
     end).
@@ -172,7 +172,7 @@ t_already_running_same_docid() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
         mock_already_running(?DB, ?DOC1),
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
         ?assert(did_not_add_job())
     end).
@@ -182,7 +182,7 @@ t_already_running_transient() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
         mock_already_running(null, null),
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         ?assertMatch(
             {temporary_error, _},
             maybe_start_replication(
@@ -200,7 +200,7 @@ t_already_running_other_db_other_doc() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
         mock_already_running(<<"otherdb">>, <<"otherdoc">>),
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         ?assertMatch(
             {permanent_failure, _},
             maybe_start_replication(
@@ -217,7 +217,7 @@ t_already_running_other_db_other_doc() ->
 t_spawn_worker() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         WRef = make_ref(),
         meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, WRef),
         Pid = spawn_worker(Id, Rep, 0, WRef),
@@ -236,7 +236,7 @@ t_spawn_worker() ->
 t_ignore_if_doc_deleted() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
         ?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())),
         ?assertNot(added_job())
@@ -247,7 +247,7 @@ t_ignore_if_doc_deleted() ->
 t_ignore_if_worker_ref_does_not_match() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         meck:expect(
             couch_replicator_doc_processor,
             get_worker_ref,
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index a60f1a1e1..20dbc0d67 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -13,15 +13,14 @@
 -module(couch_replicator_docs).
 
 -export([
-    parse_rep_doc/1,
-    parse_rep_doc/2,
-    parse_rep_db/3,
-    parse_rep_doc_without_id/1,
-    parse_rep_doc_without_id/2,
+    %% parse_rep_doc/1,
+    %% parse_rep_doc/2,
+    %% parse_rep_db/3,
+    %% parse_rep_doc_without_id/1,
+    %% parse_rep_doc_without_id/2,
     before_doc_update/3,
     after_doc_read/2,
-    ensure_rep_ddoc_exists/1,
-    ensure_cluster_rep_ddoc_exists/1,
+    delete_old_rep_ddoc/1,
     remove_state_fields/2,
     update_doc_completed/3,
     update_failed/3,
@@ -31,24 +30,10 @@
 ]).
 
 -include_lib("couch/include/couch_db.hrl").
--include_lib("ibrowse/include/ibrowse.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
 -include("couch_replicator.hrl").
--include("couch_replicator_js_functions.hrl").
 
--import(couch_util, [
-    get_value/2,
-    get_value/3,
-    to_binary/1
-]).
-
--import(couch_replicator_utils, [
-    get_json_value/2,
-    get_json_value/3
-]).
-
--define(REP_DB_NAME, <<"_replicator">>).
+% The ID of now deleted design doc. On every *_replicator db discovery we try
+% to delete it. At some point in the future, remove this logic altogether.
 -define(REP_DESIGN_DOC, <<"_design/_replicator">>).
 -define(OWNER, <<"owner">>).
 -define(CTX, {user_ctx, #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]}}).
@@ -126,176 +111,32 @@ update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
     ]),
     ok.
 
--spec ensure_rep_ddoc_exists(binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb) ->
+-spec delete_old_rep_ddoc(binary()) -> ok.
+delete_old_rep_ddoc(RepDb) ->
     case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of
-        true ->
-            ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC);
-        false ->
-            ok
+        true -> delete_old_rep_ddoc(RepDb, ?REP_DESIGN_DOC);
+        false -> ok
     end.
 
--spec ensure_rep_ddoc_exists(binary(), binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb, DDocId) ->
+-spec delete_old_rep_ddoc(binary(), binary()) -> ok.
+delete_old_rep_ddoc(RepDb, DDocId) ->
     case open_rep_doc(RepDb, DDocId) of
         {not_found, no_db_file} ->
-            %% database was deleted.
             ok;
         {not_found, _Reason} ->
-            DocProps = replication_design_doc_props(DDocId),
-            DDoc = couch_doc:from_json_obj({DocProps}),
-            couch_log:notice("creating replicator ddoc ~p", [RepDb]),
-            {ok, _Rev} = save_rep_doc(RepDb, DDoc);
+            ok;
         {ok, Doc} ->
-            Latest = replication_design_doc_props(DDocId),
-            {Props0} = couch_doc:to_json_obj(Doc, []),
-            {value, {_, Rev}, Props} = lists:keytake(<<"_rev">>, 1, Props0),
-            case compare_ejson({Props}, {Latest}) of
-                true ->
-                    ok;
-                false ->
-                    LatestWithRev = [{<<"_rev">>, Rev} | Latest],
-                    DDoc = couch_doc:from_json_obj({LatestWithRev}),
-                    couch_log:notice("updating replicator ddoc ~p", [RepDb]),
-                    try
-                        {ok, _} = save_rep_doc(RepDb, DDoc)
-                    catch
-                        throw:conflict ->
-                            %% ignore, we'll retry next time
-                            ok
-                    end
+            DeletedDoc = Doc#doc{deleted = true, body = {[]}},
+            try
+                save_rep_doc(RepDb, DeletedDoc)
+            catch
+                throw:conflict ->
+                    % ignore, we'll retry next time
+                    ok
             end
     end,
     ok.
 
--spec ensure_cluster_rep_ddoc_exists(binary()) -> ok.
-ensure_cluster_rep_ddoc_exists(RepDb) ->
-    DDocId = ?REP_DESIGN_DOC,
-    [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId),
-    ensure_rep_ddoc_exists(DbShard, DDocId).
-
--spec compare_ejson({[_]}, {[_]}) -> boolean().
-compare_ejson(EJson1, EJson2) ->
-    EjsonSorted1 = couch_replicator_filters:ejsort(EJson1),
-    EjsonSorted2 = couch_replicator_filters:ejsort(EJson2),
-    EjsonSorted1 == EjsonSorted2.
-
--spec replication_design_doc_props(binary()) -> [_].
-replication_design_doc_props(DDocId) ->
-    [
-        {<<"_id">>, DDocId},
-        {<<"language">>, <<"javascript">>},
-        {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
-    ].
-
-% Note: parse_rep_doc can handle filtered replications. During parsing of the
-% replication doc it will make possibly remote http requests to the source
-% database. If failure or parsing of filter docs fails, parse_doc throws a
-% {filter_fetch_error, Error} excation. This exception should be considered
-% transient in respect to the contents of the document itself, since it depends
-% on netowrk availability of the source db and other factors.
--spec parse_rep_doc({[_]}) -> #rep{}.
-parse_rep_doc(RepDoc) ->
-    {ok, Rep} =
-        try
-            parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
-        catch
-            throw:{error, Reason} ->
-                throw({bad_rep_doc, Reason});
-            throw:{filter_fetch_error, Reason} ->
-                throw({filter_fetch_error, Reason});
-            Tag:Err ->
-                throw({bad_rep_doc, to_binary({Tag, Err})})
-        end,
-    Rep.
-
--spec parse_rep_doc_without_id({[_]}) -> #rep{}.
-parse_rep_doc_without_id(RepDoc) ->
-    {ok, Rep} =
-        try
-            parse_rep_doc_without_id(RepDoc, rep_user_ctx(RepDoc))
-        catch
-            throw:{error, Reason} ->
-                throw({bad_rep_doc, Reason});
-            Tag:Err ->
-                throw({bad_rep_doc, to_binary({Tag, Err})})
-        end,
-    Rep.
-
--spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #rep{}}.
-parse_rep_doc(Doc, UserCtx) ->
-    {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx),
-    Cancel = get_value(cancel, Rep#rep.options, false),
-    Id = get_value(id, Rep#rep.options, nil),
-    case {Cancel, Id} of
-        {true, nil} ->
-            % Cancel request with no id, must parse id out of body contents
-            {ok, update_rep_id(Rep)};
-        {true, Id} ->
-            % Cancel request with an id specified, so do not parse id from body
-            {ok, Rep};
-        {false, _Id} ->
-            % Not a cancel request, regular replication doc
-            {ok, update_rep_id(Rep)}
-    end.
-
--spec parse_rep_doc_without_id({[_]}, #user_ctx{}) -> {ok, #rep{}}.
-parse_rep_doc_without_id({Props}, UserCtx) ->
-    {SrcProxy, TgtProxy} = parse_proxy_settings(Props),
-    Opts = make_options(Props),
-    case
-        get_value(cancel, Opts, false) andalso
-            (get_value(id, Opts, nil) =/= nil)
-    of
-        true ->
-            {ok, #rep{options = Opts, user_ctx = UserCtx}};
-        false ->
-            Source = parse_rep_db(get_value(<<"source">>, Props), SrcProxy, Opts),
-            Target = parse_rep_db(get_value(<<"target">>, Props), TgtProxy, Opts),
-            {Type, View} =
-                case couch_replicator_filters:view_type(Props, Opts) of
-                    {error, Error} ->
-                        throw({bad_request, Error});
-                    Result ->
-                        Result
-                end,
-            Rep = #rep{
-                source = Source,
-                target = Target,
-                options = Opts,
-                user_ctx = UserCtx,
-                type = Type,
-                view = View,
-                doc_id = get_value(<<"_id">>, Props, null)
-            },
-            % Check if can parse filter code, if not throw exception
-            case couch_replicator_filters:parse(Opts) of
-                {error, FilterError} ->
-                    throw({error, FilterError});
-                {ok, _Filter} ->
-                    ok
-            end,
-            {ok, Rep}
-    end.
-
-parse_proxy_settings(Props) when is_list(Props) ->
-    Proxy = get_value(<<"proxy">>, Props, <<>>),
-    SrcProxy = get_value(<<"source_proxy">>, Props, <<>>),
-    TgtProxy = get_value(<<"target_proxy">>, Props, <<>>),
-
-    case Proxy =/= <<>> of
-        true when SrcProxy =/= <<>> ->
-            Error = "`proxy` is mutually exclusive with `source_proxy`",
-            throw({bad_request, Error});
-        true when TgtProxy =/= <<>> ->
-            Error = "`proxy` is mutually exclusive with `target_proxy`",
-            throw({bad_request, Error});
-        true ->
-            {Proxy, Proxy};
-        false ->
-            {SrcProxy, TgtProxy}
-    end.
-
 % Update a #rep{} record with a replication_id. Calculating the id might involve
 % fetching a filter from the source db, and so it could fail intermetently.
 % In case of a failure to fetch the filter this function will throw a
@@ -386,316 +227,46 @@ save_rep_doc(DbName, Doc) ->
         couch_db:close(Db)
     end.
 
--spec rep_user_ctx({[_]}) -> #user_ctx{}.
-rep_user_ctx({RepDoc}) ->
-    case get_json_value(<<"user_ctx">>, RepDoc) of
-        undefined ->
-            #user_ctx{};
-        {UserCtx} ->
-            #user_ctx{
-                name = get_json_value(<<"name">>, UserCtx, null),
-                roles = get_json_value(<<"roles">>, UserCtx, [])
-            }
-    end.
-
--spec parse_rep_db({[_]} | binary(), [_] | binary(), [_]) -> #httpdb{} | no_return().
-parse_rep_db({Props}, Proxy, Options) ->
-    ProxyParams = parse_proxy_params(Proxy),
-    ProxyURL =
-        case ProxyParams of
-            [] -> undefined;
-            _ -> binary_to_list(Proxy)
-        end,
-    Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
-    {AuthProps} = get_value(<<"auth">>, Props, {[]}),
-    {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
-    Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
-    DefaultHeaders = (#httpdb{})#httpdb.headers,
-    HttpDb = #httpdb{
-        url = Url,
-        auth_props = AuthProps,
-        headers = lists:ukeymerge(1, Headers, DefaultHeaders),
-        ibrowse_options = lists:keysort(
-            1,
-            [
-                {socket_options, get_value(socket_options, Options)}
-                | ProxyParams ++ ssl_params(Url)
-            ]
-        ),
-        timeout = get_value(connection_timeout, Options),
-        http_connections = get_value(http_connections, Options),
-        retries = get_value(retries, Options),
-        proxy_url = ProxyURL
-    },
-    couch_replicator_utils:normalize_basic_auth(HttpDb);
-parse_rep_db(<<"http://", _/binary>> = Url, Proxy, Options) ->
-    parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
-parse_rep_db(<<"https://", _/binary>> = Url, Proxy, Options) ->
-    parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
-parse_rep_db(<<_/binary>>, _Proxy, _Options) ->
-    throw({error, local_endpoints_not_supported});
-parse_rep_db(undefined, _Proxy, _Options) ->
-    throw({error, <<"Missing replicator database">>}).
-
--spec maybe_add_trailing_slash(binary() | list()) -> list().
-maybe_add_trailing_slash(Url) when is_binary(Url) ->
-    maybe_add_trailing_slash(?b2l(Url));
-maybe_add_trailing_slash(Url) ->
-    case lists:member($?, Url) of
-        true ->
-            % skip if there are query params
-            Url;
-        false ->
-            case lists:last(Url) of
-                $/ ->
-                    Url;
-                _ ->
-                    Url ++ "/"
-            end
-    end.
-
--spec make_options([_]) -> [_].
-make_options(Props) ->
-    Options0 = lists:ukeysort(1, convert_options(Props)),
-    Options = check_options(Options0),
-    DefWorkers = config:get_integer("replicator", "worker_processes", 4),
-    DefBatchSize = config:get_integer("replicator", "worker_batch_size", 500),
-    DefConns = config:get_integer("replicator", "http_connections", 20),
-    DefTimeout = config:get_integer("replicator", "connection_timeout", 30000),
-    DefRetries = config:get_integer("replicator", "retries_per_request", 5),
-    UseCheckpoints = config:get_boolean("replicator", "use_checkpoints", true),
-    UseBulkGet = config:get_boolean("replicator", "use_bulk_get", true),
-    DefCheckpointInterval = config:get_integer(
-        "replicator",
-        "checkpoint_interval",
-        30000
-    ),
-    {ok, DefSocketOptions} = couch_util:parse_term(
-        config:get(
-            "replicator",
-            "socket_options",
-            "[{keepalive, true}, {nodelay, false}]"
-        )
-    ),
-    lists:ukeymerge(
-        1,
-        Options,
-        lists:keysort(1, [
-            {connection_timeout, DefTimeout},
-            {retries, DefRetries},
-            {http_connections, DefConns},
-            {socket_options, DefSocketOptions},
-            {worker_batch_size, DefBatchSize},
-            {worker_processes, DefWorkers},
-            {use_checkpoints, UseCheckpoints},
-            {use_bulk_get, UseBulkGet},
-            {checkpoint_interval, DefCheckpointInterval}
-        ])
-    ).
-
--spec convert_options([_]) -> [_].
-convert_options([]) ->
-    [];
-convert_options([{<<"cancel">>, V} | _R]) when not is_boolean(V) ->
-    throw({bad_request, <<"parameter `cancel` must be a boolean">>});
-convert_options([{<<"cancel">>, V} | R]) ->
-    [{cancel, V} | convert_options(R)];
-convert_options([{IdOpt, V} | R]) when
-    IdOpt =:= <<"_local_id">>;
-    IdOpt =:= <<"replication_id">>;
-    IdOpt =:= <<"id">>
-->
-    [{id, couch_replicator_ids:convert(V)} | convert_options(R)];
-convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V) ->
-    throw({bad_request, <<"parameter `create_target` must be a boolean">>});
-convert_options([{<<"create_target">>, V} | R]) ->
-    [{create_target, V} | convert_options(R)];
-convert_options([{<<"create_target_params">>, V} | _R]) when not is_tuple(V) ->
-    throw({bad_request, <<"parameter `create_target_params` must be a JSON object">>});
-convert_options([{<<"create_target_params">>, V} | R]) ->
-    [{create_target_params, V} | convert_options(R)];
-convert_options([{<<"winning_revs_only">>, V} | _R]) when not is_boolean(V) ->
-    throw({bad_request, <<"parameter `winning_revs_only` must be a boolean">>});
-convert_options([{<<"winning_revs_only">>, V} | R]) ->
-    [{winning_revs_only, V} | convert_options(R)];
-convert_options([{<<"continuous">>, V} | _R]) when not is_boolean(V) ->
-    throw({bad_request, <<"parameter `continuous` must be a boolean">>});
-convert_options([{<<"continuous">>, V} | R]) ->
-    [{continuous, V} | convert_options(R)];
-convert_options([{<<"filter">>, V} | R]) ->
-    [{filter, V} | convert_options(R)];
-convert_options([{<<"query_params">>, V} | R]) ->
-    [{query_params, V} | convert_options(R)];
-convert_options([{<<"doc_ids">>, null} | R]) ->
-    convert_options(R);
-convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) ->
-    throw({bad_request, <<"parameter `doc_ids` must be an array">>});
-convert_options([{<<"doc_ids">>, V} | R]) ->
-    % Ensure same behaviour as old replicator: accept a list of percent
-    % encoded doc IDs.
-    DocIds = lists:usort([?l2b(couch_httpd:unquote(Id)) || Id <- V]),
-    [{doc_ids, DocIds} | convert_options(R)];
-convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) ->
-    throw({bad_request, <<"parameter `selector` must be a JSON object">>});
-convert_options([{<<"selector">>, V} | R]) ->
-    [{selector, V} | convert_options(R)];
-convert_options([{<<"worker_processes">>, V} | R]) ->
-    [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"worker_batch_size">>, V} | R]) ->
-    [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"http_connections">>, V} | R]) ->
-    [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"connection_timeout">>, V} | R]) ->
-    [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"retries_per_request">>, V} | R]) ->
-    [{retries, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"socket_options">>, V} | R]) ->
-    {ok, SocketOptions} = couch_util:parse_term(V),
-    [{socket_options, SocketOptions} | convert_options(R)];
-convert_options([{<<"since_seq">>, V} | R]) ->
-    [{since_seq, V} | convert_options(R)];
-convert_options([{<<"use_checkpoints">>, V} | R]) ->
-    [{use_checkpoints, V} | convert_options(R)];
-convert_options([{<<"use_bulk_get">>, V} | _R]) when not is_boolean(V) ->
-    throw({bad_request, <<"parameter `use_bulk_get` must be a boolean">>});
-convert_options([{<<"use_bulk_get">>, V} | R]) ->
-    [{use_bulk_get, V} | convert_options(R)];
-convert_options([{<<"checkpoint_interval">>, V} | R]) ->
-    [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)];
-% skip unknown option
-convert_options([_ | R]) ->
-    convert_options(R).
-
--spec check_options([_]) -> [_].
-check_options(Options) ->
-    DocIds = lists:keyfind(doc_ids, 1, Options),
-    Filter = lists:keyfind(filter, 1, Options),
-    Selector = lists:keyfind(selector, 1, Options),
-    case {DocIds, Filter, Selector} of
-        {false, false, false} -> Options;
-        {false, false, _} -> Options;
-        {false, _, false} -> Options;
-        {_, false, false} -> Options;
-        _ -> throw({bad_request, "`doc_ids`,`filter`,`selector` are mutually exclusive"})
-    end.
-
--spec parse_proxy_params(binary() | [_]) -> [_].
-parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
-    parse_proxy_params(?b2l(ProxyUrl));
-parse_proxy_params([]) ->
-    [];
-parse_proxy_params(ProxyUrl) ->
-    #url{
-        host = Host,
-        port = Port,
-        username = User,
-        password = Passwd,
-        protocol = Protocol
-    } = ibrowse_lib:parse_url(ProxyUrl),
-    Params =
-        [
-            {proxy_host, Host},
-            {proxy_port, Port}
-        ] ++
-            case is_list(User) andalso is_list(Passwd) of
-                false ->
-                    [];
-                true ->
-                    [{proxy_user, User}, {proxy_password, Passwd}]
-            end,
-    case Protocol of
-        socks5 ->
-            [proxy_to_socks5(Param) || Param <- Params];
-        _ ->
-            Params
-    end.
-
--spec proxy_to_socks5({atom(), string()}) -> {atom(), string()}.
-proxy_to_socks5({proxy_host, Val}) ->
-    {socks5_host, Val};
-proxy_to_socks5({proxy_port, Val}) ->
-    {socks5_port, Val};
-proxy_to_socks5({proxy_user, Val}) ->
-    {socks5_user, Val};
-proxy_to_socks5({proxy_password, Val}) ->
-    {socks5_password, Val}.
-
--spec ssl_params([_]) -> [_].
-ssl_params(Url) ->
-    case ibrowse_lib:parse_url(Url) of
-        #url{protocol = https} ->
-            Depth = config:get_integer(
-                "replicator",
-                "ssl_certificate_max_depth",
-                3
-            ),
-            VerifyCerts = config:get_boolean(
-                "replicator",
-                "verify_ssl_certificates",
-                false
-            ),
-            CertFile = config:get("replicator", "cert_file", undefined),
-            KeyFile = config:get("replicator", "key_file", undefined),
-            Password = config:get("replicator", "password", undefined),
-            SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts)],
-            SslOpts1 =
-                case CertFile /= undefined andalso KeyFile /= undefined of
-                    true ->
-                        case Password of
-                            undefined ->
-                                [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts;
-                            _ ->
-                                [
-                                    {certfile, CertFile},
-                                    {keyfile, KeyFile},
-                                    {password, Password}
-                                ] ++ SslOpts
-                        end;
-                    false ->
-                        SslOpts
-                end,
-            [{is_ssl, true}, {ssl_options, SslOpts1}];
-        #url{protocol = http} ->
-            []
-    end.
-
--spec ssl_verify_options(true | false) -> [_].
-ssl_verify_options(true) ->
-    CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
-    [{verify, verify_peer}, {cacertfile, CAFile}];
-ssl_verify_options(false) ->
-    [{verify, verify_none}].
-
 -spec before_doc_update(#doc{}, Db :: any(), couch_db:update_type()) -> #doc{}.
 before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db, _UpdateType) ->
     Doc;
+before_doc_update(#doc{} = Doc, _Db, ?REPLICATED_CHANGES) ->
+    % Skip internal replicator updates
+    Doc;
 before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
-    #user_ctx{
-        roles = Roles,
-        name = Name
-    } = couch_db:get_user_ctx(Db),
-    case lists:member(<<"_replicator">>, Roles) of
+    #user_ctx{roles = Roles, name = Name} = couch_db:get_user_ctx(Db),
+    IsReplicator = lists:member(<<"_replicator">>, Roles),
+    Doc1 = case IsReplicator of
+        true -> Doc;
+        false -> before_doc_update_owner(get_value(?OWNER, Body), Name, Db, Doc)
+    end,
+    IsFailed = get_value(<<"_replication_state">>, Body) =:= <<"failed">>,
+    case IsReplicator orelse Doc1#doc.deleted orelse IsFailed of
         true ->
-            Doc;
+            ok;
         false ->
-            case couch_util:get_value(?OWNER, Body) of
-                undefined ->
-                    Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
-                Name ->
-                    Doc;
-                Other ->
-                    case (catch couch_db:check_is_admin(Db)) of
-                        ok when Other =:= null ->
-                            Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
-                        ok ->
-                            Doc;
-                        _ ->
-                            throw(
-                                {forbidden,
-                                    <<"Can't update replication documents", " from other users.">>}
-                            )
-                    end
+            try
+                couch_replicator_parse:parse_rep_doc(Doc1#doc.body)
+            catch
+                throw:{bad_rep_doc, Error} ->
+                    throw({forbidden, Error})
             end
+    end,
+    Doc1.
+
+before_doc_update_owner(undefined, Name, _Db, #doc{body = {Body}} = Doc) ->
+    Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
+before_doc_update_owner(Name, Name, _Db, #doc{} = Doc) ->
+    Doc;
+before_doc_update_owner(Other, Name, Db, #doc{body = {Body}} = Doc) ->
+    case (catch couch_db:check_is_admin(Db)) of
+        ok when Other =:= null ->
+           Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
+       ok ->
+           Doc;
+       _ ->
+           Err = <<"Can't update replication documents from other users.">>,
+           throw({forbidden, Err})
     end.
 
 -spec after_doc_read(#doc{}, Db :: any()) -> #doc{}.
@@ -707,22 +278,12 @@ after_doc_read(#doc{body = {Body}} = Doc, Db) ->
         ok ->
             Doc;
         _ ->
-            case couch_util:get_value(?OWNER, Body) of
+            case get_value(?OWNER, Body) of
                 Name ->
                     Doc;
                 _Other ->
-                    Source = strip_credentials(
-                        couch_util:get_value(
-                            <<"source">>,
-                            Body
-                        )
-                    ),
-                    Target = strip_credentials(
-                        couch_util:get_value(
-                            <<"target">>,
-                            Body
-                        )
-                    ),
+                    Source = strip_credentials(get_value(<<"source">>, Body)),
+                    Target = strip_credentials(get_value(<<"target">>, Body)),
                     NewBody0 = ?replace(Body, <<"source">>, Source),
                     NewBody = ?replace(NewBody0, <<"target">>, Target),
                     #doc{revs = {Pos, [_ | Revs]}} = Doc,
@@ -765,6 +326,15 @@ error_reason({error, Reason}) ->
 error_reason(Reason) ->
     to_binary(Reason).
 
+to_binary(Val) ->
+    couch_util:to_binary(Val).
+
+get_value(Key, Props) ->
+    couch_util:get_value(Key, Props).
+
+get_json_value(Key, Obj) ->
+    couch_replicator_utils:get_json_value(Key, Obj).
+
 -ifdef(TEST).
 
 -include_lib("couch/include/couch_eunit.hrl").
diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl
index 939070b95..86fe1f26e 100644
--- a/src/couch_replicator/src/couch_replicator_ids.erl
+++ b/src/couch_replicator/src/couch_replicator_ids.erl
@@ -196,10 +196,10 @@ winning_revs_generates_new_id(_) ->
         {<<"source">>, <<"http://foo.example.bar">>},
         {<<"target">>, <<"http://bar.example.foo">>}
     ],
-    Rep1 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc1}),
+    Rep1 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc1}),
     Id1 = replication_id(Rep1),
     RepDoc2 = RepDoc1 ++ [{<<"winning_revs_only">>, true}],
-    Rep2 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc2}),
+    Rep2 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc2}),
     Id2 = replication_id(Rep2),
     ?assertNotEqual(Id1, Id2).
 
@@ -208,10 +208,10 @@ winning_revs_false_same_as_undefined(_) ->
         {<<"source">>, <<"http://foo.example.bar">>},
         {<<"target">>, <<"http://bar.example.foo">>}
     ],
-    Rep1 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc1}),
+    Rep1 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc1}),
     Id1 = replication_id(Rep1),
     RepDoc2 = RepDoc1 ++ [{<<"winning_revs_only">>, false}],
-    Rep2 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc2}),
+    Rep2 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc2}),
     Id2 = replication_id(Rep2),
     ?assertEqual(Id1, Id2).
 
diff --git a/src/couch_replicator/src/couch_replicator_js_functions.hrl b/src/couch_replicator/src/couch_replicator_js_functions.hrl
deleted file mode 100644
index 4f4369075..000000000
--- a/src/couch_replicator/src/couch_replicator_js_functions.hrl
+++ /dev/null
@@ -1,183 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License.  You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--define(REP_DB_DOC_VALIDATE_FUN, <<"
-    function(newDoc, oldDoc, userCtx) {
-        function reportError(error_msg) {
-            log('Error writing document `' + newDoc._id +
-                '\\' to the replicator database: ' + error_msg);
-            throw({forbidden: error_msg});
-        }
-
-        function validateEndpoint(endpoint, fieldName) {
-            if ((typeof endpoint !== 'string') &&
-                ((typeof endpoint !== 'object') || (endpoint === null))) {
-
-                reportError('The `' + fieldName + '\\' property must exist' +
-                    ' and be either a string or an object.');
-            }
-
-            if (typeof endpoint === 'object') {
-                if ((typeof endpoint.url !== 'string') || !endpoint.url) {
-                    reportError('The url property must exist in the `' +
-                        fieldName + '\\' field and must be a non-empty string.');
-                }
-
-                if ((typeof endpoint.auth !== 'undefined') &&
-                    ((typeof endpoint.auth !== 'object') ||
-                        endpoint.auth === null)) {
-
-                    reportError('`' + fieldName +
-                        '.auth\\' must be a non-null object.');
-                }
-
-                if ((typeof endpoint.headers !== 'undefined') &&
-                    ((typeof endpoint.headers !== 'object') ||
-                        endpoint.headers === null)) {
-
-                    reportError('`' + fieldName +
-                        '.headers\\' must be a non-null object.');
-                }
-            }
-        }
-
-        var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0);
-        var isAdmin = (userCtx.roles.indexOf('_admin') >= 0);
-
-        if (isReplicator) {
-            // Always let replicator update the replication document
-            return;
-        }
-
-        if (newDoc._replication_state === 'failed') {
-            // Skip validation in case when we update the document with the
-            // failed state. In this case it might be malformed. However,
-            // replicator will not pay attention to failed documents so this
-            // is safe.
-            return;
-        }
-
-        if (!newDoc._deleted) {
-            validateEndpoint(newDoc.source, 'source');
-            validateEndpoint(newDoc.target, 'target');
-
-            if ((typeof newDoc.create_target !== 'undefined') &&
-                (typeof newDoc.create_target !== 'boolean')) {
-
-                reportError('The `create_target\\' field must be a boolean.');
-            }
-
-            if ((typeof newDoc.winning_revs_only !== 'undefined') &&
-                (typeof newDoc.winning_revs_only !== 'boolean')) {
-
-                reportError('The `winning_revs_only\\' field must be a boolean.');
-            }
-
-            if ((typeof newDoc.continuous !== 'undefined') &&
-                (typeof newDoc.continuous !== 'boolean')) {
-
-                reportError('The `continuous\\' field must be a boolean.');
-            }
-
-            if ((typeof newDoc.doc_ids !== 'undefined') &&
-                !isArray(newDoc.doc_ids)) {
-
-                reportError('The `doc_ids\\' field must be an array of strings.');
-            }
-
-            if ((typeof newDoc.selector !== 'undefined') &&
-                (typeof newDoc.selector !== 'object')) {
-
-                reportError('The `selector\\' field must be an object.');
-            }
-
-            if ((typeof newDoc.filter !== 'undefined') &&
-                ((typeof newDoc.filter !== 'string') || !newDoc.filter)) {
-
-                reportError('The `filter\\' field must be a non-empty string.');
-            }
-
-            if ((typeof newDoc.doc_ids !== 'undefined') &&
-                (typeof newDoc.selector !== 'undefined')) {
-
-                reportError('`doc_ids\\' field is incompatible with `selector\\'.');
-            }
-
-            if ( ((typeof newDoc.doc_ids !== 'undefined') ||
-                  (typeof newDoc.selector !== 'undefined')) &&
-                 (typeof newDoc.filter !== 'undefined') ) {
-
-                reportError('`filter\\' field is incompatible with `selector\\' and `doc_ids\\'.');
-            }
-
-            if ((typeof newDoc.query_params !== 'undefined') &&
-                ((typeof newDoc.query_params !== 'object') ||
-                    newDoc.query_params === null)) {
-
-                reportError('The `query_params\\' field must be an object.');
-            }
-
-            if (newDoc.user_ctx) {
-                var user_ctx = newDoc.user_ctx;
-
-                if ((typeof user_ctx !== 'object') || (user_ctx === null)) {
-                    reportError('The `user_ctx\\' property must be a ' +
-                        'non-null object.');
-                }
-
-                if (!(user_ctx.name === null ||
-                    (typeof user_ctx.name === 'undefined') ||
-                    ((typeof user_ctx.name === 'string') &&
-                        user_ctx.name.length > 0))) {
-
-                    reportError('The `user_ctx.name\\' property must be a ' +
-                        'non-empty string or null.');
-                }
-
-                if (!isAdmin && (user_ctx.name !== userCtx.name)) {
-                    reportError('The given `user_ctx.name\\' is not valid');
-                }
-
-                if (user_ctx.roles && !isArray(user_ctx.roles)) {
-                    reportError('The `user_ctx.roles\\' property must be ' +
-                        'an array of strings.');
-                }
-
-                if (!isAdmin && user_ctx.roles) {
-                    for (var i = 0; i < user_ctx.roles.length; i++) {
-                        var role = user_ctx.roles[i];
-
-                        if (typeof role !== 'string' || role.length === 0) {
-                            reportError('Roles must be non-empty strings.');
-                        }
-                        if (userCtx.roles.indexOf(role) === -1) {
-                            reportError('Invalid role (`' + role +
-                                '\\') in the `user_ctx\\'');
-                        }
-                    }
-                }
-            } else {
-                if (!isAdmin) {
-                    reportError('The `user_ctx\\' property is missing (it is ' +
-                       'optional for admins only).');
-                }
-            }
-        } else {
-            if (!isAdmin) {
-                if (!oldDoc.user_ctx || (oldDoc.user_ctx.name !== userCtx.name)) {
-                    reportError('Replication documents can only be deleted by ' +
-                        'admins or by the users who created them.');
-                }
-            }
-        }
-    }
-">>).
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_parse.erl
similarity index 65%
copy from src/couch_replicator/src/couch_replicator_docs.erl
copy to src/couch_replicator/src/couch_replicator_parse.erl
index a60f1a1e1..a584d0306 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_parse.erl
@@ -10,183 +10,20 @@
 % License for the specific language governing permissions and limitations under
 % the License.
 
--module(couch_replicator_docs).
+-module(couch_replicator_parse).
 
 -export([
     parse_rep_doc/1,
     parse_rep_doc/2,
     parse_rep_db/3,
     parse_rep_doc_without_id/1,
-    parse_rep_doc_without_id/2,
-    before_doc_update/3,
-    after_doc_read/2,
-    ensure_rep_ddoc_exists/1,
-    ensure_cluster_rep_ddoc_exists/1,
-    remove_state_fields/2,
-    update_doc_completed/3,
-    update_failed/3,
-    update_rep_id/1,
-    update_triggered/2,
-    update_error/2
+    parse_rep_doc_without_id/2
 ]).
 
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("ibrowse/include/ibrowse.hrl").
--include_lib("mem3/include/mem3.hrl").
 -include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
 -include("couch_replicator.hrl").
--include("couch_replicator_js_functions.hrl").
-
--import(couch_util, [
-    get_value/2,
-    get_value/3,
-    to_binary/1
-]).
-
--import(couch_replicator_utils, [
-    get_json_value/2,
-    get_json_value/3
-]).
-
--define(REP_DB_NAME, <<"_replicator">>).
--define(REP_DESIGN_DOC, <<"_design/_replicator">>).
--define(OWNER, <<"owner">>).
--define(CTX, {user_ctx, #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]}}).
--define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
-
-remove_state_fields(DbName, DocId) ->
-    update_rep_doc(DbName, DocId, [
-        {<<"_replication_state">>, undefined},
-        {<<"_replication_state_time">>, undefined},
-        {<<"_replication_state_reason">>, undefined},
-        {<<"_replication_id">>, undefined},
-        {<<"_replication_stats">>, undefined}
-    ]).
-
--spec update_doc_completed(binary(), binary(), [_]) -> any().
-update_doc_completed(DbName, DocId, Stats) ->
-    update_rep_doc(DbName, DocId, [
-        {<<"_replication_state">>, <<"completed">>},
-        {<<"_replication_state_reason">>, undefined},
-        {<<"_replication_stats">>, {Stats}}
-    ]),
-    couch_stats:increment_counter([
-        couch_replicator,
-        docs,
-        completed_state_updates
-    ]).
-
--spec update_failed(binary(), binary(), any()) -> any().
-update_failed(DbName, DocId, Error) ->
-    Reason = error_reason(Error),
-    couch_log:error(
-        "Error processing replication doc `~s` from `~s`: ~s",
-        [DocId, DbName, Reason]
-    ),
-    update_rep_doc(DbName, DocId, [
-        {<<"_replication_state">>, <<"failed">>},
-        {<<"_replication_stats">>, undefined},
-        {<<"_replication_state_reason">>, Reason}
-    ]),
-    couch_stats:increment_counter([
-        couch_replicator,
-        docs,
-        failed_state_updates
-    ]).
-
--spec update_triggered(#rep{}, rep_id()) -> ok.
-update_triggered(Rep, {Base, Ext}) ->
-    #rep{
-        db_name = DbName,
-        doc_id = DocId
-    } = Rep,
-    update_rep_doc(DbName, DocId, [
-        {<<"_replication_state">>, <<"triggered">>},
-        {<<"_replication_state_reason">>, undefined},
-        {<<"_replication_id">>, iolist_to_binary([Base, Ext])},
-        {<<"_replication_stats">>, undefined}
-    ]),
-    ok.
-
--spec update_error(#rep{}, any()) -> ok.
-update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
-    Reason = error_reason(Error),
-    BinRepId =
-        case RepId of
-            {Base, Ext} ->
-                iolist_to_binary([Base, Ext]);
-            _Other ->
-                null
-        end,
-    update_rep_doc(DbName, DocId, [
-        {<<"_replication_state">>, <<"error">>},
-        {<<"_replication_state_reason">>, Reason},
-        {<<"_replication_stats">>, undefined},
-        {<<"_replication_id">>, BinRepId}
-    ]),
-    ok.
-
--spec ensure_rep_ddoc_exists(binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb) ->
-    case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of
-        true ->
-            ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC);
-        false ->
-            ok
-    end.
-
--spec ensure_rep_ddoc_exists(binary(), binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb, DDocId) ->
-    case open_rep_doc(RepDb, DDocId) of
-        {not_found, no_db_file} ->
-            %% database was deleted.
-            ok;
-        {not_found, _Reason} ->
-            DocProps = replication_design_doc_props(DDocId),
-            DDoc = couch_doc:from_json_obj({DocProps}),
-            couch_log:notice("creating replicator ddoc ~p", [RepDb]),
-            {ok, _Rev} = save_rep_doc(RepDb, DDoc);
-        {ok, Doc} ->
-            Latest = replication_design_doc_props(DDocId),
-            {Props0} = couch_doc:to_json_obj(Doc, []),
-            {value, {_, Rev}, Props} = lists:keytake(<<"_rev">>, 1, Props0),
-            case compare_ejson({Props}, {Latest}) of
-                true ->
-                    ok;
-                false ->
-                    LatestWithRev = [{<<"_rev">>, Rev} | Latest],
-                    DDoc = couch_doc:from_json_obj({LatestWithRev}),
-                    couch_log:notice("updating replicator ddoc ~p", [RepDb]),
-                    try
-                        {ok, _} = save_rep_doc(RepDb, DDoc)
-                    catch
-                        throw:conflict ->
-                            %% ignore, we'll retry next time
-                            ok
-                    end
-            end
-    end,
-    ok.
-
--spec ensure_cluster_rep_ddoc_exists(binary()) -> ok.
-ensure_cluster_rep_ddoc_exists(RepDb) ->
-    DDocId = ?REP_DESIGN_DOC,
-    [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId),
-    ensure_rep_ddoc_exists(DbShard, DDocId).
-
--spec compare_ejson({[_]}, {[_]}) -> boolean().
-compare_ejson(EJson1, EJson2) ->
-    EjsonSorted1 = couch_replicator_filters:ejsort(EJson1),
-    EjsonSorted2 = couch_replicator_filters:ejsort(EJson2),
-    EjsonSorted1 == EjsonSorted2.
-
--spec replication_design_doc_props(binary()) -> [_].
-replication_design_doc_props(DDocId) ->
-    [
-        {<<"_id">>, DDocId},
-        {<<"language">>, <<"javascript">>},
-        {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
-    ].
 
 % Note: parse_rep_doc can handle filtered replications. During parsing of the
 % replication doc it will make possibly remote http requests to the source
@@ -304,88 +141,6 @@ update_rep_id(Rep) ->
     RepId = couch_replicator_ids:replication_id(Rep),
     Rep#rep{id = RepId}.
 
-update_rep_doc(RepDbName, RepDocId, KVs) ->
-    update_rep_doc(RepDbName, RepDocId, KVs, 1).
-
-update_rep_doc(RepDbName, RepDocId, KVs, Wait) when is_binary(RepDocId) ->
-    try
-        case open_rep_doc(RepDbName, RepDocId) of
-            {ok, LastRepDoc} ->
-                update_rep_doc(RepDbName, LastRepDoc, KVs, Wait * 2);
-            _ ->
-                ok
-        end
-    catch
-        throw:conflict ->
-            Msg = "Conflict when updating replication doc `~s`. Retrying.",
-            couch_log:error(Msg, [RepDocId]),
-            ok = timer:sleep(couch_rand:uniform(erlang:min(128, Wait)) * 100),
-            update_rep_doc(RepDbName, RepDocId, KVs, Wait * 2)
-    end;
-update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
-    NewRepDocBody = lists:foldl(
-        fun
-            ({K, undefined}, Body) ->
-                lists:keydelete(K, 1, Body);
-            ({<<"_replication_state">> = K, State} = KV, Body) ->
-                case get_json_value(K, Body) of
-                    State ->
-                        Body;
-                    _ ->
-                        Body1 = lists:keystore(K, 1, Body, KV),
-                        Timestamp = couch_replicator_utils:iso8601(os:timestamp()),
-                        lists:keystore(
-                            <<"_replication_state_time">>,
-                            1,
-                            Body1,
-                            {<<"_replication_state_time">>, Timestamp}
-                        )
-                end;
-            ({K, _V} = KV, Body) ->
-                lists:keystore(K, 1, Body, KV)
-        end,
-        RepDocBody,
-        KVs
-    ),
-    case NewRepDocBody of
-        RepDocBody ->
-            ok;
-        _ ->
-            % Might not succeed - when the replication doc is deleted right
-            % before this update (not an error, ignore).
-            save_rep_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}})
-    end.
-
-open_rep_doc(DbName, DocId) ->
-    ioq:maybe_set_io_priority({system, DbName}),
-    case couch_db:open_int(DbName, [?CTX, sys_db]) of
-        {ok, Db} ->
-            try
-                couch_db:open_doc(Db, DocId, [ejson_body])
-            after
-                couch_db:close(Db)
-            end;
-        Else ->
-            Else
-    end.
-
-save_rep_doc(DbName, Doc) ->
-    ioq:maybe_set_io_priority({system, DbName}),
-    {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
-    try
-        couch_db:update_doc(Db, Doc, [])
-    catch
-        % User can accidentally write a VDU which prevents _replicator from
-        % updating replication documents. Avoid crashing replicator and thus
-        % preventing all other replication jobs on the node from running.
-        throw:{forbidden, Reason} ->
-            Msg = "~p VDU function preventing doc update to ~s ~s ~p",
-            couch_log:error(Msg, [?MODULE, DbName, Doc#doc.id, Reason]),
-            {ok, forbidden}
-    after
-        couch_db:close(Db)
-    end.
-
 -spec rep_user_ctx({[_]}) -> #user_ctx{}.
 rep_user_ctx({RepDoc}) ->
     case get_json_value(<<"user_ctx">>, RepDoc) of
@@ -666,104 +421,20 @@ ssl_verify_options(true) ->
 ssl_verify_options(false) ->
     [{verify, verify_none}].
 
--spec before_doc_update(#doc{}, Db :: any(), couch_db:update_type()) -> #doc{}.
-before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db, _UpdateType) ->
-    Doc;
-before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
-    #user_ctx{
-        roles = Roles,
-        name = Name
-    } = couch_db:get_user_ctx(Db),
-    case lists:member(<<"_replicator">>, Roles) of
-        true ->
-            Doc;
-        false ->
-            case couch_util:get_value(?OWNER, Body) of
-                undefined ->
-                    Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
-                Name ->
-                    Doc;
-                Other ->
-                    case (catch couch_db:check_is_admin(Db)) of
-                        ok when Other =:= null ->
-                            Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
-                        ok ->
-                            Doc;
-                        _ ->
-                            throw(
-                                {forbidden,
-                                    <<"Can't update replication documents", " from other users.">>}
-                            )
-                    end
-            end
-    end.
+get_value(Key, Props) ->
+    couch_util:get_value(Key, Props).
 
--spec after_doc_read(#doc{}, Db :: any()) -> #doc{}.
-after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
-    Doc;
-after_doc_read(#doc{body = {Body}} = Doc, Db) ->
-    #user_ctx{name = Name} = couch_db:get_user_ctx(Db),
-    case (catch couch_db:check_is_admin(Db)) of
-        ok ->
-            Doc;
-        _ ->
-            case couch_util:get_value(?OWNER, Body) of
-                Name ->
-                    Doc;
-                _Other ->
-                    Source = strip_credentials(
-                        couch_util:get_value(
-                            <<"source">>,
-                            Body
-                        )
-                    ),
-                    Target = strip_credentials(
-                        couch_util:get_value(
-                            <<"target">>,
-                            Body
-                        )
-                    ),
-                    NewBody0 = ?replace(Body, <<"source">>, Source),
-                    NewBody = ?replace(NewBody0, <<"target">>, Target),
-                    #doc{revs = {Pos, [_ | Revs]}} = Doc,
-                    NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}},
-                    NewRevId = couch_db:new_revid(NewDoc),
-                    NewDoc#doc{revs = {Pos, [NewRevId | Revs]}}
-            end
-    end.
+get_value(Key, Props, Default) -> % local shorthand for couch_util:get_value/3
+    couch_util:get_value(Key, Props, Default).
 
--spec strip_credentials
-    (undefined) -> undefined;
-    (binary()) -> binary();
-    ({[_]}) -> {[_]}.
-strip_credentials(undefined) ->
-    undefined;
-strip_credentials(Url) when is_binary(Url) ->
-    re:replace(
-        Url,
-        "http(s)?://(?:[^:]+):[^@]+@(.*)$",
-        "http\\1://\\2",
-        [{return, binary}]
-    );
-strip_credentials({Props0}) ->
-    Props1 = lists:keydelete(<<"headers">>, 1, Props0),
-    % Strip "auth" just like headers, for replication plugins it can be a place
-    % to stash credential that are not necessarily in headers
-    Props2 = lists:keydelete(<<"auth">>, 1, Props1),
-    {Props2}.
-
-error_reason({shutdown, Error}) ->
-    error_reason(Error);
-error_reason({bad_rep_doc, Reason}) ->
-    to_binary(Reason);
-error_reason({error, {Error, Reason}}) when
-    is_atom(Error), is_binary(Reason)
-->
-    to_binary(io_lib:format("~s: ~s", [Error, Reason]));
-error_reason({error, Reason}) ->
-    to_binary(Reason);
-error_reason(Reason) ->
-    to_binary(Reason).
+to_binary(Val) ->
+    couch_util:to_binary(Val).
+
+get_json_value(Key, Obj) ->
+    couch_replicator_utils:get_json_value(Key, Obj).
+
+get_json_value(Key, Obj, Default) -> % same module as get_json_value/2 above
+    couch_replicator_utils:get_json_value(Key, Obj, Default).
 
 -ifdef(TEST).
 
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 1ba933a5e..06d3052be 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -1149,8 +1149,8 @@ scheduler_job_format_status_test() ->
     Target = <<"http://u:p@h2/d2">>,
     Rep = #rep{
         id = {"base", "+ext"},
-        source = couch_replicator_docs:parse_rep_db(Source, [], []),
-        target = couch_replicator_docs:parse_rep_db(Target, [], []),
+        source = couch_replicator_parse:parse_rep_db(Source, [], []),
+        target = couch_replicator_parse:parse_rep_db(Target, [], []),
         options = [{create_target, true}],
         doc_id = <<"mydoc">>,
         db_name = <<"mydb">>
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index e4a2cd12f..36700d9d5 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -13,7 +13,6 @@
 -module(couch_replicator_utils).
 
 -export([
-    parse_rep_doc/2,
     replication_id/2,
     sum_stats/2,
     is_deleted/1,
@@ -95,9 +94,6 @@ replication_id(Rep, Version) ->
 sum_stats(S1, S2) ->
     couch_replicator_stats:sum_stats(S1, S2).
 
-parse_rep_doc(Props, UserCtx) ->
-    couch_replicator_docs:parse_rep_doc(Props, UserCtx).
-
 -spec iso8601(erlang:timestamp()) -> binary().
 iso8601({_Mega, _Sec, _Micro} = Timestamp) ->
     {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(Timestamp),
@@ -351,7 +347,7 @@ normalize_rep_test_() ->
                     {<<"doc_ids">>, [<<"a">>, <<"c">>, <<"b">>]},
                     {<<"other_field">>, <<"some_value">>}
                 ]},
-            Rep1 = couch_replicator_docs:parse_rep_doc_without_id(EJson1),
+            Rep1 = couch_replicator_parse:parse_rep_doc_without_id(EJson1),
             EJson2 =
                 {[
                     {<<"other_field">>, <<"unrelated">>},
@@ -360,7 +356,7 @@ normalize_rep_test_() ->
                     {<<"doc_ids">>, [<<"c">>, <<"a">>, <<"b">>]},
                     {<<"other_field2">>, <<"unrelated2">>}
                 ]},
-            Rep2 = couch_replicator_docs:parse_rep_doc_without_id(EJson2),
+            Rep2 = couch_replicator_parse:parse_rep_doc_without_id(EJson2),
             ?assertEqual(normalize_rep(Rep1), normalize_rep(Rep2))
         end)
     }.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
index 2d5ef96b1..df8074f1f 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
@@ -304,7 +304,7 @@ replicate(Source, Target) ->
             {<<"target">>, db_url(Target)},
             {<<"continuous">>, true}
         ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+    {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
     ok = couch_replicator_scheduler:add_job(Rep),
     couch_replicator_scheduler:reschedule(),
     Pid = couch_replicator_test_helper:get_pid(Rep#rep.id),
diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
index 6bdb4ecb2..7ba6bc69d 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
@@ -226,7 +226,7 @@ replicate(Source, Target) ->
             % Low connection timeout so _changes feed gets restarted quicker
             {<<"connection_timeout">>, 3000}
         ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+    {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
     ok = couch_replicator_scheduler:add_job(Rep),
     couch_replicator_scheduler:reschedule(),
     {ok, Rep#rep.id}.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
index 3468cda73..758c44f2b 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
@@ -50,7 +50,7 @@ parse_rep_doc_without_proxy(_) ->
             {<<"source">>, <<"http://unproxied.com">>},
             {<<"target">>, <<"http://otherunproxied.com">>}
         ]},
-    Rep = couch_replicator_docs:parse_rep_doc(NoProxyDoc),
+    Rep = couch_replicator_parse:parse_rep_doc(NoProxyDoc),
     ?assertEqual((Rep#rep.source)#httpdb.proxy_url, undefined),
     ?assertEqual((Rep#rep.target)#httpdb.proxy_url, undefined).
 
@@ -62,7 +62,7 @@ parse_rep_doc_with_proxy(_) ->
             {<<"target">>, <<"http://otherunproxied.com">>},
             {<<"proxy">>, ProxyURL}
         ]},
-    Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc),
+    Rep = couch_replicator_parse:parse_rep_doc(ProxyDoc),
     ?assertEqual((Rep#rep.source)#httpdb.proxy_url, binary_to_list(ProxyURL)),
     ?assertEqual((Rep#rep.target)#httpdb.proxy_url, binary_to_list(ProxyURL)).
 
@@ -76,7 +76,7 @@ parse_rep_source_target_proxy(_) ->
             {<<"source_proxy">>, SrcProxyURL},
             {<<"target_proxy">>, TgtProxyURL}
         ]},
-    Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc),
+    Rep = couch_replicator_parse:parse_rep_doc(ProxyDoc),
     ?assertEqual(
         (Rep#rep.source)#httpdb.proxy_url,
         binary_to_list(SrcProxyURL)
@@ -96,7 +96,7 @@ mutually_exclusive_proxy_and_source_proxy(_) ->
         ]},
     ?assertThrow(
         {bad_rep_doc, _},
-        couch_replicator_docs:parse_rep_doc(ProxyDoc)
+        couch_replicator_parse:parse_rep_doc(ProxyDoc)
     ).
 
 mutually_exclusive_proxy_and_target_proxy(_) ->
@@ -109,5 +109,5 @@ mutually_exclusive_proxy_and_target_proxy(_) ->
         ]},
     ?assertThrow(
         {bad_rep_doc, _},
-        couch_replicator_docs:parse_rep_doc(ProxyDoc)
+        couch_replicator_parse:parse_rep_doc(ProxyDoc)
     ).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
index 04c665af5..f413e5cf4 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
@@ -214,7 +214,7 @@ replicate(Source, Target) ->
             {<<"target">>, db_url(Target)},
             {<<"continuous">>, true}
         ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+    {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
     ok = couch_replicator_scheduler:add_job(Rep),
     couch_replicator_scheduler:reschedule(),
     Pid = couch_replicator_test_helper:get_pid(Rep#rep.id),
diff --git a/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl b/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
index f30bdb1cd..f862527f4 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
@@ -173,7 +173,7 @@ replicate(Source, Target) ->
     ).
 
 replicate({[_ | _]} = RepObject) ->
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+    {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
     ok = couch_replicator_scheduler:add_job(Rep),
     couch_replicator_scheduler:reschedule(),
     Pid = get_pid(Rep#rep.id),