You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/07/23 17:03:53 UTC

[couchdb] branch prototype/fdb-replicator updated: WIP 2

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/fdb-replicator
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/prototype/fdb-replicator by this push:
     new 86ce528  WIP 2
86ce528 is described below

commit 86ce528bf18e2825cfed8a026cd44cb1665f2fd7
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Jul 23 12:58:47 2019 -0400

    WIP 2
---
 src/chttpd/src/chttpd.erl                          |  61 ----
 src/chttpd/src/chttpd_misc.erl                     |  48 +--
 src/chttpd/test/chttpd_handlers_tests.erl          |   3 +-
 src/couch_replicator/src/couch_replicator.erl      |  95 +++---
 src/couch_replicator/src/couch_replicator.hrl      |   2 +-
 .../src/couch_replicator_api_wrap.erl              | 235 +++----------
 .../src/couch_replicator_clustering.erl            | 248 --------------
 .../src/couch_replicator_doc_processor.erl         |  14 +-
 src/couch_replicator/src/couch_replicator_docs.erl | 265 +++++++-------
 .../src/couch_replicator_filters.erl               |   9 +-
 src/couch_replicator/src/couch_replicator_ids.erl  |  70 ++--
 .../src/couch_replicator_notifier.erl              |  58 ----
 .../src/couch_replicator_scheduler_job.erl         | 380 +++++++++++----------
 .../src/couch_replicator_scheduler_sup.erl         |   6 +-
 src/couch_replicator/src/couch_replicator_sup.erl  |  12 -
 .../src/couch_replicator_utils.erl                 |  95 ++++--
 .../test/couch_replicator_proxy_tests.erl          |   4 +-
 17 files changed, 537 insertions(+), 1068 deletions(-)

diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl
index 4d32c03..7326bca 100644
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -403,20 +403,6 @@ maybe_log(_HttpReq, #httpd_resp{should_log = false}) ->
     ok.
 
 
-%% HACK: replication currently handles two forms of input, #db{} style
-%% and #http_db style. We need a third that makes use of fabric. #db{}
-%% works fine for replicating the dbs and nodes database because they
-%% aren't sharded. So for now when a local db is specified as the source or
-%% the target, it's hacked to make it a full url and treated as a remote.
-possibly_hack(#httpd{path_parts=[<<"_replicate">>]}=Req) ->
-    {Props0} = chttpd:json_body_obj(Req),
-    Props1 = fix_uri(Req, Props0, <<"source">>),
-    Props2 = fix_uri(Req, Props1, <<"target">>),
-    put(post_body, {Props2}),
-    Req;
-possibly_hack(Req) ->
-    Req.
-
 check_request_uri_length(Uri) ->
     check_request_uri_length(Uri, config:get("httpd", "max_uri_length")).
 
@@ -439,53 +425,6 @@ check_url_encoding([$% | _]) ->
 check_url_encoding([_ | Rest]) ->
     check_url_encoding(Rest).
 
-fix_uri(Req, Props, Type) ->
-    case replication_uri(Type, Props) of
-    undefined ->
-        Props;
-    Uri0 ->
-        case is_http(Uri0) of
-        true ->
-            Props;
-        false ->
-            Uri = make_uri(Req, quote(Uri0)),
-            [{Type,Uri}|proplists:delete(Type,Props)]
-        end
-    end.
-
-replication_uri(Type, PostProps) ->
-    case couch_util:get_value(Type, PostProps) of
-    {Props} ->
-        couch_util:get_value(<<"url">>, Props);
-    Else ->
-        Else
-    end.
-
-is_http(<<"http://", _/binary>>) ->
-    true;
-is_http(<<"https://", _/binary>>) ->
-    true;
-is_http(_) ->
-    false.
-
-make_uri(Req, Raw) ->
-    Port = integer_to_list(mochiweb_socket_server:get(chttpd, port)),
-    Url = list_to_binary(["http://", config:get("httpd", "bind_address"),
-                          ":", Port, "/", Raw]),
-    Headers = [
-        {<<"authorization">>, ?l2b(header_value(Req,"authorization",""))},
-        {<<"cookie">>, ?l2b(extract_cookie(Req))}
-    ],
-    {[{<<"url">>,Url}, {<<"headers">>,{Headers}}]}.
-
-extract_cookie(#httpd{mochi_req = MochiReq}) ->
-    case MochiReq:get_cookie_value("AuthSession") of
-        undefined ->
-            "";
-        AuthSession ->
-            "AuthSession=" ++ AuthSession
-    end.
-%%% end hack
 
 set_auth_handlers() ->
     AuthenticationDefault =  "{chttpd_auth, cookie_authentication_handler},
diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl
index 11d2c5b..d832ade 100644
--- a/src/chttpd/src/chttpd_misc.erl
+++ b/src/chttpd/src/chttpd_misc.erl
@@ -203,8 +203,8 @@ handle_task_status_req(Req) ->
 handle_replicate_req(#httpd{method='POST', user_ctx=Ctx} = Req) ->
     chttpd:validate_ctype(Req, "application/json"),
     %% see HACK in chttpd.erl about replication
-    PostBody = get(post_body),
-    case replicate(PostBody, Ctx) of
+    PostBody = chttpd:json_body_obj(Req),
+    case couch_replicator:replicate(PostBody, Ctx) of
         {ok, {continuous, RepId}} ->
             send_json(Req, 202, {[{ok, true}, {<<"_local_id">>, RepId}]});
         {ok, {cancelled, RepId}} ->
@@ -223,50 +223,6 @@ handle_replicate_req(#httpd{method='POST', user_ctx=Ctx} = Req) ->
 handle_replicate_req(Req) ->
     send_method_not_allowed(Req, "POST").
 
-replicate({Props} = PostBody, Ctx) ->
-    case couch_util:get_value(<<"cancel">>, Props) of
-    true ->
-        cancel_replication(PostBody, Ctx);
-    _ ->
-        Node = choose_node([
-            couch_util:get_value(<<"source">>, Props),
-            couch_util:get_value(<<"target">>, Props)
-        ]),
-        case rpc:call(Node, couch_replicator, replicate, [PostBody, Ctx]) of
-        {badrpc, Reason} ->
-            erlang:error(Reason);
-        Res ->
-            Res
-        end
-    end.
-
-cancel_replication(PostBody, Ctx) ->
-    {Res, _Bad} = rpc:multicall(couch_replicator, replicate, [PostBody, Ctx]),
-    case [X || {ok, {cancelled, _}} = X <- Res] of
-    [Success|_] ->
-        % Report success if at least one node canceled the replication
-        Success;
-    [] ->
-        case lists:usort(Res) of
-        [UniqueReply] ->
-            % Report a universally agreed-upon reply
-            UniqueReply;
-        [] ->
-            {error, badrpc};
-        Else ->
-            % Unclear what to do here -- pick the first error?
-            % Except try ignoring any {error, not_found} responses
-            % because we'll always get two of those
-            hd(Else -- [{error, not_found}])
-        end
-    end.
-
-choose_node(Key) when is_binary(Key) ->
-    Checksum = erlang:crc32(Key),
-    Nodes = lists:sort([node()|erlang:nodes()]),
-    lists:nth(1 + Checksum rem length(Nodes), Nodes);
-choose_node(Key) ->
-    choose_node(term_to_binary(Key)).
 
 handle_reload_query_servers_req(#httpd{method='POST'}=Req) ->
     chttpd:validate_ctype(Req, "application/json"),
diff --git a/src/chttpd/test/chttpd_handlers_tests.erl b/src/chttpd/test/chttpd_handlers_tests.erl
index f3e8f5d..5ae80d0 100644
--- a/src/chttpd/test/chttpd_handlers_tests.erl
+++ b/src/chttpd/test/chttpd_handlers_tests.erl
@@ -70,7 +70,8 @@ request_replicate(Url, Body) ->
     Headers = [{"Content-Type", "application/json"}],
     Handler = {chttpd_misc, handle_replicate_req},
     request(post, Url, Headers, Body, Handler, fun(Req) ->
-        chttpd:send_json(Req, 200, get(post_body))
+        PostBody = chttpd:json_body_obj(Req),
+        chttpd:send_json(Req, 200, PostBody)
     end).
 
 request(Method, Url, Headers, Body, {M, F}, MockFun) ->
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index e2d1964..3dee919 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -52,30 +52,27 @@
     {ok, {cancelled, binary()}} |
     {error, any()} |
     no_return().
-replicate(PostBody, Ctx) ->
-    {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
-    Rep = Rep0#rep{start_time = os:timestamp()},
-    #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep,
-    case get_value(cancel, Options, false) of
-    true ->
-        CancelRepId = case get_value(id, Options, nil) of
-        nil ->
-            RepId;
-        RepId2 ->
-            RepId2
-        end,
-        case check_authorization(CancelRepId, UserCtx) of
-        ok ->
-            cancel_replication(CancelRepId);
-        not_found ->
-            {error, not_found}
-        end;
-    false ->
-        check_authorization(RepId, UserCtx),
-        {ok, Listener} = rep_result_listener(RepId),
-        Result = do_replication_loop(Rep),
-        couch_replicator_notifier:stop(Listener),
-        Result
+replicate(PostBody, UserCtx) ->
+    {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, UserCtx),
+    Rep = Rep0#{<<"start_time">> => erlang:system_time()},
+    #{<<"id">> := RepId, <<"options">> := Options} = Rep,
+    case maps:get(<<"cancel">>, Options, false) of
+        true ->
+            CancelRepId = case maps:get(<<"id">>, Options, nil) of
+                nil -> RepId;
+                RepId2 -> RepId2
+            end,
+            case check_authorization(CancelRepId, UserCtx) of
+                ok -> cancel_replication(CancelRepId);
+                not_found -> {error, not_found}
+            end;
+        false ->
+            check_authorization(RepId, UserCtx),
+            ok = couch_replicator_scheduler:add_job(Rep),
+            case maps:get(<<"continuous">>, Options, false) of
+                true -> {ok, {continuous, Id}};
+                false -> wait_for_result(Id)
+            end
     end.
 
 
@@ -89,37 +86,27 @@ ensure_rep_db_exists() ->
     ignore.
 
 
--spec do_replication_loop(#rep{}) ->
-    {ok, {continuous, binary()}} | {ok, tuple()} | {error, any()}.
-do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) ->
-    ok = couch_replicator_scheduler:add_job(Rep),
-    case get_value(continuous, Options, false) of
-    true ->
-        {ok, {continuous, ?l2b(BaseId ++ Ext)}};
-    false ->
-        wait_for_result(Id)
-    end.
-
-
--spec rep_result_listener(rep_id()) -> {ok, pid()}.
-rep_result_listener(RepId) ->
-    ReplyTo = self(),
-    {ok, _Listener} = couch_replicator_notifier:start_link(
-        fun({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
-                ReplyTo ! Ev;
-            (_) ->
-                ok
-        end).
-
-
 -spec wait_for_result(rep_id()) ->
     {ok, {[_]}} | {error, any()}.
 wait_for_result(RepId) ->
-    receive
-    {finished, RepId, RepResult} ->
-        {ok, RepResult};
-    {error, RepId, Reason} ->
-        {error, Reason}
+    FinishRes = case couch_jobs:subscribe(?REP_JOBS, RepId) of
+        {ok, finished, JobData} ->
+            {ok, JobData};
+        {ok, SubId, _, _} ->
+            case couch_jobs:wait(SubId, finished, infinity) of
+                {?REP_JOBS, RepId, finished, JobData} -> {ok, JobData};
+                timeout -> timeout
+            end;
+        {error, Error} ->
+            {error, Error}
+    end,
+    case FinishRes of
+       {ok, #{<<"finished_result">> := CheckpointHistory}} ->
+            {ok, CheckpointHistory};
+       timeout ->
+            {error, timeout};
+       {error, Error} ->
+            {error, Error}
     end.
 
 
@@ -289,9 +276,9 @@ state_atom(State) when is_atom(State) ->
 -spec check_authorization(rep_id(), #user_ctx{}) -> ok | not_found.
 check_authorization(RepId, #user_ctx{name = Name} = Ctx) ->
     case couch_replicator_scheduler:rep_state(RepId) of
-    #rep{user_ctx = #user_ctx{name = Name}} ->
+    #{<<"user_ctx">> := #{<<"name">> := Name}} ->
         ok;
-    #rep{} ->
+    #{} ->
         couch_httpd:verify_is_server_admin(Ctx);
     nil ->
         not_found
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl
index 8f7a77a..30cb485 100644
--- a/src/couch_replicator/src/couch_replicator.hrl
+++ b/src/couch_replicator/src/couch_replicator.hrl
@@ -30,7 +30,7 @@
     stats = couch_replicator_stats:new() :: orddict:orddict() | '_'
 }).
 
--type rep_id() :: {string(), string()}.
+-type rep_id() :: binary().
 -type db_doc_id() :: {binary(), binary() | '_'}.
 -type seconds() :: non_neg_integer().
 -type rep_start_result() ::
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index 44c290d..24ec99b 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -38,8 +38,8 @@
     open_doc/3,
     open_doc_revs/6,
     changes_since/5,
-    db_uri/1,
-    normalize_db/1
+    db_uri/1
+    db_from_map/1,
     ]).
 
 -import(couch_replicator_httpc, [
@@ -57,21 +57,19 @@
 -define(MAX_URL_LEN, 7000).
 -define(MIN_URL_LEN, 200).
 
-db_uri(#httpdb{url = Url}) ->
+db_uri(#{<<"url">> := Url}) ->
     couch_util:url_strip_password(Url);
 
-db_uri(DbName) when is_binary(DbName) ->
-    ?b2l(DbName);
+db_uri(#httpdb{url = Url}) ->
+    couch_util:url_strip_password(Url).
 
-db_uri(Db) ->
-    db_uri(couch_db:name(Db)).
 
+db_open(Db, #{} = UserCtxMap) when is_map(Db) orelse is_binary(Db) ->
+    db_open(Db, #{} = UserCtxMap, false, []);
 
-db_open(Db, Options) ->
-    db_open(Db, Options, false, []).
 
-db_open(#httpdb{} = Db1, _Options, Create, CreateParams) ->
-    {ok, Db} = couch_replicator_httpc:setup(Db1),
+db_open(#{} = Db0, #httpdb{} = Db1, #{} =_UserCtxMap, Create, CreateParams) ->
+    {ok, Db} = couch_replicator_httpc:setup(db_from_json(Db0)),
     try
         case Create of
         false ->
@@ -118,50 +116,19 @@ db_open(#httpdb{} = Db1, _Options, Create, CreateParams) ->
         exit:Error ->
             db_close(Db),
             erlang:exit(Error)
-    end;
-db_open(DbName, Options, Create, _CreateParams) ->
-    try
-        case Create of
-        false ->
-            ok;
-        true ->
-            ok = couch_httpd:verify_is_server_admin(
-                get_value(user_ctx, Options)),
-            couch_db:create(DbName, Options)
-        end,
-        case couch_db:open(DbName, Options) of
-        {error, {illegal_database_name, _}} ->
-            throw({db_not_found, DbName});
-        {not_found, _Reason} ->
-            throw({db_not_found, DbName});
-        {ok, _Db} = Success ->
-            Success
-        end
-    catch
-    throw:{unauthorized, _} ->
-        throw({unauthorized, DbName})
     end.
 
 db_close(#httpdb{httpc_pool = Pool} = HttpDb) ->
     couch_replicator_auth:cleanup(HttpDb),
     unlink(Pool),
-    ok = couch_replicator_httpc_pool:stop(Pool);
-db_close(DbName) ->
-    catch couch_db:close(DbName).
+    ok = couch_replicator_httpc_pool:stop(Pool).
 
 
 get_db_info(#httpdb{} = Db) ->
     send_req(Db, [],
         fun(200, _, {Props}) ->
             {ok, Props}
-        end);
-get_db_info(Db) ->
-    DbName = couch_db:name(Db),
-    UserCtx = couch_db:get_user_ctx(Db),
-    {ok, InfoDb} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
-    {ok, Info} = couch_db:get_db_info(InfoDb),
-    couch_db:close(InfoDb),
-    {ok, [{couch_util:to_binary(K), V} || {K, V} <- Info]}.
+        end).
 
 
 get_pending_count(#httpdb{} = Db, Seq) when is_number(Seq) ->
@@ -179,14 +146,8 @@ get_pending_count(#httpdb{} = Db, Seq) ->
     Options = [{path, "_changes"}, {qs, [{"since", ?JSON_ENCODE(Seq)}, {"limit", "0"}]}],
     send_req(Db, Options, fun(200, _, {Props}) ->
         {ok, couch_util:get_value(<<"pending">>, Props, null)}
-    end);
-get_pending_count(Db, Seq) when is_number(Seq) ->
-    DbName = couch_db:name(Db),
-    UserCtx = couch_db:get_user_ctx(Db),
-    {ok, CountDb} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
-    Pending = couch_db:count_changes_since(CountDb, Seq),
-    couch_db:close(CountDb),
-    {ok, Pending}.
+    end).
+
 
 get_view_info(#httpdb{} = Db, DDocId, ViewName) ->
     Path = io_lib:format("~s/_view/~s/_info", [DDocId, ViewName]),
@@ -194,11 +155,7 @@ get_view_info(#httpdb{} = Db, DDocId, ViewName) ->
         fun(200, _, {Props}) ->
             {VInfo} = couch_util:get_value(<<"view_index">>, Props, {[]}),
             {ok, VInfo}
-        end);
-get_view_info(Db, DDocId, ViewName) ->
-    DbName = couch_db:name(Db),
-    {ok, VInfo} = couch_mrview:get_view_info(DbName, DDocId, ViewName),
-    {ok, [{couch_util:to_binary(K), V} || {K, V} <- VInfo]}.
+        end).
 
 
 ensure_full_commit(#httpdb{} = Db) ->
@@ -210,9 +167,7 @@ ensure_full_commit(#httpdb{} = Db) ->
             {ok, get_value(<<"instance_start_time">>, Props)};
         (_, _, {Props}) ->
             {error, get_value(<<"error">>, Props)}
-        end);
-ensure_full_commit(Db) ->
-    couch_db:ensure_full_commit(Db).
+        end).
 
 
 get_missing_revs(#httpdb{} = Db, IdRevs) ->
@@ -232,10 +187,7 @@ get_missing_revs(#httpdb{} = Db, IdRevs) ->
                 {Id, MissingRevs, PossibleAncestors}
             end,
             {ok, lists:map(ConvertToNativeFun, Props)}
-        end);
-get_missing_revs(Db, IdRevs) ->
-    couch_db:get_missing_revs(Db, IdRevs).
-
+        end).
 
 
 open_doc_revs(#httpdb{retries = 0} = HttpDb, Id, Revs, Options, _Fun, _Acc) ->
@@ -331,10 +283,7 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
                 wait = Wait
             },
             open_doc_revs(RetryDb, Id, Revs, Options, Fun, Acc)
-    end;
-open_doc_revs(Db, Id, Revs, Options, Fun, Acc) ->
-    {ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),
-    {ok, lists:foldl(fun(R, A) -> {_, A2} = Fun(R, A), A2 end, Acc, Results)}.
+    end.
 
 error_reason({http_request_failed, "GET", _Url, {error, timeout}}) ->
     timeout;
@@ -353,14 +302,7 @@ open_doc(#httpdb{} = Db, Id, Options) ->
             {ok, couch_doc:from_json_obj(Body)};
         (_, _, {Props}) ->
             {error, get_value(<<"error">>, Props)}
-        end);
-open_doc(Db, Id, Options) ->
-    case couch_db:open_doc(Db, Id, Options) of
-    {ok, _} = Ok ->
-        Ok;
-    {not_found, _Reason} ->
-        {error, <<"not_found">>}
-    end.
+        end).
 
 
 update_doc(Db, Doc, Options) ->
@@ -411,10 +353,7 @@ update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) ->
                 {_, Error} ->
                     {error, Error}
                 end
-        end);
-update_doc(Db, Doc, Options, Type) ->
-    couch_db:update_doc(Db, Doc, Options, Type).
-
+        end).
 
 update_docs(Db, DocList, Options) ->
     update_docs(Db, DocList, Options, interactive_edit).
@@ -468,10 +407,7 @@ update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
                 {error, request_body_too_large};
            (417, _, Results) when is_list(Results) ->
                 {ok, bulk_results_to_errors(DocList, Results, remote)}
-        end);
-update_docs(Db, DocList, Options, UpdateType) ->
-    Result = couch_db:update_docs(Db, DocList, Options, UpdateType),
-    {ok, bulk_results_to_errors(DocList, Result, UpdateType)}.
+        end).
 
 
 changes_since(#httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb,
@@ -538,38 +474,7 @@ changes_since(#httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb,
             throw(retry_no_limit);
         exit:{http_request_failed, _, _, _} = Error ->
             throw({retry_limit, Error})
-    end;
-changes_since(Db, Style, StartSeq, UserFun, Options) ->
-    DocIds = get_value(doc_ids, Options),
-    Selector = get_value(selector, Options),
-    Filter = case {DocIds, Selector} of
-    {undefined, undefined} ->
-        ?b2l(get_value(filter, Options, <<>>));
-    {_, undefined} ->
-        "_doc_ids";
-    {undefined, _} ->
-        "_selector"
-    end,
-    Args = #changes_args{
-        style = Style,
-        since = StartSeq,
-        filter = Filter,
-        feed = case get_value(continuous, Options, false) of
-            true ->
-                "continuous";
-            false ->
-                "normal"
-        end,
-        timeout = infinity
-    },
-    QueryParams = get_value(query_params, Options, {[]}),
-    Req = changes_json_req(Db, Filter, QueryParams, Options),
-    ChangesFeedFun = couch_changes:handle_db_changes(Args, {json_req, Req}, Db),
-    ChangesFeedFun(fun({change, Change, _}, _) ->
-            UserFun(json_to_doc_info(Change));
-        (_, _) ->
-            ok
-    end).
+    end.
 
 
 % internal functions
@@ -614,29 +519,6 @@ parse_changes_feed(Options, UserFun, DataStreamFun) ->
         json_stream_parse:events(DataStreamFun, EventFun)
     end.
 
-changes_json_req(_Db, "", _QueryParams, _Options) ->
-    {[]};
-changes_json_req(_Db, "_doc_ids", _QueryParams, Options) ->
-    {[{<<"doc_ids">>, get_value(doc_ids, Options)}]};
-changes_json_req(_Db, "_selector", _QueryParams, Options) ->
-    {[{<<"selector">>, get_value(selector, Options)}]};
-changes_json_req(Db, FilterName, {QueryParams}, _Options) ->
-    {ok, Info} = couch_db:get_db_info(Db),
-    % simulate a request to db_name/_changes
-    {[
-        {<<"info">>, {Info}},
-        {<<"id">>, null},
-        {<<"method">>, 'GET'},
-        {<<"path">>, [couch_db:name(Db), <<"_changes">>]},
-        {<<"query">>, {[{<<"filter">>, FilterName} | QueryParams]}},
-        {<<"headers">>, []},
-        {<<"body">>, []},
-        {<<"peer">>, <<"replicator">>},
-        {<<"form">>, []},
-        {<<"cookie">>, []},
-        {<<"userCtx">>, couch_util:json_user_ctx(Db)}
-    ]}.
-
 
 options_to_query_args(HttpDb, Path, Options0) ->
     case lists:keytake(max_url_len, 1, Options0) of
@@ -1005,23 +887,6 @@ header_value(Key, Headers, Default) ->
     end.
 
 
-% Normalize an #httpdb{} or #db{} record such that it can be used for
-% comparisons. This means remove things like pids and also sort options / props.
-normalize_db(#httpdb{} = HttpDb) ->
-    #httpdb{
-        url = HttpDb#httpdb.url,
-        auth_props = lists:sort(HttpDb#httpdb.auth_props),
-        headers = lists:keysort(1, HttpDb#httpdb.headers),
-        timeout = HttpDb#httpdb.timeout,
-        ibrowse_options = lists:keysort(1, HttpDb#httpdb.ibrowse_options),
-        retries = HttpDb#httpdb.retries,
-        http_connections = HttpDb#httpdb.http_connections
-    };
-
-normalize_db(<<DbName/binary>>) ->
-    DbName.
-
-
 maybe_append_create_query_params(Db, []) ->
     Db;
 
@@ -1030,27 +895,37 @@ maybe_append_create_query_params(Db, CreateParams) ->
     Db#httpdb{url = NewUrl}.
 
 
--ifdef(TEST).
-
--include_lib("eunit/include/eunit.hrl").
-
-
-normalize_http_db_test() ->
-    HttpDb =  #httpdb{
-        url = "http://host/db",
-        auth_props = [{"key", "val"}],
-        headers = [{"k2","v2"}, {"k1","v1"}],
-        timeout = 30000,
-        ibrowse_options = [{k2, v2}, {k1, v1}],
-        retries = 10,
-        http_connections = 20
-    },
-    Expected = HttpDb#httpdb{
-        headers = [{"k1","v1"}, {"k2","v2"}],
-        ibrowse_options = [{k1, v1}, {k2, v2}]
-    },
-    ?assertEqual(Expected, normalize_db(HttpDb)),
-    ?assertEqual(<<"local">>, normalize_db(<<"local">>)).
-
-
--endif.
+db_from_json(#{} = DbMap) ->
+    #{
+        <<"url">> := Url,
+        <<"auth">> := Auth,
+        <<"headers">> := Headers0,
+        <<"ibrowse_options">> := IBrowseOptions0,
+        <<"timeout">> := Timeout,
+        <<"http_connections">> := HttpConnections,
+        <<"retries">> := Retries,
+        <<"proxy_url">> := ProxyURL0
+    } = DbMap,
+    Headers = maps:fold(fun(K, V, Acc) ->
+        [{binary_to_list(K), binary_to_list(V)} | Acc]
+    end, [], Headers0),
+    IBrowseOptions0 = maps:fold(fun
+        (K, V, Acc) when is_binary(V) ->
+            [{binary_to_atom(K), binary_to_list(V)} | Acc];
+        (K, V, Acc) ->
+            [{binary_to_atom(K), V} | Acc]
+    end, [], IBrowseOptions0),
+    ProxyUrl = case ProxyUrl0 of
+        null -> undefined,
+        V when is_binary(V) -> binary_to_list(V)
+    end,
+    #httpdb{
+        url = binary_to_list(Url),
+        auth_props = maps:to_list(Auth),
+        headers = Headers,
+        ibrowse_options = IBrowseOptions,
+        timeout = Timeout,
+        http_connections = HttpConnections,
+        retries = Retries,
+        proxy_url = ProxyURL
+    }.
diff --git a/src/couch_replicator/src/couch_replicator_clustering.erl b/src/couch_replicator/src/couch_replicator_clustering.erl
deleted file mode 100644
index a7f7573..0000000
--- a/src/couch_replicator/src/couch_replicator_clustering.erl
+++ /dev/null
@@ -1,248 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-
-% Maintain cluster membership and stability notifications for replications.
-% On changes to cluster membership, broadcast events to `replication` gen_event.
-% Listeners will get `{cluster, stable}` or `{cluster, unstable}` events.
-%
-% Cluster stability is defined as "there have been no nodes added or removed in
-% last `QuietPeriod` seconds". QuietPeriod value is configurable. To ensure a
-% speedier startup, during initialization there is a shorter StartupPeriod
-% in effect (also configurable).
-%
-% This module is also in charge of calculating ownership of replications based
-% on where their _replicator db documents shards live.
-
-
--module(couch_replicator_clustering).
-
--behaviour(gen_server).
--behaviour(config_listener).
--behaviour(mem3_cluster).
-
--export([
-    start_link/0
-]).
-
--export([
-    init/1,
-    terminate/2,
-    handle_call/3,
-    handle_info/2,
-    handle_cast/2,
-    code_change/3
-]).
-
--export([
-    owner/2,
-    is_stable/0,
-    link_cluster_event_listener/3
-]).
-
-% config_listener callbacks
--export([
-    handle_config_change/5,
-    handle_config_terminate/3
-]).
-
-% mem3_cluster callbacks
--export([
-    cluster_stable/1,
-    cluster_unstable/1
-]).
-
--include_lib("couch/include/couch_db.hrl").
--include_lib("mem3/include/mem3.hrl").
-
--define(DEFAULT_QUIET_PERIOD, 60). % seconds
--define(DEFAULT_START_PERIOD, 5). % seconds
--define(RELISTEN_DELAY, 5000).
-
--record(state, {
-    mem3_cluster_pid :: pid(),
-    cluster_stable :: boolean()
-}).
-
-
--spec start_link() -> {ok, pid()} | ignore | {error, term()}.
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-
-% owner/2 function computes ownership for a {DbName, DocId} tuple
-% `unstable` if cluster is considered to be unstable i.e. it has changed
-% recently, or returns node() which of the owner.
-%
--spec owner(Dbname :: binary(), DocId :: binary()) -> node() | unstable.
-owner(<<"shards/", _/binary>> = DbName, DocId) ->
-    case is_stable() of
-        false ->
-            unstable;
-        true ->
-            owner_int(DbName, DocId)
-    end;
-owner(_DbName, _DocId) ->
-    node().
-
-
--spec is_stable() -> true | false.
-is_stable() ->
-    gen_server:call(?MODULE, is_stable).
-
-
--spec link_cluster_event_listener(atom(), atom(), list()) -> pid().
-link_cluster_event_listener(Mod, Fun, Args)
-        when is_atom(Mod), is_atom(Fun), is_list(Args) ->
-    CallbackFun =
-        fun(Event = {cluster, _}) -> erlang:apply(Mod, Fun, Args ++ [Event]);
-           (_) -> ok
-        end,
-    {ok, Pid} = couch_replicator_notifier:start_link(CallbackFun),
-    Pid.
-
-
-% Mem3 cluster callbacks
-
-cluster_unstable(Server) ->
-    ok = gen_server:call(Server, set_unstable),
-    couch_replicator_notifier:notify({cluster, unstable}),
-    couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
-    couch_log:notice("~s : cluster unstable", [?MODULE]),
-    Server.
-
-cluster_stable(Server) ->
-    ok = gen_server:call(Server, set_stable),
-    couch_replicator_notifier:notify({cluster, stable}),
-    couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1),
-    couch_log:notice("~s : cluster stable", [?MODULE]),
-    Server.
-
-
-% gen_server callbacks
-
-init([]) ->
-    ok = config:listen_for_changes(?MODULE, nil),
-    Period = abs(config:get_integer("replicator", "cluster_quiet_period",
-        ?DEFAULT_QUIET_PERIOD)),
-    StartPeriod = abs(config:get_integer("replicator", "cluster_start_period",
-        ?DEFAULT_START_PERIOD)),
-    couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
-    {ok, Mem3Cluster} = mem3_cluster:start_link(?MODULE, self(), StartPeriod,
-        Period),
-    {ok, #state{mem3_cluster_pid = Mem3Cluster, cluster_stable = false}}.
-
-
-terminate(_Reason, _State) ->
-    ok.
-
-
-handle_call(is_stable, _From, #state{cluster_stable = IsStable} = State) ->
-    {reply, IsStable, State};
-
-handle_call(set_stable, _From, State) ->
-    {reply, ok, State#state{cluster_stable = true}};
-
-handle_call(set_unstable, _From, State) ->
-    {reply, ok, State#state{cluster_stable = false}}.
-
-
-handle_cast({set_period, Period}, #state{mem3_cluster_pid = Pid} = State) ->
-    ok = mem3_cluster:set_period(Pid, Period),
-    {noreply, State}.
-
-
-handle_info(restart_config_listener, State) ->
-    ok = config:listen_for_changes(?MODULE, nil),
-    {noreply, State}.
-
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-
-%% Internal functions
-
-
-handle_config_change("replicator", "cluster_quiet_period", V, _, S) ->
-    ok = gen_server:cast(?MODULE, {set_period, list_to_integer(V)}),
-    {ok, S};
-handle_config_change(_, _, _, _, S) ->
-    {ok, S}.
-
-
-handle_config_terminate(_, stop, _) -> ok;
-handle_config_terminate(_S, _R, _St) ->
-    Pid = whereis(?MODULE),
-    erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).
-
-
--spec owner_int(binary(), binary()) -> node().
-owner_int(ShardName, DocId) ->
-    DbName = mem3:dbname(ShardName),
-    Live = [node() | nodes()],
-    Shards = mem3:shards(DbName, DocId),
-    Nodes = [N || #shard{node=N} <- Shards, lists:member(N, Live)],
-    mem3:owner(DbName, DocId, Nodes).
-
-
-
--ifdef(TEST).
-
--include_lib("eunit/include/eunit.hrl").
-
-
-replicator_clustering_test_() ->
-    {
-        foreach,
-        fun setup/0,
-        fun teardown/1,
-        [
-            t_stable_callback(),
-            t_unstable_callback()
-        ]
-    }.
-
-
-t_stable_callback() ->
-    ?_test(begin
-        ?assertEqual(false, is_stable()),
-        cluster_stable(whereis(?MODULE)),
-        ?assertEqual(true, is_stable())
-    end).
-
-
-t_unstable_callback() ->
-    ?_test(begin
-        cluster_stable(whereis(?MODULE)),
-        ?assertEqual(true, is_stable()),
-        cluster_unstable(whereis(?MODULE)),
-        ?assertEqual(false, is_stable())
-    end).
-
-
-setup() ->
-    meck:expect(couch_log, notice, 2, ok),
-    meck:expect(config, get, fun(_, _, Default) -> Default end),
-    meck:expect(config, listen_for_changes, 2, ok),
-    meck:expect(couch_stats, update_gauge, 2, ok),
-    meck:expect(couch_replicator_notifier, notify, 1, ok),
-    {ok, Pid} = start_link(),
-    Pid.
-
-
-teardown(Pid) ->
-    unlink(Pid),
-    exit(Pid, kill),
-    meck:unload().
-
--endif.
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 174fdda..a2887a0 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -38,8 +38,7 @@
     doc/2,
     doc_lookup/3,
     update_docs/0,
-    get_worker_ref/1,
-    notify_cluster_event/2
+    get_worker_ref/1
 ]).
 
 -include_lib("couch/include/couch_db.hrl").
@@ -231,8 +230,6 @@ start_link() ->
 init([]) ->
     ?MODULE = ets:new(?MODULE, [named_table, {keypos, #rdoc.id},
         {read_concurrency, true}, {write_concurrency, true}]),
-    couch_replicator_clustering:link_cluster_event_listener(?MODULE,
-        notify_cluster_event, [self()]),
     {ok, nil}.
 
 
@@ -256,15 +253,6 @@ handle_call({clean_up_replications, DbName}, _From, State) ->
     ok = removed_db(DbName),
     {reply, ok, State}.
 
-handle_cast({cluster, unstable}, State) ->
-    % Ignoring unstable state transition
-    {noreply, State};
-
-handle_cast({cluster, stable}, State) ->
-    % Membership changed recheck all the replication document ownership
-    nil = ets:foldl(fun cluster_membership_foldl/2, nil, ?MODULE),
-    {noreply, State};
-
 handle_cast(Msg, State) ->
     {stop, {error, unexpected_message, Msg}, State}.
 
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index de34b53..b427d9a 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -27,7 +27,7 @@
     update_doc_completed/3,
     update_failed/3,
     update_rep_id/1,
-    update_triggered/2,
+    update_triggered/3,
     update_error/2
 ]).
 
@@ -57,6 +57,22 @@
 -define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
 -define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
 
+-define(DEFAULT_SOCK_OPTS, "[{keepalive, true}, {nodelay, false}]").
+-define(VALID_SOCK_OPTS, [buffer, delay_send, exit_on_close, ipv6_v6only,
+    keepalive, nodelay, recbuf, send_timeout, send_timout_close, sndbuf,
+    priority, tos, tclass
+]).
+-define(CONFIG_DEFAULTS, [
+    {"worker_processes",    "4",                fun list_to_integer/1},
+    {"worker_batch_size",   "500",              fun list_to_integer/1},
+    {"http_connections",    "20",               fun list_to_integer/1},
+    {"connection_timeout",  "30000",            fun list_to_integer/1},
+    {"retries_per_request", "5",                fun list_to_integer/1},
+    {"use_checkpoints",     "true",             fun list_to_existing_atom/1},
+    {"checkpoint_interval", "30000",            fun list_to_integer/1},
+    {"socket_options",      ?DEFAULT_SOCK_OPTS, fun parse_sock_opts/1}
+]).
+
 
 remove_state_fields(DbName, DocId) ->
     update_rep_doc(DbName, DocId, [
@@ -90,16 +106,12 @@ update_failed(DbName, DocId, Error) ->
         failed_state_updates]).
 
 
--spec update_triggered(#rep{}, rep_id()) -> ok.
-update_triggered(Rep, {Base, Ext}) ->
-    #rep{
-        db_name = DbName,
-        doc_id = DocId
-    } = Rep,
+-spec update_triggered(binary(), binary(), binary()) -> ok.
+update_triggered(Id, DocId, DbName) ->
     update_rep_doc(DbName, DocId, [
         {<<"_replication_state">>, <<"triggered">>},
         {<<"_replication_state_reason">>, undefined},
-        {<<"_replication_id">>, iolist_to_binary([Base, Ext])},
+        {<<"_replication_id">>, Id},
         {<<"_replication_stats">>, undefined}]),
     ok.
 
@@ -183,28 +195,7 @@ replication_design_doc_props(DDocId) ->
     ].
 
 
-% Note: parse_rep_doc can handle filtered replications. During parsing of the
-% replication doc it will make possibly remote http requests to the source
-% database. If failure or parsing of filter docs fails, parse_doc throws a
-% {filter_fetch_error, Error} excation. This exception should be considered
-% transient in respect to the contents of the document itself, since it depends
-% on netowrk availability of the source db and other factors.
--spec parse_rep_doc({[_]}) -> #rep{}.
-parse_rep_doc(RepDoc) ->
-    {ok, Rep} = try
-        parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
-    catch
-        throw:{error, Reason} ->
-            throw({bad_rep_doc, Reason});
-        throw:{filter_fetch_error, Reason} ->
-            throw({filter_fetch_error, Reason});
-        Tag:Err ->
-            throw({bad_rep_doc, to_binary({Tag, Err})})
-    end,
-    Rep.
-
-
--spec parse_rep_doc_without_id({[_]}) -> #rep{}.
+-spec parse_rep_doc_without_id({[_]}) -> #{}.
 parse_rep_doc_without_id(RepDoc) ->
     {ok, Rep} = try
         parse_rep_doc_without_id(RepDoc, rep_user_ctx(RepDoc))
@@ -217,11 +208,12 @@ parse_rep_doc_without_id(RepDoc) ->
     Rep.
 
 
--spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #rep{}}.
+-spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #{}}.
 parse_rep_doc(Doc, UserCtx) ->
     {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx),
-    Cancel = get_value(cancel, Rep#rep.options, false),
-    Id = get_value(id, Rep#rep.options, nil),
+    #{<<"options">> := Options} = Rep,
+    Cancel = maps:get(<<"cancel">>, Options, false),
+    Id = maps:get(<<"id">>, Options, nil),
     case {Cancel, Id} of
         {true, nil} ->
             % Cancel request with no id, must parse id out of body contents
@@ -235,14 +227,20 @@ parse_rep_doc(Doc, UserCtx) ->
     end.
 
 
--spec parse_rep_doc_without_id({[_]}, #user_ctx{}) -> {ok, #rep{}}.
-parse_rep_doc_without_id({Props}, UserCtx) ->
-    Proxy = get_value(<<"proxy">>, Props, <<>>),
-    Opts = make_options(Props),
-    case get_value(cancel, Opts, false) andalso
-        (get_value(id, Opts, nil) =/= nil) of
+-spec parse_rep_doc_without_id({[_]} | #{}, #user_ctx{}) -> {ok, #{}}.
+parse_rep_doc_without_id({[_]} = EJSon, UserCtx) ->
+    % Normalize all field names to be binaries and turn into a map
+    Map = ?JSON_DECODE(?JSON_ENCODE(EJSon)),
+    parse_rep_doc_without_id(Map, UserCtx);
+
+parse_rep_doc_without_id(#{} = Doc, UserCtx) ->
+    Proxy = maps:get(<<"proxy">>, Doc, <<>>),
+    Opts = make_options(Doc),
+    Cancel = maps:get(<<"cancel">>, Opts, false),
+    Id = maps:get(<<"id">>, Opts, nil),
+    case Cancel andalso Id =/= nill of
     true ->
-        {ok, #rep{options = Opts, user_ctx = UserCtx}};
+        {ok, #{<<"options">> => Opts, user_ctx = UserCtx}};
     false ->
         Source = parse_rep_db(get_value(<<"source">>, Props), Proxy, Opts),
         Target = parse_rep_db(get_value(<<"target">>, Props), Proxy, Opts),
@@ -272,33 +270,14 @@ parse_rep_doc_without_id({Props}, UserCtx) ->
     end.
 
 
-rep_to_map(null) ->
-    null;
-
-rep_to_map(#rep{} = Rep) ->
-    {IdBase, IdExt} = Rep#rep.id,
-    #{
-        <<"id_base">> => IdBase,
-        <<"id_ext">> => IdExt,
-        <<"source">> => Rep#rep.source,
-        <<"target">> => Rep#rep.target,
-        <<"options">> => Rep#rep.options,
-        <<"user_ctx">> => Rep#rep.user_ctx,
-        <<"type">> => Rep#rep.type,
-        <<"view">> => Rep#rep.view,
-        <<"doc_id">> => Rep#rep.doc_id,
-        <<"db_name">> => Rep#rep.db_name,
-        <<"start_time">> = Rep#rep.start_time,
-        <<"stats">> = Rep#rep.stats
-    }.
-
 % Update a #rep{} record with a replication_id. Calculating the id might involve
 % fetching a filter from the source db, and so it could fail intermetently.
 % In case of a failure to fetch the filter this function will throw a
 %  `{filter_fetch_error, Reason} exception.
-update_rep_id(Rep) ->
-    RepId = couch_replicator_ids:replication_id(Rep),
-    Rep#rep{id = RepId}.
+update_rep_id(#{} = Rep) ->
+    {BaseId, ExtId} = couch_replicator_ids:replication_id(Rep),
+    RepId = erlang:iolist_to_binary([BaseId, ExtId]),
+    Rep#{<<"id">> => RepId, <<"base_id">> = list_toBaseId}.
 
 
 update_rep_doc(RepDbName, RepDocId, KVs) ->
@@ -445,101 +424,84 @@ maybe_add_trailing_slash(Url) ->
     end.
 
 
--spec make_options([_]) -> [_].
-make_options(Props) ->
-    Options0 = lists:ukeysort(1, convert_options(Props)),
+-spec make_options(#{}) -> #{}.
+make_options(#{} = RepDoc) ->
+    Options0 = maps:fold(fun convert_options/3, #{}, RepDoc)
     Options = check_options(Options0),
-    DefWorkers = config:get("replicator", "worker_processes", "4"),
-    DefBatchSize = config:get("replicator", "worker_batch_size", "500"),
-    DefConns = config:get("replicator", "http_connections", "20"),
-    DefTimeout = config:get("replicator", "connection_timeout", "30000"),
-    DefRetries = config:get("replicator", "retries_per_request", "5"),
-    UseCheckpoints = config:get("replicator", "use_checkpoints", "true"),
-    DefCheckpointInterval = config:get("replicator", "checkpoint_interval",
-        "30000"),
-    {ok, DefSocketOptions} = couch_util:parse_term(
-        config:get("replicator", "socket_options",
-            "[{keepalive, true}, {nodelay, false}]")),
-    lists:ukeymerge(1, Options, lists:keysort(1, [
-        {connection_timeout, list_to_integer(DefTimeout)},
-        {retries, list_to_integer(DefRetries)},
-        {http_connections, list_to_integer(DefConns)},
-        {socket_options, DefSocketOptions},
-        {worker_batch_size, list_to_integer(DefBatchSize)},
-        {worker_processes, list_to_integer(DefWorkers)},
-        {use_checkpoints, list_to_existing_atom(UseCheckpoints)},
-        {checkpoint_interval, list_to_integer(DefCheckpointInterval)}
-    ])).
-
-
--spec convert_options([_]) -> [_].
-convert_options([])->
-    [];
-convert_options([{<<"cancel">>, V} | _R]) when not is_boolean(V)->
+    ConfigOptions = lists:foldl(fun({K, Default, ConversionFun}, Acc) ->
+        V = ConversionFun(config:get("replicator", K, Default)),
+        Acc#{list_to_binary(K) => V}
+    end, #{}, ?CONFIG_DEFAULTS),
+    maps:merge(ConfigOptions, Options).
+
+
+-spec convert_options(binary(), any(), #{}) -> #{}.
+convert_options(<<"cancel">>, V, _Acc) when not is_boolean(V)->
     throw({bad_request, <<"parameter `cancel` must be a boolean">>});
-convert_options([{<<"cancel">>, V} | R]) ->
-    [{cancel, V} | convert_options(R)];
-convert_options([{IdOpt, V} | R]) when IdOpt =:= <<"_local_id">>;
+convert_options(<<"cancel">>, V, Acc) ->
+    Acc#{<<"cancel">> => V};
+convert_options(IdOpt, V, Acc) when IdOpt =:= <<"_local_id">>;
         IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> ->
-    [{id, couch_replicator_ids:convert(V)} | convert_options(R)];
-convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V)->
+    Acc#{<<"id">> => couch_replicator_ids:convert(V)};
+convert_options(<<"create_target">>, V, _Acc) when not is_boolean(V)->
     throw({bad_request, <<"parameter `create_target` must be a boolean">>});
-convert_options([{<<"create_target">>, V} | R]) ->
-    [{create_target, V} | convert_options(R)];
-convert_options([{<<"create_target_params">>, V} | _R]) when not is_tuple(V) ->
+convert_options(<<"create_target">>, V, Acc) ->
+    Acc#{<<"create_target">> => V};
+convert_options(<<"create_target_params">>, V, _Acc) when not is_tuple(V) ->
     throw({bad_request,
         <<"parameter `create_target_params` must be a JSON object">>});
-convert_options([{<<"create_target_params">>, V} | R]) ->
-    [{create_target_params, V} | convert_options(R)];
-convert_options([{<<"continuous">>, V} | _R]) when not is_boolean(V)->
+convert_options(<<"create_target_params">>, V, Acc) ->
+    Acc#{<<"create_target_params">> => V};
+convert_options(<<"continuous">>, V, Acc) when not is_boolean(V)->
     throw({bad_request, <<"parameter `continuous` must be a boolean">>});
-convert_options([{<<"continuous">>, V} | R]) ->
-    [{continuous, V} | convert_options(R)];
-convert_options([{<<"filter">>, V} | R]) ->
-    [{filter, V} | convert_options(R)];
-convert_options([{<<"query_params">>, V} | R]) ->
-    [{query_params, V} | convert_options(R)];
-convert_options([{<<"doc_ids">>, null} | R]) ->
-    convert_options(R);
-convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) ->
+convert_options(<<"continuous">>, V, Acc) ->
+    Acc#{<<"continuous">> => V};
+convert_options(<<"filter">>, V, Acc) ->
+    Acc#{<<"filter">> => V};
+convert_options(<<"query_params">>, V, Acc) ->
+    Acc#{<<"query_params">> => V};
+convert_options(<<"doc_ids">>, null, Acc) ->
+    Acc;
+convert_options(<<"doc_ids">>, V, _Acc) when not is_list(V) ->
     throw({bad_request, <<"parameter `doc_ids` must be an array">>});
-convert_options([{<<"doc_ids">>, V} | R]) ->
+convert_options(<<"doc_ids">>, V, Acc) ->
     % Ensure same behaviour as old replicator: accept a list of percent
     % encoded doc IDs.
     DocIds = lists:usort([?l2b(couch_httpd:unquote(Id)) || Id <- V]),
-    [{doc_ids, DocIds} | convert_options(R)];
-convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) ->
+    Acc#{<<"doc_ids">> => DocIds};
+convert_options(<<"selector">>, V, _Acc) when not is_tuple(V) ->
     throw({bad_request, <<"parameter `selector` must be a JSON object">>});
-convert_options([{<<"selector">>, V} | R]) ->
-    [{selector, V} | convert_options(R)];
-convert_options([{<<"worker_processes">>, V} | R]) ->
-    [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"worker_batch_size">>, V} | R]) ->
-    [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"http_connections">>, V} | R]) ->
-    [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"connection_timeout">>, V} | R]) ->
-    [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"retries_per_request">>, V} | R]) ->
-    [{retries, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"socket_options">>, V} | R]) ->
-    {ok, SocketOptions} = couch_util:parse_term(V),
-    [{socket_options, SocketOptions} | convert_options(R)];
-convert_options([{<<"since_seq">>, V} | R]) ->
-    [{since_seq, V} | convert_options(R)];
-convert_options([{<<"use_checkpoints">>, V} | R]) ->
-    [{use_checkpoints, V} | convert_options(R)];
-convert_options([{<<"checkpoint_interval">>, V} | R]) ->
-    [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([_ | R]) -> % skip unknown option
-    convert_options(R).
-
-
--spec check_options([_]) -> [_].
+convert_options(<<"selector">>, V, Acc) ->
+    Acc#{<<"selector">> => V};
+convert_options(<<"worker_processes">>, V, Acc) ->
+    Acc#{<<"worker_processes">> => couch_util:to_integer(V)};
+convert_options(<<"worker_batch_size">>, V, Acc) ->
+    Acc#{<<"worker_batch_size">> => couch_util:to_integer(V)};
+convert_options(<<"http_connections">>, V, Acc) ->
+    Acc#{<<"http_connections">> => couch_util:to_integer(V)};
+convert_options(<<"connection_timeout">>, V, Acc) ->
+    Acc#{<<"connection_timeout">> => couch_util:to_integer(V)};
+convert_options(<<"retries_per_request">>, V, Acc) ->
+    Acc#{<<"retries">> => couch_util:to_integer(V)};
+convert_options(<<"socket_options">>, V, Acc) ->
+    Acc#{<<"socket_options">> => parse_sock_opts(V)};
+convert_options(<<"since_seq">>, V, Acc) ->
+    Acc#{<<"since_seq">> => V};
+convert_options(<<"use_checkpoints">>, V, Acc) when not is_boolean(V)->
+    throw({bad_request, <<"parameter `use_checkpoints` must be a boolean">>});
+convert_options(<<"use_checkpoints">>, V, Acc) ->
+    Acc#{<<"use_checkpoints">> => V};
+convert_options(<<"checkpoint_interval">>, V, Acc) ->
+    Acc#{<<"checkpoint_interval">>, couch_util:to_integer(V)};
+convert_options(_K, _V, Acc) -> % skip unknown option
+    Acc.
+
+
+-spec check_options(#{}) -> #{}.
 check_options(Options) ->
-    DocIds = lists:keyfind(doc_ids, 1, Options),
-    Filter = lists:keyfind(filter, 1, Options),
-    Selector = lists:keyfind(selector, 1, Options),
+    DocIds = maps:is_key(<<"doc_ids">>, Options),
+    Filter = maps:is_key(<<"filter">>, Options),
+    Selector = maps:is_key(<<"selector">>, Options),
     case {DocIds, Filter, Selector} of
         {false, false, false} -> Options;
         {false, false, _} -> Options;
@@ -551,6 +513,19 @@ check_options(Options) ->
     end.
 
 
+parse_sock_opts(V) ->
+    {ok, SocketOptions} = couch_util:parse_term(V),
+    lists:foldl(fun
+        ({K, V}, Acc) when is_atom(K) ->
+            case lists:member(K, ?VALID_SOCKET_OPTIONS) of
+                true -> Acc#{atom_to_binary(K) => V};
+                false -> Acc
+            end;
+        (_, Acc) ->
+            Acc
+    end, #{}, SocketOptions).
+
+
 -spec parse_proxy_params(binary() | [_]) -> [_].
 parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
     parse_proxy_params(?b2l(ProxyUrl));
diff --git a/src/couch_replicator/src/couch_replicator_filters.erl b/src/couch_replicator/src/couch_replicator_filters.erl
index 5668820..940b8ad 100644
--- a/src/couch_replicator/src/couch_replicator_filters.erl
+++ b/src/couch_replicator/src/couch_replicator_filters.erl
@@ -63,9 +63,9 @@ parse(Options) ->
 % Fetches body of filter function from source database. Guaranteed to either
 % return {ok, Body} or an {error, Reason}. Also assume this function might
 % block due to network / socket issues for an undeterminted amount of time.
--spec fetch(binary(), binary(), binary(), #user_ctx{}) ->
+-spec fetch(binary(), binary(), binary(), #{}) ->
     {ok, {[_]}} | {error, binary()}.
-fetch(DDocName, FilterName, Source, UserCtx) ->
+fetch(DDocName, FilterName, Source, #{} = UserCtx) ->
     {Pid, Ref} = spawn_monitor(fun() ->
         try fetch_internal(DDocName, FilterName, Source, UserCtx) of
             Resp ->
@@ -108,9 +108,8 @@ view_type(Props, Options) ->
 
 % Private functions
 
-fetch_internal(DDocName, FilterName, Source, UserCtx) ->
-    Db = case (catch couch_replicator_api_wrap:db_open(Source,
-        [{user_ctx, UserCtx}])) of
+fetch_internal(DDocName, FilterName, Source, #{} = UserCtx) ->
+    Db = case (catch couch_replicator_api_wrap:db_open(Source, UserCtx) of
     {ok, Db0} ->
         Db0;
     DbError ->
diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl
index e10b980..0d73cbf 100644
--- a/src/couch_replicator/src/couch_replicator_ids.erl
+++ b/src/couch_replicator/src/couch_replicator_ids.erl
@@ -30,28 +30,29 @@
 %  {filter_fetch_error, Error} exception.
 %
 
-replication_id(#rep{options = Options} = Rep) ->
+replication_id(#{<<"options">> := Options} = Rep) ->
     BaseId = replication_id(Rep, ?REP_ID_VERSION),
-    {BaseId, maybe_append_options([continuous, create_target], Options)}.
+    UseOpts = [<<"continuous">>, <<"create_target">>]
+    {BaseId, maybe_append_options(UseOpts, Options)}.
 
 
 % Versioned clauses for generating replication IDs.
 % If a change is made to how replications are identified,
 % please add a new clause and increase ?REP_ID_VERSION.
 
-replication_id(#rep{user_ctx = UserCtx} = Rep, 4) ->
+replication_id(#{<<"user_ctx">> := UserCtx} = Rep, 4) ->
     UUID = couch_server:get_uuid(),
-    SrcInfo = get_v4_endpoint(UserCtx, Rep#rep.source),
-    TgtInfo = get_v4_endpoint(UserCtx, Rep#rep.target),
+    SrcInfo = get_v4_endpoint(UserCtx, maps:get(<<"source">>, Rep)),
+    TgtInfo = get_v4_endpoint(UserCtx, maps:get(<<"target">>, Rep)),
     maybe_append_filters([UUID, SrcInfo, TgtInfo], Rep);
 
-replication_id(#rep{user_ctx = UserCtx} = Rep, 3) ->
+replication_id(#{<<"user_ctx">> := UserCtx} = Rep, 3) ->
     UUID = couch_server:get_uuid(),
-    Src = get_rep_endpoint(UserCtx, Rep#rep.source),
-    Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
+    Src = get_rep_endpoint(UserCtx, maps:get(<<"source">>, Rep)),
+    Tgt = get_rep_endpoint(UserCtx, maps:get(<<"target">>, Res)),
     maybe_append_filters([UUID, Src, Tgt], Rep);
 
-replication_id(#rep{user_ctx = UserCtx} = Rep, 2) ->
+replication_id(#{<<"user_ctx">> := UserCtx} = Rep, 2) ->
     {ok, HostName} = inet:gethostname(),
     Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of
     P when is_number(P) ->
@@ -64,14 +65,14 @@ replication_id(#rep{user_ctx = UserCtx} = Rep, 2) ->
         % ... mochiweb_socket_server:get(https, port)
         list_to_integer(config:get("httpd", "port", "5984"))
     end,
-    Src = get_rep_endpoint(UserCtx, Rep#rep.source),
-    Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
+    Src = get_rep_endpoint(UserCtx, maps:get(<<"source">>, Rep)),
+    Tgt = get_rep_endpoint(UserCtx, maps:get(<<"target">>, Rep)),
     maybe_append_filters([HostName, Port, Src, Tgt], Rep);
 
-replication_id(#rep{user_ctx = UserCtx} = Rep, 1) ->
+replication_id(#{<<"user_ctx">> := UserCtx} = Rep, 1) ->
     {ok, HostName} = inet:gethostname(),
-    Src = get_rep_endpoint(UserCtx, Rep#rep.source),
-    Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
+    Src = get_rep_endpoint(UserCtx, maps:get(<<"source">>, Rep)),
+    Tgt = get_rep_endpoint(UserCtx, maps:get(<<"target">>, Rep)),
     maybe_append_filters([HostName, Src, Tgt], Rep).
 
 
@@ -83,15 +84,24 @@ convert(Id0) when is_binary(Id0) ->
     % the URL path. So undo the incorrect parsing here to avoid forcing
     % users to url encode + characters.
     Id = binary:replace(Id0, <<" ">>, <<"+">>, [global]),
-    lists:splitwith(fun(Char) -> Char =/= $+ end, ?b2l(Id));
-convert({BaseId, Ext} = Id) when is_list(BaseId), is_list(Ext) ->
+    case binary:split(Id, <<"+">>) of
+        [BaseId, Ext] -> {BaseId, Ext};
+        [BaseId] -> {BaseId, <<>>}
+    end
+convert({BaseId, Ext}) when is_list(BaseId), is_list(Ext) ->
+    {list_to_binary(BaseId), list_to_binary(Ext)};
+convert({BaseId, Ext} = Id) when is_binary(BaseId), is_binary(Ext) ->
     Id.
 
 
 % Private functions
 
-maybe_append_filters(Base,
-        #rep{source = Source, user_ctx = UserCtx, options = Options}) ->
+maybe_append_filters(Base, #{} = Rep) ->
+    #{
+        <<"source">> := Source,
+        <<"user_ctx">> := UserCtx,
+        <<"options">> := Options
+    } = Rep,
     Base2 = Base ++
         case couch_replicator_filters:parse(Options) of
         {ok, nil} ->
@@ -112,7 +122,8 @@ maybe_append_filters(Base,
         {error, FilterParseError} ->
             throw({error, FilterParseError})
         end,
-    couch_util:to_hex(couch_hash:md5_hash(term_to_binary(Base2))).
+    Res = couch_util:to_hex(couch_hash:md5_hash(term_to_binary(Base2))),
+    list_to_binary(Res).
 
 
 maybe_append_options(Options, RepOptions) ->
@@ -127,14 +138,20 @@ maybe_append_options(Options, RepOptions) ->
     end, [], Options).
 
 
-get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers}) ->
+get_rep_endpoint(_UserCtx, #{<<"url">> = Url0, <<"headers">> = Headers0}) ->
+    Url = binary_to_list(Url0),
+    Headers1 = maps:fold(fun(K, V, Acc) ->
+        [{binary_to_list(K), binary_to_list(V)} | Acc]
+    end, [], Header0),
+    Headers2 = lists:keysort(1, Headers1),
     DefaultHeaders = (#httpdb{})#httpdb.headers,
-    {remote, Url, Headers -- DefaultHeaders};
-get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
-    {local, DbName, UserCtx}.
+    {remote, Url, Headers2 -- DefaultHeaders};
+get_rep_endpoint(#{} = UserCtx, <<DbName/binary>>) ->
+    UserCtxRec = couch_replicator_utils:user_ctx_from_json(UserCtx),
+    {local, DbName, UserCtxRec}.
 
 
-get_v4_endpoint(UserCtx, #httpdb{} = HttpDb) ->
+get_v4_endpoint(#{} = UserCtx, #{} = HttpDb) ->
     {remote, Url, Headers} = get_rep_endpoint(UserCtx, HttpDb),
     {{UserFromHeaders, _}, HeadersWithoutBasicAuth} =
         couch_replicator_utils:remove_basic_auth_from_headers(Headers),
@@ -142,8 +159,9 @@ get_v4_endpoint(UserCtx, #httpdb{} = HttpDb) ->
     User = pick_defined_value([UserFromUrl, UserFromHeaders]),
     OAuth = undefined, % Keep this to ensure checkpoints don't change
     {remote, User, Host, NonDefaultPort, Path, HeadersWithoutBasicAuth, OAuth};
-get_v4_endpoint(UserCtx, <<DbName/binary>>) ->
-    {local, DbName, UserCtx}.
+get_v4_endpoint(#{} = UserCtx, <<DbName/binary>>) ->
+    UserCtxRec = couch_replicator_utils:user_ctx_from_json(UserCtx),
+    {local, DbName, UserCtxRec}.
 
 
 pick_defined_value(Values) ->
diff --git a/src/couch_replicator/src/couch_replicator_notifier.erl b/src/couch_replicator/src/couch_replicator_notifier.erl
deleted file mode 100644
index f7640a3..0000000
--- a/src/couch_replicator/src/couch_replicator_notifier.erl
+++ /dev/null
@@ -1,58 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_notifier).
-
--behaviour(gen_event).
--vsn(1).
-
-% public API
--export([start_link/1, stop/1, notify/1]).
-
-% gen_event callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_event/2, handle_call/2, handle_info/2]).
-
--include_lib("couch/include/couch_db.hrl").
-
-start_link(FunAcc) ->
-    couch_event_sup:start_link(couch_replication,
-        {couch_replicator_notifier, make_ref()}, FunAcc).
-
-notify(Event) ->
-    gen_event:notify(couch_replication, Event).
-
-stop(Pid) ->
-    couch_event_sup:stop(Pid).
-
-
-init(FunAcc) ->
-    {ok, FunAcc}.
-
-terminate(_Reason, _State) ->
-    ok.
-
-handle_event(Event, Fun) when is_function(Fun, 1) ->
-    Fun(Event),
-    {ok, Fun};
-handle_event(Event, {Fun, Acc}) when is_function(Fun, 2) ->
-    Acc2 = Fun(Event, Acc),
-    {ok, {Fun, Acc2}}.
-
-handle_call(_Msg, State) ->
-    {ok, ok, State}.
-
-handle_info(_Msg, State) ->
-    {ok, State}.
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 412ff7d..e6e8a4d 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -15,7 +15,7 @@
 -behaviour(gen_server).
 
 -export([
-   start_link/1
+   start_link/3
 ]).
 
 -export([
@@ -39,19 +39,16 @@
     to_binary/1
 ]).
 
--import(couch_replicator_utils, [
-    start_db_compaction_notifier/2,
-    stop_db_compaction_notifier/1,
-    pp_rep_id/1
-]).
-
 
 -define(LOWEST_SEQ, 0).
 -define(DEFAULT_CHECKPOINT_INTERVAL, 30000).
 -define(STARTUP_JITTER_DEFAULT, 5000).
 
 -record(rep_state, {
-    rep_details,
+    job,
+    job_data,
+    id,
+    base_id,
     source_name,
     target_name,
     source,
@@ -75,39 +72,36 @@
     workers,
     stats = couch_replicator_stats:new(),
     session_id,
-    source_db_compaction_notifier = nil,
-    target_db_compaction_notifier = nil,
-    source_monitor = nil,
-    target_monitor = nil,
     source_seq = nil,
     use_checkpoints = true,
     checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
     type = db,
-    view = nil
+    view = nil,
+    user = null,
+    options = #{}
 }).
 
 
-start_link(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
-    RepChildId = BaseId ++ Ext,
-    Source = couch_replicator_api_wrap:db_uri(Src),
-    Target = couch_replicator_api_wrap:db_uri(Tgt),
-    ServerName = {global, {?MODULE, Rep#rep.id}},
-
-    case gen_server:start_link(ServerName, ?MODULE, Rep, []) of
+start_link(#{] = Job, #{} = JobData) ->
+    case gen_server:start_link(?MODULE, {Job, JobData}, []) of
         {ok, Pid} ->
             {ok, Pid};
         {error, Reason} ->
-            couch_log:warning("failed to start replication `~s` (`~s` -> `~s`)",
-                           [RepChildId, Source, Target]),
+            #{<<"rep">> := Rep} = JobData,
+            {<<"id">> := Id, <<"source">> := Src, <<"target">> := Ttg} = Rep,
+            Source = couch_replicator_api_wrap:db_uri(Src),
+            Target = couch_replicator_api_wrap:db_uri(Tgt),
+            ErrMsg = "failed to start replication `~s` (`~s` -> `~s`)",
+            couch_log:warning(ErrMsg, [RepId, Source, Target]),
             {error, Reason}
     end.
 
 
-init(InitArgs) ->
-    {ok, InitArgs, 0}.
+init({#{} = Job, #{} = JobData}) ->
+    {ok, {Job, JobData}, 0}.
 
 
-do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
+do_init(#{} = Job, #{} = JobData) ->
     process_flag(trap_exit, true),
 
     timer:sleep(startup_jitter()),
@@ -119,8 +113,12 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
         target_name = TargetName,
         start_seq = {_Ts, StartSeq},
         highest_seq_done = {_, HighestSeq},
-        checkpoint_interval = CheckpointInterval
-    } = State = init_state(Rep),
+        checkpoint_interval = CheckpointInterval,
+        user = User,
+        options = Options,
+        doc_id = DocId,
+        db_name = DbName
+    } = State = init_state(Job, JobData),
 
     NumWorkers = get_value(worker_processes, Options),
     BatchSize = get_value(worker_batch_size, Options),
@@ -151,10 +149,10 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
 
     couch_task_status:add_task([
         {type, replication},
-        {user, UserCtx#user_ctx.name},
-        {replication_id, ?l2b(BaseId ++ Ext)},
-        {database, Rep#rep.db_name},
-        {doc_id, Rep#rep.doc_id},
+        {user, User},
+        {replication_id, State#rep_state.id},
+        {database, DbName},
+        {doc_id, DocId},
         {source, ?l2b(SourceName)},
         {target, ?l2b(TargetName)},
         {continuous, get_value(continuous, Options, false)},
@@ -163,16 +161,6 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
     ] ++ rep_stats(State)),
     couch_task_status:set_update_frequency(1000),
 
-    % Until OTP R14B03:
-    %
-    % Restarting a temporary supervised child implies that the original arguments
-    % (#rep{} record) specified in the MFA component of the supervisor
-    % child spec will always be used whenever the child is restarted.
-    % This implies the same replication performance tunning parameters will
-    % always be used. The solution is to delete the child spec (see
-    % cancel_replication/1) and then start the replication again, but this is
-    % unfortunately not immune to race conditions.
-
     log_replication_start(State),
     couch_log:debug("Worker pids are: ~p", [Workers]),
 
@@ -226,22 +214,6 @@ handle_call({report_seq_done, Seq, StatsInc}, From,
     update_task(NewState),
     {noreply, NewState}.
 
-handle_cast({db_compacted, DbName}, State) ->
-    #rep_state{
-        source = Source,
-        target = Target
-    } = State,
-    SourceName = couch_replicator_utils:local_db_name(Source),
-    TargetName = couch_replicator_utils:local_db_name(Target),
-    case DbName of
-        SourceName ->
-            {ok, NewSource} = couch_db:reopen(Source),
-            {noreply, State#rep_state{source = NewSource}};
-        TargetName ->
-            {ok, NewTarget} = couch_db:reopen(Target),
-            {noreply, State#rep_state{target = NewTarget}}
-    end;
-
 handle_cast(checkpoint, State) ->
     case do_checkpoint(State) of
     {ok, NewState} ->
@@ -261,14 +233,6 @@ handle_cast({report_seq, Seq},
 handle_info(shutdown, St) ->
     {stop, shutdown, St};
 
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
-    couch_log:error("Source database is down. Reason: ~p", [Why]),
-    {stop, source_db_down, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
-    couch_log:error("Target database is down. Reason: ~p", [Why]),
-    {stop, target_db_down, St};
-
 handle_info({'EXIT', Pid, max_backoff}, State) ->
     couch_log:error("Max backoff reached child process ~p", [Pid]),
     {stop, {shutdown, max_backoff}, State};
@@ -327,9 +291,10 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
         {stop, {worker_died, Pid, Reason}, State2}
     end;
 
-handle_info(timeout, InitArgs) ->
-    try do_init(InitArgs) of {ok, State} ->
-        {noreply, State}
+handle_info(timeout, {#{} = Job, #{} = JobData} = InitArgs) ->
+    try do_init(Job, JobData) of
+        {ok, State} ->
+            {noreply, State}
     catch
         exit:{http_request_failed, _, _, max_backoff} ->
             {stop, {shutdown, max_backoff}, {error, InitArgs}};
@@ -344,13 +309,12 @@ handle_info(timeout, InitArgs) ->
     end.
 
 
-terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
-    checkpoint_history = CheckpointHistory} = State) ->
-    terminate_cleanup(State),
-    couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}),
-    doc_update_completed(Rep, rep_stats(State));
+terminate(normal, #rep_state{} = State) ->
+    % Note: when terminating `normal`, the job was already marked as finished.
+    % if that fails then we'd end up in the error terminate clause
+    terminate_cleanup(State).
 
-terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
+terminate(shutdown, #rep_state{id = RepId} = State) ->
     % Replication stopped via _scheduler_sup:terminate_child/1, which can be
     % occur during regular scheduler operation or when job is removed from
     % the scheduler.
@@ -362,58 +326,60 @@ terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
             couch_log:error(LogMsg, [?MODULE, RepId, Error]),
             State
     end,
-    couch_replicator_notifier:notify({stopped, RepId, <<"stopped">>}),
+    finish_couch_job(State1, <<"stopped">>, null),
     terminate_cleanup(State1);
 
-terminate({shutdown, max_backoff}, {error, InitArgs}) ->
-    #rep{id = {BaseId, Ext} = RepId} = InitArgs,
+terminate({shutdown, max_backoff}, {error, {#{} = Job, #{} = JobData}}) ->
+    % Here we handle the case when replication fails during initialization.
+    % That is before the #rep_state{} is even built.
+    #{<<"rep">> := #{<<"id">> := RepId}} = JobData,
     couch_stats:increment_counter([couch_replicator, failed_starts]),
-    couch_log:warning("Replication `~s` reached max backoff ", [BaseId ++ Ext]),
-    couch_replicator_notifier:notify({error, RepId, max_backoff});
-
-terminate({shutdown, {error, Error}}, {error, Class, Stack, InitArgs}) ->
-    #rep{
-        id = {BaseId, Ext} = RepId,
-        source = Source0,
-        target = Target0,
-        doc_id = DocId,
-        db_name = DbName
-    } = InitArgs,
+    couch_log:warning("Replication `~s` reached max backoff ", [RepId]),
+    finish_couch_job(Job, JobData, <<"error">>, max_backoff);
+
+terminate({shutdown, {error, Error}}, {error, Class, Stack, {Job, JobData}}) ->
+    % Here we handle the case when replication fails during initialization.
+    #{<<"rep">> := Rep} = JobData,
+    #{
+       <<"id">> := Id,
+       <<"source">> := Source0,
+       <<"target">> := Target0,
+       <<"doc_id">> := DocId,
+       <<"db_name">> := DbName
+    } = Rep,
     Source = couch_replicator_api_wrap:db_uri(Source0),
     Target = couch_replicator_api_wrap:db_uri(Target0),
-    RepIdStr = BaseId ++ Ext,
     Msg = "~p:~p: Replication ~s failed to start ~p -> ~p doc ~p:~p stack:~p",
-    couch_log:error(Msg, [Class, Error, RepIdStr, Source, Target, DbName,
+    couch_log:error(Msg, [Class, Error, RepId, Source, Target, DbName,
         DocId, Stack]),
     couch_stats:increment_counter([couch_replicator, failed_starts]),
-    couch_replicator_notifier:notify({error, RepId, Error});
+    finish_couch_job(Job, JobData, <<"error">>, Error);
 
-terminate({shutdown, max_backoff}, State) ->
+terminate({shutdown, max_backoff}, #rep_state{} = State) ->
     #rep_state{
+        id = RepId,
         source_name = Source,
         target_name = Target,
-        rep_details = #rep{id = {BaseId, Ext} = RepId}
     } = State,
     couch_log:error("Replication `~s` (`~s` -> `~s`) reached max backoff",
-        [BaseId ++ Ext, Source, Target]),
+        [RepId, Source, Target]),
     terminate_cleanup(State),
-    couch_replicator_notifier:notify({error, RepId, max_backoff});
+    finish_couch_job(State, <<"error">>, max_backoff);
 
 terminate(Reason, State) ->
-#rep_state{
+    #rep_state{
+        id = RepId,
         source_name = Source,
         target_name = Target,
-        rep_details = #rep{id = {BaseId, Ext} = RepId}
     } = State,
     couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~s",
-        [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
+        [RepId, Source, Target, to_binary(Reason)]),
     terminate_cleanup(State),
-    couch_replicator_notifier:notify({error, RepId, Reason}).
+    finish_couch_job(State, <<"error">>, Reason).
+
 
 terminate_cleanup(State) ->
     update_task(State),
-    stop_db_compaction_notifier(State#rep_state.source_db_compaction_notifier),
-    stop_db_compaction_notifier(State#rep_state.target_db_compaction_notifier),
     couch_replicator_api_wrap:db_close(State#rep_state.source),
     couch_replicator_api_wrap:db_close(State#rep_state.target).
 
@@ -424,22 +390,19 @@ code_change(_OldVsn, #rep_state{}=State, _Extra) ->
 
 format_status(_Opt, [_PDict, State]) ->
     #rep_state{
+       id = Id,
        source = Source,
        target = Target,
-       rep_details = RepDetails,
        start_seq = StartSeq,
        source_seq = SourceSeq,
        committed_seq = CommitedSeq,
        current_through_seq = ThroughSeq,
        highest_seq_done = HighestSeqDone,
-       session_id = SessionId
-    } = state_strip_creds(State),
-    #rep{
-       id = RepId,
-       options = Options,
+       session_id = SessionId,
        doc_id = DocId,
-       db_name = DbName
-    } = RepDetails,
+       db_name = DbName,
+       options = Options
+    } = state_strip_creds(State),
     [
         {rep_id, RepId},
         {source, couch_replicator_api_wrap:db_uri(Source)},
@@ -483,73 +446,108 @@ httpdb_strip_creds(LocalDb) ->
     LocalDb.
 
 
-rep_strip_creds(#rep{source = Source, target = Target} = Rep) ->
-    Rep#rep{
-        source = httpdb_strip_creds(Source),
-        target = httpdb_strip_creds(Target)
-    }.
-
-
-state_strip_creds(#rep_state{rep_details = Rep, source = Source, target = Target} = State) ->
-    % #rep_state contains the source and target at the top level and also
-    % in the nested #rep_details record
+state_strip_creds(#rep_state{source = Source, target = Target} = State) ->
     State#rep_state{
-        rep_details = rep_strip_creds(Rep),
         source = httpdb_strip_creds(Source),
         target = httpdb_strip_creds(Target)
     }.
 
 
-adjust_maxconn(Src = #httpdb{http_connections = 1}, RepId) ->
+adjust_maxconn(Src = #{<<"http_connections">> : = 1}, RepId) ->
     Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p",
     couch_log:notice(Msg, [RepId]),
-    Src#httpdb{http_connections = 2};
+    Src#{<<"http_connections">> := 2};
 adjust_maxconn(Src, _RepId) ->
     Src.
 
 
--spec doc_update_triggered(#rep{}) -> ok.
-doc_update_triggered(#rep{db_name = null}) ->
+-spec doc_update_triggered(#rep_state{}) -> ok.
+doc_update_triggered(#rep_state{db_name = null}) ->
     ok;
-doc_update_triggered(#rep{id = RepId, doc_id = DocId} = Rep) ->
+doc_update_triggered(#rep_state{} = State) ->
+    #rep_state{id = Id, doc_id = DocId, db_name = DbName} = State,
     case couch_replicator_doc_processor:update_docs() of
         true ->
-            couch_replicator_docs:update_triggered(Rep, RepId);
+            couch_replicator_docs:update_triggered(Id, DocId, DbName);
         false ->
             ok
     end,
-    couch_log:notice("Document `~s` triggered replication `~s`",
-        [DocId, pp_rep_id(RepId)]),
+    couch_log:notice("Document `~s` triggered replication `~s`", [DocId, Id]),
     ok.
 
 
--spec doc_update_completed(#rep{}, list()) -> ok.
-doc_update_completed(#rep{db_name = null}, _Stats) ->
+-spec doc_update_completed(#rep_state{}) -> ok.
+doc_update_completed(#rep_state{db_name = null}) ->
     ok;
-doc_update_completed(#rep{id = RepId, doc_id = DocId, db_name = DbName,
-    start_time = StartTime}, Stats0) ->
-    Stats = Stats0 ++ [{start_time, couch_replicator_utils:iso8601(StartTime)}],
+doc_update_completed(#rep_state{} = State) ->
+    #rep_state{
+        id = Id,
+        doc_id = DocId,
+        db_name = DbName,
+        start_time = Start,
+        stats = Stats0
+    } = State,
+    Stats = Stats0 ++ [{start_time, couch_replicator_utils:iso8601(Start)}],
     couch_replicator_docs:update_doc_completed(DbName, DocId, Stats),
-    couch_log:notice("Replication `~s` completed (triggered by `~s`)",
-        [pp_rep_id(RepId), DocId]),
+    couch_log:notice("Replication `~s` completed (triggered by `~s:~s`)",
+        [Id, DbName, DocId]),
     ok.
 
 
 do_last_checkpoint(#rep_state{seqs_in_progress = [],
     highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
-    {stop, normal, cancel_timer(State)};
+    History = State#rep_state.checkoint_history,
+    Result = case finish_couch_job(State, <<"completed">>, History) of
+        ok -> normal;
+        {error, _} = Error -> Error
+    end,
+    {stop, Result, cancel_timer(State)};
 do_last_checkpoint(#rep_state{seqs_in_progress = [],
     highest_seq_done = Seq} = State) ->
     case do_checkpoint(State#rep_state{current_through_seq = Seq}) of
     {ok, NewState} ->
         couch_stats:increment_counter([couch_replicator, checkpoints, success]),
-        {stop, normal, cancel_timer(NewState)};
+        History = NewState#rep_state.checkpoint_history,
+        Result = case finish_couch_job(NewState, <<"completed">>, History) of
+            ok -> normal;
+            {error, _} = Error -> Error
+        end,
+        {stop, Result, cancel_timer(NewState)};
     Error ->
         couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
         {stop, Error, State}
     end.
 
 
+finish_couch_job(#rep_state{} = State, FinishedState, Result) ->
+    #rep_state{job = Job, job_data = Jobdata} = State,
+    finish_couch_job(Job, JobData, FinishedState, Result).
+
+
+finish_couch_job(#{} = Job, #{} = JobData, FinishState, Result0) ->
+    #{<<"rep">> := #{<<"id">> := RepId}} = JobData,
+    case Result of
+        null -> null;
+        #{} -> Result0;
+        <<_/binary>> -> Result0;
+        Atom when is_atom(Atom) -> atom_to_binary(Atom, utf8)
+        Other -> couch_replicator_utils:rep_error_to_binary(Result0)
+    end,
+    JobData= JobData0#{
+        <<"finished_state">> => FinishState,
+        <<"finished_result">> => Result
+    },
+    case couch_jobs:finish(undefined, Job, JobData) of
+        ok ->
+            doc_update_completed(State),
+            ok;
+        {error, Error} ->
+            Msg = "Replication ~s job could not finish. Error:~p",
+            couch_log:error(Msg, [RepId, Error]),
+            {error, Error}
+    end.
+
+
 start_timer(State) ->
     After = State#rep_state.checkpoint_interval,
     case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
@@ -568,21 +566,35 @@ cancel_timer(#rep_state{timer = Timer} = State) ->
     State#rep_state{timer = nil}.
 
 
-init_state(Rep) ->
-    #rep{
-        id = {BaseId, _Ext},
-        source = Src0, target = Tgt,
-        options = Options, user_ctx = UserCtx,
-        type = Type, view = View,
-        start_time = StartTime,
-        stats = Stats
+init_state(#{} = Job, #{<<"rep">> =: Rep}} = JobData) ->
+    #{
+        <<"id">> := Id,
+        <<"base_id">> := BaseId,
+        <<"source">> := Src0,
+        <<"target">> := Tgt,
+        <<"type">> := Type,
+        <<"view">> := View,
+        <<"start_time">> := StartTime,
+        <<"stats">> := Stats,
+        <<"options">> := OptionsMap,
+        <<"user_ctx">> := UserCtx,
+        <<"db_name">> := DbName,
+        <<"doc_id">> := DocId,
     } = Rep,
+
+    Options = maps:fold(fun(K, V, Acc) ->
+        [{binary_to_atom(K, utf8), V} | Acc]
+    end, [], OptionsMap),
+
     % Adjust minimum number of http source connections to 2 to avoid deadlock
     Src = adjust_maxconn(Src0, BaseId),
-    {ok, Source} = couch_replicator_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
-    {CreateTargetParams} = get_value(create_target_params, Options, {[]}),
-    {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
-        get_value(create_target, Options, false), CreateTargetParams),
+
+    {ok, Source} = couch_replicator_api_wrap:db_open(Src, UserCtx),
+
+    CreateTgt = get_value(create_target, Options, false),
+    CreateParams = maps:to_list(get_value(create_target_params, Options, #{}),
+    {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, UserCtx, CreateTgt,
+        CreateParams),
 
     {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
     {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),
@@ -597,7 +609,10 @@ init_state(Rep) ->
 
     #doc{body={CheckpointHistory}} = SourceLog,
     State = #rep_state{
-        rep_details = Rep,
+        job = Job,
+        job_data = JobData,
+        id = Id,
+        base_id = BaseId,
         source_name = couch_replicator_api_wrap:db_uri(Source),
         target_name = couch_replicator_api_wrap:db_uri(Target),
         source = Source,
@@ -613,32 +628,27 @@ init_state(Rep) ->
         src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
         tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
         session_id = couch_uuids:random(),
-        source_db_compaction_notifier =
-            start_db_compaction_notifier(Source, self()),
-        target_db_compaction_notifier =
-            start_db_compaction_notifier(Target, self()),
-        source_monitor = db_monitor(Source),
-        target_monitor = db_monitor(Target),
         source_seq = SourceSeq,
-        use_checkpoints = get_value(use_checkpoints, Options, true),
-        checkpoint_interval = get_value(checkpoint_interval, Options,
-                                        ?DEFAULT_CHECKPOINT_INTERVAL),
+        use_checkpoints = get_value(use_checkpoints, Options),
+        checkpoint_interval = get_value(checkpoint_interval, Options),
         type = Type,
         view = View,
         stats = Stats
+        doc_id = DocId,
+        db_name = DbName
     },
     State#rep_state{timer = start_timer(State)}.
 
 
-find_and_migrate_logs(DbList, #rep{id = {BaseId, _}} = Rep) ->
+find_and_migrate_logs(DbList, #{<<"base_id">> := BaseId} = Rep) ->
     LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
-    fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []).
+    fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, State, []).
 
 
 fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
     lists:reverse(Acc);
 
-fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
+fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, #{} = Rep, Acc) ->
     case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of
     {error, <<"not_found">>} when Vsn > 1 ->
         OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1),
@@ -658,8 +668,8 @@ fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
     end.
 
 
-maybe_save_migrated_log(Rep, Db, #doc{} = Doc, OldId) ->
-    case get_value(use_checkpoints, Rep#rep.options, true) of
+maybe_save_migrated_log(#{<<"options">> = Options}, Db, #doc{} = Doc, OldId) ->
+    case maps:get(<<"use_checkpoints">>, Options) of
         true ->
             update_checkpoint(Db, Doc),
             Msg = "Migrated replication checkpoint. Db:~p ~p -> ~p",
@@ -722,7 +732,7 @@ do_checkpoint(State) ->
         src_starttime = SrcInstanceStartTime,
         tgt_starttime = TgtInstanceStartTime,
         stats = Stats,
-        rep_details = #rep{options = Options},
+        options = Options,
         session_id = SessionId
     } = State,
     case commit_to_both(Source, Target) of
@@ -931,14 +941,13 @@ has_session_id(SessionId, [{Props} | Rest]) ->
 
 
 db_monitor(#httpdb{}) ->
-	nil;
+    nil;
 db_monitor(Db) ->
-	couch_db:monitor(Db).
+    couch_db:monitor(Db).
 
 
-get_pending_count(St) ->
-    Rep = St#rep_state.rep_details,
-    Timeout = get_value(connection_timeout, Rep#rep.options),
+get_pending_count(#rep_state{options = Options} = St) ->
+    Timeout = get_value(connection_timeout, Options),
     TimeoutMicro = Timeout * 1000,
     case get(pending_count_state) of
         {LastUpdate, PendingCount} ->
@@ -985,8 +994,7 @@ update_task(State) ->
     ]).
 
 
-update_scheduler_job_stats(#rep_state{rep_details = Rep, stats = Stats}) ->
-    JobId = Rep#rep.id,
+update_scheduler_job_stats(#rep_state{id = JobId, stats = Stats}) ->
     couch_replicator_scheduler:update_job_stats(JobId, Stats).
 
 
@@ -1023,24 +1031,21 @@ replication_start_error(Error) ->
     Error.
 
 
-log_replication_start(#rep_state{rep_details = Rep} = RepState) ->
-    #rep{
-       id = {BaseId, Ext},
-       doc_id = DocId,
-       db_name = DbName,
-       options = Options
-    } = Rep,
-    Id = BaseId ++ Ext,
-    Workers = get_value(worker_processes, Options),
-    BatchSize = get_value(worker_batch_size, Options),
+log_replication_start(#rep_state{} = RepState) ->
     #rep_state{
-       source_name = Source,  % credentials already stripped
-       target_name = Target,  % credentials already stripped
-       session_id = Sid
+        id = Id,
+        doc_id = DocId,
+        db_name = DbName,
+        options = Options,
+        source_name = Source,
+        target_name = Target,
+        session_id = Sid,
     } = RepState,
+    Workers = get_value(worker_processes, Options),
+    BatchSize = get_value(worker_batch_size, Options),
     From = case DbName of
-        ShardName when is_binary(ShardName) ->
-            io_lib:format("from doc ~s:~s", [mem3:dbname(ShardName), DocId]);
+        Name when is_binary(Name) ->
+            io_lib:format("from doc ~s:~s", [Name, DocId]);
         _ ->
             "from _replicate endpoint"
     end,
@@ -1080,7 +1085,6 @@ scheduler_job_format_status_test() ->
         db_name = <<"mydb">>
     },
     State = #rep_state{
-        rep_details = Rep,
         source = Rep#rep.source,
         target = Rep#rep.target,
         session_id = <<"a">>,
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
index 8ab55f8..3ea9dff 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
@@ -17,7 +17,7 @@
 %% public api
 -export([
     start_link/0,
-    start_child/1,
+    start_child/2,
     terminate_child/1
 ]).
 
@@ -37,8 +37,8 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 
-start_child(#rep{} = Rep) ->
-    supervisor:start_child(?MODULE, [Rep]).
+start_child(#{} = Job, #{} = Rep) ->
+    supervisor:start_child(?MODULE, [Job, Rep]).
 
 
 terminate_child(Pid) ->
diff --git a/src/couch_replicator/src/couch_replicator_sup.erl b/src/couch_replicator/src/couch_replicator_sup.erl
index 8202b1c..b86529f 100644
--- a/src/couch_replicator/src/couch_replicator_sup.erl
+++ b/src/couch_replicator/src/couch_replicator_sup.erl
@@ -20,18 +20,6 @@ start_link() ->
 
 init(_Args) ->
     Children = [
-        {couch_replication_event,
-            {gen_event, start_link, [{local, couch_replication}]},
-            permanent,
-            brutal_kill,
-            worker,
-            dynamic},
-       {couch_replicator_clustering,
-            {couch_replicator_clustering, start_link, []},
-            permanent,
-            brutal_kill,
-            worker,
-            [couch_replicator_clustering]},
        {couch_replicator_connection,
             {couch_replicator_connection, start_link, []},
             permanent,
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index b0d7069..ff61fcf 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -25,11 +25,12 @@
    rep_error_to_binary/1,
    get_json_value/2,
    get_json_value/3,
-   pp_rep_id/1,
    iso8601/1,
    filter_state/3,
    remove_basic_auth_from_headers/1,
-   normalize_rep/1
+   normalize_rep/1,
+   user_ctx_from_json/1,
+   user_ctx_to_json/1
 ]).
 
 -export([
@@ -126,14 +127,6 @@ get_json_value(Key, Props, Default) when is_binary(Key) ->
     end.
 
 
-% pretty-print replication id
--spec pp_rep_id(#rep{} | rep_id()) -> string().
-pp_rep_id(#rep{id = RepId}) ->
-    pp_rep_id(RepId);
-pp_rep_id({Base, Extension}) ->
-    Base ++ Extension.
-
-
 % NV: TODO: this function is not used outside api wrap module
 % consider moving it there during final cleanup
 is_deleted(Change) ->
@@ -154,8 +147,13 @@ parse_rep_doc(Props, UserCtx) ->
     couch_replicator_docs:parse_rep_doc(Props, UserCtx).
 
 
--spec iso8601(erlang:timestamp()) -> binary().
-iso8601({_Mega, _Sec, _Micro} = Timestamp) ->
+-spec iso8601(integer()) -> binary().
+iso8601(Native) when is_integer(Native) ->
+    ErlangSystemTime = erlang:convert_time_unit(Native, native, microsecond),
+    MegaSecs = ErlangSystemTime div 1000000000000,
+    Secs = ErlangSystemTime div 1000000 - MegaSecs * 1000000,
+    MicroSecs = ErlangSystemTime rem 1000000,
+    {MegaSecs, Secs, MicroSecs}.
     {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(Timestamp),
     Format = "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ",
     iolist_to_binary(io_lib:format(Format, [Y, Mon, D, H, Min, S])).
@@ -209,25 +207,53 @@ decode_basic_creds(Base64) ->
     end.
 
 
-% Normalize a #rep{} record such that it doesn't contain time dependent fields
+% Normalize a rep map such that it doesn't contain time dependent fields
 % pids (like httpc pools), and options / props are sorted. This function would
 % used during comparisons.
--spec normalize_rep(#rep{} | nil) -> #rep{} | nil.
-normalize_rep(nil) ->
-    nil;
+-spec normalize_rep(#{} | null) -> #{} | null.
+normalize_rep(null) ->
+    null;
+
+normalize_rep(#{} = Rep)->
+    Ks = [<<"options">>, <<"type">>, <<"view">>, <<"doc_id">>, <<"db_name">>],
+    Rep1 = maps:with(Ks, Rep),
+    #{<<"source">> := Source, <<"target">> := Target} = Rep,
+    Rep1#{
+        <<"source">> => normalize_endpoint(Source),
+        <<"target">> => normalize_endpoint(Target)
+    }.
+
+
+normalize_endpoint(<<DbName/binary>>) ->
+    DbName;
+
+normalize_endpoint(#{} = Endpoint) ->
+    Ks = [<<"url">>,<<"auth_props">>, <<"headers">>, <<"timeout">>,
+        <<"ibrowse_options">>, <<"retries">>, <<"http_connections">>
+    ],
+    maps:with(Ks, Endpoint).
 
-normalize_rep(#rep{} = Rep)->
-    #rep{
-        source = couch_replicator_api_wrap:normalize_db(Rep#rep.source),
-        target = couch_replicator_api_wrap:normalize_db(Rep#rep.target),
-        options = Rep#rep.options,  % already sorted in make_options/1
-        type = Rep#rep.type,
-        view = Rep#rep.view,
-        doc_id = Rep#rep.doc_id,
-        db_name = Rep#rep.db_name
+
+user_ctx_to_json(#user_ctx{name = Name, roles = Roles0} = UserCtx) ->
+    {AtomRoles0, Roles} = lists:partition(fun erlang:is_atom/1, Roles0),
+    AtomRoles = lists:map(fun(R) -> atom_to_binary(V, utf8) end, AtomRoles0),
+    UserCtxMap = #{
+        <<"name">> => Name,
+        <<"roles">> => Roles,
+        <<"atom_roles">> => AtomRoles
     }.
 
 
+user_ctx_from_json(#{} = UserCtxMap) ->
+    #{
+        <<"name">> := Name,
+        <<"roles">> := Roles
+        <<"atom_roles">> := AtomRoles0
+    },
+    AtomRoles = lists:map(fun(R) -> binary_to_atom(V, utf8) end, AtomRoles0),
+    #user_ctx{name = Name, roles = lists:sort(Roles ++ AtomRoles)}.
+
+
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").
@@ -306,4 +332,23 @@ normalize_rep_test_() ->
         end)
     }.
 
+
+normalize_endpoint() ->
+    HttpDb =  #httpdb{
+        url = "http://host/db",
+        auth_props = [{"key", "val"}],
+        headers = [{"k2","v2"}, {"k1","v1"}],
+        timeout = 30000,
+        ibrowse_options = [{k2, v2}, {k1, v1}],
+        retries = 10,
+        http_connections = 20
+    },
+    Expected = HttpDb#httpdb{
+        headers = [{"k1","v1"}, {"k2","v2"}],
+        ibrowse_options = [{k1, v1}, {k2, v2}]
+    },
+    ?assertEqual(Expected, normalize_db(HttpDb)),
+    ?assertEqual(<<"local">>, normalize_db(<<"local">>)).
+
+
 -endif.
diff --git a/src/couch_replicator/test/couch_replicator_proxy_tests.erl b/src/couch_replicator/test/couch_replicator_proxy_tests.erl
index 4f545bc..5fb922a 100644
--- a/src/couch_replicator/test/couch_replicator_proxy_tests.erl
+++ b/src/couch_replicator/test/couch_replicator_proxy_tests.erl
@@ -49,7 +49,7 @@ parse_rep_doc_without_proxy(_) ->
             {<<"source">>, <<"http://unproxied.com">>},
             {<<"target">>, <<"http://otherunproxied.com">>}
         ]},
-        Rep = couch_replicator_docs:parse_rep_doc(NoProxyDoc),
+        Rep = couch_replicator_docs:parse_rep_doc_without_id(NoProxyDoc),
         ?assertEqual((Rep#rep.source)#httpdb.proxy_url, undefined),
         ?assertEqual((Rep#rep.target)#httpdb.proxy_url, undefined)
     end).
@@ -63,7 +63,7 @@ parse_rep_doc_with_proxy(_) ->
             {<<"target">>, <<"http://otherunproxied.com">>},
             {<<"proxy">>, ProxyURL}
         ]},
-        Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc),
+        Rep = couch_replicator_docs:parse_rep_doc_without_id(ProxyDoc),
         ?assertEqual((Rep#rep.source)#httpdb.proxy_url, binary_to_list(ProxyURL)),
         ?assertEqual((Rep#rep.target)#httpdb.proxy_url, binary_to_list(ProxyURL))
     end).