You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2021/03/11 22:12:30 UTC

[httpcomponents-core] 06/06: Protocol upgrade APIs redesign

This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git

commit 5db3bc3eb9107253f3169d3028305d5f99e71e3b
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Thu Dec 17 20:59:03 2020 +0100

    Protocol upgrade APIs redesign
---
 .../http2/impl/nio/ClientH2UpgradeHandler.java     |  69 ++++++
 .../impl/nio/ClientHttpProtocolNegotiator.java     |   1 +
 .../impl/nio/H2OnlyClientProtocolNegotiator.java   |  13 +-
 .../http2/impl/nio/ServerH2UpgradeHandler.java     |  69 ++++++
 .../impl/nio/ServerHttpProtocolNegotiator.java     |   1 +
 .../http2/impl/nio/bootstrap/H2AsyncRequester.java |  60 ++++++
 .../impl/nio/bootstrap/H2RequesterBootstrap.java   |   4 +-
 .../examples/H2ViaHttp1ProxyExecutionExample.java  | 233 +++++++++++++++++++++
 .../http/impl/bootstrap/HttpAsyncRequester.java    |  66 +++++-
 .../nio/ssl/TlsUpgradeCapable.java}                |  15 +-
 .../hc/core5/reactor/InternalDataChannel.java      |  24 +++
 .../apache/hc/core5/reactor/ProtocolIOSession.java |  24 +++
 ...lIOSession.java => ProtocolUpgradeHandler.java} |  17 +-
 13 files changed, 579 insertions(+), 17 deletions(-)

diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2UpgradeHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2UpgradeHandler.java
new file mode 100644
index 0000000..ee548f8
--- /dev/null
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2UpgradeHandler.java
@@ -0,0 +1,69 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http2.impl.nio;
+
+import java.io.IOException;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.impl.nio.HttpConnectionEventHandler;
+import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.reactor.ProtocolUpgradeHandler;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * Protocol upgrade handler that upgrades the underlying {@link ProtocolIOSession}
+ * to HTTP/2 in case of a successful protocol negotiation.
+ *
+ * @since 5.2
+ */
+@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
+@Internal
+public class ClientH2UpgradeHandler implements ProtocolUpgradeHandler {
+
+    private final ClientH2StreamMultiplexerFactory http2StreamHandlerFactory;
+
+    public ClientH2UpgradeHandler(final ClientH2StreamMultiplexerFactory http2StreamHandlerFactory) {
+        this.http2StreamHandlerFactory = Args.notNull(http2StreamHandlerFactory, "HTTP/2 stream handler factory");
+    }
+
+    @Override
+    public void upgrade(final ProtocolIOSession ioSession, final FutureCallback<ProtocolIOSession> callback) {
+        final HttpConnectionEventHandler protocolNegotiator = new H2OnlyClientProtocolNegotiator(
+                ioSession, http2StreamHandlerFactory, true, callback);
+        ioSession.upgrade(protocolNegotiator);
+        try {
+            protocolNegotiator.connected(ioSession);
+        } catch (final IOException ex) {
+            protocolNegotiator.exception(ioSession, ex);
+        }
+    }
+
+}
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
index a1a7f4b..9c2152f 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
@@ -95,6 +95,7 @@ public class ClientHttpProtocolNegotiator extends ProtocolNegotiatorBase {
     private void startHttp1() throws IOException {
         final ByteBuffer data = inBuf != null ? inBuf.data() : null;
         startProtocol(new ClientHttp1IOEventHandler(http1StreamHandlerFactory.create(ioSession)), data);
+        ioSession.registerProtocol(ApplicationProtocol.HTTP_2.id, new ClientH2UpgradeHandler(http2StreamHandlerFactory));
         if (inBuf != null) {
             inBuf.clear();
         }
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2OnlyClientProtocolNegotiator.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2OnlyClientProtocolNegotiator.java
index a4f7df5..388e034 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2OnlyClientProtocolNegotiator.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2OnlyClientProtocolNegotiator.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hc.core5.annotation.Internal;
 import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.impl.nio.BufferedData;
 import org.apache.hc.core5.http2.ssl.ApplicationProtocol;
 import org.apache.hc.core5.reactor.IOSession;
 import org.apache.hc.core5.reactor.ProtocolIOSession;
@@ -56,6 +57,7 @@ public class H2OnlyClientProtocolNegotiator extends ProtocolNegotiatorBase {
     private final AtomicBoolean initialized;
 
     private volatile ByteBuffer preface;
+    private volatile BufferedData inBuf;
 
     public H2OnlyClientProtocolNegotiator(
             final ProtocolIOSession ioSession,
@@ -103,7 +105,11 @@ public class H2OnlyClientProtocolNegotiator extends ProtocolNegotiatorBase {
         if (!preface.hasRemaining()) {
             session.clearEvent(SelectionKey.OP_WRITE);
             final ClientH2StreamMultiplexer streamMultiplexer = http2StreamHandlerFactory.create(ioSession);
-            startProtocol(new ClientH2IOEventHandler(streamMultiplexer), null);
+            final ByteBuffer data = inBuf != null ? inBuf.data() : null;
+            startProtocol(new ClientH2IOEventHandler(streamMultiplexer), data);
+            if (inBuf != null) {
+                inBuf.clear();
+            }
             preface = null;
         }
     }
@@ -130,7 +136,10 @@ public class H2OnlyClientProtocolNegotiator extends ProtocolNegotiatorBase {
     @Override
     public void inputReady(final IOSession session, final ByteBuffer src) throws IOException {
         if (src != null) {
-            throw new ProtocolNegotiationException("Unexpected input");
+            if (inBuf == null) {
+                inBuf = BufferedData.allocate(src.remaining());
+            }
+            inBuf.put(src);
         }
         if (preface != null) {
             writeOutPreface(session);
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2UpgradeHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2UpgradeHandler.java
new file mode 100644
index 0000000..181a465
--- /dev/null
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2UpgradeHandler.java
@@ -0,0 +1,69 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http2.impl.nio;
+
+import java.io.IOException;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.impl.nio.HttpConnectionEventHandler;
+import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.reactor.ProtocolUpgradeHandler;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * Protocol upgrade handler that upgrades the underlying {@link ProtocolIOSession}
+ * to HTTP/2 in case of a successful protocol negotiation.
+ *
+ * @since 5.2
+ */
+@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
+@Internal
+public class ServerH2UpgradeHandler implements ProtocolUpgradeHandler {
+
+    private final ServerH2StreamMultiplexerFactory http2StreamHandlerFactory;
+
+    public ServerH2UpgradeHandler(final ServerH2StreamMultiplexerFactory http2StreamHandlerFactory) {
+        this.http2StreamHandlerFactory = Args.notNull(http2StreamHandlerFactory, "HTTP/2 stream handler factory");
+    }
+
+    @Override
+    public void upgrade(final ProtocolIOSession ioSession, final FutureCallback<ProtocolIOSession> callback) {
+        final HttpConnectionEventHandler protocolNegotiator = new H2OnlyServerHttpProtocolNegotiator(
+                ioSession, http2StreamHandlerFactory, callback);
+        ioSession.upgrade(protocolNegotiator);
+        try {
+            protocolNegotiator.connected(ioSession);
+        } catch (final IOException ex) {
+            protocolNegotiator.exception(ioSession, ex);
+        }
+    }
+
+}
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
index 8d9aed2..f6febff 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
@@ -96,6 +96,7 @@ public class ServerHttpProtocolNegotiator extends ProtocolNegotiatorBase {
                 tlsDetails != null ? URIScheme.HTTPS.id : URIScheme.HTTP.id,
                 ioSession);
         startProtocol(new ServerHttp1IOEventHandler(http1StreamHandler), data);
+        ioSession.registerProtocol(ApplicationProtocol.HTTP_2.id, new ServerH2UpgradeHandler(http2StreamHandlerFactory));
     }
 
     private void startHttp2(final ByteBuffer data) throws IOException {
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2AsyncRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2AsyncRequester.java
index de22471..e88a11f 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2AsyncRequester.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2AsyncRequester.java
@@ -30,18 +30,24 @@ package org.apache.hc.core5.http2.impl.nio.bootstrap;
 import java.util.concurrent.Future;
 
 import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.concurrent.CallbackContribution;
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.function.Decorator;
 import org.apache.hc.core5.http.HttpHost;
 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
+import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
 import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.http2.ssl.ApplicationProtocol;
+import org.apache.hc.core5.net.NamedEndpoint;
 import org.apache.hc.core5.pool.ManagedConnPool;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
 import org.apache.hc.core5.reactor.IOSession;
 import org.apache.hc.core5.reactor.IOSessionListener;
+import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.reactor.ssl.TlsDetails;
 import org.apache.hc.core5.util.Timeout;
 
 /**
@@ -70,6 +76,27 @@ public class H2AsyncRequester extends HttpAsyncRequester {
         this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
     }
 
+    /**
+     * Use {@link H2RequesterBootstrap} to create instances of this class.
+     *
+     * @since 5.2
+     */
+    @Internal
+    public H2AsyncRequester(
+            final HttpVersionPolicy versionPolicy,
+            final IOReactorConfig ioReactorConfig,
+            final IOEventHandlerFactory eventHandlerFactory,
+            final Decorator<IOSession> ioSessionDecorator,
+            final Callback<Exception> exceptionCallback,
+            final IOSessionListener sessionListener,
+            final ManagedConnPool<HttpHost, IOSession> connPool,
+            final TlsStrategy tlsStrategy,
+            final Timeout handshakeTimeout) {
+        super(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, connPool,
+                tlsStrategy, handshakeTimeout);
+        this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
+    }
+
     @Override
     protected Future<AsyncClientEndpoint> doConnect(
             final HttpHost host,
@@ -79,4 +106,37 @@ public class H2AsyncRequester extends HttpAsyncRequester {
         return super.doConnect(host, timeout, attachment != null ? attachment : versionPolicy, callback);
     }
 
+    @Override
+    protected void doTlsUpgrade(final ProtocolIOSession ioSession,
+                                final NamedEndpoint endpoint,
+                                final FutureCallback<ProtocolIOSession> callback) {
+        super.doTlsUpgrade(ioSession, endpoint, new CallbackContribution<ProtocolIOSession>(callback) {
+
+            @Override
+            public void completed(final ProtocolIOSession protocolSession) {
+                final boolean switchProtocol;
+                switch (versionPolicy) {
+                    case FORCE_HTTP_2:
+                        switchProtocol = true;
+                        break;
+                    case NEGOTIATE:
+                        final TlsDetails tlsDetails = protocolSession.getTlsDetails();
+                        final String appProtocol = tlsDetails != null ? tlsDetails.getApplicationProtocol() : null;
+                        switchProtocol = ApplicationProtocol.HTTP_2.id.equals(appProtocol);
+                        break;
+                    default:
+                        switchProtocol = false;
+                }
+                if (switchProtocol) {
+                    protocolSession.switchProtocol(ApplicationProtocol.HTTP_2.id, callback);
+                } else {
+                    if (callback != null) {
+                        callback.completed(protocolSession);
+                    }
+                }
+            }
+
+        });
+    }
+
 }
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java
index ab94662..dc5883c 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java
@@ -348,7 +348,9 @@ public class H2RequesterBootstrap {
                 ioSessionDecorator,
                 exceptionCallback,
                 sessionListener,
-                connPool);
+                connPool,
+                actualTlsStrategy,
+                handshakeTimeout);
     }
 
 }
diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ViaHttp1ProxyExecutionExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ViaHttp1ProxyExecutionExample.java
new file mode 100644
index 0000000..9f2e1fd
--- /dev/null
+++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ViaHttp1ProxyExecutionExample.java
@@ -0,0 +1,233 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http2.examples;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hc.core5.concurrent.ComplexFuture;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.concurrent.FutureContribution;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpConnection;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.Method;
+import org.apache.hc.core5.http.impl.Http1StreamListener;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
+import org.apache.hc.core5.http.message.BasicHttpRequest;
+import org.apache.hc.core5.http.message.RequestLine;
+import org.apache.hc.core5.http.message.StatusLine;
+import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
+import org.apache.hc.core5.http.nio.entity.NoopEntityConsumer;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
+import org.apache.hc.core5.http.nio.ssl.TlsUpgradeCapable;
+import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
+import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.http2.config.H2Config;
+import org.apache.hc.core5.http2.frame.RawFrame;
+import org.apache.hc.core5.http2.impl.nio.H2StreamListener;
+import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * Example of asynchronous HTTP/2 request execution via a HTTP/1.1 proxy.
+ */
+public class H2ViaHttp1ProxyExecutionExample {
+
+    public static void main(final String[] args) throws Exception {
+
+        // Create and start requester
+        final H2Config h2Config = H2Config.custom()
+                .setPushEnabled(false)
+                .build();
+
+        final HttpAsyncRequester requester = H2RequesterBootstrap.bootstrap()
+                .setH2Config(h2Config)
+                .setVersionPolicy(HttpVersionPolicy.NEGOTIATE)
+                .setStreamListener(new Http1StreamListener() {
+
+                    @Override
+                    public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
+                        System.out.println(connection.getRemoteAddress() + " " + new RequestLine(request));
+                    }
+
+                    @Override
+                    public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
+                        System.out.println(connection.getRemoteAddress() + " " + new StatusLine(response));
+                    }
+
+                    @Override
+                    public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
+                        if (keepAlive) {
+                            System.out.println(connection.getRemoteAddress() + " exchange completed (connection kept alive)");
+                        } else {
+                            System.out.println(connection.getRemoteAddress() + " exchange completed (connection closed)");
+                        }
+                    }
+
+                })
+                .setStreamListener(new H2StreamListener() {
+
+                    @Override
+                    public void onHeaderInput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
+                        for (int i = 0; i < headers.size(); i++) {
+                            System.out.println(connection.getRemoteAddress() + " (" + streamId + ") << " + headers.get(i));
+                        }
+                    }
+
+                    @Override
+                    public void onHeaderOutput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
+                        for (int i = 0; i < headers.size(); i++) {
+                            System.out.println(connection.getRemoteAddress() + " (" + streamId + ") >> " + headers.get(i));
+                        }
+                    }
+
+                    @Override
+                    public void onFrameInput(final HttpConnection connection, final int streamId, final RawFrame frame) {
+                    }
+
+                    @Override
+                    public void onFrameOutput(final HttpConnection connection, final int streamId, final RawFrame frame) {
+                    }
+
+                    @Override
+                    public void onInputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
+                    }
+
+                    @Override
+                    public void onOutputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
+                    }
+
+                })
+                .create();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            System.out.println("HTTP requester shutting down");
+            requester.close(CloseMode.GRACEFUL);
+        }));
+        requester.start();
+
+        final HttpHost proxy = new HttpHost("localhost", 8888);
+        final HttpHost target = new HttpHost("https", "nghttp2.org");
+
+        final ComplexFuture<AsyncClientEndpoint> tunnelFuture = new ComplexFuture<>(null);
+        tunnelFuture.setDependency(requester.connect(
+                proxy,
+                Timeout.ofSeconds(30),
+                null,
+                new FutureContribution<AsyncClientEndpoint>(tunnelFuture) {
+
+                    @Override
+                    public void completed(final AsyncClientEndpoint endpoint) {
+                        if (endpoint instanceof TlsUpgradeCapable) {
+                            final HttpRequest connect = new BasicHttpRequest(Method.CONNECT, proxy, target.toHostString());
+                            endpoint.execute(
+                                    new BasicRequestProducer(connect, null),
+                                    new BasicResponseConsumer<>(new NoopEntityConsumer()),
+                                    new FutureContribution<Message<HttpResponse, Void>>(tunnelFuture) {
+
+                                        @Override
+                                        public void completed(final Message<HttpResponse, Void> message) {
+                                            final HttpResponse response = message.getHead();
+                                            if (response.getCode() == HttpStatus.SC_OK) {
+                                                ((TlsUpgradeCapable) endpoint).tlsUpgrade(
+                                                        target,
+                                                        new FutureContribution<ProtocolIOSession>(tunnelFuture) {
+
+                                                            @Override
+                                                            public void completed(final ProtocolIOSession protocolSession) {
+                                                                System.out.println("Tunnel to " + target + " via " + proxy + " established");
+                                                                tunnelFuture.completed(endpoint);
+                                                            }
+
+                                                        });
+                                            } else {
+                                                tunnelFuture.failed(new HttpException("Tunnel refused: " + new StatusLine(response)));
+                                            }
+                                        }
+
+                                    });
+                        } else {
+                            tunnelFuture.failed(new IllegalStateException("TLS upgrade not supported"));
+                        }
+                    }
+
+                }));
+
+        final String[] requestUris = new String[] {"/httpbin/ip", "/httpbin/user-agent", "/httpbin/headers"};
+        final AsyncClientEndpoint endpoint = tunnelFuture.get(1, TimeUnit.MINUTES);
+        try {
+            final CountDownLatch latch = new CountDownLatch(requestUris.length);
+            for (final String requestUri : requestUris) {
+                endpoint.execute(
+                        new BasicRequestProducer(Method.GET, target, requestUri),
+                        new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
+                        new FutureCallback<Message<HttpResponse, String>>() {
+
+                            @Override
+                            public void completed(final Message<HttpResponse, String> message) {
+                                final HttpResponse response = message.getHead();
+                                final String body = message.getBody();
+                                System.out.println(requestUri + "->" + response.getCode());
+                                System.out.println(body);
+                                latch.countDown();
+                            }
+
+                            @Override
+                            public void failed(final Exception ex) {
+                                System.out.println(requestUri + "->" + ex);
+                                latch.countDown();
+                            }
+
+                            @Override
+                            public void cancelled() {
+                                System.out.println(requestUri + " cancelled");
+                                latch.countDown();
+                            }
+
+                        });
+            }
+
+            latch.await();
+        } finally {
+            endpoint.releaseAndDiscard();
+        }
+
+        System.out.println("Shutting down I/O reactor");
+        requester.initiateShutdown();
+    }
+
+}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
index 6ebe715..c87b2d4 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hc.core5.annotation.Internal;
 import org.apache.hc.core5.concurrent.BasicFuture;
