You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2011/01/03 21:35:02 UTC
svn commit: r1054735 - in /httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http: impl/nio/client/ impl/nio/conn/ impl/nio/pool/ nio/conn/
Author: olegk
Date: Mon Jan 3 20:35:01 2011
New Revision: 1054735
URL: http://svn.apache.org/viewvc?rev=1054735&view=rev
Log:
Keep-alive support; improvements in the connection management code
Modified:
httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncClient.java
httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/DefaultAsyncRequestDirector.java
httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java
httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/ClientConnAdaptor.java
httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/HttpPoolEntry.java
httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/HttpSessionPool.java
httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/PoolingClientConnectionManager.java
httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/PoolEntry.java
httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/SessionPool.java
httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/SessionPoolForRoute.java
httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/conn/ClientConnectionManager.java
httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/conn/ManagedClientConnection.java
Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncClient.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncClient.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncClient.java Mon Jan 3 20:35:01 2011
@@ -40,8 +40,10 @@ import org.apache.http.HttpRequestInterc
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.conn.routing.HttpRoutePlanner;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.nio.conn.DefaultHttpAsyncRoutePlanner;
import org.apache.http.impl.nio.conn.PoolingClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
@@ -150,13 +152,16 @@ public class BasicHttpAsyncClient implem
return new DefaultConnectionReuseStrategy();
}
+ protected ConnectionKeepAliveStrategy createConnectionKeepAliveStrategy() {
+ return new DefaultConnectionKeepAliveStrategy();
+ }
+
protected HttpRoutePlanner createHttpRoutePlanner() {
return new DefaultHttpAsyncRoutePlanner(this.connmgr.getSchemeRegistry());
}
private void doExecute() {
- NHttpClientProtocolHandler handler = new NHttpClientProtocolHandler(
- createConnectionReuseStrategy());
+ NHttpClientProtocolHandler handler = new NHttpClientProtocolHandler();
try {
IOEventDispatch ioEventDispatch = new InternalClientEventDispatch(handler);
this.ioReactor.execute(ioEventDispatch);
@@ -221,6 +226,8 @@ public class BasicHttpAsyncClient implem
this.connmgr,
createHttpProcessor(),
createHttpRoutePlanner(),
+ createConnectionReuseStrategy(),
+ createConnectionKeepAliveStrategy(),
this.params);
}
httpexchange.start();
Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/DefaultAsyncRequestDirector.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/DefaultAsyncRequestDirector.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/DefaultAsyncRequestDirector.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/DefaultAsyncRequestDirector.java Mon Jan 3 20:35:01 2011
@@ -33,6 +33,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
+import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
@@ -44,6 +45,7 @@ import org.apache.http.ProtocolVersion;
import org.apache.http.client.params.ClientPNames;
import org.apache.http.client.protocol.ClientContext;
import org.apache.http.client.utils.URIUtils;
+import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.conn.routing.BasicRouteDirector;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.routing.HttpRouteDirector;
@@ -84,16 +86,19 @@ class DefaultAsyncRequestDirector<T> imp
private final HttpProcessor httppocessor;
private final HttpRoutePlanner routePlanner;
private final HttpRouteDirector routeDirector;
+ private final ConnectionReuseStrategy reuseStrategy;
+ private final ConnectionKeepAliveStrategy keepaliveStrategy;
private final HttpParams clientParams;
private ClientParamsStack params;
private RequestWrapper request;
- private RequestWrapper current;
+ private HttpResponse response;
+ private RequestWrapper currentRequest;
+ private HttpResponse currentResponse;
private HttpRoute route;
private boolean routeEstablished;
private Future<ManagedClientConnection> connFuture;
private ManagedClientConnection managedConn;
- private HttpResponse response;
public DefaultAsyncRequestDirector(
final Log log,
@@ -104,6 +109,8 @@ class DefaultAsyncRequestDirector<T> imp
final ClientConnectionManager connmgr,
final HttpProcessor httppocessor,
final HttpRoutePlanner routePlanner,
+ final ConnectionReuseStrategy reuseStrategy,
+ final ConnectionKeepAliveStrategy keepaliveStrategy,
final HttpParams clientParams) {
super();
this.log = log;
@@ -114,6 +121,8 @@ class DefaultAsyncRequestDirector<T> imp
this.connmgr = connmgr;
this.httppocessor = httppocessor;
this.routePlanner = routePlanner;
+ this.reuseStrategy = reuseStrategy;
+ this.keepaliveStrategy = keepaliveStrategy;
this.routeDirector = new BasicRouteDirector();
this.clientParams = clientParams;
}
@@ -158,8 +167,8 @@ class DefaultAsyncRequestDirector<T> imp
case HttpRouteDirector.TUNNEL_TARGET:
this.log.debug("Tunnel required");
HttpRequest connect = createConnectRequest(this.route);
- this.current = wrapRequest(connect);
- this.current.setParams(this.params);
+ this.currentRequest = wrapRequest(connect);
+ this.currentRequest.setParams(this.params);
break;
case HttpRouteDirector.TUNNEL_PROXY:
throw new HttpException("Proxy chains are not supported");
@@ -176,7 +185,7 @@ class DefaultAsyncRequestDirector<T> imp
throw new IllegalStateException("Unknown step indicator "
+ step + " from RouteDirector.");
}
- } while (step > HttpRouteDirector.COMPLETE && this.current == null);
+ } while (step > HttpRouteDirector.COMPLETE && this.currentRequest == null);
}
HttpHost target = (HttpHost) this.params.getParameter(ClientPNames.VIRTUAL_HOST);
@@ -185,22 +194,22 @@ class DefaultAsyncRequestDirector<T> imp
}
HttpHost proxy = this.route.getProxyHost();
- if (this.current == null) {
- this.current = this.request;
+ if (this.currentRequest == null) {
+ this.currentRequest = this.request;
// Re-write request URI if needed
- rewriteRequestURI(this.current, this.route);
+ rewriteRequestURI(this.currentRequest, this.route);
}
// Reset headers on the request wrapper
- this.current.resetHeaders();
+ this.currentRequest.resetHeaders();
- this.localContext.setAttribute(ExecutionContext.HTTP_REQUEST, this.current);
+ this.localContext.setAttribute(ExecutionContext.HTTP_REQUEST, this.currentRequest);
this.localContext.setAttribute(ExecutionContext.HTTP_TARGET_HOST, target);
this.localContext.setAttribute(ExecutionContext.HTTP_PROXY_HOST, proxy);
- this.httppocessor.process(this.current, this.localContext);
+ this.httppocessor.process(this.currentRequest, this.localContext);
if (this.log.isDebugEnabled()) {
- this.log.debug("Request submitted: " + this.current.getRequestLine());
+ this.log.debug("Request submitted: " + this.currentRequest.getRequestLine());
}
- return this.current;
+ return this.currentRequest;
}
public synchronized void produceContent(
@@ -222,16 +231,18 @@ class DefaultAsyncRequestDirector<T> imp
public synchronized void responseReceived(
final HttpResponse response) throws IOException, HttpException {
if (this.log.isDebugEnabled()) {
- this.log.debug("Response received: " + response.getStatusLine());
+ this.log.debug("Response: " + response.getStatusLine());
}
- response.setParams(this.params);
- this.localContext.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
- this.httppocessor.process(response, this.localContext);
+ this.currentResponse = response;
+ this.currentResponse.setParams(this.params);
+ this.localContext.setAttribute(ExecutionContext.HTTP_RESPONSE, this.currentResponse);
+ this.httppocessor.process(this.currentResponse, this.localContext);
- int status = response.getStatusLine().getStatusCode();
+ int status = this.currentResponse.getStatusLine().getStatusCode();
if (!this.routeEstablished) {
- if (this.current.getMethod().equalsIgnoreCase("CONNECT") && status == HttpStatus.SC_OK) {
+ String method = this.currentRequest.getMethod();
+ if (method.equalsIgnoreCase("CONNECT") && status == HttpStatus.SC_OK) {
this.managedConn.tunnelTarget(this.params);
} else {
this.response = response;
@@ -242,7 +253,7 @@ class DefaultAsyncRequestDirector<T> imp
if (this.response != null) {
this.responseConsumer.responseReceived(response);
}
- this.current = null;
+ this.currentRequest = null;
}
public synchronized void consumeContent(
@@ -283,12 +294,32 @@ class DefaultAsyncRequestDirector<T> imp
}
public synchronized void responseCompleted() {
- this.log.debug("Response completed");
+ this.log.debug("Response fully read");
try {
+ if (this.reuseStrategy.keepAlive(this.currentResponse, this.localContext)) {
+ long duration = this.keepaliveStrategy.getKeepAliveDuration(
+ this.currentResponse, this.localContext);
+ if (this.log.isDebugEnabled()) {
+ String s;
+ if (duration >= 0) {
+ s = duration + " " + TimeUnit.MILLISECONDS;
+ } else {
+ s = "ever";
+ }
+ this.log.debug("Connection can be kept alive for " + s);
+ }
+ this.managedConn.setIdleDuration(duration, TimeUnit.MILLISECONDS);
+ } else {
+ try {
+ this.managedConn.close();
+ } catch (IOException ex) {
+ this.log.debug("I/O error closing connection", ex);
+ }
+ }
if (this.response != null) {
this.responseConsumer.responseCompleted();
if (this.responseConsumer.isDone()) {
- this.log.debug("Response processing completed");
+ this.log.debug("Response processed");
this.resultFuture.completed(this.responseConsumer.getResult());
releaseResources();
}
Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java Mon Jan 3 20:35:01 2011
@@ -31,7 +31,6 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
import org.apache.http.HttpInetConnection;
@@ -58,16 +57,12 @@ import org.apache.http.protocol.HttpCont
*/
class NHttpClientProtocolHandler implements NHttpClientHandler {
- private final Log log;
+ private final Log log = LogFactory.getLog(getClass());
private static final String HTTP_EXCHNAGE = "http.nio.exchange";
- private final ConnectionReuseStrategy connStrategy;
-
- public NHttpClientProtocolHandler(
- final ConnectionReuseStrategy connStrategy) {
- this.connStrategy = connStrategy;
- this.log = LogFactory.getLog(getClass());
+ public NHttpClientProtocolHandler() {
+ super();
}
private void closeConnection(final NHttpClientConnection conn) {
@@ -358,7 +353,6 @@ class NHttpClientProtocolHandler impleme
if (!httpexchange.isValid()) {
conn.close();
}
- HttpContext context = conn.getContext();
HttpRequest request = httpexchange.getRequest();
HttpResponse response = httpexchange.getResponse();
@@ -367,10 +361,6 @@ class NHttpClientProtocolHandler impleme
if (method.equalsIgnoreCase("CONNECT") && status == HttpStatus.SC_OK) {
this.log.debug("CONNECT method succeeded");
conn.resetInput();
- } else {
- if (!this.connStrategy.keepAlive(response, context)) {
- conn.close();
- }
}
if (this.log.isDebugEnabled()) {
this.log.debug("Response processed " + formatState(conn, httpexchange));
@@ -380,10 +370,6 @@ class NHttpClientProtocolHandler impleme
httpexchange.setHandler(null);
}
httpexchange.reset();
- if (conn.isOpen()) {
- // Ready for another request
- conn.requestOutput();
- }
}
private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/ClientConnAdaptor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/ClientConnAdaptor.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/ClientConnAdaptor.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/ClientConnAdaptor.java Mon Jan 3 20:35:01 2011
@@ -27,6 +27,7 @@
package org.apache.http.impl.nio.conn;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import org.apache.http.HttpConnectionMetrics;
import org.apache.http.HttpException;
@@ -56,8 +57,8 @@ import org.apache.http.protocol.HttpCont
class ClientConnAdaptor implements ManagedClientConnection {
private final ClientConnectionManager manager;
- private volatile HttpPoolEntry entry;
- private volatile OperatedClientConnection conn;
+ private HttpPoolEntry entry;
+ private OperatedClientConnection conn;
private volatile boolean released;
private volatile boolean reusable;
@@ -88,7 +89,7 @@ class ClientConnAdaptor implements Manag
return;
}
this.released = true;
- this.manager.releaseConnection(this);
+ this.manager.releaseConnection(this, this.entry.getExpiry(), TimeUnit.MILLISECONDS);
this.entry = null;
this.conn = null;
}
@@ -101,7 +102,7 @@ class ClientConnAdaptor implements Manag
this.reusable = false;
IOSession iosession = this.entry.getIOSession();
iosession.shutdown();
- this.manager.releaseConnection(this);
+ this.manager.releaseConnection(this, -1, TimeUnit.MILLISECONDS);
this.entry = null;
this.conn = null;
}
@@ -138,11 +139,16 @@ class ClientConnAdaptor implements Manag
this.reusable = true;
}
- public void shutdown() {
+ public synchronized void shutdown() {
abortConnection();
}
- public void close() {
+ public synchronized void close() throws IOException {
+ if (this.released) {
+ return;
+ }
+ this.conn.close();
+ this.reusable = false;
releaseConnection();
}
@@ -358,6 +364,13 @@ class ClientConnAdaptor implements Manag
tracker.layerProtocol(layeringStrategy.isSecure());
}
+ public void setIdleDuration(final long duration, final TimeUnit tunit) {
+ if (tunit == null) {
+ throw new IllegalArgumentException("Time unit may not be null");
+ }
+ this.entry.setExpiry(tunit.toMillis(duration));
+ }
+
@Override
public synchronized String toString() {
HttpRoute route = this.entry.getPlannedRoute();
Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/HttpPoolEntry.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/HttpPoolEntry.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/HttpPoolEntry.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/HttpPoolEntry.java Mon Jan 3 20:35:01 2011
@@ -26,17 +26,20 @@
*/
package org.apache.http.impl.nio.conn;
+import org.apache.commons.logging.Log;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.routing.RouteTracker;
import org.apache.http.impl.nio.pool.PoolEntry;
import org.apache.http.nio.reactor.IOSession;
-public class HttpPoolEntry extends PoolEntry<HttpRoute> {
+class HttpPoolEntry extends PoolEntry<HttpRoute> {
+ private final Log log;
private final RouteTracker tracker;
- public HttpPoolEntry(final HttpRoute route, final IOSession session) {
+ HttpPoolEntry(final Log log, final HttpRoute route, final IOSession session) {
super(route, session);
+ this.log = log;
this.tracker = new RouteTracker(route);
}
@@ -67,4 +70,13 @@ public class HttpPoolEntry extends PoolE
return this.tracker.toRoute();
}
+ @Override
+ public boolean isExpired(long now) {
+ boolean expired = super.isExpired(now);
+ if (expired && this.log.isDebugEnabled()) {
+ this.log.debug("Connection expired: " + this);
+ }
+ return expired;
+ }
+
}
Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/HttpSessionPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/HttpSessionPool.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/HttpSessionPool.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/HttpSessionPool.java Mon Jan 3 20:35:01 2011
@@ -28,7 +28,9 @@ package org.apache.http.impl.nio.conn;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.logging.Log;
import org.apache.http.HttpHost;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.impl.nio.pool.PoolEntryFactory;
@@ -41,10 +43,13 @@ import org.apache.http.nio.reactor.IOSes
class HttpSessionPool extends SessionPool<HttpRoute, HttpPoolEntry> {
- public HttpSessionPool(
- final ConnectingIOReactor ioreactor, final SchemeRegistry schemeRegistry) {
+ HttpSessionPool(
+ final Log log,
+ final ConnectingIOReactor ioreactor,
+ final SchemeRegistry schemeRegistry,
+ long timeToLive, final TimeUnit tunit) {
super(ioreactor,
- new InternalEntryFactory(),
+ new InternalEntryFactory(log, tunit.toMillis(timeToLive)),
new InternalRouteResolver(schemeRegistry),
20, 50);
}
@@ -80,8 +85,24 @@ class HttpSessionPool extends SessionPoo
static class InternalEntryFactory implements PoolEntryFactory<HttpRoute, HttpPoolEntry> {
+ private final Log log;
+ private final long connTimeToLive;
+
+ InternalEntryFactory(final Log log, final long connTimeToLive) {
+ super();
+ this.log = log;
+ this.connTimeToLive = connTimeToLive;
+ }
+
public HttpPoolEntry createEntry(final HttpRoute route, final IOSession session) {
- return new HttpPoolEntry(route, session);
+ HttpPoolEntry entry = new HttpPoolEntry(this.log, route, session);
+ long now = System.currentTimeMillis();
+ entry.setCreated(now);
+ entry.setUpdated(now);
+ if (this.connTimeToLive > 0) {
+ entry.setDeadline(now + this.connTimeToLive);
+ }
+ return entry;
}
};
Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/PoolingClientConnectionManager.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/PoolingClientConnectionManager.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/PoolingClientConnectionManager.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/PoolingClientConnectionManager.java Mon Jan 3 20:35:01 2011
@@ -40,7 +40,6 @@ import org.apache.http.nio.conn.ClientCo
import org.apache.http.nio.conn.PoolStats;
import org.apache.http.nio.conn.scheme.SchemeRegistry;
import org.apache.http.nio.reactor.ConnectingIOReactor;
-import org.apache.http.nio.reactor.IOSession;
public class PoolingClientConnectionManager implements ClientConnectionManager {
@@ -51,7 +50,8 @@ public class PoolingClientConnectionMana
public PoolingClientConnectionManager(
final ConnectingIOReactor ioreactor,
- final SchemeRegistry schemeRegistry) {
+ final SchemeRegistry schemeRegistry,
+ final long timeToLive, final TimeUnit tunit) {
super();
if (ioreactor == null) {
throw new IllegalArgumentException("I/O reactor may not be null");
@@ -59,27 +59,42 @@ public class PoolingClientConnectionMana
if (schemeRegistry == null) {
throw new IllegalArgumentException("Scheme registory may not be null");
}
- this.pool = new HttpSessionPool(ioreactor, schemeRegistry);
+ if (tunit == null) {
+ throw new IllegalArgumentException("Time unit may not be null");
+ }
+ this.pool = new HttpSessionPool(this.log, ioreactor, schemeRegistry, timeToLive, tunit);
this.schemeRegistry = schemeRegistry;
}
public PoolingClientConnectionManager(
+ final ConnectingIOReactor ioreactor,
+ final SchemeRegistry schemeRegistry) {
+ this(ioreactor, schemeRegistry, -1, TimeUnit.MILLISECONDS);
+ }
+
+ public PoolingClientConnectionManager(
final ConnectingIOReactor ioreactor) {
- this(ioreactor, SchemeRegistryFactory.createDefault());
+ this(ioreactor, SchemeRegistryFactory.createDefault(), -1, TimeUnit.MILLISECONDS);
}
public SchemeRegistry getSchemeRegistry() {
return this.schemeRegistry;
}
- public synchronized Future<ManagedClientConnection> leaseConnection(
+ public Future<ManagedClientConnection> leaseConnection(
final HttpRoute route,
final Object state,
final long timeout,
- final TimeUnit timeUnit,
+ final TimeUnit tunit,
final FutureCallback<ManagedClientConnection> callback) {
+ if (route == null) {
+ throw new IllegalArgumentException("HTTP route may not be null");
+ }
+ if (tunit == null) {
+ throw new IllegalArgumentException("Time unit may not be null");
+ }
if (this.log.isDebugEnabled()) {
- this.log.debug("I/O session request: route[" + route + "][state: " + state + "]");
+ this.log.debug("Connection request: route[" + route + "][state: " + state + "]");
PoolStats totals = this.pool.getTotalStats();
PoolStats stats = this.pool.getStats(route);
this.log.debug("Total: " + totals);
@@ -87,39 +102,64 @@ public class PoolingClientConnectionMana
}
BasicFuture<ManagedClientConnection> future = new BasicFuture<ManagedClientConnection>(
callback);
- this.pool.lease(route, state, timeout, timeUnit, new InternalPoolEntryCallback(future));
+ this.pool.lease(route, state, timeout, tunit, new InternalPoolEntryCallback(future));
if (this.log.isDebugEnabled()) {
if (!future.isDone()) {
- this.log.debug("I/O session could not be allocated immediately: " +
+ this.log.debug("Connection could not be allocated immediately: " +
"route[" + route + "][state: " + state + "]");
}
}
return future;
}
- public synchronized void releaseConnection(final ManagedClientConnection conn) {
+ public void releaseConnection(
+ final ManagedClientConnection conn,
+ final long validDuration,
+ final TimeUnit tunit) {
+ if (conn == null) {
+ throw new IllegalArgumentException("HTTP connection may not be null");
+ }
if (!(conn instanceof ClientConnAdaptor)) {
- throw new IllegalArgumentException
- ("I/O session class mismatch, " +
- "I/O session not obtained from this manager");
+ throw new IllegalArgumentException("Connection class mismatch, " +
+ "connection not obtained from this manager");
+ }
+ if (tunit == null) {
+ throw new IllegalArgumentException("Time unit may not be null");
}
ClientConnAdaptor adaptor = (ClientConnAdaptor) conn;
ClientConnectionManager manager = adaptor.getManager();
if (manager != null && manager != this) {
- throw new IllegalArgumentException
- ("I/O session not obtained from this manager");
+ throw new IllegalArgumentException("connection not obtained from this manager");
}
HttpPoolEntry entry = adaptor.getEntry();
- IOSession iosession = entry.getIOSession();
if (this.log.isDebugEnabled()) {
HttpRoute route = entry.getPlannedRoute();
+ Object state = entry.getState();
+ this.log.debug("Releasing connection: route[" + route + "][state: " + state + "]");
PoolStats totals = this.pool.getTotalStats();
PoolStats stats = this.pool.getStats(route);
this.log.debug("Total: " + totals);
this.log.debug("Route [" + route + "]: " + stats);
- this.log.debug("I/O session released: " + entry);
}
- this.pool.release(entry, adaptor.isReusable() && !iosession.isClosed());
+
+ boolean reusable = adaptor.isReusable();
+ if (reusable) {
+ entry.setExpiry(tunit.toMillis(validDuration));
+ if (this.log.isDebugEnabled()) {
+ entry.setExpiry(tunit.toMillis(validDuration));
+ String s;
+ if (validDuration >= 0) {
+ s = validDuration + " " + tunit;
+ } else {
+ s = "ever";
+ }
+ HttpRoute route = entry.getPlannedRoute();
+ Object state = entry.getState();
+ this.log.debug("Pooling connection" +
+ " [" + route + "][" + state + "]; keep alive for " + s);
+ }
+ }
+ this.pool.release(entry, reusable);
}
public PoolStats getTotalStats() {
@@ -142,8 +182,20 @@ public class PoolingClientConnectionMana
this.pool.setMaxPerHost(route, max);
}
- public synchronized void shutdown() {
- this.log.debug("I/O session manager shut down");
+ public void closeIdleConnections(long idleTimeout, final TimeUnit tunit) {
+ if (log.isDebugEnabled()) {
+ log.debug("Closing connections idle longer than " + idleTimeout + " " + tunit);
+ }
+ this.pool.closeIdle(idleTimeout, tunit);
+ }
+
+ public void closeExpiredConnections() {
+ log.debug("Closing expired connections");
+ this.pool.closeExpired();
+ }
+
+ public void shutdown() {
+ this.log.debug("Connection manager shut down");
this.pool.shutdown();
}
@@ -159,7 +211,7 @@ public class PoolingClientConnectionMana
public void completed(final HttpPoolEntry entry) {
if (log.isDebugEnabled()) {
- log.debug("I/O session allocated: " + entry);
+ log.debug("Connection allocated: " + entry);
}
ManagedClientConnection conn = new ClientConnAdaptor(
PoolingClientConnectionManager.this,
@@ -171,13 +223,13 @@ public class PoolingClientConnectionMana
public void failed(final Exception ex) {
if (log.isDebugEnabled()) {
- log.debug("I/O session request failed", ex);
+ log.debug("Connection request failed", ex);
}
this.future.failed(ex);
}
public void cancelled() {
- log.debug("I/O session request cancelled");
+ log.debug("Connection request cancelled");
this.future.cancel(true);
}
Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/PoolEntry.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/PoolEntry.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/PoolEntry.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/PoolEntry.java Mon Jan 3 20:35:01 2011
@@ -38,6 +38,10 @@ public abstract class PoolEntry<T> {
private final T route;
private final IOSession session;
private Object state;
+ private long created;
+ private long updated;
+ private long deadline;
+ private long expiry;
public PoolEntry(final T route, final IOSession session) {
super();
@@ -62,6 +66,48 @@ public abstract class PoolEntry<T> {
this.state = state;
}
+ public long getCreated() {
+ return this.created;
+ }
+
+ public void setCreated(long created) {
+ this.created = created;
+ }
+
+ public long getUpdated() {
+ return this.updated;
+ }
+
+ public void setUpdated(long updated) {
+ this.updated = updated;
+ }
+
+ public long getDeadline() {
+ return this.deadline;
+ }
+
+ public void setDeadline(long deadline) {
+ this.deadline = deadline;
+ }
+
+ public long getExpiry() {
+ return this.expiry;
+ }
+
+ public void setExpiry(long expiry) {
+ this.expiry = expiry;
+ }
+
+ public boolean isExpired(long now) {
+ if (this.deadline > 0 && this.deadline < now) {
+ return true;
+ }
+ if (this.expiry > 0 && this.expiry < now) {
+ return true;
+ }
+ return false;
+ }
+
@Override
public String toString() {
StringBuilder buffer = new StringBuilder();
Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/SessionPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/SessionPool.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/SessionPool.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/SessionPool.java Mon Jan 3 20:35:01 2011
@@ -28,6 +28,7 @@ package org.apache.http.impl.nio.pool;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Map;
@@ -123,15 +124,20 @@ public abstract class SessionPool<T, E e
public void lease(
final T route, final Object state,
- final long connectTimeout, final TimeUnit timeUnit,
+ final long connectTimeout, final TimeUnit tunit,
final PoolEntryCallback<T, E> callback) {
+ if (route == null) {
+ throw new IllegalArgumentException("Route may not be null");
+ }
+ if (tunit == null) {
+ throw new IllegalArgumentException("Time unit must not be null.");
+ }
if (this.isShutDown) {
throw new IllegalStateException("Session pool has been shut down");
}
this.lock.lock();
try {
- TimeUnit unit = timeUnit != null ? timeUnit : TimeUnit.MILLISECONDS;
- int timeout = (int) unit.toMillis(connectTimeout);
+ int timeout = (int) tunit.toMillis(connectTimeout);
if (timeout < 0) {
timeout = 0;
}
@@ -155,6 +161,8 @@ public abstract class SessionPool<T, E e
pool.freeEntry(entry, reusable);
if (reusable) {
this.availableSessions.add(entry);
+ } else {
+ entryShutdown(entry);
}
processPendingRequests();
}
@@ -350,6 +358,9 @@ public abstract class SessionPool<T, E e
}
public PoolStats getStats(final T route) {
+ if (route == null) {
+ throw new IllegalArgumentException("Route may not be null");
+ }
this.lock.lock();
try {
SessionPoolForRoute<T, E> pool = getPool(route);
@@ -363,6 +374,49 @@ public abstract class SessionPool<T, E e
}
}
+ public void closeIdle(long idletime, final TimeUnit tunit) {
+ if (tunit == null) {
+ throw new IllegalArgumentException("Time unit must not be null.");
+ }
+ long time = tunit.toMillis(idletime);
+ if (time < 0) {
+ time = 0;
+ }
+ long deadline = System.currentTimeMillis() - time;
+ this.lock.lock();
+ try {
+ Iterator<E> it = this.availableSessions.iterator();
+ while (it.hasNext()) {
+ E entry = it.next();
+ if (entry.getUpdated() <= deadline) {
+ it.remove();
+ entryShutdown(entry);
+ }
+ }
+ processPendingRequests();
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ public void closeExpired() {
+ long now = System.currentTimeMillis();
+ this.lock.lock();
+ try {
+ Iterator<E> it = this.availableSessions.iterator();
+ while (it.hasNext()) {
+ E entry = it.next();
+ if (entry.isExpired(now)) {
+ it.remove();
+ entryShutdown(entry);
+ }
+ }
+ processPendingRequests();
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
@Override
public String toString() {
StringBuilder buffer = new StringBuilder();
Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/SessionPoolForRoute.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/SessionPoolForRoute.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/SessionPoolForRoute.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/SessionPoolForRoute.java Mon Jan 3 20:35:01 2011
@@ -76,7 +76,7 @@ class SessionPoolForRoute<T, E extends P
while (it.hasPrevious()) {
E entry = it.previous();
IOSession iosession = entry.getIOSession();
- if (iosession.isClosed()) {
+ if (iosession.isClosed() || entry.isExpired(System.currentTimeMillis())) {
it.remove();
} else {
if (entry.getState() == null || entry.getState().equals(state)) {
Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/conn/ClientConnectionManager.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/conn/ClientConnectionManager.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/conn/ClientConnectionManager.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/conn/ClientConnectionManager.java Mon Jan 3 20:35:01 2011
@@ -42,7 +42,8 @@ public interface ClientConnectionManager
long connectTimeout, TimeUnit timeUnit,
FutureCallback<ManagedClientConnection> callback);
- void releaseConnection(ManagedClientConnection session);
+ void releaseConnection(ManagedClientConnection session,
+ long validDuration, TimeUnit timeUnit);
void shutdown();
Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/conn/ManagedClientConnection.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/conn/ManagedClientConnection.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/conn/ManagedClientConnection.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/conn/ManagedClientConnection.java Mon Jan 3 20:35:01 2011
@@ -27,6 +27,7 @@
package org.apache.http.nio.conn;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import org.apache.http.HttpHost;
import org.apache.http.conn.ConnectionReleaseTrigger;
@@ -57,4 +58,6 @@ public interface ManagedClientConnection
void layerProtocol(HttpContext context, HttpParams params) throws IOException;
+ void setIdleDuration(long duration, TimeUnit tunit);
+
}