You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2018/09/20 15:32:19 UTC
httpcomponents-client git commit: Async clients to support request
specific push consumers [Forced Update!]
Repository: httpcomponents-client
Updated Branches:
refs/heads/master f30fa6d54 -> 9eb00018c (forced update)
Async clients to support request specific push consumers
Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-client/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-client/commit/9eb00018
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-client/tree/9eb00018
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-client/diff/9eb00018
Branch: refs/heads/master
Commit: 9eb00018cefc94ce405a54ff48bb4bbd2d2d90aa
Parents: 7f3539c
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Thu Sep 20 17:16:06 2018 +0200
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Thu Sep 20 17:16:06 2018 +0200
----------------------------------------------------------------------
.../hc/client5/http/async/HttpAsyncClient.java | 19 +++--------
.../AbstractMinimalHttpAsyncClientBase.java | 12 +++++--
.../impl/async/CloseableHttpAsyncClient.java | 16 ++++++++--
.../async/InternalAbstractHttpAsyncClient.java | 7 +++--
.../impl/async/InternalHttp2AsyncClient.java | 6 ++--
.../async/InternalHttp2AsyncExecRuntime.java | 15 ++++++---
.../impl/async/InternalHttpAsyncClient.java | 6 ++--
.../async/InternalHttpAsyncExecRuntime.java | 9 ++++--
.../impl/async/MinimalHttp2AsyncClient.java | 13 ++++++--
.../http/impl/async/MinimalHttpAsyncClient.java | 4 ++-
.../PoolingAsyncClientConnectionManager.java | 9 ++++--
.../http/nio/AsyncConnectionEndpoint.java | 33 ++++++++++++++++++--
12 files changed, 110 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/async/HttpAsyncClient.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/HttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/HttpAsyncClient.java
index 1413968..3e1c684 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/async/HttpAsyncClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/HttpAsyncClient.java
@@ -29,10 +29,10 @@ package org.apache.hc.client5.http.async;
import java.util.concurrent.Future;
import org.apache.hc.core5.concurrent.FutureCallback;
-import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.protocol.HttpContext;
/**
@@ -59,25 +59,16 @@ public interface HttpAsyncClient {
* @param <T> the result type of request execution.
* @param requestProducer request producer callback.
* @param responseConsumer response consumer callback.
- * @param context HTTP context
- * @param callback future callback.
+ * @param pushHandlerFactory the push handler factory. Optional and may be {@code null}.
+ * @param context HTTP context. Optional and may be {@code null}.
+ * @param callback future callback. Optional and may be {@code null}.
* @return future representing pending completion of the operation.
*/
<T> Future<T> execute(
AsyncRequestProducer requestProducer,
AsyncResponseConsumer<T> responseConsumer,
+ HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
HttpContext context,
FutureCallback<T> callback);
- /**
- * Registers {@link AsyncPushConsumer} for the given host and the URI pattern.
- *
- * @param hostname the name of the host this consumer intended for.
- * Can be {@code null} if applies to all hosts
- * @param uriPattern URI request pattern
- * @param supplier supplier that will be used to supply a consumer instance
- * for the given combination of hostname and URI pattern.
- */
- void register(String hostname, String uriPattern, Supplier<AsyncPushConsumer> supplier);
-
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AbstractMinimalHttpAsyncClientBase.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AbstractMinimalHttpAsyncClientBase.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AbstractMinimalHttpAsyncClientBase.java
index 3c0505f..0ff5459 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AbstractMinimalHttpAsyncClientBase.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AbstractMinimalHttpAsyncClientBase.java
@@ -34,8 +34,10 @@ import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
@@ -53,6 +55,7 @@ abstract class AbstractMinimalHttpAsyncClientBase extends AbstractHttpAsyncClien
public final <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context,
final FutureCallback<T> callback) {
final ComplexFuture<T> future = new ComplexFuture<>(callback);
@@ -76,14 +79,17 @@ abstract class AbstractMinimalHttpAsyncClientBase extends AbstractHttpAsyncClien
future.cancel();
}
- })));
+ }), pushHandlerFactory, context));
return future;
}
public final Cancellable execute(final AsyncClientExchangeHandler exchangeHandler) {
- return execute(exchangeHandler, HttpClientContext.create());
+ return execute(exchangeHandler, null, HttpClientContext.create());
}
- public abstract Cancellable execute(AsyncClientExchangeHandler exchangeHandler, HttpContext context);
+ public abstract Cancellable execute(
+ AsyncClientExchangeHandler exchangeHandler,
+ HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
+ HttpContext context);
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java
index 2e82ee1..65dbe51 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java
@@ -74,12 +74,16 @@ public abstract class CloseableHttpAsyncClient implements HttpAsyncClient, Close
public final <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
+ final HttpContext context,
final FutureCallback<T> callback) {
- return execute(requestProducer, responseConsumer, HttpClientContext.create(), callback);
+ return execute(requestProducer, responseConsumer, null, context, callback);
}
- public final void register(final String uriPattern, final Supplier<AsyncPushConsumer> supplier) {
- register(null, uriPattern, supplier);
+ public final <T> Future<T> execute(
+ final AsyncRequestProducer requestProducer,
+ final AsyncResponseConsumer<T> responseConsumer,
+ final FutureCallback<T> callback) {
+ return execute(requestProducer, responseConsumer, HttpClientContext.create(), callback);
}
public final Future<SimpleHttpResponse> execute(
@@ -115,4 +119,10 @@ public abstract class CloseableHttpAsyncClient implements HttpAsyncClient, Close
return execute(request, HttpClientContext.create(), callback);
}
+ public abstract void register(String hostname, String uriPattern, Supplier<AsyncPushConsumer> supplier);
+
+ public final void register(final String uriPattern, final Supplier<AsyncPushConsumer> supplier) {
+ register(null, uriPattern, supplier);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
index cf93234..3fdbe55 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
@@ -57,9 +57,11 @@ import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.nio.AsyncDataConsumer;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.RequestChannel;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
@@ -127,7 +129,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
}
}
- abstract AsyncExecRuntime crerateAsyncExecRuntime();
+ abstract AsyncExecRuntime crerateAsyncExecRuntime(HandlerFactory<AsyncPushConsumer> pushHandlerFactory);
abstract HttpRoute determineRoute(HttpRequest request, HttpClientContext clientContext) throws HttpException;
@@ -135,6 +137,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
public <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context,
final FutureCallback<T> callback) {
ensureRunning();
@@ -158,7 +161,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
}
final HttpRoute route = determineRoute(request, clientContext);
final String exchangeId = ExecSupport.getNextExchangeId();
- final AsyncExecRuntime execRuntime = crerateAsyncExecRuntime();
+ final AsyncExecRuntime execRuntime = crerateAsyncExecRuntime(pushHandlerFactory);
if (log.isDebugEnabled()) {
log.debug(exchangeId + ": preparing request execution");
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncClient.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncClient.java
index 8be84b5..af80542 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncClient.java
@@ -43,6 +43,8 @@ import org.apache.hc.client5.http.routing.RoutingSupport;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.config.Lookup;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http2.nio.pool.H2ConnPool;
import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
@@ -71,8 +73,8 @@ class InternalHttp2AsyncClient extends InternalAbstractHttpAsyncClient {
}
@Override
- AsyncExecRuntime crerateAsyncExecRuntime() {
- return new InternalHttp2AsyncExecRuntime(log, connPool);
+ AsyncExecRuntime crerateAsyncExecRuntime(final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
+ return new InternalHttp2AsyncExecRuntime(log, connPool, pushHandlerFactory);
}
@Override
http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncExecRuntime.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncExecRuntime.java
index f5f3112..715222d 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncExecRuntime.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncExecRuntime.java
@@ -41,6 +41,8 @@ import org.apache.hc.core5.concurrent.ComplexCancellable;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
import org.apache.hc.core5.http2.nio.pool.H2ConnPool;
import org.apache.hc.core5.io.CloseMode;
@@ -53,13 +55,18 @@ class InternalHttp2AsyncExecRuntime implements AsyncExecRuntime {
private final Logger log;
private final H2ConnPool connPool;
+ private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
private final AtomicReference<Endpoint> sessionRef;
private volatile boolean reusable;
- InternalHttp2AsyncExecRuntime(final Logger log, final H2ConnPool connPool) {
+ InternalHttp2AsyncExecRuntime(
+ final Logger log,
+ final H2ConnPool connPool,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
super();
this.log = log;
this.connPool = connPool;
+ this.pushHandlerFactory = pushHandlerFactory;
this.sessionRef = new AtomicReference<>(null);
}
@@ -198,7 +205,7 @@ class InternalHttp2AsyncExecRuntime implements AsyncExecRuntime {
log.debug(ConnPoolSupport.getId(endpoint) + ": executing " + ConnPoolSupport.getId(exchangeHandler));
}
session.enqueue(
- new RequestExecutionCommand(exchangeHandler, null, complexCancellable, context),
+ new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
Command.Priority.NORMAL);
} else {
final HttpHost target = endpoint.target;
@@ -213,7 +220,7 @@ class InternalHttp2AsyncExecRuntime implements AsyncExecRuntime {
log.debug(ConnPoolSupport.getId(endpoint) + ": executing " + ConnPoolSupport.getId(exchangeHandler));
}
session.enqueue(
- new RequestExecutionCommand(exchangeHandler, null, complexCancellable, context),
+ new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
Command.Priority.NORMAL);
}
@@ -255,7 +262,7 @@ class InternalHttp2AsyncExecRuntime implements AsyncExecRuntime {
@Override
public AsyncExecRuntime fork() {
- return new InternalHttp2AsyncExecRuntime(log, connPool);
+ return new InternalHttp2AsyncExecRuntime(log, connPool, pushHandlerFactory);
}
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java
index eed92f1..b0e46d0 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java
@@ -46,6 +46,8 @@ import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.config.Lookup;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
@@ -77,8 +79,8 @@ class InternalHttpAsyncClient extends InternalAbstractHttpAsyncClient {
}
@Override
- AsyncExecRuntime crerateAsyncExecRuntime() {
- return new InternalHttpAsyncExecRuntime(log, connmgr, getConnectionInitiator(), versionPolicy);
+ AsyncExecRuntime crerateAsyncExecRuntime(final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
+ return new InternalHttpAsyncExecRuntime(log, connmgr, getConnectionInitiator(), pushHandlerFactory, versionPolicy);
}
@Override
http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java
index 6145751..5f513da 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java
@@ -42,6 +42,8 @@ import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.reactor.ConnectionInitiator;
import org.apache.hc.core5.util.TimeValue;
@@ -52,6 +54,7 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
private final Logger log;
private final AsyncClientConnectionManager manager;
private final ConnectionInitiator connectionInitiator;
+ private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
private final HttpVersionPolicy versionPolicy;
private final AtomicReference<AsyncConnectionEndpoint> endpointRef;
private volatile boolean reusable;
@@ -62,11 +65,13 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
final Logger log,
final AsyncClientConnectionManager manager,
final ConnectionInitiator connectionInitiator,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpVersionPolicy versionPolicy) {
super();
this.log = log;
this.manager = manager;
this.connectionInitiator = connectionInitiator;
+ this.pushHandlerFactory = pushHandlerFactory;
this.versionPolicy = versionPolicy;
this.endpointRef = new AtomicReference<>(null);
this.validDuration = TimeValue.NEG_ONE_MILLISECONDS;
@@ -252,7 +257,7 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
log.debug(ConnPoolSupport.getId(endpoint) + ": executing " + ConnPoolSupport.getId(exchangeHandler));
}
try {
- endpoint.execute(exchangeHandler, context);
+ endpoint.execute(exchangeHandler, pushHandlerFactory, context);
} catch (final RuntimeException ex) {
failed(ex);
}
@@ -289,7 +294,7 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
@Override
public AsyncExecRuntime fork() {
- return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, versionPolicy);
+ return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, pushHandlerFactory, versionPolicy);
}
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttp2AsyncClient.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttp2AsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttp2AsyncClient.java
index 570ff7a..20b2508 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttp2AsyncClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttp2AsyncClient.java
@@ -53,8 +53,10 @@ import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.RequestChannel;
import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
@@ -113,6 +115,7 @@ public final class MinimalHttp2AsyncClient extends AbstractMinimalHttpAsyncClien
@Override
public Cancellable execute(
final AsyncClientExchangeHandler exchangeHandler,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context) {
ensureRunning();
final ComplexCancellable cancellable = new ComplexCancellable();
@@ -212,11 +215,17 @@ public final class MinimalHttp2AsyncClient extends AbstractMinimalHttpAsyncClien
session.enqueue(
new RequestExecutionCommand(
new LoggingAsyncClientExchangeHandler(log, exchangeId, internalExchangeHandler),
- null, cancellable, clientContext),
+ pushHandlerFactory,
+ cancellable,
+ clientContext),
Command.Priority.NORMAL);
} else {
session.enqueue(
- new RequestExecutionCommand(internalExchangeHandler, null, cancellable, clientContext),
+ new RequestExecutionCommand(
+ internalExchangeHandler,
+ pushHandlerFactory,
+ cancellable,
+ clientContext),
Command.Priority.NORMAL);
}
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java
index 3f473a3..c5c2927 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java
@@ -216,6 +216,7 @@ public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClient
@Override
public Cancellable execute(
final AsyncClientExchangeHandler exchangeHandler,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context) {
ensureRunning();
final ComplexCancellable cancellable = new ComplexCancellable();
@@ -362,7 +363,7 @@ public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClient
}
};
- endpoint.execute(internalExchangeHandler, clientContext);
+ endpoint.execute(internalExchangeHandler, pushHandlerFactory, clientContext);
}
@Override
@@ -420,6 +421,7 @@ public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClient
log.debug(ConnPoolSupport.getId(connectionEndpoint) + ": executing message exchange " + exchangeId);
connectionEndpoint.execute(
new LoggingAsyncClientExchangeHandler(log, exchangeId, exchangeHandler),
+ pushHandlerFactory,
context);
} else {
connectionEndpoint.execute(exchangeHandler, context);
http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
index 1167592..3c28d72 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
@@ -57,6 +57,8 @@ import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.config.RegistryBuilder;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
import org.apache.hc.core5.http.protocol.HttpContext;
@@ -553,13 +555,16 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
}
@Override
- public void execute(final AsyncClientExchangeHandler exchangeHandler, final HttpContext context) {
+ public void execute(
+ final AsyncClientExchangeHandler exchangeHandler,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
+ final HttpContext context) {
final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection();
if (log.isDebugEnabled()) {
log.debug(id + ": executing exchange " + ConnPoolSupport.getId(exchangeHandler) +
" over " + ConnPoolSupport.getId(connection));
}
- connection.submitCommand(new RequestExecutionCommand(exchangeHandler, context));
+ connection.submitCommand(new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, context));
}
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/9eb00018/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java b/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java
index 47767aa..692863d 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java
@@ -36,8 +36,10 @@ import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
@@ -50,11 +52,21 @@ import org.apache.hc.core5.http.protocol.HttpCoreContext;
@Contract(threading = ThreadingBehavior.SAFE)
public abstract class AsyncConnectionEndpoint implements Closeable {
- public abstract void execute(AsyncClientExchangeHandler exchangeHandler, HttpContext context);
+ public abstract void execute(
+ AsyncClientExchangeHandler exchangeHandler,
+ HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
+ HttpContext context);
+
+ public void execute(
+ final AsyncClientExchangeHandler exchangeHandler,
+ final HttpContext context) {
+ execute(exchangeHandler, null, context);
+ }
public <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context,
final FutureCallback<T> callback) {
final BasicFuture<T> future = new BasicFuture<>(callback);
@@ -77,6 +89,7 @@ public abstract class AsyncConnectionEndpoint implements Closeable {
}
}),
+ pushHandlerFactory,
context != null ? context : HttpCoreContext.create());
return future;
}
@@ -84,8 +97,24 @@ public abstract class AsyncConnectionEndpoint implements Closeable {
public <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
+ final HttpContext context,
+ final FutureCallback<T> callback) {
+ return execute(requestProducer, responseConsumer, null, context, callback);
+ }
+
+ public <T> Future<T> execute(
+ final AsyncRequestProducer requestProducer,
+ final AsyncResponseConsumer<T> responseConsumer,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
+ final FutureCallback<T> callback) {
+ return execute(requestProducer, responseConsumer, pushHandlerFactory, null, callback);
+ }
+
+ public <T> Future<T> execute(
+ final AsyncRequestProducer requestProducer,
+ final AsyncResponseConsumer<T> responseConsumer,
final FutureCallback<T> callback) {
- return execute(requestProducer, responseConsumer, null, callback);
+ return execute(requestProducer, responseConsumer, null, null, callback);
}
public abstract boolean isConnected();