You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2014/02/05 00:44:33 UTC

[20/44] Remove src/couch_replicator

http://git-wip-us.apache.org/repos/asf/couchdb/blob/550e8202/src/couch_replicator/src/couch_replicator_httpc.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl
deleted file mode 100644
index 10409c4..0000000
--- a/src/couch_replicator/src/couch_replicator_httpc.erl
+++ /dev/null
@@ -1,297 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_httpc).
-
--include_lib("couch/include/couch_db.hrl").
--include_lib("ibrowse/include/ibrowse.hrl").
--include("couch_replicator_api_wrap.hrl").
-
--export([setup/1]).
--export([send_req/3]).
--export([full_url/2]).
-
--import(couch_util, [
-    get_value/2,
-    get_value/3
-]).
-
--define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
--define(MAX_WAIT, 5 * 60 * 1000).
-
-
-setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
-    {ok, Pid} = couch_replicator_httpc_pool:start_link(Url, [{max_connections, MaxConns}]),
-    {ok, Db#httpdb{httpc_pool = Pid}}.
-
-
-send_req(HttpDb, Params1, Callback) ->
-    Params2 = ?replace(Params1, qs,
-        [{K, ?b2l(iolist_to_binary(V))} || {K, V} <- get_value(qs, Params1, [])]),
-    Params = ?replace(Params2, ibrowse_options,
-        lists:keysort(1, get_value(ibrowse_options, Params2, []))),
-    {Worker, Response} = send_ibrowse_req(HttpDb, Params),
-    process_response(Response, Worker, HttpDb, Params, Callback).
-
-
-send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
-    Method = get_value(method, Params, get),
-    UserHeaders = lists:keysort(1, get_value(headers, Params, [])),
-    Headers1 = lists:ukeymerge(1, UserHeaders, BaseHeaders),
-    Headers2 = oauth_header(HttpDb, Params) ++ Headers1,
-    Url = full_url(HttpDb, Params),
-    Body = get_value(body, Params, []),
-    case get_value(path, Params) of
-    "_changes" ->
-        {ok, Worker} = ibrowse:spawn_link_worker_process(Url);
-    _ ->
-        {ok, Worker} = couch_replicator_httpc_pool:get_worker(HttpDb#httpdb.httpc_pool)
-    end,
-    IbrowseOptions = [
-        {response_format, binary}, {inactivity_timeout, HttpDb#httpdb.timeout} |
-        lists:ukeymerge(1, get_value(ibrowse_options, Params, []),
-            HttpDb#httpdb.ibrowse_options)
-    ],
-    Response = ibrowse:send_req_direct(
-        Worker, Url, Headers2, Method, Body, IbrowseOptions, infinity),
-    {Worker, Response}.
-
-
-process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, Callback) ->
-    send_req(HttpDb, Params, Callback);
-
-process_response({error, {'EXIT', {normal, _}}}, _Worker, HttpDb, Params, Cb) ->
-    % ibrowse worker terminated because remote peer closed the socket
-    % -> not an error
-    send_req(HttpDb, Params, Cb);
-
-process_response({ibrowse_req_id, ReqId}, Worker, HttpDb, Params, Callback) ->
-    process_stream_response(ReqId, Worker, HttpDb, Params, Callback);
-
-process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
-    release_worker(Worker, HttpDb),
-    case list_to_integer(Code) of
-    Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) ->
-        EJson = case Body of
-        <<>> ->
-            null;
-        Json ->
-            ?JSON_DECODE(Json)
-        end,
-        Callback(Ok, Headers, EJson);
-    R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
-        do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
-    Error ->
-        maybe_retry({code, Error}, Worker, HttpDb, Params, Callback)
-    end;
-
-process_response(Error, Worker, HttpDb, Params, Callback) ->
-    maybe_retry(Error, Worker, HttpDb, Params, Callback).
-
-
-process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
-    receive
-    {ibrowse_async_headers, ReqId, Code, Headers} ->
-        case list_to_integer(Code) of
-        Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) ->
-            StreamDataFun = fun() ->
-                stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
-            end,
-            ibrowse:stream_next(ReqId),
-            try
-                Ret = Callback(Ok, Headers, StreamDataFun),
-                release_worker(Worker, HttpDb),
-                clean_mailbox_req(ReqId),
-                Ret
-            catch throw:{maybe_retry_req, Err} ->
-                clean_mailbox_req(ReqId),
-                maybe_retry(Err, Worker, HttpDb, Params, Callback)
-            end;
-        R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
-            do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
-        Error ->
-            report_error(Worker, HttpDb, Params, {code, Error})
-        end;
-    {ibrowse_async_response, ReqId, {error, _} = Error} ->
-        maybe_retry(Error, Worker, HttpDb, Params, Callback)
-    after HttpDb#httpdb.timeout + 500 ->
-        % Note: ibrowse should always reply with timeouts, but this doesn't
-        % seem to be always true when there's a very high rate of requests
-        % and many open connections.
-        maybe_retry(timeout, Worker, HttpDb, Params, Callback)
-    end.
-
-
-clean_mailbox_req(ReqId) ->
-    receive
-    {ibrowse_async_response, ReqId, _} ->
-        clean_mailbox_req(ReqId);
-    {ibrowse_async_response_end, ReqId} ->
-        clean_mailbox_req(ReqId)
-    after 0 ->
-        ok
-    end.
-
-
-release_worker(Worker, #httpdb{httpc_pool = Pool}) ->
-    ok = couch_replicator_httpc_pool:release_worker(Pool, Worker).
-
-
-maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params, _Cb) ->
-    report_error(Worker, HttpDb, Params, {error, Error});
-
-maybe_retry(Error, Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
-    Params, Cb) ->
-    release_worker(Worker, HttpDb),
-    Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
-    Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
-    twig:log(notice,"Retrying ~s request to ~s in ~p seconds due to error ~s",
-        [Method, Url, Wait / 1000, error_cause(Error)]),
-    ok = timer:sleep(Wait),
-    Wait2 = erlang:min(Wait * 2, ?MAX_WAIT),
-    send_req(HttpDb#httpdb{retries = Retries - 1, wait = Wait2}, Params, Cb).
-
-
-report_error(Worker, HttpDb, Params, Error) ->
-    Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
-    Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
-    do_report_error(Url, Method, Error),
-    release_worker(Worker, HttpDb),
-    exit({http_request_failed, Method, Url, Error}).
-
-
-do_report_error(Url, Method, {code, Code}) ->
-    twig:log(error,"Replicator, request ~s to ~p failed. The received "
-        "HTTP error code is ~p", [Method, Url, Code]);
-
-do_report_error(FullUrl, Method, Error) ->
-    twig:log(error,"Replicator, request ~s to ~p failed due to error ~s",
-        [Method, FullUrl, error_cause(Error)]).
-
-
-error_cause({error, Cause}) ->
-    lists:flatten(io_lib:format("~p", [Cause]));
-error_cause(Cause) ->
-    lists:flatten(io_lib:format("~p", [Cause])).
-
-
-stream_data_self(#httpdb{timeout = T} = HttpDb, Params, Worker, ReqId, Cb) ->
-    case accumulate_messages(ReqId, [], T + 500) of
-    {Data, ibrowse_async_response} ->
-        ibrowse:stream_next(ReqId),
-        {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
-    {Data, ibrowse_async_response_end} ->
-        {Data, fun() -> throw({maybe_retry_req, more_data_expected}) end}
-    end.
-
-accumulate_messages(ReqId, Acc, Timeout) ->
-    receive
-    {ibrowse_async_response, ReqId, {error, Error}} ->
-        throw({maybe_retry_req, Error});
-    {ibrowse_async_response, ReqId, <<>>} ->
-        accumulate_messages(ReqId, Acc, Timeout);
-    {ibrowse_async_response, ReqId, Data} ->
-        accumulate_messages(ReqId, [Data | Acc], 0);
-    {ibrowse_async_response_end, ReqId} ->
-        {iolist_to_binary(lists:reverse(Acc)), ibrowse_async_response_end}
-    after Timeout ->
-        % Note: ibrowse should always reply with timeouts, but this doesn't
-        % seem to be always true when there's a very high rate of requests
-        % and many open connections.
-        if Acc =:= [] ->
-            throw({maybe_retry_req, timeout});
-        true ->
-            {iolist_to_binary(lists:reverse(Acc)), ibrowse_async_response}
-        end
-    end.
-
-
-full_url(#httpdb{url = BaseUrl}, Params) ->
-    Path = get_value(path, Params, []),
-    QueryArgs = get_value(qs, Params, []),
-    BaseUrl ++ Path ++ query_args_to_string(QueryArgs, []).
-
-
-query_args_to_string([], []) ->
-    "";
-query_args_to_string([], Acc) ->
-    "?" ++ string:join(lists:reverse(Acc), "&");
-query_args_to_string([{K, V} | Rest], Acc) ->
-    query_args_to_string(Rest, [K ++ "=" ++ couch_httpd:quote(V) | Acc]).
-
-
-oauth_header(#httpdb{oauth = nil}, _ConnParams) ->
-    [];
-oauth_header(#httpdb{url = BaseUrl, oauth = OAuth}, ConnParams) ->
-    Consumer = {
-        OAuth#oauth.consumer_key,
-        OAuth#oauth.consumer_secret,
-        OAuth#oauth.signature_method
-    },
-    Method = case get_value(method, ConnParams, get) of
-    get -> "GET";
-    post -> "POST";
-    put -> "PUT";
-    head -> "HEAD"
-    end,
-    QSL = get_value(qs, ConnParams, []),
-    OAuthParams = oauth:sign(Method,
-        BaseUrl ++ get_value(path, ConnParams, []),
-        QSL, Consumer, OAuth#oauth.token, OAuth#oauth.token_secret) -- QSL,
-    [{"Authorization",
-        "OAuth " ++ oauth:header_params_encode(OAuthParams)}].
-
-
-do_redirect(Worker, Code, Headers, #httpdb{url = Url} = HttpDb, Params, Cb) ->
-    release_worker(Worker, HttpDb),
-    RedirectUrl = redirect_url(Headers, Url),
-    {HttpDb2, Params2} = after_redirect(RedirectUrl, Code, HttpDb, Params),
-    send_req(HttpDb2, Params2, Cb).
-
-
-redirect_url(RespHeaders, OrigUrl) ->
-    MochiHeaders = mochiweb_headers:make(RespHeaders),
-    RedUrl = mochiweb_headers:get_value("Location", MochiHeaders),
-    #url{
-        host = Host,
-        host_type = HostType,
-        port = Port,
-        path = Path,  % includes query string
-        protocol = Proto
-    } = ibrowse_lib:parse_url(RedUrl),
-    #url{
-        username = User,
-        password = Passwd
-    } = ibrowse_lib:parse_url(OrigUrl),
-    Creds = case is_list(User) andalso is_list(Passwd) of
-    true ->
-        User ++ ":" ++ Passwd ++ "@";
-    false ->
-        []
-    end,
-    HostPart = case HostType of
-    ipv6_address ->
-        "[" ++ Host ++ "]";
-    _ ->
-        Host
-    end,
-    atom_to_list(Proto) ++ "://" ++ Creds ++ HostPart ++ ":" ++
-        integer_to_list(Port) ++ Path.
-
-after_redirect(RedirectUrl, 303, HttpDb, Params) ->
-    after_redirect(RedirectUrl, HttpDb, ?replace(Params, method, get));
-after_redirect(RedirectUrl, _Code, HttpDb, Params) ->
-    after_redirect(RedirectUrl, HttpDb, Params).
-
-after_redirect(RedirectUrl, HttpDb, Params) ->
-    Params2 = lists:keydelete(path, 1, lists:keydelete(qs, 1, Params)),
-    {HttpDb#httpdb{url = RedirectUrl}, Params2}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/550e8202/src/couch_replicator/src/couch_replicator_httpc_pool.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_httpc_pool.erl b/src/couch_replicator/src/couch_replicator_httpc_pool.erl
deleted file mode 100644
index 0a42284..0000000
--- a/src/couch_replicator/src/couch_replicator_httpc_pool.erl
+++ /dev/null
@@ -1,194 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_httpc_pool).
--behaviour(gen_server).
-
-% public API
--export([start_link/2, stop/1]).
--export([get_worker/1, release_worker/2]).
-
-% gen_server API
--export([init/1, handle_call/3, handle_info/2, handle_cast/2]).
--export([code_change/3, terminate/2]).
-
--include_lib("couch/include/couch_db.hrl").
-
--import(couch_util, [
-    get_value/2
-]).
-
--record(state, {
-    url,
-    limit,                  % max # of workers allowed
-    free = [],              % free workers (connections)
-    busy = [],              % busy workers (connections)
-    waiting = queue:new(),  % blocked clients waiting for a worker
-    callers = []            % clients who've been given a worker
-}).
-
-
-start_link(Url, Options) ->
-    gen_server:start_link(?MODULE, {Url, Options}, []).
-
-stop(Pool) ->
-    ok = gen_server:call(Pool, stop, infinity).
-
-
-get_worker(Pool) ->
-    {ok, _Worker} = gen_server:call(Pool, get_worker, infinity).
-
-
-release_worker(Pool, Worker) ->
-    ok = gen_server:cast(Pool, {release_worker, Worker}).
-
-
-init({Url, Options}) ->
-    process_flag(trap_exit, true),
-    State = #state{
-        url = Url,
-        limit = get_value(max_connections, Options)
-    },
-    {ok, State}.
-
-
-handle_call(get_worker, From, State) ->
-    #state{
-        waiting = Waiting,
-        callers = Callers,
-        url = Url,
-        limit = Limit,
-        busy = Busy,
-        free = Free
-    } = State,
-    case length(Busy) >= Limit of
-    true ->
-        {noreply, State#state{waiting = queue:in(From, Waiting)}};
-    false ->
-        case Free of
-        [] ->
-           {ok, Worker} = ibrowse:spawn_link_worker_process(Url),
-           Free2 = Free;
-        [Worker | Free2] ->
-           ok
-        end,
-        NewState = State#state{
-            free = Free2,
-            busy = [Worker | Busy],
-            callers = monitor_client(Callers, Worker, From)
-        },
-        {reply, {ok, Worker}, NewState}
-    end;
-
-handle_call(stop, _From, State) ->
-    {stop, normal, ok, State}.
-
-
-handle_cast({release_worker, Worker}, State) ->
-    #state{waiting = Waiting, callers = Callers} = State,
-    NewCallers0 = demonitor_client(Callers, Worker),
-    case is_process_alive(Worker) andalso
-        lists:member(Worker, State#state.busy) of
-    true ->
-        case queue:out(Waiting) of
-        {empty, Waiting2} ->
-            NewCallers1 = NewCallers0,
-            Busy2 = State#state.busy -- [Worker],
-            Free2 = [Worker | State#state.free];
-        {{value, From}, Waiting2} ->
-            NewCallers1 = monitor_client(NewCallers0, Worker, From),
-            gen_server:reply(From, {ok, Worker}),
-            Busy2 = State#state.busy,
-            Free2 = State#state.free
-        end,
-        NewState = State#state{
-           busy = Busy2,
-           free = Free2,
-           waiting = Waiting2,
-           callers = NewCallers1
-        },
-        {noreply, NewState};
-   false ->
-        {noreply, State#state{callers = NewCallers0}}
-   end.
-
-handle_info({'EXIT', Pid, _Reason}, State) ->
-    #state{
-        url = Url,
-        busy = Busy,
-        free = Free,
-        waiting = Waiting,
-        callers = Callers
-    } = State,
-    NewCallers0 = demonitor_client(Callers, Pid),
-    case Free -- [Pid] of
-    Free ->
-        case Busy -- [Pid] of
-        Busy ->
-            {noreply, State#state{callers = NewCallers0}};
-        Busy2 ->
-            case queue:out(Waiting) of
-            {empty, _} ->
-                {noreply, State#state{busy = Busy2, callers = NewCallers0}};
-            {{value, From}, Waiting2} ->
-                {ok, Worker} = ibrowse:spawn_link_worker_process(Url),
-                NewCallers1 = monitor_client(NewCallers0, Worker, From),
-                gen_server:reply(From, {ok, Worker}),
-                NewState = State#state{
-                    busy = [Worker | Busy2],
-                    waiting = Waiting2,
-                    callers = NewCallers1
-                },
-                {noreply, NewState}
-            end
-        end;
-    Free2 ->
-        {noreply, State#state{free = Free2, callers = NewCallers0}}
-    end;
-
-handle_info({'DOWN', Ref, process, _, _}, #state{callers = Callers} = State) ->
-    case lists:keysearch(Ref, 2, Callers) of
-        {value, {Worker, Ref}} ->
-            handle_cast({release_worker, Worker}, State);
-        false ->
-            {noreply, State}
-    end.
-
-code_change(_OldVsn, OldState, _Extra) when tuple_size(OldState) =:= 7 ->
-    case element(7, OldState) of
-        EtsTable when is_integer(EtsTable) ->
-            NewState = setelement(7, OldState, ets:tab2list(EtsTable)),
-            ets:delete(EtsTable),
-            {ok, NewState};
-        Callers when is_list(Callers) ->
-            % Already upgraded
-            {ok, OldState}
-    end;
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-
-terminate(_Reason, State) ->
-    lists:foreach(fun ibrowse_http_client:stop/1, State#state.free),
-    lists:foreach(fun ibrowse_http_client:stop/1, State#state.busy).
-
-monitor_client(Callers, Worker, {ClientPid, _}) ->
-    [{Worker, erlang:monitor(process, ClientPid)} | Callers].
-
-demonitor_client(Callers, Worker) ->
-    case lists:keysearch(Worker, 1, Callers) of
-        {value, {Worker, MonRef}} ->
-            erlang:demonitor(MonRef, [flush]),
-            lists:keydelete(Worker, 1, Callers);
-        false ->
-            Callers
-    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/550e8202/src/couch_replicator/src/couch_replicator_httpd.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_httpd.erl b/src/couch_replicator/src/couch_replicator_httpd.erl
deleted file mode 100644
index f84152c..0000000
--- a/src/couch_replicator/src/couch_replicator_httpd.erl
+++ /dev/null
@@ -1,65 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_httpd).
-
--include_lib("couch/include/couch_db.hrl").
-
--import(couch_httpd, [
-    send_json/2,
-    send_json/3,
-    send_method_not_allowed/2
-]).
-
--import(couch_util, [
-    to_binary/1
-]).
-
--export([handle_req/1]).
-
-
-handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
-    couch_httpd:validate_ctype(Req, "application/json"),
-    RepDoc = {Props} = couch_httpd:json_body_obj(Req),
-    validate_rep_props(Props),
-    case couch_replicator:replicate(RepDoc, UserCtx) of
-    {error, {Error, Reason}} ->
-        send_json(
-            Req, 404,
-            {[{error, to_binary(Error)}, {reason, to_binary(Reason)}]});
-    {error, not_found} ->
-        % Tried to cancel a replication that didn't exist.
-        send_json(Req, 404, {[{error, <<"not found">>}]});
-    {error, Reason} ->
-        send_json(Req, 500, {[{error, to_binary(Reason)}]});
-    {ok, {cancelled, RepId}} ->
-        send_json(Req, 200, {[{ok, true}, {<<"_local_id">>, RepId}]});
-    {ok, {continuous, RepId}} ->
-        send_json(Req, 202, {[{ok, true}, {<<"_local_id">>, RepId}]});
-    {ok, {HistoryResults}} ->
-        send_json(Req, {[{ok, true} | HistoryResults]})
-    end;
-
-handle_req(Req) ->
-    send_method_not_allowed(Req, "POST").
-
-validate_rep_props([]) ->
-    ok;
-validate_rep_props([{<<"query_params">>, {Params}}|Rest]) ->
-    lists:foreach(fun
-        ({_,V}) when is_binary(V) -> ok;
-        ({K,_}) -> throw({bad_request,
-            <<K/binary," value must be a string.">>})
-        end, Params),
-    validate_rep_props(Rest);
-validate_rep_props([_|Rest]) ->
-    validate_rep_props(Rest).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/550e8202/src/couch_replicator/src/couch_replicator_job_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_job_sup.erl b/src/couch_replicator/src/couch_replicator_job_sup.erl
deleted file mode 100644
index 3cce46c..0000000
--- a/src/couch_replicator/src/couch_replicator_job_sup.erl
+++ /dev/null
@@ -1,29 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_job_sup).
--behaviour(supervisor).
--export([init/1, start_link/0]).
-
-start_link() ->
-    supervisor:start_link({local,?MODULE}, ?MODULE, []).
-
-%%=============================================================================
-%% supervisor callbacks
-%%=============================================================================
-
-init([]) ->
-    {ok, {{one_for_one, 3, 10}, []}}.
-
-%%=============================================================================
-%% internal functions
-%%=============================================================================

http://git-wip-us.apache.org/repos/asf/couchdb/blob/550e8202/src/couch_replicator/src/couch_replicator_js_functions.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_js_functions.hrl b/src/couch_replicator/src/couch_replicator_js_functions.hrl
deleted file mode 100644
index 3f1db7c..0000000
--- a/src/couch_replicator/src/couch_replicator_js_functions.hrl
+++ /dev/null
@@ -1,151 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License.  You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--define(REP_DB_DOC_VALIDATE_FUN, <<"
-    function(newDoc, oldDoc, userCtx) {
-        function reportError(error_msg) {
-            log('Error writing document `' + newDoc._id +
-                '\\' to the replicator database: ' + error_msg);
-            throw({forbidden: error_msg});
-        }
-
-        function validateEndpoint(endpoint, fieldName) {
-            if ((typeof endpoint !== 'string') &&
-                ((typeof endpoint !== 'object') || (endpoint === null))) {
-
-                reportError('The `' + fieldName + '\\' property must exist' +
-                    ' and be either a string or an object.');
-            }
-
-            if (typeof endpoint === 'object') {
-                if ((typeof endpoint.url !== 'string') || !endpoint.url) {
-                    reportError('The url property must exist in the `' +
-                        fieldName + '\\' field and must be a non-empty string.');
-                }
-
-                if ((typeof endpoint.auth !== 'undefined') &&
-                    ((typeof endpoint.auth !== 'object') ||
-                        endpoint.auth === null)) {
-
-                    reportError('`' + fieldName +
-                        '.auth\\' must be a non-null object.');
-                }
-
-                if ((typeof endpoint.headers !== 'undefined') &&
-                    ((typeof endpoint.headers !== 'object') ||
-                        endpoint.headers === null)) {
-
-                    reportError('`' + fieldName +
-                        '.headers\\' must be a non-null object.');
-                }
-            }
-        }
-
-        var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0);
-        var isAdmin = (userCtx.roles.indexOf('_admin') >= 0);
-
-        if (oldDoc && !newDoc._deleted && !isReplicator &&
-            (oldDoc._replication_state === 'triggered')) {
-            reportError('Only the replicator can edit replication documents ' +
-                'that are in the triggered state.');
-        }
-
-        if (!newDoc._deleted) {
-            validateEndpoint(newDoc.source, 'source');
-            validateEndpoint(newDoc.target, 'target');
-
-            if ((typeof newDoc.create_target !== 'undefined') &&
-                (typeof newDoc.create_target !== 'boolean')) {
-
-                reportError('The `create_target\\' field must be a boolean.');
-            }
-
-            if ((typeof newDoc.continuous !== 'undefined') &&
-                (typeof newDoc.continuous !== 'boolean')) {
-
-                reportError('The `continuous\\' field must be a boolean.');
-            }
-
-            if ((typeof newDoc.doc_ids !== 'undefined') &&
-                !isArray(newDoc.doc_ids)) {
-
-                reportError('The `doc_ids\\' field must be an array of strings.');
-            }
-
-            if ((typeof newDoc.filter !== 'undefined') &&
-                ((typeof newDoc.filter !== 'string') || !newDoc.filter)) {
-
-                reportError('The `filter\\' field must be a non-empty string.');
-            }
-
-            if ((typeof newDoc.query_params !== 'undefined') &&
-                ((typeof newDoc.query_params !== 'object') ||
-                    newDoc.query_params === null)) {
-
-                reportError('The `query_params\\' field must be an object.');
-            }
-
-            if (newDoc.user_ctx) {
-                var user_ctx = newDoc.user_ctx;
-
-                if ((typeof user_ctx !== 'object') || (user_ctx === null)) {
-                    reportError('The `user_ctx\\' property must be a ' +
-                        'non-null object.');
-                }
-
-                if (!(user_ctx.name === null ||
-                    (typeof user_ctx.name === 'undefined') ||
-                    ((typeof user_ctx.name === 'string') &&
-                        user_ctx.name.length > 0))) {
-
-                    reportError('The `user_ctx.name\\' property must be a ' +
-                        'non-empty string or null.');
-                }
-
-                if (!isAdmin && (user_ctx.name !== userCtx.name)) {
-                    reportError('The given `user_ctx.name\\' is not valid');
-                }
-
-                if (user_ctx.roles && !isArray(user_ctx.roles)) {
-                    reportError('The `user_ctx.roles\\' property must be ' +
-                        'an array of strings.');
-                }
-
-                if (!isAdmin && user_ctx.roles) {
-                    for (var i = 0; i < user_ctx.roles.length; i++) {
-                        var role = user_ctx.roles[i];
-
-                        if (typeof role !== 'string' || role.length === 0) {
-                            reportError('Roles must be non-empty strings.');
-                        }
-                        if (userCtx.roles.indexOf(role) === -1) {
-                            reportError('Invalid role (`' + role +
-                                '\\') in the `user_ctx\\'');
-                        }
-                    }
-                }
-            } else {
-                if (!isAdmin) {
-                    reportError('The `user_ctx\\' property is missing (it is ' +
-                       'optional for admins only).');
-                }
-            }
-        } else {
-            if (!isAdmin) {
-                if (!oldDoc.user_ctx || (oldDoc.user_ctx.name !== userCtx.name)) {
-                    reportError('Replication documents can only be deleted by ' +
-                        'admins or by the users who created them.');
-                }
-            }
-        }
-    }
-">>).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/550e8202/src/couch_replicator/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_manager.erl b/src/couch_replicator/src/couch_replicator_manager.erl
deleted file mode 100644
index 047f573..0000000
--- a/src/couch_replicator/src/couch_replicator_manager.erl
+++ /dev/null
@@ -1,889 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_manager).
--behaviour(gen_server).
--behaviour(config_listener).
-
-% public API
--export([replication_started/1, replication_completed/2, replication_error/2]).
-
--export([before_doc_update/2, after_doc_read/2]).
-
-% gen_server callbacks
--export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
--export([code_change/3, terminate/2]).
-
-% config_listener callback
--export([handle_config_change/5]).
-
--include_lib("couch/include/couch_db.hrl").
--include("couch_replicator.hrl").
--include("couch_replicator_js_functions.hrl").
-
--define(DOC_TO_REP, couch_rep_doc_id_to_rep_id).
--define(REP_TO_STATE, couch_rep_id_to_rep_state).
--define(INITIAL_WAIT, 2.5). % seconds
--define(MAX_WAIT, 600).     % seconds
--define(OWNER, <<"owner">>).
-
--define(DB_TO_SEQ, db_to_seq).
--define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
-
--define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
-
--record(rep_state, {
-    dbname,
-    rep,
-    starting,
-    retries_left,
-    max_retries,
-    wait = ?INITIAL_WAIT
-}).
-
--import(couch_util, [
-    to_binary/1
-]).
-
--record(state, {
-    db_notifier = nil,
-    scan_pid = nil,
-    rep_start_pids = [],
-    max_retries
-}).
-
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-
-replication_started(#rep{id = {BaseId, _} = RepId}) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    #rep_state{dbname = DbName, rep = #rep{doc_id = DocId}} ->
-        update_rep_doc(DbName, DocId, [
-            {<<"_replication_state">>, <<"triggered">>},
-            {<<"_replication_id">>, ?l2b(BaseId)},
-            {<<"_replication_stats">>, undefined}]),
-        ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity),
-        twig:log(notice, "Document `~s` triggered replication `~s`",
-            [DocId, pp_rep_id(RepId)])
-    end.
-
-
-replication_completed(#rep{id = RepId}, Stats) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    #rep_state{dbname = DbName, rep = #rep{doc_id = DocId}} ->
-        update_rep_doc(DbName, DocId, [
-            {<<"_replication_state">>, <<"completed">>},
-            {<<"_replication_stats">>, {Stats}}]),
-        ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
-        twig:log(notice, "Replication `~s` finished (triggered by document `~s`)",
-            [pp_rep_id(RepId), DocId])
-    end.
-
-
-replication_error(#rep{id = {BaseId, _} = RepId}, Error) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    #rep_state{dbname = DbName, rep = #rep{doc_id = DocId}} ->
-        % TODO: maybe add error reason to replication document
-        update_rep_doc(DbName, DocId, [
-            {<<"_replication_state">>, <<"error">>},
-            {<<"_replication_id">>, ?l2b(BaseId)}]),
-        ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity)
-    end.
-
-
-handle_config_change("replicator", "db", _, _, S) ->
-    ok = gen_server:call(S, rep_db_changed),
-    remove_handler;
-handle_config_change("replicator", "max_replication_retry_count", V, _, S) ->
-    ok = gen_server:cast(S, {set_max_retries, retries_value(V)}),
-    {ok, S};
-handle_config_change(_, _, _, _, S) ->
-    {ok, S}.
-
-
-init(_) ->
-    process_flag(trap_exit, true),
-    net_kernel:monitor_nodes(true),
-    ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, public]),
-    ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, public]),
-    ?DB_TO_SEQ = ets:new(?DB_TO_SEQ, [named_table, set, public]),
-    Server = self(),
-    ok = config:listen_for_changes(?MODULE, Server),
-    ScanPid = spawn_link(fun() -> scan_all_dbs(Server) end),
-    % Automatically start node local changes feed loop
-    LocalRepDb = ?l2b(config:get("replicator", "db", "_replicator")),
-    Pid = changes_feed_loop(LocalRepDb, 0),
-    {ok, #state{
-        db_notifier = db_update_notifier(),
-        scan_pid = ScanPid,
-        max_retries = retries_value(
-            config:get("replicator", "max_replication_retry_count", "10")),
-        rep_start_pids = [Pid]
-    }}.
-
-
-handle_call({rep_db_update, DbName, {ChangeProps} = Change}, _From, State) ->
-    NewState = try
-        process_update(State, DbName, Change)
-    catch
-    _Tag:Error ->
-        {RepProps} = get_json_value(doc, ChangeProps),
-        DocId = get_json_value(<<"_id">>, RepProps),
-        rep_db_update_error(Error, DbName, DocId),
-        State
-    end,
-    {reply, ok, NewState};
-
-
-handle_call({rep_started, RepId}, _From, State) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    RepState ->
-        NewRepState = RepState#rep_state{
-            starting = false,
-            retries_left = State#state.max_retries,
-            max_retries = State#state.max_retries,
-            wait = ?INITIAL_WAIT
-        },
-        true = ets:insert(?REP_TO_STATE, {RepId, NewRepState})
-    end,
-    {reply, ok, State};
-
-handle_call({rep_complete, RepId}, _From, State) ->
-    true = ets:delete(?REP_TO_STATE, RepId),
-    {reply, ok, State};
-
-handle_call({rep_error, RepId, Error}, _From, State) ->
-    {reply, ok, replication_error(State, RepId, Error)};
-
-handle_call({resume_scan, DbName}, _From, State) ->
-    Since = case ets:lookup(?DB_TO_SEQ, DbName) of
-        [] -> 0;
-        [{DbName, EndSeq}] -> EndSeq
-    end,
-    Pid = changes_feed_loop(DbName, Since),
-    twig:log(debug, "Scanning ~s from update_seq ~p", [DbName, Since]),
-    {reply, ok, State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}};
-
-handle_call({rep_db_checkpoint, DbName, EndSeq}, _From, State) ->
-    true = ets:insert(?DB_TO_SEQ, {DbName, EndSeq}),
-    {reply, ok, State};
-
-handle_call(rep_db_changed, _From, State) ->
-    {stop, shutdown, ok, State};
-
-handle_call(Msg, From, State) ->
-    twig:log(error, "Replication manager received unexpected call ~p from ~p",
-        [Msg, From]),
-    {stop, {error, {unexpected_call, Msg}}, State}.
-
-handle_cast({set_max_retries, MaxRetries}, State) ->
-    {noreply, State#state{max_retries = MaxRetries}};
-
-handle_cast(Msg, State) ->
-    twig:log(error, "Replication manager received unexpected cast ~p", [Msg]),
-    {stop, {error, {unexpected_cast, Msg}}, State}.
-
-handle_info({nodeup, _Node}, State) ->
-    {noreply, rescan(State)};
-
-handle_info({nodedown, _Node}, State) ->
-    {noreply, rescan(State)};
-
-handle_info({'EXIT', From, normal}, #state{scan_pid = From} = State) ->
-    twig:log(debug, "Background scan has completed.", []),
-    {noreply, State#state{scan_pid=nil}};
-
-handle_info({'EXIT', From, Reason}, #state{scan_pid = From} = State) ->
-    twig:log(error, "Background scanner died. Reason: ~p", [Reason]),
-    {stop, {scanner_died, Reason}, State};
-
-handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) ->
-    twig:log(error, "Database update notifier died. Reason: ~p", [Reason]),
-    {stop, {db_update_notifier_died, Reason}, State};
-
-handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) ->
-    % one of the replication start processes terminated successfully
-    {noreply, State#state{rep_start_pids = Pids -- [From]}};
-
-handle_info({'DOWN', _Ref, _, _, _}, State) ->
-    % From a db monitor created by a replication process. Ignore.
-    {noreply, State};
-
-handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) ->
-    erlang:send_after(5000, self(), restart_config_listener),
-    {noreply, State};
-
-handle_info(restart_config_listener, State) ->
-    ok = config:listen_for_changes(?MODULE, self()),
-    {noreply, State};
-
-handle_info(shutdown, State) ->
-    {stop, shutdown, State};
-
-handle_info(Msg, State) ->
-    twig:log(error,"Replication manager received unexpected message ~p", [Msg]),
-    {stop, {unexpected_msg, Msg}, State}.
-
-
-terminate(_Reason, State) ->
-    #state{
-        scan_pid = ScanPid,
-        rep_start_pids = StartPids,
-        db_notifier = DbNotifier
-    } = State,
-    stop_all_replications(),
-    lists:foreach(
-        fun(Pid) ->
-            catch unlink(Pid),
-            catch exit(Pid, stop)
-        end,
-        [ScanPid | StartPids]),
-    true = ets:delete(?REP_TO_STATE),
-    true = ets:delete(?DOC_TO_REP),
-    true = ets:delete(?DB_TO_SEQ),
-    couch_db_update_notifier:stop(DbNotifier).
-
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-changes_feed_loop(<<"shards/", _/binary>>=DbName, Since) ->
-    Server = self(),
-    Pid = spawn_link(
-        fun() ->
-            fabric:changes(DbName, fun
-            ({change, Change}, Acc) ->
-                case has_valid_rep_id(Change) of
-                true ->
-                    ok = gen_server:call(
-                        Server, {rep_db_update, DbName, Change}, infinity);
-                false ->
-                    ok
-                end,
-                {ok, Acc};
-            ({stop, EndSeq}, Acc) ->
-                ok = gen_server:call(Server, {rep_db_checkpoint, DbName, EndSeq}, infinity),
-                {ok, Acc};
-            (_, Acc) ->
-                {ok, Acc}
-            end,
-            nil,
-            #changes_args{
-                include_docs = true,
-                feed = "longpoll",
-                since = Since,
-                filter = main_only,
-                timeout = infinity
-                }
-            )
-        end),
-    Pid;
-changes_feed_loop(DbName, Since) ->
-    ensure_rep_db_exists(DbName),
-    Server = self(),
-    spawn_link(fun() ->
-        UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
-        DbOpenOptions = [{user_ctx, UserCtx}, sys_db],
-        {ok, Db} = couch_db:open_int(DbName, DbOpenOptions),
-        ChangesFeedFun = couch_changes:handle_changes(
-            #changes_args{
-                include_docs = true,
-                since = Since,
-                feed = "continuous",
-                timeout = infinity
-            },
-            {json_req, null},
-            Db
-        ),
-        EnumFun = fun
-        ({change, Change, _}, _) ->
-            case has_valid_rep_id(Change) of
-                true ->
-                    Msg = {rep_db_update, DbName, Change},
-                    ok = gen_server:call(Server, Msg, infinity);
-                false ->
-                    ok
-            end;
-        (_, _) ->
-            ok
-        end,
-        ChangesFeedFun(EnumFun)
-    end).
-
-
-has_valid_rep_id({Change}) ->
-    has_valid_rep_id(get_json_value(<<"id">>, Change));
-has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
-    false;
-has_valid_rep_id(_Else) ->
-    true.
-
-db_update_notifier() ->
-    Server = self(),
-    IsReplicatorDbFun = is_replicator_db_fun(),
-    {ok, Notifier} = couch_db_update_notifier:start_link(fun
-        ({Event, ShardDbName})
-                when Event == created; Event == updated; Event == deleted ->
-            DbName = mem3:dbname(ShardDbName),
-            IsRepDb = IsReplicatorDbFun(DbName),
-            case Event of
-                created when IsRepDb ->
-                    ensure_rep_ddoc_exists(DbName);
-                updated when IsRepDb ->
-                    ensure_rep_ddoc_exists(DbName),
-                    Msg = {resume_scan, DbName},
-                    ok = gen_server:call(Server, Msg, infinity);
-                deleted when IsRepDb ->
-                    clean_up_replications(DbName);
-                _ ->
-                    ok
-            end;
-        (_Event) ->
-            ok
-        end
-    ),
-    Notifier.
-
-rescan(#state{scan_pid = nil} = State) ->
-    true = ets:delete_all_objects(?DB_TO_SEQ),
-    Server = self(),
-    NewScanPid = spawn_link(fun() -> scan_all_dbs(Server) end),
-    State#state{scan_pid = NewScanPid};
-rescan(#state{scan_pid = ScanPid} = State) ->
-    unlink(ScanPid),
-    exit(ScanPid, exit),
-    rescan(State#state{scan_pid = nil}).
-
-process_update(State, DbName, {Change}) ->
-    {RepProps} = JsonRepDoc = get_json_value(doc, Change),
-    DocId = get_json_value(<<"_id">>, RepProps),
-    case {is_owner(DbName, DocId), get_json_value(deleted, Change, false)} of
-    {false, _} ->
-        replication_complete(DbName, DocId),
-        State;
-    {true, true} ->
-        rep_doc_deleted(DbName, DocId),
-        State;
-    {true, false} ->
-        case get_json_value(<<"_replication_state">>, RepProps) of
-        undefined ->
-            maybe_start_replication(State, DbName, DocId, JsonRepDoc);
-        <<"triggered">> ->
-            maybe_start_replication(State, DbName, DocId, JsonRepDoc);
-        <<"completed">> ->
-            replication_complete(DbName, DocId),
-            State;
-        <<"error">> ->
-            case ets:lookup(?DOC_TO_REP, {DbName, DocId}) of
-            [] ->
-                maybe_start_replication(State, DbName, DocId, JsonRepDoc);
-            _ ->
-                State
-            end
-        end
-    end.
-
-
-is_owner(<<"shards/", _/binary>>=DbName, DocId) ->
-    mem3_util:owner(DbName, DocId);
-is_owner(_, _) ->
-    true.
-
-
-rep_db_update_error(Error, DbName, DocId) ->
-    case Error of
-    {bad_rep_doc, Reason} ->
-        ok;
-    _ ->
-        Reason = to_binary(Error)
-    end,
-    twig:log(error,"Replication manager, error processing document `~s`: ~s",
-        [DocId, Reason]),
-    update_rep_doc(DbName, DocId, [{<<"_replication_state">>, <<"error">>}]).
-
-
-rep_user_ctx({RepDoc}) ->
-    case get_json_value(<<"user_ctx">>, RepDoc) of
-    undefined ->
-        #user_ctx{};
-    {UserCtx} ->
-        #user_ctx{
-            name = get_json_value(<<"name">>, UserCtx, null),
-            roles = get_json_value(<<"roles">>, UserCtx, [])
-        }
-    end.
-
-
-maybe_start_replication(State, DbName, DocId, RepDoc) ->
-    #rep{id = {BaseId, _} = RepId} = Rep = parse_rep_doc(RepDoc),
-    case rep_state(RepId) of
-    nil ->
-        RepState = #rep_state{
-            dbname = DbName,
-            rep = Rep,
-            starting = true,
-            retries_left = State#state.max_retries,
-            max_retries = State#state.max_retries
-        },
-        true = ets:insert(?REP_TO_STATE, {RepId, RepState}),
-        true = ets:insert(?DOC_TO_REP, {{DbName, DocId}, RepId}),
-        twig:log(notice,"Attempting to start replication `~s` (document `~s`).",
-            [pp_rep_id(RepId), DocId]),
-        Pid = spawn_link(fun() -> start_replication(Rep, 0) end),
-        State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
-    #rep_state{rep = #rep{doc_id = DocId}} ->
-        State;
-    #rep_state{starting = false, dbname = DbName, rep = #rep{doc_id = OtherDocId}} ->
-        twig:log(notice, "The replication specified by the document `~s` was already"
-            " triggered by the document `~s`", [DocId, OtherDocId]),
-        maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId)),
-        State;
-    #rep_state{starting = true, dbname = DbName, rep = #rep{doc_id = OtherDocId}} ->
-        twig:log(notice, "The replication specified by the document `~s` is already"
-            " being triggered by the document `~s`", [DocId, OtherDocId]),
-        maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId)),
-        State
-    end.
-
-
-parse_rep_doc(RepDoc) ->
-    {ok, Rep} = try
-        couch_replicator_utils:parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
-    catch
-    throw:{error, Reason} ->
-        throw({bad_rep_doc, Reason});
-    Tag:Err ->
-        throw({bad_rep_doc, to_binary({Tag, Err})})
-    end,
-    Rep.
-
-
-maybe_tag_rep_doc(DbName, DocId, {RepProps}, RepId) ->
-    case get_json_value(<<"_replication_id">>, RepProps) of
-    RepId ->
-        ok;
-    _ ->
-        update_rep_doc(DbName, DocId, [{<<"_replication_id">>, RepId}])
-    end.
-
-%% note to self: this is markedly diff from mem3_rep_manager
-start_replication(Rep, Wait) ->
-    ok = timer:sleep(Wait * 1000),
-    case (catch couch_replicator:async_replicate(Rep)) of
-    {ok, _} ->
-        ok;
-    Error ->
-        replication_error(Rep, Error)
-    end.
-
-replication_complete(DbName, DocId) ->
-    case ets:lookup(?DOC_TO_REP, {DbName, DocId}) of
-    [{{DbName, DocId}, {BaseId, Ext} = RepId}] ->
-        case rep_state(RepId) of
-        nil ->
-            % Prior to OTP R14B02, temporary child specs remain in
-            % in the supervisor after a worker finishes - remove them.
-            % We want to be able to start the same replication but with
-            % eventually different values for parameters that don't
-            % contribute to its ID calculation.
-            case erlang:system_info(otp_release) < "R14B02" of
-            true ->
-                spawn(fun() ->
-                    _ = supervisor:delete_child(couch_replicator_job_sup, BaseId ++ Ext)
-                end);
-            false ->
-                ok
-            end;
-        #rep_state{} ->
-            ok
-        end,
-        true = ets:delete(?DOC_TO_REP, {DbName, DocId});
-    _ ->
-        ok
-    end.
-
-
-rep_doc_deleted(DbName, DocId) ->
-    case ets:lookup(?DOC_TO_REP, {DbName, DocId}) of
-    [{{DbName, DocId}, RepId}] ->
-        couch_replicator:cancel_replication(RepId),
-        true = ets:delete(?REP_TO_STATE, RepId),
-        true = ets:delete(?DOC_TO_REP, {DbName, DocId}),
-        twig:log(notice, "Stopped replication `~s` because replication document `~s`"
-            " was deleted", [pp_rep_id(RepId), DocId]);
-    [] ->
-        ok
-    end.
-
-
-replication_error(State, RepId, Error) ->
-    case rep_state(RepId) of
-    nil ->
-        State;
-    RepState ->
-        maybe_retry_replication(RepState, Error, State)
-    end.
-
-maybe_retry_replication(#rep_state{retries_left = 0} = RepState, Error, State) ->
-    #rep_state{
-        dbname = DbName,
-        rep = #rep{id = RepId, doc_id = DocId},
-        max_retries = MaxRetries
-    } = RepState,
-    couch_replicator:cancel_replication(RepId),
-    true = ets:delete(?REP_TO_STATE, RepId),
-    true = ets:delete(?DOC_TO_REP, {DbName, DocId}),
-    twig:log(error, "Error in replication `~s` (triggered by document `~s`): ~s"
-        "~nReached maximum retry attempts (~p).",
-        [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), MaxRetries]),
-    State;
-
-maybe_retry_replication(RepState, Error, State) ->
-    #rep_state{
-        rep = #rep{id = RepId, doc_id = DocId} = Rep
-    } = RepState,
-    #rep_state{wait = Wait} = NewRepState = state_after_error(RepState),
-    true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}),
-    twig:log(error, "Error in replication `~s` (triggered by document `~s`): ~s"
-        "~nRestarting replication in ~p seconds.",
-        [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), Wait]),
-    Pid = spawn_link(fun() -> start_replication(Rep, Wait) end),
-    State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}.
-
-
-stop_all_replications() ->
-    twig:log(notice, "Stopping all ongoing replications because the replicator"
-        " database was deleted or changed", []),
-    ets:foldl(
-        fun({_, RepId}, _) ->
-            couch_replicator:cancel_replication(RepId)
-        end,
-        ok, ?DOC_TO_REP),
-    true = ets:delete_all_objects(?REP_TO_STATE),
-    true = ets:delete_all_objects(?DOC_TO_REP),
-    true = ets:delete_all_objects(?DB_TO_SEQ).
-
-clean_up_replications(DbName) ->
-    ets:foldl(
-        fun({{Name, DocId}, RepId}, _) when Name =:= DbName ->
-            couch_replicator:cancel_replication(RepId),
-            ets:delete(?DOC_TO_REP,{Name, DocId}),
-            ets:delete(?REP_TO_STATE, RepId);
-           ({_,_}, _) ->
-            ok
-        end,
-        ok, ?DOC_TO_REP),
-    ets:delete(?DB_TO_SEQ,DbName).
-
-
-update_rep_doc(RepDbName, RepDocId, KVs) when is_binary(RepDocId) ->
-    try
-        case open_rep_doc(RepDbName, RepDocId) of
-            {ok, LastRepDoc} ->
-                update_rep_doc(RepDbName, LastRepDoc, KVs);
-            _ ->
-                ok
-        end
-    catch
-        throw:conflict ->
-            Msg = "Conflict when updating replication document `~s`. Retrying.",
-            twig:log(error, Msg, [RepDocId]),
-            ok = timer:sleep(5),
-            update_rep_doc(RepDbName, RepDocId, KVs)
-    end;
-update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
-    NewRepDocBody = lists:foldl(
-        fun({K, undefined}, Body) ->
-                lists:keydelete(K, 1, Body);
-           ({<<"_replication_state">> = K, State} = KV, Body) ->
-                case get_json_value(K, Body) of
-                State ->
-                    Body;
-                _ ->
-                    Body1 = lists:keystore(K, 1, Body, KV),
-                    lists:keystore(
-                        <<"_replication_state_time">>, 1, Body1,
-                        {<<"_replication_state_time">>, timestamp()})
-                end;
-            ({K, _V} = KV, Body) ->
-                lists:keystore(K, 1, Body, KV)
-        end,
-        RepDocBody, KVs),
-    case NewRepDocBody of
-    RepDocBody ->
-        ok;
-    _ ->
-        % Might not succeed - when the replication doc is deleted right
-        % before this update (not an error, ignore).
-        save_rep_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}})
-    end.
-
-
-open_rep_doc(<<"shards/", _/binary>>=ShardDbName, DocId) ->
-    defer_call(fun() ->
-        fabric:open_doc(mem3:dbname(ShardDbName), DocId, [])
-    end);
-open_rep_doc(DbName, DocId) ->
-    {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
-    try
-        couch_db:open_doc(Db, DocId, [ejson_body])
-    after
-        couch_db:close(Db)
-    end.
-
-save_rep_doc(<<"shards/", _/binary>>=DbName, Doc) ->
-    defer_call(fun() ->
-        fabric:update_doc(DbName, Doc, [?CTX])
-    end);
-save_rep_doc(DbName, Doc) ->
-    {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
-    try
-        couch_db:update_doc(Db, Doc, [])
-    after
-        couch_db:close(Db)
-    end.
-
-defer_call(Fun) ->
-    {Pid, Ref} = erlang:spawn_monitor(fun() ->
-        try
-            exit({exit_ok, Fun()})
-        catch
-            Type:Reason ->
-                exit({exit_err, Type, Reason})
-        end
-    end),
-    receive
-        {'DOWN', Ref, process, Pid, {exit_ok, Resp}} ->
-            Resp;
-        {'DOWN', Ref, process, Pid, {exit_err, throw, Error}} ->
-            throw(Error);
-        {'DOWN', Ref, process, Pid, {exit_err, error, Error}} ->
-            erlang:error(Error);
-        {'DOWN', Ref, process, Pid, {exit_err, exit, Error}} ->
-            exit(Error)
-    end.
-
-
-% RFC3339 timestamps.
-% Note: doesn't include the time seconds fraction (RFC3339 says it's optional).
-timestamp() ->
-    {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(now()),
-    UTime = erlang:universaltime(),
-    LocalTime = calendar:universal_time_to_local_time(UTime),
-    DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) -
-        calendar:datetime_to_gregorian_seconds(UTime),
-    zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60),
-    iolist_to_binary(
-        io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s",
-            [Year, Month, Day, Hour, Min, Sec,
-                zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])).
-
-zone(Hr, Min) when Hr >= 0, Min >= 0 ->
-    io_lib:format("+~2..0w:~2..0w", [Hr, Min]);
-zone(Hr, Min) ->
-    io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]).
-
-
-ensure_rep_db_exists(DbName) ->
-    Db = case couch_db:open_int(DbName, [?CTX, sys_db, nologifmissing]) of
-        {ok, Db0} ->
-            Db0;
-        _Error ->
-            {ok, Db0} = couch_db:create(DbName, [?CTX, sys_db]),
-            Db0
-    end,
-    ensure_rep_ddoc_exists(DbName),
-    {ok, Db}.
-
-
-ensure_rep_ddoc_exists(RepDb) ->
-    DDocId = <<"_design/_replicator">>,
-    case open_rep_doc(RepDb, DDocId) of
-        {ok, _Doc} ->
-            ok;
-        _ ->
-            DDoc = couch_doc:from_json_obj({[
-                {<<"_id">>, DDocId},
-                {<<"language">>, <<"javascript">>},
-                {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
-            ]}),
-            try
-                {ok, _} = save_rep_doc(RepDb, DDoc)
-            catch
-                throw:conflict ->
-                    % NFC what to do about this other than
-                    % not kill the process.
-                    ok
-            end
-    end.
-
-
-% pretty-print replication id
-pp_rep_id(#rep{id = RepId}) ->
-    pp_rep_id(RepId);
-pp_rep_id({Base, Extension}) ->
-    Base ++ Extension.
-
-
-rep_state(RepId) ->
-    case ets:lookup(?REP_TO_STATE, RepId) of
-    [{RepId, RepState}] ->
-        RepState;
-    [] ->
-        nil
-    end.
-
-
-error_reason({error, Reason}) ->
-    Reason;
-error_reason(Reason) ->
-    Reason.
-
-
-retries_value("infinity") ->
-    infinity;
-retries_value(Value) ->
-    list_to_integer(Value).
-
-
-state_after_error(#rep_state{retries_left = Left, wait = Wait} = State) ->
-    Wait2 = erlang:min(trunc(Wait * 2), ?MAX_WAIT),
-    case Left of
-    infinity ->
-        State#rep_state{wait = Wait2};
-    _ ->
-        State#rep_state{retries_left = Left - 1, wait = Wait2}
-    end.
-
-
-before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
-    Doc;
-before_doc_update(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) ->
-    #user_ctx{roles = Roles, name = Name} = UserCtx,
-    case lists:member(<<"_replicator">>, Roles) of
-    true ->
-        Doc;
-    false ->
-        case couch_util:get_value(?OWNER, Body) of
-        undefined ->
-            Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
-        Name ->
-            Doc;
-        Other ->
-            case (catch couch_db:check_is_admin(Db)) of
-            ok when Other =:= null ->
-                Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
-            ok ->
-                Doc;
-            _ ->
-                throw({forbidden, <<"Can't update replication documents",
-                    " from other users.">>})
-            end
-        end
-    end.
-
-
-after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
-    Doc;
-after_doc_read(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) ->
-    #user_ctx{name = Name} = UserCtx,
-    case (catch couch_db:check_is_admin(Db)) of
-    ok ->
-        Doc;
-    _ ->
-        case couch_util:get_value(?OWNER, Body) of
-        Name ->
-            Doc;
-        _Other ->
-            Source = strip_credentials(couch_util:get_value(<<"source">>,
-Body)),
-            Target = strip_credentials(couch_util:get_value(<<"target">>,
-Body)),
-            NewBody0 = ?replace(Body, <<"source">>, Source),
-            NewBody = ?replace(NewBody0, <<"target">>, Target),
-            #doc{revs = {Pos, [_ | Revs]}} = Doc,
-            NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}},
-            NewRevId = couch_db:new_revid(NewDoc),
-            NewDoc#doc{revs = {Pos, [NewRevId | Revs]}}
-        end
-    end.
-
-strip_credentials(Url) when is_binary(Url) ->
-    re:replace(Url,
-        "http(s)?://(?:[^:]+):[^@]+@(.*)$",
-        "http\\1://\\2",
-        [{return, binary}]);
-strip_credentials({Props}) ->
-    {lists:keydelete(<<"oauth">>, 1, Props)}.
-
-scan_all_dbs(Server) when is_pid(Server) ->
-    {ok, Db} = mem3_util:ensure_exists(config:get("mem3", "shard_db", "dbs")),
-    ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db),
-    IsReplicatorDbFun = is_replicator_db_fun(),
-    ChangesFun(fun({change, {Change}, _}, _) ->
-        DbName = get_json_value(<<"id">>, Change),
-        case DbName of <<"_design/", _/binary>> -> ok; _Else ->
-            case couch_replicator_utils:is_deleted(Change) of
-            true ->
-                ok;
-            false ->
-                case IsReplicatorDbFun(DbName) of
-                    true ->
-                        ensure_rep_ddoc_exists(DbName),
-                        gen_server:call(Server, {resume_scan, DbName});
-                    false ->
-                        ok
-                end
-            end
-        end;
-        (_, _) -> ok
-    end),
-    couch_db:close(Db).
-
-is_replicator_db_fun() ->
-    {ok, RegExp} = re:compile("^([a-z][a-z0-9\\_\\$()\\+\\-\\/]*/)?_replicator$"),
-    fun
-        (<<"shards/", _/binary>>=DbName) ->
-            match =:= re:run(mem3:dbname(DbName), RegExp, [{capture,none}]);
-        (DbName) ->
-            LocalRepDb = ?l2b(config:get("replicator", "db", "_replicator")),
-            DbName == LocalRepDb
-    end.
-
-get_json_value(Key, Props) ->
-    get_json_value(Key, Props, undefined).
-
-get_json_value(Key, Props, Default) when is_atom(Key) ->
-    Ref = make_ref(),
-    case couch_util:get_value(Key, Props, Ref) of
-        Ref ->
-            couch_util:get_value(?l2b(atom_to_list(Key)), Props, Default);
-        Else ->
-            Else
-    end;
-get_json_value(Key, Props, Default) when is_binary(Key) ->
-    Ref = make_ref(),
-    case couch_util:get_value(Key, Props, Ref) of
-        Ref ->
-            couch_util:get_value(list_to_atom(?b2l(Key)), Props, Default);
-        Else ->
-            Else
-    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/550e8202/src/couch_replicator/src/couch_replicator_notifier.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_notifier.erl b/src/couch_replicator/src/couch_replicator_notifier.erl
deleted file mode 100644
index 39fd68b..0000000
--- a/src/couch_replicator/src/couch_replicator_notifier.erl
+++ /dev/null
@@ -1,57 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_notifier).
-
--behaviour(gen_event).
-
-% public API
--export([start_link/1, stop/1, notify/1]).
-
-% gen_event callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_event/2, handle_call/2, handle_info/2]).
-
--include_lib("couch/include/couch_db.hrl").
-
-start_link(FunAcc) ->
-    couch_event_sup:start_link(couch_replication,
-        {couch_replicator_notifier, make_ref()}, FunAcc).
-
-notify(Event) ->
-    gen_event:notify(couch_replication, Event).
-
-stop(Pid) ->
-    couch_event_sup:stop(Pid).
-
-
-init(FunAcc) ->
-    {ok, FunAcc}.
-
-terminate(_Reason, _State) ->
-    ok.
-
-handle_event(Event, Fun) when is_function(Fun, 1) ->
-    Fun(Event),
-    {ok, Fun};
-handle_event(Event, {Fun, Acc}) when is_function(Fun, 2) ->
-    Acc2 = Fun(Event, Acc),
-    {ok, {Fun, Acc2}}.
-
-handle_call(_Msg, State) ->
-    {reply, ok, State}.
-
-handle_info(_Msg, State) ->
-    {ok, State}.
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/550e8202/src/couch_replicator/src/couch_replicator_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_sup.erl b/src/couch_replicator/src/couch_replicator_sup.erl
deleted file mode 100644
index bad9747..0000000
--- a/src/couch_replicator/src/couch_replicator_sup.erl
+++ /dev/null
@@ -1,42 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_sup).
--behaviour(supervisor).
--export([start_link/0, init/1]).
-
-start_link() ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-
-init(_Args) ->
-    Children = [
-        {couch_replication_event,
-            {gen_event, start_link, [{local, couch_replication}]},
-            permanent,
-            brutal_kill,
-            worker,
-            dynamic},
-        {couch_replicator_manager,
-            {couch_replicator_manager, start_link, []},
-            permanent,
-            brutal_kill,
-            worker,
-            [couch_replicator_manager]},
-        {couch_replicator_job_sup,
-            {couch_replicator_job_sup, start_link, []},
-            permanent,
-            infinity,
-            supervisor,
-            [couch_replicator_job_sup]}
-    ],
-    {ok, {{one_for_one,10,3600}, Children}}.
-

