You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/10/28 20:17:47 UTC

[couchdb] branch master updated: Include proxy host and port in connection pool key

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 692f602  Include proxy host and port in connection pool key
692f602 is described below

commit 692f6025567f8fab586b26fef446ba0b8a698f94
Author: Robert Newson <rn...@apache.org>
AuthorDate: Thu Oct 24 21:30:02 2019 +0100

    Include proxy host and port in connection pool key
    
    Closes #2271
---
 .../src/couch_replicator_connection.erl            | 58 ++++++++++++++++------
 .../src/couch_replicator_httpc.erl                 |  8 +--
 .../src/couch_replicator_httpc_pool.erl            | 17 +++++--
 3 files changed, 56 insertions(+), 27 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_connection.erl b/src/couch_replicator/src/couch_replicator_connection.erl
index f3e4a86..f31baf4 100644
--- a/src/couch_replicator/src/couch_replicator_connection.erl
+++ b/src/couch_replicator/src/couch_replicator_connection.erl
@@ -30,6 +30,7 @@
 
 -export([
    acquire/1,
+   acquire/2,
    release/1
 ]).
 
@@ -53,6 +54,8 @@
     worker,
     host,
     port,
+    proxy_host,
+    proxy_port,
     mref
 }).
 
@@ -72,19 +75,28 @@ init([]) ->
     ibrowse:add_config([{inactivity_timeout, Interval}]),
     {ok, #state{close_interval=Interval, timer=Timer}}.
 
+acquire(Url) ->
+    acquire(Url, undefined).
 
-acquire(URL) when is_binary(URL) ->
-    acquire(binary_to_list(URL));
+acquire(Url, ProxyUrl) when is_binary(Url) ->
+    acquire(binary_to_list(Url), ProxyUrl);
 
-acquire(URL0) ->
-    URL = couch_util:url_strip_password(URL0),
-    case gen_server:call(?MODULE, {acquire, URL}) of
+acquire(Url, ProxyUrl) when is_binary(ProxyUrl) ->
+    acquire(Url, binary_to_list(ProxyUrl));
+
+acquire(Url0, ProxyUrl0) ->
+    Url = couch_util:url_strip_password(Url0),
+    ProxyUrl = case ProxyUrl0 of
+        undefined -> undefined;
+        _ -> couch_util:url_strip_password(ProxyUrl0)
+    end,
+    case gen_server:call(?MODULE, {acquire, Url, ProxyUrl}) of
         {ok, Worker} ->
             link(Worker),
             {ok, Worker};
         {error, all_allocated} ->
-            {ok, Pid} = ibrowse:spawn_link_worker_process(URL),
-            ok = gen_server:call(?MODULE, {create, URL, Pid}),
+            {ok, Pid} = ibrowse:spawn_link_worker_process(Url),
+            ok = gen_server:call(?MODULE, {create, Url, ProxyUrl, Pid}),
             {ok, Pid};
         {error, Reason} ->
             {error, Reason}
@@ -96,11 +108,14 @@ release(Worker) ->
     gen_server:cast(?MODULE, {release, Worker}).
 
 
-handle_call({acquire, URL}, From, State) ->
+handle_call({acquire, Url, ProxyUrl}, From, State) ->
     {Pid, _Ref} = From,
-    case ibrowse_lib:parse_url(URL) of
-        #url{host=Host, port=Port} ->
-            Pat = #connection{host=Host, port=Port, mref=undefined, _='_'},
+    case {ibrowse_lib:parse_url(Url), parse_proxy_url(ProxyUrl)} of
+        {#url{host=Host, port=Port}, #url{host=ProxyHost, port=ProxyPort}} ->
+            Pat = #connection{
+            host=Host, port=Port,
+            proxy_host=ProxyHost, proxy_port=ProxyPort,
+            mref=undefined, _='_'},
             case ets:match_object(?MODULE, Pat, 1) of
                 '$end_of_table' ->
                     {reply, {error, all_allocated}, State};
@@ -111,20 +126,25 @@ handle_call({acquire, URL}, From, State) ->
                         Pid)}),
                     {reply, {ok, Worker#connection.worker}, State}
             end;
-        {error, invalid_uri} ->
+        {{error, invalid_uri}, _} ->
+            {reply, {error, invalid_uri}, State};
+        {_, {error, invalid_uri}} ->
             {reply, {error, invalid_uri}, State}
     end;
 
-handle_call({create, URL, Worker}, From, State) ->
+handle_call({create, Url, ProxyUrl, Worker}, From, State) ->
     {Pid, _Ref} = From,
-    case ibrowse_lib:parse_url(URL) of
-        #url{host=Host, port=Port} ->
+    case {ibrowse_lib:parse_url(Url), parse_proxy_url(ProxyUrl)} of
+        {#url{host=Host, port=Port}, #url{host=ProxyHost, port=ProxyPort}} ->
             link(Worker),
             couch_stats:increment_counter([couch_replicator, connection,
                 creates]),
             true = ets:insert_new(
                 ?MODULE,
-                #connection{host=Host, port=Port, worker=Worker,
+                #connection{
+                    host=Host, port=Port,
+                    proxy_host=ProxyHost, proxy_port=ProxyPort,
+                    worker=Worker,
                     mref=monitor(process, Pid)}
             ),
             {reply, ok, State}
@@ -239,3 +259,9 @@ handle_config_terminate(_, stop, _) ->
 handle_config_terminate(_, _, _) ->
     Pid = whereis(?MODULE),
     erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).
+
+
+parse_proxy_url(undefined) ->
+    #url{host=undefined, port=undefined};
+parse_proxy_url(ProxyUrl) ->
+    ibrowse_lib:parse_url(ProxyUrl).
diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl
index e4cf116..4dce319 100644
--- a/src/couch_replicator/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator/src/couch_replicator_httpc.erl
@@ -45,13 +45,9 @@ setup(Db) ->
         httpc_pool = nil,
         url = Url,
         http_connections = MaxConns,
-        proxy_url = ProxyURL
+        proxy_url = ProxyUrl
     } = Db,
