You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2018/08/04 09:45:47 UTC

httpcomponents-core git commit: Request specific push consumers

Repository: httpcomponents-core
Updated Branches:
  refs/heads/master 331f8b708 -> 36ef4aa74


Request specific push consumers


Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/36ef4aa7
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/36ef4aa7
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/36ef4aa7

Branch: refs/heads/master
Commit: 36ef4aa747e6a9a0b00cc93eb3c5482f62e6ba8b
Parents: 331f8b7
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Tue Jul 31 16:19:43 2018 +0200
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Sat Aug 4 11:42:51 2018 +0200

----------------------------------------------------------------------
 .../nio/AbstractHttp2StreamMultiplexer.java     | 13 ++++--
 .../impl/nio/ClientHttp2StreamHandler.java      | 10 ++++
 .../impl/nio/ClientHttp2StreamMultiplexer.java  | 12 +++--
 .../impl/nio/ClientPushHttp2StreamHandler.java  |  5 ++
 .../http2/impl/nio/Http2StreamHandler.java      |  4 ++
 .../impl/nio/ServerHttp2StreamHandler.java      |  6 +++
 .../impl/nio/ServerHttp2StreamMultiplexer.java  |  4 +-
 .../impl/nio/ServerPushHttp2StreamHandler.java  |  7 +++
 .../bootstrap/Http2MultiplexingRequester.java   | 29 ++++++++++--
 .../testing/nio/ClientSessionEndpoint.java      | 24 ++++++++--
 .../core5/testing/nio/Http2IntegrationTest.java | 49 +++++++++++---------
 .../http/impl/bootstrap/HttpAsyncRequester.java | 33 +++++++++++--
 .../hc/core5/http/nio/AsyncClientEndpoint.java  | 36 ++++++++++++--
 .../nio/command/RequestExecutionCommand.java    | 18 ++++++-
 14 files changed, 204 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
index 5881f21..572ff8c 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
@@ -60,7 +60,9 @@ import org.apache.hc.core5.http.config.CharCodingConfig;
 import org.apache.hc.core5.http.impl.BasicEndpointDetails;
 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
 import org.apache.hc.core5.http.impl.CharCodingSupport;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
 import org.apache.hc.core5.http.nio.AsyncPushProducer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http.nio.command.ExecutableCommand;
 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
 import org.apache.hc.core5.http.protocol.HttpCoreContext;
@@ -180,7 +182,8 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
     abstract Http2StreamHandler createRemotelyInitiatedStream(
             Http2StreamChannel channel,
             HttpProcessor httpProcessor,
-            BasicHttpConnectionMetrics connMetrics) throws IOException;
+            BasicHttpConnectionMetrics connMetrics,
+            HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException;
 
     abstract Http2StreamHandler createLocallyInitiatedStream(
             ExecutableCommand command,
@@ -747,7 +750,7 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
                             localConfig.getInitialWindowSize(),
                             remoteConfig.getInitialWindowSize());
                     final Http2StreamHandler streamHandler = createRemotelyInitiatedStream(
-                            channel, httpProcessor, connMetrics);
+                            channel, httpProcessor, connMetrics, null);
                     stream = new Http2Stream(channel, streamHandler, true);
                     if (stream.isOutputReady()) {
                         stream.produceOutput();
@@ -925,7 +928,7 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
                         localConfig.getInitialWindowSize(),
                         remoteConfig.getInitialWindowSize());
                 final Http2StreamHandler streamHandler = createRemotelyInitiatedStream(
-                        channel, httpProcessor, connMetrics);
+                        channel, httpProcessor, connMetrics, stream.getPushHandlerFactory());
                 final Http2Stream promisedStream = new Http2Stream(channel, streamHandler, true);
                 streamMap.put(promisedStreamId, promisedStream);
 
@@ -1576,6 +1579,10 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
             localReset(ex, ex.getCode());
         }
 