+import org.apache.hc.core5.concurrent.CallbackContribution;
 import org.apache.hc.core5.concurrent.ComplexFuture;
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.concurrent.FutureContribution;
@@ -61,10 +62,13 @@ import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http.nio.RequestChannel;
 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
+import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
+import org.apache.hc.core5.http.nio.ssl.TlsUpgradeCapable;
 import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
 import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.http.protocol.HttpCoreContext;
 import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.net.NamedEndpoint;
 import org.apache.hc.core5.net.URIAuthority;
 import org.apache.hc.core5.pool.ConnPoolControl;
 import org.apache.hc.core5.pool.ManagedConnPool;
@@ -77,6 +81,8 @@ import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
 import org.apache.hc.core5.reactor.IOSession;
 import org.apache.hc.core5.reactor.IOSessionListener;
+import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
 import org.apache.hc.core5.util.Args;
 import org.apache.hc.core5.util.TimeValue;
 import org.apache.hc.core5.util.Timeout;
@@ -89,9 +95,13 @@ import org.apache.hc.core5.util.Timeout;
 public class HttpAsyncRequester extends AsyncRequester implements ConnPoolControl<HttpHost> {
 
     private final ManagedConnPool<HttpHost, IOSession> connPool;
+    private final TlsStrategy tlsStrategy;
+    private final Timeout handshakeTimeout;
 
     /**
      * Use {@link AsyncRequesterBootstrap} to create instances of this class.
+     *
+     * @since 5.2
      */
     @Internal
     public HttpAsyncRequester(
@@ -100,10 +110,29 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
             final Decorator<IOSession> ioSessionDecorator,
             final Callback<Exception> exceptionCallback,
             final IOSessionListener sessionListener,
-            final ManagedConnPool<HttpHost, IOSession> connPool) {
+            final ManagedConnPool<HttpHost, IOSession> connPool,
+            final TlsStrategy tlsStrategy,
+            final Timeout handshakeTimeout) {
         super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
                 ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE);
         this.connPool = Args.notNull(connPool, "Connection pool");
+        this.tlsStrategy = tlsStrategy;
+        this.handshakeTimeout = handshakeTimeout;
+    }
+
+    /**
+     * Use {@link AsyncRequesterBootstrap} to create instances of this class.
+     */
+    @Internal
+    public HttpAsyncRequester(
+            final IOReactorConfig ioReactorConfig,
+            final IOEventHandlerFactory eventHandlerFactory,
+            final Decorator<IOSession> ioSessionDecorator,
+            final Callback<Exception> exceptionCallback,
+            final IOSessionListener sessionListener,
+            final ManagedConnPool<HttpHost, IOSession> connPool) {
+        this(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, connPool,
+                null, null);
     }
 
     @Override
