You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2011/07/29 14:13:05 UTC
svn commit: r1152181 - in /httpcomponents/httpcore/trunk/httpcore-nio/src:
main/java/org/apache/http/nio/pool/ test/java/org/apache/http/nio/pool/
Author: olegk
Date: Fri Jul 29 12:13:04 2011
New Revision: 1152181
URL: http://svn.apache.org/viewvc?rev=1152181&view=rev
Log:
NIO conn pool to use standard Future interface instead of a custom callback
Removed:
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/PoolEntryCallback.java
Modified:
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/LeaseRequest.java
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/RouteSpecificPool.java
httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/pool/TestNIOConnPool.java
httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/pool/TestRouteSpecificPool.java
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java?rev=1152181&r1=1152180&r2=1152181&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java Fri Jul 29 12:13:04 2011
@@ -35,10 +35,13 @@ import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.http.concurrent.BasicFuture;
+import org.apache.http.concurrent.FutureCallback;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOSession;
import org.apache.http.nio.reactor.SessionRequest;
@@ -46,12 +49,12 @@ import org.apache.http.nio.reactor.Sessi
import org.apache.http.pool.PoolEntry;
import org.apache.http.pool.PoolStats;
-public abstract class AbstractNIOConnPool<T, E extends PoolEntry<T, IOSession>> {
+public abstract class AbstractNIOConnPool<T, C, E extends PoolEntry<T, C>> {
private final ConnectingIOReactor ioreactor;
private final SessionRequestCallback sessionRequestCallback;
- private final Map<T, RouteSpecificPool<T, E>> routeToPool;
- private final LinkedList<LeaseRequest<T, E>> leasingRequests;
+ private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
+ private final LinkedList<LeaseRequest<T, C, E>> leasingRequests;
private final Set<SessionRequest> pending;
private final Set<E> leased;
private final LinkedList<E> available;
@@ -78,8 +81,8 @@ public abstract class AbstractNIOConnPoo
}
this.ioreactor = ioreactor;
this.sessionRequestCallback = new InternalSessionRequestCallback();
- this.routeToPool = new HashMap<T, RouteSpecificPool<T, E>>();
- this.leasingRequests = new LinkedList<LeaseRequest<T, E>>();
+ this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
+ this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
this.pending = new HashSet<SessionRequest>();
this.leased = new HashSet<E>();
this.available = new LinkedList<E>();
@@ -93,7 +96,9 @@ public abstract class AbstractNIOConnPoo
protected abstract SocketAddress resolveLocalAddress(T route);
- protected abstract E createEntry(T route, IOSession session);
+ protected abstract C createConnection(T route, IOSession session);
+
+ protected abstract E createEntry(T route, C conn);
protected abstract void closeEntry(E entry);
@@ -117,7 +122,7 @@ public abstract class AbstractNIOConnPoo
for (E entry: this.leased) {
closeEntry(entry);
}
- for (RouteSpecificPool<T, E> pool: this.routeToPool.values()) {
+ for (RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
pool.shutdown();
}
this.routeToPool.clear();
@@ -131,14 +136,19 @@ public abstract class AbstractNIOConnPoo
}
}
- private RouteSpecificPool<T, E> getPool(final T route) {
- RouteSpecificPool<T, E> pool = this.routeToPool.get(route);
+ private RouteSpecificPool<T, C, E> getPool(final T route) {
+ RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
if (pool == null) {
- pool = new RouteSpecificPool<T, E>(route) {
+ pool = new RouteSpecificPool<T, C, E>(route) {
@Override
- protected E createEntry(final T route, final IOSession session) {
- return AbstractNIOConnPool.this.createEntry(route, session);
+ protected E createEntry(final T route, final C conn) {
+ return AbstractNIOConnPool.this.createEntry(route, conn);
+ }
+
+ @Override
+ protected void closeEntry(final E entry) {
+ AbstractNIOConnPool.this.closeEntry(entry);
}
};
@@ -147,19 +157,16 @@ public abstract class AbstractNIOConnPoo
return pool;
}
- public void lease(
+ public Future<E> lease(
final T route, final Object state,
final long connectTimeout, final TimeUnit tunit,
- final PoolEntryCallback<E> callback) {
+ final FutureCallback<E> callback) {
if (route == null) {
throw new IllegalArgumentException("Route may not be null");
}
if (tunit == null) {
throw new IllegalArgumentException("Time unit may not be null.");
}
- if (callback == null) {
- throw new IllegalArgumentException("Callback may not be null.");
- }
if (this.isShutDown) {
throw new IllegalStateException("Session pool has been shut down");
}
@@ -169,15 +176,25 @@ public abstract class AbstractNIOConnPoo
if (timeout < 0) {
timeout = 0;
}
- LeaseRequest<T, E> request = new LeaseRequest<T, E>(route, state, timeout, callback);
+ BasicFuture<E> future = new BasicFuture<E>(callback);
+ LeaseRequest<T, C, E> request = new LeaseRequest<T, C, E>(route, state, timeout, future);
this.leasingRequests.add(request);
processPendingRequests();
+ return future;
} finally {
this.lock.unlock();
}
}
+ public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
+ return lease(route, state, -1, TimeUnit.MICROSECONDS, callback);
+ }
+
+ public Future<E> lease(final T route, final Object state) {
+ return lease(route, state, -1, TimeUnit.MICROSECONDS, null);
+ }
+
public void release(final E entry, boolean reusable) {
if (this.isShutDown) {
return;
@@ -185,7 +202,7 @@ public abstract class AbstractNIOConnPoo
this.lock.lock();
try {
if (this.leased.remove(entry)) {
- RouteSpecificPool<T, E> pool = getPool(entry.getRoute());
+ RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
pool.free(entry, reusable);
if (reusable) {
this.available.add(entry);
@@ -200,16 +217,16 @@ public abstract class AbstractNIOConnPoo
}
private void processPendingRequests() {
- ListIterator<LeaseRequest<T, E>> it = this.leasingRequests.listIterator();
+ ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
while (it.hasNext()) {
- LeaseRequest<T, E> request = it.next();
+ LeaseRequest<T, C, E> request = it.next();
T route = request.getRoute();
Object state = request.getState();
int timeout = request.getConnectTimeout();
- PoolEntryCallback<E> callback = request.getCallback();
+ BasicFuture<E> future = request.getFuture();
- RouteSpecificPool<T, E> pool = getPool(request.getRoute());
+ RouteSpecificPool<T, C, E> pool = getPool(request.getRoute());
E entry = null;
for (;;) {
entry = pool.getFree(state);
@@ -228,7 +245,7 @@ public abstract class AbstractNIOConnPoo
it.remove();
this.available.remove(entry);
this.leased.add(entry);
- callback.completed(entry);
+ future.completed(entry);
continue;
}
if (pool.getAllocatedCount() < getMaxPerRoute(route)) {
@@ -249,7 +266,7 @@ public abstract class AbstractNIOConnPoo
this.sessionRequestCallback);
sessionRequest.setConnectTimeout(timeout);
this.pending.add(sessionRequest);
- pool.addPending(sessionRequest, callback);
+ pool.addPending(sessionRequest, future);
}
}
}
@@ -258,7 +275,7 @@ public abstract class AbstractNIOConnPoo
if (!this.available.isEmpty()) {
E entry = this.available.removeFirst();
closeEntry(entry);
- RouteSpecificPool<T, E> pool = getPool(entry.getRoute());
+ RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
pool.remove(entry);
}
}
@@ -272,8 +289,10 @@ public abstract class AbstractNIOConnPoo
this.lock.lock();
try {
this.pending.remove(request);
- RouteSpecificPool<T, E> pool = getPool(route);
- E entry = pool.completed(request);
+ RouteSpecificPool<T, C, E> pool = getPool(route);
+ IOSession session = request.getSession();
+ C conn = createConnection(route, session);
+ E entry = pool.completed(request, conn);
this.leased.add(entry);
} finally {
this.lock.unlock();
@@ -289,7 +308,7 @@ public abstract class AbstractNIOConnPoo
this.lock.lock();
try {
this.pending.remove(request);
- RouteSpecificPool<T, E> pool = getPool(route);
+ RouteSpecificPool<T, C, E> pool = getPool(route);
pool.cancelled(request);
} finally {
this.lock.unlock();
@@ -305,7 +324,7 @@ public abstract class AbstractNIOConnPoo
this.lock.lock();
try {
this.pending.remove(request);
- RouteSpecificPool<T, E> pool = getPool(route);
+ RouteSpecificPool<T, C, E> pool = getPool(route);
pool.failed(request);
} finally {
this.lock.unlock();
@@ -321,7 +340,7 @@ public abstract class AbstractNIOConnPoo
this.lock.lock();
try {
this.pending.remove(request);
- RouteSpecificPool<T, E> pool = getPool(route);
+ RouteSpecificPool<T, C, E> pool = getPool(route);
pool.timeout(request);
} finally {
this.lock.unlock();
@@ -395,7 +414,7 @@ public abstract class AbstractNIOConnPoo
}
this.lock.lock();
try {
- RouteSpecificPool<T, E> pool = getPool(route);
+ RouteSpecificPool<T, C, E> pool = getPool(route);
return new PoolStats(
pool.getLeasedCount(),
pool.getPendingCount(),
@@ -422,7 +441,7 @@ public abstract class AbstractNIOConnPoo
E entry = it.next();
if (entry.getUpdated() <= deadline) {
closeEntry(entry);
- RouteSpecificPool<T, E> pool = getPool(entry.getRoute());
+ RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
pool.remove(entry);
it.remove();
}
@@ -442,7 +461,7 @@ public abstract class AbstractNIOConnPoo
E entry = it.next();
if (entry.isExpired(now)) {
closeEntry(entry);
- RouteSpecificPool<T, E> pool = getPool(entry.getRoute());
+ RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
pool.remove(entry);
it.remove();
}
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/LeaseRequest.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/LeaseRequest.java?rev=1152181&r1=1152180&r2=1152181&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/LeaseRequest.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/LeaseRequest.java Fri Jul 29 12:13:04 2011
@@ -26,26 +26,26 @@
*/
package org.apache.http.nio.pool;
-import org.apache.http.nio.reactor.IOSession;
+import org.apache.http.concurrent.BasicFuture;
import org.apache.http.pool.PoolEntry;
-class LeaseRequest<T, E extends PoolEntry<T, IOSession>> {
+class LeaseRequest<T, C, E extends PoolEntry<T, C>> {
private final T route;
private final Object state;
private final int connectTimeout;
- private final PoolEntryCallback<E> callback;
+ private final BasicFuture<E> future;
public LeaseRequest(
final T route,
final Object state,
final int connectTimeout,
- final PoolEntryCallback<E> callback) {
+ final BasicFuture<E> future) {
super();
this.route = route;
this.state = state;
this.connectTimeout = connectTimeout;
- this.callback = callback;
+ this.future = future;
}
public T getRoute() {
@@ -56,8 +56,8 @@ class LeaseRequest<T, E extends PoolEntr
return this.state;
}
- public PoolEntryCallback<E> getCallback() {
- return this.callback;
+ public BasicFuture<E> getFuture() {
+ return this.future;
}
public int getConnectTimeout() {
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/RouteSpecificPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/RouteSpecificPool.java?rev=1152181&r1=1152180&r2=1152181&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/RouteSpecificPool.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/pool/RouteSpecificPool.java Fri Jul 29 12:13:04 2011
@@ -34,26 +34,28 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
-import org.apache.http.nio.reactor.IOSession;
+import org.apache.http.concurrent.BasicFuture;
import org.apache.http.nio.reactor.SessionRequest;
import org.apache.http.pool.PoolEntry;
-abstract class RouteSpecificPool<T, E extends PoolEntry<T, IOSession>> {
+abstract class RouteSpecificPool<T, C, E extends PoolEntry<T, C>> {
private final T route;
private final Set<E> leased;
private final LinkedList<E> available;
- private final Map<SessionRequest, PoolEntryCallback<E>> pending;
+ private final Map<SessionRequest, BasicFuture<E>> pending;
RouteSpecificPool(final T route) {
super();
this.route = route;
this.leased = new HashSet<E>();
this.available = new LinkedList<E>();
- this.pending = new HashMap<SessionRequest, PoolEntryCallback<E>>();
+ this.pending = new HashMap<SessionRequest, BasicFuture<E>>();
}
- protected abstract E createEntry(T route, IOSession session);
+ protected abstract E createEntry(T route, C conn);
+
+ protected abstract void closeEntry(E entry);
public int getLeasedCount() {
return this.leased.size();
@@ -125,40 +127,39 @@ abstract class RouteSpecificPool<T, E ex
public void addPending(
final SessionRequest sessionRequest,
- final PoolEntryCallback<E> callback) {
- this.pending.put(sessionRequest, callback);
+ final BasicFuture<E> future) {
+ this.pending.put(sessionRequest, future);
}
- private PoolEntryCallback<E> removeRequest(final SessionRequest request) {
- PoolEntryCallback<E> callback = this.pending.remove(request);
- if (callback == null) {
+ private BasicFuture<E> removeRequest(final SessionRequest request) {
+ BasicFuture<E> future = this.pending.remove(request);
+ if (future == null) {
throw new IllegalStateException("Invalid session request");
}
- return callback;
+ return future;
}
- public E completed(final SessionRequest request) {
- PoolEntryCallback<E> callback = removeRequest(request);
- IOSession iosession = request.getSession();
- E entry = createEntry(this.route, iosession);
+ public E completed(final SessionRequest request, final C conn) {
+ BasicFuture<E> future = removeRequest(request);
+ E entry = createEntry(this.route, conn);
this.leased.add(entry);
- callback.completed(entry);
+ future.completed(entry);
return entry;
}
public void cancelled(final SessionRequest request) {
- PoolEntryCallback<E> callback = removeRequest(request);
- callback.cancelled();
+ BasicFuture<E> future = removeRequest(request);
+ future.cancel(true);
}
public void failed(final SessionRequest request) {
- PoolEntryCallback<E> callback = removeRequest(request);
- callback.failed(request.getException());
+ BasicFuture<E> future = removeRequest(request);
+ future.failed(request.getException());
}
public void timeout(final SessionRequest request) {
- PoolEntryCallback<E> callback = removeRequest(request);
- callback.failed(new SocketTimeoutException());
+ BasicFuture<E> future = removeRequest(request);
+ future.failed(new SocketTimeoutException());
}
public void shutdown() {
@@ -167,11 +168,11 @@ abstract class RouteSpecificPool<T, E ex
}
this.pending.clear();
for (E entry: this.available) {
- entry.getConnection().close();
+ closeEntry(entry);
}
this.available.clear();
for (E entry: this.leased) {
- entry.getConnection().close();
+ closeEntry(entry);
}
this.leased.clear();
}
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/pool/TestNIOConnPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/pool/TestNIOConnPool.java?rev=1152181&r1=1152180&r2=1152181&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/pool/TestNIOConnPool.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/pool/TestNIOConnPool.java Fri Jul 29 12:13:04 2011
@@ -26,13 +26,17 @@
*/
package org.apache.http.nio.pool;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
+import org.apache.http.concurrent.BasicFuture;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOSession;
import org.apache.http.nio.reactor.SessionRequest;
@@ -51,52 +55,8 @@ public class TestNIOConnPool {
}
}
-
- static class BasicPoolEntryCallback implements PoolEntryCallback<LocalPoolEntry> {
- private LocalPoolEntry entry;
- private Exception ex;
- private boolean completed;
- private boolean failed;
- private boolean cancelled;
-
- public void completed(final LocalPoolEntry entry) {
- this.entry = entry;
- this.completed = true;
- }
-
- public LocalPoolEntry getEntry() {
- return this.entry;
- }
-
- public Exception getException() {
- return this.ex;
- }
-
- public void failed(final Exception ex) {
- this.ex = ex;
- this.failed = true;
- }
-
- public void cancelled() {
- this.cancelled = true;
- }
-
- public boolean isCompleted() {
- return this.completed;
- }
-
- public boolean isFailed() {
- return this.failed;
- }
-
- public boolean isCancelled() {
- return this.cancelled;
- }
-
- }
-
- static class LocalSessionPool extends AbstractNIOConnPool<String, LocalPoolEntry> {
+ static class LocalSessionPool extends AbstractNIOConnPool<String, IOSession, LocalPoolEntry> {
public LocalSessionPool(
final ConnectingIOReactor ioreactor, int defaultMaxPerRoute, int maxTotal) {
@@ -114,6 +74,11 @@ public class TestNIOConnPool {
}
@Override
+ protected IOSession createConnection(final String route, final IOSession session) {
+ return session;
+ }
+
+ @Override
protected LocalPoolEntry createEntry(final String route, final IOSession session) {
return new LocalPoolEntry(route, session);
}
@@ -145,9 +110,9 @@ public class TestNIOConnPool {
@Test
public void testInternalLeaseRequest() throws Exception {
- LeaseRequest<String, LocalPoolEntry> leaseRequest =
- new LeaseRequest<String, LocalPoolEntry>("somehost", null, 0,
- new BasicPoolEntryCallback());
+ LeaseRequest<String, IOSession, LocalPoolEntry> leaseRequest =
+ new LeaseRequest<String, IOSession, LocalPoolEntry>("somehost", null, 0,
+ new BasicFuture<LocalPoolEntry>(null));
Assert.assertEquals("[somehost][null]", leaseRequest.toString());
}
@@ -184,8 +149,7 @@ public class TestNIOConnPool {
Mockito.any(), Mockito.any(SessionRequestCallback.class))).
thenReturn(sessionRequest);
LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 10);
- BasicPoolEntryCallback callback = new BasicPoolEntryCallback();
- pool.lease("somehost", null, 100, TimeUnit.MILLISECONDS, callback);
+ Future<LocalPoolEntry> future = pool.lease("somehost", null, 100, TimeUnit.MILLISECONDS, null);
Mockito.verify(sessionRequest).setConnectTimeout(100);
PoolStats totals = pool.getTotalStats();
@@ -194,9 +158,11 @@ public class TestNIOConnPool {
Assert.assertEquals(1, totals.getPending());
pool.requestCompleted(sessionRequest);
- Assert.assertTrue(callback.isCompleted());
- Assert.assertFalse(callback.isFailed());
- Assert.assertFalse(callback.isCancelled());
+
+ Assert.assertTrue(future.isDone());
+ Assert.assertFalse(future.isCancelled());
+ LocalPoolEntry entry = future.get();
+ Assert.assertNotNull(entry);
totals = pool.getTotalStats();
Assert.assertEquals(0, totals.getAvailable());
@@ -206,10 +172,9 @@ public class TestNIOConnPool {
@Test
public void testFailedConnect() throws Exception {
- IOSession iosession = Mockito.mock(IOSession.class);
SessionRequest sessionRequest = Mockito.mock(SessionRequest.class);
Mockito.when(sessionRequest.getAttachment()).thenReturn("somehost");
- Mockito.when(sessionRequest.getSession()).thenReturn(iosession);
+ Mockito.when(sessionRequest.getException()).thenReturn(new IOException());
ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
Mockito.when(ioreactor.connect(
Mockito.any(SocketAddress.class),
@@ -217,8 +182,7 @@ public class TestNIOConnPool {
Mockito.any(), Mockito.any(SessionRequestCallback.class))).
thenReturn(sessionRequest);
LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 10);
- BasicPoolEntryCallback callback = new BasicPoolEntryCallback();
- pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback);
+ Future<LocalPoolEntry> future = pool.lease("somehost", null);
PoolStats totals = pool.getTotalStats();
Assert.assertEquals(0, totals.getAvailable());
@@ -226,9 +190,15 @@ public class TestNIOConnPool {
Assert.assertEquals(1, totals.getPending());
pool.requestFailed(sessionRequest);
- Assert.assertFalse(callback.isCompleted());
- Assert.assertTrue(callback.isFailed());
- Assert.assertFalse(callback.isCancelled());
+
+ Assert.assertTrue(future.isDone());
+ Assert.assertFalse(future.isCancelled());
+ try {
+ future.get();
+ Assert.fail("ExecutionException should have been thrown");
+ } catch (ExecutionException ex) {
+ Assert.assertTrue(ex.getCause() instanceof IOException);
+ }
totals = pool.getTotalStats();
Assert.assertEquals(0, totals.getAvailable());
@@ -249,8 +219,7 @@ public class TestNIOConnPool {
Mockito.any(), Mockito.any(SessionRequestCallback.class))).
thenReturn(sessionRequest);
LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 10);
- BasicPoolEntryCallback callback = new BasicPoolEntryCallback();
- pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback);
+ Future<LocalPoolEntry> future = pool.lease("somehost", null);
PoolStats totals = pool.getTotalStats();
Assert.assertEquals(0, totals.getAvailable());
@@ -258,9 +227,11 @@ public class TestNIOConnPool {
Assert.assertEquals(1, totals.getPending());
pool.requestCancelled(sessionRequest);
- Assert.assertFalse(callback.isCompleted());
- Assert.assertFalse(callback.isFailed());
- Assert.assertTrue(callback.isCancelled());
+
+ Assert.assertTrue(future.isDone());
+ Assert.assertTrue(future.isCancelled());
+ LocalPoolEntry entry = future.get();
+ Assert.assertNull(entry);
totals = pool.getTotalStats();
Assert.assertEquals(0, totals.getAvailable());
@@ -281,8 +252,7 @@ public class TestNIOConnPool {
Mockito.any(), Mockito.any(SessionRequestCallback.class))).
thenReturn(sessionRequest);
LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 10);
- BasicPoolEntryCallback callback = new BasicPoolEntryCallback();
- pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback);
+ Future<LocalPoolEntry> future = pool.lease("somehost", null);
PoolStats totals = pool.getTotalStats();
Assert.assertEquals(0, totals.getAvailable());
@@ -290,10 +260,15 @@ public class TestNIOConnPool {
Assert.assertEquals(1, totals.getPending());
pool.requestTimeout(sessionRequest);
- Assert.assertFalse(callback.isCompleted());
- Assert.assertTrue(callback.isFailed());
- Assert.assertFalse(callback.isCancelled());
- Assert.assertTrue(callback.getException() instanceof SocketTimeoutException);
+
+ Assert.assertTrue(future.isDone());
+ Assert.assertFalse(future.isCancelled());
+ try {
+ future.get();
+ Assert.fail("ExecutionException should have been thrown");
+ } catch (ExecutionException ex) {
+ Assert.assertTrue(ex.getCause() instanceof SocketTimeoutException);
+ }
totals = pool.getTotalStats();
Assert.assertEquals(0, totals.getAvailable());
@@ -326,21 +301,18 @@ public class TestNIOConnPool {
thenReturn(sessionRequest2);
LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 10);
- BasicPoolEntryCallback callback1 = new BasicPoolEntryCallback();
- pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback1);
+ Future<LocalPoolEntry> future1 = pool.lease("somehost", null);
pool.requestCompleted(sessionRequest1);
- BasicPoolEntryCallback callback2 = new BasicPoolEntryCallback();
- pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback2);
+ Future<LocalPoolEntry> future2 = pool.lease("somehost", null);
pool.requestCompleted(sessionRequest1);
- BasicPoolEntryCallback callback3 = new BasicPoolEntryCallback();
- pool.lease("otherhost", null, -1, TimeUnit.MILLISECONDS, callback3);
+ Future<LocalPoolEntry> future3 = pool.lease("otherhost", null);
pool.requestCompleted(sessionRequest2);
- LocalPoolEntry entry1 = callback1.getEntry();
+ LocalPoolEntry entry1 = future1.get();
Assert.assertNotNull(entry1);
- LocalPoolEntry entry2 = callback2.getEntry();
+ LocalPoolEntry entry2 = future2.get();
Assert.assertNotNull(entry2);
- LocalPoolEntry entry3 = callback3.getEntry();
+ LocalPoolEntry entry3 = future3.get();
Assert.assertNotNull(entry3);
pool.release(entry1, true);
@@ -360,17 +332,12 @@ public class TestNIOConnPool {
ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 10);
try {
- pool.lease(null, null, 0, TimeUnit.MILLISECONDS, new BasicPoolEntryCallback());
- Assert.fail("IllegalArgumentException should have been thrown");
- } catch (IllegalArgumentException expected) {
- }
- try {
- pool.lease("somehost", null, 0, null, new BasicPoolEntryCallback());
+ pool.lease(null, null, 0, TimeUnit.MILLISECONDS, null);
Assert.fail("IllegalArgumentException should have been thrown");
} catch (IllegalArgumentException expected) {
}
try {
- pool.lease("somehost", null, 0, TimeUnit.MILLISECONDS, null);
+ pool.lease("somehost", null, 0, null, null);
Assert.fail("IllegalArgumentException should have been thrown");
} catch (IllegalArgumentException expected) {
}
@@ -412,21 +379,18 @@ public class TestNIOConnPool {
pool.setMaxPerHost("otherhost", 1);
pool.setTotalMax(3);
- BasicPoolEntryCallback callback1 = new BasicPoolEntryCallback();
- pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback1);
+ Future<LocalPoolEntry> future1 = pool.lease("somehost", null);
pool.requestCompleted(sessionRequest1);
- BasicPoolEntryCallback callback2 = new BasicPoolEntryCallback();
- pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback2);
+ Future<LocalPoolEntry> future2 = pool.lease("somehost", null);
pool.requestCompleted(sessionRequest1);
- BasicPoolEntryCallback callback3 = new BasicPoolEntryCallback();
- pool.lease("otherhost", null, -1, TimeUnit.MILLISECONDS, callback3);
+ Future<LocalPoolEntry> future3 = pool.lease("otherhost", null);
pool.requestCompleted(sessionRequest2);
- LocalPoolEntry entry1 = callback1.getEntry();
+ LocalPoolEntry entry1 = future1.get();
Assert.assertNotNull(entry1);
- LocalPoolEntry entry2 = callback2.getEntry();
+ LocalPoolEntry entry2 = future2.get();
Assert.assertNotNull(entry2);
- LocalPoolEntry entry3 = callback3.getEntry();
+ LocalPoolEntry entry3 = future3.get();
Assert.assertNotNull(entry3);
pool.release(entry1, true);
@@ -438,31 +402,25 @@ public class TestNIOConnPool {
Assert.assertEquals(0, totals.getLeased());
Assert.assertEquals(0, totals.getPending());
- BasicPoolEntryCallback callback4 = new BasicPoolEntryCallback();
- pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback4);
- BasicPoolEntryCallback callback5 = new BasicPoolEntryCallback();
- pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback5);
- BasicPoolEntryCallback callback6 = new BasicPoolEntryCallback();
- pool.lease("otherhost", null, -1, TimeUnit.MILLISECONDS, callback6);
- BasicPoolEntryCallback callback7 = new BasicPoolEntryCallback();
- pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback7);
- BasicPoolEntryCallback callback8 = new BasicPoolEntryCallback();
- pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback8);
- BasicPoolEntryCallback callback9 = new BasicPoolEntryCallback();
- pool.lease("otherhost", null, -1, TimeUnit.MILLISECONDS, callback9);
+ Future<LocalPoolEntry> future4 = pool.lease("somehost", null);
+ Future<LocalPoolEntry> future5 = pool.lease("somehost", null);
+ Future<LocalPoolEntry> future6 = pool.lease("otherhost", null);
+ Future<LocalPoolEntry> future7 = pool.lease("somehost", null);
+ Future<LocalPoolEntry> future8 = pool.lease("somehost", null);
+ Future<LocalPoolEntry> future9 = pool.lease("otherhost", null);
- Assert.assertTrue(callback4.isCompleted());
- LocalPoolEntry entry4 = callback4.getEntry();
+ Assert.assertTrue(future4.isDone());
+ LocalPoolEntry entry4 = future4.get();
Assert.assertNotNull(entry4);
- Assert.assertTrue(callback5.isCompleted());
- LocalPoolEntry entry5 = callback5.getEntry();
+ Assert.assertTrue(future5.isDone());
+ LocalPoolEntry entry5 = future5.get();
Assert.assertNotNull(entry5);
- Assert.assertTrue(callback6.isCompleted());
- LocalPoolEntry entry6 = callback6.getEntry();
+ Assert.assertTrue(future6.isDone());
+ LocalPoolEntry entry6 = future6.get();
Assert.assertNotNull(entry6);
- Assert.assertFalse(callback7.isCompleted());
- Assert.assertFalse(callback8.isCompleted());
- Assert.assertFalse(callback9.isCompleted());
+ Assert.assertFalse(future7.isDone());
+ Assert.assertFalse(future8.isDone());
+ Assert.assertFalse(future9.isDone());
Mockito.verify(ioreactor, Mockito.times(3)).connect(
Mockito.any(SocketAddress.class), Mockito.any(SocketAddress.class),
@@ -472,9 +430,9 @@ public class TestNIOConnPool {
pool.release(entry5, false);
pool.release(entry6, true);
- Assert.assertTrue(callback7.isCompleted());
- Assert.assertFalse(callback8.isCompleted());
- Assert.assertTrue(callback9.isCompleted());
+ Assert.assertTrue(future7.isDone());
+ Assert.assertFalse(future8.isDone());
+ Assert.assertTrue(future9.isDone());
Mockito.verify(ioreactor, Mockito.times(4)).connect(
Mockito.any(SocketAddress.class), Mockito.any(SocketAddress.class),
@@ -520,14 +478,10 @@ public class TestNIOConnPool {
pool.setMaxPerHost("otherhost", 2);
pool.setTotalMax(2);
- BasicPoolEntryCallback callback1 = new BasicPoolEntryCallback();
- pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback1);
- BasicPoolEntryCallback callback2 = new BasicPoolEntryCallback();
- pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback2);
- BasicPoolEntryCallback callback3 = new BasicPoolEntryCallback();
- pool.lease("otherhost", null, -1, TimeUnit.MILLISECONDS, callback3);
- BasicPoolEntryCallback callback4 = new BasicPoolEntryCallback();
- pool.lease("otherhost", null, -1, TimeUnit.MILLISECONDS, callback4);
+ Future<LocalPoolEntry> future1 = pool.lease("somehost", null);
+ Future<LocalPoolEntry> future2 = pool.lease("somehost", null);
+ Future<LocalPoolEntry> future3 = pool.lease("otherhost", null);
+ Future<LocalPoolEntry> future4 = pool.lease("otherhost", null);
Mockito.verify(ioreactor, Mockito.times(2)).connect(
Mockito.eq(InetSocketAddress.createUnresolved("somehost", 80)),
@@ -542,15 +496,15 @@ public class TestNIOConnPool {
pool.requestCompleted(sessionRequest1);
pool.requestCompleted(sessionRequest2);
- Assert.assertTrue(callback1.isCompleted());
- LocalPoolEntry entry1 = callback1.getEntry();
+ Assert.assertTrue(future1.isDone());
+ LocalPoolEntry entry1 = future1.get();
Assert.assertNotNull(entry1);
- Assert.assertTrue(callback2.isCompleted());
- LocalPoolEntry entry2 = callback2.getEntry();
+ Assert.assertTrue(future2.isDone());
+ LocalPoolEntry entry2 = future2.get();
Assert.assertNotNull(entry2);
- Assert.assertFalse(callback3.isCompleted());
- Assert.assertFalse(callback4.isCompleted());
+ Assert.assertFalse(future3.isDone());
+ Assert.assertFalse(future4.isDone());
PoolStats totals = pool.getTotalStats();
Assert.assertEquals(0, totals.getAvailable());
@@ -573,11 +527,11 @@ public class TestNIOConnPool {
pool.requestCompleted(sessionRequest3);
pool.requestCompleted(sessionRequest4);
- Assert.assertTrue(callback3.isCompleted());
- LocalPoolEntry entry3 = callback3.getEntry();
+ Assert.assertTrue(future3.isDone());
+ LocalPoolEntry entry3 = future3.get();
Assert.assertNotNull(entry3);
- Assert.assertTrue(callback4.isCompleted());
- LocalPoolEntry entry4 = callback4.getEntry();
+ Assert.assertTrue(future4.isDone());
+ LocalPoolEntry entry4 = future4.get();
Assert.assertNotNull(entry4);
totals = pool.getTotalStats();
@@ -585,10 +539,8 @@ public class TestNIOConnPool {
Assert.assertEquals(2, totals.getLeased());
Assert.assertEquals(0, totals.getPending());
- BasicPoolEntryCallback callback5 = new BasicPoolEntryCallback();
- pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback5);
- BasicPoolEntryCallback callback6 = new BasicPoolEntryCallback();
- pool.lease("otherhost", null, -1, TimeUnit.MILLISECONDS, callback6);
+ Future<LocalPoolEntry> future5 = pool.lease("somehost", null);
+ Future<LocalPoolEntry> future6 = pool.lease("otherhost", null);
Mockito.verify(ioreactor, Mockito.times(2)).connect(
Mockito.eq(InetSocketAddress.createUnresolved("somehost", 80)),
@@ -615,11 +567,11 @@ public class TestNIOConnPool {
pool.requestCompleted(sessionRequest1);
- Assert.assertTrue(callback5.isCompleted());
- LocalPoolEntry entry5 = callback5.getEntry();
+ Assert.assertTrue(future5.isDone());
+ LocalPoolEntry entry5 = future5.get();
Assert.assertNotNull(entry5);
- Assert.assertTrue(callback6.isCompleted());
- LocalPoolEntry entry6 = callback6.getEntry();
+ Assert.assertTrue(future6.isDone());
+ LocalPoolEntry entry6 = future6.get();
Assert.assertNotNull(entry6);
totals = pool.getTotalStats();
@@ -663,8 +615,7 @@ public class TestNIOConnPool {
LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 2);
- BasicPoolEntryCallback callback1 = new BasicPoolEntryCallback();
- pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback1);
+ Future<LocalPoolEntry> future1 = pool.lease("somehost", null);
Mockito.verify(ioreactor, Mockito.times(1)).connect(
Mockito.any(SocketAddress.class), Mockito.any(SocketAddress.class),
@@ -672,8 +623,8 @@ public class TestNIOConnPool {
pool.requestCompleted(sessionRequest1);
- Assert.assertTrue(callback1.isCompleted());
- LocalPoolEntry entry1 = callback1.getEntry();
+ Assert.assertTrue(future1.isDone());
+ LocalPoolEntry entry1 = future1.get();
Assert.assertNotNull(entry1);
entry1.updateExpiry(1, TimeUnit.MILLISECONDS);
@@ -681,10 +632,9 @@ public class TestNIOConnPool {
Thread.sleep(200L);
- BasicPoolEntryCallback callback2 = new BasicPoolEntryCallback();
- pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback2);
+ Future<LocalPoolEntry> future2 = pool.lease("somehost", null);
- Assert.assertFalse(callback2.isCompleted());
+ Assert.assertFalse(future2.isDone());
Mockito.verify(iosession1).close();
Mockito.verify(ioreactor, Mockito.times(2)).connect(
@@ -722,19 +672,17 @@ public class TestNIOConnPool {
LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 2);
- BasicPoolEntryCallback callback1 = new BasicPoolEntryCallback();
- pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback1);
- BasicPoolEntryCallback callback2 = new BasicPoolEntryCallback();
- pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback2);
+ Future<LocalPoolEntry> future1 = pool.lease("somehost", null);
+ Future<LocalPoolEntry> future2 = pool.lease("somehost", null);
pool.requestCompleted(sessionRequest1);
pool.requestCompleted(sessionRequest2);
- Assert.assertTrue(callback1.isCompleted());
- LocalPoolEntry entry1 = callback1.getEntry();
+ Assert.assertTrue(future1.isDone());
+ LocalPoolEntry entry1 = future1.get();
Assert.assertNotNull(entry1);
- Assert.assertTrue(callback2.isCompleted());
- LocalPoolEntry entry2 = callback2.getEntry();
+ Assert.assertTrue(future2.isDone());
+ LocalPoolEntry entry2 = future2.get();
Assert.assertNotNull(entry2);
entry1.updateExpiry(1, TimeUnit.MILLISECONDS);
@@ -780,19 +728,17 @@ public class TestNIOConnPool {
LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 2);
- BasicPoolEntryCallback callback1 = new BasicPoolEntryCallback();
- pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback1);
- BasicPoolEntryCallback callback2 = new BasicPoolEntryCallback();
- pool.lease("somehost", null, -1, TimeUnit.MILLISECONDS, callback2);
+ Future<LocalPoolEntry> future1 = pool.lease("somehost", null);
+ Future<LocalPoolEntry> future2 = pool.lease("somehost", null);
pool.requestCompleted(sessionRequest1);
pool.requestCompleted(sessionRequest2);
- Assert.assertTrue(callback1.isCompleted());
- LocalPoolEntry entry1 = callback1.getEntry();
+ Assert.assertTrue(future1.isDone());
+ LocalPoolEntry entry1 = future1.get();
Assert.assertNotNull(entry1);
- Assert.assertTrue(callback2.isCompleted());
- LocalPoolEntry entry2 = callback2.getEntry();
+ Assert.assertTrue(future2.isDone());
+ LocalPoolEntry entry2 = future2.get();
Assert.assertNotNull(entry2);
entry1.updateExpiry(0, TimeUnit.MILLISECONDS);
@@ -880,7 +826,7 @@ public class TestNIOConnPool {
pool.shutdown(1000);
Mockito.verify(ioreactor, Mockito.times(1)).shutdown(1000);
try {
- pool.lease("somehost", null, 0, TimeUnit.MILLISECONDS, new BasicPoolEntryCallback());
+ pool.lease("somehost", null);
Assert.fail("IllegalStateException should have been thrown");
} catch (IllegalStateException expected) {
}
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/pool/TestRouteSpecificPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/pool/TestRouteSpecificPool.java?rev=1152181&r1=1152180&r2=1152181&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/pool/TestRouteSpecificPool.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/pool/TestRouteSpecificPool.java Fri Jul 29 12:13:04 2011
@@ -28,9 +28,11 @@ package org.apache.http.nio.pool;
import java.io.IOException;
import java.net.SocketTimeoutException;
+import java.util.concurrent.ExecutionException;
import junit.framework.Assert;
+import org.apache.http.concurrent.BasicFuture;
import org.apache.http.nio.reactor.IOSession;
import org.apache.http.nio.reactor.SessionRequest;
import org.apache.http.pool.PoolEntry;
@@ -46,62 +48,24 @@ public class TestRouteSpecificPool {
}
}
-
- static class BasicPoolEntryCallback implements PoolEntryCallback<LocalPoolEntry> {
- private LocalPoolEntry entry;
- private Exception ex;
- private boolean completed;
- private boolean failed;
- private boolean cancelled;
-
- public void completed(final LocalPoolEntry entry) {
- this.entry = entry;
- this.completed = true;
- }
-
- public LocalPoolEntry getEntry() {
- return this.entry;
- }
-
- public Exception getException() {
- return this.ex;
- }
-
- public void failed(final Exception ex) {
- this.ex = ex;
- this.failed = true;
- }
-
- public void cancelled() {
- this.cancelled = true;
- }
-
- public boolean isCompleted() {
- return this.completed;
- }
-
- public boolean isFailed() {
- return this.failed;
- }
-
- public boolean isCancelled() {
- return this.cancelled;
- }
-
- }
-
- static class LocalRoutePool extends RouteSpecificPool<String, LocalPoolEntry> {
+ static class LocalRoutePool extends RouteSpecificPool<String, IOSession, LocalPoolEntry> {
public LocalRoutePool() {
super("whatever");
}
@Override
- protected LocalPoolEntry createEntry(String route, IOSession session) {
+ protected LocalPoolEntry createEntry(final String route, final IOSession session) {
return new LocalPoolEntry(route, session);
}
+ @Override
+ protected void closeEntry(final LocalPoolEntry entry) {
+ IOSession session = entry.getConnection();
+ session.close();
+ }
+
};
@Test
@@ -120,18 +84,17 @@ public class TestRouteSpecificPool {
IOSession session = Mockito.mock(IOSession.class);
SessionRequest sessionRequest = Mockito.mock(SessionRequest.class);
Mockito.when(sessionRequest.getSession()).thenReturn(session);
- BasicPoolEntryCallback callback = new BasicPoolEntryCallback();
- pool.addPending(sessionRequest, callback);
+ BasicFuture<LocalPoolEntry> future = new BasicFuture<LocalPoolEntry>(null);
+ pool.addPending(sessionRequest, future);
Assert.assertEquals(1, pool.getAllocatedCount());
Assert.assertEquals(0, pool.getAvailableCount());
Assert.assertEquals(0, pool.getLeasedCount());
Assert.assertEquals(1, pool.getPendingCount());
- LocalPoolEntry entry = pool.completed(sessionRequest);
+ LocalPoolEntry entry = pool.completed(sessionRequest, session);
Assert.assertNotNull(entry);
Assert.assertSame(session, entry.getConnection());
- Assert.assertTrue(callback.isCompleted());
- Assert.assertFalse(callback.isFailed());
- Assert.assertFalse(callback.isCancelled());
+ Assert.assertTrue(future.isDone());
+ Assert.assertFalse(future.isCancelled());
Assert.assertEquals(1, pool.getAllocatedCount());
Assert.assertEquals(0, pool.getAvailableCount());
@@ -142,20 +105,23 @@ public class TestRouteSpecificPool {
@Test
public void testFailedConnect() throws Exception {
LocalRoutePool pool = new LocalRoutePool();
- IOException ex = new IOException();
SessionRequest sessionRequest = Mockito.mock(SessionRequest.class);
- Mockito.when(sessionRequest.getException()).thenReturn(ex);
- BasicPoolEntryCallback callback = new BasicPoolEntryCallback();
- pool.addPending(sessionRequest, callback);
+ Mockito.when(sessionRequest.getException()).thenReturn(new IOException());
+ BasicFuture<LocalPoolEntry> future = new BasicFuture<LocalPoolEntry>(null);
+ pool.addPending(sessionRequest, future);
Assert.assertEquals(1, pool.getAllocatedCount());
Assert.assertEquals(0, pool.getAvailableCount());
Assert.assertEquals(0, pool.getLeasedCount());
Assert.assertEquals(1, pool.getPendingCount());
pool.failed(sessionRequest);
- Assert.assertFalse(callback.isCompleted());
- Assert.assertTrue(callback.isFailed());
- Assert.assertFalse(callback.isCancelled());
-
+ Assert.assertTrue(future.isDone());
+ Assert.assertFalse(future.isCancelled());
+ try {
+ future.get();
+ Assert.fail("ExecutionException should have been thrown");
+ } catch (ExecutionException ex) {
+ Assert.assertTrue(ex.getCause() instanceof IOException);
+ }
Assert.assertEquals(0, pool.getAllocatedCount());
Assert.assertEquals(0, pool.getAvailableCount());
Assert.assertEquals(0, pool.getLeasedCount());
@@ -166,16 +132,15 @@ public class TestRouteSpecificPool {
public void testCancelledConnect() throws Exception {
LocalRoutePool pool = new LocalRoutePool();
SessionRequest sessionRequest = Mockito.mock(SessionRequest.class);
- BasicPoolEntryCallback callback = new BasicPoolEntryCallback();
- pool.addPending(sessionRequest, callback);
+ BasicFuture<LocalPoolEntry> future = new BasicFuture<LocalPoolEntry>(null);
+ pool.addPending(sessionRequest, future);
Assert.assertEquals(1, pool.getAllocatedCount());
Assert.assertEquals(0, pool.getAvailableCount());
Assert.assertEquals(0, pool.getLeasedCount());
Assert.assertEquals(1, pool.getPendingCount());
pool.cancelled(sessionRequest);
- Assert.assertFalse(callback.isCompleted());
- Assert.assertFalse(callback.isFailed());
- Assert.assertTrue(callback.isCancelled());
+ Assert.assertTrue(future.isDone());
+ Assert.assertTrue(future.isCancelled());
Assert.assertEquals(0, pool.getAllocatedCount());
Assert.assertEquals(0, pool.getAvailableCount());
@@ -187,18 +152,21 @@ public class TestRouteSpecificPool {
public void testConnectTimeout() throws Exception {
LocalRoutePool pool = new LocalRoutePool();
SessionRequest sessionRequest = Mockito.mock(SessionRequest.class);
- BasicPoolEntryCallback callback = new BasicPoolEntryCallback();
- pool.addPending(sessionRequest, callback);
+ BasicFuture<LocalPoolEntry> future = new BasicFuture<LocalPoolEntry>(null);
+ pool.addPending(sessionRequest, future);
Assert.assertEquals(1, pool.getAllocatedCount());
Assert.assertEquals(0, pool.getAvailableCount());
Assert.assertEquals(0, pool.getLeasedCount());
Assert.assertEquals(1, pool.getPendingCount());
pool.timeout(sessionRequest);
- Assert.assertFalse(callback.isCompleted());
- Assert.assertTrue(callback.isFailed());
- Assert.assertFalse(callback.isCancelled());
- Assert.assertTrue(callback.getException() instanceof SocketTimeoutException);
-
+ Assert.assertTrue(future.isDone());
+ Assert.assertFalse(future.isCancelled());
+ try {
+ future.get();
+ Assert.fail("ExecutionException should have been thrown");
+ } catch (ExecutionException ex) {
+ Assert.assertTrue(ex.getCause() instanceof SocketTimeoutException);
+ }
Assert.assertEquals(0, pool.getAllocatedCount());
Assert.assertEquals(0, pool.getAvailableCount());
Assert.assertEquals(0, pool.getLeasedCount());
@@ -211,26 +179,29 @@ public class TestRouteSpecificPool {
IOSession session1 = Mockito.mock(IOSession.class);
SessionRequest sessionRequest1 = Mockito.mock(SessionRequest.class);
Mockito.when(sessionRequest1.getSession()).thenReturn(session1);
- pool.addPending(sessionRequest1, new BasicPoolEntryCallback());
+ BasicFuture<LocalPoolEntry> future1 = new BasicFuture<LocalPoolEntry>(null);
+ pool.addPending(sessionRequest1, future1);
IOSession session2 = Mockito.mock(IOSession.class);
SessionRequest sessionRequest2 = Mockito.mock(SessionRequest.class);
Mockito.when(sessionRequest2.getSession()).thenReturn(session2);
- pool.addPending(sessionRequest2, new BasicPoolEntryCallback());
+ BasicFuture<LocalPoolEntry> future2 = new BasicFuture<LocalPoolEntry>(null);
+ pool.addPending(sessionRequest2, future2);
IOSession session3 = Mockito.mock(IOSession.class);
SessionRequest sessionRequest3 = Mockito.mock(SessionRequest.class);
Mockito.when(sessionRequest3.getSession()).thenReturn(session3);
- pool.addPending(sessionRequest3, new BasicPoolEntryCallback());
+ BasicFuture<LocalPoolEntry> future3 = new BasicFuture<LocalPoolEntry>(null);
+ pool.addPending(sessionRequest3, future3);
Assert.assertEquals(3, pool.getAllocatedCount());
Assert.assertEquals(0, pool.getAvailableCount());
Assert.assertEquals(0, pool.getLeasedCount());
Assert.assertEquals(3, pool.getPendingCount());
- LocalPoolEntry entry1 = pool.completed(sessionRequest1);
+ LocalPoolEntry entry1 = pool.completed(sessionRequest1, session1);
Assert.assertNotNull(entry1);
- LocalPoolEntry entry2 = pool.completed(sessionRequest2);
+ LocalPoolEntry entry2 = pool.completed(sessionRequest2, session2);
Assert.assertNotNull(entry2);
- LocalPoolEntry entry3 = pool.completed(sessionRequest3);
+ LocalPoolEntry entry3 = pool.completed(sessionRequest3, session3);
Assert.assertNotNull(entry3);
Assert.assertEquals(3, pool.getAllocatedCount());
@@ -260,24 +231,28 @@ public class TestRouteSpecificPool {
@Test
public void testLeaseReleaseStateful() throws Exception {
LocalRoutePool pool = new LocalRoutePool();
+
IOSession session1 = Mockito.mock(IOSession.class);
SessionRequest sessionRequest1 = Mockito.mock(SessionRequest.class);
Mockito.when(sessionRequest1.getSession()).thenReturn(session1);
- pool.addPending(sessionRequest1, new BasicPoolEntryCallback());
+ BasicFuture<LocalPoolEntry> future1 = new BasicFuture<LocalPoolEntry>(null);
+ pool.addPending(sessionRequest1, future1);
IOSession session2 = Mockito.mock(IOSession.class);
SessionRequest sessionRequest2 = Mockito.mock(SessionRequest.class);
Mockito.when(sessionRequest2.getSession()).thenReturn(session2);
- pool.addPending(sessionRequest2, new BasicPoolEntryCallback());
+ BasicFuture<LocalPoolEntry> future2 = new BasicFuture<LocalPoolEntry>(null);
+ pool.addPending(sessionRequest2, future2);
IOSession session3 = Mockito.mock(IOSession.class);
SessionRequest sessionRequest3 = Mockito.mock(SessionRequest.class);
Mockito.when(sessionRequest3.getSession()).thenReturn(session3);
- pool.addPending(sessionRequest3, new BasicPoolEntryCallback());
+ BasicFuture<LocalPoolEntry> future3 = new BasicFuture<LocalPoolEntry>(null);
+ pool.addPending(sessionRequest3, future3);
- LocalPoolEntry entry1 = pool.completed(sessionRequest1);
+ LocalPoolEntry entry1 = pool.completed(sessionRequest1, session1);
Assert.assertNotNull(entry1);
- LocalPoolEntry entry2 = pool.completed(sessionRequest2);
+ LocalPoolEntry entry2 = pool.completed(sessionRequest2, session2);
Assert.assertNotNull(entry2);
- LocalPoolEntry entry3 = pool.completed(sessionRequest3);
+ LocalPoolEntry entry3 = pool.completed(sessionRequest3, session3);
Assert.assertNotNull(entry3);
entry2.setState(Boolean.FALSE);
@@ -319,26 +294,29 @@ public class TestRouteSpecificPool {
IOSession session1 = Mockito.mock(IOSession.class);
SessionRequest sessionRequest1 = Mockito.mock(SessionRequest.class);
Mockito.when(sessionRequest1.getSession()).thenReturn(session1);
- pool.addPending(sessionRequest1, new BasicPoolEntryCallback());
+ BasicFuture<LocalPoolEntry> future1 = new BasicFuture<LocalPoolEntry>(null);
+ pool.addPending(sessionRequest1, future1);
IOSession session2 = Mockito.mock(IOSession.class);
SessionRequest sessionRequest2 = Mockito.mock(SessionRequest.class);
Mockito.when(sessionRequest2.getSession()).thenReturn(session2);
- pool.addPending(sessionRequest2, new BasicPoolEntryCallback());
+ BasicFuture<LocalPoolEntry> future2 = new BasicFuture<LocalPoolEntry>(null);
+ pool.addPending(sessionRequest2, future2);
IOSession session3 = Mockito.mock(IOSession.class);
SessionRequest sessionRequest3 = Mockito.mock(SessionRequest.class);
Mockito.when(sessionRequest3.getSession()).thenReturn(session3);
- pool.addPending(sessionRequest3, new BasicPoolEntryCallback());
+ BasicFuture<LocalPoolEntry> future3 = new BasicFuture<LocalPoolEntry>(null);
+ pool.addPending(sessionRequest3, future3);
Assert.assertEquals(3, pool.getAllocatedCount());
Assert.assertEquals(0, pool.getAvailableCount());
Assert.assertEquals(0, pool.getLeasedCount());
Assert.assertEquals(3, pool.getPendingCount());
- LocalPoolEntry entry1 = pool.completed(sessionRequest1);
+ LocalPoolEntry entry1 = pool.completed(sessionRequest1, session1);
Assert.assertNotNull(entry1);
- LocalPoolEntry entry2 = pool.completed(sessionRequest2);
+ LocalPoolEntry entry2 = pool.completed(sessionRequest2, session2);
Assert.assertNotNull(entry2);
- LocalPoolEntry entry3 = pool.completed(sessionRequest3);
+ LocalPoolEntry entry3 = pool.completed(sessionRequest3, session3);
Assert.assertNotNull(entry3);
Assert.assertEquals(3, pool.getAllocatedCount());
@@ -389,19 +367,22 @@ public class TestRouteSpecificPool {
IOSession session1 = Mockito.mock(IOSession.class);
SessionRequest sessionRequest1 = Mockito.mock(SessionRequest.class);
Mockito.when(sessionRequest1.getSession()).thenReturn(session1);
- pool.addPending(sessionRequest1, new BasicPoolEntryCallback());
+ BasicFuture<LocalPoolEntry> future1 = new BasicFuture<LocalPoolEntry>(null);
+ pool.addPending(sessionRequest1, future1);
IOSession session2 = Mockito.mock(IOSession.class);
SessionRequest sessionRequest2 = Mockito.mock(SessionRequest.class);
Mockito.when(sessionRequest2.getSession()).thenReturn(session2);
- pool.addPending(sessionRequest2, new BasicPoolEntryCallback());
+ BasicFuture<LocalPoolEntry> future2 = new BasicFuture<LocalPoolEntry>(null);
+ pool.addPending(sessionRequest2, future2);
IOSession session3 = Mockito.mock(IOSession.class);
SessionRequest sessionRequest3 = Mockito.mock(SessionRequest.class);
Mockito.when(sessionRequest3.getSession()).thenReturn(session3);
- pool.addPending(sessionRequest3, new BasicPoolEntryCallback());
+ BasicFuture<LocalPoolEntry> future3 = new BasicFuture<LocalPoolEntry>(null);
+ pool.addPending(sessionRequest3, future3);
- LocalPoolEntry entry1 = pool.completed(sessionRequest1);
+ LocalPoolEntry entry1 = pool.completed(sessionRequest1, session1);
Assert.assertNotNull(entry1);
- LocalPoolEntry entry2 = pool.completed(sessionRequest2);
+ LocalPoolEntry entry2 = pool.completed(sessionRequest2, session2);
Assert.assertNotNull(entry2);
pool.free(entry1, true);