You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2011/01/03 21:35:02 UTC

svn commit: r1054735 - in /httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http: impl/nio/client/ impl/nio/conn/ impl/nio/pool/ nio/conn/

Author: olegk
Date: Mon Jan  3 20:35:01 2011
New Revision: 1054735

URL: http://svn.apache.org/viewvc?rev=1054735&view=rev
Log:
Keep-alive support; improvements in the connection management code

Modified:
    httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncClient.java
    httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/DefaultAsyncRequestDirector.java
    httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java
    httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/ClientConnAdaptor.java
    httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/HttpPoolEntry.java
    httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/HttpSessionPool.java
    httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/PoolingClientConnectionManager.java
    httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/PoolEntry.java
    httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/SessionPool.java
    httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/SessionPoolForRoute.java
    httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/conn/ClientConnectionManager.java
    httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/conn/ManagedClientConnection.java

Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncClient.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncClient.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncClient.java Mon Jan  3 20:35:01 2011
@@ -40,8 +40,10 @@ import org.apache.http.HttpRequestInterc
 import org.apache.http.HttpResponse;
 import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.conn.ConnectionKeepAliveStrategy;
 import org.apache.http.conn.routing.HttpRoutePlanner;
 import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
 import org.apache.http.impl.nio.conn.DefaultHttpAsyncRoutePlanner;
 import org.apache.http.impl.nio.conn.PoolingClientConnectionManager;
 import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
@@ -150,13 +152,16 @@ public class BasicHttpAsyncClient implem
         return new DefaultConnectionReuseStrategy();
     }
 
+    protected ConnectionKeepAliveStrategy createConnectionKeepAliveStrategy() {
+        return new DefaultConnectionKeepAliveStrategy();
+    }
+
     protected HttpRoutePlanner createHttpRoutePlanner() {
         return new DefaultHttpAsyncRoutePlanner(this.connmgr.getSchemeRegistry());
     }
 
     private void doExecute() {
-        NHttpClientProtocolHandler handler = new NHttpClientProtocolHandler(
-                createConnectionReuseStrategy());
+        NHttpClientProtocolHandler handler = new NHttpClientProtocolHandler();
         try {
             IOEventDispatch ioEventDispatch = new InternalClientEventDispatch(handler);
             this.ioReactor.execute(ioEventDispatch);
@@ -221,6 +226,8 @@ public class BasicHttpAsyncClient implem
                     this.connmgr,
                     createHttpProcessor(),
                     createHttpRoutePlanner(),
+                    createConnectionReuseStrategy(),
+                    createConnectionKeepAliveStrategy(),
                     this.params);
         }
         httpexchange.start();

Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/DefaultAsyncRequestDirector.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/DefaultAsyncRequestDirector.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/DefaultAsyncRequestDirector.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/DefaultAsyncRequestDirector.java Mon Jan  3 20:35:01 2011
@@ -33,6 +33,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
+import org.apache.http.ConnectionReuseStrategy;
 import org.apache.http.HttpEntityEnclosingRequest;
 import org.apache.http.HttpException;
 import org.apache.http.HttpHost;
@@ -44,6 +45,7 @@ import org.apache.http.ProtocolVersion;
 import org.apache.http.client.params.ClientPNames;
 import org.apache.http.client.protocol.ClientContext;
 import org.apache.http.client.utils.URIUtils;
+import org.apache.http.conn.ConnectionKeepAliveStrategy;
 import org.apache.http.conn.routing.BasicRouteDirector;
 import org.apache.http.conn.routing.HttpRoute;
 import org.apache.http.conn.routing.HttpRouteDirector;
@@ -84,16 +86,19 @@ class DefaultAsyncRequestDirector<T> imp
     private final HttpProcessor httppocessor;
     private final HttpRoutePlanner routePlanner;
     private final HttpRouteDirector routeDirector;
+    private final ConnectionReuseStrategy reuseStrategy;
+    private final ConnectionKeepAliveStrategy keepaliveStrategy;
     private final HttpParams clientParams;
 
     private ClientParamsStack params;
     private RequestWrapper request;
