You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@couchdb.apache.org by GitBox <gi...@apache.org> on 2018/03/05 18:29:31 UTC

[GitHub] nickva closed pull request #1176: Implement pluggable authentication and session support for replicator

nickva closed pull request #1176: Implement pluggable authentication and session support for replicator
URL: https://github.com/apache/couchdb/pull/1176
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 4017a0c228..03f4d14e06 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -430,6 +430,21 @@ ssl_certificate_max_depth = 3
 ; Re-check cluster state at least every cluster_quiet_period seconds
 ; cluster_quiet_period = 60
 
+; List of replicator client authentication plugins to try. Plugins will be
+; tried in order. The first to initialize successfully will be used for that
+; particular endpoint (source or target). Normally couch_replicator_auth_noop
+; would be used at the end of the list as a "catch-all". It doesn't do anything
+; and effectively implements the previous behavior of using basic auth.
+; There are currently two plugins available:
+;   couch_replicator_auth_session - use _session cookie authentication
+;   couch_replicator_auth_noop - use basic authentication (previous default)
+; Currently previous default behavior is still the default. To start using
+; session auth, use this as the list of plugins:
+; `couch_replicator_auth_session,couch_replicator_auth_noop`.
+; In a future release the session plugin might be used by default.
+;auth_plugins = couch_replicator_auth_noop
+
+
 [compaction_daemon]
 ; The delay, in seconds, between each check for which database and view indexes
 ; need to be compacted.
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.hrl b/src/couch_replicator/include/couch_replicator_api_wrap.hrl
similarity index 86%
rename from src/couch_replicator/src/couch_replicator_api_wrap.hrl
rename to src/couch_replicator/include/couch_replicator_api_wrap.hrl
index d2e0fdff5c..0f8213c515 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.hrl
+++ b/src/couch_replicator/include/couch_replicator_api_wrap.hrl
@@ -14,7 +14,7 @@
 
 -record(httpdb, {
     url,
-    oauth = nil,
+    auth_props = [],
     headers = [
         {"Accept", "application/json"},
         {"User-Agent", "CouchDB-Replicator/" ++ couch_server:get_version()}
@@ -26,13 +26,6 @@
     httpc_pool = nil,
     http_connections,
     first_error_timestamp = nil,
-    proxy_url
-}).
-
--record(oauth, {
-    consumer_key,
-    token,
-    token_secret,
-    consumer_secret,
-    signature_method
+    proxy_url,
+    auth_context = nil
 }).
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index 8b7cd5cb19..39141c3017 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -25,7 +25,7 @@
 
 -include_lib("couch/include/couch_db.hrl").
 -include("couch_replicator.hrl").
--include("couch_replicator_api_wrap.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 -include_lib("mem3/include/mem3.hrl").
 
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index b5ea57c3c9..44c290d33b 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -142,7 +142,8 @@ db_open(DbName, Options, Create, _CreateParams) ->
         throw({unauthorized, DbName})
     end.
 
-db_close(#httpdb{httpc_pool = Pool}) ->
+db_close(#httpdb{httpc_pool = Pool} = HttpDb) ->
+    couch_replicator_auth:cleanup(HttpDb),
     unlink(Pool),
     ok = couch_replicator_httpc_pool:stop(Pool);
 db_close(DbName) ->
@@ -1009,7 +1010,7 @@ header_value(Key, Headers, Default) ->
 normalize_db(#httpdb{} = HttpDb) ->
     #httpdb{
         url = HttpDb#httpdb.url,
-        oauth = HttpDb#httpdb.oauth,
+        auth_props = lists:sort(HttpDb#httpdb.auth_props),
         headers = lists:keysort(1, HttpDb#httpdb.headers),
         timeout = HttpDb#httpdb.timeout,
         ibrowse_options = lists:keysort(1, HttpDb#httpdb.ibrowse_options),
@@ -1037,7 +1038,7 @@ maybe_append_create_query_params(Db, CreateParams) ->
 normalize_http_db_test() ->
     HttpDb =  #httpdb{
         url = "http://host/db",
-        oauth = #oauth{},
+        auth_props = [{"key", "val"}],
         headers = [{"k2","v2"}, {"k1","v1"}],
         timeout = 30000,
         ibrowse_options = [{k2, v2}, {k1, v1}],
diff --git a/src/couch_replicator/src/couch_replicator_auth.erl b/src/couch_replicator/src/couch_replicator_auth.erl
new file mode 100644
index 0000000000..1c9a497232
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_auth.erl
@@ -0,0 +1,99 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_auth).
+
+
+-export([
+    initialize/1,
+    update_headers/2,
+    handle_response/3,
+    cleanup/1
+]).
+
+
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
+
+
+-type headers() :: [{string(), string()}].
+-type code() :: non_neg_integer().
+
+
+-define(DEFAULT_PLUGINS, "couch_replicator_auth_noop").
+
+
+% Behavior API
+
+-callback initialize(#httpdb{}) -> {ok, #httpdb{}, term()} | ignore.
+
+-callback update_headers(term(), headers()) -> {headers(), term()}.
+
+-callback handle_response(term(), code(), headers()) ->
+    {continue | retry, term()}.
+
+-callback cleanup(term()) -> ok.
+
+
+% Main API
+
+-spec initialize(#httpdb{}) -> {ok, #httpdb{}} | {error, term()}.
+initialize(#httpdb{auth_context = nil} = HttpDb) ->
+    case try_initialize(get_plugin_modules(), HttpDb) of
+        {ok, Mod, HttpDb1, Context} ->
+            {ok, HttpDb1#httpdb{auth_context = {Mod, Context}}};
+        {error, Error} ->
+            {error, Error}
+    end.
+
+
+-spec update_headers(#httpdb{}, headers()) -> {headers(), #httpdb{}}.
+update_headers(#httpdb{auth_context = {Mod, Context}} = HttpDb, Headers) ->
+    {Headers1, Context1} = Mod:update_headers(Context, Headers),
+    {Headers1, HttpDb#httpdb{auth_context = {Mod, Context1}}}.
+
+
+-spec handle_response(#httpdb{}, code(), headers()) ->
+    {continue | retry, term()}.
+handle_response(#httpdb{} = HttpDb, Code, Headers) ->
+    {Mod, Context} = HttpDb#httpdb.auth_context,
+    {Res, Context1} = Mod:handle_response(Context, Code, Headers),
+    {Res, HttpDb#httpdb{auth_context = {Mod, Context1}}}.
+
+
+-spec cleanup(#httpdb{}) -> #httpdb{}.
+cleanup(#httpdb{auth_context = {Module, Context}} = HttpDb) ->
+    ok = Module:cleanup(Context),
+    HttpDb#httpdb{auth_context = nil}.
+
+
+% Private helper functions
+
+-spec get_plugin_modules() -> [atom()].
+get_plugin_modules() ->
+    Plugins1 = config:get("replicator", "auth_plugins", ?DEFAULT_PLUGINS),
+    [list_to_atom(Plugin) || Plugin <- string:tokens(Plugins1, ",")].
+
+
+try_initialize([], _HttpDb) ->
+    {error, no_more_auth_plugins_left_to_try};
+try_initialize([Mod | Modules], HttpDb) ->
+    try Mod:initialize(HttpDb) of
+        {ok, HttpDb1, Context} ->
+            {ok, Mod, HttpDb1, Context};
+        ignore ->
+            try_initialize(Modules, HttpDb);
+        {error, Error} ->
+            {error, Error}
+    catch
+        error:undef ->
+            {error, {could_not_load_plugin_module, Mod}}
+    end.
diff --git a/src/couch_replicator/src/couch_replicator_auth_noop.erl b/src/couch_replicator/src/couch_replicator_auth_noop.erl
new file mode 100644
index 0000000000..5dbf13335c
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_auth_noop.erl
@@ -0,0 +1,52 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_auth_noop).
+
+
+-behavior(couch_replicator_auth).
+
+
+-export([
+    initialize/1,
+    update_headers/2,
+    handle_response/3,
+    cleanup/1
+]).
+
+
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
+
+
+-type headers() :: [{string(), string()}].
+-type code() :: non_neg_integer().
+
+
+-spec initialize(#httpdb{}) -> {ok, #httpdb{}, term()} | ignore.
+initialize(#httpdb{} = HttpDb) ->
+    {ok, HttpDb, nil}.
+
+
+-spec update_headers(term(), headers()) -> {headers(), term()}.
+update_headers(Context, Headers) ->
+    {Headers, Context}.
+
+
+-spec handle_response(term(), code(), headers()) ->
+    {continue | retry, term()}.
+handle_response(Context, _Code, _Headers) ->
+    {continue, Context}.
+
+
+-spec cleanup(term()) -> ok.
+cleanup(_Context) ->
+    ok.
diff --git a/src/couch_replicator/src/couch_replicator_auth_session.erl b/src/couch_replicator/src/couch_replicator_auth_session.erl
new file mode 100644
index 0000000000..3fff295725
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_auth_session.erl
@@ -0,0 +1,692 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+% This is the replicator session auth plugin. It implements session based
+% authentication for the replicator. The only public API are the functions from
+% the couch_replicator_auth behaviour. Most of the logic and state is in the
+% gen_server. An instance of a gen_server could be spawned for the source and
+% target endpoints of each replication jobs.
+%
+% The workflow is roughly this:
+%
+%  * On initialization, try to get a cookie in `refresh/1` If an error occurs,
+%    the crash. If `_session` endpoint fails with a 404 (not found), return
+%    `ignore` assuming session authentication is not support or we simply hit a
+%    non-CouchDb server.
+%
+%  * Before each request, auth framework calls `update_headers` API function.
+%    Before updating the headers and returning, check if need to refresh again.
+%    The check looks `next_refresh` time. If that time is set (not `infinity`)
+%    and just expired, then obtain a new cookie, then update headers and
+%    return.
+%
+%  * After each request, auth framework calls `handle_response` function. If
+%    request was successful check if a new cookie was sent by the server in the
+%    `Set-Cookie` header. If it was then then that becomes the current cookie.
+%
+%  * If last request has an auth failure, check if request used a stale cookie
+%    In this case nothing is done, and the client is told to retry. Next time
+%    it updates its headers befor the request it should pick up the latest
+%    cookie.
+%
+%  * If last request failed and cookie was the latest known cookie, schedule a
+%    refresh and tell client to retry. However, if the cookie was just updated,
+%    tell the client to continue such that it will handle the auth failure on
+%    its own via a set of retries with exponential backoffs. This is it to
+%    ensure if something goes wrong and one of the endpoints issues invalid
+%    cookies, replicator won't be stuck in a busy loop refreshing them.
+
+
+-module(couch_replicator_auth_session).
+
+
+-behaviour(couch_replicator_auth).
+-behaviour(gen_server).
+
+
+-export([
+    initialize/1,
+    update_headers/2,
+    handle_response/3,
+    cleanup/1
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3,
+    format_status/2
+]).
+
+
+-include_lib("ibrowse/include/ibrowse.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
+
+
+-type headers() :: [{string(), string()}].
+-type code() :: non_neg_integer().
+-type creds() :: {string() | undefined, string() | undefined}.
+
+
+-define(MIN_UPDATE_INTERVAL, 5).
+
+
+-record(state, {
+    epoch = 0 :: non_neg_integer(),
+    cookie :: string() | undefined,
+    user :: string() | undefined,
+    pass :: string() | undefined,
+    httpdb_timeout :: integer(),
+    httpdb_pool :: pid(),
+    httpdb_ibrowse_options = [] :: list(),
+    session_url :: string(),
+    next_refresh = infinity :: infinity |  non_neg_integer(),
+    refresh_tstamp = 0 :: non_neg_integer()
+}).
+
+
+% Behavior API callbacks
+
+-spec initialize(#httpdb{}) -> {ok, #httpdb{}, term()} | ignore.
+initialize(#httpdb{} = HttpDb) ->
+    case init_state(HttpDb) of
+        {ok, HttpDb1, State} ->
+            {ok, Pid} = gen_server:start_link(?MODULE, [State], []),
+            Epoch = State#state.epoch,
+            Timeout = State#state.httpdb_timeout,
+            {ok, HttpDb1, {Pid, Epoch, Timeout}};
+        {error, Error} ->
+            {error, Error};
+        ignore ->
+            ignore
+    end.
+
+
+-spec update_headers(term(), headers()) -> {headers(), term()}.
+update_headers({Pid, Epoch, Timeout}, Headers) ->
+    Args = {update_headers, Headers, Epoch},
+    {Headers1, Epoch1} = gen_server:call(Pid, Args, Timeout * 10),
+    {Headers1, {Pid, Epoch1, Timeout}}.
+
+
+-spec handle_response(term(), code(), headers()) ->
+    {continue | retry, term()}.
+handle_response({Pid, Epoch, Timeout}, Code, Headers) ->
+    Args =  {handle_response, Code, Headers, Epoch},
+    {Retry, Epoch1} = gen_server:call(Pid, Args, Timeout * 10),
+    {Retry, {Pid, Epoch1, Timeout}}.
+
+
+-spec cleanup(term()) -> ok.
+cleanup({Pid, _Epoch, Timeout}) ->
+    gen_server:call(Pid, stop, Timeout * 10).
+
+
+%% gen_server functions
+
+init([#state{} = State]) ->
+    {ok, State}.
+
+
+terminate(_Reason, _State) ->
+    ok.
+
+
+handle_call({update_headers, Headers, _Epoch}, _From, State) ->
+    case maybe_refresh(State) of
+        {ok, State1} ->
+            Cookie = "AuthSession=" ++ State1#state.cookie,
+            Headers1 = [{"Cookie", Cookie} | Headers],
+            {reply, {Headers1, State1#state.epoch}, State1};
+        {error, Error} ->
+            LogMsg = "~p: Stopping session auth plugin because of error ~p",
+            couch_log:error(LogMsg, [?MODULE, Error]),
+            {stop, Error, State}
+    end;
+
+handle_call({handle_response, Code, Headers, Epoch}, _From, State) ->
+    {Retry, State1} = process_response(Code, Headers, Epoch, State),
+    {reply, {Retry, State1#state.epoch}, State1};
+
+handle_call(stop, _From, State) ->
+    {stop, normal, ok, State}.
+
+
+handle_cast(Msg, State) ->
+    couch_log:error("~p: Received un-expected cast ~p", [?MODULE, Msg]),
+    {noreply, State}.
+
+
+handle_info(Msg, State) ->
+    couch_log:error("~p : Received un-expected message ~p", [?MODULE, Msg]),
+    {noreply, State}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+format_status(_Opt, [_PDict, State]) ->
+    [
+        {epoch, State#state.epoch},
+        {user, State#state.user},
+        {session_url, State#state.session_url},
+        {refresh_tstamp, State#state.refresh_tstamp}
+    ].
+
+
+%% Private helper functions
+
+
+-spec init_state(#httpdb{}) ->
+    {ok, #httpdb{}, #state{}} | {error, term()} | ignore.
+init_state(#httpdb{} = HttpDb) ->
+    case extract_creds(HttpDb) of
+        {ok, User, Pass, HttpDb1} ->
+            State = #state{
+                user = User,
+                pass = Pass,
+                session_url = get_session_url(HttpDb1#httpdb.url),
+                httpdb_pool = HttpDb1#httpdb.httpc_pool,
+                httpdb_timeout = HttpDb1#httpdb.timeout,
+                httpdb_ibrowse_options = HttpDb1#httpdb.ibrowse_options
+            },
+            case refresh(State) of
+                {ok, State1} ->
+                    {ok, HttpDb1, State1};
+                {error, {session_not_supported, _, _}} ->
+                    ignore;
+                {error, Error} ->
+                    {error, Error}
+            end;
+        {error, missing_credentials} ->
+            ignore;
+        {error, Error} ->
+            {error, Error}
+    end.
+
+
+-spec extract_creds(#httpdb{}) ->
+    {ok, string(), string(), #httpdb{}} | {error, term()}.
+extract_creds(#httpdb{url = Url, headers = Headers} = HttpDb) ->
+    {{HeadersUser, HeadersPass}, HeadersNoCreds} =
+            couch_replicator_utils:remove_basic_auth_from_headers(Headers),
+    case extract_creds_from_url(Url) of
+        {ok, UrlUser, UrlPass, UrlNoCreds} ->
+            case pick_creds({UrlUser, UrlPass}, {HeadersUser, HeadersPass}) of
+                {ok, User, Pass} ->
+                    HttpDb1 = HttpDb#httpdb{
+                        url = UrlNoCreds,
+                        headers = HeadersNoCreds
+                    },
+                    {ok, User, Pass, HttpDb1};
+                {error, Error} ->
+                    {error, Error}
+            end;
+        {error, Error} ->
+            {error, Error}
+    end.
+
+
+% Credentials could be specified in the url and/or in the headers.
+%  * If no credentials specified return error.
+%  * If specified in url but not in headers, pick url creds.
+%  * Otherwise pick headers creds.
+%
+-spec pick_creds(creds(), creds()) ->
+    {ok, string(), string()} | {error, missing_credentials}.
+pick_creds({undefined, _}, {undefined, _}) ->
+    {error, missing_credentials};
+pick_creds({UrlUser, UrlPass}, {undefined, _}) ->
+    {ok, UrlUser, UrlPass};
+pick_creds({_, _}, {HeadersUser, HeadersPass}) ->
+    {ok, HeadersUser, HeadersPass}.
+
+
+-spec extract_creds_from_url(string()) ->
+    {ok, string() | undefined, string() | undefined, string()} |
+    {error, term()}.
+extract_creds_from_url(Url) ->
+    case ibrowse_lib:parse_url(Url) of
+        {error, Error} ->
+            {error, Error};
+        #url{username = undefined, password = undefined} ->
+            {ok, undefined, undefined, Url};
+        #url{protocol = Proto, username = User, password = Pass} ->
+            % Excise user and pass parts from the url. Try to keep the host,
+            % port and path as they were in the original.
+            Prefix = lists:concat([Proto, "://", User, ":", Pass, "@"]),
+            Suffix = lists:sublist(Url, length(Prefix) + 1, length(Url) + 1),
+            NoCreds = lists:concat([Proto, "://", Suffix]),
+            {ok, User, Pass, NoCreds}
+    end.
+
+
+-spec process_response(non_neg_integer(), headers(),
+    non_neg_integer(), #state{}) -> {retry | continue, #state{}}.
+process_response(403, _Headers, Epoch, State) ->
+    process_auth_failure(Epoch, State);
+process_response(401, _Headers, Epoch, State) ->
+    process_auth_failure(Epoch, State);
+process_response(Code, Headers, _Epoch, State) when Code >= 200, Code < 300 ->
+    % If server noticed cookie is about to time out it can send a new cookie in
+    % the response headers. Take advantage of that and refresh the cookie.
+    State1 = case maybe_update_cookie(Headers, State) of
+        {ok, UpdatedState} ->
+            UpdatedState;
+        {error, cookie_not_found} ->
+            State;
+        {error, Other} ->
+            LogMsg = "~p : Could not parse cookie from response headers ~p",
+            couch_log:error(LogMsg, [?MODULE, Other]),
+            State
+    end,
+    {continue, State1};
+process_response(_Code, _Headers, _Epoch, State) ->
+    {continue, State}.
+
+
+-spec process_auth_failure(non_neg_integer(), #state{}) ->
+    {retry | continue, #state{}}.
+process_auth_failure(Epoch, #state{epoch = StateEpoch} = State)
+        when StateEpoch > Epoch ->
+    % This request used an outdated cookie, tell it to immediately retry
+    % and it will pick up the current cookie when its headers are updated
+    {retry, State};
+process_auth_failure(Epoch, #state{epoch = Epoch} = State) ->
+    MinInterval = min_update_interval(),
+    case cookie_age_sec(State, now_sec()) of
+        AgeSec when AgeSec < MinInterval ->
+            % A recently acquired cookie failed. Schedule a refresh and
+            % return `continue` to let httpc's retry apply a backoff
+            {continue, schedule_refresh(now_sec() + MinInterval, State)};
+        _AgeSec ->
+            % Current cookie failed auth. Schedule refresh and ask
+            % httpc to retry the request.
+            {retry, schedule_refresh(now_sec(), State)}
+    end.
+
+
+-spec get_session_url(string()) -> string().
+get_session_url(Url) ->
+    #url{
+        protocol = Proto,
+        host = Host,
+        port = Port
+    } = ibrowse_lib:parse_url(Url),
+    WithPort = lists:concat([Proto, "://", Host, ":", Port]),
+    case lists:prefix(WithPort, Url) of
+        true ->
+            % Explicit port specified in the original url
+            WithPort ++ "/_session";
+        false ->
+            % Implicit proto default port was used
+            lists:concat([Proto, "://", Host, "/_session"])
+    end.
+
+
+-spec schedule_refresh(non_neg_integer(), #state{}) -> #state{}.
+schedule_refresh(T, #state{next_refresh = Tc} = State) when T < Tc ->
+    State#state{next_refresh = T};
+schedule_refresh(_, #state{} = State) ->
+    State.
+
+
+-spec maybe_refresh(#state{}) -> {ok, #state{}} | {error, term()}.
+maybe_refresh(#state{next_refresh = T} = State) ->
+    case now_sec() >= T of
+        true ->
+            refresh(State#state{next_refresh = infinity});
+        false ->
+            {ok, State}
+    end.
+
+
+-spec refresh(#state{}) -> {ok, #state{}} | {error, term()}.
+refresh(#state{session_url = Url, user = User, pass = Pass} = State) ->
+    Body =  mochiweb_util:urlencode([{name, User}, {password, Pass}]),
+    Headers = [{"Content-Type", "application/x-www-form-urlencoded"}],
+    Result = http_request(State, Url, Headers, post, Body),
+    http_response(Result, State).
+
+
+-spec http_request(#state{}, string(), headers(), atom(), iolist()) ->
+    {ok, string(), headers(), binary()} | {error, term()}.
+http_request(#state{httpdb_pool = Pool} = State, Url, Headers, Method, Body) ->
+    Timeout = State#state.httpdb_timeout,
+    Opts = [
+        {response_format, binary},
+        {inactivity_timeout, Timeout}
+        | State#state.httpdb_ibrowse_options
+    ],
+    {ok, Wrk} = couch_replicator_httpc_pool:get_worker(Pool),
+    try
+        ibrowse:send_req_direct(Wrk, Url, Headers, Method, Body, Opts, Timeout)
+    after
+        ok = couch_replicator_httpc_pool:release_worker(Pool, Wrk)
+    end.
+
+
+-spec http_response({ok, string(), headers(), binary()} | {error, term()},
+    #state{}) -> {ok, #state{}} | {error, term()}.
+http_response({ok, "200", Headers, _}, State) ->
+    maybe_update_cookie(Headers, State);
+http_response({ok, "401", _, _}, #state{session_url = Url, user = User}) ->
+    {error, {session_request_unauthorized, Url, User}};
+http_response({ok, "403", _, _}, #state{session_url = Url, user = User}) ->
+    {error, {session_request_forbidden, Url, User}};
+http_response({ok, "404", _, _}, #state{session_url = Url, user = User}) ->
+    {error, {session_not_supported, Url, User}};
+http_response({ok, Code, _, _}, #state{session_url = Url, user = User}) ->
+    {error, {session_unexpected_result, Code, Url, User}};
+http_response({error, Error}, #state{session_url = Url, user = User}) ->
+    {error, {session_request_failed, Url, User, Error}}.
+
+
+-spec parse_cookie(list()) -> {ok, string()} | {error, term()}.
+parse_cookie(Headers0) ->
+    Headers = mochiweb_headers:make(Headers0),
+    case mochiweb_headers:get_value("Set-Cookie", Headers) of
+        undefined ->
+            {error, cookie_not_found};
+        CookieHeader ->
+            CookieKVs = mochiweb_cookies:parse_cookie(CookieHeader),
+            CaseInsKVs = mochiweb_headers:make(CookieKVs),
+            case mochiweb_headers:get_value("AuthSession", CaseInsKVs) of
+                undefined ->
+                    {error, cookie_format_invalid};
+                Cookie ->
+                    {ok, Cookie}
+            end
+    end.
+
+
+-spec maybe_update_cookie(headers(), #state{}) ->
+    {ok, string()} | {error, term()}.
+maybe_update_cookie(ResponseHeaders, State) ->
+    case parse_cookie(ResponseHeaders) of
+        {ok, Cookie} ->
+            {ok, update_cookie(State, Cookie, now_sec())};
+        {error, Error} ->
+            {error, Error}
+    end.
+
+
+-spec update_cookie(#state{}, string(), non_neg_integer()) -> #state{}.
+update_cookie(#state{cookie = Cookie} = State, Cookie, _) ->
+    State;
+update_cookie(#state{epoch = Epoch} = State, Cookie, NowSec) ->
+    State#state{
+        epoch = Epoch + 1,
+        cookie = Cookie,
+        refresh_tstamp = NowSec
+    }.
+
+
+-spec cookie_age_sec(#state{}, non_neg_integer()) -> non_neg_integer().
+cookie_age_sec(#state{refresh_tstamp = RefreshTs}, Now) ->
+    max(0, Now - RefreshTs).
+
+
+-spec now_sec() -> non_neg_integer().
+now_sec() ->
+    {Mega, Sec, _Micro} = os:timestamp(),
+    Mega * 1000000 + Sec.
+
+
+-spec min_update_interval() -> non_neg_integer().
+min_update_interval() ->
+    config:get_integer("replicator", "session_min_update_interval",
+        ?MIN_UPDATE_INTERVAL).
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+get_session_url_test_() ->
+    [?_assertEqual(SessionUrl, get_session_url(Url)) || {Url, SessionUrl} <- [
+        {"http://host/db", "http://host/_session"},
+        {"http://127.0.0.1/db", "http://127.0.0.1/_session"},
+        {"http://host/x/y/z", "http://host/_session"},
+        {"http://host:5984/db", "http://host:5984/_session"},
+        {"https://host/db?q=1", "https://host/_session"}
+    ]].
+
+
+extract_creds_success_test_() ->
+    DefaultHeaders = (#httpdb{})#httpdb.headers,
+    [?_assertEqual({ok, User, Pass, HttpDb2}, extract_creds(HttpDb1)) ||
+        {HttpDb1, {User, Pass, HttpDb2}} <- [
+        {
+            #httpdb{url = "http://u:p@x.y/db"},
+            {"u", "p", #httpdb{url = "http://x.y/db"}}
+        },
+        {
+            #httpdb{url = "http://u:p@h:80/db"},
+            {"u", "p", #httpdb{url = "http://h:80/db"}}
+        },
+        {
+            #httpdb{url = "https://u:p@h/db"},
+            {"u", "p", #httpdb{url = "https://h/db"}}
+        },
+        {
+            #httpdb{url = "http://u:p@127.0.0.1:5984/db"},
+            {"u", "p", #httpdb{url = "http://127.0.0.1:5984/db"}}
+        },
+        {
+            #httpdb{url = "http://u:p@[2001:db8:a1b:12f9::1]/db"},
+            {"u", "p", #httpdb{url = "http://[2001:db8:a1b:12f9::1]/db"}}
+        },
+        {
+            #httpdb{url = "http://u:p@[2001:db8:a1b:12f9::1]:81/db"},
+            {"u", "p", #httpdb{url = "http://[2001:db8:a1b:12f9::1]:81/db"}}
+        },
+        {
+            #httpdb{url = "http://u:p@x.y/db/other?query=Z&query=w"},
+            {"u", "p", #httpdb{url = "http://x.y/db/other?query=Z&query=w"}}
+        },
+        {
+            #httpdb{
+                url = "http://h/db",
+                headers = DefaultHeaders ++ [
+                    {"Authorization", "Basic " ++ b64creds("u", "p")}
+                ]
+            },
+            {"u", "p", #httpdb{url = "http://h/db"}}
+        },
+        {
+            #httpdb{
+                url = "http://h/db",
+                headers = DefaultHeaders ++ [
+                    {"aUthoriZation", "bASIC " ++ b64creds("U", "p")}
+                ]
+            },
+            {"U", "p", #httpdb{url = "http://h/db"}}
+        },
+        {
+            #httpdb{
+                url = "http://u1:p1@h/db",
+                headers = DefaultHeaders ++ [
+                    {"Authorization", "Basic " ++ b64creds("u2", "p2")}
+                ]
+            },
+            {"u2", "p2", #httpdb{url = "http://h/db"}}
+        }
+    ]].
+
+
+cookie_update_test_() ->
+    {
+        foreach,
+        fun setup/0,
+        fun teardown/1,
+        [
+            t_do_refresh(),
+            t_dont_refresh(),
+            t_process_auth_failure(),
+            t_process_auth_failure_stale_epoch(),
+            t_process_auth_failure_too_frequent(),
+            t_process_ok_update_cookie(),
+            t_process_ok_no_cookie(),
+            t_init_state_fails_on_401(),
+            t_init_state_404(),
+            t_init_state_no_creds(),
+            t_init_state_http_error()
+        ]
+    }.
+
+
+t_do_refresh() ->
+    ?_test(begin
+        State = #state{next_refresh = 0},
+        {ok, State1} = maybe_refresh(State),
+        ?assertMatch(#state{
+            next_refresh = infinity,
+            epoch = 1,
+            cookie = "Abc"
+        }, State1)
+    end).
+
+
+t_dont_refresh() ->
+    ?_test(begin
+        State = #state{next_refresh = now_sec() + 100},
+        {ok, State1} = maybe_refresh(State),
+        ?assertMatch(State, State1),
+        State2 = #state{next_refresh = infinity},
+        {ok, State3} = maybe_refresh(State2),
+        ?assertMatch(State2, State3)
+    end).
+
+
+t_process_auth_failure() ->
+    ?_test(begin
+        State = #state{epoch = 1, refresh_tstamp = 0},
+        {retry, State1} = process_auth_failure(1, State),
+        NextRefresh = State1#state.next_refresh,
+        ?assert(NextRefresh =< now_sec())
+    end).
+
+
+t_process_auth_failure_stale_epoch() ->
+    ?_test(begin
+        State = #state{epoch = 3},
+        ?assertMatch({retry, State}, process_auth_failure(2, State))
+    end).
+
+
+t_process_auth_failure_too_frequent() ->
+    ?_test(begin
+        State = #state{epoch = 4, refresh_tstamp = now_sec()},
+        ?assertMatch({continue, _}, process_auth_failure(4, State))
+    end).
+
+
+t_process_ok_update_cookie() ->
+    ?_test(begin
+        Headers = [{"set-CookiE", "AuthSession=xyz; Path=/;"}, {"X", "y"}],
+        Res = process_response(200, Headers, 1, #state{}),
+        ?assertMatch({continue, #state{cookie = "xyz", epoch = 1}}, Res),
+        State = #state{cookie = "xyz", refresh_tstamp = 42, epoch = 2},
+        Res2 = process_response(200, Headers, 1, State),
+        ?assertMatch({continue, #state{cookie = "xyz", epoch = 2}}, Res2)
+    end).
+
+
+t_process_ok_no_cookie() ->
+    ?_test(begin
+        Headers = [{"X", "y"}],
+        State = #state{cookie = "old", epoch = 3, refresh_tstamp = 42},
+        Res = process_response(200, Headers, 1, State),
+        ?assertMatch({continue, State}, Res)
+    end).
+
+
+t_init_state_fails_on_401() ->
+    ?_test(begin
+        mock_http_401_response(),
+        {error, Error} = init_state(#httpdb{url = "http://u:p@h"}),
+        SessionUrl =  "http://h/_session",
+        ?assertEqual({session_request_unauthorized, SessionUrl, "u"}, Error)
+    end).
+
+
+t_init_state_404() ->
+    ?_test(begin
+        mock_http_404_response(),
+        ?assertEqual(ignore, init_state(#httpdb{url = "http://u:p@h"}))
+    end).
+
+
+t_init_state_no_creds() ->
+    ?_test(begin
+        ?_assertEqual(ignore, init_state(#httpdb{url = "http://h"}))
+    end).
+
+
+t_init_state_http_error() ->
+    ?_test(begin
+        mock_http_error_response(),
+        {error, Error} = init_state(#httpdb{url = "http://u:p@h"}),
+        SessionUrl = "http://h/_session",
+        ?assertEqual({session_request_failed, SessionUrl, "u", x}, Error)
+    end).
+
+
+setup() ->
+    meck:expect(couch_replicator_httpc_pool, get_worker, 1, {ok, worker}),
+    meck:expect(couch_replicator_httpc_pool, release_worker, 2, ok),
+    meck:expect(config, get, fun(_, _, Default) -> Default end),
+    mock_http_cookie_response("Abc"),
+    ok.
+
+
+teardown(_) ->
+    meck:unload().
+
+
+mock_http_cookie_response(Cookie) ->
+    Resp = {ok, "200", [{"Set-Cookie", "AuthSession=" ++ Cookie}], []},
+    meck:expect(ibrowse, send_req_direct, 7, Resp).
+
+
+mock_http_401_response() ->
+    meck:expect(ibrowse, send_req_direct, 7, {ok, "401", [], []}).
+
+
+mock_http_404_response() ->
+    meck:expect(ibrowse, send_req_direct, 7, {ok, "404", [], []}).
+
+
+mock_http_error_response() ->
+    meck:expect(ibrowse, send_req_direct, 7, {error, x}).
+
+
+extract_creds_error_test_() ->
+    [?_assertMatch({error, Error}, extract_creds(HttpDb)) ||
+        {HttpDb, Error} <- [
+        {#httpdb{url = "some_junk"}, invalid_uri},
+        {#httpdb{url = "http://h/db"}, missing_credentials}
+    ]].
+
+
+b64creds(User, Pass) ->
+    base64:encode_to_string(User ++ ":" ++ Pass).
+
+
+-endif.
diff --git a/src/couch_replicator/src/couch_replicator_changes_reader.erl b/src/couch_replicator/src/couch_replicator_changes_reader.erl
index 3659d95925..2e4df5365d 100644
--- a/src/couch_replicator/src/couch_replicator_changes_reader.erl
+++ b/src/couch_replicator/src/couch_replicator_changes_reader.erl
@@ -19,7 +19,7 @@
 -export([read_changes/5]).
 
 -include_lib("couch/include/couch_db.hrl").
--include("couch_replicator_api_wrap.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
 -include("couch_replicator.hrl").
 
 -import(couch_util, [
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index 1fe91eca4c..62d21fe126 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -35,7 +35,7 @@
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("ibrowse/include/ibrowse.hrl").
 -include_lib("mem3/include/mem3.hrl").
--include("couch_replicator_api_wrap.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
 -include("couch_replicator.hrl").
 -include("couch_replicator_js_functions.hrl").
 
@@ -396,28 +396,9 @@ parse_rep_db({Props}, Proxy, Options) ->
     {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
     Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
     DefaultHeaders = (#httpdb{})#httpdb.headers,
-    OAuth = case get_value(<<"oauth">>, AuthProps) of
-    undefined ->
-        nil;
-    {OauthProps} ->
-        #oauth{
-            consumer_key = ?b2l(get_value(<<"consumer_key">>, OauthProps)),
-            token = ?b2l(get_value(<<"token">>, OauthProps)),
-            token_secret = ?b2l(get_value(<<"token_secret">>, OauthProps)),
-            consumer_secret = ?b2l(get_value(<<"consumer_secret">>,
-                OauthProps)),
-            signature_method =
-                case get_value(<<"signature_method">>, OauthProps) of
-                undefined ->        hmac_sha1;
-                <<"PLAINTEXT">> ->  plaintext;
-                <<"HMAC-SHA1">> ->  hmac_sha1;
-                <<"RSA-SHA1">> ->   rsa_sha1
-                end
-        }
-    end,
     #httpdb{
         url = Url,
-        oauth = OAuth,
+        auth_props = AuthProps,
         headers = lists:ukeymerge(1, Headers, DefaultHeaders),
         ibrowse_options = lists:keysort(1,
             [{socket_options, get_value(socket_options, Options)} |
@@ -695,8 +676,7 @@ strip_credentials(Url) when is_binary(Url) ->
         "http\\1://\\2",
         [{return, binary}]);
 strip_credentials({Props}) ->
-    Props1 = lists:keydelete(<<"oauth">>, 1, Props),
-    {lists:keydelete(<<"headers">>, 1, Props1)}.
+    {lists:keydelete(<<"headers">>, 1, Props)}.
 
 
 error_reason({shutdown, Error}) ->
@@ -772,10 +752,6 @@ check_strip_credentials_test() ->
             <<"https://remote_server/database">>,
             <<"https://foo:bar@remote_server/database">>
         },
-        {
-            {[{<<"_id">>, <<"foo">>}]},
-            {[{<<"_id">>, <<"foo">>}, {<<"oauth">>, <<"bar">>}]}
-        },
         {
             {[{<<"_id">>, <<"foo">>}]},
             {[{<<"_id">>, <<"foo">>}, {<<"headers">>, <<"bar">>}]}
@@ -786,8 +762,7 @@ check_strip_credentials_test() ->
         },
         {
             {[{<<"_id">>, <<"foo">>}]},
-            {[{<<"_id">>, <<"foo">>}, {<<"oauth">>, <<"bar">>},
-                {<<"headers">>, <<"baz">>}]}
+            {[{<<"_id">>, <<"foo">>}, {<<"headers">>, <<"baz">>}]}
         }
     ]].
 
diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl
index 45472f4310..6e787514bd 100644
--- a/src/couch_replicator/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator/src/couch_replicator_httpc.erl
@@ -14,7 +14,7 @@
 
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("ibrowse/include/ibrowse.hrl").
--include("couch_replicator_api_wrap.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
 
 -export([setup/1]).
 -export([send_req/3]).
@@ -51,8 +51,17 @@ setup(Db) ->
         undefined -> Url;
         _ when is_list(ProxyURL) -> ProxyURL
     end,
-    {ok, Pid} = couch_replicator_httpc_pool:start_link(HttpcURL, [{max_connections, MaxConns}]),
-    {ok, Db#httpdb{httpc_pool = Pid}}.
+    {ok, Pid} = couch_replicator_httpc_pool:start_link(HttpcURL,
+        [{max_connections, MaxConns}]),
+    case couch_replicator_auth:initialize(Db#httpdb{httpc_pool = Pid}) of
+        {ok, Db1} ->
+            {ok, Db1};
+        {error, Error} ->
+            LogMsg = "~p: auth plugin initialization failed ~p ~p",
+            LogUrl = couch_util:url_strip_password(Url),
+            couch_log:error(LogMsg, [?MODULE, LogUrl, Error]),
+            throw({replication_auth_error, Error})
+    end.
 
 
 send_req(HttpDb, Params1, Callback) ->
@@ -86,11 +95,11 @@ send_req(HttpDb, Params1, Callback) ->
     end.
 
 
-send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
+send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb0, Params) ->
     Method = get_value(method, Params, get),
     UserHeaders = lists:keysort(1, get_value(headers, Params, [])),
     Headers1 = lists:ukeymerge(1, UserHeaders, BaseHeaders),
-    Headers2 = oauth_header(HttpDb, Params) ++ Headers1,
+    {Headers2, HttpDb} = couch_replicator_auth:update_headers(HttpDb0, Headers1),
     Url = full_url(HttpDb, Params),
     Body = get_value(body, Params, []),
     case get_value(path, Params) == "_changes" of
@@ -157,6 +166,7 @@ process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
         Json ->
             ?JSON_DECODE(Json)
         end,
+        process_auth_response(HttpDb, Ok, Headers, Params),
         Callback(Ok, Headers, EJson);
     R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
         backoff_success(HttpDb, Params),
@@ -179,8 +189,9 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
             backoff(HttpDb#httpdb{timeout = Timeout}, Params);
         Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) ->
             backoff_success(HttpDb, Params),
+            HttpDb1 = process_auth_response(HttpDb, Ok, Headers, Params),
             StreamDataFun = fun() ->
-                stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
+                stream_data_self(HttpDb1, Params, Worker, ReqId, Callback)
             end,
             put(?STREAM_STATUS, {streaming, Worker}),
             ibrowse:stream_next(ReqId),
@@ -190,9 +201,9 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
             catch
                 throw:{maybe_retry_req, connection_closed} ->
                     maybe_retry({connection_closed, mid_stream},
-                        Worker, HttpDb, Params);
+                        Worker, HttpDb1, Params);
                 throw:{maybe_retry_req, Err} ->
-                    maybe_retry(Err, Worker, HttpDb, Params)
+                    maybe_retry(Err, Worker, HttpDb1, Params)
             end;
         R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
             backoff_success(HttpDb, Params),
@@ -216,6 +227,16 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
     end.
 
 
+process_auth_response(HttpDb, Code, Headers, Params) ->
+    case couch_replicator_auth:handle_response(HttpDb, Code, Headers) of
+        {continue, HttpDb1} ->
+            HttpDb1;
+        {retry, HttpDb1} ->
+            log_retry_error(Params, HttpDb1, 0, Code),
+            throw({retry, HttpDb1, Params})
+    end.
+
+
 % Only streaming HTTP requests send messages back from
 % the ibrowse worker process. We can detect that based
 % on the ibrowse_req_id format. This just drops all
@@ -397,28 +418,6 @@ query_args_to_string([{K, V} | Rest], Acc) ->
     query_args_to_string(Rest, [K ++ "=" ++ couch_httpd:quote(V) | Acc]).
 
 
-oauth_header(#httpdb{oauth = nil}, _ConnParams) ->
-    [];
-oauth_header(#httpdb{url = BaseUrl, oauth = OAuth}, ConnParams) ->
-    Consumer = {
-        OAuth#oauth.consumer_key,
-        OAuth#oauth.consumer_secret,
-        OAuth#oauth.signature_method
-    },
-    Method = case get_value(method, ConnParams, get) of
-    get -> "GET";
-    post -> "POST";
-    put -> "PUT";
-    head -> "HEAD"
-    end,
-    QSL = get_value(qs, ConnParams, []),
-    OAuthParams = oauth:sign(Method,
-        BaseUrl ++ get_value(path, ConnParams, []),
-        QSL, Consumer, OAuth#oauth.token, OAuth#oauth.token_secret) -- QSL,
-    [{"Authorization",
-        "OAuth " ++ oauth:header_params_encode(OAuthParams)}].
-
-
 do_redirect(_Worker, Code, Headers, #httpdb{url = Url} = HttpDb, Params, _Cb) ->
     RedirectUrl = redirect_url(Headers, Url),
     {HttpDb2, Params2} = after_redirect(RedirectUrl, Code, HttpDb, Params),
diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl
index e7067622bb..e8faf8ea3f 100644
--- a/src/couch_replicator/src/couch_replicator_ids.erl
+++ b/src/couch_replicator/src/couch_replicator_ids.erl
@@ -21,7 +21,7 @@
 -include_lib("ibrowse/include/ibrowse.hrl").
 
 -include_lib("couch/include/couch_db.hrl").
--include("couch_replicator_api_wrap.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
 -include("couch_replicator.hrl").
 
 % replication_id/1 and replication_id/2 will attempt to fetch
@@ -127,62 +127,25 @@ maybe_append_options(Options, RepOptions) ->
     end, [], Options).
 
 
-get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers, oauth=OAuth}) ->
+get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers}) ->
     DefaultHeaders = (#httpdb{})#httpdb.headers,
-    case OAuth of
-    nil ->
-        {remote, Url, Headers -- DefaultHeaders};
-    #oauth{} ->
-        {remote, Url, Headers -- DefaultHeaders, OAuth}
-    end;
+    {remote, Url, Headers -- DefaultHeaders};
 get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
     {local, DbName, UserCtx}.
 
 
 get_v4_endpoint(UserCtx, #httpdb{} = HttpDb) ->
-    {Url, Headers, OAuth} = case get_rep_endpoint(UserCtx, HttpDb) of
-        {remote, U, Hds} ->
-            {U, Hds, undefined};
-        {remote, U, Hds, OA} ->
-            {U, Hds, OA}
-    end,
-    {UserFromHeaders, HeadersWithoutBasicAuth} = remove_basic_auth(Headers),
+    {remote, Url, Headers} = get_rep_endpoint(UserCtx, HttpDb),
+    {{UserFromHeaders, _}, HeadersWithoutBasicAuth} =
+        couch_replicator_utils:remove_basic_auth_from_headers(Headers),
     {UserFromUrl, Host, NonDefaultPort, Path} = get_v4_url_info(Url),
     User = pick_defined_value([UserFromUrl, UserFromHeaders]),
+    OAuth = undefined, % Keep this to ensure checkpoints don't change
     {remote, User, Host, NonDefaultPort, Path, HeadersWithoutBasicAuth, OAuth};
 get_v4_endpoint(UserCtx, <<DbName/binary>>) ->
     {local, DbName, UserCtx}.
 
 
-remove_basic_auth(Headers) ->
-    case lists:partition(fun is_basic_auth/1, Headers) of
-        {[], HeadersWithoutBasicAuth} ->
-            {undefined, HeadersWithoutBasicAuth};
-        {[{_, "Basic " ++ Base64} | _], HeadersWithoutBasicAuth} ->
-            User = get_basic_auth_user(Base64),
-            {User, HeadersWithoutBasicAuth}
-    end.
-
-
-is_basic_auth({"Authorization", "Basic " ++ _Base64}) ->
-    true;
-is_basic_auth(_) ->
-    false.
-
-
-get_basic_auth_user(Base64) ->
-    try re:split(base64:decode(Base64), ":", [{return, list}, {parts, 2}]) of
-        [User, _Pass] ->
-            User;
-        _ ->
-            undefined
-    catch
-        % Tolerate invalid B64 values here to avoid crashing replicator
-        error:function_clause ->
-            undefined
-    end.
-
-
 pick_defined_value(Values) ->
     case [V || V <- Values, V /= undefined] of
         [] ->
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index be956b6a72..0b396346a7 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -56,7 +56,7 @@
 
 -include("couch_replicator_scheduler.hrl").
 -include("couch_replicator.hrl").
--include("couch_replicator_api_wrap.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
 %% types
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 0438249be0..1467d9f30d 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -29,7 +29,7 @@
 ]).
 
 -include_lib("couch/include/couch_db.hrl").
--include("couch_replicator_api_wrap.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
 -include("couch_replicator_scheduler.hrl").
 -include("couch_replicator.hrl").
 
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index 01881e4232..218fcf501c 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -27,7 +27,8 @@
    get_json_value/3,
    pp_rep_id/1,
    iso8601/1,
-   filter_state/3
+   filter_state/3,
+   remove_basic_auth_from_headers/1
 ]).
 
 -export([
@@ -36,7 +37,7 @@
 
 -include_lib("couch/include/couch_db.hrl").
 -include("couch_replicator.hrl").
--include("couch_replicator_api_wrap.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
 
 -import(couch_util, [
     get_value/2,
@@ -174,3 +175,88 @@ filter_state(State, States, Info) ->
         false ->
             skip
     end.
+
+
+remove_basic_auth_from_headers(Headers) ->
+    Headers1 = mochiweb_headers:make(Headers),
+    case mochiweb_headers:get_value("Authorization", Headers1) of
+        undefined ->
+            {{undefined, undefined}, Headers};
+        Auth ->
+            {Basic, Base64} = lists:splitwith(fun(X) -> X =/= $\s end, Auth),
+            maybe_remove_basic_auth(string:to_lower(Basic), Base64, Headers1)
+    end.
+
+
+maybe_remove_basic_auth("basic", " " ++ Base64, Headers) ->
+    Headers1 = mochiweb_headers:delete_any("Authorization", Headers),
+    {decode_basic_creds(Base64), mochiweb_headers:to_list(Headers1)};
+maybe_remove_basic_auth(_, _, Headers) ->
+    {{undefined, undefined}, mochiweb_headers:to_list(Headers)}.
+
+
+decode_basic_creds(Base64) ->
+    try re:split(base64:decode(Base64), ":", [{return, list}, {parts, 2}]) of
+        [User, Pass] ->
+            {User, Pass};
+        _ ->
+            {undefined, undefined}
+    catch
+        % Tolerate invalid B64 values here to avoid crashing replicator
+        error:function_clause ->
+            {undefined, undefined}
+    end.
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+remove_basic_auth_from_headers_test_() ->
+    [?_assertMatch({{User, Pass}, NoAuthHeaders},
+        remove_basic_auth_from_headers(Headers)) ||
+        {{User, Pass, NoAuthHeaders}, Headers} <- [
+            {
+                {undefined, undefined, []},
+                []
+            },
+            {
+                {undefined, undefined, [{"h", "v"}]},
+                [{"h", "v"}]
+            },
+            {
+                {undefined, undefined, [{"Authorization", "junk"}]},
+                [{"Authorization", "junk"}]
+            },
+            {
+                {undefined, undefined, []},
+                [{"Authorization", "basic X"}]
+            },
+            {
+                {"user", "pass", []},
+                [{"Authorization", "Basic " ++ b64creds("user", "pass")}]
+            },
+            {
+                {"user", "pass", []},
+                [{"AuThorization", "Basic " ++ b64creds("user", "pass")}]
+            },
+            {
+                {"user", "pass", []},
+                [{"Authorization", "bAsIc " ++ b64creds("user", "pass")}]
+            },
+            {
+                {"user", "pass", [{"h", "v"}]},
+                [
+                    {"Authorization", "Basic " ++ b64creds("user", "pass")},
+                    {"h", "v"}
+                ]
+            }
+        ]
+    ].
+
+
+b64creds(User, Pass) ->
+    base64:encode_to_string(User ++ ":" ++ Pass).
+
+
+-endif.
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index db6b72b2e9..e51565866e 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -22,7 +22,7 @@
 -export([handle_call/3, handle_cast/2, handle_info/2]).
 
 -include_lib("couch/include/couch_db.hrl").
--include("couch_replicator_api_wrap.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
 -include("couch_replicator.hrl").
 
 % TODO: maybe make both buffer max sizes configurable
diff --git a/src/couch_replicator/test/couch_replicator_proxy_tests.erl b/src/couch_replicator/test/couch_replicator_proxy_tests.erl
index a40e5b1666..4f545bcb5e 100644
--- a/src/couch_replicator/test/couch_replicator_proxy_tests.erl
+++ b/src/couch_replicator/test/couch_replicator_proxy_tests.erl
@@ -14,7 +14,7 @@
 
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("couch_replicator/src/couch_replicator.hrl").
--include_lib("couch_replicator/src/couch_replicator_api_wrap.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
 
 
 setup() ->


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services