You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2011/07/29 14:13:05 UTC

svn commit: r1152181 - in /httpcomponents/httpcore/trunk/httpcore-nio/src: main/java/org/apache/http/nio/pool/ test/java/org/apache/http/nio/pool/

Author: olegk
Date: Fri Jul 29 12:13:04 2011
New Revision: 1152181

URL: http://svn.apache.org/viewvc?rev=1152181&view=rev
Log:
The NIO connection pool now uses the standard java.util.concurrent.Future interface (with an optional FutureCallback) instead of the custom PoolEntryCallback

Removed:
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/PoolEntryCallback.java
Modified:
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/LeaseRequest.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/RouteSpecificPool.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/pool/TestNIOConnPool.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/pool/TestRouteSpecificPool.java

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java?rev=1152181&r1=1152180&r2=1152181&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java Fri Jul 29 12:13:04 2011
@@ -35,10 +35,13 @@ import java.util.LinkedList;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.http.concurrent.BasicFuture;
+import org.apache.http.concurrent.FutureCallback;
 import org.apache.http.nio.reactor.ConnectingIOReactor;
 import org.apache.http.nio.reactor.IOSession;
 import org.apache.http.nio.reactor.SessionRequest;
@@ -46,12 +49,12 @@ import org.apache.http.nio.reactor.Sessi
 import org.apache.http.pool.PoolEntry;
 import org.apache.http.pool.PoolStats;
 
-public abstract class AbstractNIOConnPool<T, E extends PoolEntry<T, IOSession>> {
+public abstract class AbstractNIOConnPool<T, C, E extends PoolEntry<T, C>> {
 
     private final ConnectingIOReactor ioreactor;
     private final SessionRequestCallback sessionRequestCallback;
-    private final Map<T, RouteSpecificPool<T, E>> routeToPool;
-    private final LinkedList<LeaseRequest<T, E>> leasingRequests;
+    private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
+    private final LinkedList<LeaseRequest<T, C, E>> leasingRequests;
     private final Set<SessionRequest> pending;
     private final Set<E> leased;
     private final LinkedList<E> available;
@@ -78,8 +81,8 @@ public abstract class AbstractNIOConnPoo
         }
         this.ioreactor = ioreactor;
         this.sessionRequestCallback = new InternalSessionRequestCallback();
-        this.routeToPool = new HashMap<T, RouteSpecificPool<T, E>>();
-        this.leasingRequests = new LinkedList<LeaseRequest<T, E>>();
+        this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
+        this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
         this.pending = new HashSet<SessionRequest>();
         this.leased = new HashSet<E>();
         this.available = new LinkedList<E>();
@@ -93,7 +96,9 @@ public abstract class AbstractNIOConnPoo
 
     protected abstract SocketAddress resolveLocalAddress(T route);
 
-    protected abstract E createEntry(T route, IOSession session);
+    protected abstract C createConnection(T route, IOSession session);
+
+    protected abstract E createEntry(T route, C conn);
 
     protected abstract void closeEntry(E entry);
 
@@ -117,7 +122,7 @@ public abstract class AbstractNIOConnPoo
             for (E entry: this.leased) {
                 closeEntry(entry);
             }
-            for (RouteSpecificPool<T, E> pool: this.routeToPool.values()) {
+            for (RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
                 pool.shutdown();
             }
             this.routeToPool.clear();
@@ -131,14 +136,19 @@ public abstract class AbstractNIOConnPoo
         }
     }
 