-    private RequestWrapper current;
+    private HttpResponse response;
+    private RequestWrapper currentRequest;
+    private HttpResponse currentResponse;
     private HttpRoute route;
     private boolean routeEstablished;
     private Future<ManagedClientConnection> connFuture;
     private ManagedClientConnection managedConn;
-    private HttpResponse response;
 
     public DefaultAsyncRequestDirector(
             final Log log,
@@ -104,6 +109,8 @@ class DefaultAsyncRequestDirector<T> imp
             final ClientConnectionManager connmgr,
             final HttpProcessor httppocessor,
             final HttpRoutePlanner routePlanner,
+            final ConnectionReuseStrategy reuseStrategy,
+            final ConnectionKeepAliveStrategy keepaliveStrategy,
             final HttpParams clientParams) {
         super();
         this.log = log;
@@ -114,6 +121,8 @@ class DefaultAsyncRequestDirector<T> imp
         this.connmgr = connmgr;
         this.httppocessor = httppocessor;
         this.routePlanner = routePlanner;
+        this.reuseStrategy = reuseStrategy;
+        this.keepaliveStrategy = keepaliveStrategy;
         this.routeDirector = new BasicRouteDirector();
         this.clientParams = clientParams;
     }
@@ -158,8 +167,8 @@ class DefaultAsyncRequestDirector<T> imp
                 case HttpRouteDirector.TUNNEL_TARGET:
                     this.log.debug("Tunnel required");
                     HttpRequest connect = createConnectRequest(this.route);
-                    this.current = wrapRequest(connect);
-                    this.current.setParams(this.params);
+                    this.currentRequest = wrapRequest(connect);
+                    this.currentRequest.setParams(this.params);
                     break;
                 case HttpRouteDirector.TUNNEL_PROXY:
                     throw new HttpException("Proxy chains are not supported");
@@ -176,7 +185,7 @@ class DefaultAsyncRequestDirector<T> imp
                     throw new IllegalStateException("Unknown step indicator "
                             + step + " from RouteDirector.");
                 }
-            } while (step > HttpRouteDirector.COMPLETE && this.current == null);
+            } while (step > HttpRouteDirector.COMPLETE && this.currentRequest == null);
         }
 
         HttpHost target = (HttpHost) this.params.getParameter(ClientPNames.VIRTUAL_HOST);
@@ -185,22 +194,22 @@ class DefaultAsyncRequestDirector<T> imp
         }
         HttpHost proxy = this.route.getProxyHost();
 