+        HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
+            return handler.getPushHandlerFactory();
+        }
+
         void cancel() {
             reset(new CancellationException("HTTP/2 message exchange cancelled"));
         }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java
index c442589..0002ad7 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java
@@ -45,7 +45,9 @@ import org.apache.hc.core5.http.impl.IncomingEntityDetails;
 import org.apache.hc.core5.http.impl.nio.MessageState;
 import org.apache.hc.core5.http.message.StatusLine;
 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
 import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http.nio.RequestChannel;
 import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.http.protocol.HttpCoreContext;
@@ -62,6 +64,7 @@ class ClientHttp2StreamHandler implements Http2StreamHandler {
     private final HttpProcessor httpProcessor;
     private final BasicHttpConnectionMetrics connMetrics;
     private final AsyncClientExchangeHandler exchangeHandler;
+    private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
     private final HttpCoreContext context;
     private final AtomicBoolean requestCommitted;
     private final AtomicBoolean failed;
@@ -75,6 +78,7 @@ class ClientHttp2StreamHandler implements Http2StreamHandler {
             final HttpProcessor httpProcessor,
             final BasicHttpConnectionMetrics connMetrics,
             final AsyncClientExchangeHandler exchangeHandler,
+            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
             final HttpCoreContext context) {
         this.outputChannel = outputChannel;
         this.dataChannel = new DataStreamChannel() {
@@ -105,6 +109,7 @@ class ClientHttp2StreamHandler implements Http2StreamHandler {
         this.httpProcessor = httpProcessor;
         this.connMetrics = connMetrics;
         this.exchangeHandler = exchangeHandler;
+        this.pushHandlerFactory = pushHandlerFactory;
         this.context = context;
         this.requestCommitted = new AtomicBoolean(false);
         this.failed = new AtomicBoolean(false);
@@ -114,6 +119,11 @@ class ClientHttp2StreamHandler implements Http2StreamHandler {
     }
 
     @Override
+    public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
+        return pushHandlerFactory;
+    }
+
+    @Override
     public boolean isOutputReady() {
         switch (requestState) {
             case HEADERS:

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamMultiplexer.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamMultiplexer.java
index c1bdb8f..6a46b96 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamMultiplexer.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamMultiplexer.java
@@ -107,10 +107,13 @@ public class ClientHttp2StreamMultiplexer extends AbstractHttp2StreamMultiplexer
         if (command instanceof RequestExecutionCommand) {
             final RequestExecutionCommand executionCommand = (RequestExecutionCommand) command;
             final AsyncClientExchangeHandler exchangeHandler = executionCommand.getExchangeHandler();
+            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory = executionCommand.getPushHandlerFactory();
             final HttpCoreContext context = HttpCoreContext.adapt(executionCommand.getContext());
             context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
             context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
-            return new ClientHttp2StreamHandler(channel, httpProcessor, connMetrics, exchangeHandler, context);
+            return new ClientHttp2StreamHandler(channel, httpProcessor, connMetrics, exchangeHandler,
+                    pushHandlerFactory != null ? pushHandlerFactory : this.pushHandlerFactory,
+                    context);
         } else {
             throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Unexpected executable command");
         }
@@ -120,11 +123,14 @@ public class ClientHttp2StreamMultiplexer extends AbstractHttp2StreamMultiplexer
     Http2StreamHandler createRemotelyInitiatedStream(
             final Http2StreamChannel channel,
             final HttpProcessor httpProcessor,
-            final BasicHttpConnectionMetrics connMetrics) throws IOException {
+            final BasicHttpConnectionMetrics connMetrics,
+            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException {
         final HttpCoreContext context = HttpCoreContext.create();
         context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
         context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
-        return new ClientPushHttp2StreamHandler(channel, httpProcessor, connMetrics, pushHandlerFactory, context);
+        return new ClientPushHttp2StreamHandler(channel, httpProcessor, connMetrics,
+                pushHandlerFactory != null ? pushHandlerFactory : this.pushHandlerFactory,
+                context);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java
index dfbf816..4cdb523 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java
@@ -85,6 +85,11 @@ class ClientPushHttp2StreamHandler implements Http2StreamHandler {
     }
 
     @Override
+    public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
+        return pushHandlerFactory;
+    }
+
+    @Override
     public boolean isOutputReady() {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamHandler.java
index 589728c..b8e3e1f 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamHandler.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamHandler.java
@@ -32,6 +32,8 @@ import java.util.List;
 
 import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http.nio.ResourceHolder;
 
 interface Http2StreamHandler extends ResourceHolder {
@@ -48,6 +50,8 @@ interface Http2StreamHandler extends ResourceHolder {
 
     void consumeData(ByteBuffer src, boolean endStream) throws HttpException, IOException;
 
+    HandlerFactory<AsyncPushConsumer> getPushHandlerFactory();
+
     void failed(Exception cause);
 
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java
index 1a866a7..0cc43ed 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java
@@ -43,6 +43,7 @@ import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
 import org.apache.hc.core5.http.impl.IncomingEntityDetails;
 import org.apache.hc.core5.http.impl.nio.MessageState;
 import org.apache.hc.core5.http.impl.nio.ServerSupport;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
 import org.apache.hc.core5.http.nio.AsyncPushProducer;
 import org.apache.hc.core5.http.nio.AsyncResponseProducer;
 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
@@ -120,6 +121,11 @@ public class ServerHttp2StreamHandler implements Http2StreamHandler {
         this.responseState = MessageState.IDLE;
     }
 
+    @Override
+    public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
+        return null;
+    }
+
     private void commitInformation(final HttpResponse response) throws IOException, HttpException {
         if (responseCommitted.get()) {
             throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Response already committed");

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamMultiplexer.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamMultiplexer.java
index 3b9f004..fc94370 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamMultiplexer.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamMultiplexer.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 
 import org.apache.hc.core5.http.config.CharCodingConfig;
 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
 import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http.nio.command.ExecutableCommand;
@@ -92,7 +93,8 @@ public class ServerHttp2StreamMultiplexer extends AbstractHttp2StreamMultiplexer
     Http2StreamHandler createRemotelyInitiatedStream(
             final Http2StreamChannel channel,
             final HttpProcessor httpProcessor,
-            final BasicHttpConnectionMetrics connMetrics) throws IOException {
+            final BasicHttpConnectionMetrics connMetrics,
+            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException {
         final HttpCoreContext context = HttpCoreContext.create();
         context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
         context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushHttp2StreamHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushHttp2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushHttp2StreamHandler.java
index af0afe6..9af9c34 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushHttp2StreamHandler.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushHttp2StreamHandler.java
@@ -41,8 +41,10 @@ import org.apache.hc.core5.http.HttpVersion;
 import org.apache.hc.core5.http.ProtocolException;
 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
 import org.apache.hc.core5.http.impl.nio.MessageState;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
 import org.apache.hc.core5.http.nio.AsyncPushProducer;
 import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http.nio.ResponseChannel;
 import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.http.protocol.HttpCoreContext;
@@ -111,6 +113,11 @@ class ServerPushHttp2StreamHandler implements Http2StreamHandler {
     }
 
     @Override
+    public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
+        return null;
+    }
+
+    @Override
     public void consumePromise(final List<Header> headers) throws HttpException, IOException {
         throw new ProtocolException("Unexpected message promise");
     }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
index 1cc35af..dd0f669 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
@@ -51,10 +51,12 @@ import org.apache.hc.core5.http.ProtocolException;
 import org.apache.hc.core5.http.impl.DefaultAddressResolver;
 import org.apache.hc.core5.http.impl.bootstrap.AsyncRequester;
 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
 import org.apache.hc.core5.http.nio.CapacityChannel;
 import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http.nio.RequestChannel;
 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
@@ -117,18 +119,27 @@ public class Http2MultiplexingRequester extends AsyncRequester{
 
     public Cancellable execute(
             final AsyncClientExchangeHandler exchangeHandler,
+            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
             final Timeout timeout,
             final HttpContext context) {
         Args.notNull(exchangeHandler, "Exchange handler");
         Args.notNull(timeout, "Timeout");
         Args.notNull(context, "Context");
         final CancellableExecution cancellableExecution = new CancellableExecution();
-        execute(exchangeHandler, cancellableExecution, timeout, context);
+        execute(exchangeHandler, pushHandlerFactory, cancellableExecution, timeout, context);
         return cancellableExecution;
     }
 
+    public Cancellable execute(
+            final AsyncClientExchangeHandler exchangeHandler,
+            final Timeout timeout,
+            final HttpContext context) {
+        return execute(exchangeHandler, null, timeout, context);
+    }
+
     private void execute(
             final AsyncClientExchangeHandler exchangeHandler,
+            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
             final CancellableDependency cancellableDependency,
             final Timeout timeout,
             final HttpContext context) {
@@ -210,7 +221,7 @@ public class Http2MultiplexingRequester extends AsyncRequester{
                                     exchangeHandler.failed(cause);
                                 }
 
-                            }, cancellableDependency, context), Command.Priority.NORMAL);
+                            }, pushHandlerFactory, cancellableDependency, context), Command.Priority.NORMAL);
                         }
 
                         @Override
@@ -236,6 +247,7 @@ public class Http2MultiplexingRequester extends AsyncRequester{
     public final <T> Future<T> execute(
             final AsyncRequestProducer requestProducer,
             final AsyncResponseConsumer<T> responseConsumer,
+            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
             final Timeout timeout,
             final HttpContext context,
             final FutureCallback<T> callback) {
@@ -261,7 +273,7 @@ public class Http2MultiplexingRequester extends AsyncRequester{
             }
 
         });
-        execute(exchangeHandler, future, timeout, context != null ? context : HttpCoreContext.create());
+        execute(exchangeHandler, pushHandlerFactory, future, timeout, context != null ? context : HttpCoreContext.create());
         return future;
     }
 
@@ -269,8 +281,17 @@ public class Http2MultiplexingRequester extends AsyncRequester{
             final AsyncRequestProducer requestProducer,
             final AsyncResponseConsumer<T> responseConsumer,
             final Timeout timeout,
+            final HttpContext context,
+            final FutureCallback<T> callback) {
+        return execute(requestProducer, responseConsumer, null, timeout, context, callback);
+    }
+
+    public final <T> Future<T> execute(
+            final AsyncRequestProducer requestProducer,
+            final AsyncResponseConsumer<T> responseConsumer,
+            final Timeout timeout,
             final FutureCallback<T> callback) {
-        return execute(requestProducer, responseConsumer, timeout, null, callback);
+        return execute(requestProducer, responseConsumer, null, timeout, null, callback);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ClientSessionEndpoint.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ClientSessionEndpoint.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ClientSessionEndpoint.java
index f48e45a..184fc34 100644
--- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ClientSessionEndpoint.java
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ClientSessionEndpoint.java
@@ -36,8 +36,10 @@ import org.apache.hc.core5.annotation.ThreadingBehavior;
 import org.apache.hc.core5.concurrent.BasicFuture;
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
 import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
@@ -74,15 +76,23 @@ public final class ClientSessionEndpoint implements ModalCloseable {
 
     public void execute(
             final AsyncClientExchangeHandler exchangeHandler,
+            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
             final HttpContext context) {
         Asserts.check(!closed.get(), "Connection is already closed");
-        final Command executionCommand = new RequestExecutionCommand(exchangeHandler, context);
+        final Command executionCommand = new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, null, context);
         execute(executionCommand, Command.Priority.NORMAL);
     }
 
+    public void execute(
+            final AsyncClientExchangeHandler exchangeHandler,
+            final HttpContext context) {
+        execute(exchangeHandler, null, context);
+    }
+
     public <T> Future<T> execute(
             final AsyncRequestProducer requestProducer,
             final AsyncResponseConsumer<T> responseConsumer,
+            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
             final HttpContext context,
             final FutureCallback<T> callback) {
         Asserts.check(!closed.get(), "Connection is already closed");
@@ -106,15 +116,23 @@ public final class ClientSessionEndpoint implements ModalCloseable {
                     }
 
                 }),
-                context);
+                pushHandlerFactory, context);
         return future;
     }
 
     public <T> Future<T> execute(
             final AsyncRequestProducer requestProducer,
             final AsyncResponseConsumer<T> responseConsumer,
+            final HttpContext context,
+            final FutureCallback<T> callback) {
+        return execute(requestProducer, responseConsumer, null, context, callback);
+    }
+
+    public <T> Future<T> execute(
+            final AsyncRequestProducer requestProducer,
+            final AsyncResponseConsumer<T> responseConsumer,
             final FutureCallback<T> callback) {
-        return execute(requestProducer, responseConsumer, null, callback);
+        return execute(requestProducer, responseConsumer, null, null, callback);
     }
 
     public boolean isOpen() {

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
index 89662ca..3af925e 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
@@ -91,6 +91,7 @@ import org.apache.hc.core5.http.nio.BasicResponseConsumer;
 import org.apache.hc.core5.http.nio.BasicResponseProducer;
 import org.apache.hc.core5.http.nio.CapacityChannel;
 import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http.nio.ResponseChannel;
 import org.apache.hc.core5.http.nio.entity.AbstractClassicEntityConsumer;
 import org.apache.hc.core5.http.nio.entity.AbstractClassicEntityProducer;
@@ -597,28 +598,6 @@ public class Http2IntegrationTest extends InternalHttp2ServerTestBase {
         client.start(H2Config.custom().setPushEnabled(true).build());
 
         final BlockingQueue<Message<HttpResponse, String>> pushMessageQueue = new LinkedBlockingDeque<>();
-        client.register("*", new Supplier<AsyncPushConsumer>() {
-
-            @Override
-            public AsyncPushConsumer get() {
-                return new AbstractAsyncPushHandler<Message<HttpResponse, String>>(new BasicResponseConsumer<>(new StringAsyncEntityConsumer())) {
-
-                    @Override
-                    protected void handleResponse(
-                            final HttpRequest promise,
-                            final Message<HttpResponse, String> responseMessage) throws IOException, HttpException {
-                        try {
-                            pushMessageQueue.put(responseMessage);
-                        } catch (final InterruptedException ex) {
-                            Thread.currentThread().interrupt();
-                            throw new InterruptedIOException(ex.getMessage());
-                        }
-                    }
-
-                };
-            }
-
-        });
 
         final Future<ClientSessionEndpoint> connectFuture = client.connect(
                 "localhost", serverEndpoint.getPort(), TIMEOUT);
@@ -626,7 +605,31 @@ public class Http2IntegrationTest extends InternalHttp2ServerTestBase {
 
         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
                 new BasicRequestProducer("GET", createRequestURI(serverEndpoint, "/hello")),
-                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
+                new HandlerFactory<AsyncPushConsumer>() {
+
+                    @Override
+                    public AsyncPushConsumer create(
+                            final HttpRequest request, final HttpContext context) throws HttpException {
+                        return new AbstractAsyncPushHandler<Message<HttpResponse, String>>(new BasicResponseConsumer<>(new StringAsyncEntityConsumer())) {
+
+                            @Override
+                            protected void handleResponse(
+                                    final HttpRequest promise,
+                                    final Message<HttpResponse, String> responseMessage) throws IOException, HttpException {
+                                try {
+                                    pushMessageQueue.put(responseMessage);
+                                } catch (final InterruptedException ex) {
+                                    Thread.currentThread().interrupt();
+                                    throw new InterruptedIOException(ex.getMessage());
+                                }
+                            }
+
+                        };
+                    }
+                },
+                null,
+                null);
         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
         Assert.assertNotNull(result1);
         final HttpResponse response1 = result1.getHead();

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
index 0fff5cd..b855e6f 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
@@ -50,10 +50,12 @@ import org.apache.hc.core5.http.URIScheme;
 import org.apache.hc.core5.http.impl.DefaultAddressResolver;
 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
 import org.apache.hc.core5.http.nio.CapacityChannel;
 import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http.nio.RequestChannel;
 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
@@ -251,6 +253,7 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
 
     public void execute(
             final AsyncClientExchangeHandler exchangeHandler,
+            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
             final Timeout timeout,
             final HttpContext executeContext) {
         Args.notNull(exchangeHandler, "Exchange handler");
@@ -338,7 +341,7 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
                                     exchangeHandler.streamEnd(trailers);
                                 }
 
-                            }, executeContext);
+                            }, pushHandlerFactory, executeContext);
 
                         }
 
