Posted to commits@couchdb.apache.org by va...@apache.org on 2019/07/30 16:58:53 UTC

[couchdb] branch prototype/fdb-replicator updated: WIP 3

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/fdb-replicator
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/prototype/fdb-replicator by this push:
     new 17f6c23  WIP 3
17f6c23 is described below

commit 17f6c235a6d366f68cfdc1c29a8e9a9fbff4ad24
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Jul 30 12:55:44 2019 -0400

    WIP 3
    
     * Local endpoints have been cleaned up
     * #rep{} records are now parsed into #{} maps (sketch after the diffstat)
      - #httpdb{} records are also parsed (including ibrowse + ssl options)
     * #httpdb{} endpoint records are initialized from #{}
     * _replicate interface logic (adding + canceling jobs)
     * _replicator doc update handler job-adding logic:
      - listen for db creates
      - listen for db deletes
      - listen for doc updates
---
 src/couch_jobs/src/couch_jobs.erl                  |  17 ++
 src/couch_replicator/src/couch_replicator.erl      |  50 ++--
 src/couch_replicator/src/couch_replicator.hrl      |   1 +
 .../src/couch_replicator_api_wrap.erl              |  45 +++-
 .../src/couch_replicator_doc_processor.erl         | 229 ++++++++---------
 src/couch_replicator/src/couch_replicator_docs.erl | 280 ++++++++++++---------
 .../src/couch_replicator_filters.erl               |  42 ++--
 src/couch_replicator/src/couch_replicator_ids.erl  |  46 ++--
 .../src/couch_replicator_scheduler_job.erl         |   4 +-
 .../src/couch_replicator_utils.erl                 |  75 +-----
 .../src/couch_replicator_worker.erl                | 140 +----------
 11 files changed, 418 insertions(+), 511 deletions(-)
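
For context, a rough sketch of the parsed #{} rep map this commit moves to,
using the field names from parse_rep_doc_without_id/2 below (values are
illustrative only; source/target endpoint maps abbreviated):

    Rep = #{
        <<"id">> => null,
        <<"base_id">> => null,
        <<"source">> => #{<<"url">> => <<"http://localhost:5984/src/">>},
        <<"target">> => #{<<"url">> => <<"http://localhost:5984/tgt/">>},
        <<"options">> => #{},
        <<"user">> => <<"admin">>,
        <<"type">> => <<"db">>,
        <<"view">> => #{},
        <<"doc_id">> => null
    }.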

diff --git a/src/couch_jobs/src/couch_jobs.erl b/src/couch_jobs/src/couch_jobs.erl
index d469ed4..393f256 100644
--- a/src/couch_jobs/src/couch_jobs.erl
+++ b/src/couch_jobs/src/couch_jobs.erl
@@ -19,6 +19,8 @@
     remove/3,
     get_job_data/3,
     get_job_state/3,
+    get_jobs/2,
+    get_jobs/3,
 
     % Job processing
     accept/1,
@@ -103,6 +105,21 @@ get_job_state(Tx, Type, JobId) when is_binary(JobId) ->
     end).
 
 
+-spec get_jobs(jtx(), job_type()) -> #{}.
+get_jobs(Tx, Type) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+        couch_jobs_fdb:get_jobs(JTx, Type)
+    end).
+
+
+-spec get_jobs(jtx(), job_type(), fun()) -> #{}.
+get_jobs(Tx, Type, Filter) when is_function(Filter, 1) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+        couch_jobs_fdb:get_jobs(JTx, Type, Filter)
+    end).
+
+
+
 %% Job processor API
 
 -spec accept(job_type()) -> {ok, job(), job_data()} | {error, any()}.
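
A minimal usage sketch for the get_jobs/2,3 functions added above; the
`undefined` transaction and the ?REP_DOCS job type follow their usage
elsewhere in this commit, and the filter is an arity-1 fun over job ids
(here assuming the <<"DbName|DocId">> ids built by docs_job_id/2 further
down):

    AllDocJobs = couch_jobs:get_jobs(undefined, ?REP_DOCS),
    Filter = fun
        (<<"mydb|", _/binary>>) -> true;
        (_) -> false
    end,
    MyDbJobs = couch_jobs:get_jobs(undefined, ?REP_DOCS, Filter).
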
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index 3dee919..cb50df7 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -112,17 +112,18 @@ wait_for_result(RepId) ->
 
 -spec cancel_replication(rep_id()) ->
     {ok, {cancelled, binary()}} | {error, not_found}.
-cancel_replication({BasedId, Extension} = RepId) ->
-    FullRepId = BasedId ++ Extension,
-    couch_log:notice("Canceling replication '~s' ...", [FullRepId]),
-    case couch_replicator_scheduler:rep_state(RepId) of
-    #rep{} ->
-        ok = couch_replicator_scheduler:remove_job(RepId),
-        couch_log:notice("Replication '~s' cancelled", [FullRepId]),
-        {ok, {cancelled, ?l2b(FullRepId)}};
-    nil ->
-        couch_log:notice("Replication '~s' not found", [FullRepId]),
-        {error, not_found}
+cancel_replication(RepId) when is_binary(RepId) ->
+    couch_log:notice("Canceling replication '~s' ...", [RepId]),
+    case couch_jobs:get_job_data(undefined, ?REP_JOBS, RepId) of
+        {error, not_found} ->
+            {error, not_found};
+        #{<<"rep">> := #{<<"db_name">> := null}} ->
+            couch_jobs:remove(undefined, ?REP_JOBS, RepId),
+            {ok, {cancelled, RepId}};
+        #{<<"rep">> := #{}} ->
+            % The job was started from a replicator doc; canceling it via
+            % _replicate doesn't make sense, the doc should be deleted instead.
+            {error, not_found}
     end.
 
 
@@ -131,10 +132,10 @@ replication_states() ->
     ?REPLICATION_STATES.
 
 
--spec strip_url_creds(binary() | {[_]}) -> binary().
+-spec strip_url_creds(binary() | #{}) -> binary().
 strip_url_creds(Endpoint) ->
-    case couch_replicator_docs:parse_rep_db(Endpoint, [], []) of
-        #httpdb{url=Url} ->
+    case couch_replicator_docs:parse_rep_db(Endpoint, #{}, #{}) of
+        #{<<"url">> := Url} ->
             iolist_to_binary(couch_util:url_strip_password(Url));
         LocalDb when is_binary(LocalDb) ->
             LocalDb
@@ -275,13 +276,13 @@ state_atom(State) when is_atom(State) ->
 
 -spec check_authorization(rep_id(), #user_ctx{}) -> ok | not_found.
 check_authorization(RepId, #user_ctx{name = Name} = Ctx) ->
-    case couch_replicator_scheduler:rep_state(RepId) of
-    #{<<"user_ctx">> := #{<<"name">> := Name}} ->
-        ok;
-    #{} ->
-        couch_httpd:verify_is_server_admin(Ctx);
-    nil ->
-        not_found
+    case couch_jobs:get_job_data(undefined, ?REP_JOBS, RepId) of
+        {error, not_found} ->
+            not_found;
+        #{<<"rep">> := #{<<"user">> := Name}} ->
+            ok;
+        #{} ->
+            couch_httpd:verify_is_server_admin(Ctx)
     end.
 
 
@@ -331,13 +332,6 @@ t_replication_not_found() ->
     end).
 
 
-expect_rep_user_ctx(Name, Role) ->
-    meck:expect(couch_replicator_scheduler, rep_state,
-        fun(_Id) ->
-            UserCtx = #user_ctx{name = Name, roles = [Role]},
-            #rep{user_ctx = UserCtx}
-        end).
-
 
 strip_url_creds_test_() ->
      {
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl
index 30cb485..6584078 100644
--- a/src/couch_replicator/src/couch_replicator.hrl
+++ b/src/couch_replicator/src/couch_replicator.hrl
@@ -31,6 +31,7 @@
 }).
 
 -type rep_id() :: binary().
+-type user_name() :: binary() | null.
 -type db_doc_id() :: {binary(), binary() | '_'}.
 -type seconds() :: non_neg_integer().
 -type rep_start_result() ::
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index 24ec99b..66319bf 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -23,8 +23,8 @@
 -include("couch_replicator_api_wrap.hrl").
 
 -export([
-    db_open/2,
-    db_open/4,
+    db_open/1,
+    db_open/3,
     db_close/1,
     get_db_info/1,
     get_pending_count/2,
@@ -64,11 +64,11 @@ db_uri(#httpdb{url = Url}) ->
     couch_util:url_strip_password(Url).
 
 
-db_open(Db, #{} = UserCtxMap) when is_map(Db) orelse is_binary(Db) ->
-    db_open(Db, #{} = UserCtxMap, false, []);
+db_open(#{} = Db) ->
+    db_open(Db, false, []).
 
 
-db_open(#{} = Db0, #httpdb{} = Db1, #{} =_UserCtxMap, Create, CreateParams) ->
+db_open(#{} = Db0, Create, CreateParams) ->
     {ok, Db} = couch_replicator_httpc:setup(db_from_json(Db0)),
     try
         case Create of
@@ -910,6 +910,14 @@ db_from_json(#{} = DbMap) ->
         [{binary_to_list(K), binary_to_list(V)} | Acc]
     end, [], Headers0),
     IBrowseOptions0 = maps:fold(fun
+        (<<"proxy_protocol">>, V, Acc) ->
+            [{proxy_protocol, binary_to_existing_atom(V)} | Acc];
+        (<<"socket_options">>, #{} = SockOpts, Acc) ->
+            SockOptsKVs = maps:fold(fun sock_opts_fold/3, [], SockOpts),
+            [{socket_options, SockOptsKVs} | Acc];
+        (<<"ssl_options">>, #{} = SslOpts, Acc) ->
+            SslOptsKVs = maps:fold(fun ssl_opts_fold/3, [], SslOpts),
+            [{ssl_options, SslOptsKVs} | Acc];
         (K, V, Acc) when is_binary(V) ->
             [{binary_to_atom(K), binary_to_list(V)} | Acc];
         (K, V, Acc) ->
@@ -929,3 +937,30 @@ db_from_json(#{} = DbMap) ->
         retries = Retries,
         proxy_url = ProxyURL
     }.
+
+
+
+% See couch_replicator_docs:ssl_params/1 for how ssl options are parsed, and
+% http://erlang.org/doc/man/ssl.html#type-server_option for the full list of
+% SSL options.
+%
+ssl_opts_fold(K, V, Acc) when is_boolean(V); is_integer(V) ->
+    [{binary_to_atom(K), V} | Acc];
+
+ssl_opts_fold(K, null, Acc) ->
+    [{binary_to_atom(K), undefined} | Acc];
+
+ssl_opts_fold(<<"verify">> = K, V, Acc) ->
+    [{binary_to_atom(K), binary_to_atom(V)} | Acc];
+
+ssl_opts_fold(K, V, Acc) when is_binary(V) ->
+    [{binary_to_atom(K), binary_to_list(V)} | Acc].
+
+
+% See ?VALID_SOCK_OPTS in couch_replicator_docs for accepted socket options
+%
+sock_opts_fold(K, V, Acc) when is_binary(V) ->
+    [{binary_to_atom(K), binary_to_atom(V)} | Acc];
+
+sock_opts_fold(K, V, Acc) when is_boolean(V); is_integer(V) ->
+    [{binary_to_atom(K), V} | Acc].
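
To illustrate the folds above, a hedged example of how a parsed ssl_options
map turns back into an ibrowse-style proplist (maps:fold/3 visits keys in
unspecified order, so the result order may vary):

    SslOpts = #{
        <<"verify">> => <<"verify_peer">>,
        <<"depth">> => 3,
        <<"cacertfile">> => <<"/etc/ssl/ca.pem">>
    },
    SslOptsKVs = maps:fold(fun ssl_opts_fold/3, [], SslOpts).
    %% e.g. [{cacertfile, "/etc/ssl/ca.pem"}, {depth, 3}, {verify, verify_peer}]
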
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index a2887a0..74ec014 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -75,16 +75,7 @@
 
 during_doc_update(#doc{} = Doc, Db, _UpdateType) ->
     couch_stats:increment_counter([couch_replicator, docs, db_changes]),
-    try
-        process_change(Db, Doc)
-    catch
-        _Tag:Error ->
-            DocId = Doc#doc.id,
-            #{name := DbName} = Db,
-            couch_replicator_docs:update_failed(DbName, DocId, Error)
-    end,
-    ok.
-
+    ok = process_change(Db, Doc).
 
 after_db_create(#{name := DbName}) ->
     couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
@@ -93,11 +84,7 @@ after_db_create(#{name := DbName}) ->
 
 after_db_delete(#{name := DbName}) ->
     couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]),
-    gen_server:call(?MODULE, {clean_up_replications, DbName}, infinity).
-
-
-docs_job_id(DbName, Id) when is_binary(DbName), is_binary(Id) ->
-    <<DbName/binary, Id/binary>>.
+    remove_replications_by_dbname(DbName).
 
 
 process_change(_Db, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
@@ -105,86 +92,84 @@ process_change(_Db, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
 
 process_change(#{name := DbName} = Db, #doc{deleted = true} = Doc) ->
     Id = docs_job_id(DbName, Doc#doc.id),
-    case couch_jobs:get_job_data(Db, ?REP_DOCS, Id) of
-        {error, not_found} ->
-            ok;
-        {ok, #{<<"rid">> := null}} ->
-            couch_jobs:remove(Db, ?REP_DOCS, Id),
-            ok;
-        {ok, #{<<"rid">> := RepId}} ->
-            couch_jobs:remove(Db, ?REP_JOBS, RepId),
-            couch_jobs:remove(Db, ?REP_DOCS, Id),
-            ok
-    end.
-
+    ok = remove_replication_by_doc_job_id(Db, Id);
 
 process_change(#{name := DbName} = Db, #doc{} = Doc) ->
     #doc{id = DocId, body = {Props} = Body} = Doc,
-    Id = docs_job_id(DbName, DocId),
-    DocState = get_json_value(<<"_replication_state">>, Props),
-    {Rep, RepParseError} = try
+    {Rep, RepError} = try
         Rep0 = couch_replicator_docs:parse_rep_doc_without_id(Body),
-        Rep1 = Rep0#{db_name = DbName, start_time = erlang:system_time()},
+        Rep1 = Rep0#{
+            <<"db_name">> => DbName,
+            <<"start_time">> => erlang:system_time()
+        },
         {Rep1, null}
     catch
         throw:{bad_rep_doc, Reason} ->
-            {null, Reason}
+            {null, couch_replicator_utils:rep_error_to_binary(Reason)}
     end,
-    RepMap = couch_replicator_docs:rep_to_map(Rep),
-    case couch_jobs:get_job_data(Db, ?REP_DOCS, Id) of
+    % We keep track of the doc's state in order to clear it if update_docs
+    % is toggled from true to false
+    DocState = get_json_value(<<"_replication_state">>, Props, null),
+    case couch_jobs:get_job_data(Db, ?REP_DOCS, docs_job_id(DbName, DocId)) of
         {error, not_found} ->
-            RepDocsJob = #{
-                <<"rep_id">> := null,
-                <<"db_name">> := DbName,
-                <<"doc_id">> := Doc#doc.id,
-                <<"rep">> := RepMap,
-                <<"rep_parse_error">> := RepParseError
-            },
-            couch_jobs:add(Db, ?REP_DOCS, RepDocsJob);
-        {ok, #{} = Old} ->
-            % Normalize old rep and new rep and only update job
-            % if changed
-            #{<<"rep">> := OldRep, <<"rep_parse_error">> := OldError} = Old,
-            NOldRep = couch_replicator_util:normalize_rep(OldRep),
-            NRep = couch_replicator_util:normalize_rep(Rep),
-            RepDocsJob = #{
-                <<"rep_id">> := null,
-                <<"db_name">> := DbName,
-                <<"doc_id">> := Doc#doc.id,
-                <<"rep">> := RepMap,
-                <<"rep_parse_error">> := RepParseError
-            }
+            update_replication_job(Db, DbName, DocId, Rep, RepError, DocState);
+        {ok, #{<<"rep">> := null, <<"rep_parse_error">> := RepError}}
+                when Rep =:= null ->
+            % The same error as before occurred; don't bother updating the job
+            ok;
+        {ok, #{<<"rep">> := null}} when Rep =:= null ->
+            % An error occurred, but it differs from the previous one. Update
+            % the job so the user sees the new error
+            update_replication_job(Db, DbName, DocId, Rep, RepError, DocState);
+        {ok, #{<<"rep">> := OldRep, <<"rep_parse_error">> := OldError}} ->
+            NormOldRep = couch_replicator_utils:normalize_rep(OldRep),
+            NormRep = couch_replicator_utils:normalize_rep(Rep),
+            case NormOldRep == NormRep of
+                true ->
+                    % Document was changed, but none of the parameters relevant
+                    % for the replication job have changed, so make it a no-op
+                    ok;
+                false ->
+                    update_replication_job(Db, DbName, DocId, Rep, RepError,
+                        DocState)
+            end
     end.
 
 
-process_change(#{name := DbName} = Db, #doc{} = Doc) ->
-    #doc{id = DocId, body = {Props} = Body, deleted = Deleted} = Doc,
-    Id = {DbName, DocId},
-    case Deleted of
-        true ->
-            process_deleted_doc(Db, Id);
-        false ->
-            process_updated_doc(Db, Id, Body)
-    end.
-    State = get_json_value(<<"_replication_state">>, Props),
-    case {Deleted, State} of
-        {true, _} ->
-            ok = gen_server:call(?MODULE, {removed, Id}, infinity);
-        {false, undefined} ->
-            ok = process_updated(Id, Body);
-        {false, <<"triggered">>} ->
-            maybe_remove_state_fields(DbName, DocId),
-            ok = process_updated(Id, Body);
-        {false, <<"completed">>} ->
-            ok = gen_server:call(?MODULE, {completed, Id}, infinity);
-        {false, <<"error">>} ->
-            % Handle replications started from older versions of replicator
-            % which wrote transient errors to replication docs
-            maybe_remove_state_fields(DbName, DocId),
-            ok = process_updated(Id, Body);
-        {false, <<"failed">>} ->
-            ok
-    end.
+rep_docs_job_execute(#{} = Job, #{<<"rep">> := null} = JobData) ->
+    #{
+        <<"rep_parse_error">> := Error,
+        <<"db_name">> := DbName,
+        <<"doc_id">> := DocId
+    } = JobData,
+    JobData1 = JobData#{
+        <<"finished_state">> => <<"failed">>,
+        <<"finished_result">> => Error
+    },
+    case couch_jobs:finish(undefined, Job, JobData1) of
+        ok ->
+            couch_replicator_docs:update_failed(DbName, DocId, Error),
+            ok;
+        {error, JobError} ->
+            Msg = "Replication doc ~s:~s job could not finish. JobError:~p",
+            couch_log:error(Msg, [DbName, DocId, JobError]),
+            {error, JobError}
+    end;
+
+rep_docs_job_execute(#{} = _Job, #{} = JobData) ->
+    #{<<"rep">> := Rep, <<"doc_state">> := DocState} = JobData,
+    #{<<"db_name">> := DbName, <<"doc_id">> := DocId} = JobData,
+    case lists:member(DocState, [<<"triggered">>, <<"error">>]) of
+        true -> maybe_remove_state_fields(DbName, DocId);
+        false -> ok
+    end,
+
+    % Completed jobs should finish right away, otherwise start
+    % computing the replication id.
+    Rep1 = update_replication_id(Rep),
+    % When done, add or update the replication job. If the job
+    % has a filter, keep checking whether the filter changes.
+    {ok, Rep1}.
 
 
 maybe_remove_state_fields(DbName, DocId) ->
@@ -607,21 +592,57 @@ ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) ->
     lists:member(State, States).
 
 
--spec cluster_membership_foldl(#rdoc{}, nil) -> nil.
-cluster_membership_foldl(#rdoc{id = {DbName, DocId} = Id, rid = RepId}, nil) ->
-    case couch_replicator_clustering:owner(DbName, DocId) of
-        unstable ->
-            nil;
-        ThisNode when ThisNode =:= node() ->
-            nil;
-        OtherNode ->
-            Msg = "Replication doc ~p:~p with id ~p usurped by node ~p",
-            couch_log:notice(Msg, [DbName, DocId, RepId, OtherNode]),
-            removed_doc(Id),
-            nil
+-spec update_replication_job(any(), binary(), binary(), #{} | null,
+    binary() | null, binary() | null) -> ok.
+update_replication_job(Tx, DbName, DocId, Rep, RepParseError, DocState) ->
+    JobId = docs_job_id(DbName, DocId),
+    ok = remove_replication_by_doc_job_id(Tx, JobId),
+    RepDocsJob = #{
+        <<"rep_id">> => null,
+        <<"db_name">> => DbName,
+        <<"doc_id">> => DocId,
+        <<"rep">> => Rep,
+        <<"rep_parse_error">> => RepParseError,
+        <<"doc_state">> => DocState
+    },
+    ok = couch_jobs:add(Tx, ?REP_DOCS, RepDocsJob).
+
+
+docs_job_id(DbName, Id) when is_binary(DbName), is_binary(Id) ->
+    <<DbName/binary, "|", Id/binary>>.
+
+
+-spec remove_replication_by_doc_job_id(any(), binary()) -> ok.
+remove_replication_by_doc_job_id(Tx, Id) ->
+    case couch_jobs:get_job_data(Tx, ?REP_DOCS, Id) of
+        {error, not_found} ->
+            ok;
+        {ok, #{<<"rep_id">> := null}} ->
+            couch_jobs:remove(Tx, ?REP_DOCS, Id),
+            ok;
+        {ok, #{<<"rep_id">> := RepId}} ->
+            couch_jobs:remove(Tx, ?REP_JOBS, RepId),
+            couch_jobs:remove(Tx, ?REP_DOCS, Id),
+            ok
     end.
 
 
+-spec remove_replications_by_dbname(binary()) -> ok.
+remove_replications_by_dbname(DbName) ->
+    DbNameSize = byte_size(DbName),
+    Filter = fun
+        (<<DbName:DbNameSize/binary, "|", _, _/binary>>) -> true;
+        (_) -> false
+    end,
+    JobsMap = couch_jobs:get_jobs(undefined, ?REP_DOCS, Filter),
+    % Batch these into smaller transactions eventually...
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(undefined), fun(JTx) ->
+        maps:map(fun(Id, _) ->
+            remove_replication_by_doc_job_id(JTx, Id)
+        end, JobsMap)
+    end).
+
+
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").
@@ -650,8 +671,7 @@ doc_processor_test_() ->
             t_failed_change(),
             t_change_for_different_node(),
             t_change_when_cluster_unstable(),
-            t_ejson_docs(),
-            t_cluster_membership_foldl()
+            t_ejson_docs()
         ]
     }.
 
@@ -803,21 +823,6 @@ t_ejson_docs() ->
     end).
 
 
-% Check that when cluster membership changes records from doc processor and job
-% scheduler get removed
-t_cluster_membership_foldl() ->
-   ?_test(begin
-        mock_existing_jobs_lookup([test_rep(?R1)]),
-        ?assertEqual(ok, process_change(?DB, change())),
-        meck:expect(couch_replicator_clustering, owner, 2, different_node),
-        ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
-        gen_server:cast(?MODULE, {cluster, stable}),
-        meck:wait(2, couch_replicator_scheduler, find_jobs_by_doc, 2, 5000),
-        ?assertNot(ets:member(?MODULE, {?DB, ?DOC1})),
-        ?assert(removed_job(?R1))
-   end).
-
-
 get_worker_ref_test_() ->
     {
         setup,
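
To make the remove_replications_by_dbname/1 change above concrete, a small
sketch of the <<"DbName|DocId">> prefix filter it relies on (docs_job_id/2
produces these ids):

    DbName = <<"mydb">>,
    DbNameSize = byte_size(DbName),
    Filter = fun
        (<<Db:DbNameSize/binary, "|", _/binary>>) when Db =:= DbName -> true;
        (_) -> false
    end,
    true = Filter(<<"mydb|docid1">>),
    false = Filter(<<"otherdb|docid1">>).
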
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index b427d9a..3d2c2d7 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -18,7 +18,6 @@
     parse_rep_db/3,
     parse_rep_doc_without_id/1,
     parse_rep_doc_without_id/2,
-    rep_to_map/1,
     before_doc_update/3,
     after_doc_read/2,
     ensure_rep_db_exists/0,
@@ -62,6 +61,7 @@
     keepalive, nodelay, recbuf, send_timeout, send_timeout_close, sndbuf,
     priority, tos, tclass
 ]).
+
 -define(CONFIG_DEFAULTS, [
     {"worker_processes",    "4",                fun list_to_integer/1},
     {"worker_batch_size",   "500",              fun list_to_integer/1},
@@ -116,14 +116,17 @@ update_triggered(Id, DocId, DbName) ->
     ok.
 
 
--spec update_error(#rep{}, any()) -> ok.
-update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
+-spec update_error(#{}, any()) -> ok.
+update_error(#{} = Rep, Error) ->
+    #{
+        <<"id">> := RepId0,
+        <<"db_name">> := DbName,
+        <<"doc_id">> := DocId
+    } = Rep,
     Reason = error_reason(Error),
-    BinRepId = case RepId of
-        {Base, Ext} ->
-            iolist_to_binary([Base, Ext]);
-        _Other ->
-            null
+    RepId = case RepId0 of
+        Id when is_binary(Id) -> Id;
+        _Other -> null
     end,
     update_rep_doc(DbName, DocId, [
         {<<"_replication_state">>, <<"error">>},
@@ -198,7 +201,7 @@ replication_design_doc_props(DDocId) ->
 -spec parse_rep_doc_without_id({[_]}) -> #{}.
 parse_rep_doc_without_id(RepDoc) ->
     {ok, Rep} = try
-        parse_rep_doc_without_id(RepDoc, rep_user_ctx(RepDoc))
+        parse_rep_doc_without_id(RepDoc, rep_user_name(RepDoc))
     catch
         throw:{error, Reason} ->
             throw({bad_rep_doc, Reason});
@@ -208,9 +211,9 @@ parse_rep_doc_without_id(RepDoc) ->
     Rep.
 
 
--spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #{}}.
-parse_rep_doc(Doc, UserCtx) ->
-    {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx),
+-spec parse_rep_doc({[_]}, user_name()) -> {ok, #{}}.
+parse_rep_doc({[_]} = Doc, UserName) ->
+    {ok, Rep} = parse_rep_doc_without_id(Doc, UserName),
     #{<<"options">> := Options} = Rep,
     Cancel = maps:get(<<"cancel">>, Options, false),
     Id = maps:get(<<"id">>, Options, nil),
@@ -227,44 +230,43 @@ parse_rep_doc(Doc, UserCtx) ->
     end.
 
 
--spec parse_rep_doc_without_id({[_]} | #{}, #user_ctx{}) -> {ok, #{}}.
-parse_rep_doc_without_id({[_]} = EJSon, UserCtx) ->
+-spec parse_rep_doc_without_id({[_]} | #{}, user_name()) -> {ok, #{}}.
+parse_rep_doc_without_id({[_]} = EJson, UserName) ->
     % Normalize all field names to be binaries and turn into a map
-    Map = ?JSON_DECODE(?JSON_ENCODE(EJSon)),
-    parse_rep_doc_without_id(Map, UserCtx);
+    Map = ?JSON_DECODE(?JSON_ENCODE(EJson)),
+    parse_rep_doc_without_id(Map, UserName);
 
-parse_rep_doc_without_id(#{} = Doc, UserCtx) ->
-    Proxy = maps:get(<<"proxy">>, Doc, <<>>),
+parse_rep_doc_without_id(#{} = Doc, UserName) ->
+    Proxy = parse_proxy_params(maps:get(<<"proxy">>, Doc, <<>>)),
     Opts = make_options(Doc),
     Cancel = maps:get(<<"cancel">>, Opts, false),
     Id = maps:get(<<"id">>, Opts, nil),
-    case Cancel andalso Id =/= nill of
+    case Cancel andalso Id =/= nil of
     true ->
-        {ok, #{<<"options">> => Opts, user_ctx = UserCtx}};
+        {ok, #{<<"options">> => Opts, <<"user">> => UserName}};
     false ->
-        Source = parse_rep_db(get_value(<<"source">>, Props), Proxy, Opts),
-        Target = parse_rep_db(get_value(<<"target">>, Props), Proxy, Opts),
+        #{<<"source">> := Source0, <<"target">> := Target0} = Doc,
+        Source = parse_rep_db(Source0, Proxy, Opts),
+        Target = parse_rep_db(Target0, Proxy, Opts),
         {Type, View} = case couch_replicator_filters:view_type(Doc, Opts) of
-        {error, Error} ->
-            throw({bad_request, Error});
-        Result ->
-            Result
+            {error, Error} -> throw({bad_request, Error});
+            Result -> Result
         end,
-        Rep = #rep{
-            source = Source,
-            target = Target,
-            options = Opts,
-            user_ctx = UserCtx,
-            type = Type,
-            view = View,
-            doc_id = get_value(<<"_id">>, Props, null)
+        Rep = #{
+            <<"id">> => null,
+            <<"base_id">> => null,
+            <<"source">> => Source,
+            <<"target">> => Target,
+            <<"options">> => Opts,
+            <<"user">> => UserName,
+            <<"type">> => Type,
+            <<"view">> => View,
+            <<"doc_id">> => maps:get(<<"_id">>, Doc, null)
         },
         % Check if can parse filter code, if not throw exception
         case couch_replicator_filters:parse(Opts) of
-        {error, FilterError} ->
-            throw({error, FilterError});
-        {ok, _Filter} ->
-             ok
+            {error, FilterError} -> throw({error, FilterError});
+            {ok, _Filter} -> ok
         end,
         {ok, Rep}
     end.
@@ -277,7 +279,7 @@ parse_rep_doc_without_id(#{} = Doc, UserCtx) ->
 update_rep_id(#{} = Rep) ->
     {BaseId, ExtId} = couch_replicator_ids:replication_id(Rep),
     RepId = erlang:iolist_to_binary([BaseId, ExtId]),
-    Rep#{<<"id">> => RepId, <<"base_id">> = list_toBaseId}.
+    Rep#{<<"id">> => RepId, <<"base_id">> => BaseId}.
 
 
 update_rep_doc(RepDbName, RepDocId, KVs) ->
@@ -356,71 +358,73 @@ save_rep_doc(DbName, Doc) ->
     end.
 
 
--spec rep_user_ctx({[_]}) -> #user_ctx{}.
-rep_user_ctx({RepDoc}) ->
+-spec rep_user_name({[_]}) -> binary() | null.
+rep_user_name({RepDoc}) ->
     case get_json_value(<<"user_ctx">>, RepDoc) of
-    undefined ->
-        #user_ctx{};
-    {UserCtx} ->
-        #user_ctx{
-            name = get_json_value(<<"name">>, UserCtx, null),
-            roles = get_json_value(<<"roles">>, UserCtx, [])
-        }
+        undefined -> null;
+        {UserCtx} -> get_json_value(<<"name">>, UserCtx, null)
     end.
 
 
--spec parse_rep_db({[_]} | binary(), binary(), [_]) -> #httpd{} | binary().
-parse_rep_db({Props}, Proxy, Options) ->
-    ProxyParams = parse_proxy_params(Proxy),
+-spec parse_rep_db(#{}, #{}, #{}) -> #{}.
+parse_rep_db(#{} = Endpoint, #{} = ProxyParams, #{} = Options) ->
     ProxyURL = case ProxyParams of
-        [] -> undefined;
-        _ -> binary_to_list(Proxy)
+       #{<<"proxy_url">> := PUrl} -> PUrl;
+       _ -> null
     end,
-    Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
-    {AuthProps} = get_value(<<"auth">>, Props, {[]}),
-    {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
-    Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
-    DefaultHeaders = (#httpdb{})#httpdb.headers,
-    #httpdb{
-        url = Url,
-        auth_props = AuthProps,
-        headers = lists:ukeymerge(1, Headers, DefaultHeaders),
-        ibrowse_options = lists:keysort(1,
-            [{socket_options, get_value(socket_options, Options)} |
-                ProxyParams ++ ssl_params(Url)]),
-        timeout = get_value(connection_timeout, Options),
-        http_connections = get_value(http_connections, Options),
-        retries = get_value(retries, Options),
-        proxy_url = ProxyURL
-    };
+
+    Url0 = maps:get(<<"url">>, Endpoint),
+    Url = maybe_add_trailing_slash(Url0),
+
+    AuthProps = maps:get(<<"auth">>, Endpoint, #{}),
+
+    Headers0 = maps:get(<<"headers">>, Endpoint, #{}),
+    DefaultHeaders = couch_replicator_utils:get_default_headers(),
+    % For same keys values in second map override those in the first
+    Headers = maps:merge(DefaultHeaders, Headers0),
+
+    SockOpts = maps:get(<<"socket_options">>, Options, #{}),
+    SockAndProxy = maps:merge(SockOpts, ProxyParams),
+
+    SslParams = ssl_params(Url),
+
+    #{
+        <<"url">> => Url,
+        <<"auth_props">> => AuthProps,
+        <<"headers">> => Headers,
+        <<"ibrowse_options">> => maps:merge(SslParams, SockAndProxy),
+        <<"timeout">> => maps:get(<<"timeout">>, Options),
+        <<"http_connections">> => maps:get(<<"http_connections">>, Options),
+        <<"retries">> => maps:get(<<"retries">>, Options),
+        <<"proxy_url">> => ProxyURL
+    }.
+
 
 parse_rep_db(<<"http://", _/binary>> = Url, Proxy, Options) ->
-    parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
+    parse_rep_db(#{<<"url">> => Url}, Proxy, Options);
 
 parse_rep_db(<<"https://", _/binary>> = Url, Proxy, Options) ->
-    parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
+    parse_rep_db(#{<<"url">> => Url}, Proxy, Options);
 
-parse_rep_db(<<DbName/binary>>, _Proxy, _Options) ->
-    DbName;
+parse_rep_db(<<DbName/binary>>, _Proxy, _Options) ->
+    throw({error, <<"Local endpoint not supported: ", DbName/binary>>});
 
 parse_rep_db(undefined, _Proxy, _Options) ->
     throw({error, <<"Missing replicator database">>}).
 
 
--spec maybe_add_trailing_slash(binary() | list()) -> list().
+-spec maybe_add_trailing_slash(binary()) -> binary().
+maybe_add_trailing_slash(<<>>) ->
+    <<>>;
+
 maybe_add_trailing_slash(Url) when is_binary(Url) ->
-    maybe_add_trailing_slash(?b2l(Url));
-maybe_add_trailing_slash(Url) ->
-    case lists:member($?, Url) of
-        true ->
-            Url;  % skip if there are query params
-        false ->
-            case lists:last(Url) of
-                $/ ->
-                    Url;
-                _ ->
-                    Url ++ "/"
-            end
+    case binary:match(Url, <<"?">>) of
+        nomatch ->
+            case binary:last(Url) of
+                $/ -> Url;
+                _ -> <<Url/binary, "/">>
+            end;
+        _ ->
+            Url  % skip if there are query params
     end.
 
 
@@ -526,66 +530,100 @@ parse_sock_opts(V) ->
     end, #{}, SocketOptions).
 
 
--spec parse_proxy_params(binary() | [_]) -> [_].
-parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
-    parse_proxy_params(?b2l(ProxyUrl));
-parse_proxy_params([]) ->
-    [];
-parse_proxy_params(ProxyUrl) ->
+-spec parse_proxy_params(binary() | #{}) -> #{}.
+parse_proxy_params(<<>>) ->
+    #{};
+parse_proxy_params(ProxyUrl0) when is_binary(ProxyUrl0) ->
+    ProxyUrl = binary_to_list(ProxyUrl0),
     #url{
         host = Host,
         port = Port,
         username = User,
         password = Passwd,
-        protocol = Protocol
+        protocol = Protocol0
     } = ibrowse_lib:parse_url(ProxyUrl),
-    [
-        {proxy_protocol, Protocol},
-        {proxy_host, Host},
-        {proxy_port, Port}
-    ] ++ case is_list(User) andalso is_list(Passwd) of
+    Protocol = atom_to_binary(Protocol0, utf8),
+    case lists:member(Protocol, [<<"http">>, <<"https">>, <<"socks5">>]) of
+        true ->
+            ok;
         false ->
-            [];
+            Error = <<"Unsupported proxy protocol: ", Protocol/binary>>,
+            throw({bad_request, Error})
+    end,
+    ProxyParams = #{
+        <<"proxy_url">> => ProxyUrl0,
+        <<"proxy_protocol">> => Protocol,
+        <<"proxy_host">> => list_to_binary(Host),
+        <<"proxy_port">> => Port
+    },
+    case is_list(User) andalso is_list(Passwd) of
         true ->
-            [{proxy_user, User}, {proxy_password, Passwd}]
-        end.
+            ProxyParams#{
+                <<"proxy_user">> => list_to_binary(User),
+                <<"proxy_password">> => list_to_binary(Passwd)
+            };
+        false ->
+            ProxyParams
+    end.
 
 
--spec ssl_params([_]) -> [_].
+-spec ssl_params(binary()) -> #{}.
 ssl_params(Url) ->
-    case ibrowse_lib:parse_url(Url) of
+    case ibrowse_lib:parse_url(binary_to_list(Url)) of
     #url{protocol = https} ->
         Depth = list_to_integer(
             config:get("replicator", "ssl_certificate_max_depth", "3")
         ),
         VerifyCerts = config:get("replicator", "verify_ssl_certificates"),
-        CertFile = config:get("replicator", "cert_file", undefined),
-        KeyFile = config:get("replicator", "key_file", undefined),
-        Password = config:get("replicator", "password", undefined),
-        SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")],
-        SslOpts1 = case CertFile /= undefined andalso KeyFile /= undefined of
+        CertFile = config:get("replicator", "cert_file", null),
+        KeyFile = config:get("replicator", "key_file", null),
+        Password = config:get("replicator", "password", null),
+        VerifySslOptions = ssl_verify_options(VerifyCerts =:= "true"),
+        SslOpts = maps:merge(VerifySslOptions, #{<<"depth">> => Depth}),
+        SslOpts1 = case CertFile /= null andalso KeyFile /= null of
             true ->
-                case Password of
-                    undefined ->
-                        [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts;
+                CertFileOpts = case Password of
+                    null ->
+                        #{
+                            <<"certfile">> => list_to_binary(CertFile),
+                            <<"keyfile">> => list_to_binary(KeyFile)
+                        };
                     _ ->
-                        [{certfile, CertFile}, {keyfile, KeyFile},
-                            {password, Password}] ++ SslOpts
-                end;
-            false -> SslOpts
+                        #{
+                            <<"certfile">> => list_to_binary(CertFile),
+                            <<"keyfile">> => list_to_binary(KeyFile),
+                            <<"password">> => list_to_binary(Password)
+                        }
+                end,
+                maps:merge(SslOpts, CertFileOpts);
+            false ->
+                SslOpts
         end,
-        [{is_ssl, true}, {ssl_options, SslOpts1}];
+        #{<<"is_ssl">> => true, <<"ssl_options">> => SslOpts1};
     #url{protocol = http} ->
-        []
+        #{}
     end.
 
 
 -spec ssl_verify_options(true | false) -> #{}.
 ssl_verify_options(true) ->
-    CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
-    [{verify, verify_peer}, {cacertfile, CAFile}];
+    case config:get("replicator", "ssl_trusted_certificates_file", undefined) of
+        undefined ->
+            #{
+                <<"verify">> => <<"verify_peer">>,
+                <<"cacertfile">> => null
+            };
+        CAFile when is_list(CAFile) ->
+            #{
+                <<"verify">> => <<"verify_peer">>,
+                <<"cacertfile">> => list_to_binary(CAFile)
+            }
+    end;
+
 ssl_verify_options(false) ->
-    [{verify, verify_none}].
+    #{
+        <<"verify">> => <<"verify_none">>
+    }.
 
 
 -spec before_doc_update(#doc{}, Db::any(), couch_db:update_type()) -> #doc{}.
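
Two quick sketches for the couch_replicator_docs changes above. First, the
expected behavior of the binary-based maybe_add_trailing_slash/1:

    <<"http://h/db/">> = maybe_add_trailing_slash(<<"http://h/db">>),
    <<"http://h/db/">> = maybe_add_trailing_slash(<<"http://h/db/">>),
    <<"http://h/db?a=b">> = maybe_add_trailing_slash(<<"http://h/db?a=b">>).

Second, the proxy map produced by parse_proxy_params/1, assuming
ibrowse_lib:parse_url/1 splits the URL as usual (values illustrative):

    #{
        <<"proxy_url">> := <<"http://u:p@proxy.local:8080">>,
        <<"proxy_protocol">> := <<"http">>,
        <<"proxy_host">> := <<"proxy.local">>,
        <<"proxy_port">> := 8080,
        <<"proxy_user">> := <<"u">>,
        <<"proxy_password">> := <<"p">>
    } = parse_proxy_params(<<"http://u:p@proxy.local:8080">>).
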
diff --git a/src/couch_replicator/src/couch_replicator_filters.erl b/src/couch_replicator/src/couch_replicator_filters.erl
index 940b8ad..a03ffe8 100644
--- a/src/couch_replicator/src/couch_replicator_filters.erl
+++ b/src/couch_replicator/src/couch_replicator_filters.erl
@@ -63,11 +63,11 @@ parse(Options) ->
 % Fetches body of filter function from source database. Guaranteed to either
 % return {ok, Body} or an {error, Reason}. Also assume this function might
 % block due to network / socket issues for an undeterminted amount of time.
--spec fetch(binary(), binary(), binary(), #{}) ->
+-spec fetch(binary(), binary(), binary()) ->
     {ok, {[_]}} | {error, binary()}.
-fetch(DDocName, FilterName, Source, #{} = UserCtx) ->
+fetch(DDocName, FilterName, Source) ->
     {Pid, Ref} = spawn_monitor(fun() ->
-        try fetch_internal(DDocName, FilterName, Source, UserCtx) of
+        try fetch_internal(DDocName, FilterName, Source) of
             Resp ->
                 exit({exit_ok, Resp})
         catch
@@ -88,28 +88,30 @@ fetch(DDocName, FilterName, Source, #{} = UserCtx) ->
 
 
 % Get replication type and view (if any) from replication document props
--spec view_type([_], [_]) ->
-    {view, {binary(), binary()}} | {db, nil} | {error, binary()}.
-view_type(Props, Options) ->
-    case couch_util:get_value(<<"filter">>, Props) of
-        <<"_view">> ->
-            {QP}  = couch_util:get_value(query_params, Options, {[]}),
-            ViewParam = couch_util:get_value(<<"view">>, QP),
-            case re:split(ViewParam, <<"/">>) of
-                [DName, ViewName] ->
-                    {view, {<< "_design/", DName/binary >>, ViewName}};
-                _ ->
-                    {error, <<"Invalid `view` parameter.">>}
-            end;
+-spec view_type(#{}, [_]) ->
+    {binary(), #{}} | {error, binary()}.
+view_type(#{<<"filter">> := <<"_view">>}, Options) ->
+    {QP}  = couch_util:get_value(query_params, Options, {[]}),
+    ViewParam = couch_util:get_value(<<"view">>, QP),
+    case re:split(ViewParam, <<"/">>) of
+        [DName, ViewName] ->
+            DDocMap = #{
+                <<"ddoc">> => <<"_design/",DName/binary>>,
+                <<"view">> => ViewName
+            },
+            {<<"view">>, DDocMap};
         _ ->
-            {db, nil}
-    end.
+            {error, <<"Invalid `view` parameter.">>}
+    end;
+
+view_type(#{}, _Options) ->
+    {<<"db">>, #{}}.
 
 
 % Private functions
 
-fetch_internal(DDocName, FilterName, Source, #{} = UserCtx) ->
-    Db = case (catch couch_replicator_api_wrap:db_open(Source, UserCtx) of
+fetch_internal(DDocName, FilterName, Source) ->
+    Db = case (catch couch_replicator_api_wrap:db_open(Source)) of
     {ok, Db0} ->
         Db0;
     DbError ->
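
A short sketch of the reworked view_type/2 return shape (values
illustrative, assuming a proplist Options as in the clause above):

    Options = [{query_params, {[{<<"view">>, <<"dname/vname">>}]}}],
    {<<"view">>, #{<<"ddoc">> := DDoc, <<"view">> := View}} =
        view_type(#{<<"filter">> => <<"_view">>}, Options),
    %% DDoc =:= <<"_design/dname">>, View =:= <<"vname">>
    {<<"db">>, #{}} = view_type(#{}, []).
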
diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl
index 0d73cbf..da30532 100644
--- a/src/couch_replicator/src/couch_replicator_ids.erl
+++ b/src/couch_replicator/src/couch_replicator_ids.erl
@@ -40,19 +40,19 @@ replication_id(#{<<"options">> := Options} = Rep) ->
 % If a change is made to how replications are identified,
 % please add a new clause and increase ?REP_ID_VERSION.
 
-replication_id(#{<<"user_ctx">> := UserCtx} = Rep, 4) ->
+replication_id(#{<<"source">> := Src, <<"target">> := Tgt} = Rep, 4) ->
     UUID = couch_server:get_uuid(),
-    SrcInfo = get_v4_endpoint(UserCtx, maps:get(<<"source">>, Rep)),
-    TgtInfo = get_v4_endpoint(UserCtx, maps:get(<<"target">>, Rep)),
+    SrcInfo = get_v4_endpoint(Src),
+    TgtInfo = get_v4_endpoint(Tgt),
     maybe_append_filters([UUID, SrcInfo, TgtInfo], Rep);
 
-replication_id(#{<<"user_ctx">> := UserCtx} = Rep, 3) ->
+replication_id(#{<<"source">> := Src0, <<"target">> := Tgt0} = Rep, 3) ->
     UUID = couch_server:get_uuid(),
-    Src = get_rep_endpoint(UserCtx, maps:get(<<"source">>, Rep)),
-    Tgt = get_rep_endpoint(UserCtx, maps:get(<<"target">>, Res)),
+    Src = get_rep_endpoint(Src0),
+    Tgt = get_rep_endpoint(Tgt0),
     maybe_append_filters([UUID, Src, Tgt], Rep);
 
-replication_id(#{<<"user_ctx">> := UserCtx} = Rep, 2) ->
+replication_id(#{<<"source">> := Src0, <<"target">> := Tgt0} = Rep, 2) ->
     {ok, HostName} = inet:gethostname(),
     Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of
     P when is_number(P) ->
@@ -65,14 +65,14 @@ replication_id(#{<<"user_ctx">> := UserCtx} = Rep, 2) ->
         % ... mochiweb_socket_server:get(https, port)
         list_to_integer(config:get("httpd", "port", "5984"))
     end,
-    Src = get_rep_endpoint(UserCtx, maps:get(<<"source">>, Rep)),
-    Tgt = get_rep_endpoint(UserCtx, maps:get(<<"target">>, Rep)),
+    Src = get_rep_endpoint(Src0),
+    Tgt = get_rep_endpoint(Tgt0),
     maybe_append_filters([HostName, Port, Src, Tgt], Rep);
 
-replication_id(#{<<"user_ctx">> := UserCtx} = Rep, 1) ->
+replication_id(#{<<"source">> := Src0, <<"target">> := Tgt0} = Rep, 1) ->
     {ok, HostName} = inet:gethostname(),
-    Src = get_rep_endpoint(UserCtx, maps:get(<<"source">>, Rep)),
-    Tgt = get_rep_endpoint(UserCtx, maps:get(<<"target">>, Rep)),
+    Src = get_rep_endpoint(Src0),
+    Tgt = get_rep_endpoint(Tgt0),
     maybe_append_filters([HostName, Src, Tgt], Rep).
 
 
@@ -99,7 +99,6 @@ convert({BaseId, Ext} = Id) when is_binary(BaseId), is_binary(Ext) ->
 maybe_append_filters(Base, #{} = Rep) ->
     #{
         <<"source">> := Source,
-        <<"user_ctx">> := UserCtx,
         <<"options">> := Options
     } = Rep,
     Base2 = Base ++
@@ -109,7 +108,7 @@ maybe_append_filters(Base, #{} = Rep) ->
         {ok, {view, Filter, QueryParams}} ->
             [Filter, QueryParams];
         {ok, {user, {Doc, Filter}, QueryParams}} ->
-            case couch_replicator_filters:fetch(Doc, Filter, Source, UserCtx) of
+            case couch_replicator_filters:fetch(Doc, Filter, Source) of
                 {ok, Code} ->
                     [Code, QueryParams];
                 {error, Error} ->
@@ -138,31 +137,26 @@ maybe_append_options(Options, RepOptions) ->
     end, [], Options).
 
 
-get_rep_endpoint(_UserCtx, #{<<"url">> = Url0, <<"headers">> = Headers0}) ->
+get_rep_endpoint(#{<<"url">> := Url0, <<"headers">> := Headers0}) ->
     Url = binary_to_list(Url0),
+    % We turn headers into a proplist of string() KVs to calculate
+    % the same replication ID as CouchDB 2.x
     Headers1 = maps:fold(fun(K, V, Acc) ->
         [{binary_to_list(K), binary_to_list(V)} | Acc]
     end, [], Headers0),
     Headers2 = lists:keysort(1, Headers1),
     DefaultHeaders = (#httpdb{})#httpdb.headers,
-    {remote, Url, Headers2 -- DefaultHeaders};
-get_rep_endpoint(#{} = UserCtx, <<DbName/binary>>) ->
-    UserCtxRec = couch_replicator_utils:user_ctx_from_json(UserCtx),
-    {local, DbName, UserCtxRec}.
+    {remote, Url, Headers2 -- DefaultHeaders}.
 
 
-get_v4_endpoint(#{} = UserCtx, #{} = HttpDb) ->
-    {remote, Url, Headers} = get_rep_endpoint(UserCtx, HttpDb),
+get_v4_endpoint(#{} = HttpDb) ->
+    {remote, Url, Headers} = get_rep_endpoint(HttpDb),
     {{UserFromHeaders, _}, HeadersWithoutBasicAuth} =
         couch_replicator_utils:remove_basic_auth_from_headers(Headers),
     {UserFromUrl, Host, NonDefaultPort, Path} = get_v4_url_info(Url),
     User = pick_defined_value([UserFromUrl, UserFromHeaders]),
     OAuth = undefined, % Keep this to ensure checkpoints don't change
-    {remote, User, Host, NonDefaultPort, Path, HeadersWithoutBasicAuth, OAuth};
-get_v4_endpoint(#{} = UserCtx, <<DbName/binary>>) ->
-    UserCtxRec = couch_replicator_utils:user_ctx_from_json(UserCtx),
-    {local, DbName, UserCtxRec}.
-
+    {remote, User, Host, NonDefaultPort, Path, HeadersWithoutBasicAuth, OAuth}.
 
 pick_defined_value(Values) ->
     case [V || V <- Values, V /= undefined] of
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index e6e8a4d..b205c84 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -1078,8 +1078,8 @@ scheduler_job_format_status_test() ->
     Target = <<"http://u:p@h2/d2">>,
     Rep = #rep{
         id = {"base", "+ext"},
-        source = couch_replicator_docs:parse_rep_db(Source, [], []),
-        target = couch_replicator_docs:parse_rep_db(Target, [], []),
+        source = couch_replicator_docs:parse_rep_db(Source, #{}, #{}),
+        target = couch_replicator_docs:parse_rep_db(Target, #{}, #{}),
         options = [{create_target, true}],
         doc_id = <<"mydoc">>,
         db_name = <<"mydb">>
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index ff61fcf..c6efde5 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -14,11 +14,6 @@
 
 -export([
    parse_rep_doc/2,
-   open_db/1,
-   close_db/1,
-   local_db_name/1,
-   start_db_compaction_notifier/2,
-   stop_db_compaction_notifier/1,
    replication_id/2,
    sum_stats/2,
    is_deleted/1,
@@ -29,8 +24,7 @@
    filter_state/3,
    remove_basic_auth_from_headers/1,
    normalize_rep/1,
-   user_ctx_from_json/1,
-   user_ctx_to_json/1
+   get_default_headers/0
 ]).
 
 -export([
@@ -47,49 +41,6 @@
 ]).
 
 
-open_db(#httpdb{} = HttpDb) ->
-    HttpDb;
-open_db(Db) ->
-    DbName = couch_db:name(Db),
-    UserCtx = couch_db:get_user_ctx(Db),
-    {ok, NewDb} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
-    NewDb.
-
-
-close_db(#httpdb{}) ->
-    ok;
-close_db(Db) ->
-    couch_db:close(Db).
-
-
-local_db_name(#httpdb{}) ->
-    undefined;
-local_db_name(Db) ->
-    couch_db:name(Db).
-
-
-start_db_compaction_notifier(#httpdb{}, _) ->
-    nil;
-start_db_compaction_notifier(Db, Server) ->
-    DbName = couch_db:name(Db),
-    {ok, Pid} = couch_event:link_listener(
-            ?MODULE, handle_db_event, Server, [{dbname, DbName}]
-        ),
-    Pid.
-
-
-stop_db_compaction_notifier(nil) ->
-    ok;
-stop_db_compaction_notifier(Listener) ->
-    couch_event:stop_listener(Listener).
-
-
-handle_db_event(DbName, compacted, Server) ->
-    gen_server:cast(Server, {db_compacted, DbName}),
-    {ok, Server};
-handle_db_event(_DbName, _Event, Server) ->
-    {ok, Server}.
-
 
 rep_error_to_binary(Error) ->
     couch_util:to_binary(error_reason(Error)).
@@ -228,30 +179,16 @@ normalize_endpoint(<<DbName/binary>>) ->
     DbName;
 
 normalize_endpoint(#{} = Endpoint) ->
-    Ks = [<<"url">>,<<"auth_props">>, <<"headers">>, <<"timeout">>,
+    Ks = [<<"url">>, <<"auth_props">>, <<"headers">>, <<"timeout">>,
         <<"ibrowse_options">>, <<"retries">>, <<"http_connections">>
     ],
     maps:with(Ks, Endpoint).
 
 
-user_ctx_to_json(#user_ctx{name = Name, roles = Roles0} = UserCtx) ->
-    {AtomRoles0, Roles} = lists:partition(fun erlang:is_atom/1, Roles0),
-    AtomRoles = lists:map(fun(R) -> atom_to_binary(V, utf8) end, AtomRoles0),
-    UserCtxMap = #{
-        <<"name">> => Name,
-        <<"roles">> => Roles,
-        <<"atom_roles">> => AtomRoles
-    }.
-
-
-user_ctx_from_json(#{} = UserCtxMap) ->
-    #{
-        <<"name">> := Name,
-        <<"roles">> := Roles
-        <<"atom_roles">> := AtomRoles0
-    },
-    AtomRoles = lists:map(fun(R) -> binary_to_atom(V, utf8) end, AtomRoles0),
-    #user_ctx{name = Name, roles = lists:sort(Roles ++ AtomRoles)}.
+get_default_headers() ->
+    lists:foldl(fun({K, V}, Acc) ->
+        Acc#{list_to_binary(K) => list_to_binary(V)}
+    end, #{}, (#httpdb{})#httpdb.headers).
 
 
 -ifdef(TEST).
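
Usage sketch for get_default_headers/0, mirroring how parse_rep_db/3 merges
endpoint headers over the defaults (actual header names depend on the
#httpdb{} record defaults):

    Defaults = couch_replicator_utils:get_default_headers(),
    %% Endpoint-supplied headers win on key collisions:
    Headers = maps:merge(Defaults, #{<<"X-Custom">> => <<"1">>}).
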
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index ec98fa0..0787303 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -28,18 +28,11 @@
 
 % TODO: maybe make both buffer max sizes configurable
 -define(DOC_BUFFER_BYTE_SIZE, 512 * 1024).   % for remote targets
--define(DOC_BUFFER_LEN, 10).                 % for local targets, # of documents
 -define(MAX_BULK_ATT_SIZE, 64 * 1024).
 -define(MAX_BULK_ATTS_PER_DOC, 8).
 -define(STATS_DELAY, 10000000).              % 10 seconds (in microseconds)
 -define(MISSING_DOC_RETRY_MSEC, 2000).
 
--import(couch_replicator_utils, [
-    open_db/1,
-    close_db/1,
-    start_db_compaction_notifier/2,
-    stop_db_compaction_notifier/1
-]).
 -import(couch_util, [
     to_binary/1,
     get_value/3
@@ -62,8 +55,6 @@
     pending_fetch = nil,
     flush_waiter = nil,
     stats = couch_replicator_stats:new(),
-    source_db_compaction_notifier = nil,
-    target_db_compaction_notifier = nil,
     batch = #batch{}
 }).
 
@@ -71,14 +62,7 @@
 
 start_link(Cp, #httpdb{} = Source, Target, ChangesManager, MaxConns) ->
     gen_server:start_link(
-        ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []);
-
-start_link(Cp, Source, Target, ChangesManager, _MaxConns) ->
-    Pid = spawn_link(fun() ->
-        erlang:put(last_stats_report, os:timestamp()),
-        queue_fetch_loop(Source, Target, Cp, Cp, ChangesManager)
-    end),
-    {ok, Pid}.
+        ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []).
 
 
 init({Cp, Source, Target, ChangesManager, MaxConns}) ->
@@ -92,12 +76,8 @@ init({Cp, Source, Target, ChangesManager, MaxConns}) ->
         cp = Cp,
         max_parallel_conns = MaxConns,
         loop = LoopPid,
-        source = open_db(Source),
-        target = open_db(Target),
-        source_db_compaction_notifier =
-            start_db_compaction_notifier(Source, self()),
-        target_db_compaction_notifier =
-            start_db_compaction_notifier(Target, self())
+        source = Source,
+        target = Target
     },
     {ok, State}.
 
@@ -141,24 +121,6 @@ handle_call(flush, {Pid, _} = From,
     {noreply, State2#state{flush_waiter = From}}.
 
 
-handle_cast({db_compacted, DbName} = Msg, #state{} = State) ->
-    #state{
-        source = Source,
-        target = Target
-    } = State,
-    SourceName = couch_replicator_utils:local_db_name(Source),
-    TargetName = couch_replicator_utils:local_db_name(Target),
-    case DbName of
-        SourceName ->
-            {ok, NewSource} = couch_db:reopen(Source),
-            {noreply, State#state{source = NewSource}};
-        TargetName ->
-            {ok, NewTarget} = couch_db:reopen(Target),
-            {noreply, State#state{target = NewTarget}};
-        _Else ->
-            {stop, {unexpected_async_call, Msg}, State}
-    end;
-
 handle_cast(Msg, State) ->
     {stop, {unexpected_async_call, Msg}, State}.
 
@@ -214,10 +176,8 @@ handle_info({'EXIT', Pid, Reason}, State) ->
 
 
 terminate(_Reason, State) ->
-    close_db(State#state.source),
-    close_db(State#state.target),
-    stop_db_compaction_notifier(State#state.source_db_compaction_notifier),
-    stop_db_compaction_notifier(State#state.target_db_compaction_notifier).
+    ok.
+
 
 format_status(_Opt, [_PDict, State]) ->
     #state{
@@ -253,20 +213,10 @@ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) ->
         ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity),
         queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager);
     {changes, ChangesManager, Changes, ReportSeq} ->
-        Target2 = open_db(Target),
-        {IdRevs, Stats0} = find_missing(Changes, Target2),
-        case Source of
-        #httpdb{} ->
-            ok = gen_server:call(Parent, {add_stats, Stats0}, infinity),
-            remote_process_batch(IdRevs, Parent),
-            {ok, Stats} = gen_server:call(Parent, flush, infinity);
-        _Db ->
-            Source2 = open_db(Source),
-            Stats = local_process_batch(
-                IdRevs, Cp, Source2, Target2, #batch{}, Stats0),
-            close_db(Source2)
-        end,
-        close_db(Target2),
+        {IdRevs, Stats0} = find_missing(Changes, Target),
+        ok = gen_server:call(Parent, {add_stats, Stats0}, infinity),
+        remote_process_batch(IdRevs, Parent),
+        {ok, Stats} = gen_server:call(Parent, flush, infinity),
         ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity),
         erlang:put(last_stats_report, os:timestamp()),
         couch_log:debug("Worker reported completion of seq ~p", [ReportSeq]),
@@ -274,32 +224,6 @@ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) ->
     end.
 
 
-local_process_batch([], _Cp, _Src, _Tgt, #batch{docs = []}, Stats) ->
-    Stats;
-
-local_process_batch([], Cp, Source, Target, #batch{docs = Docs, size = Size}, Stats) ->
-    case Target of
-    #httpdb{} ->
-        couch_log:debug("Worker flushing doc batch of size ~p bytes", [Size]);
-    _Db ->
-        couch_log:debug("Worker flushing doc batch of ~p docs", [Size])
-    end,
-    Stats2 = flush_docs(Target, Docs),
-    Stats3 = couch_replicator_utils:sum_stats(Stats, Stats2),
-    local_process_batch([], Cp, Source, Target, #batch{}, Stats3);
-
-local_process_batch([IdRevs | Rest], Cp, Source, Target, Batch, Stats) ->
-    {ok, {_, DocList, Stats2, _}} = fetch_doc(
-        Source, IdRevs, fun local_doc_handler/2, {Target, [], Stats, Cp}),
-    {Batch2, Stats3} = lists:foldl(
-        fun(Doc, {Batch0, Stats0}) ->
-            {Batch1, S} = maybe_flush_docs(Target, Batch0, Doc),
-            {Batch1, couch_replicator_utils:sum_stats(Stats0, S)}
-        end,
-        {Batch, Stats2}, DocList),
-    local_process_batch(Rest, Cp, Source, Target, Batch2, Stats3).
-
-
 remote_process_batch([], _Parent) ->
     ok;
 
@@ -319,10 +243,8 @@ remote_process_batch([{Id, Revs, PAs} | Rest], Parent) ->
 spawn_doc_reader(Source, Target, FetchParams) ->
     Parent = self(),
     spawn_link(fun() ->
-        Source2 = open_db(Source),
         fetch_doc(
-            Source2, FetchParams, fun remote_doc_handler/2, {Parent, Target}),
-        close_db(Source2)
+            Source, FetchParams, fun remote_doc_handler/2, {Parent, Target})
     end).
 
 
@@ -350,29 +272,6 @@ fetch_doc(Source, {Id, Revs, PAs}, DocHandler, Acc) ->
     end.
 
 
-local_doc_handler({ok, Doc}, {Target, DocList, Stats, Cp}) ->
-    Stats2 = couch_replicator_stats:increment(docs_read, Stats),
-    case batch_doc(Doc) of
-    true ->
-        {ok, {Target, [Doc | DocList], Stats2, Cp}};
-    false ->
-        couch_log:debug("Worker flushing doc with attachments", []),
-        Target2 = open_db(Target),
-        Success = (flush_doc(Target2, Doc) =:= ok),
-        close_db(Target2),
-        Stats3 = case Success of
-        true ->
-            couch_replicator_stats:increment(docs_written, Stats2);
-        false ->
-            couch_replicator_stats:increment(doc_write_failures, Stats2)
-        end,
-        Stats4 = maybe_report_stats(Cp, Stats3),
-        {ok, {Target, DocList, Stats4, Cp}}
-    end;
-local_doc_handler(_, Acc) ->
-    {ok, Acc}.
-
-
 remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _} = Acc) ->
     ok = gen_server:call(Parent, {batch_doc, Doc}, infinity),
     {ok, Acc};
@@ -383,9 +282,7 @@ remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) ->
     % convenient to call it ASAP to avoid ibrowse inactivity timeouts.
     Stats = couch_replicator_stats:new([{docs_read, 1}]),
     couch_log:debug("Worker flushing doc with attachments", []),
-    Target2 = open_db(Target),
-    Success = (flush_doc(Target2, Doc) =:= ok),
-    close_db(Target2),
+    Success = (flush_doc(Target, Doc) =:= ok),
     {Result, Stats2} = case Success of
     true ->
         {{ok, Acc}, couch_replicator_stats:increment(docs_written, Stats)};
@@ -410,9 +307,7 @@ spawn_writer(Target, #batch{docs = DocList, size = Size}) ->
     Parent = self(),
     spawn_link(
         fun() ->
-            Target2 = open_db(Target),
-            Stats = flush_docs(Target2, DocList),
-            close_db(Target2),
+            Stats = flush_docs(Target, DocList),
             ok = gen_server:call(Parent, {add_stats, Stats}, infinity)
         end).
 
@@ -462,17 +357,6 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
             Stats = couch_replicator_stats:new(),
             {#batch{docs = [JsonDoc | DocAcc], size = SizeAcc2}, Stats}
         end
-    end;
+    end.
-
-maybe_flush_docs(Target, #batch{docs = DocAcc, size = SizeAcc}, Doc) ->
-    case SizeAcc + 1 of
-    SizeAcc2 when SizeAcc2 >= ?DOC_BUFFER_LEN ->
-        couch_log:debug("Worker flushing doc batch of ~p docs", [SizeAcc2]),
-        Stats = flush_docs(Target, [Doc | DocAcc]),
-        {#batch{}, Stats};
-    SizeAcc2 ->
-        Stats = couch_replicator_stats:new(),
-        {#batch{docs = [Doc | DocAcc], size = SizeAcc2}, Stats}
-    end.