You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2018/09/20 15:32:19 UTC

httpcomponents-client git commit: Async clients to support request-specific push consumers [Forced Update!]

Repository: httpcomponents-client
Updated Branches:
  refs/heads/master f30fa6d54 -> 9eb00018c (forced update)


Async clients to support request-specific push consumers


Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-client/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-client/commit/9eb00018
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-client/tree/9eb00018
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-client/diff/9eb00018

Branch: refs/heads/master
Commit: 9eb00018cefc94ce405a54ff48bb4bbd2d2d90aa
Parents: 7f3539c
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Thu Sep 20 17:16:06 2018 +0200
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Thu Sep 20 17:16:06 2018 +0200

----------------------------------------------------------------------
 .../hc/client5/http/async/HttpAsyncClient.java  | 19 +++--------
 .../AbstractMinimalHttpAsyncClientBase.java     | 12 +++++--
 .../impl/async/CloseableHttpAsyncClient.java    | 16 ++++++++--
 .../async/InternalAbstractHttpAsyncClient.java  |  7 +++--
 .../impl/async/InternalHttp2AsyncClient.java    |  6 ++--
 .../async/InternalHttp2AsyncExecRuntime.java    | 15 ++++++---
 .../impl/async/InternalHttpAsyncClient.java     |  6 ++--
 .../async/InternalHttpAsyncExecRuntime.java     |  9 ++++--
 .../impl/async/MinimalHttp2AsyncClient.java     | 13 ++++++--
 .../http/impl/async/MinimalHttpAsyncClient.java |  4 ++-
 .../PoolingAsyncClientConnectionManager.java    |  9 ++++--
 .../http/nio/AsyncConnectionEndpoint.java       | 33 ++++++++++++++++++--
 12 files changed, 110 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/async/HttpAsyncClient.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/HttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/HttpAsyncClient.java
index 1413968..3e1c684 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/async/HttpAsyncClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/HttpAsyncClient.java
@@ -29,10 +29,10 @@ package org.apache.hc.client5.http.async;
 import java.util.concurrent.Future;
 
 import org.apache.hc.core5.concurrent.FutureCallback;
-import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http.protocol.HttpContext;
 
 /**
@@ -59,25 +59,16 @@ public interface HttpAsyncClient {
      * @param <T> the result type of request execution.
      * @param requestProducer request producer callback.
      * @param responseConsumer response consumer callback.
-     * @param context HTTP context
-     * @param callback future callback.
+     * @param pushHandlerFactory the push handler factory. Optional and may be {@code null}.
+     * @param context HTTP context. Optional and may be {@code null}.
+     * @param callback future callback. Optional and may be {@code null}.
      * @return future representing pending completion of the operation.
      */
     <T> Future<T> execute(
             AsyncRequestProducer requestProducer,
             AsyncResponseConsumer<T> responseConsumer,
+            HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
             HttpContext context,
             FutureCallback<T> callback);
 
-    /**
-     * Registers {@link AsyncPushConsumer} for the given host and the URI pattern.
-     *
-     * @param hostname the name of the host this consumer intended for.
-     *                 Can be {@code null} if applies to all hosts
-     * @param uriPattern URI request pattern
-     * @param supplier supplier that will be used to supply a consumer instance
-     *                 for the given combination of hostname and URI pattern.
-     */
-    void register(String hostname, String uriPattern, Supplier<AsyncPushConsumer> supplier);
-
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AbstractMinimalHttpAsyncClientBase.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AbstractMinimalHttpAsyncClientBase.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AbstractMinimalHttpAsyncClientBase.java
index 3c0505f..0ff5459 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AbstractMinimalHttpAsyncClientBase.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AbstractMinimalHttpAsyncClientBase.java
@@ -34,8 +34,10 @@ import org.apache.hc.core5.concurrent.Cancellable;
 import org.apache.hc.core5.concurrent.ComplexFuture;
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
 import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