-        if (this.current == null) {
-            this.current = this.request;
+        if (this.currentRequest == null) {
+            this.currentRequest = this.request;
             // Re-write request URI if needed
-            rewriteRequestURI(this.current, this.route);
+            rewriteRequestURI(this.currentRequest, this.route);
         }
         // Reset headers on the request wrapper
-        this.current.resetHeaders();
+        this.currentRequest.resetHeaders();
 
-        this.localContext.setAttribute(ExecutionContext.HTTP_REQUEST, this.current);
+        this.localContext.setAttribute(ExecutionContext.HTTP_REQUEST, this.currentRequest);
         this.localContext.setAttribute(ExecutionContext.HTTP_TARGET_HOST, target);
         this.localContext.setAttribute(ExecutionContext.HTTP_PROXY_HOST, proxy);
-        this.httppocessor.process(this.current, this.localContext);
+        this.httppocessor.process(this.currentRequest, this.localContext);
         if (this.log.isDebugEnabled()) {
-            this.log.debug("Request submitted: " + this.current.getRequestLine());
+            this.log.debug("Request submitted: " + this.currentRequest.getRequestLine());
         }
-        return this.current;
+        return this.currentRequest;
     }
 
     public synchronized void produceContent(
@@ -222,16 +231,18 @@ class DefaultAsyncRequestDirector<T> imp
     public synchronized void responseReceived(
             final HttpResponse response) throws IOException, HttpException {
         if (this.log.isDebugEnabled()) {
-            this.log.debug("Response received: " + response.getStatusLine());
+            this.log.debug("Response: " + response.getStatusLine());
         }
-        response.setParams(this.params);
-        this.localContext.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
-        this.httppocessor.process(response, this.localContext);
+        this.currentResponse = response;
+        this.currentResponse.setParams(this.params);
+        this.localContext.setAttribute(ExecutionContext.HTTP_RESPONSE, this.currentResponse);
+        this.httppocessor.process(this.currentResponse, this.localContext);
 
-        int status = response.getStatusLine().getStatusCode();
+        int status = this.currentResponse.getStatusLine().getStatusCode();
 
         if (!this.routeEstablished) {
-            if (this.current.getMethod().equalsIgnoreCase("CONNECT") && status == HttpStatus.SC_OK) {
+            String method = this.currentRequest.getMethod();
+            if (method.equalsIgnoreCase("CONNECT") && status == HttpStatus.SC_OK) {
                 this.managedConn.tunnelTarget(this.params);
             } else {
                 this.response = response;
@@ -242,7 +253,7 @@ class DefaultAsyncRequestDirector<T> imp
         if (this.response != null) {
             this.responseConsumer.responseReceived(response);
         }
-        this.current = null;
+        this.currentRequest = null;
     }
 
     public synchronized void consumeContent(
@@ -283,12 +294,32 @@ class DefaultAsyncRequestDirector<T> imp
     }
 
     public synchronized void responseCompleted() {
-        this.log.debug("Response completed");
+        this.log.debug("Response fully read");
         try {
+            if (this.reuseStrategy.keepAlive(this.currentResponse, this.localContext)) {
+                long duration = this.keepaliveStrategy.getKeepAliveDuration(
+                        this.currentResponse, this.localContext);
+                if (this.log.isDebugEnabled()) {
+                    String s;
+                    if (duration >= 0) {
+                        s = duration + " " + TimeUnit.MILLISECONDS;
+                    } else {
+                        s = "ever";
+                    }
+                    this.log.debug("Connection can be kept alive for " + s);
+                }
+                this.managedConn.setIdleDuration(duration, TimeUnit.MILLISECONDS);
+            } else {
+                try {
+                    this.managedConn.close();
+                } catch (IOException ex) {
+                    this.log.debug("I/O error closing connection", ex);
+                }
+            }
             if (this.response != null) {
                 this.responseConsumer.responseCompleted();
                 if (this.responseConsumer.isDone()) {
-                    this.log.debug("Response processing completed");
+                    this.log.debug("Response processed");
                     this.resultFuture.completed(this.responseConsumer.getResult());
                     releaseResources();
                 }

Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java Mon Jan  3 20:35:01 2011
@@ -31,7 +31,6 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.http.ConnectionReuseStrategy;
 import org.apache.http.HttpEntityEnclosingRequest;
 import org.apache.http.HttpException;
 import org.apache.http.HttpInetConnection;
@@ -58,16 +57,12 @@ import org.apache.http.protocol.HttpCont
  */
 class NHttpClientProtocolHandler implements NHttpClientHandler {
 
-    private final Log log;
+    private final Log log = LogFactory.getLog(getClass());
 
     private static final String HTTP_EXCHNAGE = "http.nio.exchange";
 
-    private final ConnectionReuseStrategy connStrategy;
-
-    public NHttpClientProtocolHandler(
-            final ConnectionReuseStrategy connStrategy) {
-        this.connStrategy = connStrategy;
-        this.log = LogFactory.getLog(getClass());
+    public NHttpClientProtocolHandler() {
+        super();
     }
 
     private void closeConnection(final NHttpClientConnection conn) {
@@ -358,7 +353,6 @@ class NHttpClientProtocolHandler impleme
         if (!httpexchange.isValid()) {
             conn.close();
         }
-        HttpContext context = conn.getContext();
         HttpRequest request = httpexchange.getRequest();
         HttpResponse response = httpexchange.getResponse();
 
@@ -367,10 +361,6 @@ class NHttpClientProtocolHandler impleme
         if (method.equalsIgnoreCase("CONNECT") && status == HttpStatus.SC_OK) {
             this.log.debug("CONNECT method succeeded");
             conn.resetInput();
-        } else {
-            if (!this.connStrategy.keepAlive(response, context)) {
-                conn.close();
-            }
         }
         if (this.log.isDebugEnabled()) {
             this.log.debug("Response processed " + formatState(conn, httpexchange));
@@ -380,10 +370,6 @@ class NHttpClientProtocolHandler impleme
             httpexchange.setHandler(null);
         }
         httpexchange.reset();
-        if (conn.isOpen()) {
-            // Ready for another request
-            conn.requestOutput();
-        }
     }
 
     private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {

Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/ClientConnAdaptor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/ClientConnAdaptor.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/ClientConnAdaptor.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/ClientConnAdaptor.java Mon Jan  3 20:35:01 2011
@@ -27,6 +27,7 @@
 package org.apache.http.impl.nio.conn;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.http.HttpConnectionMetrics;
 import org.apache.http.HttpException;
@@ -56,8 +57,8 @@ import org.apache.http.protocol.HttpCont
 class ClientConnAdaptor implements ManagedClientConnection {
 
     private final ClientConnectionManager manager;
-    private volatile HttpPoolEntry entry;
-    private volatile OperatedClientConnection conn;
+    private HttpPoolEntry entry;
+    private OperatedClientConnection conn;
     private volatile boolean released;
     private volatile boolean reusable;
 
@@ -88,7 +89,7 @@ class ClientConnAdaptor implements Manag
             return;
         }
         this.released = true;
-        this.manager.releaseConnection(this);
+        this.manager.releaseConnection(this, this.entry.getExpiry(), TimeUnit.MILLISECONDS);
         this.entry = null;
         this.conn = null;
     }
@@ -101,7 +102,7 @@ class ClientConnAdaptor implements Manag
         this.reusable = false;
         IOSession iosession = this.entry.getIOSession();
         iosession.shutdown();
-        this.manager.releaseConnection(this);
+        this.manager.releaseConnection(this, -1, TimeUnit.MILLISECONDS);
         this.entry = null;
         this.conn = null;
     }
@@ -138,11 +139,16 @@ class ClientConnAdaptor implements Manag
         this.reusable = true;
     }
 
-    public void shutdown() {
+    public synchronized void shutdown() {
         abortConnection();
     }
 
-    public void close() {
+    public synchronized void close() throws IOException {
+        if (this.released) {
+            return;
+        }
+        this.conn.close();
+        this.reusable = false;
         releaseConnection();
     }
 
@@ -358,6 +364,13 @@ class ClientConnAdaptor implements Manag
         tracker.layerProtocol(layeringStrategy.isSecure());
     }
 
+    public void setIdleDuration(final long duration, final TimeUnit tunit) {
+        if (tunit == null) {
+            throw new IllegalArgumentException("Time unit may not be null");
+        }
+        this.entry.setExpiry(tunit.toMillis(duration));
+    }
+
     @Override
     public synchronized String toString() {
         HttpRoute route = this.entry.getPlannedRoute();

Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/HttpPoolEntry.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/HttpPoolEntry.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/HttpPoolEntry.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/HttpPoolEntry.java Mon Jan  3 20:35:01 2011
@@ -26,17 +26,20 @@
  */
 package org.apache.http.impl.nio.conn;
 
+import org.apache.commons.logging.Log;
 import org.apache.http.conn.routing.HttpRoute;
 import org.apache.http.conn.routing.RouteTracker;
 import org.apache.http.impl.nio.pool.PoolEntry;
 import org.apache.http.nio.reactor.IOSession;
 
-public class HttpPoolEntry extends PoolEntry<HttpRoute> {
+class HttpPoolEntry extends PoolEntry<HttpRoute> {
 
+    private final Log log;
     private final RouteTracker tracker;
 
-    public HttpPoolEntry(final HttpRoute route, final IOSession session) {
+    HttpPoolEntry(final Log log, final HttpRoute route, final IOSession session) {
         super(route, session);
+        this.log = log;
         this.tracker = new RouteTracker(route);
     }
 
@@ -67,4 +70,13 @@ public class HttpPoolEntry extends PoolE
         return this.tracker.toRoute();
     }
 
+    @Override
+    public boolean isExpired(long now) {
+        boolean expired = super.isExpired(now);
+        if (expired && this.log.isDebugEnabled()) {
+            this.log.debug("Connection expired: " + this);
+        }
+        return expired;
+    }
+
 }

Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/HttpSessionPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/HttpSessionPool.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/HttpSessionPool.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/HttpSessionPool.java Mon Jan  3 20:35:01 2011
@@ -28,7 +28,9 @@ package org.apache.http.impl.nio.conn;
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.logging.Log;
 import org.apache.http.HttpHost;
 import org.apache.http.conn.routing.HttpRoute;
 import org.apache.http.impl.nio.pool.PoolEntryFactory;
@@ -41,10 +43,13 @@ import org.apache.http.nio.reactor.IOSes
 
 class HttpSessionPool extends SessionPool<HttpRoute, HttpPoolEntry> {
 
-    public HttpSessionPool(
-            final ConnectingIOReactor ioreactor, final SchemeRegistry schemeRegistry) {
+    HttpSessionPool(
+            final Log log,
+            final ConnectingIOReactor ioreactor,
+            final SchemeRegistry schemeRegistry,
+            long timeToLive, final TimeUnit tunit) {
         super(ioreactor,
-                new InternalEntryFactory(),
+                new InternalEntryFactory(log, tunit.toMillis(timeToLive)),
                 new InternalRouteResolver(schemeRegistry),
                 20, 50);
     }
@@ -80,8 +85,24 @@ class HttpSessionPool extends SessionPoo
 
     static class InternalEntryFactory implements PoolEntryFactory<HttpRoute, HttpPoolEntry> {
 
+        private final Log log;
+        private final long connTimeToLive;
+
+        InternalEntryFactory(final Log log, final long connTimeToLive) {
+            super();
+            this.log = log;
+            this.connTimeToLive = connTimeToLive;
+        }
+
         public HttpPoolEntry createEntry(final HttpRoute route, final IOSession session) {
-            return new HttpPoolEntry(route, session);
+            HttpPoolEntry entry = new HttpPoolEntry(this.log, route, session);
+            long now = System.currentTimeMillis();
+            entry.setCreated(now);
+            entry.setUpdated(now);
+            if (this.connTimeToLive > 0) {
+                entry.setDeadline(now + this.connTimeToLive);
+            }
+            return entry;
         }
 
     };

Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/PoolingClientConnectionManager.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/PoolingClientConnectionManager.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/PoolingClientConnectionManager.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/conn/PoolingClientConnectionManager.java Mon Jan  3 20:35:01 2011
@@ -40,7 +40,6 @@ import org.apache.http.nio.conn.ClientCo
 import org.apache.http.nio.conn.PoolStats;
 import org.apache.http.nio.conn.scheme.SchemeRegistry;
 import org.apache.http.nio.reactor.ConnectingIOReactor;
-import org.apache.http.nio.reactor.IOSession;
 
 public class PoolingClientConnectionManager implements ClientConnectionManager {
 
@@ -51,7 +50,8 @@ public class PoolingClientConnectionMana
 
     public PoolingClientConnectionManager(
             final ConnectingIOReactor ioreactor,
-            final SchemeRegistry schemeRegistry) {
+            final SchemeRegistry schemeRegistry,
+            final long timeToLive, final TimeUnit tunit) {
         super();
         if (ioreactor == null) {
             throw new IllegalArgumentException("I/O reactor may not be null");
@@ -59,27 +59,42 @@ public class PoolingClientConnectionMana
         if (schemeRegistry == null) {
             throw new IllegalArgumentException("Scheme registory may not be null");
         }
-        this.pool = new HttpSessionPool(ioreactor, schemeRegistry);
+        if (tunit == null) {
+            throw new IllegalArgumentException("Time unit may not be null");
+        }
+        this.pool = new HttpSessionPool(this.log, ioreactor, schemeRegistry, timeToLive, tunit);
         this.schemeRegistry = schemeRegistry;
     }
 
     public PoolingClientConnectionManager(
+            final ConnectingIOReactor ioreactor,
+            final SchemeRegistry schemeRegistry) {
+        this(ioreactor, schemeRegistry, -1, TimeUnit.MILLISECONDS);
+    }
+
+    public PoolingClientConnectionManager(
             final ConnectingIOReactor ioreactor) {
-        this(ioreactor, SchemeRegistryFactory.createDefault());
+        this(ioreactor, SchemeRegistryFactory.createDefault(), -1, TimeUnit.MILLISECONDS);
     }
 
     public SchemeRegistry getSchemeRegistry() {
         return this.schemeRegistry;
     }
 
-    public synchronized Future<ManagedClientConnection> leaseConnection(
+    public Future<ManagedClientConnection> leaseConnection(
             final HttpRoute route,
             final Object state,
             final long timeout,
-            final TimeUnit timeUnit,
+            final TimeUnit tunit,
             final FutureCallback<ManagedClientConnection> callback) {
+        if (route == null) {
+            throw new IllegalArgumentException("HTTP route may not be null");
+        }
+        if (tunit == null) {
+            throw new IllegalArgumentException("Time unit may not be null");
+        }
         if (this.log.isDebugEnabled()) {
-            this.log.debug("I/O session request: route[" + route + "][state: " + state + "]");
+            this.log.debug("Connection request: route[" + route + "][state: " + state + "]");
             PoolStats totals = this.pool.getTotalStats();
             PoolStats stats = this.pool.getStats(route);
             this.log.debug("Total: " + totals);
@@ -87,39 +102,64 @@ public class PoolingClientConnectionMana
         }
         BasicFuture<ManagedClientConnection> future = new BasicFuture<ManagedClientConnection>(
                 callback);
-        this.pool.lease(route, state, timeout, timeUnit, new InternalPoolEntryCallback(future));
+        this.pool.lease(route, state, timeout, tunit, new InternalPoolEntryCallback(future));
         if (this.log.isDebugEnabled()) {
             if (!future.isDone()) {
-                this.log.debug("I/O session could not be allocated immediately: " +
+                this.log.debug("Connection could not be allocated immediately: " +
                         "route[" + route + "][state: " + state + "]");
             }
         }
         return future;
     }
 
-    public synchronized void releaseConnection(final ManagedClientConnection conn) {
+    public void releaseConnection(
+            final ManagedClientConnection conn,
+            final long validDuration,
+            final TimeUnit tunit) {
+        if (conn == null) {
+            throw new IllegalArgumentException("HTTP connection may not be null");
+        }
         if (!(conn instanceof ClientConnAdaptor)) {
-            throw new IllegalArgumentException
-                ("I/O session class mismatch, " +
-                 "I/O session not obtained from this manager");
+            throw new IllegalArgumentException("Connection class mismatch, " +
+                 "connection not obtained from this manager");
+        }
+        if (tunit == null) {
+            throw new IllegalArgumentException("Time unit may not be null");
         }
         ClientConnAdaptor adaptor = (ClientConnAdaptor) conn;
         ClientConnectionManager manager = adaptor.getManager();
         if (manager != null && manager != this) {
-            throw new IllegalArgumentException
-                ("I/O session not obtained from this manager");
+            throw new IllegalArgumentException("connection not obtained from this manager");
         }
         HttpPoolEntry entry = adaptor.getEntry();
-        IOSession iosession = entry.getIOSession();
         if (this.log.isDebugEnabled()) {
             HttpRoute route = entry.getPlannedRoute();
+            Object state = entry.getState();
+            this.log.debug("Releasing connection: route[" + route + "][state: " + state + "]");
             PoolStats totals = this.pool.getTotalStats();
             PoolStats stats = this.pool.getStats(route);
             this.log.debug("Total: " + totals);
             this.log.debug("Route [" + route + "]: " + stats);
-            this.log.debug("I/O session released: " + entry);
         }
-        this.pool.release(entry, adaptor.isReusable() && !iosession.isClosed());
+
+        boolean reusable = adaptor.isReusable();
+        if (reusable) {
+            entry.setExpiry(tunit.toMillis(validDuration));
+            if (this.log.isDebugEnabled()) {
+                entry.setExpiry(tunit.toMillis(validDuration));
+                String s;
+                if (validDuration >= 0) {
+                    s = validDuration + " " + tunit;
+                } else {
+                    s = "ever";
+                }
+                HttpRoute route = entry.getPlannedRoute();
+                Object state = entry.getState();
+                this.log.debug("Pooling connection" +
+                        " [" + route + "][" + state + "]; keep alive for " + s);
+            }
+        }
+        this.pool.release(entry, reusable);
     }
 
     public PoolStats getTotalStats() {
@@ -142,8 +182,20 @@ public class PoolingClientConnectionMana
         this.pool.setMaxPerHost(route, max);
     }
 
-    public synchronized void shutdown() {
-        this.log.debug("I/O session manager shut down");
+    public void closeIdleConnections(long idleTimeout, final TimeUnit tunit) {
+        if (log.isDebugEnabled()) {
+            log.debug("Closing connections idle longer than " + idleTimeout + " " + tunit);
+        }
+        this.pool.closeIdle(idleTimeout, tunit);
+    }
+
+    public void closeExpiredConnections() {
+        log.debug("Closing expired connections");
+        this.pool.closeExpired();
+    }
+
+    public void shutdown() {
+        this.log.debug("Connection manager shut down");
         this.pool.shutdown();
     }
 
@@ -159,7 +211,7 @@ public class PoolingClientConnectionMana
 
         public void completed(final HttpPoolEntry entry) {
             if (log.isDebugEnabled()) {
-                log.debug("I/O session allocated: " + entry);
+                log.debug("Connection allocated: " + entry);
             }
             ManagedClientConnection conn = new ClientConnAdaptor(
                     PoolingClientConnectionManager.this,
@@ -171,13 +223,13 @@ public class PoolingClientConnectionMana
 
         public void failed(final Exception ex) {
             if (log.isDebugEnabled()) {
-                log.debug("I/O session request failed", ex);
+                log.debug("Connection request failed", ex);
             }
             this.future.failed(ex);
         }
 
         public void cancelled() {
-            log.debug("I/O session request cancelled");
+            log.debug("Connection request cancelled");
             this.future.cancel(true);
         }
 

Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/PoolEntry.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/PoolEntry.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/PoolEntry.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/PoolEntry.java Mon Jan  3 20:35:01 2011
@@ -38,6 +38,10 @@ public abstract class PoolEntry<T> {
     private final T route;
     private final IOSession session;
     private Object state;
+    private long created;
+    private long updated;
+    private long deadline;
+    private long expiry;
 
     public PoolEntry(final T route, final IOSession session) {
         super();
@@ -62,6 +66,48 @@ public abstract class PoolEntry<T> {
         this.state = state;
     }
 
+    public long getCreated() {
+        return this.created;
+    }
+
+    public void setCreated(long created) {
+        this.created = created;
+    }
+
+    public long getUpdated() {
+        return this.updated;
+    }
+
+    public void setUpdated(long updated) {
+        this.updated = updated;
+    }
+
+    public long getDeadline() {
+        return this.deadline;
+    }
+
+    public void setDeadline(long deadline) {
+        this.deadline = deadline;
+    }
+
+    public long getExpiry() {
+        return this.expiry;
+    }
+
+    public void setExpiry(long expiry) {
+        this.expiry = expiry;
+    }
+
+    public boolean isExpired(long now) {
+        if (this.deadline > 0 && this.deadline < now) {
+            return true;
+        }
+        if (this.expiry > 0 && this.expiry < now) {
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public String toString() {
         StringBuilder buffer = new StringBuilder();

Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/SessionPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/SessionPool.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/SessionPool.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/SessionPool.java Mon Jan  3 20:35:01 2011
@@ -28,6 +28,7 @@ package org.apache.http.impl.nio.pool;
 
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.ListIterator;
 import java.util.Map;
@@ -123,15 +124,20 @@ public abstract class SessionPool<T, E e
 
     public void lease(
             final T route, final Object state,
-            final long connectTimeout, final TimeUnit timeUnit,
+            final long connectTimeout, final TimeUnit tunit,
             final PoolEntryCallback<T, E> callback) {
+        if (route == null) {
+            throw new IllegalArgumentException("Route may not be null");
+        }
+        if (tunit == null) {
+            throw new IllegalArgumentException("Time unit must not be null.");
+        }
         if (this.isShutDown) {
             throw new IllegalStateException("Session pool has been shut down");
         }
         this.lock.lock();
         try {
-            TimeUnit unit = timeUnit != null ? timeUnit : TimeUnit.MILLISECONDS;
-            int timeout = (int) unit.toMillis(connectTimeout);
+            int timeout = (int) tunit.toMillis(connectTimeout);
             if (timeout < 0) {
                 timeout = 0;
             }
@@ -155,6 +161,8 @@ public abstract class SessionPool<T, E e
                 pool.freeEntry(entry, reusable);
                 if (reusable) {
                     this.availableSessions.add(entry);
+                } else {
+                    entryShutdown(entry);
                 }
                 processPendingRequests();
             }
@@ -350,6 +358,9 @@ public abstract class SessionPool<T, E e
     }
 
     public PoolStats getStats(final T route) {
+        if (route == null) {
+            throw new IllegalArgumentException("Route may not be null");
+        }
         this.lock.lock();
         try {
             SessionPoolForRoute<T, E> pool = getPool(route);
@@ -363,6 +374,49 @@ public abstract class SessionPool<T, E e
         }
     }
 
+    public void closeIdle(long idletime, final TimeUnit tunit) {
+        if (tunit == null) {
+            throw new IllegalArgumentException("Time unit must not be null.");
+        }
+        long time = tunit.toMillis(idletime);
+        if (time < 0) {
+            time = 0;
+        }
+        long deadline = System.currentTimeMillis() - time;
+        this.lock.lock();
+        try {
+            Iterator<E> it = this.availableSessions.iterator();
+            while (it.hasNext()) {
+                E entry = it.next();
+                if (entry.getUpdated() <= deadline) {
+                    it.remove();
+                    entryShutdown(entry);
+                }
+            }
+            processPendingRequests();
+        } finally {
+            this.lock.unlock();
+        }
+    }
+
+    public void closeExpired() {
+        long now = System.currentTimeMillis();
+        this.lock.lock();
+        try {
+            Iterator<E> it = this.availableSessions.iterator();
+            while (it.hasNext()) {
+                E entry = it.next();
+                if (entry.isExpired(now)) {
+                    it.remove();
+                    entryShutdown(entry);
+                }
+            }
+            processPendingRequests();
+        } finally {
+            this.lock.unlock();
+        }
+    }
+
     @Override
     public String toString() {
         StringBuilder buffer = new StringBuilder();

Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/SessionPoolForRoute.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/SessionPoolForRoute.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/SessionPoolForRoute.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/pool/SessionPoolForRoute.java Mon Jan  3 20:35:01 2011
@@ -76,7 +76,7 @@ class SessionPoolForRoute<T, E extends P
             while (it.hasPrevious()) {
                 E entry = it.previous();
                 IOSession iosession = entry.getIOSession();
-                if (iosession.isClosed()) {
+                if (iosession.isClosed() || entry.isExpired(System.currentTimeMillis())) {
                     it.remove();
                 } else {
                     if (entry.getState() == null || entry.getState().equals(state)) {

Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/conn/ClientConnectionManager.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/conn/ClientConnectionManager.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/conn/ClientConnectionManager.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/conn/ClientConnectionManager.java Mon Jan  3 20:35:01 2011
@@ -42,7 +42,8 @@ public interface ClientConnectionManager
             long connectTimeout, TimeUnit timeUnit,
             FutureCallback<ManagedClientConnection> callback);
 
-    void releaseConnection(ManagedClientConnection session);
+    void releaseConnection(ManagedClientConnection session,
+            long validDuration, TimeUnit timeUnit);
 
     void shutdown();
 

Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/conn/ManagedClientConnection.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/conn/ManagedClientConnection.java?rev=1054735&r1=1054734&r2=1054735&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/conn/ManagedClientConnection.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/conn/ManagedClientConnection.java Mon Jan  3 20:35:01 2011
@@ -27,6 +27,7 @@
 package org.apache.http.nio.conn;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.http.HttpHost;
 import org.apache.http.conn.ConnectionReleaseTrigger;
@@ -57,4 +58,6 @@ public interface ManagedClientConnection
 
     void layerProtocol(HttpContext context, HttpParams params) throws IOException;
 
+    void setIdleDuration(long duration, TimeUnit tunit);
+
 }