You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2014/04/15 12:54:19 UTC

svn commit: r1587508 - in /httpcomponents/httpasyncclient/trunk/httpasyncclient/src: main/java/org/apache/http/impl/nio/client/ test/java/org/apache/http/nio/client/integration/

Author: olegk
Date: Tue Apr 15 10:54:19 2014
New Revision: 1587508

URL: http://svn.apache.org/r1587508
Log:
HTTPASYNC-74: support for shared connection managers

Modified:
    httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/CloseableHttpAsyncClientBase.java
    httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/HttpAsyncClientBuilder.java
    httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalHttpAsyncClient.java
    httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalHttpAsyncClient.java
    httpcomponents/httpasyncclient/trunk/httpasyncclient/src/test/java/org/apache/http/nio/client/integration/TestHttpAsync.java

Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/CloseableHttpAsyncClientBase.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/CloseableHttpAsyncClientBase.java?rev=1587508&r1=1587507&r2=1587508&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/CloseableHttpAsyncClientBase.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/CloseableHttpAsyncClientBase.java Tue Apr 15 10:54:19 2014
@@ -26,21 +26,22 @@
  */
 package org.apache.http.impl.nio.client;
 
+import java.io.IOException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.http.nio.NHttpClientEventHandler;
 import org.apache.http.nio.conn.NHttpClientConnectionManager;
 import org.apache.http.nio.reactor.IOEventDispatch;
-
-import java.io.IOException;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicReference;
+import org.apache.http.util.Asserts;
 
 abstract class CloseableHttpAsyncClientBase extends CloseableHttpAsyncClient {
 
     private final Log log = LogFactory.getLog(getClass());
 
-    static enum Status { INACTIVE, ACTIVE, STOPPED }
+    static enum Status {INACTIVE, ACTIVE, STOPPED}
 
     private final NHttpClientConnectionManager connmgr;
     private final Thread reactorThread;
@@ -53,60 +54,66 @@ abstract class CloseableHttpAsyncClientB
             final NHttpClientEventHandler handler) {
         super();
         this.connmgr = connmgr;
-        this.reactorThread = threadFactory.newThread(new Runnable() {
+        if (threadFactory != null && handler != null) {
+            this.reactorThread = threadFactory.newThread(new Runnable() {
 
-            public void run() {
-                doExecute(handler);
-            }
-
-        });
+                @Override
+                public void run() {
+                    try {
+                        final IOEventDispatch ioEventDispatch = new InternalIODispatch(handler);
+                        connmgr.execute(ioEventDispatch);
+                    } catch (final Exception ex) {
+                        log.error("I/O reactor terminated abnormally", ex);
+                    } finally {
+                        status.set(Status.STOPPED);
+                    }
+                }
+
+            });
+        } else {
+            this.reactorThread = null;
+        }
         this.status = new AtomicReference<Status>(Status.INACTIVE);
     }
 
