You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/12/07 21:57:33 UTC
svn commit: r1043196 - in /couchdb/branches/new_replicator:
etc/couchdb/default.ini.tpl.in src/couchdb/Makefile.am
src/couchdb/couch_api_wrap.erl src/couchdb/couch_api_wrap.hrl
src/couchdb/couch_api_wrap_httpc.erl src/couchdb/couch_httpc_pool.erl
Author: fdmanana
Date: Tue Dec 7 20:57:33 2010
New Revision: 1043196
URL: http://svn.apache.org/viewvc?rev=1043196&view=rev
Log:
New replicator: added custom httpc connection pool manager. This avoids sharing http connections between different replications that share the same remote endpoints.
Added:
couchdb/branches/new_replicator/src/couchdb/couch_httpc_pool.erl
Modified:
couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in
couchdb/branches/new_replicator/src/couchdb/Makefile.am
couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl
couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl
Modified: couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in?rev=1043196&r1=1043195&r2=1043196&view=diff
==============================================================================
--- couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in (original)
+++ couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in Tue Dec 7 20:57:33 2010
@@ -118,8 +118,6 @@ compressible_types = text/*, application
[replicator]
; should be at least 2
worker_processes = 10
-; the maximum number of TCP connections to use against a single server
-max_connections_per_server = 100
; set to true to validate peer certificates
verify_ssl_certificates = false
; file containing a list of peer trusted certificates (PEM format)
Modified: couchdb/branches/new_replicator/src/couchdb/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/Makefile.am?rev=1043196&r1=1043195&r2=1043196&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/Makefile.am (original)
+++ couchdb/branches/new_replicator/src/couchdb/Makefile.am Tue Dec 7 20:57:33 2010
@@ -41,6 +41,7 @@ source_files = \
couch_external_manager.erl \
couch_external_server.erl \
couch_file.erl \
+ couch_httpc_pool.erl \
couch_httpd.erl \
couch_httpd_db.erl \
couch_httpd_auth.erl \
@@ -108,6 +109,7 @@ compiled_files = \
couch_external_manager.beam \
couch_external_server.beam \
couch_file.beam \
+ couch_httpc_pool.beam \
couch_httpd.beam \
couch_httpd_db.beam \
couch_httpd_auth.beam \
Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=1043196&r1=1043195&r2=1043196&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Tue Dec 7 20:57:33 2010
@@ -97,8 +97,9 @@ db_open(DbName, Options, Create) ->
throw({unauthorized, DbName})
end.
-db_close(#httpdb{}) ->
- ok;
+db_close(#httpdb{httpc_pool = Pool}) ->
+ unlink(Pool),
+ ok = couch_httpc_pool:stop(Pool);
db_close(DbName) ->
couch_db:close(DbName).
Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl?rev=1043196&r1=1043195&r2=1043196&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl Tue Dec 7 20:57:33 2010
@@ -20,7 +20,8 @@
proxy_options = [],
ssl_options = [],
retries = 10,
- wait = 250 % milliseconds
+ wait = 250, % milliseconds
+ httpc_pool = nil
}).
-record(oauth, {
Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl?rev=1043196&r1=1043195&r2=1043196&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl Tue Dec 7 20:57:33 2010
@@ -29,20 +29,14 @@
-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
-setup(#httpdb{url = Url} = Db) ->
+setup(#httpdb{url = Url, httpc_pool = nil} = Db) ->
#url{host = Host, port = Port} = ibrowse_lib:parse_url(Url),
- MaxSessions = list_to_integer(
- couch_config:get("replicator", "max_connections_per_server", "100")),
- ok = ibrowse:set_max_sessions(Host, Port, MaxSessions),
- ok = ibrowse:set_max_pipeline_size(Host, Port, 1),
- ok = couch_config:register(
- fun("replicator", "max_connections_per_server", NewMax) ->
- ok = ibrowse:set_max_sessions(Host, Port, list_to_integer(NewMax))
- end),
- {ok, Db}.
+ {ok, Pid} = couch_httpc_pool:start_link(Host, Port),
+ {ok, Db#httpdb{httpc_pool = Pid}}.
-send_req(#httpdb{headers = BaseHeaders} = HttpDb, Params, Callback) ->
+send_req(#httpdb{headers = BaseHeaders, httpc_pool = Pool} = HttpDb,
+ Params, Callback) ->
Method = get_value(method, Params, get),
Headers = get_value(headers, Params, []) ++ BaseHeaders,
Body = get_value(body, Params, []),
@@ -63,20 +57,19 @@ send_req(#httpdb{headers = BaseHeaders}
Url = full_url(HttpDb, Params),
case get_value(direct, Params, false) of
true ->
- {ok, Worker} = ibrowse:spawn_link_worker_process(Url),
+ {ok, Pid} = ibrowse:spawn_link_worker_process(Url),
+ Worker = {direct, Pid},
Response = ibrowse:send_req_direct(
- Worker, Url, Headers2, Method, Body, IbrowseOptions, infinity);
+ Pid, Url, Headers2, Method, Body, IbrowseOptions, infinity);
false ->
- Worker = nil,
- Response = ibrowse:send_req(
- Url, Headers2, Method, Body, IbrowseOptions, infinity)
+ {ok, Worker} = couch_httpc_pool:get_worker(Pool),
+ Response = ibrowse:send_req_direct(
+ Worker, Url, Headers2, Method, Body, IbrowseOptions, infinity)
end,
process_response(Response, Worker, HttpDb, Params, Callback).
-process_response({error, retry_later}, _Worker, HttpDb, Params, Callback) ->
- % this means that the config option "max_connections_per_server" should
- % probably be increased
+process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, Callback) ->
ok = timer:sleep(?RETRY_LATER_WAIT),
send_req(HttpDb, Params, Callback);
@@ -89,7 +82,7 @@ process_response({ibrowse_req_id, ReqId}
process_stream_response(ReqId, Worker, HttpDb, Params, Callback);
process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
- stop_worker(Worker),
+ stop_worker(Worker, HttpDb),
case list_to_integer(Code) of
Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) ->
EJson = case Body of
@@ -118,7 +111,7 @@ process_stream_response(ReqId, Worker, H
stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
end,
Ret = Callback(Ok, Headers, StreamDataFun),
- stop_worker(Worker),
+ stop_worker(Worker, HttpDb),
Ret;
R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
@@ -130,12 +123,12 @@ process_stream_response(ReqId, Worker, H
end.
-stop_worker(nil) ->
- ok;
-stop_worker(Worker) when is_pid(Worker) ->
+stop_worker({direct, Worker}, _HttpDb) ->
unlink(Worker),
receive {'EXIT', Worker, _} -> ok after 0 -> ok end,
- catch ibrowse:stop_worker_process(Worker).
+ catch ibrowse:stop_worker_process(Worker);
+stop_worker(Worker, #httpdb{httpc_pool = Pool}) ->
+ ok = couch_httpc_pool:release_worker(Pool, Worker).
maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params, _Cb) ->
@@ -143,7 +136,7 @@ maybe_retry(Error, Worker, #httpdb{retri
maybe_retry(Error, Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
Params, Cb) ->
- stop_worker(Worker),
+ stop_worker(Worker, HttpDb),
Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
?LOG_INFO("Retrying ~s request to ~s in ~p seconds due to error ~p",
@@ -162,7 +155,7 @@ report_error(Worker, HttpDb, Params, Err
Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
do_report_error(Url, Method, Error),
- stop_worker(Worker),
+ stop_worker(Worker, HttpDb),
exit({http_request_failed, Method, Url, Error}).
@@ -231,7 +224,7 @@ oauth_header(#httpdb{url = BaseUrl, oaut
do_redirect(Worker, Code, Headers, #httpdb{url = Url} = HttpDb, Params, Cb) ->
- stop_worker(Worker),
+ stop_worker(Worker, HttpDb),
RedirectUrl = redirect_url(Headers, Url),
{HttpDb2, Params2} = after_redirect(RedirectUrl, Code, HttpDb, Params),
send_req(HttpDb2, Params2, Cb).
Added: couchdb/branches/new_replicator/src/couchdb/couch_httpc_pool.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_httpc_pool.erl?rev=1043196&view=auto
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_httpc_pool.erl (added)
+++ couchdb/branches/new_replicator/src/couchdb/couch_httpc_pool.erl Tue Dec 7 20:57:33 2010
@@ -0,0 +1,117 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_httpc_pool).
+-behaviour(gen_server).
+
+% public API
+-export([start_link/2, stop/1]).
+-export([get_worker/1]).
+-export([release_worker/2]).
+
+% gen_server API
+-export([init/1, handle_call/3, handle_info/2, handle_cast/2]).
+-export([code_change/3, terminate/2]).
+
+-include("couch_db.hrl").
+
+-record(state, {
+ host,
+ port,
+ free = [],
+ busy = [],
+ shutdown = false
+}).
+
+
+start_link(Host, Port) ->
+ gen_server:start_link(?MODULE, {Host, Port}, []).
+
+
+stop(Pool) ->
+ ok = gen_server:call(Pool, stop, infinity).
+
+
+get_worker(Pool) ->
+ gen_server:call(Pool, get_worker, infinity).
+
+
+release_worker(Pool, Worker) ->
+ ok = gen_server:call(Pool, {release_worker, Worker}, infinity).
+
+
+init({Host, Port}) ->
+ process_flag(trap_exit, true),
+ {ok, #state{host = Host, port = Port}}.
+
+
+handle_call(get_worker, _From, #state{shutdown = true} = State) ->
+ {reply, {error, shutting_down}, State};
+
+handle_call(get_worker, _From,
+ #state{free = [], busy = Busy, host = Host, port = Port} = State) ->
+ {ok, Worker} = ibrowse_http_client:start_link({Host, Port}),
+ {reply, {ok, Worker}, State#state{busy = [Worker | Busy]}};
+
+handle_call(get_worker, _From,
+ #state{free = [Worker | RestFree], busy = Busy} = State) ->
+ {reply, {ok, Worker}, State#state{free = RestFree, busy = [Worker | Busy]}};
+
+handle_call({release_worker, Worker}, _From,
+ #state{free = Free, busy = Busy, shutdown = Shutdown} = State) ->
+ case Busy -- [Worker] of
+ Busy ->
+ {reply, ok, State};
+ [] when Shutdown =:= true ->
+ {stop, normal, ok, State};
+ Busy2 ->
+ {reply, ok, State#state{free = [Worker | Free], busy = Busy2}}
+ end;
+
+handle_call(stop, _From, #state{shutdown = true} = State) ->
+ {reply, ok, State};
+
+handle_call(stop, _From, #state{free = Free, busy = []} = State) ->
+ lists:foreach(fun ibrowse_http_client:stop/1, Free),
+ {stop, normal, ok, State};
+
+handle_call(stop, _From, #state{free = Free} = State) ->
+ lists:foreach(fun ibrowse_http_client:stop/1, Free),
+ {reply, ok, State#state{free = [], shutdown = true}}.
+
+handle_cast(Msg, State) ->
+ {stop, {unexpected_cast, Msg}, State}.
+
+
+handle_info({'EXIT', From, Reason},
+ #state{free = Free, busy = Busy, shutdown = Shutdown} = State) ->
+ case Busy -- [From] of
+ Busy ->
+ case Free -- [From] of
+ Free ->
+ {stop, {unknown_process_died, {From, Reason}}, State};
+ Free2 ->
+ {noreply, State#state{free = Free2}}
+ end;
+ [] when Shutdown =:= true ->
+ {stop, normal, State};
+ Busy2 ->
+ {noreply, State#state{busy = Busy2}}
+ end.
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+terminate(_Reason, _State) ->
+ ok.