Posted to commits@couchdb.apache.org by va...@apache.org on 2017/04/18 18:39:52 UTC

[couchdb] 01/06: Share connections between replications

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit aebec51b3f1e8856f16a6552d54aea0a181a1a4d
Author: Benjamin Bastian <be...@gmail.com>
AuthorDate: Tue Aug 2 15:01:23 2016 -0700

    Share connections between replications
    
    This commit adds functionality to share connections between
    replications. This is to solve two problems:
    
    - Prior to this commit, each replication would create a pool of
      connections and hold onto those connections as long as the replication
      existed. This was wasteful and caused CouchDB to use many unnecessary
      connections.
    - When the pool was being terminated, the pool would block while the
      socket was closed. This would cause the entire replication scheduler
      to block. By reusing connections, connections are never closed by
      clients; they are only ever relinquished, which is always a fast
      operation.
    
    This commit adds an intermediary process which tracks which connection
    processes are in use by which client. It monitors both clients and
    connections; if either one crashes, the paired client or connection is
    terminated as well.
    
    A client can gracefully relinquish ownership of a connection. If that
    happens, the connection will be shared with another client. If the
    connection remains idle for too long, it will be closed.
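    
    A rough usage sketch (not part of this commit) of the new module's
    public API, acquire/1 and release/1. The replicate_once/1 wrapper and
    the do_requests/1 helper are hypothetical:
    
        replicate_once(Url) ->
            {ok, Worker} = couch_replicator_connection:acquire(Url),
            try
                % issue requests through the shared ibrowse worker;
                % do_requests/1 is a hypothetical helper
                do_requests(Worker)
            after
                % hand the connection back so another replication can reuse
                % it; if it sits idle for too long, the owner closes it
                couch_replicator_connection:release(Worker)
            end.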
    
    Jira: COUCHDB-3324
---
 .../src/couch_replicator_connection.erl            | 237 +++++++++++++++++++++
 .../src/couch_replicator_httpc.erl                 | 123 +++++++----
 .../src/couch_replicator_httpc_pool.erl            |  79 +++----
 3 files changed, 354 insertions(+), 85 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_connection.erl b/src/couch_replicator/src/couch_replicator_connection.erl
new file mode 100644
index 0000000..4c2f9a3
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_connection.erl
@@ -0,0 +1,237 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_connection).
+
+-behavior(gen_server).
+-behavior(config_listener).
+
+-export([
+    start_link/0
+]).
+
+-export([
+   init/1,
+   terminate/2,
+   handle_call/3,
+   handle_info/2,
+   handle_cast/2,
+   code_change/3
+]).
+
+-export([
+   acquire/1,
+   release/1
+]).
+
+-export([
+    handle_config_change/5,
+    handle_config_terminate/3
+]).
+
+-include_lib("ibrowse/include/ibrowse.hrl").
+
+-define(DEFAULT_CLOSE_INTERVAL, 90000).
+-define(RELISTEN_DELAY, 5000).
+
+
+-record(state, {
+    close_interval,
+    timer
+}).
+
+-record(connection, {
+    worker,
+    host,
+    port,
+    mref
+}).
+
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init([]) ->
+    process_flag(trap_exit, true),
+    ?MODULE = ets:new(?MODULE, [named_table, public,
+        {keypos, #connection.worker}]),
+    ok = config:listen_for_changes(?MODULE, nil),
+    Interval = config:get_integer("replicator", "connection_close_interval",
+        ?DEFAULT_CLOSE_INTERVAL),
+    {ok, Timer} = timer:send_after(Interval, close_idle_connections),
+    ibrowse:add_config([{inactivity_timeout, Interval}]),
+    {ok, #state{close_interval=Interval, timer=Timer}}.
+
+
+acquire(URL) when is_binary(URL) ->
+    acquire(binary_to_list(URL));
+
+acquire(URL0) ->
+    URL = couch_util:url_strip_password(URL0),
+    case gen_server:call(?MODULE, {acquire, URL}) of
+        {ok, Worker} ->
+            link(Worker),
+            {ok, Worker};
+        {error, all_allocated} ->
+            {ok, Pid} = ibrowse:spawn_link_worker_process(URL),
+            ok = gen_server:call(?MODULE, {create, URL, Pid}),
+            {ok, Pid};
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+
+release(Worker) ->
+    unlink(Worker),
+    gen_server:cast(?MODULE, {release, Worker}).
+
+
+handle_call({acquire, URL}, From, State) ->
+    {Pid, _Ref} = From,
+    case ibrowse_lib:parse_url(URL) of
+        #url{host=Host, port=Port} ->
+            Pat = #connection{host=Host, port=Port, mref=undefined, _='_'},
+            case ets:match_object(?MODULE, Pat, 1) of
+                '$end_of_table' ->
+                    {reply, {error, all_allocated}, State};
+                {[Worker], _Cont} ->
+                    couch_stats:increment_counter([couch_replicator, connection,
+                        acquires]),
+                    ets:insert(?MODULE, Worker#connection{mref=monitor(process,
+                        Pid)}),
+                    {reply, {ok, Worker#connection.worker}, State}
+            end;
+        {error, invalid_uri} ->
+            {reply, {error, invalid_uri}, State}
+    end;
+
+handle_call({create, URL, Worker}, From, State) ->
+    {Pid, _Ref} = From,
+    case ibrowse_lib:parse_url(URL) of
+        #url{host=Host, port=Port} ->
+            link(Worker),
+            couch_stats:increment_counter([couch_replicator, connection,
+                creates]),
+            true = ets:insert_new(
+                ?MODULE,
+                #connection{host=Host, port=Port, worker=Worker,
+                    mref=monitor(process, Pid)}
+            ),
+            {reply, ok, State}
+    end.
+
+
+handle_cast({release, WorkerPid}, State) ->
+    couch_stats:increment_counter([couch_replicator, connection, releases]),
+    case ets:lookup(?MODULE, WorkerPid) of
+        [Worker] ->
+            case Worker#connection.mref of
+                MRef when is_reference(MRef) -> demonitor(MRef, [flush]);
+                undefined -> ok
+            end,
+            ets:insert(?MODULE, Worker#connection{mref=undefined});
+        [] ->
+            ok
+    end,
+    {noreply, State};
+
+handle_cast({connection_close_interval, V}, State) ->
+    {ok, cancel} = timer:cancel(State#state.timer),
+    {ok, NewTimer} = timer:send_after(V, close_idle_connections),
+    ibrowse:add_config([{inactivity_timeout, V}]),
+    {noreply, State#state{close_interval=V, timer=NewTimer}}.
+
+
+% owner crashed
+handle_info({'DOWN', Ref, process, _Pid, _Reason}, State) ->
+    couch_stats:increment_counter([couch_replicator, connection,
+        owner_crashes]),
+    ets:match_delete(?MODULE, #connection{mref=Ref, _='_'}),
+    {noreply, State};
+
+% worker crashed
+handle_info({'EXIT', Pid, Reason}, State) ->
+    couch_stats:increment_counter([couch_replicator, connection,
+        worker_crashes]),
+    case ets:lookup(?MODULE, Pid) of
+        [] ->
+            ok;
+        [Worker] ->
+            #connection{host=Host, port=Port} = Worker,
+            maybe_log_worker_death(Host, Port, Reason),
+            case Worker#connection.mref of
+                MRef when is_reference(MRef) -> demonitor(MRef, [flush]);
+                undefined -> ok
+            end,
+            ets:delete(?MODULE, Pid)
+    end,
+    {noreply, State};
+
+handle_info(close_idle_connections, State) ->
+    #state{
+        close_interval=Interval,
+        timer=Timer
+    } = State,
+    Conns = ets:match_object(?MODULE, #connection{mref=undefined, _='_'}),
+    lists:foreach(fun(Conn) ->
+        couch_stats:increment_counter([couch_replicator, connection, closes]),
+        delete_worker(Conn)
+    end, Conns),
+    {ok, cancel} = timer:cancel(Timer),
+    {ok, NewTimer} = timer:send_after(Interval, close_idle_connections),
+    {noreply, State#state{timer=NewTimer}};
+
+handle_info(restart_config_listener, State) ->
+    ok = config:listen_for_changes(?MODULE, nil),
+    {noreply, State}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+terminate(_Reason, _State) ->
+    ok.
+
+
+maybe_log_worker_death(_Host, _Port, normal) ->
+    ok;
+
+maybe_log_worker_death(Host, Port, Reason) ->
+    ErrMsg = "Replication connection to: ~p:~p died with reason ~p",
+    couch_log:info(ErrMsg, [Host, Port, Reason]).
+
+
+-spec delete_worker(#connection{}) -> ok.
+delete_worker(Worker) ->
+    ets:delete(?MODULE, Worker#connection.worker),
+    unlink(Worker#connection.worker),
+    spawn(fun() -> ibrowse_http_client:stop(Worker#connection.worker) end),
+    ok.
+
+
+handle_config_change("replicator", "connection_close_interval", V, _, S) ->
+    ok = gen_server:cast(?MODULE, {connection_close_interval,
+        list_to_integer(V)}),
+    {ok, S};
+
+handle_config_change(_, _, _, _, S) ->
+    {ok, S}.
+
+
+handle_config_terminate(_, stop, _) ->
+    ok;
+
+handle_config_terminate(_, _, _) ->
+    Pid = whereis(?MODULE),
+    erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).
diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl
index 309a230..58fb0e1 100644
--- a/src/couch_replicator/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator/src/couch_replicator_httpc.erl
@@ -40,8 +40,18 @@
 -define(MAX_DISCARDED_MESSAGES, 16).
 
 
-setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
-    {ok, Pid} = couch_replicator_httpc_pool:start_link(Url, [{max_connections, MaxConns}]),
+setup(Db) ->
+    #httpdb{
+        httpc_pool = nil,
+        url = Url,
+        http_connections = MaxConns,
+        proxy_url = ProxyURL
+    } = Db,
+    HttpcURL = case ProxyURL of
+        undefined -> Url;
+        _ when is_list(ProxyURL) -> ProxyURL
+    end,
+    {ok, Pid} = couch_replicator_httpc_pool:start_link(HttpcURL, [{max_connections, MaxConns}]),
     {ok, Db#httpdb{httpc_pool = Pid}}.
 
 
@@ -98,6 +108,7 @@ send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
         lists:ukeymerge(1, get_value(ibrowse_options, Params, []),
             HttpDb#httpdb.ibrowse_options)
     ],
+    backoff_before_request(Worker, HttpDb, Params),
     Response = ibrowse:send_req_direct(
         Worker, Url, Headers2, Method, Body, IbrowseOptions, Timeout),
     {Worker, Response}.
@@ -139,8 +150,9 @@ process_response({ibrowse_req_id, ReqId}, Worker, HttpDb, Params, Callback) ->
 process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
     case list_to_integer(Code) of
     429 ->
-        backoff(Worker, HttpDb, Params);
+        backoff(HttpDb, Params);
     Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) ->
+        backoff_success(HttpDb, Params),
         couch_stats:increment_counter([couch_replicator, responses, success]),
         EJson = case Body of
         <<>> ->
@@ -150,6 +162,7 @@ process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
         end,
         Callback(Ok, Headers, EJson);
     R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
+        backoff_success(HttpDb, Params),
         do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
     Error ->
         couch_stats:increment_counter([couch_replicator, responses, failure]),
@@ -165,9 +178,10 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
     {ibrowse_async_headers, ReqId, Code, Headers} ->
         case list_to_integer(Code) of
         429 ->
-            backoff(Worker, HttpDb#httpdb{timeout = get_max_back_off()},
-                Params);
+            Timeout = couch_replicator_rate_limiter:max_interval(),
+            backoff(HttpDb#httpdb{timeout = Timeout}, Params);
         Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) ->
+            backoff_success(HttpDb, Params),
             StreamDataFun = fun() ->
                 stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
             end,
@@ -184,6 +198,7 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
                     maybe_retry(Err, Worker, HttpDb, Params)
             end;
         R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
+            backoff_success(HttpDb, Params),
             do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
         Error ->
             couch_stats:increment_counter(
@@ -266,37 +281,48 @@ discard_message(ReqId, Worker, Count) ->
     end.
 
 
-%% For 429 errors, we perform an exponential backoff up to 2.17 hours.
-%% We use Backoff time as a timeout/failure end.
-backoff(Worker, #httpdb{backoff = Backoff} = HttpDb, Params) ->
-    MaxBackOff = get_max_back_off(),
-    MaxBackOffLog = get_back_off_log_threshold(),
-    ok = timer:sleep(random:uniform(Backoff)),
-    Backoff2 = round(Backoff*get_back_off_exp()),
-    NewBackoff = erlang:min(Backoff2, MaxBackOff),
-    NewHttpDb = HttpDb#httpdb{backoff = NewBackoff},
-    case Backoff2 of
-        W0 when W0 > MaxBackOff ->
-            report_error(Worker, HttpDb, Params, {error,
-                "Long 429-induced Retry Time Out"});
-        W1 when W1 >=  MaxBackOffLog -> % Past 8 min, we log retries
-            log_retry_error(Params, HttpDb, Backoff2, "429 Retry"),
-            throw({retry, NewHttpDb, Params});
-        _ ->
-            throw({retry, NewHttpDb, Params})
-    end.
-
-
 maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params) ->
     report_error(Worker, HttpDb, Params, {error, Error});
 
-maybe_retry(Error, _Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
+maybe_retry(Error, Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
     Params) ->
-    ok = timer:sleep(Wait),
-    log_retry_error(Params, HttpDb, Wait, Error),
-    Wait2 = erlang:min(Wait * 2, ?MAX_WAIT),
-    NewHttpDb = HttpDb#httpdb{retries = Retries - 1, wait = Wait2},
-    throw({retry, NewHttpDb, Params}).
+    case total_error_time_exceeded(HttpDb) of
+        true ->
+            report_error(Worker, HttpDb, Params, {error, Error});
+        false ->
+            ok = timer:sleep(Wait),
+            log_retry_error(Params, HttpDb, Wait, Error),
+            Wait2 = erlang:min(Wait * 2, ?MAX_WAIT),
+            HttpDb1 = HttpDb#httpdb{retries = Retries - 1, wait = Wait2},
+            HttpDb2 = update_first_error_timestamp(HttpDb1),
+            throw({retry, HttpDb2, Params})
+    end.
+
+
+% When retrying, check that the total time spent retrying a request stays
+% below the current scheduler health threshold. The goal is to not exceed
+% the threshold, otherwise a job which keeps retrying for too long would
+% still be considered healthy.
+total_error_time_exceeded(#httpdb{first_error_timestamp = nil}) ->
+    false;
+
+total_error_time_exceeded(#httpdb{first_error_timestamp = ErrorTimestamp}) ->
+    HealthThresholdSec = couch_replicator_scheduler:health_threshold(),
+    % Theshold value is halved because in the calling code the next step
+    % is a doubling. Not halving here could mean sleeping too long and
+    % exceeding the health threshold.
+    ThresholdUSec = (HealthThresholdSec / 2) * 1000000,
+    timer:now_diff(os:timestamp(), ErrorTimestamp) > ThresholdUSec.
+
+
+% Remember the first time an error occurs. This value is used later to check
+% the total time spent retrying a request. Because retrying is recursive, the
+% #httpdb{} record is reset back to its original value on a successful result.
+update_first_error_timestamp(#httpdb{first_error_timestamp = nil} = HttpDb) ->
+    HttpDb#httpdb{first_error_timestamp = os:timestamp()};
+
+update_first_error_timestamp(HttpDb) ->
+    HttpDb.
 
 
 log_retry_error(Params, HttpDb, Wait, Error) ->
@@ -440,11 +466,32 @@ after_redirect(RedirectUrl, HttpDb, Params) ->
     Params2 = lists:keydelete(path, 1, lists:keydelete(qs, 1, Params)),
     {HttpDb#httpdb{url = RedirectUrl}, Params2}.
 
-get_max_back_off() ->
-    config:get_integer("replicator", "max_backoff_wait", 250 * 32768).
 
-get_back_off_log_threshold() ->
-    config:get_integer("replicator", "max_backoff_log_threshold", 512000).
+backoff_key(HttpDb, Params) ->
+    Method = get_value(method, Params, get),
+    Url = HttpDb#httpdb.url,
+    {Url, Method}.
+
+
+backoff(HttpDb, Params) ->
+    Key = backoff_key(HttpDb, Params),
+    couch_replicator_rate_limiter:failure(Key),
+    throw({retry, HttpDb, Params}).
 
-get_back_off_exp() ->
-    config:get_float("replicator", "backoff_exp", 1.5).
+
+backoff_success(HttpDb, Params) ->
+    Key = backoff_key(HttpDb, Params),
+    couch_replicator_rate_limiter:success(Key).
+
+
+backoff_before_request(Worker, HttpDb, Params) ->
+    Key = backoff_key(HttpDb, Params),
+    Limit = couch_replicator_rate_limiter:max_interval(),
+    case couch_replicator_rate_limiter:interval(Key) of
+        Sleep when Sleep >= Limit ->
+            report_error(Worker, HttpDb, Params, max_backoff);
+        Sleep when Sleep >= 1 ->
+            timer:sleep(Sleep);
+        Sleep when Sleep == 0 ->
+            ok
+    end.
diff --git a/src/couch_replicator/src/couch_replicator_httpc_pool.erl b/src/couch_replicator/src/couch_replicator_httpc_pool.erl
index 09e3b23..33fb61f 100644
--- a/src/couch_replicator/src/couch_replicator_httpc_pool.erl
+++ b/src/couch_replicator/src/couch_replicator_httpc_pool.erl
@@ -31,8 +31,7 @@
 -record(state, {
     url,
     limit,                  % max # of workers allowed
-    free = [],              % free workers (connections)
-    busy = [],              % busy workers (connections)
+    workers = [],
     waiting = queue:new(),  % blocked clients waiting for a worker
     callers = []            % clients who've been given a worker
 }).
@@ -70,23 +69,17 @@ handle_call(get_worker, From, State) ->
         callers = Callers,
         url = Url,
         limit = Limit,
-        busy = Busy,
-        free = Free
+        workers = Workers
     } = State,
-    case length(Busy) >= Limit of
+    case length(Workers) >= Limit of
     true ->
         {noreply, State#state{waiting = queue:in(From, Waiting)}};
     false ->
-        case Free of
-        [] ->
-           {ok, Worker} = ibrowse:spawn_link_worker_process(Url),
-           Free2 = Free;
-        [Worker | Free2] ->
-           ok
-        end,
+        % If the call to acquire fails, the worker pool will crash with a
+        % badmatch.
+        {ok, Worker} = couch_replicator_connection:acquire(Url),
         NewState = State#state{
-            free = Free2,
-            busy = [Worker | Busy],
+            workers = [Worker | Workers],
             callers = monitor_client(Callers, Worker, From)
         },
         {reply, {ok, Worker}, NewState}
@@ -104,35 +97,30 @@ handle_cast({release_worker, Worker}, State) ->
 handle_info({'EXIT', Pid, _Reason}, State) ->
     #state{
         url = Url,
-        busy = Busy,
-        free = Free,
+        workers = Workers,
         waiting = Waiting,
         callers = Callers
     } = State,
     NewCallers0 = demonitor_client(Callers, Pid),
-    case Free -- [Pid] of
-    Free ->
-        case Busy -- [Pid] of
-        Busy ->
+    case Workers -- [Pid] of
+        Workers ->
             {noreply, State#state{callers = NewCallers0}};
-        Busy2 ->
+        Workers2 ->
             case queue:out(Waiting) of
-            {empty, _} ->
-                {noreply, State#state{busy = Busy2, callers = NewCallers0}};
-            {{value, From}, Waiting2} ->
-                {ok, Worker} = ibrowse:spawn_link_worker_process(Url),
-                NewCallers1 = monitor_client(NewCallers0, Worker, From),
-                gen_server:reply(From, {ok, Worker}),
-                NewState = State#state{
-                    busy = [Worker | Busy2],
-                    waiting = Waiting2,
-                    callers = NewCallers1
-                },
-                {noreply, NewState}
+                {empty, _} ->
+                    {noreply, State#state{workers = Workers2,
+                        callers = NewCallers0}};
+                {{value, From}, Waiting2} ->
+                    {ok, Worker} = couch_replicator_connection:acquire(Url),
+                    NewCallers1 = monitor_client(NewCallers0, Worker, From),
+                    gen_server:reply(From, {ok, Worker}),
+                    NewState = State#state{
+                        workers = [Worker | Workers2],
+                        waiting = Waiting2,
+                        callers = NewCallers1
+                    },
+                    {noreply, NewState}
             end
-        end;
-    Free2 ->
-        {noreply, State#state{free = Free2, callers = NewCallers0}}
     end;
 
 handle_info({'DOWN', Ref, process, _, _}, #state{callers = Callers} = State) ->
@@ -147,9 +135,8 @@ code_change(_OldVsn, #state{}=State, _Extra) ->
     {ok, State}.
 
 
-terminate(_Reason, State) ->
-    lists:foreach(fun ibrowse_http_client:stop/1, State#state.free),
-    lists:foreach(fun ibrowse_http_client:stop/1, State#state.busy).
+terminate(_Reason, _State) ->
+    ok.
 
 monitor_client(Callers, Worker, {ClientPid, _}) ->
     [{Worker, erlang:monitor(process, ClientPid)} | Callers].
@@ -167,22 +154,20 @@ release_worker_internal(Worker, State) ->
     #state{waiting = Waiting, callers = Callers} = State,
     NewCallers0 = demonitor_client(Callers, Worker),
     case is_process_alive(Worker) andalso
-        lists:member(Worker, State#state.busy) of
+        lists:member(Worker, State#state.workers) of
     true ->
-        case queue:out(Waiting) of
+        Workers = case queue:out(Waiting) of
         {empty, Waiting2} ->
             NewCallers1 = NewCallers0,
-            Busy2 = State#state.busy -- [Worker],
-            Free2 = [Worker | State#state.free];
+            couch_replicator_connection:release(Worker),
+            State#state.workers -- [Worker];
         {{value, From}, Waiting2} ->
             NewCallers1 = monitor_client(NewCallers0, Worker, From),
             gen_server:reply(From, {ok, Worker}),
-            Busy2 = State#state.busy,
-            Free2 = State#state.free
+            State#state.workers
         end,
         NewState = State#state{
-           busy = Busy2,
-           free = Free2,
+           workers = Workers,
            waiting = Waiting2,
            callers = NewCallers1
         },