@@ -53,6 +55,7 @@ abstract class AbstractMinimalHttpAsyncClientBase extends AbstractHttpAsyncClien
     public final <T> Future<T> execute(
             final AsyncRequestProducer requestProducer,
             final AsyncResponseConsumer<T> responseConsumer,
+            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
             final HttpContext context,
             final FutureCallback<T> callback) {
         final ComplexFuture<T> future = new ComplexFuture<>(callback);
@@ -76,14 +79,17 @@ abstract class AbstractMinimalHttpAsyncClientBase extends AbstractHttpAsyncClien
                         future.cancel();
                     }
 
-                })));
+                }), pushHandlerFactory, context));
         return future;
     }
 
     public final Cancellable execute(final AsyncClientExchangeHandler exchangeHandler) {
-        return execute(exchangeHandler, HttpClientContext.create());
+        return execute(exchangeHandler, null, HttpClientContext.create());
     }
 
-    public abstract Cancellable execute(AsyncClientExchangeHandler exchangeHandler, HttpContext context);
+    public abstract Cancellable execute(
+            AsyncClientExchangeHandler exchangeHandler,
+            HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
+            HttpContext context);
 
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java
index 2e82ee1..65dbe51 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java
@@ -74,12 +74,16 @@ public abstract class CloseableHttpAsyncClient implements HttpAsyncClient, Close
     public final <T> Future<T> execute(
             final AsyncRequestProducer requestProducer,
             final AsyncResponseConsumer<T> responseConsumer,
+            final HttpContext context,
             final FutureCallback<T> callback) {
-        return execute(requestProducer, responseConsumer, HttpClientContext.create(), callback);
+        return execute(requestProducer, responseConsumer, null, context, callback);
     }
 
-    public final void register(final String uriPattern, final Supplier<AsyncPushConsumer> supplier) {
-        register(null, uriPattern, supplier);
+    public final <T> Future<T> execute(
+            final AsyncRequestProducer requestProducer,
+            final AsyncResponseConsumer<T> responseConsumer,
+            final FutureCallback<T> callback) {
+        return execute(requestProducer, responseConsumer, HttpClientContext.create(), callback);
     }
 
     public final Future<SimpleHttpResponse> execute(
@@ -115,4 +119,10 @@ public abstract class CloseableHttpAsyncClient implements HttpAsyncClient, Close
         return execute(request, HttpClientContext.create(), callback);
     }
 
+    public abstract void register(String hostname, String uriPattern, Supplier<AsyncPushConsumer> supplier);
+
+    public final void register(final String uriPattern, final Supplier<AsyncPushConsumer> supplier) {
+        register(null, uriPattern, supplier);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
index cf93234..3fdbe55 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
@@ -57,9 +57,11 @@ import org.apache.hc.core5.http.HttpStatus;
 import org.apache.hc.core5.http.config.Lookup;
 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
 import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http.nio.RequestChannel;
 import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
@@ -127,7 +129,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
         }
     }
 
-    abstract AsyncExecRuntime crerateAsyncExecRuntime();
+    abstract AsyncExecRuntime crerateAsyncExecRuntime(HandlerFactory<AsyncPushConsumer> pushHandlerFactory);
 
     abstract HttpRoute determineRoute(HttpRequest request, HttpClientContext clientContext) throws HttpException;
 
@@ -135,6 +137,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
     public <T> Future<T> execute(
             final AsyncRequestProducer requestProducer,
             final AsyncResponseConsumer<T> responseConsumer,
+            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
             final HttpContext context,
             final FutureCallback<T> callback) {
         ensureRunning();
@@ -158,7 +161,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
                     }
                     final HttpRoute route = determineRoute(request, clientContext);
                     final String exchangeId = ExecSupport.getNextExchangeId();
-                    final AsyncExecRuntime execRuntime = crerateAsyncExecRuntime();
+                    final AsyncExecRuntime execRuntime = crerateAsyncExecRuntime(pushHandlerFactory);
                     if (log.isDebugEnabled()) {
                         log.debug(exchangeId + ": preparing request execution");
                     }

