You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/12/07 21:57:33 UTC

svn commit: r1043196 - in /couchdb/branches/new_replicator: etc/couchdb/default.ini.tpl.in src/couchdb/Makefile.am src/couchdb/couch_api_wrap.erl src/couchdb/couch_api_wrap.hrl src/couchdb/couch_api_wrap_httpc.erl src/couchdb/couch_httpc_pool.erl

Author: fdmanana
Date: Tue Dec  7 20:57:33 2010
New Revision: 1043196

URL: http://svn.apache.org/viewvc?rev=1043196&view=rev
Log:
New replicator: added custom httpc connection pool manager. This avoids sharing http connections between different replications that share the same remote endpoints.


Added:
    couchdb/branches/new_replicator/src/couchdb/couch_httpc_pool.erl
Modified:
    couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in
    couchdb/branches/new_replicator/src/couchdb/Makefile.am
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl

Modified: couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in?rev=1043196&r1=1043195&r2=1043196&view=diff
==============================================================================
--- couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in (original)
+++ couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in Tue Dec  7 20:57:33 2010
@@ -118,8 +118,6 @@ compressible_types = text/*, application
 [replicator]
 ; should be at least 2
 worker_processes = 10
-; the maximum number of TCP connections to use against a single server
-max_connections_per_server = 100
 ; set to true to validate peer certificates
 verify_ssl_certificates = false
 ; file containing a list of peer trusted certificates (PEM format)

Modified: couchdb/branches/new_replicator/src/couchdb/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/Makefile.am?rev=1043196&r1=1043195&r2=1043196&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/Makefile.am (original)
+++ couchdb/branches/new_replicator/src/couchdb/Makefile.am Tue Dec  7 20:57:33 2010
@@ -41,6 +41,7 @@ source_files = \
     couch_external_manager.erl \
     couch_external_server.erl \
     couch_file.erl \
+    couch_httpc_pool.erl \
     couch_httpd.erl \
     couch_httpd_db.erl \
     couch_httpd_auth.erl \
@@ -108,6 +109,7 @@ compiled_files = \
     couch_external_manager.beam \
     couch_external_server.beam \
     couch_file.beam \
+    couch_httpc_pool.beam \
     couch_httpd.beam \
     couch_httpd_db.beam \
     couch_httpd_auth.beam \

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=1043196&r1=1043195&r2=1043196&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Tue Dec  7 20:57:33 2010
@@ -97,8 +97,9 @@ db_open(DbName, Options, Create) ->
         throw({unauthorized, DbName})
     end.
 
-db_close(#httpdb{}) ->
-    ok;
+db_close(#httpdb{httpc_pool = Pool}) ->
+    unlink(Pool),
+    ok = couch_httpc_pool:stop(Pool);
 db_close(DbName) ->
     couch_db:close(DbName).
 

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl?rev=1043196&r1=1043195&r2=1043196&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl Tue Dec  7 20:57:33 2010
@@ -20,7 +20,8 @@
     proxy_options = [],
     ssl_options = [],
     retries = 10,
-    wait = 250          % milliseconds
+    wait = 250,         % milliseconds
+    httpc_pool = nil
 }).
 
 -record(oauth, {

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl?rev=1043196&r1=1043195&r2=1043196&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl Tue Dec  7 20:57:33 2010
@@ -29,20 +29,14 @@
 -define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
 
 
-setup(#httpdb{url = Url} = Db) ->
+setup(#httpdb{url = Url, httpc_pool = nil} = Db) ->
     #url{host = Host, port = Port} = ibrowse_lib:parse_url(Url),
-    MaxSessions = list_to_integer(
-        couch_config:get("replicator", "max_connections_per_server", "100")),
-    ok = ibrowse:set_max_sessions(Host, Port, MaxSessions),
-    ok = ibrowse:set_max_pipeline_size(Host, Port, 1),
-    ok = couch_config:register(
-        fun("replicator", "max_connections_per_server", NewMax) ->
-            ok = ibrowse:set_max_sessions(Host, Port, list_to_integer(NewMax))
-        end),
-    {ok, Db}.
+    {ok, Pid} = couch_httpc_pool:start_link(Host, Port),
+    {ok, Db#httpdb{httpc_pool = Pid}}.
 
 
-send_req(#httpdb{headers = BaseHeaders} = HttpDb, Params, Callback) ->
+send_req(#httpdb{headers = BaseHeaders, httpc_pool = Pool} = HttpDb,
+         Params, Callback) ->
     Method = get_value(method, Params, get),
     Headers = get_value(headers, Params, []) ++ BaseHeaders,
     Body = get_value(body, Params, []),
@@ -63,20 +57,19 @@ send_req(#httpdb{headers = BaseHeaders} 
     Url = full_url(HttpDb, Params),
     case get_value(direct, Params, false) of
     true ->
-        {ok, Worker} = ibrowse:spawn_link_worker_process(Url),
+        {ok, Pid} = ibrowse:spawn_link_worker_process(Url),
+        Worker = {direct, Pid},
         Response = ibrowse:send_req_direct(
-            Worker, Url, Headers2, Method, Body, IbrowseOptions, infinity);
+            Pid, Url, Headers2, Method, Body, IbrowseOptions, infinity);
     false ->
-        Worker = nil,
-        Response = ibrowse:send_req(
-            Url, Headers2, Method, Body, IbrowseOptions, infinity)
+        {ok, Worker} = couch_httpc_pool:get_worker(Pool),
+        Response = ibrowse:send_req_direct(
+            Worker, Url, Headers2, Method, Body, IbrowseOptions, infinity)
     end,
     process_response(Response, Worker, HttpDb, Params, Callback).
 
 
-process_response({error, retry_later}, _Worker, HttpDb, Params, Callback) ->
-    % this means that the config option "max_connections_per_server" should
-    % probably be increased
+process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, Callback) ->
     ok = timer:sleep(?RETRY_LATER_WAIT),
     send_req(HttpDb, Params, Callback);
 
@@ -89,7 +82,7 @@ process_response({ibrowse_req_id, ReqId}
     process_stream_response(ReqId, Worker, HttpDb, Params, Callback);
 
 process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
-    stop_worker(Worker),
+    stop_worker(Worker, HttpDb),
     case list_to_integer(Code) of
     Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) ->
         EJson = case Body of
@@ -118,7 +111,7 @@ process_stream_response(ReqId, Worker, H
                 stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
             end,
             Ret = Callback(Ok, Headers, StreamDataFun),
-            stop_worker(Worker),
+            stop_worker(Worker, HttpDb),
             Ret;
         R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
             do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
@@ -130,12 +123,12 @@ process_stream_response(ReqId, Worker, H
     end.
 
 
-stop_worker(nil) ->
-    ok;
-stop_worker(Worker) when is_pid(Worker) ->
+stop_worker({direct, Worker}, _HttpDb) ->
     unlink(Worker),
     receive {'EXIT', Worker, _} -> ok after 0 -> ok end,
-    catch ibrowse:stop_worker_process(Worker).
+    catch ibrowse:stop_worker_process(Worker);
+stop_worker(Worker, #httpdb{httpc_pool = Pool}) ->
+    ok = couch_httpc_pool:release_worker(Pool, Worker).
 
 
 maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params, _Cb) ->
@@ -143,7 +136,7 @@ maybe_retry(Error, Worker, #httpdb{retri
 
 maybe_retry(Error, Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
     Params, Cb) ->
-    stop_worker(Worker),
+    stop_worker(Worker, HttpDb),
     Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
     Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
     ?LOG_INFO("Retrying ~s request to ~s in ~p seconds due to error ~p",
@@ -162,7 +155,7 @@ report_error(Worker, HttpDb, Params, Err
     Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
     Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
     do_report_error(Url, Method, Error),
-    stop_worker(Worker),
+    stop_worker(Worker, HttpDb),
     exit({http_request_failed, Method, Url, Error}).
 
 
@@ -231,7 +224,7 @@ oauth_header(#httpdb{url = BaseUrl, oaut
 
 
 do_redirect(Worker, Code, Headers, #httpdb{url = Url} = HttpDb, Params, Cb) ->
-    stop_worker(Worker),
+    stop_worker(Worker, HttpDb),
     RedirectUrl = redirect_url(Headers, Url),
     {HttpDb2, Params2} = after_redirect(RedirectUrl, Code, HttpDb, Params),
     send_req(HttpDb2, Params2, Cb).

Added: couchdb/branches/new_replicator/src/couchdb/couch_httpc_pool.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_httpc_pool.erl?rev=1043196&view=auto
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_httpc_pool.erl (added)
+++ couchdb/branches/new_replicator/src/couchdb/couch_httpc_pool.erl Tue Dec  7 20:57:33 2010
@@ -0,0 +1,117 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_httpc_pool).
+-behaviour(gen_server).
+
+% public API
+-export([start_link/2, stop/1]).
+-export([get_worker/1]).
+-export([release_worker/2]).
+
+% gen_server API
+-export([init/1, handle_call/3, handle_info/2, handle_cast/2]).
+-export([code_change/3, terminate/2]).
+
+-include("couch_db.hrl").
+
+-record(state, {
+    host,               % remote host, handed to ibrowse_http_client:start_link/1
+    port,               % remote port, handed over together with host
+    free = [],          % idle worker pids, available to get_worker
+    busy = [],          % worker pids currently checked out by callers
+    shutdown = false    % true once stop was requested while workers were busy
+}).
+
+% Starts a pool process for the endpoint Host:Port, linked to the caller.
+start_link(Host, Port) ->
+    gen_server:start_link(?MODULE, {Host, Port}, []).
+
+% Requests shutdown and waits (no timeout) for ok; see the stop clauses of handle_call/3.
+stop(Pool) ->
+    ok = gen_server:call(Pool, stop, infinity).
+
+% Checks out a worker: {ok, WorkerPid}, or {error, shutting_down} once stopping.
+get_worker(Pool) ->
+    gen_server:call(Pool, get_worker, infinity).
+
+% Hands Worker back to the pool; a pid the pool does not list as busy is ignored.
+release_worker(Pool, Worker) ->
+    ok = gen_server:call(Pool, {release_worker, Worker}, infinity).
+
+% Traps exits so that the death of a linked worker reaches handle_info/2 as 'EXIT'.
+init({Host, Port}) ->
+    process_flag(trap_exit, true),
+    {ok, #state{host = Host, port = Port}}.
+
+
+% get_worker: once a stop has been requested no worker is handed out anymore.
+handle_call(get_worker, _From, #state{shutdown = true} = State) ->
+    {reply, {error, shutting_down}, State};
+
+% get_worker with no idle worker: start a new one, linked to the pool.
+handle_call(get_worker, _From,
+    #state{free = [], busy = Busy, host = Host, port = Port} = State) ->
+    {ok, Worker} = ibrowse_http_client:start_link({Host, Port}),
+    {reply, {ok, Worker}, State#state{busy = [Worker | Busy]}};
+
+% get_worker with idle workers: reuse the one at the head of the free list.
+handle_call(get_worker, _From,
+    #state{free = [Worker | RestFree], busy = Busy} = State) ->
+    {reply, {ok, Worker}, State#state{free = RestFree, busy = [Worker | Busy]}};
+
+% release_worker: return a checked out worker. While shutting down the worker
+% is stopped right away instead of being parked in the free list, where nothing
+% would ever stop it; the pool exits when the last busy worker comes back.
+handle_call({release_worker, Worker}, _From,
+    #state{free = Free, busy = Busy, shutdown = Shutdown} = State) ->
+    case Busy -- [Worker] of
+    Busy ->
+        % not checked out: unknown pid, or already dropped after it died
+        {reply, ok, State};
+    [] when Shutdown =:= true ->
+        ok = stop_workers([Worker]),
+        {stop, normal, ok, State#state{busy = []}};
+    Busy2 when Shutdown =:= true ->
+        ok = stop_workers([Worker]),
+        {reply, ok, State#state{busy = Busy2}};
+    Busy2 ->
+        {reply, ok, State#state{free = [Worker | Free], busy = Busy2}}
+    end;
+
+% stop while already shutting down: nothing more to do.
+handle_call(stop, _From, #state{shutdown = true} = State) ->
+    {reply, ok, State};
+
+% stop with no worker checked out: stop the idle workers and exit.
+handle_call(stop, _From, #state{free = Free, busy = []} = State) ->
+    ok = stop_workers(Free),
+    {stop, normal, ok, State#state{free = []}};
+
+% stop with workers still checked out: stop the idle ones now and keep running
+% in shutdown mode until every busy worker has been released or has died.
+handle_call(stop, _From, #state{free = Free} = State) ->
+    ok = stop_workers(Free),
+    {reply, ok, State#state{free = [], shutdown = true}}.
+
+
+% Stops the given workers. Each one is unlinked first (and an 'EXIT' message
+% that may already be queued is dropped), because the pool traps exits and no
+% longer tracks these pids: their 'EXIT' would otherwise be reported by
+% handle_info/2 as an unknown process dying. The stop itself is best effort,
+% since a worker may already be dead.
+stop_workers(Workers) ->
+    lists:foreach(
+        fun(Worker) ->
+            unlink(Worker),
+            receive {'EXIT', Worker, _} -> ok after 0 -> ok end,
+            catch ibrowse_http_client:stop(Worker)
+        end,
+        Workers).
+
+handle_cast(Msg, State) ->    % no cast is part of the protocol: any cast stops the pool
+    {stop, {unexpected_cast, Msg}, State}.
+
+
+% A linked process exited (the pool traps exits, see init/1). A dead worker is
+% dropped from whichever list holds it. When the last busy worker dies during
+% shutdown the pool exits. An exit from a pid the pool does not track stops the
+% pool with unknown_process_died, except during shutdown.
+handle_info({'EXIT', From, Reason},
+    #state{free = Free, busy = Busy, shutdown = Shutdown} = State) ->
+    case Busy -- [From] of
+    Busy ->
+        case Free -- [From] of
+        Free when Shutdown =:= true ->
+            % The stop request stopped the idle workers and removed them
+            % from the free list, so their exit signals arrive for pids that
+            % are in neither list. That is expected, not an error.
+            {noreply, State};
+        Free ->
+            {stop, {unknown_process_died, {From, Reason}}, State};
+        Free2 ->
+            {noreply, State#state{free = Free2}}
+        end;
+    [] when Shutdown =:= true ->
+        {stop, normal, State};
+    Busy2 ->
+        {noreply, State#state{busy = Busy2}}
+    end.
+
+% Code upgrade hook: the state is returned unchanged.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+% Safety net: whatever led to termination, stop every worker the pool still
+% tracks so that no connection process outlives its pool (an exit with reason
+% normal does not take linked processes down by itself). Best effort, hence
+% the catch: a worker may already be gone.
+terminate(_Reason, #state{free = Free, busy = Busy}) ->
+    lists:foreach(
+        fun(Worker) -> catch ibrowse_http_client:stop(Worker) end,
+        Free ++ Busy).