You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2018/08/04 09:45:47 UTC
httpcomponents-core git commit: Request-specific push consumers
Repository: httpcomponents-core
Updated Branches:
refs/heads/master 331f8b708 -> 36ef4aa74
Request-specific push consumers
Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/36ef4aa7
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/36ef4aa7
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/36ef4aa7
Branch: refs/heads/master
Commit: 36ef4aa747e6a9a0b00cc93eb3c5482f62e6ba8b
Parents: 331f8b7
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Tue Jul 31 16:19:43 2018 +0200
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Sat Aug 4 11:42:51 2018 +0200
----------------------------------------------------------------------
.../nio/AbstractHttp2StreamMultiplexer.java | 13 ++++--
.../impl/nio/ClientHttp2StreamHandler.java | 10 ++++
.../impl/nio/ClientHttp2StreamMultiplexer.java | 12 +++--
.../impl/nio/ClientPushHttp2StreamHandler.java | 5 ++
.../http2/impl/nio/Http2StreamHandler.java | 4 ++
.../impl/nio/ServerHttp2StreamHandler.java | 6 +++
.../impl/nio/ServerHttp2StreamMultiplexer.java | 4 +-
.../impl/nio/ServerPushHttp2StreamHandler.java | 7 +++
.../bootstrap/Http2MultiplexingRequester.java | 29 ++++++++++--
.../testing/nio/ClientSessionEndpoint.java | 24 ++++++++--
.../core5/testing/nio/Http2IntegrationTest.java | 49 +++++++++++---------
.../http/impl/bootstrap/HttpAsyncRequester.java | 33 +++++++++++--
.../hc/core5/http/nio/AsyncClientEndpoint.java | 36 ++++++++++++--
.../nio/command/RequestExecutionCommand.java | 18 ++++++-
14 files changed, 204 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
index 5881f21..572ff8c 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
@@ -60,7 +60,9 @@ import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.impl.BasicEndpointDetails;
import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
import org.apache.hc.core5.http.impl.CharCodingSupport;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncPushProducer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.command.ExecutableCommand;
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
@@ -180,7 +182,8 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
abstract Http2StreamHandler createRemotelyInitiatedStream(
Http2StreamChannel channel,
HttpProcessor httpProcessor,
- BasicHttpConnectionMetrics connMetrics) throws IOException;
+ BasicHttpConnectionMetrics connMetrics,
+ HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException;
abstract Http2StreamHandler createLocallyInitiatedStream(
ExecutableCommand command,
@@ -747,7 +750,7 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
localConfig.getInitialWindowSize(),
remoteConfig.getInitialWindowSize());
final Http2StreamHandler streamHandler = createRemotelyInitiatedStream(
- channel, httpProcessor, connMetrics);
+ channel, httpProcessor, connMetrics, null);
stream = new Http2Stream(channel, streamHandler, true);
if (stream.isOutputReady()) {
stream.produceOutput();
@@ -925,7 +928,7 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
localConfig.getInitialWindowSize(),
remoteConfig.getInitialWindowSize());
final Http2StreamHandler streamHandler = createRemotelyInitiatedStream(
- channel, httpProcessor, connMetrics);
+ channel, httpProcessor, connMetrics, stream.getPushHandlerFactory());
final Http2Stream promisedStream = new Http2Stream(channel, streamHandler, true);
streamMap.put(promisedStreamId, promisedStream);
@@ -1576,6 +1579,10 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
localReset(ex, ex.getCode());
}
+ HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
+ return handler.getPushHandlerFactory();
+ }
+
void cancel() {
reset(new CancellationException("HTTP/2 message exchange cancelled"));
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java
index c442589..0002ad7 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java
@@ -45,7 +45,9 @@ import org.apache.hc.core5.http.impl.IncomingEntityDetails;
import org.apache.hc.core5.http.impl.nio.MessageState;
import org.apache.hc.core5.http.message.StatusLine;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.RequestChannel;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
@@ -62,6 +64,7 @@ class ClientHttp2StreamHandler implements Http2StreamHandler {
private final HttpProcessor httpProcessor;
private final BasicHttpConnectionMetrics connMetrics;
private final AsyncClientExchangeHandler exchangeHandler;
+ private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
private final HttpCoreContext context;
private final AtomicBoolean requestCommitted;
private final AtomicBoolean failed;
@@ -75,6 +78,7 @@ class ClientHttp2StreamHandler implements Http2StreamHandler {
final HttpProcessor httpProcessor,
final BasicHttpConnectionMetrics connMetrics,
final AsyncClientExchangeHandler exchangeHandler,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpCoreContext context) {
this.outputChannel = outputChannel;
this.dataChannel = new DataStreamChannel() {
@@ -105,6 +109,7 @@ class ClientHttp2StreamHandler implements Http2StreamHandler {
this.httpProcessor = httpProcessor;
this.connMetrics = connMetrics;
this.exchangeHandler = exchangeHandler;
+ this.pushHandlerFactory = pushHandlerFactory;
this.context = context;
this.requestCommitted = new AtomicBoolean(false);
this.failed = new AtomicBoolean(false);
@@ -114,6 +119,11 @@ class ClientHttp2StreamHandler implements Http2StreamHandler {
}
@Override
+ public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
+ return pushHandlerFactory;
+ }
+
+ @Override
public boolean isOutputReady() {
switch (requestState) {
case HEADERS:
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamMultiplexer.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamMultiplexer.java
index c1bdb8f..6a46b96 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamMultiplexer.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamMultiplexer.java
@@ -107,10 +107,13 @@ public class ClientHttp2StreamMultiplexer extends AbstractHttp2StreamMultiplexer
if (command instanceof RequestExecutionCommand) {
final RequestExecutionCommand executionCommand = (RequestExecutionCommand) command;
final AsyncClientExchangeHandler exchangeHandler = executionCommand.getExchangeHandler();
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory = executionCommand.getPushHandlerFactory();
final HttpCoreContext context = HttpCoreContext.adapt(executionCommand.getContext());
context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
- return new ClientHttp2StreamHandler(channel, httpProcessor, connMetrics, exchangeHandler, context);
+ return new ClientHttp2StreamHandler(channel, httpProcessor, connMetrics, exchangeHandler,
+ pushHandlerFactory != null ? pushHandlerFactory : this.pushHandlerFactory,
+ context);
} else {
throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Unexpected executable command");
}
@@ -120,11 +123,14 @@ public class ClientHttp2StreamMultiplexer extends AbstractHttp2StreamMultiplexer
Http2StreamHandler createRemotelyInitiatedStream(
final Http2StreamChannel channel,
final HttpProcessor httpProcessor,
- final BasicHttpConnectionMetrics connMetrics) throws IOException {
+ final BasicHttpConnectionMetrics connMetrics,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException {
final HttpCoreContext context = HttpCoreContext.create();
context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
- return new ClientPushHttp2StreamHandler(channel, httpProcessor, connMetrics, pushHandlerFactory, context);
+ return new ClientPushHttp2StreamHandler(channel, httpProcessor, connMetrics,
+ pushHandlerFactory != null ? pushHandlerFactory : this.pushHandlerFactory,
+ context);
}
@Override
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java
index dfbf816..4cdb523 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java
@@ -85,6 +85,11 @@ class ClientPushHttp2StreamHandler implements Http2StreamHandler {
}
@Override
+ public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
+ return pushHandlerFactory;
+ }
+
+ @Override
public boolean isOutputReady() {
return false;
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamHandler.java
index 589728c..b8e3e1f 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamHandler.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamHandler.java
@@ -32,6 +32,8 @@ import java.util.List;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.ResourceHolder;
interface Http2StreamHandler extends ResourceHolder {
@@ -48,6 +50,8 @@ interface Http2StreamHandler extends ResourceHolder {
void consumeData(ByteBuffer src, boolean endStream) throws HttpException, IOException;
+ HandlerFactory<AsyncPushConsumer> getPushHandlerFactory();
+
void failed(Exception cause);
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java
index 1a866a7..0cc43ed 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java
@@ -43,6 +43,7 @@ import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
import org.apache.hc.core5.http.impl.IncomingEntityDetails;
import org.apache.hc.core5.http.impl.nio.MessageState;
import org.apache.hc.core5.http.impl.nio.ServerSupport;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncPushProducer;
import org.apache.hc.core5.http.nio.AsyncResponseProducer;
import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
@@ -120,6 +121,11 @@ public class ServerHttp2StreamHandler implements Http2StreamHandler {
this.responseState = MessageState.IDLE;
}
+ @Override
+ public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
+ return null;
+ }
+
private void commitInformation(final HttpResponse response) throws IOException, HttpException {
if (responseCommitted.get()) {
throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Response already committed");
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamMultiplexer.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamMultiplexer.java
index 3b9f004..fc94370 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamMultiplexer.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamMultiplexer.java
@@ -30,6 +30,7 @@ import java.io.IOException;
import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.command.ExecutableCommand;
@@ -92,7 +93,8 @@ public class ServerHttp2StreamMultiplexer extends AbstractHttp2StreamMultiplexer
Http2StreamHandler createRemotelyInitiatedStream(
final Http2StreamChannel channel,
final HttpProcessor httpProcessor,
- final BasicHttpConnectionMetrics connMetrics) throws IOException {
+ final BasicHttpConnectionMetrics connMetrics,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException {
final HttpCoreContext context = HttpCoreContext.create();
context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushHttp2StreamHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushHttp2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushHttp2StreamHandler.java
index af0afe6..9af9c34 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushHttp2StreamHandler.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushHttp2StreamHandler.java
@@ -41,8 +41,10 @@ import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.ProtocolException;
import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
import org.apache.hc.core5.http.impl.nio.MessageState;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncPushProducer;
import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.ResponseChannel;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
@@ -111,6 +113,11 @@ class ServerPushHttp2StreamHandler implements Http2StreamHandler {
}
@Override
+ public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
+ return null;
+ }
+
+ @Override
public void consumePromise(final List<Header> headers) throws HttpException, IOException {
throw new ProtocolException("Unexpected message promise");
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
index 1cc35af..dd0f669 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
@@ -51,10 +51,12 @@ import org.apache.hc.core5.http.ProtocolException;
import org.apache.hc.core5.http.impl.DefaultAddressResolver;
import org.apache.hc.core5.http.impl.bootstrap.AsyncRequester;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.RequestChannel;
import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
@@ -117,18 +119,27 @@ public class Http2MultiplexingRequester extends AsyncRequester{
public Cancellable execute(
final AsyncClientExchangeHandler exchangeHandler,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final Timeout timeout,
final HttpContext context) {
Args.notNull(exchangeHandler, "Exchange handler");
Args.notNull(timeout, "Timeout");
Args.notNull(context, "Context");
final CancellableExecution cancellableExecution = new CancellableExecution();
- execute(exchangeHandler, cancellableExecution, timeout, context);
+ execute(exchangeHandler, pushHandlerFactory, cancellableExecution, timeout, context);
return cancellableExecution;
}
+ public Cancellable execute(
+ final AsyncClientExchangeHandler exchangeHandler,
+ final Timeout timeout,
+ final HttpContext context) {
+ return execute(exchangeHandler, null, timeout, context);
+ }
+
private void execute(
final AsyncClientExchangeHandler exchangeHandler,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final CancellableDependency cancellableDependency,
final Timeout timeout,
final HttpContext context) {
@@ -210,7 +221,7 @@ public class Http2MultiplexingRequester extends AsyncRequester{
exchangeHandler.failed(cause);
}
- }, cancellableDependency, context), Command.Priority.NORMAL);
+ }, pushHandlerFactory, cancellableDependency, context), Command.Priority.NORMAL);
}
@Override
@@ -236,6 +247,7 @@ public class Http2MultiplexingRequester extends AsyncRequester{
public final <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final Timeout timeout,
final HttpContext context,
final FutureCallback<T> callback) {
@@ -261,7 +273,7 @@ public class Http2MultiplexingRequester extends AsyncRequester{
}
});
- execute(exchangeHandler, future, timeout, context != null ? context : HttpCoreContext.create());
+ execute(exchangeHandler, pushHandlerFactory, future, timeout, context != null ? context : HttpCoreContext.create());
return future;
}
@@ -269,8 +281,17 @@ public class Http2MultiplexingRequester extends AsyncRequester{
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final Timeout timeout,
+ final HttpContext context,
+ final FutureCallback<T> callback) {
+ return execute(requestProducer, responseConsumer, null, timeout, context, callback);
+ }
+
+ public final <T> Future<T> execute(
+ final AsyncRequestProducer requestProducer,
+ final AsyncResponseConsumer<T> responseConsumer,
+ final Timeout timeout,
final FutureCallback<T> callback) {
- return execute(requestProducer, responseConsumer, timeout, null, callback);
+ return execute(requestProducer, responseConsumer, null, timeout, null, callback);
}
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ClientSessionEndpoint.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ClientSessionEndpoint.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ClientSessionEndpoint.java
index f48e45a..184fc34 100644
--- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ClientSessionEndpoint.java
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ClientSessionEndpoint.java
@@ -36,8 +36,10 @@ import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
@@ -74,15 +76,23 @@ public final class ClientSessionEndpoint implements ModalCloseable {
public void execute(
final AsyncClientExchangeHandler exchangeHandler,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context) {
Asserts.check(!closed.get(), "Connection is already closed");
- final Command executionCommand = new RequestExecutionCommand(exchangeHandler, context);
+ final Command executionCommand = new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, null, context);
execute(executionCommand, Command.Priority.NORMAL);
}
+ public void execute(
+ final AsyncClientExchangeHandler exchangeHandler,
+ final HttpContext context) {
+ execute(exchangeHandler, null, context);
+ }
+
public <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context,
final FutureCallback<T> callback) {
Asserts.check(!closed.get(), "Connection is already closed");
@@ -106,15 +116,23 @@ public final class ClientSessionEndpoint implements ModalCloseable {
}
}),
- context);
+ pushHandlerFactory, context);
return future;
}
public <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
+ final HttpContext context,
+ final FutureCallback<T> callback) {
+ return execute(requestProducer, responseConsumer, null, context, callback);
+ }
+
+ public <T> Future<T> execute(
+ final AsyncRequestProducer requestProducer,
+ final AsyncResponseConsumer<T> responseConsumer,
final FutureCallback<T> callback) {
- return execute(requestProducer, responseConsumer, null, callback);
+ return execute(requestProducer, responseConsumer, null, null, callback);
}
public boolean isOpen() {
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
index 89662ca..3af925e 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
@@ -91,6 +91,7 @@ import org.apache.hc.core5.http.nio.BasicResponseConsumer;
import org.apache.hc.core5.http.nio.BasicResponseProducer;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.ResponseChannel;
import org.apache.hc.core5.http.nio.entity.AbstractClassicEntityConsumer;
import org.apache.hc.core5.http.nio.entity.AbstractClassicEntityProducer;
@@ -597,28 +598,6 @@ public class Http2IntegrationTest extends InternalHttp2ServerTestBase {
client.start(H2Config.custom().setPushEnabled(true).build());
final BlockingQueue<Message<HttpResponse, String>> pushMessageQueue = new LinkedBlockingDeque<>();
- client.register("*", new Supplier<AsyncPushConsumer>() {
-
- @Override
- public AsyncPushConsumer get() {
- return new AbstractAsyncPushHandler<Message<HttpResponse, String>>(new BasicResponseConsumer<>(new StringAsyncEntityConsumer())) {
-
- @Override
- protected void handleResponse(
- final HttpRequest promise,
- final Message<HttpResponse, String> responseMessage) throws IOException, HttpException {
- try {
- pushMessageQueue.put(responseMessage);
- } catch (final InterruptedException ex) {
- Thread.currentThread().interrupt();
- throw new InterruptedIOException(ex.getMessage());
- }
- }
-
- };
- }
-
- });
final Future<ClientSessionEndpoint> connectFuture = client.connect(
"localhost", serverEndpoint.getPort(), TIMEOUT);
@@ -626,7 +605,31 @@ public class Http2IntegrationTest extends InternalHttp2ServerTestBase {
final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
new BasicRequestProducer("GET", createRequestURI(serverEndpoint, "/hello")),
- new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
+ new HandlerFactory<AsyncPushConsumer>() {
+
+ @Override
+ public AsyncPushConsumer create(
+ final HttpRequest request, final HttpContext context) throws HttpException {
+ return new AbstractAsyncPushHandler<Message<HttpResponse, String>>(new BasicResponseConsumer<>(new StringAsyncEntityConsumer())) {
+
+ @Override
+ protected void handleResponse(
+ final HttpRequest promise,
+ final Message<HttpResponse, String> responseMessage) throws IOException, HttpException {
+ try {
+ pushMessageQueue.put(responseMessage);
+ } catch (final InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException(ex.getMessage());
+ }
+ }
+
+ };
+ }
+ },
+ null,
+ null);
final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
Assert.assertNotNull(result1);
final HttpResponse response1 = result1.getHead();
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
index 0fff5cd..b855e6f 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
@@ -50,10 +50,12 @@ import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.impl.DefaultAddressResolver;
import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.RequestChannel;
import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
@@ -251,6 +253,7 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
public void execute(
final AsyncClientExchangeHandler exchangeHandler,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final Timeout timeout,
final HttpContext executeContext) {
Args.notNull(exchangeHandler, "Exchange handler");
@@ -338,7 +341,7 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
exchangeHandler.streamEnd(trailers);
}
- }, executeContext);
+ }, pushHandlerFactory, executeContext);
}
@@ -363,9 +366,17 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
}
}
+ public void execute(
+ final AsyncClientExchangeHandler exchangeHandler,
+ final Timeout timeout,
+ final HttpContext executeContext) {
+ execute(exchangeHandler, null, timeout, executeContext);
+ }
+
public final <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final Timeout timeout,
final HttpContext context,
final FutureCallback<T> callback) {
@@ -391,7 +402,7 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
}
});
- execute(exchangeHandler, timeout, context != null ? context : HttpCoreContext.create());
+ execute(exchangeHandler, pushHandlerFactory, timeout, context != null ? context : HttpCoreContext.create());
return future;
}
@@ -399,8 +410,17 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final Timeout timeout,
+ final HttpContext context,
+ final FutureCallback<T> callback) {
+ return execute(requestProducer, responseConsumer, null, timeout, context, callback);
+ }
+
+ public final <T> Future<T> execute(
+ final AsyncRequestProducer requestProducer,
+ final AsyncResponseConsumer<T> responseConsumer,
+ final Timeout timeout,
final FutureCallback<T> callback) {
- return execute(requestProducer, responseConsumer, timeout, null, callback);
+ return execute(requestProducer, responseConsumer, null, timeout, null, callback);
}
private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
@@ -412,7 +432,10 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
}
@Override
- public void execute(final AsyncClientExchangeHandler exchangeHandler, final HttpContext context) {
+ public void execute(
+ final AsyncClientExchangeHandler exchangeHandler,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
+ final HttpContext context) {
final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
if (poolEntry == null) {
throw new IllegalStateException("Endpoint has already been released");
@@ -421,7 +444,7 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
if (ioSession == null) {
throw new IllegalStateException("I/O session is invalid");
}
- ioSession.enqueue(new RequestExecutionCommand(exchangeHandler, context), Command.Priority.NORMAL);
+ ioSession.enqueue(new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, null, context), Command.Priority.NORMAL);
}
@Override
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientEndpoint.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientEndpoint.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientEndpoint.java
index a23e55a..322a73f 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientEndpoint.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientEndpoint.java
@@ -54,7 +54,22 @@ public abstract class AsyncClientEndpoint {
* Once the endpoint is no longer needed it MUST be released with {@link #releaseAndReuse()}
* or {@link #releaseAndDiscard()}.
*/
- public abstract void execute(AsyncClientExchangeHandler exchangeHandler, HttpContext context);
+ public abstract void execute(
+ AsyncClientExchangeHandler exchangeHandler,
+ HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
+ HttpContext context);
+
+ /**
+ * Initiates a message exchange using the given handler.
+ * <p>
+ * Once the endpoint is no longer needed it MUST be released with {@link #releaseAndReuse()}
+ * or {@link #releaseAndDiscard()}.
+ */
+ public void execute(
+ final AsyncClientExchangeHandler exchangeHandler,
+ final HttpContext context) {
+ execute(exchangeHandler, null, context);
+ }
/**
* Releases the underlying connection back to the connection pool as re-usable.
@@ -75,6 +90,7 @@ public abstract class AsyncClientEndpoint {
public final <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context,
final FutureCallback<T> callback) {
final BasicFuture<T> future = new BasicFuture<>(callback);
@@ -97,11 +113,25 @@ public abstract class AsyncClientEndpoint {
}
}),
- context != null ? context : HttpCoreContext.create());
+ pushHandlerFactory, context != null ? context : HttpCoreContext.create());
return future;
}
/**
+ * Initiates a message exchange using the given request producer and response consumer.
+ * <p>
+ * Once the endpoint is no longer needed it MUST be released with {@link #releaseAndReuse()}
+ * or {@link #releaseAndDiscard()}.
+ */
+ public final <T> Future<T> execute(
+ final AsyncRequestProducer requestProducer,
+ final AsyncResponseConsumer<T> responseConsumer,
+ final HttpContext context,
+ final FutureCallback<T> callback) {
+ return execute(requestProducer, responseConsumer, null, context, callback);
+ }
+
+ /**
* Initiates a message exchange using the given request producer and response consumer.
* <p>
* Once the endpoint is no longer needed it MUST be released with {@link #releaseAndReuse()}
@@ -111,7 +141,7 @@ public abstract class AsyncClientEndpoint {
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final FutureCallback<T> callback) {
- return execute(requestProducer, responseConsumer, null, callback);
+ return execute(requestProducer, responseConsumer, null, null, callback);
}
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/36ef4aa7/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/RequestExecutionCommand.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/RequestExecutionCommand.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/RequestExecutionCommand.java
index 0b83aef..98f46f2 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/RequestExecutionCommand.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/RequestExecutionCommand.java
@@ -29,6 +29,8 @@ package org.apache.hc.core5.http.nio.command;
import org.apache.hc.core5.concurrent.CancellableDependency;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.util.Args;
@@ -40,28 +42,42 @@ import org.apache.hc.core5.util.Args;
public final class RequestExecutionCommand extends ExecutableCommand {
private final AsyncClientExchangeHandler exchangeHandler;
+ private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
private final CancellableDependency cancellableDependency;
private final HttpContext context;
public RequestExecutionCommand(
final AsyncClientExchangeHandler exchangeHandler,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final CancellableDependency cancellableDependency,
final HttpContext context) {
this.exchangeHandler = Args.notNull(exchangeHandler, "Handler");
+ this.pushHandlerFactory = pushHandlerFactory;
this.cancellableDependency = cancellableDependency;
this.context = context;
}
public RequestExecutionCommand(
final AsyncClientExchangeHandler exchangeHandler,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context) {
- this(exchangeHandler, null, context);
+ this(exchangeHandler, pushHandlerFactory, null, context);
+ }
+
+ public RequestExecutionCommand(
+ final AsyncClientExchangeHandler exchangeHandler,
+ final HttpContext context) {
+ this(exchangeHandler, null, null, context);
}
public AsyncClientExchangeHandler getExchangeHandler() {
return exchangeHandler;
}
+ public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
+ return pushHandlerFactory;
+ }
+
@Override
public CancellableDependency getCancellableDependency() {
return cancellableDependency;