http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncClient.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncClient.java
index 8be84b5..af80542 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncClient.java
@@ -43,6 +43,8 @@ import org.apache.hc.client5.http.routing.RoutingSupport;
 import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.config.Lookup;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http2.nio.pool.H2ConnPool;
 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
 
@@ -71,8 +73,8 @@ class InternalHttp2AsyncClient extends InternalAbstractHttpAsyncClient {
     }
 
     @Override
-    AsyncExecRuntime crerateAsyncExecRuntime() {
-        return new InternalHttp2AsyncExecRuntime(log, connPool);
+    AsyncExecRuntime crerateAsyncExecRuntime(final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
+        return new InternalHttp2AsyncExecRuntime(log, connPool, pushHandlerFactory);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncExecRuntime.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncExecRuntime.java
index f5f3112..715222d 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncExecRuntime.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncExecRuntime.java
@@ -41,6 +41,8 @@ import org.apache.hc.core5.concurrent.ComplexCancellable;
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.HttpHost;
 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
 import org.apache.hc.core5.http2.nio.pool.H2ConnPool;
 import org.apache.hc.core5.io.CloseMode;
@@ -53,13 +55,18 @@ class InternalHttp2AsyncExecRuntime implements AsyncExecRuntime {
 
     private final Logger log;
     private final H2ConnPool connPool;
+    private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
     private final AtomicReference<Endpoint> sessionRef;
     private volatile boolean reusable;
 
-    InternalHttp2AsyncExecRuntime(final Logger log, final H2ConnPool connPool) {
+    InternalHttp2AsyncExecRuntime(
+            final Logger log,
+            final H2ConnPool connPool,
+            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
         super();
         this.log = log;
         this.connPool = connPool;
+        this.pushHandlerFactory = pushHandlerFactory;
         this.sessionRef = new AtomicReference<>(null);
     }
 
@@ -198,7 +205,7 @@ class InternalHttp2AsyncExecRuntime implements AsyncExecRuntime {
                 log.debug(ConnPoolSupport.getId(endpoint) + ": executing " + ConnPoolSupport.getId(exchangeHandler));
             }
             session.enqueue(
-                    new RequestExecutionCommand(exchangeHandler, null, complexCancellable, context),
+                    new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
                     Command.Priority.NORMAL);
         } else {
             final HttpHost target = endpoint.target;
@@ -213,7 +220,7 @@ class InternalHttp2AsyncExecRuntime implements AsyncExecRuntime {
                         log.debug(ConnPoolSupport.getId(endpoint) + ": executing " + ConnPoolSupport.getId(exchangeHandler));
                     }
                     session.enqueue(
-                            new RequestExecutionCommand(exchangeHandler, null, complexCancellable, context),
+                            new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
                             Command.Priority.NORMAL);
                 }
 
@@ -255,7 +262,7 @@ class InternalHttp2AsyncExecRuntime implements AsyncExecRuntime {
 
     @Override
     public AsyncExecRuntime fork() {
-        return new InternalHttp2AsyncExecRuntime(log, connPool);
+        return new InternalHttp2AsyncExecRuntime(log, connPool, pushHandlerFactory);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java
index eed92f1..b0e46d0 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java
@@ -46,6 +46,8 @@ import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.HttpVersion;
 import org.apache.hc.core5.http.ProtocolVersion;
 import org.apache.hc.core5.http.config.Lookup;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http2.HttpVersionPolicy;
 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
 
@@ -77,8 +79,8 @@ class InternalHttpAsyncClient extends InternalAbstractHttpAsyncClient {
     }
 
     @Override
-    AsyncExecRuntime crerateAsyncExecRuntime() {
-        return new InternalHttpAsyncExecRuntime(log, connmgr, getConnectionInitiator(), versionPolicy);
+    AsyncExecRuntime crerateAsyncExecRuntime(final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
+        return new InternalHttpAsyncExecRuntime(log, connmgr, getConnectionInitiator(), pushHandlerFactory, versionPolicy);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java
index 6145751..5f513da 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java
@@ -42,6 +42,8 @@ import org.apache.hc.client5.http.protocol.HttpClientContext;
 import org.apache.hc.core5.concurrent.Cancellable;
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http2.HttpVersionPolicy;
 import org.apache.hc.core5.reactor.ConnectionInitiator;
 import org.apache.hc.core5.util.TimeValue;
@@ -52,6 +54,7 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
     private final Logger log;
     private final AsyncClientConnectionManager manager;
     private final ConnectionInitiator connectionInitiator;
+    private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
     private final HttpVersionPolicy versionPolicy;
     private final AtomicReference<AsyncConnectionEndpoint> endpointRef;
     private volatile boolean reusable;
@@ -62,11 +65,13 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
             final Logger log,
             final AsyncClientConnectionManager manager,
             final ConnectionInitiator connectionInitiator,
+            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
             final HttpVersionPolicy versionPolicy) {
         super();
         this.log = log;
         this.manager = manager;
         this.connectionInitiator = connectionInitiator;
+        this.pushHandlerFactory = pushHandlerFactory;
         this.versionPolicy = versionPolicy;
         this.endpointRef = new AtomicReference<>(null);
         this.validDuration = TimeValue.NEG_ONE_MILLISECONDS;
@@ -252,7 +257,7 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
                         log.debug(ConnPoolSupport.getId(endpoint) + ": executing " + ConnPoolSupport.getId(exchangeHandler));
                     }
                     try {
-                        endpoint.execute(exchangeHandler, context);
+                        endpoint.execute(exchangeHandler, pushHandlerFactory, context);
                     } catch (final RuntimeException ex) {
                         failed(ex);
                     }
@@ -289,7 +294,7 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
 
     @Override
     public AsyncExecRuntime fork() {
-        return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, versionPolicy);
+        return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, pushHandlerFactory, versionPolicy);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttp2AsyncClient.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttp2AsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttp2AsyncClient.java
index 570ff7a..20b2508 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttp2AsyncClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttp2AsyncClient.java
@@ -53,8 +53,10 @@ import org.apache.hc.core5.http.HttpHost;
 import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.HttpResponse;
 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
 import org.apache.hc.core5.http.nio.CapacityChannel;
 import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http.nio.RequestChannel;
 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
@@ -113,6 +115,7 @@ public final class MinimalHttp2AsyncClient extends AbstractMinimalHttpAsyncClien
     @Override
     public Cancellable execute(
             final AsyncClientExchangeHandler exchangeHandler,
+            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
             final HttpContext context) {
         ensureRunning();
         final ComplexCancellable cancellable = new ComplexCancellable();
@@ -212,11 +215,17 @@ public final class MinimalHttp2AsyncClient extends AbstractMinimalHttpAsyncClien
                                 session.enqueue(
                                         new RequestExecutionCommand(
                                                 new LoggingAsyncClientExchangeHandler(log, exchangeId, internalExchangeHandler),
-                                                null, cancellable, clientContext),
+                                                pushHandlerFactory,
+                                                cancellable,
+                                                clientContext),
                                         Command.Priority.NORMAL);
                             } else {
                                 session.enqueue(
-                                        new RequestExecutionCommand(internalExchangeHandler, null, cancellable, clientContext),
+                                        new RequestExecutionCommand(
+                                                internalExchangeHandler,
+                                                pushHandlerFactory,
+                                                cancellable,
+                                                clientContext),
                                         Command.Priority.NORMAL);
                             }
                         }

http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java
index 3f473a3..c5c2927 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java
@@ -216,6 +216,7 @@ public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClient
     @Override
     public Cancellable execute(
             final AsyncClientExchangeHandler exchangeHandler,
+            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
             final HttpContext context) {
         ensureRunning();
         final ComplexCancellable cancellable = new ComplexCancellable();
@@ -362,7 +363,7 @@ public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClient
                                         }
 
                                     };
-                                    endpoint.execute(internalExchangeHandler, clientContext);
+                                    endpoint.execute(internalExchangeHandler, pushHandlerFactory, clientContext);
                                 }
 
                                 @Override
@@ -420,6 +421,7 @@ public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClient
                 log.debug(ConnPoolSupport.getId(connectionEndpoint) + ": executing message exchange " + exchangeId);
                 connectionEndpoint.execute(
                         new LoggingAsyncClientExchangeHandler(log, exchangeId, exchangeHandler),
+                        pushHandlerFactory,
                         context);
             } else {
                 connectionEndpoint.execute(exchangeHandler, context);

http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
index 1167592..3c28d72 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
@@ -57,6 +57,8 @@ import org.apache.hc.core5.http.ProtocolVersion;
 import org.apache.hc.core5.http.config.Lookup;
 import org.apache.hc.core5.http.config.RegistryBuilder;
 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
 import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
 import org.apache.hc.core5.http.protocol.HttpContext;
@@ -553,13 +555,16 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
         }
 
         @Override
