You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2014/04/15 12:54:19 UTC
svn commit: r1587508 - in
/httpcomponents/httpasyncclient/trunk/httpasyncclient/src:
main/java/org/apache/http/impl/nio/client/
test/java/org/apache/http/nio/client/integration/
Author: olegk
Date: Tue Apr 15 10:54:19 2014
New Revision: 1587508
URL: http://svn.apache.org/r1587508
Log:
HTTPASYNC-74: support for shared connection managers
Modified:
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/CloseableHttpAsyncClientBase.java
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/HttpAsyncClientBuilder.java
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalHttpAsyncClient.java
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalHttpAsyncClient.java
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/test/java/org/apache/http/nio/client/integration/TestHttpAsync.java
Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/CloseableHttpAsyncClientBase.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/CloseableHttpAsyncClientBase.java?rev=1587508&r1=1587507&r2=1587508&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/CloseableHttpAsyncClientBase.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/CloseableHttpAsyncClientBase.java Tue Apr 15 10:54:19 2014
@@ -26,21 +26,22 @@
*/
package org.apache.http.impl.nio.client;
+import java.io.IOException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.nio.NHttpClientEventHandler;
import org.apache.http.nio.conn.NHttpClientConnectionManager;
import org.apache.http.nio.reactor.IOEventDispatch;
-
-import java.io.IOException;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicReference;
+import org.apache.http.util.Asserts;
abstract class CloseableHttpAsyncClientBase extends CloseableHttpAsyncClient {
private final Log log = LogFactory.getLog(getClass());
- static enum Status { INACTIVE, ACTIVE, STOPPED }
+ static enum Status {INACTIVE, ACTIVE, STOPPED}
private final NHttpClientConnectionManager connmgr;
private final Thread reactorThread;
@@ -53,60 +54,66 @@ abstract class CloseableHttpAsyncClientB
final NHttpClientEventHandler handler) {
super();
this.connmgr = connmgr;
- this.reactorThread = threadFactory.newThread(new Runnable() {
+ if (threadFactory != null && handler != null) {
+ this.reactorThread = threadFactory.newThread(new Runnable() {
- public void run() {
- doExecute(handler);
- }
-
- });
+ @Override
+ public void run() {
+ try {
+ final IOEventDispatch ioEventDispatch = new InternalIODispatch(handler);
+ connmgr.execute(ioEventDispatch);
+ } catch (final Exception ex) {
+ log.error("I/O reactor terminated abnormally", ex);
+ } finally {
+ status.set(Status.STOPPED);
+ }
+ }
+
+ });
+ } else {
+ this.reactorThread = null;
+ }
this.status = new AtomicReference<Status>(Status.INACTIVE);
}
- private void doExecute(final NHttpClientEventHandler handler) {
- try {
- final IOEventDispatch ioEventDispatch = new InternalIODispatch(handler);
- this.connmgr.execute(ioEventDispatch);
- } catch (final Exception ex) {
- this.log.error("I/O reactor terminated abnormally", ex);
- } finally {
- this.status.set(Status.STOPPED);
- }
+ void startConnManager(final NHttpClientEventHandler handler) {
}
@Override
public void start() {
if (this.status.compareAndSet(Status.INACTIVE, Status.ACTIVE)) {
- this.reactorThread.start();
+ if (this.reactorThread != null) {
+ this.reactorThread.start();
+ }
}
}
- public void shutdown() {
- if (this.status.compareAndSet(Status.ACTIVE, Status.STOPPED)) {
- try {
- this.connmgr.shutdown();
- } catch (final IOException ex) {
- this.log.error("I/O error shutting down connection manager", ex);
- }
- try {
- this.reactorThread.join();
- } catch (final InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- }
+ protected void ensureRunning() {
+ final Status currentStatus = this.status.get();
+ Asserts.check(currentStatus == Status.ACTIVE, "Request cannot be executed; " +
+ "I/O reactor status: %s", currentStatus);
}
public void close() {
- shutdown();
+ if (this.status.compareAndSet(Status.ACTIVE, Status.STOPPED)) {
+ if (this.reactorThread != null) {
+ try {
+ this.connmgr.shutdown();
+ } catch (IOException ex) {
+ this.log.error("I/O error shutting down connection manager", ex);
+ }
+ try {
+ this.reactorThread.join();
+ } catch (final InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
}
@Override
public boolean isRunning() {
- return getStatus() == Status.ACTIVE;
- }
-
- Status getStatus() {
- return this.status.get();
+ return this.status.get() == Status.ACTIVE;
}
}
Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/HttpAsyncClientBuilder.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/HttpAsyncClientBuilder.java?rev=1587508&r1=1587507&r2=1587508&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/HttpAsyncClientBuilder.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/HttpAsyncClientBuilder.java Tue Apr 15 10:54:19 2014
@@ -152,6 +152,7 @@ public class HttpAsyncClientBuilder {
}
private NHttpClientConnectionManager connManager;
+ private boolean connManagerShared;
private SchemePortResolver schemePortResolver;
private SchemeIOSessionStrategy sslStrategy;
private X509HostnameVerifier hostnameVerifier;
@@ -205,7 +206,26 @@ public class HttpAsyncClientBuilder {
*/
public final HttpAsyncClientBuilder setConnectionManager(
final NHttpClientConnectionManager connManager) {
+ return setConnectionManager(connManager, false);
+ }
+
+ /**
+ * Assigns {@link NHttpClientConnectionManager} instance.
+ * <p/>
+ * If the connection manager is shared its life-cycle is expected
+ * to be managed by the caller and it will not be shut down
+ * if the client is closed.
+ *
+ * @param connManager connection manager
+ * @param shared defines whether or not the connection manager can be shared
+ * by multiple clients.
+ *
+ * @since 4.1
+ */
+ public final HttpAsyncClientBuilder setConnectionManager(
+ final NHttpClientConnectionManager connManager, final boolean shared) {
this.connManager = connManager;
+ this.connManagerShared = shared;
return this;
}
@@ -786,16 +806,6 @@ public class HttpAsyncClientBuilder {
defaultRequestConfig = RequestConfig.DEFAULT;
}
- ThreadFactory threadFactory = this.threadFactory;
- if (threadFactory == null) {
- threadFactory = Executors.defaultThreadFactory();
- }
-
- NHttpClientEventHandler eventHandler = this.eventHandler;
- if (eventHandler == null) {
- eventHandler = new LoggingAsyncRequestExecutor();
- }
-
final MainClientExec exec = new MainClientExec(
connManager,
httpprocessor,
@@ -807,16 +817,28 @@ public class HttpAsyncClientBuilder {
proxyAuthStrategy,
userTokenHandler);
+ ThreadFactory threadFactory = null;
+ NHttpClientEventHandler eventHandler = null;
+ if (!this.connManagerShared) {
+ threadFactory = this.threadFactory;
+ if (threadFactory == null) {
+ threadFactory = Executors.defaultThreadFactory();
+ }
+ eventHandler = this.eventHandler;
+ if (eventHandler == null) {
+ eventHandler = new LoggingAsyncRequestExecutor();
+ }
+ }
return new InternalHttpAsyncClient(
connManager,
+ threadFactory,
+ eventHandler,
exec,
cookieSpecRegistry,
authSchemeRegistry,
defaultCookieStore,
defaultCredentialsProvider,
- defaultRequestConfig,
- threadFactory,
- eventHandler);
+ defaultRequestConfig);
}
}
Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalHttpAsyncClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalHttpAsyncClient.java?rev=1587508&r1=1587507&r2=1587508&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalHttpAsyncClient.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalHttpAsyncClient.java Tue Apr 15 10:54:19 2014
@@ -47,7 +47,6 @@ import org.apache.http.nio.protocol.Http
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpContext;
-import org.apache.http.util.Asserts;
class InternalHttpAsyncClient extends CloseableHttpAsyncClientBase {
@@ -63,15 +62,15 @@ class InternalHttpAsyncClient extends Cl
public InternalHttpAsyncClient(
final NHttpClientConnectionManager connmgr,
+ final ThreadFactory threadFactory,
+ final NHttpClientEventHandler handler,
final InternalClientExec exec,
final Lookup<CookieSpecProvider> cookieSpecRegistry,
final Lookup<AuthSchemeProvider> authSchemeRegistry,
final CookieStore cookieStore,
final CredentialsProvider credentialsProvider,
- final RequestConfig defaultConfig,
- final ThreadFactory threadFactory,
- final NHttpClientEventHandler eventHandler) {
- super(connmgr, threadFactory, eventHandler);
+ final RequestConfig defaultConfig) {
+ super(connmgr, threadFactory, handler);
this.connmgr = connmgr;
this.exec = exec;
this.cookieSpecRegistry = cookieSpecRegistry;
@@ -110,9 +109,7 @@ class InternalHttpAsyncClient extends Cl
final HttpAsyncResponseConsumer<T> responseConsumer,
final HttpContext context,
final FutureCallback<T> callback) {
- final Status status = getStatus();
- Asserts.check(status == Status.ACTIVE, "Request cannot be executed; " +
- "I/O reactor status: %s", status);
+ ensureRunning();
final BasicFuture<T> future = new BasicFuture<T>(callback);
final HttpClientContext localcontext = HttpClientContext.adapt(
context != null ? context : new BasicHttpContext());
Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalHttpAsyncClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalHttpAsyncClient.java?rev=1587508&r1=1587507&r2=1587508&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalHttpAsyncClient.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalHttpAsyncClient.java Tue Apr 15 10:54:19 2014
@@ -49,7 +49,6 @@ import org.apache.http.protocol.Immutabl
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;
-import org.apache.http.util.Asserts;
import org.apache.http.util.VersionInfo;
class MinimalHttpAsyncClient extends CloseableHttpAsyncClientBase {
@@ -78,14 +77,8 @@ class MinimalHttpAsyncClient extends Clo
}
public MinimalHttpAsyncClient(
- final NHttpClientConnectionManager connmgr,
- final ThreadFactory threadFactory) {
- this(connmgr, threadFactory, new LoggingAsyncRequestExecutor());
- }
-
- public MinimalHttpAsyncClient(
final NHttpClientConnectionManager connmgr) {
- this(connmgr, Executors.defaultThreadFactory());
+ this(connmgr, Executors.defaultThreadFactory(), new LoggingAsyncRequestExecutor());
}
public <T> Future<T> execute(
@@ -93,9 +86,7 @@ class MinimalHttpAsyncClient extends Clo
final HttpAsyncResponseConsumer<T> responseConsumer,
final HttpContext context,
final FutureCallback<T> callback) {
- final Status status = getStatus();
- Asserts.check(status == Status.ACTIVE, "Request cannot be executed; " +
- "I/O reactor status: %s", status);
+ ensureRunning();
final BasicFuture<T> future = new BasicFuture<T>(callback);
final HttpClientContext localcontext = HttpClientContext.adapt(
context != null ? context : new BasicHttpContext());
Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/test/java/org/apache/http/nio/client/integration/TestHttpAsync.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/test/java/org/apache/http/nio/client/integration/TestHttpAsync.java?rev=1587508&r1=1587507&r2=1587508&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/test/java/org/apache/http/nio/client/integration/TestHttpAsync.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/test/java/org/apache/http/nio/client/integration/TestHttpAsync.java Tue Apr 15 10:54:19 2014
@@ -45,6 +45,7 @@ import org.apache.http.impl.DefaultConne
import org.apache.http.impl.DefaultHttpResponseFactory;
import org.apache.http.impl.nio.DefaultNHttpServerConnection;
import org.apache.http.impl.nio.DefaultNHttpServerConnectionFactory;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.localserver.EchoHandler;
import org.apache.http.localserver.RandomHandler;
@@ -243,4 +244,36 @@ public class TestHttpAsync extends HttpA
}
}
+ @Test
+ public void testSharedPool() throws Exception {
+ final HttpHost target = start();
+ final HttpGet httpget = new HttpGet("/random/2048");
+ final Future<HttpResponse> future = this.httpclient.execute(target, httpget, null);
+ final HttpResponse response = future.get();
+ Assert.assertNotNull(response);
+ Assert.assertEquals(200, response.getStatusLine().getStatusCode());
+
+
+ final CloseableHttpAsyncClient httpclient2 = HttpAsyncClients.custom()
+ .setConnectionManager(this.connMgr, true)
+ .build();
+ try {
+ httpclient2.start();
+ final HttpGet httpget2 = new HttpGet("/random/2048");
+ final Future<HttpResponse> future2 = httpclient2.execute(target, httpget2, null);
+ final HttpResponse response2 = future2.get();
+ Assert.assertNotNull(response2);
+ Assert.assertEquals(200, response2.getStatusLine().getStatusCode());
+
+ } finally {
+ httpclient2.close();
+ }
+
+ final HttpGet httpget3 = new HttpGet("/random/2048");
+ final Future<HttpResponse> future3 = this.httpclient.execute(target, httpget3, null);
+ final HttpResponse response3 = future3.get();
+ Assert.assertNotNull(response3);
+ Assert.assertEquals(200, response3.getStatusLine().getStatusCode());
+ }
+
}