-    private RouteSpecificPool<T, E> getPool(final T route) {
-        RouteSpecificPool<T, E> pool = this.routeToPool.get(route);
+    private RouteSpecificPool<T, C, E> getPool(final T route) {
+        RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
         if (pool == null) {
-            pool = new RouteSpecificPool<T, E>(route) {
+            pool = new RouteSpecificPool<T, C, E>(route) {
 
                 @Override
-                protected E createEntry(final T route, final IOSession session) {
-                    return AbstractNIOConnPool.this.createEntry(route, session);
+                protected E createEntry(final T route, final C conn) {
+                    return AbstractNIOConnPool.this.createEntry(route, conn);
+                }
+
+                @Override
+                protected void closeEntry(final E entry) {
+                    AbstractNIOConnPool.this.closeEntry(entry);
                 }
 
             };
@@ -147,19 +157,16 @@ public abstract class AbstractNIOConnPoo
         return pool;
     }
 
-    public void lease(
+    public Future<E> lease(
             final T route, final Object state,
             final long connectTimeout, final TimeUnit tunit,
-            final PoolEntryCallback<E> callback) {
+            final FutureCallback<E> callback) {
         if (route == null) {
             throw new IllegalArgumentException("Route may not be null");
         }
         if (tunit == null) {
             throw new IllegalArgumentException("Time unit may not be null.");
         }
-        if (callback == null) {
-            throw new IllegalArgumentException("Callback may not be null.");
-        }
         if (this.isShutDown) {
             throw new IllegalStateException("Session pool has been shut down");
         }
@@ -169,15 +176,25 @@ public abstract class AbstractNIOConnPoo
             if (timeout < 0) {
                 timeout = 0;
             }
-            LeaseRequest<T, E> request = new LeaseRequest<T, E>(route, state, timeout, callback);
+            BasicFuture<E> future = new BasicFuture<E>(callback);
+            LeaseRequest<T, C, E> request = new LeaseRequest<T, C, E>(route, state, timeout, future);
             this.leasingRequests.add(request);
 
             processPendingRequests();
+            return future;
         } finally {
             this.lock.unlock();
         }
     }
 
+    public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
+        return lease(route, state, -1, TimeUnit.MICROSECONDS, callback);
+    }
+
+    public Future<E> lease(final T route, final Object state) {
+        return lease(route, state, -1, TimeUnit.MICROSECONDS, null);
+    }
+
     public void release(final E entry, boolean reusable) {
         if (this.isShutDown) {
             return;
@@ -185,7 +202,7 @@ public abstract class AbstractNIOConnPoo
         this.lock.lock();
         try {
             if (this.leased.remove(entry)) {
-                RouteSpecificPool<T, E> pool = getPool(entry.getRoute());
+                RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
                 pool.free(entry, reusable);
                 if (reusable) {
                     this.available.add(entry);
@@ -200,16 +217,16 @@ public abstract class AbstractNIOConnPoo
     }
 
     private void processPendingRequests() {
-        ListIterator<LeaseRequest<T, E>> it = this.leasingRequests.listIterator();
+        ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
         while (it.hasNext()) {
-            LeaseRequest<T, E> request = it.next();
+            LeaseRequest<T, C, E> request = it.next();
 
             T route = request.getRoute();
             Object state = request.getState();
             int timeout = request.getConnectTimeout();
-            PoolEntryCallback<E> callback = request.getCallback();
+            BasicFuture<E> future = request.getFuture();
 
-            RouteSpecificPool<T, E> pool = getPool(request.getRoute());
+            RouteSpecificPool<T, C, E> pool = getPool(request.getRoute());
             E entry = null;
             for (;;) {
                 entry = pool.getFree(state);
@@ -228,7 +245,7 @@ public abstract class AbstractNIOConnPoo
                 it.remove();
                 this.available.remove(entry);
                 this.leased.add(entry);
-                callback.completed(entry);
+                future.completed(entry);
                 continue;
             }
             if (pool.getAllocatedCount() < getMaxPerRoute(route)) {
@@ -249,7 +266,7 @@ public abstract class AbstractNIOConnPoo
                         this.sessionRequestCallback);
                 sessionRequest.setConnectTimeout(timeout);
                 this.pending.add(sessionRequest);
-                pool.addPending(sessionRequest, callback);
+                pool.addPending(sessionRequest, future);
             }
         }
     }
@@ -258,7 +275,7 @@ public abstract class AbstractNIOConnPoo
         if (!this.available.isEmpty()) {
             E entry = this.available.removeFirst();
             closeEntry(entry);
-            RouteSpecificPool<T, E> pool = getPool(entry.getRoute());
+            RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
             pool.remove(entry);
         }
     }
@@ -272,8 +289,10 @@ public abstract class AbstractNIOConnPoo
         this.lock.lock();
         try {
             this.pending.remove(request);
-            RouteSpecificPool<T, E> pool = getPool(route);
-            E entry = pool.completed(request);
+            RouteSpecificPool<T, C, E> pool = getPool(route);
+            IOSession session = request.getSession();
+            C conn = createConnection(route, session);
+            E entry = pool.completed(request, conn);
             this.leased.add(entry);
         } finally {
             this.lock.unlock();
@@ -289,7 +308,7 @@ public abstract class AbstractNIOConnPoo
         this.lock.lock();
         try {
             this.pending.remove(request);
-            RouteSpecificPool<T, E> pool = getPool(route);
+            RouteSpecificPool<T, C, E> pool = getPool(route);
             pool.cancelled(request);
         } finally {
             this.lock.unlock();
@@ -305,7 +324,7 @@ public abstract class AbstractNIOConnPoo
         this.lock.lock();
         try {
             this.pending.remove(request);
-            RouteSpecificPool<T, E> pool = getPool(route);
+            RouteSpecificPool<T, C, E> pool = getPool(route);
             pool.failed(request);
         } finally {
             this.lock.unlock();
@@ -321,7 +340,7 @@ public abstract class AbstractNIOConnPoo
         this.lock.lock();
         try {
             this.pending.remove(request);
-            RouteSpecificPool<T, E> pool = getPool(route);
+            RouteSpecificPool<T, C, E> pool = getPool(route);
             pool.timeout(request);
         } finally {
             this.lock.unlock();
@@ -395,7 +414,7 @@ public abstract class AbstractNIOConnPoo
         }
         this.lock.lock();
         try {
-            RouteSpecificPool<T, E> pool = getPool(route);
+            RouteSpecificPool<T, C, E> pool = getPool(route);
             return new PoolStats(
                     pool.getLeasedCount(),
                     pool.getPendingCount(),
@@ -422,7 +441,7 @@ public abstract class AbstractNIOConnPoo
                 E entry = it.next();
                 if (entry.getUpdated() <= deadline) {
                     closeEntry(entry);
-                    RouteSpecificPool<T, E> pool = getPool(entry.getRoute());
+                    RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
                     pool.remove(entry);
                     it.remove();
                 }
@@ -442,7 +461,7 @@ public abstract class AbstractNIOConnPoo
                 E entry = it.next();
                 if (entry.isExpired(now)) {
                     closeEntry(entry);
-                    RouteSpecificPool<T, E> pool = getPool(entry.getRoute());
+                    RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
                     pool.remove(entry);
                     it.remove();
                 }

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/LeaseRequest.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/LeaseRequest.java?rev=1152181&r1=1152180&r2=1152181&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/LeaseRequest.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/LeaseRequest.java Fri Jul 29 12:13:04 2011
@@ -26,26 +26,26 @@
  */
 package org.apache.http.nio.pool;
 
-import org.apache.http.nio.reactor.IOSession;
+import org.apache.http.concurrent.BasicFuture;
 import org.apache.http.pool.PoolEntry;
 
-class LeaseRequest<T, E extends PoolEntry<T, IOSession>> {
+class LeaseRequest<T, C, E extends PoolEntry<T, C>> {
 
     private final T route;
     private final Object state;
     private final int connectTimeout;
-    private final PoolEntryCallback<E> callback;
+    private final BasicFuture<E> future;
 
     public LeaseRequest(
             final T route,
             final Object state,
             final int connectTimeout,
-            final PoolEntryCallback<E> callback) {
+            final BasicFuture<E> future) {
         super();
         this.route = route;
         this.state = state;
         this.connectTimeout = connectTimeout;
-        this.callback = callback;
+        this.future = future;
     }
 
     public T getRoute() {
@@ -56,8 +56,8 @@ class LeaseRequest<T, E extends PoolEntr
         return this.state;
     }
 
-    public PoolEntryCallback<E> getCallback() {
-        return this.callback;
+    public BasicFuture<E> getFuture() {
+        return this.future;
     }
 
     public int getConnectTimeout() {

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/RouteSpecificPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/RouteSpecificPool.java?rev=1152181&r1=1152180&r2=1152181&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/RouteSpecificPool.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/RouteSpecificPool.java Fri Jul 29 12:13:04 2011
@@ -34,26 +34,28 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.http.nio.reactor.IOSession;
+import org.apache.http.concurrent.BasicFuture;
 import org.apache.http.nio.reactor.SessionRequest;
 import org.apache.http.pool.PoolEntry;
 
-abstract class RouteSpecificPool<T, E extends PoolEntry<T, IOSession>> {
+abstract class RouteSpecificPool<T, C, E extends PoolEntry<T, C>> {
 
     private final T route;
     private final Set<E> leased;
     private final LinkedList<E> available;
-    private final Map<SessionRequest, PoolEntryCallback<E>> pending;
+    private final Map<SessionRequest, BasicFuture<E>> pending;
 
     RouteSpecificPool(final T route) {
         super();
         this.route = route;
         this.leased = new HashSet<E>();
         this.available = new LinkedList<E>();
-        this.pending = new HashMap<SessionRequest, PoolEntryCallback<E>>();
+        this.pending = new HashMap<SessionRequest, BasicFuture<E>>();
     }
 
-    protected abstract E createEntry(T route, IOSession session);
+    protected abstract E createEntry(T route, C conn);
+
+    protected abstract void closeEntry(E entry);
 
     public int getLeasedCount() {
         return this.leased.size();
@@ -125,40 +127,39 @@ abstract class RouteSpecificPool<T, E ex
 
     public void addPending(
             final SessionRequest sessionRequest,
-            final PoolEntryCallback<E> callback) {
-        this.pending.put(sessionRequest, callback);
+            final BasicFuture<E> future) {
+        this.pending.put(sessionRequest, future);
     }
 
-    private PoolEntryCallback<E> removeRequest(final SessionRequest request) {
-        PoolEntryCallback<E> callback = this.pending.remove(request);
-        if (callback == null) {
+    private BasicFuture<E> removeRequest(final SessionRequest request) {
+        BasicFuture<E> future = this.pending.remove(request);
+        if (future == null) {
             throw new IllegalStateException("Invalid session request");
         }
-        return callback;
+        return future;
     }
 
-    public E completed(final SessionRequest request) {
-        PoolEntryCallback<E> callback = removeRequest(request);
-        IOSession iosession = request.getSession();
-        E entry = createEntry(this.route, iosession);
+    public E completed(final SessionRequest request, final C conn) {
+        BasicFuture<E> future = removeRequest(request);
+        E entry = createEntry(this.route, conn);
         this.leased.add(entry);
-        callback.completed(entry);
+        future.completed(entry);
         return entry;
     }
 
     public void cancelled(final SessionRequest request) {
-        PoolEntryCallback<E> callback = removeRequest(request);
-        callback.cancelled();
+        BasicFuture<E> future = removeRequest(request);
+        future.cancel(true);
     }
 
     public void failed(final SessionRequest request) {
-        PoolEntryCallback<E> callback = removeRequest(request);
-        callback.failed(request.getException());
+        BasicFuture<E> future = removeRequest(request);
+        future.failed(request.getException());
     }
 
     public void timeout(final SessionRequest request) {
-        PoolEntryCallback<E> callback = removeRequest(request);
-        callback.failed(new SocketTimeoutException());
+        BasicFuture<E> future = removeRequest(request);
+        future.failed(new SocketTimeoutException());
     }
 
     public void shutdown() {
@@ -167,11 +168,11 @@ abstract class RouteSpecificPool<T, E ex
         }
         this.pending.clear();
         for (E entry: this.available) {
-            entry.getConnection().close();
+            closeEntry(entry);
         }
         this.available.clear();
         for (E entry: this.leased) {
-            entry.getConnection().close();
+            closeEntry(entry);
         }
         this.leased.clear();
     }

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/pool/TestNIOConnPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/pool/TestNIOConnPool.java?rev=1152181&r1=1152180&r2=1152181&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/pool/TestNIOConnPool.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/pool/TestNIOConnPool.java Fri Jul 29 12:13:04 2011
@@ -26,13 +26,17 @@
  */
 package org.apache.http.nio.pool;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import junit.framework.Assert;
 
+import org.apache.http.concurrent.BasicFuture;
 import org.apache.http.nio.reactor.ConnectingIOReactor;
 import org.apache.http.nio.reactor.IOSession;
 import org.apache.http.nio.reactor.SessionRequest;
@@ -51,52 +55,8 @@ public class TestNIOConnPool {
         }
 
     }
-    
-    static class BasicPoolEntryCallback implements PoolEntryCallback<LocalPoolEntry> {
 
-        private LocalPoolEntry entry;
-        private Exception ex;
-        private boolean completed;
-        private boolean failed;
-        private boolean cancelled;
-
-        public void completed(final LocalPoolEntry entry) {
-            this.entry = entry;
-            this.completed = true;
-        }
-
-        public LocalPoolEntry getEntry() {
-            return this.entry;
-        }
-
-        public Exception getException() {
-            return this.ex;
-        }
-
-        public void failed(final Exception ex) {
-            this.ex = ex;
-            this.failed = true;
-        }
-
-        public void cancelled() {
-            this.cancelled = true;
-        }
-
-        public boolean isCompleted() {
-            return this.completed;
-        }
-
-        public boolean isFailed() {
-            return this.failed;
-        }
-
-        public boolean isCancelled() {
-            return this.cancelled;
-        }
-
-    }    
-
-    static class LocalSessionPool extends AbstractNIOConnPool<String, LocalPoolEntry> {
+    static class LocalSessionPool extends AbstractNIOConnPool<String, IOSession, LocalPoolEntry> {
 
         public LocalSessionPool(
                 final ConnectingIOReactor ioreactor, int defaultMaxPerRoute, int maxTotal) {
@@ -114,6 +74,11 @@ public class TestNIOConnPool {
         }
 
         @Override
+        protected IOSession createConnection(final String route, final IOSession session) {
+            return session;
+        }
+
+        @Override
         protected LocalPoolEntry createEntry(final String route, final IOSession session) {
             return new LocalPoolEntry(route, session);
         }
@@ -145,9 +110,9 @@ public class TestNIOConnPool {
 
     @Test
     public void testInternalLeaseRequest() throws Exception {
-        LeaseRequest<String, LocalPoolEntry> leaseRequest =
-            new LeaseRequest<String, LocalPoolEntry>("somehost", null, 0,
-                    new BasicPoolEntryCallback());
+        LeaseRequest<String, IOSession, LocalPoolEntry> leaseRequest =
+            new LeaseRequest<String, IOSession, LocalPoolEntry>("somehost", null, 0,
+                    new BasicFuture<LocalPoolEntry>(null));
         Assert.assertEquals("[somehost][null]", leaseRequest.toString());
     }
 
@@ -184,8 +149,7 @@ public class TestNIOConnPool {
                 Mockito.any(), Mockito.any(SessionRequestCallback.class))).
                 thenReturn(sessionRequest);
         LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 10);
-        BasicPoolEntryCallback callback = new BasicPoolEntryCallback();
-        pool.lease("somehost", null, 100, TimeUnit.MILLISECONDS, callback);
+        Future<LocalPoolEntry> future = pool.lease("somehost", null, 100, TimeUnit.MILLISECONDS, null);
         Mockito.verify(sessionRequest).setConnectTimeout(100);
 
         PoolStats totals = pool.getTotalStats();
@@ -194,9 +158,11 @@ public class TestNIOConnPool {
         Assert.assertEquals(1, totals.getPending());
 
         pool.requestCompleted(sessionRequest);
-        Assert.assertTrue(callback.isCompleted());
-        Assert.assertFalse(callback.isFailed());
-        Assert.assertFalse(callback.isCancelled());
+
+        Assert.assertTrue(future.isDone());
+        Assert.assertFalse(future.isCancelled());
+        LocalPoolEntry entry = future.get();
+        Assert.assertNotNull(entry);
 
         totals = pool.getTotalStats();
         Assert.assertEquals(0, totals.getAvailable());
@@ -206,10 +172,9 @@ public class TestNIOConnPool {
 
     @Test
     public void testFailedConnect() throws Exception {
-        IOSession iosession = Mockito.mock(IOSession.class);
         SessionRequest sessionRequest = Mockito.mock(SessionRequest.class);
         Mockito.when(sessionRequest.getAttachment()).thenReturn("somehost");
-        Mockito.when(sessionRequest.getSession()).thenReturn(iosession);
+        Mockito.when(sessionRequest.getException()).thenReturn(new IOException());
         ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
         Mockito.when(ioreactor.connect(
                 Mockito.any(SocketAddress.class),
@@ -217,8 +182,7 @@ public class TestNIOConnPool {
                 Mockito.any(), Mockito.any(SessionRequestCallback.class))).
                 thenReturn(sessionRequest);
         LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 10);
-        BasicPoolEntryCallback callback = new BasicPoolEntryCallback();
-        pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback);
+        Future<LocalPoolEntry> future = pool.lease("somehost", null);
 
         PoolStats totals = pool.getTotalStats();
         Assert.assertEquals(0, totals.getAvailable());
@@ -226,9 +190,15 @@ public class TestNIOConnPool {
         Assert.assertEquals(1, totals.getPending());
 
         pool.requestFailed(sessionRequest);
-        Assert.assertFalse(callback.isCompleted());
-        Assert.assertTrue(callback.isFailed());
-        Assert.assertFalse(callback.isCancelled());
+
+        Assert.assertTrue(future.isDone());
+        Assert.assertFalse(future.isCancelled());
+        try {
+            future.get();
+            Assert.fail("ExecutionException should have been thrown");
+        } catch (ExecutionException ex) {
+            Assert.assertTrue(ex.getCause() instanceof IOException);
+        }
 
         totals = pool.getTotalStats();
         Assert.assertEquals(0, totals.getAvailable());
@@ -249,8 +219,7 @@ public class TestNIOConnPool {
                 Mockito.any(), Mockito.any(SessionRequestCallback.class))).
                 thenReturn(sessionRequest);
         LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 10);
-        BasicPoolEntryCallback callback = new BasicPoolEntryCallback();
-        pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback);
+        Future<LocalPoolEntry> future = pool.lease("somehost", null);
 
         PoolStats totals = pool.getTotalStats();
         Assert.assertEquals(0, totals.getAvailable());
@@ -258,9 +227,11 @@ public class TestNIOConnPool {
         Assert.assertEquals(1, totals.getPending());
 
         pool.requestCancelled(sessionRequest);
-        Assert.assertFalse(callback.isCompleted());
-        Assert.assertFalse(callback.isFailed());
-        Assert.assertTrue(callback.isCancelled());
+
+        Assert.assertTrue(future.isDone());
+        Assert.assertTrue(future.isCancelled());
+        LocalPoolEntry entry = future.get();
+        Assert.assertNull(entry);
 
         totals = pool.getTotalStats();
         Assert.assertEquals(0, totals.getAvailable());
@@ -281,8 +252,7 @@ public class TestNIOConnPool {
                 Mockito.any(), Mockito.any(SessionRequestCallback.class))).
                 thenReturn(sessionRequest);
         LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 10);
-        BasicPoolEntryCallback callback = new BasicPoolEntryCallback();
-        pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback);
+        Future<LocalPoolEntry> future = pool.lease("somehost", null);
 
         PoolStats totals = pool.getTotalStats();
         Assert.assertEquals(0, totals.getAvailable());
@@ -290,10 +260,15 @@ public class TestNIOConnPool {
         Assert.assertEquals(1, totals.getPending());
 
         pool.requestTimeout(sessionRequest);
-        Assert.assertFalse(callback.isCompleted());
-        Assert.assertTrue(callback.isFailed());
-        Assert.assertFalse(callback.isCancelled());
-        Assert.assertTrue(callback.getException() instanceof SocketTimeoutException);
+
+        Assert.assertTrue(future.isDone());
+        Assert.assertFalse(future.isCancelled());
+        try {
+            future.get();
+            Assert.fail("ExecutionException should have been thrown");
+        } catch (ExecutionException ex) {
+            Assert.assertTrue(ex.getCause() instanceof SocketTimeoutException);
+        }
 
         totals = pool.getTotalStats();
         Assert.assertEquals(0, totals.getAvailable());
@@ -326,21 +301,18 @@ public class TestNIOConnPool {
                 thenReturn(sessionRequest2);
 
         LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 10);
-        BasicPoolEntryCallback callback1 = new BasicPoolEntryCallback();
-        pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback1);
+        Future<LocalPoolEntry> future1 = pool.lease("somehost", null);
         pool.requestCompleted(sessionRequest1);
-        BasicPoolEntryCallback callback2 = new BasicPoolEntryCallback();
-        pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback2);
+        Future<LocalPoolEntry> future2 = pool.lease("somehost", null);
         pool.requestCompleted(sessionRequest1);
-        BasicPoolEntryCallback callback3 = new BasicPoolEntryCallback();
-        pool.lease("otherhost", null, -1, TimeUnit.MILLISECONDS, callback3);
+        Future<LocalPoolEntry> future3 = pool.lease("otherhost", null);
         pool.requestCompleted(sessionRequest2);
 
-        LocalPoolEntry entry1 = callback1.getEntry();
+        LocalPoolEntry entry1 = future1.get();
         Assert.assertNotNull(entry1);
-        LocalPoolEntry entry2 = callback2.getEntry();
+        LocalPoolEntry entry2 = future2.get();
         Assert.assertNotNull(entry2);
-        LocalPoolEntry entry3 = callback3.getEntry();
+        LocalPoolEntry entry3 = future3.get();
         Assert.assertNotNull(entry3);
 
         pool.release(entry1, true);
@@ -360,17 +332,12 @@ public class TestNIOConnPool {
         ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
         LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 10);
         try {
-            pool.lease(null, null, 0, TimeUnit.MILLISECONDS, new BasicPoolEntryCallback());
-            Assert.fail("IllegalArgumentException should have been thrown");
-        } catch (IllegalArgumentException expected) {
-        }
-        try {
-            pool.lease("somehost", null, 0, null, new BasicPoolEntryCallback());
+            pool.lease(null, null, 0, TimeUnit.MILLISECONDS, null);
             Assert.fail("IllegalArgumentException should have been thrown");
         } catch (IllegalArgumentException expected) {
         }
         try {
-            pool.lease("somehost", null, 0, TimeUnit.MILLISECONDS, null);
+            pool.lease("somehost", null, 0, null, null);
             Assert.fail("IllegalArgumentException should have been thrown");
         } catch (IllegalArgumentException expected) {
         }
@@ -412,21 +379,18 @@ public class TestNIOConnPool {
         pool.setMaxPerHost("otherhost", 1);
         pool.setTotalMax(3);
 
-        BasicPoolEntryCallback callback1 = new BasicPoolEntryCallback();
-        pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback1);
+        Future<LocalPoolEntry> future1 = pool.lease("somehost", null);
         pool.requestCompleted(sessionRequest1);
-        BasicPoolEntryCallback callback2 = new BasicPoolEntryCallback();
-        pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback2);
+        Future<LocalPoolEntry> future2 = pool.lease("somehost", null);
         pool.requestCompleted(sessionRequest1);
-        BasicPoolEntryCallback callback3 = new BasicPoolEntryCallback();
-        pool.lease("otherhost", null, -1, TimeUnit.MILLISECONDS, callback3);
+        Future<LocalPoolEntry> future3 = pool.lease("otherhost", null);
         pool.requestCompleted(sessionRequest2);
 
-        LocalPoolEntry entry1 = callback1.getEntry();
+        LocalPoolEntry entry1 = future1.get();
         Assert.assertNotNull(entry1);
-        LocalPoolEntry entry2 = callback2.getEntry();
+        LocalPoolEntry entry2 = future2.get();
         Assert.assertNotNull(entry2);
-        LocalPoolEntry entry3 = callback3.getEntry();
+        LocalPoolEntry entry3 = future3.get();
         Assert.assertNotNull(entry3);
 
         pool.release(entry1, true);
@@ -438,31 +402,25 @@ public class TestNIOConnPool {
         Assert.assertEquals(0, totals.getLeased());
         Assert.assertEquals(0, totals.getPending());
 
-        BasicPoolEntryCallback callback4 = new BasicPoolEntryCallback();
-        pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback4);
-        BasicPoolEntryCallback callback5 = new BasicPoolEntryCallback();
-        pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback5);
-        BasicPoolEntryCallback callback6 = new BasicPoolEntryCallback();
-        pool.lease("otherhost", null, -1, TimeUnit.MILLISECONDS, callback6);
-        BasicPoolEntryCallback callback7 = new BasicPoolEntryCallback();
-        pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback7);
-        BasicPoolEntryCallback callback8 = new BasicPoolEntryCallback();
-        pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback8);
-        BasicPoolEntryCallback callback9 = new BasicPoolEntryCallback();
-        pool.lease("otherhost", null, -1, TimeUnit.MILLISECONDS, callback9);
+        Future<LocalPoolEntry> future4 = pool.lease("somehost", null);
+        Future<LocalPoolEntry> future5 = pool.lease("somehost", null);
+        Future<LocalPoolEntry> future6 = pool.lease("otherhost", null);
+        Future<LocalPoolEntry> future7 = pool.lease("somehost", null);
+        Future<LocalPoolEntry> future8 = pool.lease("somehost", null);
+        Future<LocalPoolEntry> future9 = pool.lease("otherhost", null);
 
-        Assert.assertTrue(callback4.isCompleted());
-        LocalPoolEntry entry4 = callback4.getEntry();
+        Assert.assertTrue(future4.isDone());
+        LocalPoolEntry entry4 = future4.get();
         Assert.assertNotNull(entry4);
-        Assert.assertTrue(callback5.isCompleted());
-        LocalPoolEntry entry5 = callback5.getEntry();
+        Assert.assertTrue(future5.isDone());
+        LocalPoolEntry entry5 = future5.get();
         Assert.assertNotNull(entry5);
-        Assert.assertTrue(callback6.isCompleted());
-        LocalPoolEntry entry6 = callback6.getEntry();
+        Assert.assertTrue(future6.isDone());
+        LocalPoolEntry entry6 = future6.get();
         Assert.assertNotNull(entry6);
-        Assert.assertFalse(callback7.isCompleted());
-        Assert.assertFalse(callback8.isCompleted());
-        Assert.assertFalse(callback9.isCompleted());
+        Assert.assertFalse(future7.isDone());
+        Assert.assertFalse(future8.isDone());
+        Assert.assertFalse(future9.isDone());
 
         Mockito.verify(ioreactor, Mockito.times(3)).connect(
                 Mockito.any(SocketAddress.class), Mockito.any(SocketAddress.class),
@@ -472,9 +430,9 @@ public class TestNIOConnPool {
         pool.release(entry5, false);
         pool.release(entry6, true);
 
-        Assert.assertTrue(callback7.isCompleted());
-        Assert.assertFalse(callback8.isCompleted());
-        Assert.assertTrue(callback9.isCompleted());
+        Assert.assertTrue(future7.isDone());
+        Assert.assertFalse(future8.isDone());
+        Assert.assertTrue(future9.isDone());
 
         Mockito.verify(ioreactor, Mockito.times(4)).connect(
                 Mockito.any(SocketAddress.class), Mockito.any(SocketAddress.class),
@@ -520,14 +478,10 @@ public class TestNIOConnPool {
         pool.setMaxPerHost("otherhost", 2);
         pool.setTotalMax(2);
 
-        BasicPoolEntryCallback callback1 = new BasicPoolEntryCallback();
-        pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback1);
-        BasicPoolEntryCallback callback2 = new BasicPoolEntryCallback();
-        pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback2);
-        BasicPoolEntryCallback callback3 = new BasicPoolEntryCallback();
-        pool.lease("otherhost", null, -1, TimeUnit.MILLISECONDS, callback3);
-        BasicPoolEntryCallback callback4 = new BasicPoolEntryCallback();
-        pool.lease("otherhost", null, -1, TimeUnit.MILLISECONDS, callback4);
+        Future<LocalPoolEntry> future1 = pool.lease("somehost", null);
+        Future<LocalPoolEntry> future2 = pool.lease("somehost", null);
+        Future<LocalPoolEntry> future3 = pool.lease("otherhost", null);
+        Future<LocalPoolEntry> future4 = pool.lease("otherhost", null);
 
         Mockito.verify(ioreactor, Mockito.times(2)).connect(
                 Mockito.eq(InetSocketAddress.createUnresolved("somehost", 80)),
@@ -542,15 +496,15 @@ public class TestNIOConnPool {
         pool.requestCompleted(sessionRequest1);
         pool.requestCompleted(sessionRequest2);
 
-        Assert.assertTrue(callback1.isCompleted());
-        LocalPoolEntry entry1 = callback1.getEntry();
+        Assert.assertTrue(future1.isDone());
+        LocalPoolEntry entry1 = future1.get();
         Assert.assertNotNull(entry1);
-        Assert.assertTrue(callback2.isCompleted());
-        LocalPoolEntry entry2 = callback2.getEntry();
+        Assert.assertTrue(future2.isDone());
+        LocalPoolEntry entry2 = future2.get();
         Assert.assertNotNull(entry2);
 
-        Assert.assertFalse(callback3.isCompleted());
-        Assert.assertFalse(callback4.isCompleted());
+        Assert.assertFalse(future3.isDone());
+        Assert.assertFalse(future4.isDone());
 
         PoolStats totals = pool.getTotalStats();
         Assert.assertEquals(0, totals.getAvailable());
@@ -573,11 +527,11 @@ public class TestNIOConnPool {
         pool.requestCompleted(sessionRequest3);
         pool.requestCompleted(sessionRequest4);
 
-        Assert.assertTrue(callback3.isCompleted());
-        LocalPoolEntry entry3 = callback3.getEntry();
+        Assert.assertTrue(future3.isDone());
+        LocalPoolEntry entry3 = future3.get();
         Assert.assertNotNull(entry3);
-        Assert.assertTrue(callback4.isCompleted());
-        LocalPoolEntry entry4 = callback4.getEntry();
+        Assert.assertTrue(future4.isDone());
+        LocalPoolEntry entry4 = future4.get();
         Assert.assertNotNull(entry4);
 
         totals = pool.getTotalStats();
@@ -585,10 +539,8 @@ public class TestNIOConnPool {
         Assert.assertEquals(2, totals.getLeased());
         Assert.assertEquals(0, totals.getPending());
 
-        BasicPoolEntryCallback callback5 = new BasicPoolEntryCallback();
-        pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback5);
-        BasicPoolEntryCallback callback6 = new BasicPoolEntryCallback();
-        pool.lease("otherhost", null, -1, TimeUnit.MILLISECONDS, callback6);
+        Future<LocalPoolEntry> future5 = pool.lease("somehost", null);
+        Future<LocalPoolEntry> future6 = pool.lease("otherhost", null);
 
         Mockito.verify(ioreactor, Mockito.times(2)).connect(
                 Mockito.eq(InetSocketAddress.createUnresolved("somehost", 80)),
@@ -615,11 +567,11 @@ public class TestNIOConnPool {
 
         pool.requestCompleted(sessionRequest1);
 
-        Assert.assertTrue(callback5.isCompleted());
-        LocalPoolEntry entry5 = callback5.getEntry();
+        Assert.assertTrue(future5.isDone());
+        LocalPoolEntry entry5 = future5.get();
         Assert.assertNotNull(entry5);
-        Assert.assertTrue(callback6.isCompleted());
-        LocalPoolEntry entry6 = callback6.getEntry();
+        Assert.assertTrue(future6.isDone());
+        LocalPoolEntry entry6 = future6.get();
         Assert.assertNotNull(entry6);
 
         totals = pool.getTotalStats();
@@ -663,8 +615,7 @@ public class TestNIOConnPool {
 
         LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 2);
 
-        BasicPoolEntryCallback callback1 = new BasicPoolEntryCallback();
-        pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback1);
+        Future<LocalPoolEntry> future1 = pool.lease("somehost", null);
 
         Mockito.verify(ioreactor, Mockito.times(1)).connect(
                 Mockito.any(SocketAddress.class), Mockito.any(SocketAddress.class),
@@ -672,8 +623,8 @@ public class TestNIOConnPool {
 
         pool.requestCompleted(sessionRequest1);
 
-        Assert.assertTrue(callback1.isCompleted());
-        LocalPoolEntry entry1 = callback1.getEntry();
+        Assert.assertTrue(future1.isDone());
+        LocalPoolEntry entry1 = future1.get();
         Assert.assertNotNull(entry1);
 
         entry1.updateExpiry(1, TimeUnit.MILLISECONDS);
@@ -681,10 +632,9 @@ public class TestNIOConnPool {
 
         Thread.sleep(200L);
 
-        BasicPoolEntryCallback callback2 = new BasicPoolEntryCallback();
-        pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback2);
+        Future<LocalPoolEntry> future2 = pool.lease("somehost", null);
 
-        Assert.assertFalse(callback2.isCompleted());
+        Assert.assertFalse(future2.isDone());
 
         Mockito.verify(iosession1).close();
         Mockito.verify(ioreactor, Mockito.times(2)).connect(
@@ -722,19 +672,17 @@ public class TestNIOConnPool {
 
         LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 2);
 
-        BasicPoolEntryCallback callback1 = new BasicPoolEntryCallback();
-        pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback1);
-        BasicPoolEntryCallback callback2 = new BasicPoolEntryCallback();
-        pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback2);
+        Future<LocalPoolEntry> future1 = pool.lease("somehost", null);
+        Future<LocalPoolEntry> future2 = pool.lease("somehost", null);
 
         pool.requestCompleted(sessionRequest1);
         pool.requestCompleted(sessionRequest2);
 
-        Assert.assertTrue(callback1.isCompleted());
-        LocalPoolEntry entry1 = callback1.getEntry();
+        Assert.assertTrue(future1.isDone());
+        LocalPoolEntry entry1 = future1.get();
         Assert.assertNotNull(entry1);
-        Assert.assertTrue(callback2.isCompleted());
-        LocalPoolEntry entry2 = callback2.getEntry();
+        Assert.assertTrue(future2.isDone());
+        LocalPoolEntry entry2 = future2.get();
         Assert.assertNotNull(entry2);
 
         entry1.updateExpiry(1, TimeUnit.MILLISECONDS);
@@ -780,19 +728,17 @@ public class TestNIOConnPool {
 
         LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 2);
 
-        BasicPoolEntryCallback callback1 = new BasicPoolEntryCallback();
-        pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback1);
-        BasicPoolEntryCallback callback2 = new BasicPoolEntryCallback();
-        pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback2);
+        Future<LocalPoolEntry> future1 = pool.lease("somehost", null);
+        Future<LocalPoolEntry> future2 = pool.lease("somehost", null);
 
         pool.requestCompleted(sessionRequest1);
         pool.requestCompleted(sessionRequest2);
 
-        Assert.assertTrue(callback1.isCompleted());
-        LocalPoolEntry entry1 = callback1.getEntry();
+        Assert.assertTrue(future1.isDone());
+        LocalPoolEntry entry1 = future1.get();
         Assert.assertNotNull(entry1);
-        Assert.assertTrue(callback2.isCompleted());
-        LocalPoolEntry entry2 = callback2.getEntry();
+        Assert.assertTrue(future2.isDone());
+        LocalPoolEntry entry2 = future2.get();
         Assert.assertNotNull(entry2);
 
         entry1.updateExpiry(0, TimeUnit.MILLISECONDS);
@@ -880,7 +826,7 @@ public class TestNIOConnPool {
         pool.shutdown(1000);
         Mockito.verify(ioreactor, Mockito.times(1)).shutdown(1000);
         try {
-            pool.lease("somehost", null, 0, TimeUnit.MILLISECONDS, new BasicPoolEntryCallback());
+            pool.lease("somehost", null);
             Assert.fail("IllegalStateException should have been thrown");
         } catch (IllegalStateException expected) {
         }

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/pool/TestRouteSpecificPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/pool/TestRouteSpecificPool.java?rev=1152181&r1=1152180&r2=1152181&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/pool/TestRouteSpecificPool.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/pool/TestRouteSpecificPool.java Fri Jul 29 12:13:04 2011
@@ -28,9 +28,11 @@ package org.apache.http.nio.pool;
 
 import java.io.IOException;
 import java.net.SocketTimeoutException;
+import java.util.concurrent.ExecutionException;
 
 import junit.framework.Assert;
 
+import org.apache.http.concurrent.BasicFuture;
 import org.apache.http.nio.reactor.IOSession;
 import org.apache.http.nio.reactor.SessionRequest;
 import org.apache.http.pool.PoolEntry;
@@ -46,62 +48,24 @@ public class TestRouteSpecificPool {
         }
 
     }
-    
-    static class BasicPoolEntryCallback implements PoolEntryCallback<LocalPoolEntry> {
 
-        private LocalPoolEntry entry;
-        private Exception ex;
-        private boolean completed;
-        private boolean failed;
-        private boolean cancelled;
-
-        public void completed(final LocalPoolEntry entry) {
-            this.entry = entry;
-            this.completed = true;
-        }
-
-        public LocalPoolEntry getEntry() {
-            return this.entry;
-        }
-
-        public Exception getException() {
-            return this.ex;
-        }
-
-        public void failed(final Exception ex) {
-            this.ex = ex;
-            this.failed = true;
-        }
-
-        public void cancelled() {
-            this.cancelled = true;
-        }
-
-        public boolean isCompleted() {
-            return this.completed;
-        }
-
-        public boolean isFailed() {
-            return this.failed;
-        }
-
-        public boolean isCancelled() {
-            return this.cancelled;
-        }
-
-    }    
-
-    static class LocalRoutePool extends RouteSpecificPool<String, LocalPoolEntry> {
+    static class LocalRoutePool extends RouteSpecificPool<String, IOSession, LocalPoolEntry> {
 
         public LocalRoutePool() {
             super("whatever");
         }
 
         @Override
-        protected LocalPoolEntry createEntry(String route, IOSession session) {
+        protected LocalPoolEntry createEntry(final String route, final IOSession session) {
             return new LocalPoolEntry(route, session);
         }
 
+        @Override
+        protected void closeEntry(final LocalPoolEntry entry) {
+            IOSession session = entry.getConnection();
+            session.close();
+        }
+
     };
 
     @Test
@@ -120,18 +84,17 @@ public class TestRouteSpecificPool {
         IOSession session = Mockito.mock(IOSession.class);
         SessionRequest sessionRequest = Mockito.mock(SessionRequest.class);
         Mockito.when(sessionRequest.getSession()).thenReturn(session);
-        BasicPoolEntryCallback callback = new BasicPoolEntryCallback();
-        pool.addPending(sessionRequest, callback);
+        BasicFuture<LocalPoolEntry> future = new BasicFuture<LocalPoolEntry>(null);
+        pool.addPending(sessionRequest, future);
         Assert.assertEquals(1, pool.getAllocatedCount());
         Assert.assertEquals(0, pool.getAvailableCount());
         Assert.assertEquals(0, pool.getLeasedCount());
         Assert.assertEquals(1, pool.getPendingCount());
-        LocalPoolEntry entry = pool.completed(sessionRequest);
+        LocalPoolEntry entry = pool.completed(sessionRequest, session);
         Assert.assertNotNull(entry);
         Assert.assertSame(session, entry.getConnection());
-        Assert.assertTrue(callback.isCompleted());
-        Assert.assertFalse(callback.isFailed());
-        Assert.assertFalse(callback.isCancelled());
+        Assert.assertTrue(future.isDone());
+        Assert.assertFalse(future.isCancelled());
 
         Assert.assertEquals(1, pool.getAllocatedCount());
         Assert.assertEquals(0, pool.getAvailableCount());
@@ -142,20 +105,23 @@ public class TestRouteSpecificPool {
     @Test
     public void testFailedConnect() throws Exception {
         LocalRoutePool pool = new LocalRoutePool();
-        IOException ex = new IOException();
         SessionRequest sessionRequest = Mockito.mock(SessionRequest.class);
-        Mockito.when(sessionRequest.getException()).thenReturn(ex);
-        BasicPoolEntryCallback callback = new BasicPoolEntryCallback();
-        pool.addPending(sessionRequest, callback);
+        Mockito.when(sessionRequest.getException()).thenReturn(new IOException());
+        BasicFuture<LocalPoolEntry> future = new BasicFuture<LocalPoolEntry>(null);
+        pool.addPending(sessionRequest, future);
         Assert.assertEquals(1, pool.getAllocatedCount());
         Assert.assertEquals(0, pool.getAvailableCount());
         Assert.assertEquals(0, pool.getLeasedCount());
         Assert.assertEquals(1, pool.getPendingCount());
         pool.failed(sessionRequest);
-        Assert.assertFalse(callback.isCompleted());
-        Assert.assertTrue(callback.isFailed());
-        Assert.assertFalse(callback.isCancelled());
-
+        Assert.assertTrue(future.isDone());
+        Assert.assertFalse(future.isCancelled());
+        try {
+            future.get();
+            Assert.fail("ExecutionException should have been thrown");
+        } catch (ExecutionException ex) {
+            Assert.assertTrue(ex.getCause() instanceof IOException);
+        }
         Assert.assertEquals(0, pool.getAllocatedCount());
         Assert.assertEquals(0, pool.getAvailableCount());
         Assert.assertEquals(0, pool.getLeasedCount());
@@ -166,16 +132,15 @@ public class TestRouteSpecificPool {
     public void testCancelledConnect() throws Exception {
         LocalRoutePool pool = new LocalRoutePool();
         SessionRequest sessionRequest = Mockito.mock(SessionRequest.class);
-        BasicPoolEntryCallback callback = new BasicPoolEntryCallback();
-        pool.addPending(sessionRequest, callback);
+        BasicFuture<LocalPoolEntry> future = new BasicFuture<LocalPoolEntry>(null);
+        pool.addPending(sessionRequest, future);
         Assert.assertEquals(1, pool.getAllocatedCount());
         Assert.assertEquals(0, pool.getAvailableCount());
         Assert.assertEquals(0, pool.getLeasedCount());
         Assert.assertEquals(1, pool.getPendingCount());
         pool.cancelled(sessionRequest);
-        Assert.assertFalse(callback.isCompleted());
-        Assert.assertFalse(callback.isFailed());
-        Assert.assertTrue(callback.isCancelled());
+        Assert.assertTrue(future.isDone());
+        Assert.assertTrue(future.isCancelled());
 
         Assert.assertEquals(0, pool.getAllocatedCount());
         Assert.assertEquals(0, pool.getAvailableCount());
@@ -187,18 +152,21 @@ public class TestRouteSpecificPool {
     public void testConnectTimeout() throws Exception {
         LocalRoutePool pool = new LocalRoutePool();
         SessionRequest sessionRequest = Mockito.mock(SessionRequest.class);
-        BasicPoolEntryCallback callback = new BasicPoolEntryCallback();
-        pool.addPending(sessionRequest, callback);
+        BasicFuture<LocalPoolEntry> future = new BasicFuture<LocalPoolEntry>(null);
+        pool.addPending(sessionRequest, future);
         Assert.assertEquals(1, pool.getAllocatedCount());
         Assert.assertEquals(0, pool.getAvailableCount());
         Assert.assertEquals(0, pool.getLeasedCount());
         Assert.assertEquals(1, pool.getPendingCount());
         pool.timeout(sessionRequest);
-        Assert.assertFalse(callback.isCompleted());
-        Assert.assertTrue(callback.isFailed());
-        Assert.assertFalse(callback.isCancelled());
-        Assert.assertTrue(callback.getException() instanceof SocketTimeoutException);
-
+        Assert.assertTrue(future.isDone());
+        Assert.assertFalse(future.isCancelled());
+        try {
+            future.get();
+            Assert.fail("ExecutionException should have been thrown");
+        } catch (ExecutionException ex) {
+            Assert.assertTrue(ex.getCause() instanceof SocketTimeoutException);
+        }
         Assert.assertEquals(0, pool.getAllocatedCount());
         Assert.assertEquals(0, pool.getAvailableCount());
         Assert.assertEquals(0, pool.getLeasedCount());
@@ -211,26 +179,29 @@ public class TestRouteSpecificPool {
         IOSession session1 = Mockito.mock(IOSession.class);
         SessionRequest sessionRequest1 = Mockito.mock(SessionRequest.class);
         Mockito.when(sessionRequest1.getSession()).thenReturn(session1);
-        pool.addPending(sessionRequest1, new BasicPoolEntryCallback());
+        BasicFuture<LocalPoolEntry> future1 = new BasicFuture<LocalPoolEntry>(null);
+        pool.addPending(sessionRequest1, future1);
         IOSession session2 = Mockito.mock(IOSession.class);
         SessionRequest sessionRequest2 = Mockito.mock(SessionRequest.class);
         Mockito.when(sessionRequest2.getSession()).thenReturn(session2);
-        pool.addPending(sessionRequest2, new BasicPoolEntryCallback());
+        BasicFuture<LocalPoolEntry> future2 = new BasicFuture<LocalPoolEntry>(null);
+        pool.addPending(sessionRequest2, future2);
         IOSession session3 = Mockito.mock(IOSession.class);
         SessionRequest sessionRequest3 = Mockito.mock(SessionRequest.class);
         Mockito.when(sessionRequest3.getSession()).thenReturn(session3);
-        pool.addPending(sessionRequest3, new BasicPoolEntryCallback());
+        BasicFuture<LocalPoolEntry> future3 = new BasicFuture<LocalPoolEntry>(null);
+        pool.addPending(sessionRequest3, future3);
 
         Assert.assertEquals(3, pool.getAllocatedCount());
         Assert.assertEquals(0, pool.getAvailableCount());
         Assert.assertEquals(0, pool.getLeasedCount());
         Assert.assertEquals(3, pool.getPendingCount());
 
-        LocalPoolEntry entry1 = pool.completed(sessionRequest1);
+        LocalPoolEntry entry1 = pool.completed(sessionRequest1, session1);
         Assert.assertNotNull(entry1);
-        LocalPoolEntry entry2 = pool.completed(sessionRequest2);
+        LocalPoolEntry entry2 = pool.completed(sessionRequest2, session2);
         Assert.assertNotNull(entry2);
-        LocalPoolEntry entry3 = pool.completed(sessionRequest3);
+        LocalPoolEntry entry3 = pool.completed(sessionRequest3, session3);
         Assert.assertNotNull(entry3);
 
         Assert.assertEquals(3, pool.getAllocatedCount());
@@ -260,24 +231,28 @@ public class TestRouteSpecificPool {
     @Test
     public void testLeaseReleaseStateful() throws Exception {
         LocalRoutePool pool = new LocalRoutePool();
+
         IOSession session1 = Mockito.mock(IOSession.class);
         SessionRequest sessionRequest1 = Mockito.mock(SessionRequest.class);
         Mockito.when(sessionRequest1.getSession()).thenReturn(session1);
-        pool.addPending(sessionRequest1, new BasicPoolEntryCallback());
+        BasicFuture<LocalPoolEntry> future1 = new BasicFuture<LocalPoolEntry>(null);
+        pool.addPending(sessionRequest1, future1);
         IOSession session2 = Mockito.mock(IOSession.class);
         SessionRequest sessionRequest2 = Mockito.mock(SessionRequest.class);
         Mockito.when(sessionRequest2.getSession()).thenReturn(session2);
-        pool.addPending(sessionRequest2, new BasicPoolEntryCallback());
+        BasicFuture<LocalPoolEntry> future2 = new BasicFuture<LocalPoolEntry>(null);
+        pool.addPending(sessionRequest2, future2);
         IOSession session3 = Mockito.mock(IOSession.class);
         SessionRequest sessionRequest3 = Mockito.mock(SessionRequest.class);
         Mockito.when(sessionRequest3.getSession()).thenReturn(session3);
-        pool.addPending(sessionRequest3, new BasicPoolEntryCallback());
+        BasicFuture<LocalPoolEntry> future3 = new BasicFuture<LocalPoolEntry>(null);
+        pool.addPending(sessionRequest3, future3);
 
-        LocalPoolEntry entry1 = pool.completed(sessionRequest1);
+        LocalPoolEntry entry1 = pool.completed(sessionRequest1, session1);
         Assert.assertNotNull(entry1);
-        LocalPoolEntry entry2 = pool.completed(sessionRequest2);
+        LocalPoolEntry entry2 = pool.completed(sessionRequest2, session2);
         Assert.assertNotNull(entry2);
-        LocalPoolEntry entry3 = pool.completed(sessionRequest3);
+        LocalPoolEntry entry3 = pool.completed(sessionRequest3, session3);
         Assert.assertNotNull(entry3);
 
         entry2.setState(Boolean.FALSE);
@@ -319,26 +294,29 @@ public class TestRouteSpecificPool {
         IOSession session1 = Mockito.mock(IOSession.class);
         SessionRequest sessionRequest1 = Mockito.mock(SessionRequest.class);
         Mockito.when(sessionRequest1.getSession()).thenReturn(session1);
-        pool.addPending(sessionRequest1, new BasicPoolEntryCallback());
+        BasicFuture<LocalPoolEntry> future1 = new BasicFuture<LocalPoolEntry>(null);
+        pool.addPending(sessionRequest1, future1);
         IOSession session2 = Mockito.mock(IOSession.class);
         SessionRequest sessionRequest2 = Mockito.mock(SessionRequest.class);
         Mockito.when(sessionRequest2.getSession()).thenReturn(session2);
-        pool.addPending(sessionRequest2, new BasicPoolEntryCallback());
+        BasicFuture<LocalPoolEntry> future2 = new BasicFuture<LocalPoolEntry>(null);
+        pool.addPending(sessionRequest2, future2);
         IOSession session3 = Mockito.mock(IOSession.class);
         SessionRequest sessionRequest3 = Mockito.mock(SessionRequest.class);
         Mockito.when(sessionRequest3.getSession()).thenReturn(session3);
-        pool.addPending(sessionRequest3, new BasicPoolEntryCallback());
+        BasicFuture<LocalPoolEntry> future3 = new BasicFuture<LocalPoolEntry>(null);
+        pool.addPending(sessionRequest3, future3);
 
         Assert.assertEquals(3, pool.getAllocatedCount());
         Assert.assertEquals(0, pool.getAvailableCount());
         Assert.assertEquals(0, pool.getLeasedCount());
         Assert.assertEquals(3, pool.getPendingCount());
 
-        LocalPoolEntry entry1 = pool.completed(sessionRequest1);
+        LocalPoolEntry entry1 = pool.completed(sessionRequest1, session1);
         Assert.assertNotNull(entry1);
-        LocalPoolEntry entry2 = pool.completed(sessionRequest2);
+        LocalPoolEntry entry2 = pool.completed(sessionRequest2, session2);
         Assert.assertNotNull(entry2);
-        LocalPoolEntry entry3 = pool.completed(sessionRequest3);
+        LocalPoolEntry entry3 = pool.completed(sessionRequest3, session3);
         Assert.assertNotNull(entry3);
 
         Assert.assertEquals(3, pool.getAllocatedCount());
@@ -389,19 +367,22 @@ public class TestRouteSpecificPool {
         IOSession session1 = Mockito.mock(IOSession.class);
         SessionRequest sessionRequest1 = Mockito.mock(SessionRequest.class);
         Mockito.when(sessionRequest1.getSession()).thenReturn(session1);
-        pool.addPending(sessionRequest1, new BasicPoolEntryCallback());
+        BasicFuture<LocalPoolEntry> future1 = new BasicFuture<LocalPoolEntry>(null);
+        pool.addPending(sessionRequest1, future1);
         IOSession session2 = Mockito.mock(IOSession.class);
         SessionRequest sessionRequest2 = Mockito.mock(SessionRequest.class);
         Mockito.when(sessionRequest2.getSession()).thenReturn(session2);
-        pool.addPending(sessionRequest2, new BasicPoolEntryCallback());
+        BasicFuture<LocalPoolEntry> future2 = new BasicFuture<LocalPoolEntry>(null);
+        pool.addPending(sessionRequest2, future2);
         IOSession session3 = Mockito.mock(IOSession.class);
         SessionRequest sessionRequest3 = Mockito.mock(SessionRequest.class);
         Mockito.when(sessionRequest3.getSession()).thenReturn(session3);
-        pool.addPending(sessionRequest3, new BasicPoolEntryCallback());
+        BasicFuture<LocalPoolEntry> future3 = new BasicFuture<LocalPoolEntry>(null);
+        pool.addPending(sessionRequest3, future3);
 
-        LocalPoolEntry entry1 = pool.completed(sessionRequest1);
+        LocalPoolEntry entry1 = pool.completed(sessionRequest1, session1);
         Assert.assertNotNull(entry1);
-        LocalPoolEntry entry2 = pool.completed(sessionRequest2);
+        LocalPoolEntry entry2 = pool.completed(sessionRequest2, session2);
         Assert.assertNotNull(entry2);
 
         pool.free(entry1, true);