You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2022/11/22 16:34:00 UTC

[couchdb] 01/01: [wip] Replace the auto-inserted replicator VDU with a BDU

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch improve-replicator-doc-parsing
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit c75ced0f8baf85ec3caad3bf9c92092d4a592c26
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Tue Nov 22 11:31:54 2022 -0500

    [wip] Replace the auto-inserted replicator VDU with a BDU
    
    Switch out the VDU with a BDU (before_doc_update) check. Couch replicator
    already had a BDU to update the `"owner"` field so we can plug right into it
    and validate everything we need there. This way we'll have only a single
    validation and parsing code path.
---
 src/couch_replicator/src/couch_replicator.erl      |   6 +-
 .../src/couch_replicator_doc_processor.erl         |   9 +-
 .../src/couch_replicator_doc_processor_worker.erl  |  14 +-
 src/couch_replicator/src/couch_replicator_docs.erl | 564 +++------------------
 src/couch_replicator/src/couch_replicator_ids.erl  |   8 +-
 .../src/couch_replicator_js_functions.hrl          | 183 -------
 ...licator_docs.erl => couch_replicator_parse.erl} | 357 +------------
 .../src/couch_replicator_scheduler_job.erl         |   4 +-
 .../src/couch_replicator_utils.erl                 |   8 +-
 .../test/eunit/couch_replicator_compact_tests.erl  |   2 +-
 .../couch_replicator_error_reporting_tests.erl     |   2 +-
 .../test/eunit/couch_replicator_proxy_tests.erl    |  10 +-
 ...ch_replicator_retain_stats_between_job_runs.erl |   2 +-
 .../test/eunit/couch_replicator_test_helper.erl    |   2 +-
 14 files changed, 110 insertions(+), 1061 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index 39b3903ea..3bb177968 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -25,10 +25,8 @@
 -include_lib("couch/include/couch_db.hrl").
 -include("couch_replicator.hrl").
 -include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
--include_lib("couch_mrview/include/couch_mrview.hrl").
 -include_lib("mem3/include/mem3.hrl").
 