-    HttpcURL = case ProxyURL of
-        undefined -> Url;
-        _ when is_list(ProxyURL) -> ProxyURL
-    end,
-    {ok, Pid} = couch_replicator_httpc_pool:start_link(HttpcURL,
+    {ok, Pid} = couch_replicator_httpc_pool:start_link(Url, ProxyUrl,
         [{max_connections, MaxConns}]),
     case couch_replicator_auth:initialize(Db#httpdb{httpc_pool = Pid}) of
         {ok, Db1} ->
diff --git a/src/couch_replicator/src/couch_replicator_httpc_pool.erl b/src/couch_replicator/src/couch_replicator_httpc_pool.erl
index 33fb61f..90234a6 100644
--- a/src/couch_replicator/src/couch_replicator_httpc_pool.erl
+++ b/src/couch_replicator/src/couch_replicator_httpc_pool.erl
@@ -15,7 +15,7 @@
 -vsn(1).
 
 % public API
--export([start_link/2, stop/1]).
+-export([start_link/2, start_link/3, stop/1]).
 -export([get_worker/1, release_worker/2, release_worker_sync/2]).
 
 % gen_server API
@@ -30,6 +30,7 @@
 
 -record(state, {
     url,
+    proxy_url,
     limit,                  % max # of workers allowed
     workers = [],
     waiting = queue:new(),  % blocked clients waiting for a worker
@@ -38,7 +39,10 @@
 
 
 start_link(Url, Options) ->
-    gen_server:start_link(?MODULE, {Url, Options}, []).
+    start_link(Url, undefined, Options).
+
+start_link(Url, ProxyUrl, Options) ->
+    gen_server:start_link(?MODULE, {Url, ProxyUrl, Options}, []).
 
 stop(Pool) ->
     ok = gen_server:call(Pool, stop, infinity).
@@ -54,10 +58,11 @@ release_worker(Pool, Worker) ->
 release_worker_sync(Pool, Worker) ->
     ok = gen_server:call(Pool, {release_worker_sync, Worker}).
 
-init({Url, Options}) ->
+init({Url, ProxyUrl, Options}) ->
     process_flag(trap_exit, true),
     State = #state{
         url = Url,
+        proxy_url = ProxyUrl,
         limit = get_value(max_connections, Options)
     },
     {ok, State}.
@@ -68,6 +73,7 @@ handle_call(get_worker, From, State) ->
         waiting = Waiting,
         callers = Callers,
         url = Url,
+        proxy_url = ProxyUrl,
         limit = Limit,
         workers = Workers
     } = State,
@@ -77,7 +83,7 @@ handle_call(get_worker, From, State) ->
     false ->
         % If the call to acquire fails, the worker pool will crash with a
         % badmatch.
-        {ok, Worker} = couch_replicator_connection:acquire(Url),
+        {ok, Worker} = couch_replicator_connection:acquire(Url, ProxyUrl),
         NewState = State#state{
             workers = [Worker | Workers],
             callers = monitor_client(Callers, Worker, From)
@@ -97,6 +103,7 @@ handle_cast({release_worker, Worker}, State) ->
 handle_info({'EXIT', Pid, _Reason}, State) ->
     #state{
         url = Url,
+        proxy_url = ProxyUrl,
         workers = Workers,
         waiting = Waiting,
         callers = Callers
@@ -111,7 +118,7 @@ handle_info({'EXIT', Pid, _Reason}, State) ->
                     {noreply, State#state{workers = Workers2,
                         callers = NewCallers0}};
                 {{value, From}, Waiting2} ->
-                    {ok, Worker} = couch_replicator_connection:acquire(Url),
+                    {ok, Worker} = couch_replicator_connection:acquire(Url, ProxyUrl),
                     NewCallers1 = monitor_client(NewCallers0, Worker, From),
                     gen_server:reply(From, {ok, Worker}),
                     NewState = State#state{