@@ -363,9 +366,17 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
         }
     }
 
+    public void execute(
+            final AsyncClientExchangeHandler exchangeHandler,
+            final Timeout timeout,
+            final HttpContext executeContext) {
+        execute(exchangeHandler, null, timeout, executeContext);
+    }
+
     public final <T> Future<T> execute(
             final AsyncRequestProducer requestProducer,
             final AsyncResponseConsumer<T> responseConsumer,
+            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
             final Timeout timeout,
             final HttpContext context,
             final FutureCallback<T> callback) {
@@ -391,7 +402,7 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
             }
 
         });
-        execute(exchangeHandler, timeout, context != null ? context : HttpCoreContext.create());
+        execute(exchangeHandler, pushHandlerFactory, timeout, context != null ? context : HttpCoreContext.create());
         return future;
     }
 
@@ -399,8 +410,17 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
             final AsyncRequestProducer requestProducer,
             final AsyncResponseConsumer<T> responseConsumer,
             final Timeout timeout,
+            final HttpContext context,
+            final FutureCallback<T> callback) {
+        return execute(requestProducer, responseConsumer, null, timeout, context, callback);
+    }
+
+    public final <T> Future<T> execute(
+            final AsyncRequestProducer requestProducer,
+            final AsyncResponseConsumer<T> responseConsumer,
+            final Timeout timeout,
             final FutureCallback<T> callback) {
-        return execute(requestProducer, responseConsumer, timeout, null, callback);
+        return execute(requestProducer, responseConsumer, null, timeout, null, callback);
     }
 
     private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