http://git-wip-us.apache.org/repos/asf/couchdb/blob/550e8202/src/couch_replicator/src/couch_replicator_utils.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
deleted file mode 100644
index 9b9dd74..0000000
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ /dev/null
@@ -1,432 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_utils).
-
--export([parse_rep_doc/2]).
--export([open_db/1, close_db/1]).
--export([start_db_compaction_notifier/2, stop_db_compaction_notifier/1]).
--export([replication_id/2]).
--export([sum_stats/2, is_deleted/1]).
--export([mp_parse_doc/2]).
-
--include_lib("couch/include/couch_db.hrl").
--include_lib("ibrowse/include/ibrowse.hrl").
--include("couch_replicator_api_wrap.hrl").
--include("couch_replicator.hrl").
-
--import(couch_util, [
-    get_value/2,
-    get_value/3
-]).
-
-
-parse_rep_doc({Props}, UserCtx) ->
-    ProxyParams = parse_proxy_params(get_value(<<"proxy">>, Props, <<>>)),
-    Options = make_options(Props),
-    case get_value(cancel, Options, false) andalso
-        (get_value(id, Options, nil) =/= nil) of
-    true ->
-        {ok, #rep{options = Options, user_ctx = UserCtx}};
-    false ->
-        Source = parse_rep_db(get_value(<<"source">>, Props), ProxyParams, Options),
-        Target = parse_rep_db(get_value(<<"target">>, Props), ProxyParams, Options),
-        Rep = #rep{
-            source = Source,
-            target = Target,
-            options = Options,
-            user_ctx = UserCtx,
-            doc_id = get_value(<<"_id">>, Props, null)
-        },
-        {ok, Rep#rep{id = replication_id(Rep)}}
-    end.
-
-
-replication_id(#rep{options = Options} = Rep) ->
-    BaseId = replication_id(Rep, ?REP_ID_VERSION),
-    {BaseId, maybe_append_options([continuous, create_target], Options)}.
-
-
-% Versioned clauses for generating replication IDs.
-% If a change is made to how replications are identified,
-% please add a new clause and increase ?REP_ID_VERSION.
-
-replication_id(#rep{user_ctx = UserCtx} = Rep, 3) ->
-    UUID = couch_server:get_uuid(),
-    Src = get_rep_endpoint(UserCtx, Rep#rep.source),
-    Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
-    maybe_append_filters([UUID, Src, Tgt], Rep);
-
-replication_id(#rep{user_ctx = UserCtx} = Rep, 2) ->
-    {ok, HostName} = inet:gethostname(),
-    Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of
-    P when is_number(P) ->
-        P;
-    _ ->
-        % On restart we might be called before the couch_httpd process is
-        % started.
-        % TODO: we might be under an SSL socket server only, or both under
-        % SSL and a non-SSL socket.
-        % ... mochiweb_socket_server:get(https, port)
-        list_to_integer(config:get("httpd", "port", "5984"))
-    end,
-    Src = get_rep_endpoint(UserCtx, Rep#rep.source),
-    Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
-    maybe_append_filters([HostName, Port, Src, Tgt], Rep);
-
-replication_id(#rep{user_ctx = UserCtx} = Rep, 1) ->
-    {ok, HostName} = inet:gethostname(),
-    Src = get_rep_endpoint(UserCtx, Rep#rep.source),
-    Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
-    maybe_append_filters([HostName, Src, Tgt], Rep).
-
-
-maybe_append_filters(Base,
-        #rep{source = Source, user_ctx = UserCtx, options = Options}) ->
-    Base2 = Base ++
-        case get_value(filter, Options) of
-        undefined ->
-            case get_value(doc_ids, Options) of
-            undefined ->
-                [];
-            DocIds ->
-                [DocIds]
-            end;
-        Filter ->
-            [filter_code(Filter, Source, UserCtx),
-                get_value(query_params, Options, {[]})]
-        end,
-    couch_util:to_hex(couch_util:md5(term_to_binary(Base2))).
-
-
-filter_code(Filter, Source, UserCtx) ->
-    {DDocName, FilterName} =
-    case re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]) of
-    {match, [DDocName0, FilterName0]} ->
-        {DDocName0, FilterName0};
-    _ ->
-        throw({error, <<"Invalid filter. Must match `ddocname/filtername`.">>})
-    end,
-    Db = case (catch couch_replicator_api_wrap:db_open(Source, [{user_ctx, UserCtx}])) of
-    {ok, Db0} ->
-        Db0;
-    DbError ->
-        DbErrorMsg = io_lib:format("Could not open source database `~s`: ~s",
-           [couch_replicator_api_wrap:db_uri(Source), couch_util:to_binary(DbError)]),
-        throw({error, iolist_to_binary(DbErrorMsg)})
-    end,
-    try
-        Body = case (catch couch_replicator_api_wrap:open_doc(
-            Db, <<"_design/", DDocName/binary>>, [ejson_body])) of
-        {ok, #doc{body = Body0}} ->
-            Body0;
-        DocError ->
-            DocErrorMsg = io_lib:format(
-                "Couldn't open document `_design/~s` from source "
-                "database `~s`: ~s", [DDocName, couch_replicator_api_wrap:db_uri(Source),
-                    couch_util:to_binary(DocError)]),
-            throw({error, iolist_to_binary(DocErrorMsg)})
-        end,
-        Code = couch_util:get_nested_json_value(
-            Body, [<<"filters">>, FilterName]),
-        re:replace(Code, [$^, "\s*(.*?)\s*", $$], "\\1", [{return, binary}])
-    after
-        couch_replicator_api_wrap:db_close(Db)
-    end.
-
-
-maybe_append_options(Options, RepOptions) ->
-    lists:foldl(fun(Option, Acc) ->
-        Acc ++
-        case get_value(Option, RepOptions, false) of
-        true ->
-            "+" ++ atom_to_list(Option);
-        false ->
-            ""
-        end
-    end, [], Options).
-
-
-get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers, oauth=OAuth}) ->
-    DefaultHeaders = (#httpdb{})#httpdb.headers,
-    case OAuth of
-    nil ->
-        {remote, Url, Headers -- DefaultHeaders};
-    #oauth{} ->
-        {remote, Url, Headers -- DefaultHeaders, OAuth}
-    end;
-get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
-    {local, DbName, UserCtx}.
-
-
-parse_rep_db({Props}, ProxyParams, Options) ->
-    Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
-    {AuthProps} = get_value(<<"auth">>, Props, {[]}),
-    {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
-    Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
-    DefaultHeaders = (#httpdb{})#httpdb.headers,
-    OAuth = case get_value(<<"oauth">>, AuthProps) of
-    undefined ->
-        nil;
-    {OauthProps} ->
-        #oauth{
-            consumer_key = ?b2l(get_value(<<"consumer_key">>, OauthProps)),
-            token = ?b2l(get_value(<<"token">>, OauthProps)),
-            token_secret = ?b2l(get_value(<<"token_secret">>, OauthProps)),
-            consumer_secret = ?b2l(get_value(<<"consumer_secret">>, OauthProps)),
-            signature_method =
-                case get_value(<<"signature_method">>, OauthProps) of
-                undefined ->        hmac_sha1;
-                <<"PLAINTEXT">> ->  plaintext;
-                <<"HMAC-SHA1">> ->  hmac_sha1;
-                <<"RSA-SHA1">> ->   rsa_sha1
-                end
-        }
-    end,
-    #httpdb{
-        url = Url,
-        oauth = OAuth,
-        headers = lists:ukeymerge(1, Headers, DefaultHeaders),
-        ibrowse_options = lists:keysort(1,
-            [{socket_options, get_value(socket_options, Options)} |
-                ProxyParams ++ ssl_params(Url)]),
-        timeout = get_value(connection_timeout, Options),
-        http_connections = get_value(http_connections, Options),
-        retries = get_value(retries, Options)
-    };
-parse_rep_db(<<"http://", _/binary>> = Url, ProxyParams, Options) ->
-    parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
-parse_rep_db(<<"https://", _/binary>> = Url, ProxyParams, Options) ->
-    parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
-parse_rep_db(<<DbName/binary>>, _ProxyParams, _Options) ->
-    DbName.
-
-
-maybe_add_trailing_slash(Url) when is_binary(Url) ->
-    maybe_add_trailing_slash(?b2l(Url));
-maybe_add_trailing_slash(Url) ->
-    case lists:last(Url) of
-    $/ ->
-        Url;
-    _ ->
-        Url ++ "/"
-    end.
-
-
-make_options(Props) ->
-    Options = lists:ukeysort(1, convert_options(Props)),
-    DefWorkers = config:get("replicator", "worker_processes", "4"),
-    DefBatchSize = config:get("replicator", "worker_batch_size", "500"),
-    DefConns = config:get("replicator", "http_connections", "20"),
-    DefTimeout = config:get("replicator", "connection_timeout", "30000"),
-    DefRetries = config:get("replicator", "retries_per_request", "10"),
-    UseCheckpoints = config:get("replicator", "use_checkpoints", "true"),
-    {ok, DefSocketOptions} = couch_util:parse_term(
-        config:get("replicator", "socket_options",
-            "[{keepalive, true}, {nodelay, false}]")),
-    lists:ukeymerge(1, Options, lists:keysort(1, [
-        {connection_timeout, list_to_integer(DefTimeout)},
-        {retries, list_to_integer(DefRetries)},
-        {http_connections, list_to_integer(DefConns)},
-        {socket_options, DefSocketOptions},
-        {worker_batch_size, list_to_integer(DefBatchSize)},
-        {worker_processes, list_to_integer(DefWorkers)},
-        {use_checkpoints, list_to_existing_atom(UseCheckpoints)}
-    ])).
-
-
-convert_options([])->
-    [];
-convert_options([{<<"cancel">>, V} | R]) ->
-    [{cancel, V} | convert_options(R)];
-convert_options([{IdOpt, V} | R]) when IdOpt =:= <<"_local_id">>;
-        IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> ->
-    Id = lists:splitwith(fun(X) -> X =/= $+ end, ?b2l(V)),
-    [{id, Id} | convert_options(R)];
-convert_options([{<<"create_target">>, V} | R]) ->
-    [{create_target, V} | convert_options(R)];
-convert_options([{<<"continuous">>, V} | R]) ->
-    [{continuous, V} | convert_options(R)];
-convert_options([{<<"filter">>, V} | R]) ->
-    [{filter, V} | convert_options(R)];
-convert_options([{<<"query_params">>, V} | R]) ->
-    [{query_params, V} | convert_options(R)];
-convert_options([{<<"doc_ids">>, null} | R]) ->
-    convert_options(R);
-convert_options([{<<"doc_ids">>, V} | R]) ->
-    % Ensure same behaviour as old replicator: accept a list of percent
-    % encoded doc IDs.
-    DocIds = [?l2b(couch_httpd:unquote(Id)) || Id <- V],
-    [{doc_ids, DocIds} | convert_options(R)];
-convert_options([{<<"worker_processes">>, V} | R]) ->
-    [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"worker_batch_size">>, V} | R]) ->
-    [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"http_connections">>, V} | R]) ->
-    [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"connection_timeout">>, V} | R]) ->
-    [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"retries_per_request">>, V} | R]) ->
-    [{retries, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"socket_options">>, V} | R]) ->
-    {ok, SocketOptions} = couch_util:parse_term(V),
-    [{socket_options, SocketOptions} | convert_options(R)];
-convert_options([{<<"since_seq">>, V} | R]) ->
-    [{since_seq, V} | convert_options(R)];
-convert_options([{<<"use_checkpoints">>, V} | R]) ->
-    [{use_checkpoints, V} | convert_options(R)];
-convert_options([_ | R]) -> % skip unknown option
-    convert_options(R).
-
-
-parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
-    parse_proxy_params(?b2l(ProxyUrl));
-parse_proxy_params([]) ->
-    [];
-parse_proxy_params(ProxyUrl) ->
-    #url{
-        host = Host,
-        port = Port,
-        username = User,
-        password = Passwd
-    } = ibrowse_lib:parse_url(ProxyUrl),
-    [{proxy_host, Host}, {proxy_port, Port}] ++
-        case is_list(User) andalso is_list(Passwd) of
-        false ->
-            [];
-        true ->
-            [{proxy_user, User}, {proxy_password, Passwd}]
-        end.
-
-
-ssl_params(Url) ->
-    case ibrowse_lib:parse_url(Url) of
-    #url{protocol = https} ->
-        Depth = list_to_integer(
-            config:get("replicator", "ssl_certificate_max_depth", "3")
-        ),
-        VerifyCerts = config:get("replicator", "verify_ssl_certificates"),
-        CertFile = config:get("replicator", "cert_file", nil),
-        KeyFile = config:get("replicator", "key_file", nil),
-        Password = config:get("replicator", "password", nil),
-        SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")],
-        SslOpts1 = case CertFile /= nil andalso KeyFile /= nil of
-            true ->
-                case Password of
-                    nil ->
-                        [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts;
-                    _ ->
-                        [{certfile, CertFile}, {keyfile, KeyFile},
-                            {password, Password}] ++ SslOpts
-                end;
-            false -> SslOpts
-        end,
-        [{is_ssl, true}, {ssl_options, SslOpts1}];
-    #url{protocol = http} ->
-        []
-    end.
-
-ssl_verify_options(Value) ->
-    ssl_verify_options(Value, erlang:system_info(otp_release)).
-
-ssl_verify_options(true, OTPVersion) when OTPVersion >= "R14" ->
-    CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
-    [{verify, verify_peer}, {cacertfile, CAFile}];
-ssl_verify_options(false, OTPVersion) when OTPVersion >= "R14" ->
-    [{verify, verify_none}];
-ssl_verify_options(true, _OTPVersion) ->
-    CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
-    [{verify, 2}, {cacertfile, CAFile}];
-ssl_verify_options(false, _OTPVersion) ->
-    [{verify, 0}].
-
-
-%% New db record has Options field removed here to enable smoother dbcore migration
-open_db(#db{name = Name, user_ctx = UserCtx}) ->
-    {ok, Db} = couch_db:open(Name, [{user_ctx, UserCtx} | []]),
-    Db;
-open_db(HttpDb) ->
-    HttpDb.
-
-
-close_db(#db{} = Db) ->
-    couch_db:close(Db);
-close_db(_HttpDb) ->
-    ok.
-
-
-start_db_compaction_notifier(#db{name = DbName}, Server) ->
-    {ok, Notifier} = couch_db_update_notifier:start_link(
-        fun({compacted, DbName1}) when DbName1 =:= DbName ->
-                ok = gen_server:cast(Server, {db_compacted, DbName});
-            (_) ->
-                ok
-        end),
-    Notifier;
-start_db_compaction_notifier(_, _) ->
-    nil.
-
-
-stop_db_compaction_notifier(nil) ->
-    ok;
-stop_db_compaction_notifier(Notifier) ->
-    couch_db_update_notifier:stop(Notifier).
-
-
-sum_stats(#rep_stats{} = S1, #rep_stats{} = S2) ->
-    #rep_stats{
-        missing_checked =
-            S1#rep_stats.missing_checked + S2#rep_stats.missing_checked,
-        missing_found = S1#rep_stats.missing_found + S2#rep_stats.missing_found,
-        docs_read = S1#rep_stats.docs_read + S2#rep_stats.docs_read,
-        docs_written = S1#rep_stats.docs_written + S2#rep_stats.docs_written,
-        doc_write_failures =
-            S1#rep_stats.doc_write_failures + S2#rep_stats.doc_write_failures
-    }.
-
-mp_parse_doc({headers, H}, []) ->
-    case couch_util:get_value("content-type", H) of
-    {"application/json", _} ->
-        fun (Next) ->
-            mp_parse_doc(Next, [])
-        end
-    end;
-mp_parse_doc({body, Bytes}, AccBytes) ->
-    fun (Next) ->
-        mp_parse_doc(Next, [Bytes | AccBytes])
-    end;
-mp_parse_doc(body_end, AccBytes) ->
-    receive {get_doc_bytes, Ref, From} ->
-        From ! {doc_bytes, Ref, lists:reverse(AccBytes)}
-    end,
-    fun mp_parse_atts/1.
-
-mp_parse_atts(eof) ->
-    ok;
-mp_parse_atts({headers, _H}) ->
-    fun mp_parse_atts/1;
-mp_parse_atts({body, Bytes}) ->
-    receive {get_bytes, Ref, From} ->
-        From ! {bytes, Ref, Bytes}
-    end,
-    fun mp_parse_atts/1;
-mp_parse_atts(body_end) ->
-    fun mp_parse_atts/1.
-
-is_deleted(Change) ->
-    case couch_util:get_value(<<"deleted">>, Change) of
-    undefined ->
-        % keep backwards compatibility for a while
-        couch_util:get_value(deleted, Change, false);
-    Else ->
-        Else
-    end.