-        public void execute(final AsyncClientExchangeHandler exchangeHandler, final HttpContext context) {
+        public void execute(
+                final AsyncClientExchangeHandler exchangeHandler,
+                final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
+                final HttpContext context) {
             final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection();
             if (log.isDebugEnabled()) {
                 log.debug(id + ": executing exchange " + ConnPoolSupport.getId(exchangeHandler) +
                         " over " + ConnPoolSupport.getId(connection));
             }
-            connection.submitCommand(new RequestExecutionCommand(exchangeHandler, context));
+            connection.submitCommand(new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, context));
         }
 
     }

http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java b/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java
index 47767aa..692863d 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java
@@ -36,8 +36,10 @@ import org.apache.hc.core5.annotation.ThreadingBehavior;
 import org.apache.hc.core5.concurrent.BasicFuture;
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
 import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.http.protocol.HttpCoreContext;
@@ -50,11 +52,21 @@ import org.apache.hc.core5.http.protocol.HttpCoreContext;
 @Contract(threading = ThreadingBehavior.SAFE)
 public abstract class AsyncConnectionEndpoint implements Closeable {
 
-    public abstract void execute(AsyncClientExchangeHandler exchangeHandler, HttpContext context);
+    public abstract void execute(
+            AsyncClientExchangeHandler exchangeHandler,
+            HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
+            HttpContext context);
+
+    public void execute(
+            final AsyncClientExchangeHandler exchangeHandler,
+            final HttpContext context) {
+        execute(exchangeHandler, null, context);
+    }
 
     public <T> Future<T> execute(
             final AsyncRequestProducer requestProducer,
             final AsyncResponseConsumer<T> responseConsumer,
+            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
             final HttpContext context,
             final FutureCallback<T> callback) {
         final BasicFuture<T> future = new BasicFuture<>(callback);
@@ -77,6 +89,7 @@ public abstract class AsyncConnectionEndpoint implements Closeable {
                             }
 
                         }),
+                pushHandlerFactory,
                 context != null ? context : HttpCoreContext.create());
         return future;
     }
@@ -84,8 +97,24 @@ public abstract class AsyncConnectionEndpoint implements Closeable {
     public <T> Future<T> execute(
             final AsyncRequestProducer requestProducer,
             final AsyncResponseConsumer<T> responseConsumer,
+            final HttpContext context,
+            final FutureCallback<T> callback) {
+        return execute(requestProducer, responseConsumer, null, context, callback);
+    }
+
+    public <T> Future<T> execute(
+            final AsyncRequestProducer requestProducer,
+            final AsyncResponseConsumer<T> responseConsumer,
+            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
+            final FutureCallback<T> callback) {
+        return execute(requestProducer, responseConsumer, pushHandlerFactory, null, callback);
+    }
+
+    public <T> Future<T> execute(
+            final AsyncRequestProducer requestProducer,
+            final AsyncResponseConsumer<T> responseConsumer,
             final FutureCallback<T> callback) {
-        return execute(requestProducer, responseConsumer, null, callback);
+        return execute(requestProducer, responseConsumer, null, null, callback);
     }
 
     public abstract boolean isConnected();