--define(DESIGN_DOC_CREATION_DELAY_MSEC, 1000).
 -define(REPLICATION_STATES, [
     % Just added to scheduler
     initializing,
@@ -58,7 +56,7 @@
     | {error, any()}
     | no_return().
 replicate(PostBody, Ctx) ->
-    {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
+    {ok, Rep0} = couch_replicator_parse:parse_rep_doc(PostBody, Ctx),
     Rep = Rep0#rep{start_time = os:timestamp()},
     #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep,
     case get_value(cancel, Options, false) of
@@ -138,7 +136,7 @@ replication_states() ->
 
 -spec strip_url_creds(binary() | {[_]}) -> binary().
 strip_url_creds(Endpoint) ->
-    try couch_replicator_docs:parse_rep_db(Endpoint, [], []) of
+    try couch_replicator_parse:parse_rep_db(Endpoint, [], []) of
         #httpdb{url = Url} ->
             iolist_to_binary(couch_util:url_strip_password(Url))
     catch
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 436d7c44d..eb4c02b49 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -44,9 +44,7 @@
     notify_cluster_event/2
 ]).
 
--include_lib("couch/include/couch_db.hrl").
 -include("couch_replicator.hrl").
--include_lib("mem3/include/mem3.hrl").
 
 -import(couch_replicator_utils, [
     get_json_value/2,
@@ -77,9 +75,8 @@
 
 % couch_multidb_changes API callbacks
 
-db_created(DbName, Server) ->
+db_created(_DbName, Server) ->
     couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
     Server.
 
 db_deleted(DbName, Server) ->
@@ -89,7 +86,7 @@ db_deleted(DbName, Server) ->
 
 db_found(DbName, Server) ->
     couch_stats:increment_counter([couch_replicator, docs, dbs_found]),
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
+    couch_replicator_docs:delete_old_rep_ddoc(DbName),
     Server.
 
 db_change(DbName, {ChangeProps} = Change, Server) ->
@@ -169,7 +166,7 @@ process_updated({DbName, _DocId} = Id, JsonRepDoc) ->
     % should propagate to db_change function and will be recorded as permanent
 % failure in the document. User will have to update the document to fix the
     % problem.
-    Rep0 = couch_replicator_docs:parse_rep_doc_without_id(JsonRepDoc),
+    Rep0 = couch_replicator_parse:parse_rep_doc_without_id(JsonRepDoc),
     Rep = Rep0#rep{db_name = DbName, start_time = os:timestamp()},
     Filter =
         case couch_replicator_filters:parse(Rep#rep.options) of
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
index 22c5f8584..b1014ffa5 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
@@ -162,7 +162,7 @@ doc_processor_worker_test_() ->
 t_should_add_job() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
         ?assert(added_job())
     end).
@@ -172,7 +172,7 @@ t_already_running_same_docid() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
         mock_already_running(?DB, ?DOC1),
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
         ?assert(did_not_add_job())
     end).
@@ -182,7 +182,7 @@ t_already_running_transient() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
         mock_already_running(null, null),
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         ?assertMatch(
             {temporary_error, _},
             maybe_start_replication(
@@ -200,7 +200,7 @@ t_already_running_other_db_other_doc() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
         mock_already_running(<<"otherdb">>, <<"otherdoc">>),
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         ?assertMatch(
             {permanent_failure, _},
             maybe_start_replication(
@@ -217,7 +217,7 @@ t_already_running_other_db_other_doc() ->
 t_spawn_worker() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         WRef = make_ref(),
         meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, WRef),
         Pid = spawn_worker(Id, Rep, 0, WRef),
@@ -236,7 +236,7 @@ t_spawn_worker() ->
 t_ignore_if_doc_deleted() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
         ?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())),
         ?assertNot(added_job())
@@ -247,7 +247,7 @@ t_ignore_if_doc_deleted() ->
 t_ignore_if_worker_ref_does_not_match() ->
     ?_test(begin
         Id = {?DB, ?DOC1},
-        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+        Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
         meck:expect(
             couch_replicator_doc_processor,
             get_worker_ref,
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index a60f1a1e1..20dbc0d67 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -13,15 +13,14 @@
 -module(couch_replicator_docs).
 
 -export([
-    parse_rep_doc/1,
-    parse_rep_doc/2,
-    parse_rep_db/3,
-    parse_rep_doc_without_id/1,
-    parse_rep_doc_without_id/2,
+    %% parse_rep_doc/1,
+    %% parse_rep_doc/2,
+    %% parse_rep_db/3,
+    %% parse_rep_doc_without_id/1,
+    %% parse_rep_doc_without_id/2,
     before_doc_update/3,
     after_doc_read/2,
-    ensure_rep_ddoc_exists/1,
-    ensure_cluster_rep_ddoc_exists/1,
+    delete_old_rep_ddoc/1,
     remove_state_fields/2,
     update_doc_completed/3,
     update_failed/3,
@@ -31,24 +30,10 @@
 ]).
 
 -include_lib("couch/include/couch_db.hrl").
--include_lib("ibrowse/include/ibrowse.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
 -include("couch_replicator.hrl").
--include("couch_replicator_js_functions.hrl").
 
--import(couch_util, [
-    get_value/2,
-    get_value/3,
-    to_binary/1
-]).
-
--import(couch_replicator_utils, [
-    get_json_value/2,
-    get_json_value/3
-]).
-
--define(REP_DB_NAME, <<"_replicator">>).
+% The ID of now deleted design doc. On every *_replicator db discovery we try
+% to delete it. At some point in the future, remove this logic altogether.
 -define(REP_DESIGN_DOC, <<"_design/_replicator">>).
 -define(OWNER, <<"owner">>).
 -define(CTX, {user_ctx, #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]}}).
@@ -126,176 +111,32 @@ update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
     ]),
     ok.
 
--spec ensure_rep_ddoc_exists(binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb) ->
+-spec delete_old_rep_ddoc(binary()) -> ok.
+delete_old_rep_ddoc(RepDb) ->
     case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of
-        true ->
-            ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC);
-        false ->
-            ok
+        true -> delete_old_rep_ddoc(RepDb, ?REP_DESIGN_DOC);
+        false -> ok
     end.
 
--spec ensure_rep_ddoc_exists(binary(), binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb, DDocId) ->
+-spec delete_old_rep_ddoc(binary(), binary()) -> ok.
+delete_old_rep_ddoc(RepDb, DDocId) ->
     case open_rep_doc(RepDb, DDocId) of
         {not_found, no_db_file} ->
-            %% database was deleted.
             ok;
         {not_found, _Reason} ->
-            DocProps = replication_design_doc_props(DDocId),
-            DDoc = couch_doc:from_json_obj({DocProps}),
-            couch_log:notice("creating replicator ddoc ~p", [RepDb]),
-            {ok, _Rev} = save_rep_doc(RepDb, DDoc);
+            ok;
         {ok, Doc} ->
-            Latest = replication_design_doc_props(DDocId),
-            {Props0} = couch_doc:to_json_obj(Doc, []),
-            {value, {_, Rev}, Props} = lists:keytake(<<"_rev">>, 1, Props0),
-            case compare_ejson({Props}, {Latest}) of
-                true ->
-                    ok;
-                false ->
-                    LatestWithRev = [{<<"_rev">>, Rev} | Latest],
-                    DDoc = couch_doc:from_json_obj({LatestWithRev}),
-                    couch_log:notice("updating replicator ddoc ~p", [RepDb]),
-                    try
-                        {ok, _} = save_rep_doc(RepDb, DDoc)
-                    catch
-                        throw:conflict ->
-                            %% ignore, we'll retry next time
-                            ok
-                    end
+            DeletedDoc = Doc#doc{deleted = true, body = {[]}},
+            try
+                save_rep_doc(RepDb, DeletedDoc)
+            catch
+                throw:conflict ->
+                    % ignore, we'll retry next time
+                    ok
             end
     end,
     ok.
 
--spec ensure_cluster_rep_ddoc_exists(binary()) -> ok.
-ensure_cluster_rep_ddoc_exists(RepDb) ->
-    DDocId = ?REP_DESIGN_DOC,
-    [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId),
-    ensure_rep_ddoc_exists(DbShard, DDocId).
-
--spec compare_ejson({[_]}, {[_]}) -> boolean().
-compare_ejson(EJson1, EJson2) ->
-    EjsonSorted1 = couch_replicator_filters:ejsort(EJson1),
-    EjsonSorted2 = couch_replicator_filters:ejsort(EJson2),
-    EjsonSorted1 == EjsonSorted2.
-
--spec replication_design_doc_props(binary()) -> [_].
-replication_design_doc_props(DDocId) ->
-    [
-        {<<"_id">>, DDocId},
-        {<<"language">>, <<"javascript">>},
-        {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
-    ].
-
-% Note: parse_rep_doc can handle filtered replications. During parsing of the
-% replication doc it will make possibly remote http requests to the source
-% database. If failure or parsing of filter docs fails, parse_doc throws a
-% {filter_fetch_error, Error} exception. This exception should be considered
-% transient in respect to the contents of the document itself, since it depends
-% on network availability of the source db and other factors.
--spec parse_rep_doc({[_]}) -> #rep{}.
-parse_rep_doc(RepDoc) ->
-    {ok, Rep} =
-        try
-            parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
-        catch
-            throw:{error, Reason} ->
-                throw({bad_rep_doc, Reason});
-            throw:{filter_fetch_error, Reason} ->
-                throw({filter_fetch_error, Reason});
-            Tag:Err ->
-                throw({bad_rep_doc, to_binary({Tag, Err})})
-        end,
-    Rep.
-
--spec parse_rep_doc_without_id({[_]}) -> #rep{}.
-parse_rep_doc_without_id(RepDoc) ->
-    {ok, Rep} =
-        try
-            parse_rep_doc_without_id(RepDoc, rep_user_ctx(RepDoc))
-        catch
-            throw:{error, Reason} ->
-                throw({bad_rep_doc, Reason});
-            Tag:Err ->
-                throw({bad_rep_doc, to_binary({Tag, Err})})
-        end,
-    Rep.
-
--spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #rep{}}.
-parse_rep_doc(Doc, UserCtx) ->
-    {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx),
-    Cancel = get_value(cancel, Rep#rep.options, false),
-    Id = get_value(id, Rep#rep.options, nil),
-    case {Cancel, Id} of
-        {true, nil} ->
-            % Cancel request with no id, must parse id out of body contents
-            {ok, update_rep_id(Rep)};
-        {true, Id} ->
-            % Cancel request with an id specified, so do not parse id from body
-            {ok, Rep};
-        {false, _Id} ->
-            % Not a cancel request, regular replication doc
-            {ok, update_rep_id(Rep)}
-    end.
-
--spec parse_rep_doc_without_id({[_]}, #user_ctx{}) -> {ok, #rep{}}.
-parse_rep_doc_without_id({Props}, UserCtx) ->
-    {SrcProxy, TgtProxy} = parse_proxy_settings(Props),
-    Opts = make_options(Props),
-    case
-        get_value(cancel, Opts, false) andalso
-            (get_value(id, Opts, nil) =/= nil)
-    of
-        true ->
-            {ok, #rep{options = Opts, user_ctx = UserCtx}};
-        false ->
-            Source = parse_rep_db(get_value(<<"source">>, Props), SrcProxy, Opts),
-            Target = parse_rep_db(get_value(<<"target">>, Props), TgtProxy, Opts),
-            {Type, View} =
-                case couch_replicator_filters:view_type(Props, Opts) of
-                    {error, Error} ->
-                        throw({bad_request, Error});
-                    Result ->
-                        Result
-                end,
-            Rep = #rep{
-                source = Source,
-                target = Target,
-                options = Opts,
-                user_ctx = UserCtx,
-                type = Type,
-                view = View,
-                doc_id = get_value(<<"_id">>, Props, null)
-            },
-            % Check if can parse filter code, if not throw exception
-            case couch_replicator_filters:parse(Opts) of
-                {error, FilterError} ->
-                    throw({error, FilterError});
-                {ok, _Filter} ->
-                    ok
-            end,
-            {ok, Rep}
-    end.
-
-parse_proxy_settings(Props) when is_list(Props) ->
-    Proxy = get_value(<<"proxy">>, Props, <<>>),
-    SrcProxy = get_value(<<"source_proxy">>, Props, <<>>),
-    TgtProxy = get_value(<<"target_proxy">>, Props, <<>>),
-
-    case Proxy =/= <<>> of
-        true when SrcProxy =/= <<>> ->
-            Error = "`proxy` is mutually exclusive with `source_proxy`",
-            throw({bad_request, Error});
-        true when TgtProxy =/= <<>> ->
-            Error = "`proxy` is mutually exclusive with `target_proxy`",
-            throw({bad_request, Error});
-        true ->
-            {Proxy, Proxy};
-        false ->
-            {SrcProxy, TgtProxy}
-    end.
-
 % Update a #rep{} record with a replication_id. Calculating the id might involve
 % fetching a filter from the source db, and so it could fail intermittently.
 % In case of a failure to fetch the filter this function will throw a
@@ -386,316 +227,46 @@ save_rep_doc(DbName, Doc) ->
         couch_db:close(Db)
     end.
 
--spec rep_user_ctx({[_]}) -> #user_ctx{}.
-rep_user_ctx({RepDoc}) ->
-    case get_json_value(<<"user_ctx">>, RepDoc) of
-        undefined ->
-            #user_ctx{};
-        {UserCtx} ->
-            #user_ctx{
-                name = get_json_value(<<"name">>, UserCtx, null),
-                roles = get_json_value(<<"roles">>, UserCtx, [])
-            }
-    end.
-
--spec parse_rep_db({[_]} | binary(), [_] | binary(), [_]) -> #httpdb{} | no_return().
-parse_rep_db({Props}, Proxy, Options) ->
-    ProxyParams = parse_proxy_params(Proxy),
-    ProxyURL =
-        case ProxyParams of
-            [] -> undefined;
-            _ -> binary_to_list(Proxy)
-        end,
-    Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
-    {AuthProps} = get_value(<<"auth">>, Props, {[]}),
-    {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
-    Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
-    DefaultHeaders = (#httpdb{})#httpdb.headers,
-    HttpDb = #httpdb{
-        url = Url,
-        auth_props = AuthProps,
-        headers = lists:ukeymerge(1, Headers, DefaultHeaders),
-        ibrowse_options = lists:keysort(
-            1,
-            [
-                {socket_options, get_value(socket_options, Options)}
-                | ProxyParams ++ ssl_params(Url)
-            ]
-        ),
-        timeout = get_value(connection_timeout, Options),
-        http_connections = get_value(http_connections, Options),
-        retries = get_value(retries, Options),
-        proxy_url = ProxyURL
-    },
-    couch_replicator_utils:normalize_basic_auth(HttpDb);
-parse_rep_db(<<"http://", _/binary>> = Url, Proxy, Options) ->
-    parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
-parse_rep_db(<<"https://", _/binary>> = Url, Proxy, Options) ->
-    parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
-parse_rep_db(<<_/binary>>, _Proxy, _Options) ->
-    throw({error, local_endpoints_not_supported});
-parse_rep_db(undefined, _Proxy, _Options) ->
-    throw({error, <<"Missing replicator database">>}).
-
--spec maybe_add_trailing_slash(binary() | list()) -> list().
-maybe_add_trailing_slash(Url) when is_binary(Url) ->
-    maybe_add_trailing_slash(?b2l(Url));
-maybe_add_trailing_slash(Url) ->
-    case lists:member($?, Url) of
-        true ->
-            % skip if there are query params
-            Url;
-        false ->
-            case lists:last(Url) of
-                $/ ->
-                    Url;
-                _ ->
-                    Url ++ "/"
-            end
-    end.
-
--spec make_options([_]) -> [_].
-make_options(Props) ->
-    Options0 = lists:ukeysort(1, convert_options(Props)),
-    Options = check_options(Options0),
-    DefWorkers = config:get_integer("replicator", "worker_processes", 4),
-    DefBatchSize = config:get_integer("replicator", "worker_batch_size", 500),
-    DefConns = config:get_integer("replicator", "http_connections", 20),
-    DefTimeout = config:get_integer("replicator", "connection_timeout", 30000),
-    DefRetries = config:get_integer("replicator", "retries_per_request", 5),
-    UseCheckpoints = config:get_boolean("replicator", "use_checkpoints", true),
-    UseBulkGet = config:get_boolean("replicator", "use_bulk_get", true),
-    DefCheckpointInterval = config:get_integer(
-        "replicator",
-        "checkpoint_interval",
-        30000
-    ),
-    {ok, DefSocketOptions} = couch_util:parse_term(
-        config:get(
-            "replicator",
-            "socket_options",
-            "[{keepalive, true}, {nodelay, false}]"
-        )
-    ),
-    lists:ukeymerge(
-        1,
-        Options,
-        lists:keysort(1, [
-            {connection_timeout, DefTimeout},
-            {retries, DefRetries},
-            {http_connections, DefConns},
-            {socket_options, DefSocketOptions},
-            {worker_batch_size, DefBatchSize},
-            {worker_processes, DefWorkers},
-            {use_checkpoints, UseCheckpoints},
-            {use_bulk_get, UseBulkGet},
-            {checkpoint_interval, DefCheckpointInterval}
-        ])
-    ).
-
--spec convert_options([_]) -> [_].
-convert_options([]) ->
-    [];
-convert_options([{<<"cancel">>, V} | _R]) when not is_boolean(V) ->
-    throw({bad_request, <<"parameter `cancel` must be a boolean">>});
-convert_options([{<<"cancel">>, V} | R]) ->
-    [{cancel, V} | convert_options(R)];
-convert_options([{IdOpt, V} | R]) when
-    IdOpt =:= <<"_local_id">>;
-    IdOpt =:= <<"replication_id">>;
-    IdOpt =:= <<"id">>
-->
-    [{id, couch_replicator_ids:convert(V)} | convert_options(R)];
-convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V) ->
-    throw({bad_request, <<"parameter `create_target` must be a boolean">>});
-convert_options([{<<"create_target">>, V} | R]) ->
-    [{create_target, V} | convert_options(R)];
-convert_options([{<<"create_target_params">>, V} | _R]) when not is_tuple(V) ->
-    throw({bad_request, <<"parameter `create_target_params` must be a JSON object">>});
-convert_options([{<<"create_target_params">>, V} | R]) ->
-    [{create_target_params, V} | convert_options(R)];
-convert_options([{<<"winning_revs_only">>, V} | _R]) when not is_boolean(V) ->
-    throw({bad_request, <<"parameter `winning_revs_only` must be a boolean">>});
-convert_options([{<<"winning_revs_only">>, V} | R]) ->
-    [{winning_revs_only, V} | convert_options(R)];
-convert_options([{<<"continuous">>, V} | _R]) when not is_boolean(V) ->
-    throw({bad_request, <<"parameter `continuous` must be a boolean">>});
-convert_options([{<<"continuous">>, V} | R]) ->
-    [{continuous, V} | convert_options(R)];
-convert_options([{<<"filter">>, V} | R]) ->
-    [{filter, V} | convert_options(R)];
-convert_options([{<<"query_params">>, V} | R]) ->
-    [{query_params, V} | convert_options(R)];
-convert_options([{<<"doc_ids">>, null} | R]) ->
-    convert_options(R);
-convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) ->
-    throw({bad_request, <<"parameter `doc_ids` must be an array">>});
-convert_options([{<<"doc_ids">>, V} | R]) ->
-    % Ensure same behaviour as old replicator: accept a list of percent
-    % encoded doc IDs.
-    DocIds = lists:usort([?l2b(couch_httpd:unquote(Id)) || Id <- V]),
-    [{doc_ids, DocIds} | convert_options(R)];
-convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) ->
-    throw({bad_request, <<"parameter `selector` must be a JSON object">>});
-convert_options([{<<"selector">>, V} | R]) ->
-    [{selector, V} | convert_options(R)];
-convert_options([{<<"worker_processes">>, V} | R]) ->
-    [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"worker_batch_size">>, V} | R]) ->
-    [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"http_connections">>, V} | R]) ->
-    [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"connection_timeout">>, V} | R]) ->
-    [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"retries_per_request">>, V} | R]) ->
-    [{retries, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"socket_options">>, V} | R]) ->
-    {ok, SocketOptions} = couch_util:parse_term(V),
-    [{socket_options, SocketOptions} | convert_options(R)];
-convert_options([{<<"since_seq">>, V} | R]) ->
-    [{since_seq, V} | convert_options(R)];
-convert_options([{<<"use_checkpoints">>, V} | R]) ->
-    [{use_checkpoints, V} | convert_options(R)];
-convert_options([{<<"use_bulk_get">>, V} | _R]) when not is_boolean(V) ->
-    throw({bad_request, <<"parameter `use_bulk_get` must be a boolean">>});
-convert_options([{<<"use_bulk_get">>, V} | R]) ->
-    [{use_bulk_get, V} | convert_options(R)];
-convert_options([{<<"checkpoint_interval">>, V} | R]) ->
-    [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)];
-% skip unknown option
-convert_options([_ | R]) ->
-    convert_options(R).
-
--spec check_options([_]) -> [_].
-check_options(Options) ->
-    DocIds = lists:keyfind(doc_ids, 1, Options),
-    Filter = lists:keyfind(filter, 1, Options),
-    Selector = lists:keyfind(selector, 1, Options),
-    case {DocIds, Filter, Selector} of
-        {false, false, false} -> Options;
-        {false, false, _} -> Options;
-        {false, _, false} -> Options;
-        {_, false, false} -> Options;
-        _ -> throw({bad_request, "`doc_ids`,`filter`,`selector` are mutually exclusive"})
-    end.
-
--spec parse_proxy_params(binary() | [_]) -> [_].
-parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
-    parse_proxy_params(?b2l(ProxyUrl));
-parse_proxy_params([]) ->
-    [];
-parse_proxy_params(ProxyUrl) ->
-    #url{
-        host = Host,
-        port = Port,
-        username = User,
-        password = Passwd,
-        protocol = Protocol
-    } = ibrowse_lib:parse_url(ProxyUrl),
-    Params =
-        [
-            {proxy_host, Host},
-            {proxy_port, Port}
-        ] ++
-            case is_list(User) andalso is_list(Passwd) of
-                false ->
-                    [];
-                true ->
-                    [{proxy_user, User}, {proxy_password, Passwd}]
-            end,
-    case Protocol of
-        socks5 ->
-            [proxy_to_socks5(Param) || Param <- Params];
-        _ ->
-            Params
-    end.
-
--spec proxy_to_socks5({atom(), string()}) -> {atom(), string()}.
-proxy_to_socks5({proxy_host, Val}) ->
-    {socks5_host, Val};
-proxy_to_socks5({proxy_port, Val}) ->
-    {socks5_port, Val};
-proxy_to_socks5({proxy_user, Val}) ->
-    {socks5_user, Val};
-proxy_to_socks5({proxy_password, Val}) ->
-    {socks5_password, Val}.
-
--spec ssl_params([_]) -> [_].
-ssl_params(Url) ->
-    case ibrowse_lib:parse_url(Url) of
-        #url{protocol = https} ->
-            Depth = config:get_integer(
-                "replicator",
-                "ssl_certificate_max_depth",
-                3
-            ),
-            VerifyCerts = config:get_boolean(
-                "replicator",
-                "verify_ssl_certificates",
-                false
-            ),
-            CertFile = config:get("replicator", "cert_file", undefined),
-            KeyFile = config:get("replicator", "key_file", undefined),
-            Password = config:get("replicator", "password", undefined),
-            SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts)],
-            SslOpts1 =
-                case CertFile /= undefined andalso KeyFile /= undefined of
-                    true ->
-                        case Password of
-                            undefined ->
-                                [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts;
-                            _ ->
-                                [
-                                    {certfile, CertFile},
-                                    {keyfile, KeyFile},
-                                    {password, Password}
-                                ] ++ SslOpts
-                        end;
-                    false ->
-                        SslOpts
-                end,
-            [{is_ssl, true}, {ssl_options, SslOpts1}];
-        #url{protocol = http} ->
-            []
-    end.
-
--spec ssl_verify_options(true | false) -> [_].
-ssl_verify_options(true) ->
-    CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
-    [{verify, verify_peer}, {cacertfile, CAFile}];
-ssl_verify_options(false) ->
-    [{verify, verify_none}].
-
 -spec before_doc_update(#doc{}, Db :: any(), couch_db:update_type()) -> #doc{}.
 before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db, _UpdateType) ->
     Doc;
+before_doc_update(#doc{} = Doc, _Db, ?REPLICATED_CHANGES) ->
+    % Skip internal replicator updates
+    Doc;
 before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
-    #user_ctx{
-        roles = Roles,
-        name = Name
-    } = couch_db:get_user_ctx(Db),
-    case lists:member(<<"_replicator">>, Roles) of
+    #user_ctx{roles = Roles, name = Name} = couch_db:get_user_ctx(Db),
+    IsReplicator = lists:member(<<"_replicator">>, Roles),
+    Doc1 = case IsReplicator of
+        true -> Doc;
+        false -> before_doc_update_owner(get_value(?OWNER, Body), Name, Db, Doc)
+    end,
+    IsFailed = get_value(<<"_replication_state">>, Body) =:= <<"failed">>,
+    case IsReplicator orelse Doc1#doc.deleted orelse IsFailed of
         true ->
-            Doc;
+            ok;
         false ->
-            case couch_util:get_value(?OWNER, Body) of
-                undefined ->
-                    Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
-                Name ->
-                    Doc;
-                Other ->
-                    case (catch couch_db:check_is_admin(Db)) of
-                        ok when Other =:= null ->
-                            Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
-                        ok ->
-                            Doc;
-                        _ ->
-                            throw(
-                                {forbidden,
-                                    <<"Can't update replication documents", " from other users.">>}
-                            )
-                    end
+            try
+                couch_replicator_parse:parse_rep_doc(Doc1#doc.body)
+            catch
+                throw:{bad_rep_doc, Error} ->
+                    throw({forbidden, Error})
             end
+    end,
+    Doc1.
+
+before_doc_update_owner(undefined, Name, _Db, #doc{body = {Body}} = Doc) ->
+    Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
+before_doc_update_owner(Name, Name, _Db, #doc{} = Doc) ->
+    Doc;
+before_doc_update_owner(Other, Name, Db, #doc{body = {Body}} = Doc) ->
+    case (catch couch_db:check_is_admin(Db)) of
+        ok when Other =:= null ->
+           Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
+       ok ->
+           Doc;
+       _ ->
+           Err = <<"Can't update replication documents from other users.">>,
+           throw({forbidden, Err})
     end.
 
 -spec after_doc_read(#doc{}, Db :: any()) -> #doc{}.
@@ -707,22 +278,12 @@ after_doc_read(#doc{body = {Body}} = Doc, Db) ->
         ok ->
             Doc;
         _ ->
-            case couch_util:get_value(?OWNER, Body) of
+            case get_value(?OWNER, Body) of
                 Name ->
                     Doc;
                 _Other ->
-                    Source = strip_credentials(
-                        couch_util:get_value(
-                            <<"source">>,
-                            Body
-                        )
-                    ),
-                    Target = strip_credentials(
-                        couch_util:get_value(
-                            <<"target">>,
-                            Body
-                        )
-                    ),
+                    Source = strip_credentials(get_value(<<"source">>, Body)),
+                    Target = strip_credentials(get_value(<<"target">>, Body)),
                     NewBody0 = ?replace(Body, <<"source">>, Source),
                     NewBody = ?replace(NewBody0, <<"target">>, Target),
                     #doc{revs = {Pos, [_ | Revs]}} = Doc,
@@ -765,6 +326,15 @@ error_reason({error, Reason}) ->
 error_reason(Reason) ->
     to_binary(Reason).
 
+to_binary(Val) ->
+    couch_util:to_binary(Val).
+
+get_value(Key, Props) ->
+    couch_util:get_value(Key, Props).
+
+get_json_value(Key, Obj) ->
+    couch_replicator_utils:get_json_value(Key, Obj).
+
 -ifdef(TEST).
 
 -include_lib("couch/include/couch_eunit.hrl").
diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl
index 939070b95..86fe1f26e 100644
--- a/src/couch_replicator/src/couch_replicator_ids.erl
+++ b/src/couch_replicator/src/couch_replicator_ids.erl
@@ -196,10 +196,10 @@ winning_revs_generates_new_id(_) ->
         {<<"source">>, <<"http://foo.example.bar">>},
         {<<"target">>, <<"http://bar.example.foo">>}
     ],
-    Rep1 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc1}),
+    Rep1 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc1}),
     Id1 = replication_id(Rep1),
     RepDoc2 = RepDoc1 ++ [{<<"winning_revs_only">>, true}],
-    Rep2 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc2}),
+    Rep2 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc2}),
     Id2 = replication_id(Rep2),
     ?assertNotEqual(Id1, Id2).
 