@@ -403,7 +432,31 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
         return execute(requestProducer, responseConsumer, null, timeout, null, callback);
     }
 
-    private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
+    protected void doTlsUpgrade(
+            final ProtocolIOSession ioSession,
+            final NamedEndpoint endpoint,
+            final FutureCallback<ProtocolIOSession> callback) {
+        if (tlsStrategy != null) {
+            tlsStrategy.upgrade(ioSession,
+                    endpoint,
+                    null,
+                    handshakeTimeout,
+                    new CallbackContribution<TransportSecurityLayer>(callback) {
+
+                        @Override
+                        public void completed(final TransportSecurityLayer transportSecurityLayer) {
+                            if (callback != null) {
+                                callback.completed(ioSession);
+                            }
+                        }
+
+                    });
+        } else {
+            throw new IllegalStateException("TLS upgrade not supported");
+        }
+    }
+
+    private class InternalAsyncClientEndpoint extends AsyncClientEndpoint implements TlsUpgradeCapable {
 
         final AtomicReference<PoolEntry<HttpHost, IOSession>> poolEntryRef;
 
@@ -471,6 +524,15 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
             }
         }
 
+        @Override
+        public void tlsUpgrade(final NamedEndpoint endpoint, final FutureCallback<ProtocolIOSession> callback) {
+            final IOSession ioSession = getIOSession();
+            if (ioSession instanceof ProtocolIOSession) {
+                doTlsUpgrade((ProtocolIOSession) ioSession, endpoint, callback);
+            } else {
+                throw new IllegalStateException("TLS upgrade not supported");
+            }
+        }
     }
 
 }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/ssl/TlsUpgradeCapable.java