@@ -412,7 +432,10 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
         }
 
         @Override
-        public void execute(final AsyncClientExchangeHandler exchangeHandler, final HttpContext context) {
+        public void execute(
+                final AsyncClientExchangeHandler exchangeHandler,
+                final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
+                final HttpContext context) {
             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
             if (poolEntry == null) {
                 throw new IllegalStateException("Endpoint has already been released");
@@ -421,7 +444,7 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
             if (ioSession == null) {
                 throw new IllegalStateException("I/O session is invalid");
             }
-            ioSession.enqueue(new RequestExecutionCommand(exchangeHandler, context), Command.Priority.NORMAL);
+            ioSession.enqueue(new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, null, context), Command.Priority.NORMAL);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientEndpoint.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientEndpoint.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientEndpoint.java
index a23e55a..322a73f 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientEndpoint.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientEndpoint.java
@@ -54,7 +54,22 @@ public abstract class AsyncClientEndpoint {
      * Once the endpoint is no longer needed it MUST be released with {@link #releaseAndReuse()}
      * or {@link #releaseAndDiscard()}.
      */
-    public abstract void execute(AsyncClientExchangeHandler exchangeHandler, HttpContext context);
+    public abstract void execute(
+            AsyncClientExchangeHandler exchangeHandler,
+            HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
+            HttpContext context);
+
+    /**
+     * Initiates a message exchange using the given handler.
+     * <p>
+     * Once the endpoint is no longer needed it MUST be released with {@link #releaseAndReuse()}
+     * or {@link #releaseAndDiscard()}.
+     */
+    public void execute(
+            final AsyncClientExchangeHandler exchangeHandler,
+            final HttpContext context) {
+        execute(exchangeHandler, null, context);
+    }
 
     /**
      * Releases the underlying connection back to the connection pool as re-usable.
@@ -75,6 +90,7 @@ public abstract class AsyncClientEndpoint {
     public final <T> Future<T> execute(
             final AsyncRequestProducer requestProducer,
             final AsyncResponseConsumer<T> responseConsumer,
+            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
             final HttpContext context,
             final FutureCallback<T> callback) {
         final BasicFuture<T> future = new BasicFuture<>(callback);
@@ -97,11 +113,25 @@ public abstract class AsyncClientEndpoint {
                             }
 
                         }),
-                context != null ? context : HttpCoreContext.create());
+                pushHandlerFactory, context != null ? context : HttpCoreContext.create());
         return future;
     }
 
     /**
+     * Initiates a message exchange using the given request producer and response consumer.
+     * <p>
+     * Once the endpoint is no longer needed it MUST be released with {@link #releaseAndReuse()}
+     * or {@link #releaseAndDiscard()}.
+     */
+    public final <T> Future<T> execute(
+            final AsyncRequestProducer requestProducer,
+            final AsyncResponseConsumer<T> responseConsumer,
+            final HttpContext context,
+            final FutureCallback<T> callback) {
+        return execute(requestProducer, responseConsumer, null, context, callback);
+    }
+
+    /**
      * Initiates a message exchange using the given request producer and response consumer.
      * <p>
      * Once the endpoint is no longer needed it MUST be released with {@link #releaseAndReuse()}
@@ -111,7 +141,7 @@ public abstract class AsyncClientEndpoint {
             final AsyncRequestProducer requestProducer,
             final AsyncResponseConsumer<T> responseConsumer,
             final FutureCallback<T> callback) {
-        return execute(requestProducer, responseConsumer, null, callback);
+        return execute(requestProducer, responseConsumer, null, null, callback);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/RequestExecutionCommand.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/RequestExecutionCommand.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/RequestExecutionCommand.java
index 0b83aef..98f46f2 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/RequestExecutionCommand.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/RequestExecutionCommand.java
@@ -29,6 +29,8 @@ package org.apache.hc.core5.http.nio.command;
 
 import org.apache.hc.core5.concurrent.CancellableDependency;
 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.util.Args;
 
@@ -40,28 +42,42 @@ import org.apache.hc.core5.util.Args;
 public final class RequestExecutionCommand extends ExecutableCommand {
 
     private final AsyncClientExchangeHandler exchangeHandler;
+    private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
     private final CancellableDependency cancellableDependency;
     private final HttpContext context;
 
     public RequestExecutionCommand(
             final AsyncClientExchangeHandler exchangeHandler,
+            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
             final CancellableDependency cancellableDependency,
             final HttpContext context) {
         this.exchangeHandler = Args.notNull(exchangeHandler, "Handler");
+        this.pushHandlerFactory = pushHandlerFactory;
         this.cancellableDependency = cancellableDependency;
         this.context = context;
     }
 
     public RequestExecutionCommand(
             final AsyncClientExchangeHandler exchangeHandler,
+            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
             final HttpContext context) {
-        this(exchangeHandler, null, context);
+        this(exchangeHandler, pushHandlerFactory, null, context);
+    }
+
+    public RequestExecutionCommand(
+            final AsyncClientExchangeHandler exchangeHandler,
+            final HttpContext context) {
+        this(exchangeHandler, null, null, context);
     }
 
     public AsyncClientExchangeHandler getExchangeHandler() {
         return exchangeHandler;
     }
 
+    public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
+        return pushHandlerFactory;
+    }
+
     @Override
     public CancellableDependency getCancellableDependency() {
         return cancellableDependency;