@@ -208,10 +208,10 @@ winning_revs_false_same_as_undefined(_) ->
         {<<"source">>, <<"http://foo.example.bar">>},
         {<<"target">>, <<"http://bar.example.foo">>}
     ],
-    Rep1 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc1}),
+    Rep1 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc1}),
     Id1 = replication_id(Rep1),
     RepDoc2 = RepDoc1 ++ [{<<"winning_revs_only">>, false}],
-    Rep2 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc2}),
+    Rep2 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc2}),
     Id2 = replication_id(Rep2),
     ?assertEqual(Id1, Id2).
 
diff --git a/src/couch_replicator/src/couch_replicator_js_functions.hrl b/src/couch_replicator/src/couch_replicator_js_functions.hrl
deleted file mode 100644
index 4f4369075..000000000
--- a/src/couch_replicator/src/couch_replicator_js_functions.hrl
+++ /dev/null
@@ -1,183 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License.  You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--define(REP_DB_DOC_VALIDATE_FUN, <<"
-    function(newDoc, oldDoc, userCtx) {
-        function reportError(error_msg) {
-            log('Error writing document `' + newDoc._id +
-                '\\' to the replicator database: ' + error_msg);
-            throw({forbidden: error_msg});
-        }
-
-        function validateEndpoint(endpoint, fieldName) {
-            if ((typeof endpoint !== 'string') &&
-                ((typeof endpoint !== 'object') || (endpoint === null))) {
-
-                reportError('The `' + fieldName + '\\' property must exist' +
-                    ' and be either a string or an object.');
-            }
-
-            if (typeof endpoint === 'object') {
-                if ((typeof endpoint.url !== 'string') || !endpoint.url) {
-                    reportError('The url property must exist in the `' +
-                        fieldName + '\\' field and must be a non-empty string.');
-                }
-
-                if ((typeof endpoint.auth !== 'undefined') &&
-                    ((typeof endpoint.auth !== 'object') ||
-                        endpoint.auth === null)) {
-
-                    reportError('`' + fieldName +
-                        '.auth\\' must be a non-null object.');
-                }
-
-                if ((typeof endpoint.headers !== 'undefined') &&
-                    ((typeof endpoint.headers !== 'object') ||
-                        endpoint.headers === null)) {
-
-                    reportError('`' + fieldName +
-                        '.headers\\' must be a non-null object.');
-                }
-            }
-        }
-
-        var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0);
-        var isAdmin = (userCtx.roles.indexOf('_admin') >= 0);
-
-        if (isReplicator) {
-            // Always let replicator update the replication document
-            return;
-        }
-
-        if (newDoc._replication_state === 'failed') {
-            // Skip validation in case when we update the document with the
-            // failed state. In this case it might be malformed. However,
-            // replicator will not pay attention to failed documents so this
-            // is safe.
-            return;
-        }
-
-        if (!newDoc._deleted) {
-            validateEndpoint(newDoc.source, 'source');
-            validateEndpoint(newDoc.target, 'target');
-
-            if ((typeof newDoc.create_target !== 'undefined') &&
-                (typeof newDoc.create_target !== 'boolean')) {
-
-                reportError('The `create_target\\' field must be a boolean.');
-            }
-
-            if ((typeof newDoc.winning_revs_only !== 'undefined') &&
-                (typeof newDoc.winning_revs_only !== 'boolean')) {
-
-                reportError('The `winning_revs_only\\' field must be a boolean.');
-            }
-
-            if ((typeof newDoc.continuous !== 'undefined') &&
-                (typeof newDoc.continuous !== 'boolean')) {
-
-                reportError('The `continuous\\' field must be a boolean.');
-            }
-
-            if ((typeof newDoc.doc_ids !== 'undefined') &&
-                !isArray(newDoc.doc_ids)) {
-
-                reportError('The `doc_ids\\' field must be an array of strings.');
-            }
-
-            if ((typeof newDoc.selector !== 'undefined') &&
-                (typeof newDoc.selector !== 'object')) {
-
-                reportError('The `selector\\' field must be an object.');
-            }
-
-            if ((typeof newDoc.filter !== 'undefined') &&
-                ((typeof newDoc.filter !== 'string') || !newDoc.filter)) {
-
-                reportError('The `filter\\' field must be a non-empty string.');
-            }
-
-            if ((typeof newDoc.doc_ids !== 'undefined') &&
-                (typeof newDoc.selector !== 'undefined')) {
-
-                reportError('`doc_ids\\' field is incompatible with `selector\\'.');
-            }
-
-            if ( ((typeof newDoc.doc_ids !== 'undefined') ||
-                  (typeof newDoc.selector !== 'undefined')) &&
-                 (typeof newDoc.filter !== 'undefined') ) {
-
-                reportError('`filter\\' field is incompatible with `selector\\' and `doc_ids\\'.');
-            }
-
-            if ((typeof newDoc.query_params !== 'undefined') &&
-                ((typeof newDoc.query_params !== 'object') ||
-                    newDoc.query_params === null)) {
-
-                reportError('The `query_params\\' field must be an object.');
-            }
-
-            if (newDoc.user_ctx) {
-                var user_ctx = newDoc.user_ctx;
-
-                if ((typeof user_ctx !== 'object') || (user_ctx === null)) {
-                    reportError('The `user_ctx\\' property must be a ' +
-                        'non-null object.');
-                }
-
-                if (!(user_ctx.name === null ||
-                    (typeof user_ctx.name === 'undefined') ||
-                    ((typeof user_ctx.name === 'string') &&
-                        user_ctx.name.length > 0))) {
-
-                    reportError('The `user_ctx.name\\' property must be a ' +
-                        'non-empty string or null.');
-                }
-
-                if (!isAdmin && (user_ctx.name !== userCtx.name)) {
-                    reportError('The given `user_ctx.name\\' is not valid');
-                }
-
-                if (user_ctx.roles && !isArray(user_ctx.roles)) {
-                    reportError('The `user_ctx.roles\\' property must be ' +
-                        'an array of strings.');
-                }
-
-                if (!isAdmin && user_ctx.roles) {
-                    for (var i = 0; i < user_ctx.roles.length; i++) {
-                        var role = user_ctx.roles[i];
-
-                        if (typeof role !== 'string' || role.length === 0) {
-                            reportError('Roles must be non-empty strings.');
-                        }
-                        if (userCtx.roles.indexOf(role) === -1) {
-                            reportError('Invalid role (`' + role +
-                                '\\') in the `user_ctx\\'');
-                        }
-                    }
-                }
-            } else {
-                if (!isAdmin) {
-                    reportError('The `user_ctx\\' property is missing (it is ' +
-                       'optional for admins only).');
-                }
-            }
-        } else {
-            if (!isAdmin) {
-                if (!oldDoc.user_ctx || (oldDoc.user_ctx.name !== userCtx.name)) {
-                    reportError('Replication documents can only be deleted by ' +
-                        'admins or by the users who created them.');
-                }
-            }
-        }
-    }
-">>).
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_parse.erl
similarity index 65%
copy from src/couch_replicator/src/couch_replicator_docs.erl
copy to src/couch_replicator/src/couch_replicator_parse.erl
index a60f1a1e1..a584d0306 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_parse.erl
@@ -10,183 +10,20 @@
 % License for the specific language governing permissions and limitations under
 % the License.
 
--module(couch_replicator_docs).
+-module(couch_replicator_parse).
 
 -export([
     parse_rep_doc/1,
     parse_rep_doc/2,
     parse_rep_db/3,
     parse_rep_doc_without_id/1,
-    parse_rep_doc_without_id/2,
-    before_doc_update/3,
-    after_doc_read/2,
-    ensure_rep_ddoc_exists/1,
-    ensure_cluster_rep_ddoc_exists/1,
-    remove_state_fields/2,
-    update_doc_completed/3,
-    update_failed/3,
-    update_rep_id/1,
-    update_triggered/2,
-    update_error/2
+    parse_rep_doc_without_id/2
 ]).
 
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("ibrowse/include/ibrowse.hrl").
--include_lib("mem3/include/mem3.hrl").
 -include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
 -include("couch_replicator.hrl").
--include("couch_replicator_js_functions.hrl").
-
--import(couch_util, [
-    get_value/2,
-    get_value/3,
-    to_binary/1
-]).
-
--import(couch_replicator_utils, [
-    get_json_value/2,
-    get_json_value/3
-]).
-
--define(REP_DB_NAME, <<"_replicator">>).
--define(REP_DESIGN_DOC, <<"_design/_replicator">>).
--define(OWNER, <<"owner">>).
--define(CTX, {user_ctx, #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]}}).
--define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
-
-remove_state_fields(DbName, DocId) ->
-    update_rep_doc(DbName, DocId, [
-        {<<"_replication_state">>, undefined},
-        {<<"_replication_state_time">>, undefined},
-        {<<"_replication_state_reason">>, undefined},
-        {<<"_replication_id">>, undefined},
-        {<<"_replication_stats">>, undefined}
-    ]).
-
--spec update_doc_completed(binary(), binary(), [_]) -> any().
-update_doc_completed(DbName, DocId, Stats) ->
-    update_rep_doc(DbName, DocId, [
-        {<<"_replication_state">>, <<"completed">>},
-        {<<"_replication_state_reason">>, undefined},
-        {<<"_replication_stats">>, {Stats}}
-    ]),
-    couch_stats:increment_counter([
-        couch_replicator,
-        docs,
-        completed_state_updates
-    ]).
-
--spec update_failed(binary(), binary(), any()) -> any().
-update_failed(DbName, DocId, Error) ->
-    Reason = error_reason(Error),
-    couch_log:error(
-        "Error processing replication doc `~s` from `~s`: ~s",
-        [DocId, DbName, Reason]
-    ),
-    update_rep_doc(DbName, DocId, [
-        {<<"_replication_state">>, <<"failed">>},
-        {<<"_replication_stats">>, undefined},
-        {<<"_replication_state_reason">>, Reason}
-    ]),
-    couch_stats:increment_counter([
-        couch_replicator,
-        docs,
-        failed_state_updates
-    ]).
-
--spec update_triggered(#rep{}, rep_id()) -> ok.
-update_triggered(Rep, {Base, Ext}) ->
-    #rep{
-        db_name = DbName,
-        doc_id = DocId
-    } = Rep,
-    update_rep_doc(DbName, DocId, [
-        {<<"_replication_state">>, <<"triggered">>},
-        {<<"_replication_state_reason">>, undefined},
-        {<<"_replication_id">>, iolist_to_binary([Base, Ext])},
-        {<<"_replication_stats">>, undefined}
-    ]),
-    ok.
-
--spec update_error(#rep{}, any()) -> ok.
-update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
-    Reason = error_reason(Error),
-    BinRepId =
-        case RepId of
-            {Base, Ext} ->
-                iolist_to_binary([Base, Ext]);
-            _Other ->
-                null
-        end,
-    update_rep_doc(DbName, DocId, [
-        {<<"_replication_state">>, <<"error">>},
-        {<<"_replication_state_reason">>, Reason},
-        {<<"_replication_stats">>, undefined},
-        {<<"_replication_id">>, BinRepId}
-    ]),
-    ok.
-
--spec ensure_rep_ddoc_exists(binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb) ->
-    case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of
-        true ->
-            ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC);
-        false ->
-            ok
-    end.
-
--spec ensure_rep_ddoc_exists(binary(), binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb, DDocId) ->
-    case open_rep_doc(RepDb, DDocId) of
-        {not_found, no_db_file} ->
-            %% database was deleted.
-            ok;
-        {not_found, _Reason} ->
-            DocProps = replication_design_doc_props(DDocId),
-            DDoc = couch_doc:from_json_obj({DocProps}),
-            couch_log:notice("creating replicator ddoc ~p", [RepDb]),
-            {ok, _Rev} = save_rep_doc(RepDb, DDoc);
-        {ok, Doc} ->
-            Latest = replication_design_doc_props(DDocId),
-            {Props0} = couch_doc:to_json_obj(Doc, []),
-            {value, {_, Rev}, Props} = lists:keytake(<<"_rev">>, 1, Props0),
-            case compare_ejson({Props}, {Latest}) of
-                true ->
-                    ok;
-                false ->
-                    LatestWithRev = [{<<"_rev">>, Rev} | Latest],
-                    DDoc = couch_doc:from_json_obj({LatestWithRev}),
-                    couch_log:notice("updating replicator ddoc ~p", [RepDb]),
-                    try
-                        {ok, _} = save_rep_doc(RepDb, DDoc)
-                    catch
-                        throw:conflict ->
-                            %% ignore, we'll retry next time
-                            ok
-                    end
-            end
-    end,
-    ok.
-
--spec ensure_cluster_rep_ddoc_exists(binary()) -> ok.
-ensure_cluster_rep_ddoc_exists(RepDb) ->
-    DDocId = ?REP_DESIGN_DOC,
-    [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId),
-    ensure_rep_ddoc_exists(DbShard, DDocId).
-
--spec compare_ejson({[_]}, {[_]}) -> boolean().
-compare_ejson(EJson1, EJson2) ->
-    EjsonSorted1 = couch_replicator_filters:ejsort(EJson1),
-    EjsonSorted2 = couch_replicator_filters:ejsort(EJson2),
-    EjsonSorted1 == EjsonSorted2.
-
--spec replication_design_doc_props(binary()) -> [_].
-replication_design_doc_props(DDocId) ->
-    [
-        {<<"_id">>, DDocId},
-        {<<"language">>, <<"javascript">>},
-        {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
-    ].
 
 % Note: parse_rep_doc can handle filtered replications. During parsing of the
 % replication doc it will make possibly remote http requests to the source
@@ -304,88 +141,6 @@ update_rep_id(Rep) ->
     RepId = couch_replicator_ids:replication_id(Rep),
     Rep#rep{id = RepId}.
 
-update_rep_doc(RepDbName, RepDocId, KVs) ->
-    update_rep_doc(RepDbName, RepDocId, KVs, 1).
-
-update_rep_doc(RepDbName, RepDocId, KVs, Wait) when is_binary(RepDocId) ->
-    try
-        case open_rep_doc(RepDbName, RepDocId) of
-            {ok, LastRepDoc} ->
-                update_rep_doc(RepDbName, LastRepDoc, KVs, Wait * 2);
-            _ ->
-                ok
-        end
-    catch
-        throw:conflict ->
-            Msg = "Conflict when updating replication doc `~s`. Retrying.",
-            couch_log:error(Msg, [RepDocId]),
-            ok = timer:sleep(couch_rand:uniform(erlang:min(128, Wait)) * 100),
-            update_rep_doc(RepDbName, RepDocId, KVs, Wait * 2)
-    end;
-update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
-    NewRepDocBody = lists:foldl(
-        fun
-            ({K, undefined}, Body) ->
-                lists:keydelete(K, 1, Body);
-            ({<<"_replication_state">> = K, State} = KV, Body) ->
-                case get_json_value(K, Body) of
-                    State ->
-                        Body;
-                    _ ->
-                        Body1 = lists:keystore(K, 1, Body, KV),
-                        Timestamp = couch_replicator_utils:iso8601(os:timestamp()),
-                        lists:keystore(
-                            <<"_replication_state_time">>,
-                            1,
-                            Body1,
-                            {<<"_replication_state_time">>, Timestamp}
-                        )
-                end;
-            ({K, _V} = KV, Body) ->
-                lists:keystore(K, 1, Body, KV)
-        end,
-        RepDocBody,
-        KVs
-    ),
-    case NewRepDocBody of
-        RepDocBody ->
-            ok;
-        _ ->
-            % Might not succeed - when the replication doc is deleted right
-            % before this update (not an error, ignore).
-            save_rep_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}})
-    end.
-
-open_rep_doc(DbName, DocId) ->
-    ioq:maybe_set_io_priority({system, DbName}),
-    case couch_db:open_int(DbName, [?CTX, sys_db]) of
-        {ok, Db} ->
-            try
-                couch_db:open_doc(Db, DocId, [ejson_body])
-            after
-                couch_db:close(Db)
-            end;
-        Else ->
-            Else
-    end.
-
-save_rep_doc(DbName, Doc) ->
-    ioq:maybe_set_io_priority({system, DbName}),
-    {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
-    try
-        couch_db:update_doc(Db, Doc, [])
-    catch
-        % User can accidentally write a VDU which prevents _replicator from
-        % updating replication documents. Avoid crashing replicator and thus
-        % preventing all other replication jobs on the node from running.
-        throw:{forbidden, Reason} ->
-            Msg = "~p VDU function preventing doc update to ~s ~s ~p",
-            couch_log:error(Msg, [?MODULE, DbName, Doc#doc.id, Reason]),
-            {ok, forbidden}
-    after
-        couch_db:close(Db)
-    end.
-
 -spec rep_user_ctx({[_]}) -> #user_ctx{}.
 rep_user_ctx({RepDoc}) ->
     case get_json_value(<<"user_ctx">>, RepDoc) of
@@ -666,104 +421,20 @@ ssl_verify_options(true) ->
 ssl_verify_options(false) ->
     [{verify, verify_none}].
 
--spec before_doc_update(#doc{}, Db :: any(), couch_db:update_type()) -> #doc{}.
-before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db, _UpdateType) ->
-    Doc;
-before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
-    #user_ctx{
-        roles = Roles,
-        name = Name
-    } = couch_db:get_user_ctx(Db),
-    case lists:member(<<"_replicator">>, Roles) of
-        true ->
-            Doc;
-        false ->
-            case couch_util:get_value(?OWNER, Body) of
-                undefined ->
-                    Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
-                Name ->
-                    Doc;
-                Other ->
-                    case (catch couch_db:check_is_admin(Db)) of
-                        ok when Other =:= null ->
-                            Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
-                        ok ->
-                            Doc;
-                        _ ->
-                            throw(
-                                {forbidden,
-                                    <<"Can't update replication documents", " from other users.">>}
-                            )
-                    end
-            end
-    end.
+get_value(Key, Props) ->
+    couch_util:get_value(Key, Props).
 
--spec after_doc_read(#doc{}, Db :: any()) -> #doc{}.
-after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
-    Doc;
-after_doc_read(#doc{body = {Body}} = Doc, Db) ->
-    #user_ctx{name = Name} = couch_db:get_user_ctx(Db),
-    case (catch couch_db:check_is_admin(Db)) of
-        ok ->
-            Doc;
-        _ ->
-            case couch_util:get_value(?OWNER, Body) of
-                Name ->
-                    Doc;
-                _Other ->
-                    Source = strip_credentials(
-                        couch_util:get_value(
-                            <<"source">>,
-                            Body
-                        )
-                    ),
-                    Target = strip_credentials(
-                        couch_util:get_value(
-                            <<"target">>,
-                            Body
-                        )
-                    ),
-                    NewBody0 = ?replace(Body, <<"source">>, Source),
-                    NewBody = ?replace(NewBody0, <<"target">>, Target),
-                    #doc{revs = {Pos, [_ | Revs]}} = Doc,
-                    NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}},
-                    NewRevId = couch_db:new_revid(NewDoc),
-                    NewDoc#doc{revs = {Pos, [NewRevId | Revs]}}
-            end
-    end.
+get_value(Key, Props, Default) ->
+    couch_util:get_value(Key, Props, Default).
 
--spec strip_credentials
-    (undefined) -> undefined;
-    (binary()) -> binary();
-    ({[_]}) -> {[_]}.
-strip_credentials(undefined) ->
-    undefined;
-strip_credentials(Url) when is_binary(Url) ->
-    re:replace(
-        Url,
-        "http(s)?://(?:[^:]+):[^@]+@(.*)$",
-        "http\\1://\\2",
-        [{return, binary}]
-    );
-strip_credentials({Props0}) ->
-    Props1 = lists:keydelete(<<"headers">>, 1, Props0),
-    % Strip "auth" just like headers, for replication plugins it can be a place
-    % to stash credential that are not necessarily in headers
-    Props2 = lists:keydelete(<<"auth">>, 1, Props1),
-    {Props2}.
-
-error_reason({shutdown, Error}) ->
-    error_reason(Error);
-error_reason({bad_rep_doc, Reason}) ->
-    to_binary(Reason);
-error_reason({error, {Error, Reason}}) when
-    is_atom(Error), is_binary(Reason)
-->
-    to_binary(io_lib:format("~s: ~s", [Error, Reason]));
-error_reason({error, Reason}) ->
-    to_binary(Reason);
-error_reason(Reason) ->
-    to_binary(Reason).
+to_binary(Val) ->
+    couch_util:to_binary(Val).
+
+get_json_value(Key, Obj) ->
+    couch_replicator_utils:get_json_value(Key, Obj).
+
+get_json_value(Key, Obj, Default) ->
+    couch_replicator_utils:get_json_value(Key, Obj, Default).
 
 -ifdef(TEST).
 
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 1ba933a5e..06d3052be 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -1149,8 +1149,8 @@ scheduler_job_format_status_test() ->
     Target = <<"http://u:p@h2/d2">>,
     Rep = #rep{
         id = {"base", "+ext"},
-        source = couch_replicator_docs:parse_rep_db(Source, [], []),
-        target = couch_replicator_docs:parse_rep_db(Target, [], []),
+        source = couch_replicator_parse:parse_rep_db(Source, [], []),
+        target = couch_replicator_parse:parse_rep_db(Target, [], []),
         options = [{create_target, true}],
         doc_id = <<"mydoc">>,
         db_name = <<"mydb">>
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index e4a2cd12f..36700d9d5 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -13,7 +13,6 @@
 -module(couch_replicator_utils).
 
 -export([
-    parse_rep_doc/2,
     replication_id/2,
     sum_stats/2,
     is_deleted/1,
@@ -95,9 +94,6 @@ replication_id(Rep, Version) ->
 sum_stats(S1, S2) ->
     couch_replicator_stats:sum_stats(S1, S2).
 
-parse_rep_doc(Props, UserCtx) ->
-    couch_replicator_docs:parse_rep_doc(Props, UserCtx).
-
 -spec iso8601(erlang:timestamp()) -> binary().
 iso8601({_Mega, _Sec, _Micro} = Timestamp) ->
     {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(Timestamp),
@@ -351,7 +347,7 @@ normalize_rep_test_() ->
                     {<<"doc_ids">>, [<<"a">>, <<"c">>, <<"b">>]},
                     {<<"other_field">>, <<"some_value">>}
                 ]},
-            Rep1 = couch_replicator_docs:parse_rep_doc_without_id(EJson1),
+            Rep1 = couch_replicator_parse:parse_rep_doc_without_id(EJson1),
             EJson2 =
                 {[
                     {<<"other_field">>, <<"unrelated">>},
@@ -360,7 +356,7 @@ normalize_rep_test_() ->
                     {<<"doc_ids">>, [<<"c">>, <<"a">>, <<"b">>]},
                     {<<"other_field2">>, <<"unrelated2">>}
                 ]},
-            Rep2 = couch_replicator_docs:parse_rep_doc_without_id(EJson2),
+            Rep2 = couch_replicator_parse:parse_rep_doc_without_id(EJson2),
             ?assertEqual(normalize_rep(Rep1), normalize_rep(Rep2))
         end)
     }.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
index 2d5ef96b1..df8074f1f 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
@@ -304,7 +304,7 @@ replicate(Source, Target) ->
             {<<"target">>, db_url(Target)},
             {<<"continuous">>, true}
         ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+    {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
     ok = couch_replicator_scheduler:add_job(Rep),
     couch_replicator_scheduler:reschedule(),
     Pid = couch_replicator_test_helper:get_pid(Rep#rep.id),
diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
index 6bdb4ecb2..7ba6bc69d 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
@@ -226,7 +226,7 @@ replicate(Source, Target) ->
             % Low connection timeout so _changes feed gets restarted quicker
             {<<"connection_timeout">>, 3000}
         ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+    {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
     ok = couch_replicator_scheduler:add_job(Rep),
     couch_replicator_scheduler:reschedule(),
     {ok, Rep#rep.id}.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
index 3468cda73..758c44f2b 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
@@ -50,7 +50,7 @@ parse_rep_doc_without_proxy(_) ->
             {<<"source">>, <<"http://unproxied.com">>},
             {<<"target">>, <<"http://otherunproxied.com">>}
         ]},
-    Rep = couch_replicator_docs:parse_rep_doc(NoProxyDoc),
+    Rep = couch_replicator_parse:parse_rep_doc(NoProxyDoc),
     ?assertEqual((Rep#rep.source)#httpdb.proxy_url, undefined),
     ?assertEqual((Rep#rep.target)#httpdb.proxy_url, undefined).
 
@@ -62,7 +62,7 @@ parse_rep_doc_with_proxy(_) ->
             {<<"target">>, <<"http://otherunproxied.com">>},
             {<<"proxy">>, ProxyURL}
         ]},
-    Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc),
+    Rep = couch_replicator_parse:parse_rep_doc(ProxyDoc),
     ?assertEqual((Rep#rep.source)#httpdb.proxy_url, binary_to_list(ProxyURL)),
     ?assertEqual((Rep#rep.target)#httpdb.proxy_url, binary_to_list(ProxyURL)).
 
@@ -76,7 +76,7 @@ parse_rep_source_target_proxy(_) ->
             {<<"source_proxy">>, SrcProxyURL},
             {<<"target_proxy">>, TgtProxyURL}
         ]},
-    Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc),
+    Rep = couch_replicator_parse:parse_rep_doc(ProxyDoc),
     ?assertEqual(
         (Rep#rep.source)#httpdb.proxy_url,
         binary_to_list(SrcProxyURL)
@@ -96,7 +96,7 @@ mutually_exclusive_proxy_and_source_proxy(_) ->
         ]},
     ?assertThrow(
         {bad_rep_doc, _},
-        couch_replicator_docs:parse_rep_doc(ProxyDoc)
+        couch_replicator_parse:parse_rep_doc(ProxyDoc)
     ).
 
 mutually_exclusive_proxy_and_target_proxy(_) ->
@@ -109,5 +109,5 @@ mutually_exclusive_proxy_and_target_proxy(_) ->
         ]},
     ?assertThrow(
         {bad_rep_doc, _},
-        couch_replicator_docs:parse_rep_doc(ProxyDoc)
+        couch_replicator_parse:parse_rep_doc(ProxyDoc)
     ).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
index 04c665af5..f413e5cf4 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
@@ -214,7 +214,7 @@ replicate(Source, Target) ->
             {<<"target">>, db_url(Target)},
             {<<"continuous">>, true}
         ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+    {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
     ok = couch_replicator_scheduler:add_job(Rep),
     couch_replicator_scheduler:reschedule(),
     Pid = couch_replicator_test_helper:get_pid(Rep#rep.id),
diff --git a/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl b/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
index f30bdb1cd..f862527f4 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
@@ -173,7 +173,7 @@ replicate(Source, Target) ->
     ).
 
 replicate({[_ | _]} = RepObject) ->
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+    {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
     ok = couch_replicator_scheduler:add_job(Rep),
     couch_replicator_scheduler:reschedule(),
     Pid = get_pid(Rep#rep.id),