similarity index 76%
copy from httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java
copy to httpcore5/src/main/java/org/apache/hc/core5/http/nio/ssl/TlsUpgradeCapable.java
index 35e8e29..23ee3bd 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/ssl/TlsUpgradeCapable.java
@@ -25,18 +25,21 @@
  *
  */
 
-package org.apache.hc.core5.reactor;
+package org.apache.hc.core5.http.nio.ssl;
 
+import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.net.NamedEndpoint;
-import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
+import org.apache.hc.core5.reactor.ProtocolIOSession;
 
 /**
- * TLS capable, protocol upgradable {@link IOSession}.
+ * Capability to upgrade to TLS.
  *
- * @since 5.0
+ * @since 5.2
  */
-public interface ProtocolIOSession extends IOSession, TransportSecurityLayer {
+@Internal
+public interface TlsUpgradeCapable {
 
-    NamedEndpoint getInitialEndpoint();
+    void tlsUpgrade(NamedEndpoint endpoint, final FutureCallback<ProtocolIOSession> callback);
 
 }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
index e292a69..570b2c6 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
@@ -32,7 +32,10 @@ import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.ByteChannel;
 import java.nio.channels.SelectionKey;
+import java.util.Locale;
 import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
@@ -50,6 +53,7 @@ import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
 import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
 import org.apache.hc.core5.reactor.ssl.TlsDetails;
 import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
+import org.apache.hc.core5.util.Args;
 import org.apache.hc.core5.util.Asserts;
 import org.apache.hc.core5.util.Timeout;
 
@@ -63,6 +67,7 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
     private final AtomicReference<SSLIOSession> tlsSessionRef;
     private final AtomicReference<IOSession> currentSessionRef;
     private final AtomicReference<FutureCallback<TransportSecurityLayer>> tlsHandshakeCallbackRef;
+    private final ConcurrentMap<String, ProtocolUpgradeHandler> protocolUpgradeHandlerMap;
     private final AtomicBoolean closed;
 
     InternalDataChannel(
@@ -80,6 +85,7 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
         this.currentSessionRef = new AtomicReference<>(
                 ioSessionDecorator != null ? ioSessionDecorator.decorate(ioSession) : ioSession);
         this.tlsHandshakeCallbackRef = new AtomicReference<>(null);
+        this.protocolUpgradeHandlerMap = new ConcurrentHashMap<>();
         this.closed = new AtomicBoolean(false);
     }
 
@@ -404,6 +410,24 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
     }
 
     @Override
