You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2017/04/05 19:08:50 UTC

[couchdb] 04/09: Share connections between replications

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 6908717c938ab9ac63563a746e0871300097e257
Author: Benjamin Bastian <be...@gmail.com>
AuthorDate: Tue Aug 2 15:01:23 2016 -0700

    Share connections between replications
    
    This commit adds functionality to share connections between
    replications. This is to solve two problems:
    
    - Prior to this commit, each replication would create a pool of
      connections and hold onto those connections as long as the replication
      existed. This was wasteful and caused CouchDB to use many unnecessary
      connections.
    - When the pool was being terminated, the pool would block while the
      socket was closed. This would cause the entire replication scheduler
      to block. By reusing connections, connections are never closed by
      clients. They are only ever relinquished. This operation is always
      fast.
    
    This commit adds an intermediary process which tracks which connection
    processes are being used by which client. It monitors clients and
    connections. If a client or connection crashes, the paired
    client/connection will be terminated.
    
    A client can gracefully relinquish ownership of a connection. If that
    happens, the connection will be shared with another client. If the
    connection remains idle for too long, it will be closed.
    
    Jira: COUCHDB-3324
---
 .../src/couch_replicator_connection.erl            | 211 +++++++++++++++++++++
 .../src/couch_replicator_httpc_pool.erl            |  78 +++-----
 .../test/couch_replicator_connection_tests.erl     | 128 +++++++++++++
 3 files changed, 370 insertions(+), 47 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_connection.erl b/src/couch_replicator/src/couch_replicator_connection.erl
