You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2019/10/24 20:33:01 UTC

[couchdb] 01/01: Include proxy host and port in connection pool key

This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch 2271-proxy-connection-sharing
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit f1f9fec00f48bb413f6d09b64f8788466298efc2
Author: Robert Newson <rn...@apache.org>
AuthorDate: Thu Oct 24 21:30:02 2019 +0100

    Include proxy host and port in connection pool key
---
 .../src/couch_replicator_connection.erl            | 48 ++++++++++++++--------
 .../src/couch_replicator_httpc.erl                 |  8 +---
 .../src/couch_replicator_httpc_pool.erl            | 16 +++++---
 3 files changed, 43 insertions(+), 29 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_connection.erl b/src/couch_replicator/src/couch_replicator_connection.erl
index f3e4a86..4284472 100644
--- a/src/couch_replicator/src/couch_replicator_connection.erl
+++ b/src/couch_replicator/src/couch_replicator_connection.erl
@@ -29,7 +29,7 @@
 ]).
 
 -export([
-   acquire/1,
+   acquire/2,
    release/1
 ]).
 
@@ -53,6 +53,8 @@
     worker,
     host,
     port,
+    proxy_host,
+    proxy_port,
     mref
 }).
 
@@ -73,18 +75,22 @@ init([]) ->
     {ok, #state{close_interval=Interval, timer=Timer}}.
 
 
-acquire(URL) when is_binary(URL) ->
-    acquire(binary_to_list(URL));
+acquire(Url, ProxyUrl) when is_binary(Url) ->
+    acquire(binary_to_list(Url), ProxyUrl);
 
-acquire(URL0) ->
-    URL = couch_util:url_strip_password(URL0),
-    case gen_server:call(?MODULE, {acquire, URL}) of
+acquire(Url, ProxyUrl) when is_binary(ProxyUrl) ->
+    acquire(Url, binary_to_list(ProxyUrl));
+
+acquire(Url0, ProxyUrl0) ->
+    Url = couch_util:url_strip_password(Url0),
+    ProxyUrl = couch_util:url_strip_password(ProxyUrl0),
+    case gen_server:call(?MODULE, {acquire, Url, ProxyUrl}) of
         {ok, Worker} ->
             link(Worker),
             {ok, Worker};
         {error, all_allocated} ->
-            {ok, Pid} = ibrowse:spawn_link_worker_process(URL),
-            ok = gen_server:call(?MODULE, {create, URL, Pid}),
+            {ok, Pid} = ibrowse:spawn_link_worker_process(Url),
+            ok = gen_server:call(?MODULE, {create, Url, ProxyUrl, Pid}),
             {ok, Pid};
         {error, Reason} ->
             {error, Reason}
@@ -96,11 +102,14 @@ release(Worker) ->
     gen_server:cast(?MODULE, {release, Worker}).
 
 
-handle_call({acquire, URL}, From, State) ->
+handle_call({acquire, Url, ProxyUrl}, From, State) ->
     {Pid, _Ref} = From,
-    case ibrowse_lib:parse_url(URL) of
-        #url{host=Host, port=Port} ->
-            Pat = #connection{host=Host, port=Port, mref=undefined, _='_'},
+    case {ibrowse_lib:parse_url(Url), ibrowse_lib:parse_url(ProxyUrl)} of
+        {#url{host=Host, port=Port}, #url{host=ProxyHost, port=ProxyPort}} ->
+            Pat = #connection{
+	        host=Host, port=Port,
+		proxy_host=ProxyHost, proxy_port=ProxyPort,
+		mref=undefined, _='_'},
             case ets:match_object(?MODULE, Pat, 1) of
                 '$end_of_table' ->
                     {reply, {error, all_allocated}, State};
@@ -111,20 +120,25 @@ handle_call({acquire, URL}, From, State) ->
                         Pid)}),
                     {reply, {ok, Worker#connection.worker}, State}
             end;
-        {error, invalid_uri} ->
+        {{error, invalid_uri}, _} ->
+            {reply, {error, invalid_uri}, State};
+        {_, {error, invalid_uri}} ->
             {reply, {error, invalid_uri}, State}
     end;
 
-handle_call({create, URL, Worker}, From, State) ->
+handle_call({create, Url, ProxyUrl, Worker}, From, State) ->
     {Pid, _Ref} = From,
-    case ibrowse_lib:parse_url(URL) of
-        #url{host=Host, port=Port} ->
+    case {ibrowse_lib:parse_url(Url), ibrowse_lib:parse_url(ProxyUrl)} of
+        {#url{host=Host, port=Port}, #url{host=ProxyHost, port=ProxyPort}} ->
             link(Worker),
             couch_stats:increment_counter([couch_replicator, connection,
                 creates]),
             true = ets:insert_new(
                 ?MODULE,
-                #connection{host=Host, port=Port, worker=Worker,
+                #connection{
+		    host=Host, port=Port,
+		    proxy_host=ProxyHost, proxy_port=ProxyPort,
+		    worker=Worker,
                     mref=monitor(process, Pid)}
             ),
             {reply, ok, State}
diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl
index e4cf116..4dce319 100644
--- a/src/couch_replicator/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator/src/couch_replicator_httpc.erl
@@ -45,13 +45,9 @@ setup(Db) ->
         httpc_pool = nil,
         url = Url,
         http_connections = MaxConns,
-        proxy_url = ProxyURL
+        proxy_url = ProxyUrl
     } = Db,
-    HttpcURL = case ProxyURL of
-        undefined -> Url;
-        _ when is_list(ProxyURL) -> ProxyURL
-    end,
-    {ok, Pid} = couch_replicator_httpc_pool:start_link(HttpcURL,
+    {ok, Pid} = couch_replicator_httpc_pool:start_link(Url, ProxyUrl,
         [{max_connections, MaxConns}]),
     case couch_replicator_auth:initialize(Db#httpdb{httpc_pool = Pid}) of
         {ok, Db1} ->
diff --git a/src/couch_replicator/src/couch_replicator_httpc_pool.erl b/src/couch_replicator/src/couch_replicator_httpc_pool.erl
index 33fb61f..377be50 100644
--- a/src/couch_replicator/src/couch_replicator_httpc_pool.erl
+++ b/src/couch_replicator/src/couch_replicator_httpc_pool.erl
@@ -15,7 +15,7 @@
 -vsn(1).
 
 % public API
--export([start_link/2, stop/1]).
+-export([start_link/3, stop/1]).
 -export([get_worker/1, release_worker/2, release_worker_sync/2]).
 
 % gen_server API
@@ -30,6 +30,7 @@
 
 -record(state, {
     url,
+    proxy_url,
     limit,                  % max # of workers allowed
     workers = [],
     waiting = queue:new(),  % blocked clients waiting for a worker
@@ -37,8 +38,8 @@
 }).
 
 
-start_link(Url, Options) ->
-    gen_server:start_link(?MODULE, {Url, Options}, []).
+start_link(Url, ProxyUrl, Options) ->
+    gen_server:start_link(?MODULE, {Url, ProxyUrl, Options}, []).
 
 stop(Pool) ->
     ok = gen_server:call(Pool, stop, infinity).
@@ -54,10 +55,11 @@ release_worker(Pool, Worker) ->
 release_worker_sync(Pool, Worker) ->
     ok = gen_server:call(Pool, {release_worker_sync, Worker}).
 
-init({Url, Options}) ->
+init({Url, ProxyUrl, Options}) ->
     process_flag(trap_exit, true),
     State = #state{
         url = Url,
+	proxy_url = ProxyUrl,
         limit = get_value(max_connections, Options)
     },
     {ok, State}.
@@ -68,6 +70,7 @@ handle_call(get_worker, From, State) ->
         waiting = Waiting,
         callers = Callers,
         url = Url,
+	proxy_url = ProxyUrl,
         limit = Limit,
         workers = Workers
     } = State,
@@ -77,7 +80,7 @@ handle_call(get_worker, From, State) ->
     false ->
         % If the call to acquire fails, the worker pool will crash with a
         % badmatch.
-        {ok, Worker} = couch_replicator_connection:acquire(Url),
+        {ok, Worker} = couch_replicator_connection:acquire(Url, ProxyUrl),
         NewState = State#state{
             workers = [Worker | Workers],
             callers = monitor_client(Callers, Worker, From)
@@ -97,6 +100,7 @@ handle_cast({release_worker, Worker}, State) ->
 handle_info({'EXIT', Pid, _Reason}, State) ->
     #state{
         url = Url,
+	proxy_url = ProxyUrl,
         workers = Workers,
         waiting = Waiting,
         callers = Callers
@@ -111,7 +115,7 @@ handle_info({'EXIT', Pid, _Reason}, State) ->
                     {noreply, State#state{workers = Workers2,
                         callers = NewCallers0}};
                 {{value, From}, Waiting2} ->
-                    {ok, Worker} = couch_replicator_connection:acquire(Url),
+                    {ok, Worker} = couch_replicator_connection:acquire(Url, ProxyUrl),
                     NewCallers1 = monitor_client(NewCallers0, Worker, From),
                     gen_server:reply(From, {ok, Worker}),
                     NewState = State#state{