-    private void doExecute(final NHttpClientEventHandler handler) {
-        try {
-            final IOEventDispatch ioEventDispatch = new InternalIODispatch(handler);
-            this.connmgr.execute(ioEventDispatch);
-        } catch (final Exception ex) {
-            this.log.error("I/O reactor terminated abnormally", ex);
-        } finally {
-            this.status.set(Status.STOPPED);
-        }
+    void startConnManager(final NHttpClientEventHandler handler) {
     }
 
     @Override
     public void start() {
         if (this.status.compareAndSet(Status.INACTIVE, Status.ACTIVE)) {
-            this.reactorThread.start();
+            if (this.reactorThread != null) {
+                this.reactorThread.start();
+            }
         }
     }
 
-    public void shutdown() {
-        if (this.status.compareAndSet(Status.ACTIVE, Status.STOPPED)) {
-            try {
-                this.connmgr.shutdown();
-            } catch (final IOException ex) {
-                this.log.error("I/O error shutting down connection manager", ex);
-            }
-            try {
-                this.reactorThread.join();
-            } catch (final InterruptedException ex) {
-                Thread.currentThread().interrupt();
-            }
-        }
+    protected void ensureRunning() {
+        final Status currentStatus = this.status.get();
+        Asserts.check(currentStatus == Status.ACTIVE, "Request cannot be executed; " +
+                "I/O reactor status: %s", currentStatus);
     }
 
     public void close() {
-        shutdown();
+        if (this.status.compareAndSet(Status.ACTIVE, Status.STOPPED)) {
+            if (this.reactorThread != null) {
+                try {
+                    this.connmgr.shutdown();
+                } catch (IOException ex) {
+                    this.log.error("I/O error shutting down connection manager", ex);
+                }
+                try {
+                    this.reactorThread.join();
+                } catch (final InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
     }
 
     @Override
     public boolean isRunning() {
-        return getStatus() == Status.ACTIVE;
-    }
-
-    Status getStatus() {
-        return this.status.get();
+        return this.status.get() == Status.ACTIVE;
     }
 
 }

Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/HttpAsyncClientBuilder.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/HttpAsyncClientBuilder.java?rev=1587508&r1=1587507&r2=1587508&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/HttpAsyncClientBuilder.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/HttpAsyncClientBuilder.java Tue Apr 15 10:54:19 2014
@@ -152,6 +152,7 @@ public class HttpAsyncClientBuilder {
     }
 
     private NHttpClientConnectionManager connManager;
+    private boolean connManagerShared;
     private SchemePortResolver schemePortResolver;
     private SchemeIOSessionStrategy sslStrategy;
     private X509HostnameVerifier hostnameVerifier;
@@ -205,7 +206,26 @@ public class HttpAsyncClientBuilder {
      */
     public final HttpAsyncClientBuilder setConnectionManager(
             final NHttpClientConnectionManager connManager) {
+        return setConnectionManager(connManager, false);
+    }
+
+    /**
+     * Assigns {@link NHttpClientConnectionManager} instance.
+     * <p/>
+     * If the connection manager is shared its life-cycle is expected
+     * to be managed by the caller and it will not be shut down
+     * if the client is closed.
+     *
+     * @param connManager connection manager
+     * @param shared defines whether or not the connection manager can be shared
+     *  by multiple clients.
+     *
+     * @since 4.1
+     */
+    public final HttpAsyncClientBuilder setConnectionManager(
+            final NHttpClientConnectionManager connManager, final boolean shared) {
         this.connManager = connManager;
+        this.connManagerShared = shared;
         return this;
     }
 
@@ -786,16 +806,6 @@ public class HttpAsyncClientBuilder {
             defaultRequestConfig = RequestConfig.DEFAULT;
         }
 
-        ThreadFactory threadFactory = this.threadFactory;
-        if (threadFactory == null) {
-            threadFactory = Executors.defaultThreadFactory();
-        }
-
-        NHttpClientEventHandler eventHandler = this.eventHandler;
-        if (eventHandler == null) {
-            eventHandler = new LoggingAsyncRequestExecutor();
-        }
-
         final MainClientExec exec = new MainClientExec(
             connManager,
             httpprocessor,
@@ -807,16 +817,28 @@ public class HttpAsyncClientBuilder {
             proxyAuthStrategy,
             userTokenHandler);
 
+        ThreadFactory threadFactory = null;
+        NHttpClientEventHandler eventHandler = null;
+        if (!this.connManagerShared) {
+            threadFactory = this.threadFactory;
+            if (threadFactory == null) {
+                threadFactory = Executors.defaultThreadFactory();
+            }
+            eventHandler = this.eventHandler;
+            if (eventHandler == null) {
+                eventHandler = new LoggingAsyncRequestExecutor();
+            }
+        }
         return new InternalHttpAsyncClient(
             connManager,
+            threadFactory,
+            eventHandler,
             exec,
             cookieSpecRegistry,
             authSchemeRegistry,
             defaultCookieStore,
             defaultCredentialsProvider,
-            defaultRequestConfig,
-            threadFactory,
-            eventHandler);
+            defaultRequestConfig);
     }
 
 }

Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalHttpAsyncClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalHttpAsyncClient.java?rev=1587508&r1=1587507&r2=1587508&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalHttpAsyncClient.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalHttpAsyncClient.java Tue Apr 15 10:54:19 2014
@@ -47,7 +47,6 @@ import org.apache.http.nio.protocol.Http
 import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
 import org.apache.http.protocol.BasicHttpContext;
 import org.apache.http.protocol.HttpContext;
-import org.apache.http.util.Asserts;
 
 class InternalHttpAsyncClient extends CloseableHttpAsyncClientBase {
 
@@ -63,15 +62,15 @@ class InternalHttpAsyncClient extends Cl
 
     public InternalHttpAsyncClient(
             final NHttpClientConnectionManager connmgr,
+            final ThreadFactory threadFactory,
+            final NHttpClientEventHandler handler,
             final InternalClientExec exec,
             final Lookup<CookieSpecProvider> cookieSpecRegistry,
             final Lookup<AuthSchemeProvider> authSchemeRegistry,
             final CookieStore cookieStore,
             final CredentialsProvider credentialsProvider,
-            final RequestConfig defaultConfig,
-            final ThreadFactory threadFactory,
-            final NHttpClientEventHandler eventHandler) {
-        super(connmgr, threadFactory, eventHandler);
+            final RequestConfig defaultConfig) {
+        super(connmgr, threadFactory, handler);
         this.connmgr = connmgr;
         this.exec = exec;
         this.cookieSpecRegistry = cookieSpecRegistry;
@@ -110,9 +109,7 @@ class InternalHttpAsyncClient extends Cl
             final HttpAsyncResponseConsumer<T> responseConsumer,
             final HttpContext context,
             final FutureCallback<T> callback) {
-        final Status status = getStatus();
-        Asserts.check(status == Status.ACTIVE, "Request cannot be executed; " +
-                "I/O reactor status: %s", status);
+        ensureRunning();
         final BasicFuture<T> future = new BasicFuture<T>(callback);
         final HttpClientContext localcontext = HttpClientContext.adapt(
             context != null ? context : new BasicHttpContext());

Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalHttpAsyncClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalHttpAsyncClient.java?rev=1587508&r1=1587507&r2=1587508&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalHttpAsyncClient.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalHttpAsyncClient.java Tue Apr 15 10:54:19 2014
@@ -49,7 +49,6 @@ import org.apache.http.protocol.Immutabl
 import org.apache.http.protocol.RequestContent;
 import org.apache.http.protocol.RequestTargetHost;
 import org.apache.http.protocol.RequestUserAgent;
-import org.apache.http.util.Asserts;
 import org.apache.http.util.VersionInfo;
 
 class MinimalHttpAsyncClient extends CloseableHttpAsyncClientBase {
@@ -78,14 +77,8 @@ class MinimalHttpAsyncClient extends Clo
     }
 
     public MinimalHttpAsyncClient(
-            final NHttpClientConnectionManager connmgr,
-            final ThreadFactory threadFactory) {
-        this(connmgr, threadFactory, new LoggingAsyncRequestExecutor());
-    }
-
-    public MinimalHttpAsyncClient(
             final NHttpClientConnectionManager connmgr) {
-        this(connmgr, Executors.defaultThreadFactory());
+        this(connmgr, Executors.defaultThreadFactory(), new LoggingAsyncRequestExecutor());
     }
 
     public <T> Future<T> execute(
@@ -93,9 +86,7 @@ class MinimalHttpAsyncClient extends Clo
             final HttpAsyncResponseConsumer<T> responseConsumer,
             final HttpContext context,
             final FutureCallback<T> callback) {
-        final Status status = getStatus();
-        Asserts.check(status == Status.ACTIVE, "Request cannot be executed; " +
-                "I/O reactor status: %s", status);
+        ensureRunning();
         final BasicFuture<T> future = new BasicFuture<T>(callback);
         final HttpClientContext localcontext = HttpClientContext.adapt(
             context != null ? context : new BasicHttpContext());

Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/test/java/org/apache/http/nio/client/integration/TestHttpAsync.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/test/java/org/apache/http/nio/client/integration/TestHttpAsync.java?rev=1587508&r1=1587507&r2=1587508&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/test/java/org/apache/http/nio/client/integration/TestHttpAsync.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/test/java/org/apache/http/nio/client/integration/TestHttpAsync.java Tue Apr 15 10:54:19 2014
@@ -45,6 +45,7 @@ import org.apache.http.impl.DefaultConne
 import org.apache.http.impl.DefaultHttpResponseFactory;
 import org.apache.http.impl.nio.DefaultNHttpServerConnection;
 import org.apache.http.impl.nio.DefaultNHttpServerConnectionFactory;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
 import org.apache.http.impl.nio.client.HttpAsyncClients;
 import org.apache.http.localserver.EchoHandler;
 import org.apache.http.localserver.RandomHandler;
@@ -243,4 +244,36 @@ public class TestHttpAsync extends HttpA
         }
     }
 
+    @Test
+    public void testSharedPool() throws Exception {
+        final HttpHost target = start();
+        final HttpGet httpget = new HttpGet("/random/2048");
+        final Future<HttpResponse> future = this.httpclient.execute(target, httpget, null);
+        final HttpResponse response = future.get();
+        Assert.assertNotNull(response);
+        Assert.assertEquals(200, response.getStatusLine().getStatusCode());
+
+
+        final CloseableHttpAsyncClient httpclient2 = HttpAsyncClients.custom()
+                .setConnectionManager(this.connMgr, true)
+                .build();
+        try {
+            httpclient2.start();
+            final HttpGet httpget2 = new HttpGet("/random/2048");
+            final Future<HttpResponse> future2 = httpclient2.execute(target, httpget2, null);
+            final HttpResponse response2 = future2.get();
+            Assert.assertNotNull(response2);
+            Assert.assertEquals(200, response2.getStatusLine().getStatusCode());
+
+        } finally {
+            httpclient2.close();
+        }
+
+        final HttpGet httpget3 = new HttpGet("/random/2048");
+        final Future<HttpResponse> future3 = this.httpclient.execute(target, httpget3, null);
+        final HttpResponse response3 = future3.get();
+        Assert.assertNotNull(response3);
+        Assert.assertEquals(200, response3.getStatusLine().getStatusCode());
+    }
+
 }