+    public void switchProtocol(final String protocolId, final FutureCallback<ProtocolIOSession> callback) {
+        Args.notEmpty(protocolId, "Application protocol ID");
+        final ProtocolUpgradeHandler upgradeHandler = protocolUpgradeHandlerMap.get(protocolId.toLowerCase(Locale.ROOT));
+        if (upgradeHandler != null) {
+            upgradeHandler.upgrade(this, callback);
+        } else {
+            throw new IllegalStateException("Unsupported protocol: " + protocolId);
+        }
+    }
+
+    @Override
+    public void registerProtocol(final String protocolId, final ProtocolUpgradeHandler upgradeHandler) {
+        Args.notEmpty(protocolId, "Application protocol ID");
+        Args.notNull(upgradeHandler, "Protocol upgrade handler");
+        protocolUpgradeHandlerMap.put(protocolId.toLowerCase(Locale.ROOT), upgradeHandler);
+    }
+
+    @Override
     public String toString() {
         final IOSession currentSession = currentSessionRef.get();
         if (currentSession != null) {
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java
index 35e8e29..cebfb06 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java
@@ -27,6 +27,7 @@
 
 package org.apache.hc.core5.reactor;
 
+import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.net.NamedEndpoint;
 import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
 
@@ -37,6 +38,29 @@ import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
  */
 public interface ProtocolIOSession extends IOSession, TransportSecurityLayer {
 
+    /**
+     * Switches this I/O session to the application protocol with the given ID.
+     * @param protocolId the application protocol ID
+     * @param callback the result callback
+     * @throws UnsupportedOperationException if application protocol switch
+     * is not supported.
+     */
+    default void switchProtocol(String protocolId, FutureCallback<ProtocolIOSession> callback) throws UnsupportedOperationException {
+        throw new UnsupportedOperationException("Protocol switch not supported");
+    }
+
+    /**
+     * Registers protocol upgrade handler with the given application protocol ID.
+     *
+     * @since 5.2
+     * @param protocolId the application protocol ID
+     * @param upgradeHandler the upgrade handler.
+     *
+     * @since 5.2
+     */
+    default void registerProtocol(String protocolId, ProtocolUpgradeHandler upgradeHandler) {
+    }
+
     NamedEndpoint getInitialEndpoint();
 
 }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolUpgradeHandler.java
similarity index 72%
copy from httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java
copy to httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolUpgradeHandler.java
index 35e8e29..fef4bb7 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolUpgradeHandler.java
@@ -27,16 +27,21 @@
 
 package org.apache.hc.core5.reactor;
 
-import org.apache.hc.core5.net.NamedEndpoint;
-import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
+import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.concurrent.FutureCallback;
 
 /**
- * TLS capable, protocol upgradable {@link IOSession}.
+ * Application protocol upgrade handler. This routine can be used to upgrade
+ * I/O sessions to a new application protocol.
  *
- * @since 5.0
+ * @since 5.2
  */
-public interface ProtocolIOSession extends IOSession, TransportSecurityLayer {
+@Internal
+public interface ProtocolUpgradeHandler {
 
-    NamedEndpoint getInitialEndpoint();
+    /**
+     * Upgrades application protocol of the given I/O session.
+     */
+    void upgrade(ProtocolIOSession ioSession, FutureCallback<ProtocolIOSession> callback);
 
 }