You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2021/03/11 22:12:30 UTC
[httpcomponents-core] 06/06: Protocol upgrade APIs redesign
This is an automated email from the ASF dual-hosted git repository.
olegk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git
commit 5db3bc3eb9107253f3169d3028305d5f99e71e3b
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Thu Dec 17 20:59:03 2020 +0100
Protocol upgrade APIs redesign
---
.../http2/impl/nio/ClientH2UpgradeHandler.java | 69 ++++++
.../impl/nio/ClientHttpProtocolNegotiator.java | 1 +
.../impl/nio/H2OnlyClientProtocolNegotiator.java | 13 +-
.../http2/impl/nio/ServerH2UpgradeHandler.java | 69 ++++++
.../impl/nio/ServerHttpProtocolNegotiator.java | 1 +
.../http2/impl/nio/bootstrap/H2AsyncRequester.java | 60 ++++++
.../impl/nio/bootstrap/H2RequesterBootstrap.java | 4 +-
.../examples/H2ViaHttp1ProxyExecutionExample.java | 233 +++++++++++++++++++++
.../http/impl/bootstrap/HttpAsyncRequester.java | 66 +++++-
.../nio/ssl/TlsUpgradeCapable.java} | 15 +-
.../hc/core5/reactor/InternalDataChannel.java | 24 +++
.../apache/hc/core5/reactor/ProtocolIOSession.java | 24 +++
...lIOSession.java => ProtocolUpgradeHandler.java} | 17 +-
13 files changed, 579 insertions(+), 17 deletions(-)
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2UpgradeHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2UpgradeHandler.java
new file mode 100644
index 0000000..ee548f8
--- /dev/null
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2UpgradeHandler.java
@@ -0,0 +1,69 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http2.impl.nio;
+
+import java.io.IOException;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.impl.nio.HttpConnectionEventHandler;
+import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.reactor.ProtocolUpgradeHandler;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * Client side protocol upgrade handler that upgrades the underlying {@link ProtocolIOSession}
+ * to HTTP/2 in case of a successful protocol negotiation.
+ *
+ * @since 5.2
+ */
+@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
+@Internal
+public class ClientH2UpgradeHandler implements ProtocolUpgradeHandler {
+
+ private final ClientH2StreamMultiplexerFactory http2StreamHandlerFactory; // passed on to every negotiator created by upgrade()
+
+ public ClientH2UpgradeHandler(final ClientH2StreamMultiplexerFactory http2StreamHandlerFactory) {
+ this.http2StreamHandlerFactory = Args.notNull(http2StreamHandlerFactory, "HTTP/2 stream handler factory");
+ }
+
+ @Override
+ public void upgrade(final ProtocolIOSession ioSession, final FutureCallback<ProtocolIOSession> callback) {
+ final HttpConnectionEventHandler protocolNegotiator = new H2OnlyClientProtocolNegotiator(
+ ioSession, http2StreamHandlerFactory, true, callback);
+ ioSession.upgrade(protocolNegotiator); // hand the session over to the HTTP/2 negotiator
+ try {
+ protocolNegotiator.connected(ioSession); // start the negotiator by signalling the connected event directly
+ } catch (final IOException ex) {
+ protocolNegotiator.exception(ioSession, ex); // route the failure through the negotiator rather than rethrowing
+ }
+ }
+
+}
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
index a1a7f4b..9c2152f 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
@@ -95,6 +95,7 @@ public class ClientHttpProtocolNegotiator extends ProtocolNegotiatorBase {
private void startHttp1() throws IOException {
final ByteBuffer data = inBuf != null ? inBuf.data() : null;
startProtocol(new ClientHttp1IOEventHandler(http1StreamHandlerFactory.create(ioSession)), data);
+ ioSession.registerProtocol(ApplicationProtocol.HTTP_2.id, new ClientH2UpgradeHandler(http2StreamHandlerFactory));
if (inBuf != null) {
inBuf.clear();
}
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2OnlyClientProtocolNegotiator.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2OnlyClientProtocolNegotiator.java
index a4f7df5..388e034 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2OnlyClientProtocolNegotiator.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2OnlyClientProtocolNegotiator.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.impl.nio.BufferedData;
import org.apache.hc.core5.http2.ssl.ApplicationProtocol;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.ProtocolIOSession;
@@ -56,6 +57,7 @@ public class H2OnlyClientProtocolNegotiator extends ProtocolNegotiatorBase {
private final AtomicBoolean initialized;
private volatile ByteBuffer preface;
+ private volatile BufferedData inBuf;
public H2OnlyClientProtocolNegotiator(
final ProtocolIOSession ioSession,
@@ -103,7 +105,11 @@ public class H2OnlyClientProtocolNegotiator extends ProtocolNegotiatorBase {
if (!preface.hasRemaining()) {
session.clearEvent(SelectionKey.OP_WRITE);
final ClientH2StreamMultiplexer streamMultiplexer = http2StreamHandlerFactory.create(ioSession);
- startProtocol(new ClientH2IOEventHandler(streamMultiplexer), null);
+ final ByteBuffer data = inBuf != null ? inBuf.data() : null;
+ startProtocol(new ClientH2IOEventHandler(streamMultiplexer), data);
+ if (inBuf != null) {
+ inBuf.clear();
+ }
preface = null;
}
}
@@ -130,7 +136,10 @@ public class H2OnlyClientProtocolNegotiator extends ProtocolNegotiatorBase {
@Override
public void inputReady(final IOSession session, final ByteBuffer src) throws IOException {
if (src != null) {
- throw new ProtocolNegotiationException("Unexpected input");
+ if (inBuf == null) {
+ inBuf = BufferedData.allocate(src.remaining());
+ }
+ inBuf.put(src);
}
if (preface != null) {
writeOutPreface(session);
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2UpgradeHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2UpgradeHandler.java
new file mode 100644
index 0000000..181a465
--- /dev/null
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2UpgradeHandler.java
@@ -0,0 +1,69 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http2.impl.nio;
+
+import java.io.IOException;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.impl.nio.HttpConnectionEventHandler;
+import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.reactor.ProtocolUpgradeHandler;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * Server side protocol upgrade handler that upgrades the underlying {@link ProtocolIOSession}
+ * to HTTP/2 in case of a successful protocol negotiation.
+ *
+ * @since 5.2
+ */
+@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
+@Internal
+public class ServerH2UpgradeHandler implements ProtocolUpgradeHandler {
+
+ private final ServerH2StreamMultiplexerFactory http2StreamHandlerFactory; // passed on to every negotiator created by upgrade()
+
+ public ServerH2UpgradeHandler(final ServerH2StreamMultiplexerFactory http2StreamHandlerFactory) {
+ this.http2StreamHandlerFactory = Args.notNull(http2StreamHandlerFactory, "HTTP/2 stream handler factory");
+ }
+
+ @Override
+ public void upgrade(final ProtocolIOSession ioSession, final FutureCallback<ProtocolIOSession> callback) {
+ final HttpConnectionEventHandler protocolNegotiator = new H2OnlyServerHttpProtocolNegotiator(
+ ioSession, http2StreamHandlerFactory, callback);
+ ioSession.upgrade(protocolNegotiator); // hand the session over to the HTTP/2 negotiator
+ try {
+ protocolNegotiator.connected(ioSession); // start the negotiator by signalling the connected event directly
+ } catch (final IOException ex) {
+ protocolNegotiator.exception(ioSession, ex); // route the failure through the negotiator rather than rethrowing
+ }
+ }
+
+}
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
index 8d9aed2..f6febff 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
@@ -96,6 +96,7 @@ public class ServerHttpProtocolNegotiator extends ProtocolNegotiatorBase {
tlsDetails != null ? URIScheme.HTTPS.id : URIScheme.HTTP.id,
ioSession);
startProtocol(new ServerHttp1IOEventHandler(http1StreamHandler), data);
+ ioSession.registerProtocol(ApplicationProtocol.HTTP_2.id, new ServerH2UpgradeHandler(http2StreamHandlerFactory));
}
private void startHttp2(final ByteBuffer data) throws IOException {
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2AsyncRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2AsyncRequester.java
index de22471..e88a11f 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2AsyncRequester.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2AsyncRequester.java
@@ -30,18 +30,24 @@ package org.apache.hc.core5.http2.impl.nio.bootstrap;
import java.util.concurrent.Future;
import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.concurrent.CallbackContribution;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.function.Decorator;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
+import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.http2.ssl.ApplicationProtocol;
+import org.apache.hc.core5.net.NamedEndpoint;
import org.apache.hc.core5.pool.ManagedConnPool;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.IOSessionListener;
+import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.reactor.ssl.TlsDetails;
import org.apache.hc.core5.util.Timeout;
/**
@@ -70,6 +76,27 @@ public class H2AsyncRequester extends HttpAsyncRequester {
this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
}
+ /**
+ * Use {@link H2RequesterBootstrap} to create instances of this class.
+ *
+ * @since 5.2
+ */
+ @Internal
+ public H2AsyncRequester(
+ final HttpVersionPolicy versionPolicy,
+ final IOReactorConfig ioReactorConfig,
+ final IOEventHandlerFactory eventHandlerFactory,
+ final Decorator<IOSession> ioSessionDecorator,
+ final Callback<Exception> exceptionCallback,
+ final IOSessionListener sessionListener,
+ final ManagedConnPool<HttpHost, IOSession> connPool,
+ final TlsStrategy tlsStrategy,
+ final Timeout handshakeTimeout) {
+ super(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, connPool,
+ tlsStrategy, handshakeTimeout); // TLS strategy and handshake timeout are stored by the base requester
+ this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE; // NEGOTIATE when no policy is given
+ }
+
@Override
protected Future<AsyncClientEndpoint> doConnect(
final HttpHost host,
@@ -79,4 +106,37 @@ public class H2AsyncRequester extends HttpAsyncRequester {
return super.doConnect(host, timeout, attachment != null ? attachment : versionPolicy, callback);
}
+ @Override
+ protected void doTlsUpgrade(final ProtocolIOSession ioSession,
+ final NamedEndpoint endpoint,
+ final FutureCallback<ProtocolIOSession> callback) {
+ super.doTlsUpgrade(ioSession, endpoint, new CallbackContribution<ProtocolIOSession>(callback) { // TLS upgrade first, protocol decision afterwards
+
+ @Override
+ public void completed(final ProtocolIOSession protocolSession) {
+ final boolean switchProtocol;
+ switch (versionPolicy) {
+ case FORCE_HTTP_2: // always switch, whatever the TLS layer reports
+ switchProtocol = true;
+ break;
+ case NEGOTIATE: // switch only when the TLS details name HTTP/2 as the application protocol
+ final TlsDetails tlsDetails = protocolSession.getTlsDetails();
+ final String appProtocol = tlsDetails != null ? tlsDetails.getApplicationProtocol() : null;
+ switchProtocol = ApplicationProtocol.HTTP_2.id.equals(appProtocol);
+ break;
+ default: // any other policy keeps the current protocol
+ switchProtocol = false;
+ }
+ if (switchProtocol) {
+ protocolSession.switchProtocol(ApplicationProtocol.HTTP_2.id, callback); // the caller's callback is handed to the protocol switch
+ } else {
+ if (callback != null) {
+ callback.completed(protocolSession); // nothing to switch: report the TLS-upgraded session as is
+ }
+ }
+ }
+
+ });
+ }
+
}
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java
index ab94662..dc5883c 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java
@@ -348,7 +348,9 @@ public class H2RequesterBootstrap {
ioSessionDecorator,
exceptionCallback,
sessionListener,
- connPool);
+ connPool,
+ actualTlsStrategy,
+ handshakeTimeout);
}
}
diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ViaHttp1ProxyExecutionExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ViaHttp1ProxyExecutionExample.java
new file mode 100644
index 0000000..9f2e1fd
--- /dev/null
+++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ViaHttp1ProxyExecutionExample.java
@@ -0,0 +1,233 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http2.examples;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hc.core5.concurrent.ComplexFuture;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.concurrent.FutureContribution;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpConnection;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.Method;
+import org.apache.hc.core5.http.impl.Http1StreamListener;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
+import org.apache.hc.core5.http.message.BasicHttpRequest;
+import org.apache.hc.core5.http.message.RequestLine;
+import org.apache.hc.core5.http.message.StatusLine;
+import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
+import org.apache.hc.core5.http.nio.entity.NoopEntityConsumer;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
+import org.apache.hc.core5.http.nio.ssl.TlsUpgradeCapable;
+import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
+import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.http2.config.H2Config;
+import org.apache.hc.core5.http2.frame.RawFrame;
+import org.apache.hc.core5.http2.impl.nio.H2StreamListener;
+import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * Example of asynchronous HTTP/2 request execution via an HTTP/1.1 proxy.
+ */
+public class H2ViaHttp1ProxyExecutionExample {
+
+ public static void main(final String[] args) throws Exception {
+
+ // Create and start requester
+ final H2Config h2Config = H2Config.custom()
+ .setPushEnabled(false)
+ .build();
+
+ final HttpAsyncRequester requester = H2RequesterBootstrap.bootstrap()
+ .setH2Config(h2Config)
+ .setVersionPolicy(HttpVersionPolicy.NEGOTIATE)
+ .setStreamListener(new Http1StreamListener() { // prints HTTP/1.1 request lines, status lines and exchange completion
+
+ @Override
+ public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
+ System.out.println(connection.getRemoteAddress() + " " + new RequestLine(request));
+ }
+
+ @Override
+ public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
+ System.out.println(connection.getRemoteAddress() + " " + new StatusLine(response));
+ }
+
+ @Override
+ public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
+ if (keepAlive) {
+ System.out.println(connection.getRemoteAddress() + " exchange completed (connection kept alive)");
+ } else {
+ System.out.println(connection.getRemoteAddress() + " exchange completed (connection closed)");
+ }
+ }
+
+ })
+ .setStreamListener(new H2StreamListener() { // prints HTTP/2 headers; frame and flow-control events are ignored
+
+ @Override
+ public void onHeaderInput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
+ for (int i = 0; i < headers.size(); i++) {
+ System.out.println(connection.getRemoteAddress() + " (" + streamId + ") << " + headers.get(i));
+ }
+ }
+
+ @Override
+ public void onHeaderOutput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
+ for (int i = 0; i < headers.size(); i++) {
+ System.out.println(connection.getRemoteAddress() + " (" + streamId + ") >> " + headers.get(i));
+ }
+ }
+
+ @Override
+ public void onFrameInput(final HttpConnection connection, final int streamId, final RawFrame frame) {
+ }
+
+ @Override
+ public void onFrameOutput(final HttpConnection connection, final int streamId, final RawFrame frame) {
+ }
+
+ @Override
+ public void onInputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
+ }
+
+ @Override
+ public void onOutputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
+ }
+
+ })
+ .create();
+
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ System.out.println("HTTP requester shutting down");
+ requester.close(CloseMode.GRACEFUL);
+ }));
+ requester.start();
+
+ final HttpHost proxy = new HttpHost("localhost", 8888); // proxy the connection is opened to
+ final HttpHost target = new HttpHost("https", "nghttp2.org"); // origin reached through the tunnel
+
+ final ComplexFuture<AsyncClientEndpoint> tunnelFuture = new ComplexFuture<>(null); // completes once the tunnel is upgraded to TLS, fails otherwise
+ tunnelFuture.setDependency(requester.connect(
+ proxy,
+ Timeout.ofSeconds(30),
+ null,
+ new FutureContribution<AsyncClientEndpoint>(tunnelFuture) {
+
+ @Override
+ public void completed(final AsyncClientEndpoint endpoint) {
+ if (endpoint instanceof TlsUpgradeCapable) { // the endpoint must be able to upgrade to TLS after CONNECT
+ final HttpRequest connect = new BasicHttpRequest(Method.CONNECT, proxy, target.toHostString()); // ask the proxy for a tunnel to the target
+ endpoint.execute(
+ new BasicRequestProducer(connect, null),
+ new BasicResponseConsumer<>(new NoopEntityConsumer()),
+ new FutureContribution<Message<HttpResponse, Void>>(tunnelFuture) {
+
+ @Override
+ public void completed(final Message<HttpResponse, Void> message) {
+ final HttpResponse response = message.getHead();
+ if (response.getCode() == HttpStatus.SC_OK) { // proxy accepted the CONNECT request
+ ((TlsUpgradeCapable) endpoint).tlsUpgrade(
+ target,
+ new FutureContribution<ProtocolIOSession>(tunnelFuture) {
+
+ @Override
+ public void completed(final ProtocolIOSession protocolSession) {
+ System.out.println("Tunnel to " + target + " via " + proxy + " established");
+ tunnelFuture.completed(endpoint); // release the main thread waiting on tunnelFuture
+ }
+
+ });
+ } else {
+ tunnelFuture.failed(new HttpException("Tunnel refused: " + new StatusLine(response)));
+ }
+ }
+
+ });
+ } else {
+ tunnelFuture.failed(new IllegalStateException("TLS upgrade not supported"));
+ }
+ }
+
+ }));
+
+ final String[] requestUris = new String[] {"/httpbin/ip", "/httpbin/user-agent", "/httpbin/headers"};
+ final AsyncClientEndpoint endpoint = tunnelFuture.get(1, TimeUnit.MINUTES); // block until the tunnel is ready, for at most one minute
+ try {
+ final CountDownLatch latch = new CountDownLatch(requestUris.length); // one count per request, released on any outcome
+ for (final String requestUri : requestUris) {
+ endpoint.execute(
+ new BasicRequestProducer(Method.GET, target, requestUri),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
+ new FutureCallback<Message<HttpResponse, String>>() {
+
+ @Override
+ public void completed(final Message<HttpResponse, String> message) {
+ final HttpResponse response = message.getHead();
+ final String body = message.getBody();
+ System.out.println(requestUri + "->" + response.getCode());
+ System.out.println(body);
+ latch.countDown();
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ System.out.println(requestUri + "->" + ex);
+ latch.countDown();
+ }
+
+ @Override
+ public void cancelled() {
+ System.out.println(requestUri + " cancelled");
+ latch.countDown();
+ }
+
+ });
+ }
+
+ latch.await(); // wait until every request has completed, failed or been cancelled
+ } finally {
+ endpoint.releaseAndDiscard();
+ }
+
+ System.out.println("Shutting down I/O reactor");
+ requester.initiateShutdown();
+ }
+
+}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
index 6ebe715..c87b2d4 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.BasicFuture;
+import org.apache.hc.core5.concurrent.CallbackContribution;
import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.concurrent.FutureContribution;
@@ -61,10 +62,13 @@ import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.RequestChannel;
import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
+import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
+import org.apache.hc.core5.http.nio.ssl.TlsUpgradeCapable;
import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.net.NamedEndpoint;
import org.apache.hc.core5.net.URIAuthority;
import org.apache.hc.core5.pool.ConnPoolControl;
import org.apache.hc.core5.pool.ManagedConnPool;
@@ -77,6 +81,8 @@ import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.IOSessionListener;
+import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
@@ -89,9 +95,13 @@ import org.apache.hc.core5.util.Timeout;
public class HttpAsyncRequester extends AsyncRequester implements ConnPoolControl<HttpHost> {
private final ManagedConnPool<HttpHost, IOSession> connPool;
+ private final TlsStrategy tlsStrategy;
+ private final Timeout handshakeTimeout;
/**
* Use {@link AsyncRequesterBootstrap} to create instances of this class.
+ *
+ * @since 5.2
*/
@Internal
public HttpAsyncRequester(
@@ -100,10 +110,29 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
final Decorator<IOSession> ioSessionDecorator,
final Callback<Exception> exceptionCallback,
final IOSessionListener sessionListener,
- final ManagedConnPool<HttpHost, IOSession> connPool) {
+ final ManagedConnPool<HttpHost, IOSession> connPool,
+ final TlsStrategy tlsStrategy,
+ final Timeout handshakeTimeout) {
super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE);
this.connPool = Args.notNull(connPool, "Connection pool");
+ this.tlsStrategy = tlsStrategy;
+ this.handshakeTimeout = handshakeTimeout;
+ }
+
+ /**
+ * Use {@link AsyncRequesterBootstrap} to create instances of this class.
+ */
+ @Internal
+ public HttpAsyncRequester(
+ final IOReactorConfig ioReactorConfig,
+ final IOEventHandlerFactory eventHandlerFactory,
+ final Decorator<IOSession> ioSessionDecorator,
+ final Callback<Exception> exceptionCallback,
+ final IOSessionListener sessionListener,
+ final ManagedConnPool<HttpHost, IOSession> connPool) {
+ this(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, connPool,
+ null, null);
}
@Override
@@ -403,7 +432,31 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
return execute(requestProducer, responseConsumer, null, timeout, null, callback);
}
- private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
+ protected void doTlsUpgrade(
+ final ProtocolIOSession ioSession,
+ final NamedEndpoint endpoint,
+ final FutureCallback<ProtocolIOSession> callback) {
+ if (tlsStrategy != null) { // only requesters constructed with a TLS strategy can upgrade
+ tlsStrategy.upgrade(ioSession,
+ endpoint,
+ null,
+ handshakeTimeout,
+ new CallbackContribution<TransportSecurityLayer>(callback) {
+
+ @Override
+ public void completed(final TransportSecurityLayer transportSecurityLayer) {
+ if (callback != null) {
+ callback.completed(ioSession); // report the ProtocolIOSession, not the TransportSecurityLayer, to the caller
+ }
+ }
+
+ });
+ } else {
+ throw new IllegalStateException("TLS upgrade not supported"); // thrown synchronously; the callback is not invoked
+ }
+ }
+
+ private class InternalAsyncClientEndpoint extends AsyncClientEndpoint implements TlsUpgradeCapable {
final AtomicReference<PoolEntry<HttpHost, IOSession>> poolEntryRef;
@@ -471,6 +524,15 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
}
}
+ @Override
+ public void tlsUpgrade(final NamedEndpoint endpoint, final FutureCallback<ProtocolIOSession> callback) {
+ final IOSession ioSession = getIOSession();
+ if (ioSession instanceof ProtocolIOSession) { // doTlsUpgrade needs a ProtocolIOSession
+ doTlsUpgrade((ProtocolIOSession) ioSession, endpoint, callback);
+ } else {
+ throw new IllegalStateException("TLS upgrade not supported"); // thrown synchronously; the callback is not invoked
+ }
+ }
}
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/ssl/TlsUpgradeCapable.java
similarity index 76%
copy from httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java
copy to httpcore5/src/main/java/org/apache/hc/core5/http/nio/ssl/TlsUpgradeCapable.java
index 35e8e29..23ee3bd 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/ssl/TlsUpgradeCapable.java
@@ -25,18 +25,21 @@
*
*/
-package org.apache.hc.core5.reactor;
+package org.apache.hc.core5.http.nio.ssl;
+import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.net.NamedEndpoint;
-import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
+import org.apache.hc.core5.reactor.ProtocolIOSession;
/**
- * TLS capable, protocol upgradable {@link IOSession}.
+ * Capability to upgrade to TLS.
*
- * @since 5.0
+ * @since 5.2
*/
-public interface ProtocolIOSession extends IOSession, TransportSecurityLayer {
+@Internal
+public interface TlsUpgradeCapable {
- NamedEndpoint getInitialEndpoint();
+ void tlsUpgrade(NamedEndpoint endpoint, final FutureCallback<ProtocolIOSession> callback);
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
index e292a69..570b2c6 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
@@ -32,7 +32,10 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
+import java.util.Locale;
import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
@@ -50,6 +53,7 @@ import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
import org.apache.hc.core5.reactor.ssl.TlsDetails;
import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
+import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Asserts;
import org.apache.hc.core5.util.Timeout;
@@ -63,6 +67,7 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
private final AtomicReference<SSLIOSession> tlsSessionRef;
private final AtomicReference<IOSession> currentSessionRef;
private final AtomicReference<FutureCallback<TransportSecurityLayer>> tlsHandshakeCallbackRef;
+ private final ConcurrentMap<String, ProtocolUpgradeHandler> protocolUpgradeHandlerMap;
private final AtomicBoolean closed;
InternalDataChannel(
@@ -80,6 +85,7 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
this.currentSessionRef = new AtomicReference<>(
ioSessionDecorator != null ? ioSessionDecorator.decorate(ioSession) : ioSession);
this.tlsHandshakeCallbackRef = new AtomicReference<>(null);
+ this.protocolUpgradeHandlerMap = new ConcurrentHashMap<>();
this.closed = new AtomicBoolean(false);
}
@@ -404,6 +410,24 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
}
@Override
+ public void switchProtocol(final String protocolId, final FutureCallback<ProtocolIOSession> callback) {
+ Args.notEmpty(protocolId, "Application protocol ID");
+ final ProtocolUpgradeHandler upgradeHandler = protocolUpgradeHandlerMap.get(protocolId.toLowerCase(Locale.ROOT));
+ if (upgradeHandler != null) {
+ upgradeHandler.upgrade(this, callback);
+ } else {
+ throw new IllegalStateException("Unsupported protocol: " + protocolId);
+ }
+ }
+
+ @Override
+ public void registerProtocol(final String protocolId, final ProtocolUpgradeHandler upgradeHandler) {
+ Args.notEmpty(protocolId, "Application protocol ID");
+ Args.notNull(upgradeHandler, "Protocol upgrade handler");
+ protocolUpgradeHandlerMap.put(protocolId.toLowerCase(Locale.ROOT), upgradeHandler);
+ }
+
+ @Override
public String toString() {
final IOSession currentSession = currentSessionRef.get();
if (currentSession != null) {
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java
index 35e8e29..cebfb06 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java
@@ -27,6 +27,7 @@
package org.apache.hc.core5.reactor;
+import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.net.NamedEndpoint;
import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
@@ -37,6 +38,29 @@ import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
*/
public interface ProtocolIOSession extends IOSession, TransportSecurityLayer {
+ /**
+ * Switches this I/O session to the application protocol with the given ID.
+ * @param protocolId the application protocol ID
+ * @param callback the result callback
+ * @throws UnsupportedOperationException if application protocol switch
+ * is not supported.
+ */
+ default void switchProtocol(String protocolId, FutureCallback<ProtocolIOSession> callback) throws UnsupportedOperationException {
+ throw new UnsupportedOperationException("Protocol switch not supported");
+ }
+
+ /**
+ * Registers protocol upgrade handler with the given application protocol ID.
+ * The default implementation does nothing.
+ *
+ * @param protocolId the application protocol ID
+ * @param upgradeHandler the upgrade handler.
+ *
+ * @since 5.2
+ */
+ default void registerProtocol(String protocolId, ProtocolUpgradeHandler upgradeHandler) {
+ }
+
NamedEndpoint getInitialEndpoint();
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolUpgradeHandler.java
similarity index 72%
copy from httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java
copy to httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolUpgradeHandler.java
index 35e8e29..fef4bb7 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolUpgradeHandler.java
@@ -27,16 +27,21 @@
package org.apache.hc.core5.reactor;
-import org.apache.hc.core5.net.NamedEndpoint;
-import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
+import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.concurrent.FutureCallback;
/**
- * TLS capable, protocol upgradable {@link IOSession}.
+ * Application protocol upgrade handler. Implementations can be used to upgrade
+ * I/O sessions to a new application protocol.
*
- * @since 5.0
+ * @since 5.2
*/
-public interface ProtocolIOSession extends IOSession, TransportSecurityLayer {
+@Internal
+public interface ProtocolUpgradeHandler {
- NamedEndpoint getInitialEndpoint();
+ /**
+ * Upgrades application protocol of the given I/O session.
+ */
+ void upgrade(ProtocolIOSession ioSession, FutureCallback<ProtocolIOSession> callback);
}