new file mode 100644
index 0000000..e580663
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_connection.erl
@@ -0,0 +1,211 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_connection).
+
+-behavior(gen_server).
+-behavior(config_listener).
+
+-export([start_link/0]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
+-export([code_change/3, terminate/2]).
+
+-export([acquire/1, relinquish/1]).
+
+-export([handle_config_change/5, handle_config_terminate/3]).
+
+-define(DEFAULT_CLOSE_INTERVAL, 90000).
+-define(RELISTEN_DELAY, 5000).
+
+% gen_server state of the connection tracker.
+-record(state, {
+    % interval passed to timer:send_after/2 (milliseconds) between
+    % close_idle_connections sweeps
+    close_interval,
+    % reference of the pending sweep timer; cancelled before rescheduling
+    timer
+}).
+
+% One row of the ETS table named after this module.
+-record(connection, {
+    % pid of the ibrowse worker process; this field is the table key
+    worker,
+    % host and port taken from the parsed URL the worker was created for
+    host,
+    port,
+    % monitor reference on the process currently owning the connection,
+    % or undefined while the connection is idle
+    mref
+}).
+
+-include_lib("ibrowse/include/ibrowse.hrl").
+
+
+% Starts the connection tracker, registered locally under the module name.
+start_link() ->
+    ServerName = {local, ?MODULE},
+    gen_server:start_link(ServerName, ?MODULE, [], []).
+
+
+% gen_server callback. Traps exits so that deaths of linked workers arrive
+% as 'EXIT' messages, creates the named ETS table keyed on the worker pid,
+% subscribes to config changes and schedules the first idle sweep.
+init([]) ->
+    process_flag(trap_exit, true),
+    TableOpts = [named_table, public, {keypos, #connection.worker}],
+    ?MODULE = ets:new(?MODULE, TableOpts),
+    ok = config:listen_for_changes(?MODULE, nil),
+    CloseInterval = config:get_integer(
+        "replicator", "connection_close_interval", ?DEFAULT_CLOSE_INTERVAL
+    ),
+    {ok, TRef} = timer:send_after(CloseInterval, close_idle_connections),
+    ibrowse:add_config([{inactivity_timeout, CloseInterval}]),
+    {ok, #state{close_interval=CloseInterval, timer=TRef}}.
+
+
+% Obtains a connection worker for URL on behalf of the calling process.
+% An idle worker for the same host/port is reused when the server has one;
+% otherwise a new ibrowse worker is spawned (linked to the caller) and
+% registered with the server. Returns {ok, WorkerPid} or {error, Reason}.
+acquire(URL) when is_binary(URL) ->
+    acquire(binary_to_list(URL));
+
+acquire(URL) ->
+    case gen_server:call(?MODULE, {acquire, URL}) of
+        {error, all_allocated} ->
+            % nothing idle for this host/port: create and register a worker
+            {ok, NewWorker} = ibrowse:spawn_link_worker_process(URL),
+            ok = gen_server:call(?MODULE, {create, URL, NewWorker}),
+            {ok, NewWorker};
+        {ok, Worker} = Acquired ->
+            % tie the reused worker to the caller
+            link(Worker),
+            Acquired;
+        {error, _Reason} = Error ->
+            Error
+    end.
+
+
+% Gives a worker back to the server: the caller drops its link to the
+% worker and the server is told asynchronously to mark it idle.
+relinquish(Worker) ->
+    unlink(Worker),
+    Request = {relinquish, Worker},
+    gen_server:cast(?MODULE, Request).
+
+
+% {acquire, URL}: hand an idle connection for the URL's host/port to the
+% caller and start monitoring the caller. Replies {error, all_allocated}
+% when no idle connection exists and {error, invalid_uri} when the URL
+% does not parse.
+handle_call({acquire, URL}, {OwnerPid, _Tag}, State) ->
+    case ibrowse_lib:parse_url(URL) of
+        {error, invalid_uri} ->
+            {reply, {error, invalid_uri}, State};
+        #url{host=Host, port=Port} ->
+            IdlePattern = #connection{host=Host, port=Port, mref=undefined, _='_'},
+            case ets:match_object(?MODULE, IdlePattern, 1) of
+                {[Conn], _Continuation} ->
+                    couch_stats:increment_counter([couch_replicator, connection, acquires]),
+                    MRef = monitor(process, OwnerPid),
+                    ets:insert(?MODULE, Conn#connection{mref=MRef}),
+                    {reply, {ok, Conn#connection.worker}, State};
+                '$end_of_table' ->
+                    {reply, {error, all_allocated}, State}
+            end
+    end;
+
+% {create, URL, Worker}: register a worker the caller has just spawned.
+% The server links to the worker and monitors the caller as its owner.
+handle_call({create, URL, Worker}, {OwnerPid, _Tag}, State) ->
+    case ibrowse_lib:parse_url(URL) of
+        #url{host=Host, port=Port} ->
+            link(Worker),
+            couch_stats:increment_counter([couch_replicator, connection, creates]),
+            MRef = monitor(process, OwnerPid),
+            Conn = #connection{host=Host, port=Port, worker=Worker, mref=MRef},
+            true = ets:insert_new(?MODULE, Conn),
+            {reply, ok, State}
+    end.
+
+
+% {relinquish, WorkerPid}: stop monitoring the owner (if any) and mark the
+% connection idle. Unknown workers are ignored.
+handle_cast({relinquish, WorkerPid}, State) ->
+    couch_stats:increment_counter([couch_replicator, connection, relinquishes]),
+    case ets:lookup(?MODULE, WorkerPid) of
+        [] ->
+            ok;
+        [#connection{mref=undefined} = Idle] ->
+            ets:insert(?MODULE, Idle);
+        [#connection{mref=MRef} = Owned] when is_reference(MRef) ->
+            demonitor(MRef, [flush]),
+            ets:insert(?MODULE, Owned#connection{mref=undefined})
+    end,
+    {noreply, State};
+
+% {connection_close_interval, Interval}: replace the pending sweep timer
+% with one using the new interval and pass the same value to ibrowse as
+% its inactivity timeout.
+handle_cast({connection_close_interval, Interval}, #state{timer=OldTimer} = State) ->
+    {ok, cancel} = timer:cancel(OldTimer),
+    {ok, FreshTimer} = timer:send_after(Interval, close_idle_connections),
+    ibrowse:add_config([{inactivity_timeout, Interval}]),
+    {noreply, State#state{close_interval=Interval, timer=FreshTimer}}.
+
+
+% The process that acquired a connection went down. Removing only the ETS
+% row would strand the worker: it stays linked to this server but could
+% never be acquired or swept again (an owner exiting with reason `normal`
+% does not take its linked worker down). Stop the worker along with the row.
+handle_info({'DOWN', Ref, process, _Pid, _Reason}, State) ->
+    couch_stats:increment_counter([couch_replicator, connection, owner_crashes]),
+    Orphans = ets:match_object(?MODULE, #connection{mref=Ref, _='_'}),
+    lists:foreach(fun(Conn) ->
+        couch_stats:increment_counter([couch_replicator, connection, closes]),
+        delete_worker(Conn)
+    end, Orphans),
+    {noreply, State};
+
+% A linked process exited (exits are trapped in init/1). If it is a tracked
+% worker, log abnormal deaths, drop the monitor on its owner and forget it.
+handle_info({'EXIT', Pid, Reason}, State) ->
+    couch_stats:increment_counter([couch_replicator, connection, worker_crashes]),
+    case ets:lookup(?MODULE, Pid) of
+        [] ->
+            ok;
+        [Worker] ->
+            #connection{host=Host, port=Port} = Worker,
+            maybe_log_worker_death(Host, Port, Reason),
+            case Worker#connection.mref of
+                MRef when is_reference(MRef) -> demonitor(MRef, [flush]);
+                undefined -> ok
+            end,
+            ets:delete(?MODULE, Pid)
+    end,
+    {noreply, State};
+
+% Periodic sweep: close every connection that currently has no owner, then
+% schedule the next sweep. The old timer is cancelled first because this
+% message can also be sent directly to the server, not only by the timer.
+handle_info(close_idle_connections, State) ->
+    #state{
+        close_interval=Interval,
+        timer=Timer
+    } = State,
+    Conns = ets:match_object(?MODULE, #connection{mref=undefined, _='_'}),
+    lists:foreach(fun(Conn) ->
+        couch_stats:increment_counter([couch_replicator, connection, closes]),
+        delete_worker(Conn)
+    end, Conns),
+    {ok, cancel} = timer:cancel(Timer),
+    {ok, NewTimer} = timer:send_after(Interval, close_idle_connections),
+    {noreply, State#state{timer=NewTimer}};
+
+% Sent by handle_config_terminate/3 after a delay: subscribe to config
+% changes again.
+handle_info(restart_config_listener, State) ->
+    ok = config:listen_for_changes(?MODULE, nil),
+    {noreply, State}.
+
+
+% gen_server callback: the state is carried over unchanged on code upgrade.
+code_change(_FromVsn, ServerState, _Extra) ->
+    {ok, ServerState}.
+
+
+% gen_server callback: performs no cleanup.
+terminate(_Why, _ServerState) ->
+    ok.
+
+
+% Logs the death of a connection worker at info level, except when the
+% worker exited with reason `normal`.
+maybe_log_worker_death(Host, Port, Reason) ->
+    case Reason of
+        normal ->
+            ok;
+        _ ->
+            couch_log:info(
+                "Replication connection to: ~p:~p died with reason ~p",
+                [Host, Port, Reason]
+            )
+    end.
+
+
+% Forgets a connection and shuts its worker down. The row is removed and
+% the link dropped first; the actual stop call runs in a separate process
+% so the server does not wait for it.
+-spec delete_worker(#connection{}) -> ok.
+delete_worker(#connection{worker=WorkerPid}) ->
+    ets:delete(?MODULE, WorkerPid),
+    unlink(WorkerPid),
+    Stop = fun() -> ibrowse_http_client:stop(WorkerPid) end,
+    spawn(Stop),
+    ok.
+
+
+% config_listener callback. Forwards changes of the
+% "replicator"/"connection_close_interval" option to the server; all other
+% options are ignored.
+%
+% NOTE(review): this assumes the config application reports a removed
+% option by passing the atom `deleted` as the value - confirm against the
+% config app. Without this clause list_to_integer/1 would raise badarg on
+% removal and take the listener down; falling back to the compiled-in
+% default matches what init/1 uses when the option is unset.
+handle_config_change("replicator", "connection_close_interval", deleted, _, S) ->
+    ok = gen_server:cast(?MODULE, {connection_close_interval,
+        ?DEFAULT_CLOSE_INTERVAL}),
+    {ok, S};
+
+handle_config_change("replicator", "connection_close_interval", V, _, S) ->
+    ok = gen_server:cast(?MODULE, {connection_close_interval,
+        list_to_integer(V)}),
+    {ok, S};
+
+handle_config_change(_, _, _, _, S) ->
+    {ok, S}.
+
+
+% config_listener callback. On a `stop` termination nothing is done; for
+% any other reason the server is asked, after ?RELISTEN_DELAY, to
+% subscribe to config changes again.
+handle_config_terminate(_Server, stop, _State) ->
+    ok;
+
+handle_config_terminate(_Server, _Reason, _State) ->
+    erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener).
diff --git a/src/couch_replicator/src/couch_replicator_httpc_pool.erl b/src/couch_replicator/src/couch_replicator_httpc_pool.erl
index 09e3b23..9f1f1a8 100644
--- a/src/couch_replicator/src/couch_replicator_httpc_pool.erl
+++ b/src/couch_replicator/src/couch_replicator_httpc_pool.erl
@@ -31,8 +31,7 @@
 -record(state, {
     url,
     limit,                  % max # of workers allowed
-    free = [],              % free workers (connections)
-    busy = [],              % busy workers (connections)
+    conns = [],
     waiting = queue:new(),  % blocked clients waiting for a worker
     callers = []            % clients who've been given a worker
 }).
@@ -70,23 +69,17 @@ handle_call(get_worker, From, State) ->
         callers = Callers,
         url = Url,
         limit = Limit,
-        busy = Busy,
-        free = Free
+        conns = Conns
     } = State,
-    case length(Busy) >= Limit of
+    case length(Conns) >= Limit of
     true ->
         {noreply, State#state{waiting = queue:in(From, Waiting)}};
     false ->
-        case Free of
-        [] ->
-           {ok, Worker} = ibrowse:spawn_link_worker_process(Url),
-           Free2 = Free;
-        [Worker | Free2] ->
-           ok
-        end,
+        % If the call to acquire fails, the worker pool will crash with a
+        % badmatch.
+        {ok, Worker} = couch_replicator_connection:acquire(Url),
         NewState = State#state{
-            free = Free2,
-            busy = [Worker | Busy],
+            conns = [Worker | Conns],
             callers = monitor_client(Callers, Worker, From)
         },
         {reply, {ok, Worker}, NewState}
@@ -104,35 +97,29 @@ handle_cast({release_worker, Worker}, State) ->
 handle_info({'EXIT', Pid, _Reason}, State) ->
     #state{
         url = Url,
-        busy = Busy,
-        free = Free,
+        conns = Conns,
         waiting = Waiting,
         callers = Callers
     } = State,
     NewCallers0 = demonitor_client(Callers, Pid),
-    case Free -- [Pid] of
-    Free ->
-        case Busy -- [Pid] of
-        Busy ->
+    case Conns -- [Pid] of
+        Conns ->
             {noreply, State#state{callers = NewCallers0}};
-        Busy2 ->
+        Conns2 ->
             case queue:out(Waiting) of
-            {empty, _} ->
-                {noreply, State#state{busy = Busy2, callers = NewCallers0}};
-            {{value, From}, Waiting2} ->
-                {ok, Worker} = ibrowse:spawn_link_worker_process(Url),
-                NewCallers1 = monitor_client(NewCallers0, Worker, From),
-                gen_server:reply(From, {ok, Worker}),
-                NewState = State#state{
-                    busy = [Worker | Busy2],
-                    waiting = Waiting2,
-                    callers = NewCallers1
-                },
-                {noreply, NewState}
+                {empty, _} ->
+                    {noreply, State#state{conns = Conns2, callers = NewCallers0}};
+                {{value, From}, Waiting2} ->
+                    {ok, Worker} = couch_replicator_connection:acquire(Url),
+                    NewCallers1 = monitor_client(NewCallers0, Worker, From),
+                    gen_server:reply(From, {ok, Worker}),
+                    NewState = State#state{
+                        conns = [Worker | Conns2],
+                        waiting = Waiting2,
+                        callers = NewCallers1
+                    },
+                    {noreply, NewState}
             end
-        end;
-    Free2 ->
-        {noreply, State#state{free = Free2, callers = NewCallers0}}
     end;
 
 handle_info({'DOWN', Ref, process, _, _}, #state{callers = Callers} = State) ->
@@ -147,9 +134,8 @@ code_change(_OldVsn, #state{}=State, _Extra) ->
     {ok, State}.
 
 
-terminate(_Reason, State) ->
-    lists:foreach(fun ibrowse_http_client:stop/1, State#state.free),
-    lists:foreach(fun ibrowse_http_client:stop/1, State#state.busy).
+terminate(_Reason, _State) ->
+    ok.
 
 monitor_client(Callers, Worker, {ClientPid, _}) ->
     [{Worker, erlang:monitor(process, ClientPid)} | Callers].
@@ -167,22 +153,20 @@ release_worker_internal(Worker, State) ->
     #state{waiting = Waiting, callers = Callers} = State,
     NewCallers0 = demonitor_client(Callers, Worker),
     case is_process_alive(Worker) andalso
-        lists:member(Worker, State#state.busy) of
+        lists:member(Worker, State#state.conns) of
     true ->
-        case queue:out(Waiting) of
+        Conns = case queue:out(Waiting) of
         {empty, Waiting2} ->
             NewCallers1 = NewCallers0,
-            Busy2 = State#state.busy -- [Worker],
-            Free2 = [Worker | State#state.free];
+            couch_replicator_connection:relinquish(Worker),
+            State#state.conns -- [Worker];
         {{value, From}, Waiting2} ->
             NewCallers1 = monitor_client(NewCallers0, Worker, From),
             gen_server:reply(From, {ok, Worker}),
-            Busy2 = State#state.busy,
-            Free2 = State#state.free
+            State#state.conns
         end,
         NewState = State#state{
-           busy = Busy2,
-           free = Free2,
+           conns = Conns,
            waiting = Waiting2,
            callers = NewCallers1
         },
diff --git a/src/couch_replicator/test/couch_replicator_connection_tests.erl b/src/couch_replicator/test/couch_replicator_connection_tests.erl
new file mode 100644
index 0000000..323a7de
--- /dev/null
+++ b/src/couch_replicator/test/couch_replicator_connection_tests.erl
@@ -0,0 +1,128 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_connection_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-define(TIMEOUT, 1000).
+
+
+% Per-test fixture: reads the HTTP bind address and port (both strings)
+% from the config so the tests can build a URL.
+setup() ->
+    BindAddress = config:get("httpd", "bind_address", "127.0.0.1"),
+    HttpPort = config:get("httpd", "port", "5984"),
+    {BindAddress, HttpPort}.
+
+% Per-test cleanup: nothing to release.
+teardown(_Fixture) ->
+    ok.
+
+
+% EUnit generator: starts couch with the replicator application once, then
+% runs every test case with a fresh {Host, Port} fixture.
+httpc_pool_test_() ->
+    StartCouch = fun() -> test_util:start_couch([couch_replicator]) end,
+    Cases = [
+        fun connections_shared_after_relinquish/1,
+        fun connections_not_shared_after_owner_death/1,
+        fun idle_connections_closed/1,
+        fun test_owner_monitors/1
+    ],
+    {
+        "replicator connection sharing tests",
+        {
+            setup,
+            StartCouch, fun test_util:stop_couch/1,
+            {foreach, fun setup/0, fun teardown/1, Cases}
+        }
+    }.
+
+
+% A connection that was relinquished must be handed to the next process
+% acquiring a connection for the same URL.
+connections_shared_after_relinquish({Host, Port}) ->
+    ?_test(begin
+        URL = "http://" ++ Host ++ ":" ++ Port,
+        Self = self(),
+        {ok, Pid} = couch_replicator_connection:acquire(URL),
+        couch_replicator_connection:relinquish(Pid),
+        spawn(fun() ->
+            Self ! couch_replicator_connection:acquire(URL)
+        end),
+        receive
+            {ok, Pid2} ->
+                ?assertEqual(Pid, Pid2)
+        after ?TIMEOUT ->
+            % fail with a clear reason instead of blocking until the test
+            % runner gives up
+            erlang:error(timeout_waiting_for_acquire)
+        end
+    end).
+
+
+% When the owner of a connection crashes, the connection must not be
+% handed to anyone else and its worker process must go down.
+connections_not_shared_after_owner_death({Host, Port}) ->
+    ?_test(begin
+        URL = "http://" ++ Host ++ ":" ++ Port,
+        Self = self(),
+        spawn(fun() ->
+            Self ! couch_replicator_connection:acquire(URL),
+            % crash the owner on purpose (badarith)
+            1/0
+        end),
+        receive
+            {ok, Pid} ->
+                {ok, Pid2} = couch_replicator_connection:acquire(URL),
+                ?assertNotEqual(Pid, Pid2),
+                MRef = monitor(process, Pid),
+                receive
+                    {'DOWN', MRef, process, Pid, _Reason} ->
+                        ?assert(not is_process_alive(Pid));
+                    Other ->
+                        throw(Other)
+                after ?TIMEOUT ->
+                    erlang:error(timeout_waiting_for_worker_down)
+                end
+        after ?TIMEOUT ->
+            % fail with a clear reason instead of blocking until the test
+            % runner gives up
+            erlang:error(timeout_waiting_for_acquire)
+        end
+    end).
+
+
+% The idle sweep must leave an acquired connection alone and must remove a
+% relinquished one.
+idle_connections_closed({Host, Port}) ->
+    ?_test(begin
+        URL = "http://" ++ Host ++ ":" ++ Port,
+        {ok, Pid} = couch_replicator_connection:acquire(URL),
+        couch_replicator_connection ! close_idle_connections,
+        % sys:get_status/1 is a synchronous call, so once it returns the
+        % server has already handled the sweep message sent above; only
+        % then is the assertion about the sweep's outcome meaningful
+        sys:get_status(couch_replicator_connection),
+        ?assert(ets:member(couch_replicator_connection, Pid)),
+        couch_replicator_connection:relinquish(Pid),
+        couch_replicator_connection ! close_idle_connections,
+        % block until the second sweep has been processed
+        sys:get_status(couch_replicator_connection),
+        ?assert(not ets:member(couch_replicator_connection, Pid))
+    end).
+
+
+% The server must hold exactly one monitor on the owner per acquired
+% connection and drop it again when the connection is relinquished.
+test_owner_monitors({Host, Port}) ->
+    ?_test(begin
+        URL = "http://" ++ Host ++ ":" ++ Port,
+        {ok, First} = couch_replicator_connection:acquire(URL),
+        assert_monitors_equal([{process, self()}]),
+        couch_replicator_connection:relinquish(First),
+        assert_monitors_equal([]),
+        % acquire five connections, expecting one more monitor each time
+        Acquire = fun(_N, {WorkersAcc, MonitorsAcc}) ->
+            {ok, Acquired} = couch_replicator_connection:acquire(URL),
+            Expected = [{process, self()} | MonitorsAcc],
+            assert_monitors_equal(Expected),
+            {[Acquired | WorkersAcc], Expected}
+        end,
+        {Workers, Monitors} = lists:foldl(Acquire, {[], []}, lists:seq(1,5)),
+        % give them back one by one, expecting one monitor fewer each time
+        Release = fun(Worker, [_ | Remaining]) ->
+            couch_replicator_connection:relinquish(Worker),
+            assert_monitors_equal(Remaining),
+            Remaining
+        end,
+        lists:foldl(Release, Monitors, Workers)
+    end).
+
+
+% Asserts that the monitors currently held by the connection server are
+% exactly ShouldBe. The sys:get_status/1 call is synchronous, so requests
+% this process sent to the server earlier have been handled before the
+% monitors are read.
+assert_monitors_equal(ShouldBe) ->
+    sys:get_status(couch_replicator_connection),
+    {monitors, Monitors} = process_info(whereis(couch_replicator_connection), monitors),
+    % ?assertEqual takes the expected value first; the order decides which
+    % value a failure report labels as "expected"
+    ?assertEqual(ShouldBe, Monitors).

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.