You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2011/06/30 10:13:48 UTC
svn commit: r1141434 - in
/httpcomponents/httpasyncclient/trunk/httpasyncclient/src:
main/java/org/apache/http/impl/nio/conn/
main/java/org/apache/http/impl/nio/pool/
test/java/org/apache/http/impl/nio/pool/
Author: olegk
Date: Thu Jun 30 08:13:48 2011
New Revision: 1141434
URL: http://svn.apache.org/viewvc?rev=1141434&view=rev
Log:
Non-blocking HTTP connection pool improvements and code cleanups
Removed:
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/pool/PoolEntryFactory.java
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/pool/RouteResolver.java
Modified:
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/conn/ClientConnAdaptor.java
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/conn/HttpPoolEntry.java
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/conn/HttpSessionPool.java
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/pool/PoolEntry.java
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/pool/RouteSpecificPool.java
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/pool/SessionPool.java
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/test/java/org/apache/http/impl/nio/pool/TestRouteSpecificPool.java
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/test/java/org/apache/http/impl/nio/pool/TestSessionPool.java
Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/conn/ClientConnAdaptor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/conn/ClientConnAdaptor.java?rev=1141434&r1=1141433&r2=1141434&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/conn/ClientConnAdaptor.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/conn/ClientConnAdaptor.java Thu Jun 30 08:13:48 2011
@@ -78,10 +78,7 @@ class ClientConnAdaptor implements Manag
this.reusable = true;
this.expiry = -1;
this.tunit = TimeUnit.MILLISECONDS;
-
- IOSession iosession = entry.getIOSession();
- this.conn = (OperatedClientConnection) iosession.getAttribute(
- ExecutionContext.HTTP_CONNECTION);
+ this.conn = entry.getConnection();
}
protected ClientConnectionManager getManager() {
Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/conn/HttpPoolEntry.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/conn/HttpPoolEntry.java?rev=1141434&r1=1141433&r2=1141434&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/conn/HttpPoolEntry.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/conn/HttpPoolEntry.java Thu Jun 30 08:13:48 2011
@@ -33,7 +33,9 @@ import org.apache.commons.logging.Log;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.routing.RouteTracker;
import org.apache.http.impl.nio.pool.PoolEntry;
+import org.apache.http.nio.conn.OperatedClientConnection;
import org.apache.http.nio.reactor.IOSession;
+import org.apache.http.protocol.ExecutionContext;
class HttpPoolEntry extends PoolEntry<HttpRoute> {
@@ -83,4 +85,9 @@ class HttpPoolEntry extends PoolEntry<Ht
return expired;
}
+ public OperatedClientConnection getConnection() {
+ return (OperatedClientConnection) getIOSession().getAttribute(
+ ExecutionContext.HTTP_CONNECTION);
+ }
+
}
Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/conn/HttpSessionPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/conn/HttpSessionPool.java?rev=1141434&r1=1141433&r2=1141434&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/conn/HttpSessionPool.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/conn/HttpSessionPool.java Thu Jun 30 08:13:48 2011
@@ -26,15 +26,15 @@
*/
package org.apache.http.impl.nio.conn;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
+import org.apache.http.HttpConnection;
import org.apache.http.HttpHost;
import org.apache.http.conn.routing.HttpRoute;
-import org.apache.http.impl.nio.pool.PoolEntryFactory;
-import org.apache.http.impl.nio.pool.RouteResolver;
import org.apache.http.impl.nio.pool.SessionPool;
import org.apache.http.nio.conn.scheme.Scheme;
import org.apache.http.nio.conn.scheme.SchemeRegistry;
@@ -43,63 +43,58 @@ import org.apache.http.nio.reactor.IOSes
class HttpSessionPool extends SessionPool<HttpRoute, HttpPoolEntry> {
+ private final Log log;
+ private final SchemeRegistry schemeRegistry;
+ private final long connTimeToLive;
+ private final TimeUnit tunit;
+
HttpSessionPool(
final Log log,
final ConnectingIOReactor ioreactor,
final SchemeRegistry schemeRegistry,
- long timeToLive, final TimeUnit tunit) {
- super(ioreactor,
- new InternalEntryFactory(log, timeToLive, tunit),
- new InternalRouteResolver(schemeRegistry),
- 20, 50);
+ long connTimeToLive, final TimeUnit tunit) {
+ super(ioreactor, 20, 50);
+ this.log = log;
+ this.schemeRegistry = schemeRegistry;
+ this.connTimeToLive = connTimeToLive;
+ this.tunit = tunit;
}
- static class InternalRouteResolver implements RouteResolver<HttpRoute> {
-
- private final SchemeRegistry schemeRegistry;
-
- InternalRouteResolver(final SchemeRegistry schemeRegistry) {
- super();
- this.schemeRegistry = schemeRegistry;
- }
+ @Override
+ protected SocketAddress resolveLocalAddress(final HttpRoute route) {
+ return new InetSocketAddress(route.getLocalAddress(), 0);
+ }
- public SocketAddress resolveLocalAddress(final HttpRoute route) {
- return new InetSocketAddress(route.getLocalAddress(), 0);
+ @Override
+ protected SocketAddress resolveRemoteAddress(final HttpRoute route) {
+ HttpHost firsthop = route.getProxyHost();
+ if (firsthop == null) {
+ firsthop = route.getTargetHost();
}
-
- public SocketAddress resolveRemoteAddress(final HttpRoute route) {
- HttpHost firsthop = route.getProxyHost();
- if (firsthop == null) {
- firsthop = route.getTargetHost();
- }
- String hostname = firsthop.getHostName();
- int port = firsthop.getPort();
- if (port < 0) {
- Scheme scheme = this.schemeRegistry.getScheme(firsthop);
- port = scheme.resolvePort(port);
- }
- return new InetSocketAddress(hostname, port);
+ String hostname = firsthop.getHostName();
+ int port = firsthop.getPort();
+ if (port < 0) {
+ Scheme scheme = this.schemeRegistry.getScheme(firsthop);
+ port = scheme.resolvePort(port);
}
-
+ return new InetSocketAddress(hostname, port);
}
- static class InternalEntryFactory implements PoolEntryFactory<HttpRoute, HttpPoolEntry> {
-
- private final Log log;
- private final long connTimeToLive;
- private final TimeUnit tunit;
-
- InternalEntryFactory(final Log log, final long connTimeToLive, final TimeUnit tunit) {
- super();
- this.log = log;
- this.connTimeToLive = connTimeToLive;
- this.tunit = tunit;
- }
+ @Override
+ protected HttpPoolEntry createEntry(final HttpRoute route, final IOSession session) {
+ return new HttpPoolEntry(this.log, route, session, this.connTimeToLive, this.tunit);
+ }
- public HttpPoolEntry createEntry(final HttpRoute route, final IOSession session) {
- return new HttpPoolEntry(this.log, route, session, this.connTimeToLive, this.tunit);
+ @Override
+ protected void closeEntry(final HttpPoolEntry entry) {
+ HttpConnection conn = entry.getConnection();
+ try {
+ conn.close();
+ } catch (IOException ex) {
+ if (this.log.isDebugEnabled()) {
+ this.log.debug("I/O error closing connection", ex);
+ }
}
-
- };
+ }
}
Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/pool/PoolEntry.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/pool/PoolEntry.java?rev=1141434&r1=1141433&r2=1141434&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/pool/PoolEntry.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/pool/PoolEntry.java Thu Jun 30 08:13:48 2011
@@ -31,7 +31,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.http.nio.reactor.IOSession;
-public abstract class PoolEntry<T> {
+public class PoolEntry<T> {
private static AtomicLong COUNTER = new AtomicLong();
Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/pool/RouteSpecificPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/pool/RouteSpecificPool.java?rev=1141434&r1=1141433&r2=1141434&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/pool/RouteSpecificPool.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/pool/RouteSpecificPool.java Thu Jun 30 08:13:48 2011
@@ -37,23 +37,23 @@ import java.util.Set;
import org.apache.http.nio.reactor.IOSession;
import org.apache.http.nio.reactor.SessionRequest;
-class RouteSpecificPool<T, E extends PoolEntry<T>> {
+abstract class RouteSpecificPool<T, E extends PoolEntry<T>> {
private final T route;
- private final PoolEntryFactory<T, E> factory;
private final Set<E> leasedSessions;
private final LinkedList<E> availableSessions;
private final Map<SessionRequest, PoolEntryCallback<T, E>> pendingSessions;
- public RouteSpecificPool(final T route, final PoolEntryFactory<T, E> factory) {
+ RouteSpecificPool(final T route) {
super();
this.route = route;
- this.factory = factory;
this.leasedSessions = new HashSet<E>();
this.availableSessions = new LinkedList<E>();
this.pendingSessions = new HashMap<SessionRequest, PoolEntryCallback<T, E>>();
}
+ protected abstract E createEntry(T route, IOSession session);
+
public int getLeasedCount() {
return this.leasedSessions.size();
}
@@ -139,7 +139,7 @@ class RouteSpecificPool<T, E extends Poo
public E completed(final SessionRequest request) {
PoolEntryCallback<T, E> callback = removeRequest(request);
IOSession iosession = request.getSession();
- E entry = this.factory.createEntry(this.route, iosession);
+ E entry = createEntry(this.route, iosession);
this.leasedSessions.add(entry);
callback.completed(entry);
return entry;
Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/pool/SessionPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/pool/SessionPool.java?rev=1141434&r1=1141433&r2=1141434&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/pool/SessionPool.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/pool/SessionPool.java Thu Jun 30 08:13:48 2011
@@ -27,6 +27,7 @@
package org.apache.http.impl.nio.pool;
import java.io.IOException;
+import java.net.SocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -47,13 +48,11 @@ import org.apache.http.nio.reactor.Sessi
public abstract class SessionPool<T, E extends PoolEntry<T>> {
private final ConnectingIOReactor ioreactor;
- private final PoolEntryFactory<T, E> factory;
private final SessionRequestCallback sessionRequestCallback;
- private final RouteResolver<T> routeResolver;
private final Map<T, RouteSpecificPool<T, E>> routeToPool;
private final LinkedList<LeaseRequest<T, E>> leasingRequests;
private final Set<SessionRequest> pendingSessions;
- private final Set<PoolEntry<T>> leasedSessions;
+ private final Set<E> leasedSessions;
private final LinkedList<E> availableSessions;
private final Map<T, Integer> maxPerRoute;
private final Lock lock;
@@ -64,20 +63,12 @@ public abstract class SessionPool<T, E e
public SessionPool(
final ConnectingIOReactor ioreactor,
- final PoolEntryFactory<T, E> factory,
- final RouteResolver<T> routeResolver,
int defaultMaxPerRoute,
int maxTotal) {
super();
if (ioreactor == null) {
throw new IllegalArgumentException("I/O reactor may not be null");
}
- if (factory == null) {
- throw new IllegalArgumentException("Pool entry factory may not be null");
- }
- if (routeResolver == null) {
- throw new IllegalArgumentException("Route resolver may not be null");
- }
if (defaultMaxPerRoute <= 0) {
throw new IllegalArgumentException("Max per route value may not be negative or zero");
}
@@ -85,13 +76,11 @@ public abstract class SessionPool<T, E e
throw new IllegalArgumentException("Max total value may not be negative or zero");
}
this.ioreactor = ioreactor;
- this.factory = factory;
this.sessionRequestCallback = new InternalSessionRequestCallback();
- this.routeResolver = routeResolver;
this.routeToPool = new HashMap<T, RouteSpecificPool<T, E>>();
this.leasingRequests = new LinkedList<LeaseRequest<T, E>>();
this.pendingSessions = new HashSet<SessionRequest>();
- this.leasedSessions = new HashSet<PoolEntry<T>>();
+ this.leasedSessions = new HashSet<E>();
this.availableSessions = new LinkedList<E>();
this.maxPerRoute = new HashMap<T, Integer>();
this.lock = new ReentrantLock();
@@ -99,6 +88,14 @@ public abstract class SessionPool<T, E e
this.maxTotal = maxTotal;
}
+ protected abstract SocketAddress resolveRemoteAddress(T route);
+
+ protected abstract SocketAddress resolveLocalAddress(T route);
+
+ protected abstract E createEntry(T route, IOSession session);
+
+ protected abstract void closeEntry(E entry);
+
public boolean isShutdown() {
return this.isShutDown;
}
@@ -110,11 +107,19 @@ public abstract class SessionPool<T, E e
this.isShutDown = true;
this.lock.lock();
try {
+ for (SessionRequest sessionRequest: this.pendingSessions) {
+ sessionRequest.cancel();
+ }
+ for (E entry: this.availableSessions) {
+ closeEntry(entry);
+ }
+ for (E entry: this.leasedSessions) {
+ closeEntry(entry);
+ }
for (RouteSpecificPool<T, E> pool: this.routeToPool.values()) {
pool.shutdown();
}
this.routeToPool.clear();
-
this.leasedSessions.clear();
this.pendingSessions.clear();
this.availableSessions.clear();
@@ -128,7 +133,14 @@ public abstract class SessionPool<T, E e
private RouteSpecificPool<T, E> getPool(final T route) {
RouteSpecificPool<T, E> pool = this.routeToPool.get(route);
if (pool == null) {
- pool = new RouteSpecificPool<T, E>(route, this.factory);
+ pool = new RouteSpecificPool<T, E>(route) {
+
+ @Override
+ protected E createEntry(final T route, final IOSession session) {
+ return SessionPool.this.createEntry(route, session);
+ }
+
+ };
this.routeToPool.put(route, pool);
}
return pool;
@@ -176,6 +188,8 @@ public abstract class SessionPool<T, E e
pool.freeEntry(entry, reusable);
if (reusable) {
this.availableSessions.add(entry);
+ } else {
+ closeEntry(entry);
}
processPendingRequests();
}
@@ -201,11 +215,8 @@ public abstract class SessionPool<T, E e
if (entry == null) {
break;
}
- IOSession iosession = entry.getIOSession();
if (entry.isExpired(System.currentTimeMillis())) {
- iosession.close();
- }
- if (iosession.isClosed()) {
+ closeEntry(entry);
this.availableSessions.remove(entry);
pool.freeEntry(entry, false);
} else {
@@ -231,8 +242,8 @@ public abstract class SessionPool<T, E e
}
it.remove();
SessionRequest sessionRequest = this.ioreactor.connect(
- this.routeResolver.resolveRemoteAddress(route),
- this.routeResolver.resolveLocalAddress(route),
+ resolveRemoteAddress(route),
+ resolveLocalAddress(route),
route,
this.sessionRequestCallback);
sessionRequest.setConnectTimeout(timeout);
@@ -245,8 +256,7 @@ public abstract class SessionPool<T, E e
private void dropLastUsed() {
if (!this.availableSessions.isEmpty()) {
E entry = this.availableSessions.removeFirst();
- IOSession iosession = entry.getIOSession();
- iosession.close();
+ closeEntry(entry);
RouteSpecificPool<T, E> pool = getPool(entry.getRoute());
pool.remove(entry);
}
@@ -262,7 +272,7 @@ public abstract class SessionPool<T, E e
try {
this.pendingSessions.remove(request);
RouteSpecificPool<T, E> pool = getPool(route);
- PoolEntry<T> entry = pool.completed(request);
+ E entry = pool.completed(request);
this.leasedSessions.add(entry);
} finally {
this.lock.unlock();
@@ -410,8 +420,7 @@ public abstract class SessionPool<T, E e
while (it.hasNext()) {
E entry = it.next();
if (entry.getUpdated() <= deadline) {
- IOSession iosession = entry.getIOSession();
- iosession.close();
+ closeEntry(entry);
RouteSpecificPool<T, E> pool = getPool(entry.getRoute());
pool.remove(entry);
it.remove();
@@ -431,8 +440,7 @@ public abstract class SessionPool<T, E e
while (it.hasNext()) {
E entry = it.next();
if (entry.isExpired(now)) {
- IOSession iosession = entry.getIOSession();
- iosession.close();
+ closeEntry(entry);
RouteSpecificPool<T, E> pool = getPool(entry.getRoute());
pool.remove(entry);
it.remove();
Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/test/java/org/apache/http/impl/nio/pool/TestRouteSpecificPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/test/java/org/apache/http/impl/nio/pool/TestRouteSpecificPool.java?rev=1141434&r1=1141433&r2=1141434&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/test/java/org/apache/http/impl/nio/pool/TestRouteSpecificPool.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/test/java/org/apache/http/impl/nio/pool/TestRouteSpecificPool.java Thu Jun 30 08:13:48 2011
@@ -39,27 +39,15 @@ import org.mockito.Mockito;
public class TestRouteSpecificPool {
- static class LocalPoolEntry extends PoolEntry<String> {
-
- public LocalPoolEntry(final String route, final IOSession iosession,
- long timeToLive, final TimeUnit tunit) {
- super(route, iosession, timeToLive, tunit);
- }
-
- }
-
- static class LocalPoolEntryFactory implements PoolEntryFactory<String, PoolEntry<String>> {
-
- public PoolEntry<String> createEntry(final String route, final IOSession session) {
- return new LocalPoolEntry(route, session, 0L, TimeUnit.MILLISECONDS);
- }
-
- };
-
static class LocalRoutePool extends RouteSpecificPool<String, PoolEntry<String>> {
public LocalRoutePool() {
- super("whatever", new LocalPoolEntryFactory());
+ super("whatever");
+ }
+
+ @Override
+ protected PoolEntry<String> createEntry(String route, IOSession session) {
+ return new PoolEntry<String>(route, session, 0L, TimeUnit.MILLISECONDS);
}
};
@@ -269,7 +257,7 @@ public class TestRouteSpecificPool {
public void testReleaseInvalidEntry() throws Exception {
LocalRoutePool pool = new LocalRoutePool();
IOSession session = Mockito.mock(IOSession.class);
- LocalPoolEntry entry = new LocalPoolEntry("whatever", session, 0L, TimeUnit.MILLISECONDS);
+ PoolEntry<String> entry = new PoolEntry<String>("whatever", session, 0L, TimeUnit.MILLISECONDS);
pool.freeEntry(entry, true);
}
Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/test/java/org/apache/http/impl/nio/pool/TestSessionPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/test/java/org/apache/http/impl/nio/pool/TestSessionPool.java?rev=1141434&r1=1141433&r2=1141434&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/test/java/org/apache/http/impl/nio/pool/TestSessionPool.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/test/java/org/apache/http/impl/nio/pool/TestSessionPool.java Thu Jun 30 08:13:48 2011
@@ -43,48 +43,32 @@ import org.mockito.Mockito;
public class TestSessionPool {
- static class LocalPoolEntry extends PoolEntry<String> {
-
- public LocalPoolEntry(final String route, final IOSession iosession,
- long timeToLive, final TimeUnit tunit) {
- super(route, iosession, timeToLive, tunit);
- }
-
- }
-
- static class LocalPoolEntryFactory implements PoolEntryFactory<String, PoolEntry<String>> {
+ static class LocalSessionPool extends SessionPool<String, PoolEntry<String>> {
- public PoolEntry<String> createEntry(final String route, final IOSession session) {
- return new LocalPoolEntry(route, session, 0L, TimeUnit.MILLISECONDS);
+ public LocalSessionPool(
+ final ConnectingIOReactor ioreactor, int defaultMaxPerRoute, int maxTotal) {
+ super(ioreactor, defaultMaxPerRoute, maxTotal);
}
- };
-
- static class LocalRouteResolver implements RouteResolver<String> {
-
- public SocketAddress resolveRemoteAddress(final String route) {
+ @Override
+ protected SocketAddress resolveRemoteAddress(final String route) {
return InetSocketAddress.createUnresolved(route, 80);
}
- public SocketAddress resolveLocalAddress(final String route) {
+ @Override
+ protected SocketAddress resolveLocalAddress(final String route) {
return InetSocketAddress.createUnresolved(route, 80);
}
- };
-
- static class LocalSessionPool extends SessionPool<String, PoolEntry<String>> {
-
- public LocalSessionPool(
- final ConnectingIOReactor ioreactor,
- final PoolEntryFactory<String, PoolEntry<String>> factory,
- final RouteResolver<String> routeResolver,
- int defaultMaxPerRoute, int maxTotal) {
- super(ioreactor, factory, routeResolver, defaultMaxPerRoute, maxTotal);
+ @Override
+ protected PoolEntry<String> createEntry(final String route, final IOSession session) {
+ return new PoolEntry<String>(route, session, 0L, TimeUnit.MILLISECONDS);
}
- public LocalSessionPool(
- final ConnectingIOReactor ioreactor, int defaultMaxPerRoute, int maxTotal) {
- super(ioreactor, new LocalPoolEntryFactory(), new LocalRouteResolver(), defaultMaxPerRoute, maxTotal);
+ @Override
+ protected void closeEntry(final PoolEntry<String> entry) {
+ IOSession session = entry.getIOSession();
+ session.close();
}
}
@@ -118,27 +102,17 @@ public class TestSessionPool {
public void testInvalidConstruction() throws Exception {
ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
try {
- new LocalSessionPool(null, new LocalPoolEntryFactory(), new LocalRouteResolver(), 1, 1);
- Assert.fail("IllegalArgumentException should have been thrown");
- } catch (IllegalArgumentException expected) {
- }
- try {
- new LocalSessionPool(ioreactor, null, new LocalRouteResolver(), 1, 1);
- Assert.fail("IllegalArgumentException should have been thrown");
- } catch (IllegalArgumentException expected) {
- }
- try {
- new LocalSessionPool(ioreactor, new LocalPoolEntryFactory(), null, 1, 1);
+ new LocalSessionPool(null, 1, 1);
Assert.fail("IllegalArgumentException should have been thrown");
} catch (IllegalArgumentException expected) {
}
try {
- new LocalSessionPool(ioreactor, new LocalPoolEntryFactory(), new LocalRouteResolver(), -1, 1);
+ new LocalSessionPool(ioreactor, -1, 1);
Assert.fail("IllegalArgumentException should have been thrown");
} catch (IllegalArgumentException expected) {
}
try {
- new LocalSessionPool(ioreactor, new LocalPoolEntryFactory(), new LocalRouteResolver(), 1, -1);
+ new LocalSessionPool(ioreactor, 1, -1);
Assert.fail("IllegalArgumentException should have been thrown");
} catch (IllegalArgumentException expected) {
}
@@ -319,6 +293,8 @@ public class TestSessionPool {
pool.release(entry1, true);
pool.release(entry2, true);
pool.release(entry3, false);
+ Mockito.verify(iosession1, Mockito.never()).close();
+ Mockito.verify(iosession2, Mockito.times(1)).close();
PoolStats totals = pool.getTotalStats();
Assert.assertEquals(2, totals.getAvailable());
@@ -351,7 +327,7 @@ public class TestSessionPool {
public void testReleaseUnknownEntry() throws Exception {
ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 2);
- pool.release(new LocalPoolEntry(
+ pool.release(new PoolEntry<String>(
"somehost", Mockito.mock(IOSession.class), 1000, TimeUnit.SECONDS), true);
}
@@ -857,7 +833,8 @@ public class TestSessionPool {
} catch (IllegalStateException expected) {
}
// Ignored if shut down
- pool.release(new LocalPoolEntry("somehost", Mockito.mock(IOSession.class), 1000, TimeUnit.SECONDS), true);
+ pool.release(new PoolEntry<String>(
+ "somehost", Mockito.mock(IOSession.class), 1000, TimeUnit.SECONDS), true);
pool.requestCompleted(Mockito.mock(SessionRequest.class));
pool.requestFailed(Mockito.mock(SessionRequest.class));
pool.requestCancelled(Mockito.mock(SessionRequest.class));