You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2020/11/03 20:57:58 UTC
[httpcomponents-core] 02/02: Application protocol upgrade for
non-blocking I/O sessions
This is an automated email from the ASF dual-hosted git repository.
olegk pushed a commit to branch http11_protocol_upgrade
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git
commit e073a3dcbf194aa20f37a5ad2cf0c3da7f907f17
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Fri Oct 30 16:10:54 2020 +0100
Application protocol upgrade for non-blocking I/O sessions
---
.../hc/core5/http2/H2ConnectionException.java | 2 +-
.../hc/core5/http2/H2StreamResetException.java | 2 +-
.../http2/impl/nio/AbstractH2UpgradeHandler.java | 101 +++++++++
.../http2/impl/nio/ClientH2UpgradeHandler.java | 72 +++++++
.../nio/ClientHttpProtocolNegotiatorFactory.java | 16 +-
.../nio/H2OnlyServerHttpProtocolNegotiator.java | 110 ++++++++++
.../http2/impl/nio/ServerH2UpgradeHandler.java | 69 ++++++
.../impl/nio/ServerHttpProtocolNegotiator.java | 1 -
.../nio/ServerHttpProtocolNegotiatorFactory.java | 20 +-
.../http2/impl/nio/bootstrap/H2AsyncRequester.java | 22 ++
.../impl/nio/bootstrap/H2RequesterBootstrap.java | 44 +++-
.../impl/nio/bootstrap/H2ServerBootstrap.java | 27 ++-
.../hc/core5/http2/ssl/TlsUpgradeHandler.java | 57 +++--
.../examples/H2ViaHttp1ProxyExecutionExample.java | 236 +++++++++++++++++++++
.../http/impl/bootstrap/HttpAsyncRequester.java | 76 +++++--
.../core5/http/impl/bootstrap/HttpAsyncServer.java | 32 ++-
.../http/impl/nio/AbstractHttp1StreamDuplexer.java | 31 ++-
.../impl/nio/ClientHttp1IOEventHandlerFactory.java | 9 +-
.../http/impl/nio/ClientHttp1StreamDuplexer.java | 24 ++-
.../impl/nio/ClientHttp1StreamDuplexerFactory.java | 24 ++-
.../impl/nio/ServerHttp1IOEventHandlerFactory.java | 5 +-
.../http/impl/nio/ServerHttp1StreamDuplexer.java | 27 ++-
.../impl/nio/ServerHttp1StreamDuplexerFactory.java | 27 ++-
.../apache/hc/core5/http/nio/ssl/TlsStrategy.java | 2 +-
.../impl/nio => reactor}/EndpointParameters.java | 50 ++++-
.../hc/core5/reactor/InternalDataChannel.java | 61 +++++-
.../ProtocolLayer.java} | 30 ++-
.../ProtocolUpgradeHandler.java} | 28 +--
.../TransportSecurityLayerEx.java} | 29 +--
.../apache/hc/core5/reactor/ssl/SSLIOSession.java | 2 +-
30 files changed, 1072 insertions(+), 164 deletions(-)
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2ConnectionException.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2ConnectionException.java
index a47e61c..9e69531 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2ConnectionException.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2ConnectionException.java
@@ -44,7 +44,7 @@ public class H2ConnectionException extends IOException {
public H2ConnectionException(final H2Error error, final String message) {
super(message);
- Args.notNull(error, "H2 Error code may not be null");
+ Args.notNull(error, "H2 Error code");
this.code = error.getCode();
}
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamResetException.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamResetException.java
index 2be857e..ec978da 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamResetException.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamResetException.java
@@ -41,7 +41,7 @@ public class H2StreamResetException extends HttpStreamResetException {
public H2StreamResetException(final H2Error error, final String message) {
super(message);
- Args.notNull(error, "H2 Error code may not be null");
+ Args.notNull(error, "H2 Error code");
this.code = error.getCode();
}
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2UpgradeHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2UpgradeHandler.java
new file mode 100644
index 0000000..67fbbe3
--- /dev/null
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2UpgradeHandler.java
@@ -0,0 +1,101 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http2.impl.nio;
+
+import java.io.IOException;
+
+import org.apache.hc.core5.concurrent.CallbackContribution;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.reactor.EndpointParameters;
+import org.apache.hc.core5.http.impl.nio.HttpConnectionEventHandler;
+import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.reactor.ProtocolUpgradeHandler;
+import org.apache.hc.core5.reactor.TransportSecurityLayerEx;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.Timeout;
+
+abstract class AbstractH2UpgradeHandler implements ProtocolUpgradeHandler {
+
+ private final TlsStrategy tlsStrategy;
+ private final Timeout handshakeTimeout;
+
+ AbstractH2UpgradeHandler(final TlsStrategy tlsStrategy, final Timeout handshakeTimeout) {
+ this.tlsStrategy = tlsStrategy;
+ this.handshakeTimeout = handshakeTimeout;
+ }
+
+ protected abstract HttpConnectionEventHandler createProtocolNegotiator(ProtocolIOSession ioSession,
+ FutureCallback<ProtocolIOSession> callback);
+
+ private void negotiateHttp(final ProtocolIOSession ioSession,
+ final FutureCallback<ProtocolIOSession> callback) {
+ final HttpConnectionEventHandler protocolNegotiator = createProtocolNegotiator(ioSession, callback);
+ ioSession.upgrade(protocolNegotiator);
+ try {
+ protocolNegotiator.connected(ioSession);
+ } catch (final IOException ex) {
+ protocolNegotiator.exception(ioSession, ex);
+ }
+ }
+
+ @Override
+ public final void upgrade(final ProtocolIOSession ioSession,
+ final EndpointParameters parameters,
+ final FutureCallback<ProtocolIOSession> callback) {
+ Args.notNull(parameters, "Endpoint parameters");
+ if (URIScheme.HTTPS.same(parameters.getScheme())) {
+ if (ioSession instanceof TransportSecurityLayerEx) {
+ final TransportSecurityLayerEx transportSecurityLayer = (TransportSecurityLayerEx) ioSession;
+ transportSecurityLayer.subscribe(new CallbackContribution<ProtocolIOSession>(callback) {
+
+ @Override
+ public void completed(final ProtocolIOSession result) {
+ negotiateHttp(ioSession, callback);
+ }
+
+ });
+ tlsStrategy.upgrade(
+ transportSecurityLayer,
+ new HttpHost(parameters.getScheme(), parameters.getHostName(), parameters.getPort()),
+ ioSession.getLocalAddress(),
+ ioSession.getRemoteAddress(),
+ HttpVersionPolicy.FORCE_HTTP_2,
+ handshakeTimeout);
+ } else {
+ throw new UnsupportedOperationException("TLS upgrade not supported");
+ }
+ } else {
+ negotiateHttp(ioSession, callback);
+ }
+ }
+
+}
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2UpgradeHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2UpgradeHandler.java
new file mode 100644
index 0000000..0c74214
--- /dev/null
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2UpgradeHandler.java
@@ -0,0 +1,72 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http2.impl.nio;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.impl.nio.ClientHttp1StreamDuplexerFactory;
+import org.apache.hc.core5.http.impl.nio.HttpConnectionEventHandler;
+import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * Protocol upgrade handler that upgrades the underlying {@link ProtocolIOSession}
+ * to HTTP/2 in case of a successful protocol negotiation.
+ *
+ * @since 5.1
+ */
+@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
+@Internal
+public class ClientH2UpgradeHandler extends AbstractH2UpgradeHandler {
+
+ private final ClientH2StreamMultiplexerFactory http2StreamHandlerFactory;
+
+ public ClientH2UpgradeHandler(
+ final ClientH2StreamMultiplexerFactory http2StreamHandlerFactory,
+ final TlsStrategy tlsStrategy,
+ final Timeout handshakeTimeout) {
+ super(tlsStrategy, handshakeTimeout);
+ this.http2StreamHandlerFactory = Args.notNull(http2StreamHandlerFactory, "HTTP/2 stream handler factory");
+ }
+
+ @Override
+ protected HttpConnectionEventHandler createProtocolNegotiator(final ProtocolIOSession ioSession,
+ final FutureCallback<ProtocolIOSession> callback) {
+ return new H2OnlyClientProtocolNegotiator(
+ ioSession,
+ http2StreamHandlerFactory,
+ true,
+ callback);
+ }
+
+}
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiatorFactory.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiatorFactory.java
index 0f63ae9..6800702 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiatorFactory.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiatorFactory.java
@@ -33,10 +33,9 @@ import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.impl.nio.ClientHttp1StreamDuplexerFactory;
-import org.apache.hc.core5.http.impl.nio.EndpointParameters;
+import org.apache.hc.core5.reactor.EndpointParameters;
import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
import org.apache.hc.core5.http2.HttpVersionPolicy;
-import org.apache.hc.core5.net.NamedEndpoint;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.ProtocolIOSession;
import org.apache.hc.core5.util.Args;
@@ -66,7 +65,7 @@ public class ClientHttpProtocolNegotiatorFactory implements IOEventHandlerFactor
this.http1StreamHandlerFactory = Args.notNull(http1StreamHandlerFactory, "HTTP/1.1 stream handler factory");
this.http2StreamHandlerFactory = Args.notNull(http2StreamHandlerFactory, "HTTP/2 stream handler factory");
this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
- this.tlsStrategy = tlsStrategy;
+ this.tlsStrategy = Args.notNull(tlsStrategy, "TLS strategy");
this.handshakeTimeout = handshakeTimeout;
}
@@ -74,20 +73,19 @@ public class ClientHttpProtocolNegotiatorFactory implements IOEventHandlerFactor
public ClientHttpProtocolNegotiator createHandler(final ProtocolIOSession ioSession, final Object attachment) {
HttpVersionPolicy endpointPolicy = versionPolicy;
if (attachment instanceof EndpointParameters) {
- final NamedEndpoint endpoint = ioSession.getInitialEndpoint();
final EndpointParameters params = (EndpointParameters) attachment;
- if (tlsStrategy != null && endpoint != null && URIScheme.HTTPS.same(params.scheme)) {
- final HttpHost host = new HttpHost(params.scheme, endpoint.getHostName(), endpoint.getPort());
+ if (tlsStrategy != null && URIScheme.HTTPS.same(params.getScheme())) {
+ final HttpHost host = new HttpHost(params.getScheme(), params.getHostName(), params.getPort());
tlsStrategy.upgrade(
ioSession,
host,
ioSession.getLocalAddress(),
ioSession.getRemoteAddress(),
- params.attachment,
+ params.getAttachment(),
handshakeTimeout);
}
- if (params.attachment instanceof HttpVersionPolicy) {
- endpointPolicy = (HttpVersionPolicy) params.attachment;
+ if (params.getAttachment() instanceof HttpVersionPolicy) {
+ endpointPolicy = (HttpVersionPolicy) params.getAttachment();
}
}
return new ClientHttpProtocolNegotiator(
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2OnlyServerHttpProtocolNegotiator.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2OnlyServerHttpProtocolNegotiator.java
new file mode 100644
index 0000000..a770f01
--- /dev/null
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2OnlyServerHttpProtocolNegotiator.java
@@ -0,0 +1,110 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http2.impl.nio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.ConnectionClosedException;
+import org.apache.hc.core5.http.impl.nio.BufferedData;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * I/O event handler for events fired by {@link ProtocolIOSession} that implements
+ * server side of the HTTP/2 protocol negotiation handshake.
+ *
+ * @since 5.1
+ */
+@Internal
+public class H2OnlyServerHttpProtocolNegotiator extends ProtocolNegotiatorBase {
+
+ final static byte[] PREFACE = ClientHttpProtocolNegotiator.PREFACE;
+
+ private final ServerH2StreamMultiplexerFactory http2StreamHandlerFactory;
+ private final BufferedData inBuf;
+
+ public H2OnlyServerHttpProtocolNegotiator(
+ final ProtocolIOSession ioSession,
+ final ServerH2StreamMultiplexerFactory http2StreamHandlerFactory) {
+ this(ioSession, http2StreamHandlerFactory, null);
+ }
+
+ public H2OnlyServerHttpProtocolNegotiator(
+ final ProtocolIOSession ioSession,
+ final ServerH2StreamMultiplexerFactory http2StreamHandlerFactory,
+ final FutureCallback<ProtocolIOSession> resultCallback) {
+ super(ioSession, resultCallback);
+ this.http2StreamHandlerFactory = Args.notNull(http2StreamHandlerFactory, "HTTP/2 stream handler factory");
+ this.inBuf = BufferedData.allocate(1024);
+ }
+
+ @Override
+ public void connected(final IOSession session) throws IOException {
+ }
+
+ @Override
+ public void inputReady(final IOSession session, final ByteBuffer src) throws IOException {
+ if (src != null) {
+ inBuf.put(src);
+ }
+ boolean endOfStream = false;
+ if (inBuf.length() < PREFACE.length) {
+ final int bytesRead = inBuf.readFrom(session);
+ if (bytesRead == -1) {
+ endOfStream = true;
+ }
+ }
+ final ByteBuffer data = inBuf.data();
+ if (data.remaining() >= PREFACE.length) {
+ for (int i = 0; i < PREFACE.length; i++) {
+ if (data.get() != PREFACE[i]) {
+ throw new ProtocolNegotiationException("Unexpected HTTP/2 preface");
+ }
+ }
+ startProtocol(new ServerH2IOEventHandler(http2StreamHandlerFactory.create(ioSession)), data.hasRemaining() ? data : null);
+ } else {
+ if (endOfStream) {
+ throw new ConnectionClosedException();
+ }
+ }
+ }
+
+ @Override
+ public void outputReady(final IOSession session) throws IOException {
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getName();
+ }
+
+}
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2UpgradeHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2UpgradeHandler.java
new file mode 100644
index 0000000..ab8bf7c
--- /dev/null
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2UpgradeHandler.java
@@ -0,0 +1,69 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http2.impl.nio;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.impl.nio.HttpConnectionEventHandler;
+import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
+import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * Protocol upgrade handler that upgrades the underlying {@link ProtocolIOSession}
+ * to HTTP/2 in case of a successful protocol negotiation.
+ *
+ * @since 5.1
+ */
+@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
+@Internal
+public class ServerH2UpgradeHandler extends AbstractH2UpgradeHandler {
+
+ private final ServerH2StreamMultiplexerFactory http2StreamHandlerFactory;
+
+ public ServerH2UpgradeHandler(
+ final ServerH2StreamMultiplexerFactory http2StreamHandlerFactory,
+ final TlsStrategy tlsStrategy,
+ final Timeout handshakeTimeout) {
+ super(tlsStrategy, handshakeTimeout);
+ this.http2StreamHandlerFactory = Args.notNull(http2StreamHandlerFactory, "HTTP/2 stream handler factory");
+ }
+
+ @Override
+ protected HttpConnectionEventHandler createProtocolNegotiator(final ProtocolIOSession ioSession,
+ final FutureCallback<ProtocolIOSession> callback) {
+ return new H2OnlyServerHttpProtocolNegotiator(
+ ioSession,
+ http2StreamHandlerFactory,
+ callback);
+ }
+
+}
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
index 7468497..8d9aed2 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
@@ -164,7 +164,6 @@ public class ServerHttpProtocolNegotiator extends ProtocolNegotiatorBase {
throw new ConnectionClosedException();
}
}
- data.clear();
}
@Override
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiatorFactory.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiatorFactory.java
index 817a895..35ca32f 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiatorFactory.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiatorFactory.java
@@ -31,7 +31,7 @@ import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.http.URIScheme;
-import org.apache.hc.core5.http.impl.nio.EndpointParameters;
+import org.apache.hc.core5.reactor.EndpointParameters;
import org.apache.hc.core5.http.impl.nio.ServerHttp1StreamDuplexerFactory;
import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
import org.apache.hc.core5.http2.HttpVersionPolicy;
@@ -64,7 +64,7 @@ public class ServerHttpProtocolNegotiatorFactory implements IOEventHandlerFactor
this.http1StreamDuplexerFactory = Args.notNull(http1StreamDuplexerFactory, "HTTP/1.1 stream handler factory");
this.http2StreamMultiplexerFactory = Args.notNull(http2StreamMultiplexerFactory, "HTTP/2 stream handler factory");
this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
- this.tlsStrategy = tlsStrategy;
+ this.tlsStrategy = Args.notNull(tlsStrategy, "TLS strategy");
this.handshakeTimeout = handshakeTimeout;
}
@@ -73,26 +73,18 @@ public class ServerHttpProtocolNegotiatorFactory implements IOEventHandlerFactor
HttpVersionPolicy endpointPolicy = versionPolicy;
if (attachment instanceof EndpointParameters) {
final EndpointParameters params = (EndpointParameters) attachment;
- if (tlsStrategy != null && URIScheme.HTTPS.same(params.scheme)) {
+ if (tlsStrategy != null && URIScheme.HTTPS.same(params.getScheme())) {
tlsStrategy.upgrade(
ioSession,
null,
ioSession.getLocalAddress(),
ioSession.getRemoteAddress(),
- params.attachment,
+ params.getAttachment(),
handshakeTimeout);
}
- if (params.attachment instanceof HttpVersionPolicy) {
- endpointPolicy = (HttpVersionPolicy) params.attachment;
+ if (params.getAttachment() instanceof HttpVersionPolicy) {
+ endpointPolicy = (HttpVersionPolicy) params.getAttachment();
}
- } else {
- tlsStrategy.upgrade(
- ioSession,
- null,
- ioSession.getLocalAddress(),
- ioSession.getRemoteAddress(),
- attachment,
- handshakeTimeout);
}
return new ServerHttpProtocolNegotiator(
ioSession,
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2AsyncRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2AsyncRequester.java
index de22471..e7642b6 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2AsyncRequester.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2AsyncRequester.java
@@ -34,6 +34,7 @@ import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.function.Decorator;
import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
import org.apache.hc.core5.http2.HttpVersionPolicy;
@@ -42,6 +43,7 @@ import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.IOSessionListener;
+import org.apache.hc.core5.reactor.ProtocolUpgradeHandler;
import org.apache.hc.core5.util.Timeout;
/**
@@ -70,6 +72,26 @@ public class H2AsyncRequester extends HttpAsyncRequester {
this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
}
+ /**
+ * Use {@link H2RequesterBootstrap} to create instances of this class.
+ *
+ * @since 5.1
+ */
+ @Internal
+ public H2AsyncRequester(
+ final HttpVersionPolicy versionPolicy,
+ final IOReactorConfig ioReactorConfig,
+ final IOEventHandlerFactory eventHandlerFactory,
+ final Decorator<IOSession> ioSessionDecorator,
+ final Callback<Exception> exceptionCallback,
+ final IOSessionListener sessionListener,
+ final ManagedConnPool<HttpHost, IOSession> connPool,
+ final Lookup<ProtocolUpgradeHandler> protocolUpgradeHandlerLookup) {
+ super(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, connPool,
+ protocolUpgradeHandlerLookup);
+ this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
+ }
+
@Override
protected Future<AsyncClientEndpoint> doConnect(
final HttpHost host,
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java
index 049ed16..76be6fc 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java
@@ -36,9 +36,15 @@ import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.config.Http1Config;
+import org.apache.hc.core5.http.config.Registry;
+import org.apache.hc.core5.http.config.RegistryBuilder;
+import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
import org.apache.hc.core5.http.impl.Http1StreamListener;
import org.apache.hc.core5.http.impl.HttpProcessors;
import org.apache.hc.core5.http.impl.nio.ClientHttp1StreamDuplexerFactory;
+import org.apache.hc.core5.http.impl.nio.DefaultHttpRequestWriterFactory;
+import org.apache.hc.core5.http.impl.nio.DefaultHttpResponseParserFactory;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
import org.apache.hc.core5.http.protocol.HttpProcessor;
@@ -48,10 +54,12 @@ import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.http2.impl.H2Processors;
import org.apache.hc.core5.http2.impl.nio.ClientH2StreamMultiplexerFactory;
+import org.apache.hc.core5.http2.impl.nio.ClientH2UpgradeHandler;
import org.apache.hc.core5.http2.impl.nio.ClientHttpProtocolNegotiatorFactory;
import org.apache.hc.core5.http2.impl.nio.H2StreamListener;
import org.apache.hc.core5.http2.nio.support.DefaultAsyncPushConsumerFactory;
import org.apache.hc.core5.http2.ssl.H2ClientTlsStrategy;
+import org.apache.hc.core5.http2.ssl.TlsUpgradeHandler;
import org.apache.hc.core5.pool.ConnPoolListener;
import org.apache.hc.core5.pool.DefaultDisposalCallback;
import org.apache.hc.core5.pool.LaxConnPool;
@@ -63,6 +71,7 @@ import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.IOSessionListener;
+import org.apache.hc.core5.reactor.ProtocolUpgradeHandler;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
@@ -309,23 +318,41 @@ public class H2RequesterBootstrap {
for (final HandlerEntry<Supplier<AsyncPushConsumer>> entry: pushConsumerList) {
registry.register(entry.hostname, entry.uriPattern, entry.handler);
}
- final ClientHttp1StreamDuplexerFactory http1StreamHandlerFactory = new ClientHttp1StreamDuplexerFactory(
- httpProcessor != null ? httpProcessor : HttpProcessors.client(),
- http1Config != null ? http1Config : Http1Config.DEFAULT,
- charCodingConfig != null ? charCodingConfig : CharCodingConfig.DEFAULT,
- http1StreamListener);
+
final ClientH2StreamMultiplexerFactory http2StreamHandlerFactory = new ClientH2StreamMultiplexerFactory(
httpProcessor != null ? httpProcessor : H2Processors.client(),
new DefaultAsyncPushConsumerFactory(registry),
h2Config != null ? h2Config : H2Config.DEFAULT,
charCodingConfig != null ? charCodingConfig : CharCodingConfig.DEFAULT,
streamListener);
+
+ final HttpVersionPolicy actualVersionProtocol = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
+ final TlsStrategy actualTlsStrategy = tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy();
+
+ final Registry<ProtocolUpgradeHandler> protocolRegistry = RegistryBuilder.<ProtocolUpgradeHandler>create()
+ .register("TLS", new TlsUpgradeHandler(actualVersionProtocol, actualTlsStrategy, handshakeTimeout))
+ .register("H2", new ClientH2UpgradeHandler(http2StreamHandlerFactory, actualTlsStrategy, handshakeTimeout))
+ .build();
+
+ final ClientHttp1StreamDuplexerFactory http1StreamHandlerFactory = new ClientHttp1StreamDuplexerFactory(
+ httpProcessor != null ? httpProcessor : HttpProcessors.client(),
+ http1Config != null ? http1Config : Http1Config.DEFAULT,
+ charCodingConfig != null ? charCodingConfig : CharCodingConfig.DEFAULT,
+ DefaultConnectionReuseStrategy.INSTANCE,
+ new DefaultHttpResponseParserFactory(http1Config),
+ DefaultHttpRequestWriterFactory.INSTANCE,
+ DefaultContentLengthStrategy.INSTANCE,
+ DefaultContentLengthStrategy.INSTANCE,
+ protocolRegistry,
+ http1StreamListener);
+
final IOEventHandlerFactory ioEventHandlerFactory = new ClientHttpProtocolNegotiatorFactory(
http1StreamHandlerFactory,
http2StreamHandlerFactory,
- versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE,
- tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy(),
+ actualVersionProtocol,
+ actualTlsStrategy,
handshakeTimeout);
+
return new H2AsyncRequester(
versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE,
ioReactorConfig,
@@ -333,7 +360,8 @@ public class H2RequesterBootstrap {
ioSessionDecorator,
exceptionCallback,
sessionListener,
- connPool);
+ connPool,
+ protocolRegistry);
}
}
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2ServerBootstrap.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2ServerBootstrap.java
index e458a91..c84c75b 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2ServerBootstrap.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2ServerBootstrap.java
@@ -35,6 +35,8 @@ import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.config.Http1Config;
import org.apache.hc.core5.http.config.NamedElementChain;
+import org.apache.hc.core5.http.config.Registry;
+import org.apache.hc.core5.http.config.RegistryBuilder;
import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
import org.apache.hc.core5.http.impl.Http1StreamListener;
@@ -65,12 +67,16 @@ import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.http2.impl.H2Processors;
import org.apache.hc.core5.http2.impl.nio.H2StreamListener;
import org.apache.hc.core5.http2.impl.nio.ServerH2StreamMultiplexerFactory;
+import org.apache.hc.core5.http2.impl.nio.ServerH2UpgradeHandler;
import org.apache.hc.core5.http2.impl.nio.ServerHttpProtocolNegotiatorFactory;
+import org.apache.hc.core5.http2.ssl.H2ServerTlsStrategy;
+import org.apache.hc.core5.http2.ssl.TlsUpgradeHandler;
import org.apache.hc.core5.net.InetAddressUtils;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.IOSessionListener;
+import org.apache.hc.core5.reactor.ProtocolUpgradeHandler;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Timeout;
@@ -354,8 +360,9 @@ public class H2ServerBootstrap {
}
public HttpAsyncServer create() {
+ final String actualCanonicalHostName = canonicalHostName != null ? canonicalHostName : InetAddressUtils.getCanonicalLocalHostName();
final RequestHandlerRegistry<Supplier<AsyncServerExchangeHandler>> registry = new RequestHandlerRegistry<>(
- canonicalHostName != null ? canonicalHostName : InetAddressUtils.getCanonicalLocalHostName(),
+ actualCanonicalHostName,
new Supplier<LookupRegistry<Supplier<AsyncServerExchangeHandler>>>() {
@Override
@@ -424,6 +431,15 @@ public class H2ServerBootstrap {
h2Config != null ? h2Config : H2Config.DEFAULT,
charCodingConfig != null ? charCodingConfig : CharCodingConfig.DEFAULT,
h2StreamListener);
+
+ final HttpVersionPolicy actualVersionProtocol = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
+ final TlsStrategy actualTlsStrategy = tlsStrategy != null ? tlsStrategy : new H2ServerTlsStrategy();
+
+ final Registry<ProtocolUpgradeHandler> protocolRegistry = RegistryBuilder.<ProtocolUpgradeHandler>create()
+ .register("TLS", new TlsUpgradeHandler(actualVersionProtocol, actualTlsStrategy, handshakeTimeout))
+ .register("H2", new ServerH2UpgradeHandler(http2StreamHandlerFactory, actualTlsStrategy, handshakeTimeout))
+ .build();
+
final ServerHttp1StreamDuplexerFactory http1StreamHandlerFactory = new ServerHttp1StreamDuplexerFactory(
httpProcessor != null ? httpProcessor : HttpProcessors.server(),
handlerFactory,
@@ -434,15 +450,18 @@ public class H2ServerBootstrap {
DefaultHttpResponseWriterFactory.INSTANCE,
DefaultContentLengthStrategy.INSTANCE,
DefaultContentLengthStrategy.INSTANCE,
+ protocolRegistry,
http1StreamListener);
+
final IOEventHandlerFactory ioEventHandlerFactory = new ServerHttpProtocolNegotiatorFactory(
http1StreamHandlerFactory,
http2StreamHandlerFactory,
- versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE,
- tlsStrategy,
+ actualVersionProtocol,
+ actualTlsStrategy,
handshakeTimeout);
+
return new HttpAsyncServer(ioEventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback,
- sessionListener);
+ sessionListener, actualCanonicalHostName);
}
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1IOEventHandlerFactory.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/ssl/TlsUpgradeHandler.java
similarity index 57%
copy from httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1IOEventHandlerFactory.java
copy to httpcore5-h2/src/main/java/org/apache/hc/core5/http2/ssl/TlsUpgradeHandler.java
index 09ece66..9aadd31 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1IOEventHandlerFactory.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/ssl/TlsUpgradeHandler.java
@@ -25,66 +25,65 @@
*
*/
-package org.apache.hc.core5.http.impl.nio;
+package org.apache.hc.core5.http2.ssl;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.annotation.ThreadingBehavior;
-import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.reactor.EndpointParameters;
import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
-import org.apache.hc.core5.reactor.IOEventHandler;
-import org.apache.hc.core5.reactor.IOEventHandlerFactory;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.reactor.ProtocolUpgradeHandler;
+import org.apache.hc.core5.reactor.TransportSecurityLayerEx;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Timeout;
/**
- * {@link ServerHttp1IOEventHandler} factory.
+ * Protocol upgrade handler that upgrades the underlying {@link ProtocolIOSession} to TLS
+ * using {@link TlsStrategy}.
*
- * @since 5.0
+ * @since 5.1
*/
@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
@Internal
-public class ServerHttp1IOEventHandlerFactory implements IOEventHandlerFactory {
+public class TlsUpgradeHandler implements ProtocolUpgradeHandler {
- private final ServerHttp1StreamDuplexerFactory streamDuplexerFactory;
+ private final HttpVersionPolicy versionPolicy;
private final TlsStrategy tlsStrategy;
private final Timeout handshakeTimeout;
- public ServerHttp1IOEventHandlerFactory(
- final ServerHttp1StreamDuplexerFactory streamDuplexerFactory,
+ public TlsUpgradeHandler(
+ final HttpVersionPolicy versionPolicy,
final TlsStrategy tlsStrategy,
final Timeout handshakeTimeout) {
- this.streamDuplexerFactory = Args.notNull(streamDuplexerFactory, "Stream duplexer factory");
+ this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
this.tlsStrategy = tlsStrategy;
this.handshakeTimeout = handshakeTimeout;
}
@Override
- public IOEventHandler createHandler(final ProtocolIOSession ioSession, final Object attachment) {
- String endpointScheme = URIScheme.HTTP.id;
- if (attachment instanceof EndpointParameters) {
- final EndpointParameters params = (EndpointParameters) attachment;
- endpointScheme = params.scheme;
- if (tlsStrategy != null && URIScheme.HTTPS.same(endpointScheme)) {
- tlsStrategy.upgrade(
- ioSession,
- null,
- ioSession.getLocalAddress(),
- ioSession.getRemoteAddress(),
- params.attachment,
- handshakeTimeout);
+ public void upgrade(final ProtocolIOSession ioSession,
+ final EndpointParameters parameters,
+ final FutureCallback<ProtocolIOSession> callback) {
+ Args.notNull(parameters, "Endpoint parameters");
+ if (ioSession instanceof TransportSecurityLayerEx) {
+ final TransportSecurityLayerEx transportSecurityLayer = (TransportSecurityLayerEx) ioSession;
+ if (callback != null) {
+ transportSecurityLayer.subscribe(callback);
}
- } else {
tlsStrategy.upgrade(
- ioSession,
- null,
+ transportSecurityLayer,
+ new HttpHost(parameters.getScheme(), parameters.getHostName(), parameters.getPort()),
ioSession.getLocalAddress(),
ioSession.getRemoteAddress(),
- attachment,
+ versionPolicy,
handshakeTimeout);
+ } else {
+ throw new UnsupportedOperationException("TLS upgrade not supported");
}
- return new ServerHttp1IOEventHandler(streamDuplexerFactory.create(endpointScheme, ioSession));
}
}
diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ViaHttp1ProxyExecutionExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ViaHttp1ProxyExecutionExample.java
new file mode 100644
index 0000000..cd13e34
--- /dev/null
+++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ViaHttp1ProxyExecutionExample.java
@@ -0,0 +1,236 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http2.examples;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hc.core5.concurrent.ComplexFuture;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.concurrent.FutureContribution;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpConnection;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.Method;
+import org.apache.hc.core5.http.impl.Http1StreamListener;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
+import org.apache.hc.core5.reactor.EndpointParameters;
+import org.apache.hc.core5.http.message.BasicHttpRequest;
+import org.apache.hc.core5.http.message.RequestLine;
+import org.apache.hc.core5.http.message.StatusLine;
+import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
+import org.apache.hc.core5.http.nio.entity.NoopEntityConsumer;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
+import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
+import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.http2.config.H2Config;
+import org.apache.hc.core5.http2.frame.RawFrame;
+import org.apache.hc.core5.http2.impl.nio.H2StreamListener;
+import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.reactor.ProtocolLayer;
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * Example of asynchronous HTTP/2 request execution via a HTTP/1.1 proxy.
+ */
+public class H2ViaHttp1ProxyExecutionExample {
+
+ public static void main(final String[] args) throws Exception {
+
+ // Create and start requester
+ final H2Config h2Config = H2Config.custom()
+ .setPushEnabled(false)
+ .build();
+
+ final HttpAsyncRequester requester = H2RequesterBootstrap.bootstrap()
+ .setH2Config(h2Config)
+ .setVersionPolicy(HttpVersionPolicy.NEGOTIATE)
+ .setStreamListener(new Http1StreamListener() {
+
+ @Override
+ public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
+ System.out.println(connection.getRemoteAddress() + " " + new RequestLine(request));
+ }
+
+ @Override
+ public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
+ System.out.println(connection.getRemoteAddress() + " " + new StatusLine(response));
+ }
+
+ @Override
+ public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
+ if (keepAlive) {
+ System.out.println(connection.getRemoteAddress() + " exchange completed (connection kept alive)");
+ } else {
+ System.out.println(connection.getRemoteAddress() + " exchange completed (connection closed)");
+ }
+ }
+
+ })
+ .setStreamListener(new H2StreamListener() {
+
+ @Override
+ public void onHeaderInput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
+ for (int i = 0; i < headers.size(); i++) {
+ System.out.println(connection.getRemoteAddress() + " (" + streamId + ") << " + headers.get(i));
+ }
+ }
+
+ @Override
+ public void onHeaderOutput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
+ for (int i = 0; i < headers.size(); i++) {
+ System.out.println(connection.getRemoteAddress() + " (" + streamId + ") >> " + headers.get(i));
+ }
+ }
+
+ @Override
+ public void onFrameInput(final HttpConnection connection, final int streamId, final RawFrame frame) {
+ }
+
+ @Override
+ public void onFrameOutput(final HttpConnection connection, final int streamId, final RawFrame frame) {
+ }
+
+ @Override
+ public void onInputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
+ }
+
+ @Override
+ public void onOutputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
+ }
+
+ })
+ .create();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ System.out.println("HTTP requester shutting down");
+ requester.close(CloseMode.GRACEFUL);
+ }
+ });
+ requester.start();
+
+ final HttpHost proxy = new HttpHost("localhost", 8888);
+ final HttpHost target = new HttpHost("https", "nghttp2.org");
+
+ final ComplexFuture<AsyncClientEndpoint> tunnelFuture = new ComplexFuture<>(null);
+ tunnelFuture.setDependency(requester.connect(
+ proxy,
+ Timeout.ofSeconds(30),
+ null,
+ new FutureContribution<AsyncClientEndpoint>(tunnelFuture) {
+
+ @Override
+ public void completed(final AsyncClientEndpoint endpoint) {
+ final HttpRequest connect = new BasicHttpRequest(Method.CONNECT, proxy, target.toHostString());
+ endpoint.execute(
+ new BasicRequestProducer(connect, null),
+ new BasicResponseConsumer<>(new NoopEntityConsumer()),
+ new FutureContribution<Message<HttpResponse, Void>>(tunnelFuture) {
+
+ @Override
+ public void completed(final Message<HttpResponse, Void> message) {
+ final HttpResponse response = message.getHead();
+ if (response.getCode() == HttpStatus.SC_OK) {
+ if (endpoint instanceof ProtocolLayer) {
+ final EndpointParameters params = new EndpointParameters(
+ target.getSchemeName(),
+ target.getHostName(),
+ target.getPort(),
+ HttpVersionPolicy.NEGOTIATE);
+ final ProtocolLayer protocolLayer = (ProtocolLayer) endpoint;
+ try {
+ protocolLayer.upgrade("H2", params);
+ } catch (final RuntimeException ex) {
+ tunnelFuture.failed(ex);
+ }
+ }
+ System.out.println("Tunnel to " + target + " via " + proxy + " established");
+ tunnelFuture.completed(endpoint);
+ } else {
+ tunnelFuture.failed(new HttpException("Tunnel refused: " + new StatusLine(response)));
+ }
+ }
+
+ });
+ }
+
+ }));
+
+ final String[] requestUris = new String[] {"/httpbin/ip", "/httpbin/user-agent", "/httpbin/headers"};
+ final AsyncClientEndpoint endpoint = tunnelFuture.get(1, TimeUnit.MINUTES);
+ try {
+ final CountDownLatch latch = new CountDownLatch(requestUris.length);
+ for (final String requestUri : requestUris) {
+ endpoint.execute(
+ new BasicRequestProducer(Method.GET, target, requestUri),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
+ new FutureCallback<Message<HttpResponse, String>>() {
+
+ @Override
+ public void completed(final Message<HttpResponse, String> message) {
+ final HttpResponse response = message.getHead();
+ final String body = message.getBody();
+ System.out.println(requestUri + "->" + response.getCode());
+ System.out.println(body);
+ latch.countDown();
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ System.out.println(requestUri + "->" + ex);
+ latch.countDown();
+ }
+
+ @Override
+ public void cancelled() {
+ System.out.println(requestUri + " cancelled");
+ latch.countDown();
+ }
+
+ });
+ }
+
+ latch.await();
+ } finally {
+ endpoint.releaseAndDiscard();
+ }
+
+ System.out.println("Shutting down I/O reactor");
+ requester.initiateShutdown();
+ }
+
+}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
index 7657b2a..e31f546 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
@@ -49,8 +49,9 @@ import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.ProtocolException;
+import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.impl.DefaultAddressResolver;
-import org.apache.hc.core5.http.impl.nio.EndpointParameters;
+import org.apache.hc.core5.reactor.EndpointParameters;
import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
@@ -76,6 +77,9 @@ import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.IOSessionListener;
+import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.reactor.ProtocolLayer;
+import org.apache.hc.core5.reactor.ProtocolUpgradeHandler;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
@@ -88,9 +92,12 @@ import org.apache.hc.core5.util.Timeout;
public class HttpAsyncRequester extends AsyncRequester implements ConnPoolControl<HttpHost> {
private final ManagedConnPool<HttpHost, IOSession> connPool;
+ private final Lookup<ProtocolUpgradeHandler> protocolUpgradeHandlerLookup;
/**
* Use {@link AsyncRequesterBootstrap} to create instances of this class.
+ *
+ * @since 5.1
*/
@Internal
public HttpAsyncRequester(
@@ -99,10 +106,26 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
final Decorator<IOSession> ioSessionDecorator,
final Callback<Exception> exceptionCallback,
final IOSessionListener sessionListener,
- final ManagedConnPool<HttpHost, IOSession> connPool) {
+ final ManagedConnPool<HttpHost, IOSession> connPool,
+ final Lookup<ProtocolUpgradeHandler> protocolUpgradeHandlerLookup) {
super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE);
this.connPool = Args.notNull(connPool, "Connection pool");
+ this.protocolUpgradeHandlerLookup = protocolUpgradeHandlerLookup;
+ }
+
+ /**
+ * Use {@link AsyncRequesterBootstrap} to create instances of this class.
+ */
+ @Internal
+ public HttpAsyncRequester(
+ final IOReactorConfig ioReactorConfig,
+ final IOEventHandlerFactory eventHandlerFactory,
+ final Decorator<IOSession> ioSessionDecorator,
+ final Callback<Exception> exceptionCallback,
+ final IOSessionListener sessionListener,
+ final ManagedConnPool<HttpHost, IOSession> connPool) {
+ this(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, connPool, null);
}
@Override
@@ -192,7 +215,7 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
final Future<IOSession> future = requestSession(
host,
timeout,
- new EndpointParameters(host.getSchemeName(), attachment),
+ new EndpointParameters(host, attachment),
new FutureCallback<IOSession>() {
@Override
@@ -409,7 +432,7 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
return execute(requestProducer, responseConsumer, null, timeout, null, callback);
}
- private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
+ private class InternalAsyncClientEndpoint extends AsyncClientEndpoint implements ProtocolLayer {
final AtomicReference<PoolEntry<HttpHost, IOSession>> poolEntryRef;
@@ -417,11 +440,7 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
this.poolEntryRef = new AtomicReference<>(poolEntry);
}
- @Override
- public void execute(
- final AsyncClientExchangeHandler exchangeHandler,
- final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
- final HttpContext context) {
+ private IOSession getIOSession() {
final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
if (poolEntry == null) {
throw new IllegalStateException("Endpoint has already been released");
@@ -430,6 +449,15 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
if (ioSession == null) {
throw new IllegalStateException("I/O session is invalid");
}
+ return ioSession;
+ }
+
+ @Override
+ public void execute(
+ final AsyncClientExchangeHandler exchangeHandler,
+ final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
+ final HttpContext context) {
+ final IOSession ioSession = getIOSession();
ioSession.enqueue(new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, null, context), Command.Priority.NORMAL);
if (!ioSession.isOpen()) {
try {
@@ -445,9 +473,7 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
if (poolEntry != null) {
final IOSession ioSession = poolEntry.getConnection();
- if (ioSession != null && ioSession.isOpen()) {
- return true;
- }
+ return ioSession != null && ioSession.isOpen();
}
return false;
}
@@ -470,6 +496,32 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
}
}
+ @Override
+ public void upgrade(final ProtocolUpgradeHandler upgradeHandler, final EndpointParameters parameters) {
+ Args.notNull(upgradeHandler, "Protocol upgrade handler");
+ Args.notNull(parameters, "Endpoint parameters");
+ final IOSession ioSession = getIOSession();
+ if (ioSession instanceof ProtocolIOSession) {
+ upgradeHandler.upgrade((ProtocolIOSession) ioSession, parameters, null);
+ } else {
+ throw new UnsupportedOperationException("Protocol upgrade not supported");
+ }
+ }
+
+ @Override
+ public void upgrade(final String id, final EndpointParameters parameters) {
+ final IOSession ioSession = getIOSession();
+ if (ioSession instanceof ProtocolIOSession) {
+ final ProtocolUpgradeHandler upgradeHandler = protocolUpgradeHandlerLookup.lookup(id);
+ if (upgradeHandler == null) {
+ throw new IllegalArgumentException("Unsupported protocol: " + id);
+ }
+ upgradeHandler.upgrade((ProtocolIOSession) ioSession, parameters, null);
+ } else {
+ throw new UnsupportedOperationException("Protocol upgrade not supported by the endpoint");
+ }
+ }
+
}
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncServer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncServer.java
index 1e39f18..cb6ca43 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncServer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncServer.java
@@ -26,6 +26,7 @@
*/
package org.apache.hc.core5.http.impl.bootstrap;
+import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Future;
@@ -34,7 +35,7 @@ import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.function.Decorator;
import org.apache.hc.core5.http.URIScheme;
-import org.apache.hc.core5.http.impl.nio.EndpointParameters;
+import org.apache.hc.core5.reactor.EndpointParameters;
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOReactorConfig;
@@ -49,8 +50,12 @@ import org.apache.hc.core5.reactor.ListenerEndpoint;
*/
public class HttpAsyncServer extends AsyncServer {
+ private final String canonicalName;
+
/**
* Use {@link AsyncServerBootstrap} to create instances of this class.
+ *
+ * @since 5.1
*/
@Internal
public HttpAsyncServer(
@@ -58,9 +63,24 @@ public class HttpAsyncServer extends AsyncServer {
final IOReactorConfig ioReactorConfig,
final Decorator<IOSession> ioSessionDecorator,
final Callback<Exception> exceptionCallback,
- final IOSessionListener sessionListener) {
+ final IOSessionListener sessionListener,
+ final String canonicalName) {
super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
ShutdownCommand.GRACEFUL_NORMAL_CALLBACK);
+ this.canonicalName = canonicalName;
+ }
+
+ /**
+ * Use {@link AsyncServerBootstrap} to create instances of this class.
+ */
+ @Internal
+ public HttpAsyncServer(
+ final IOEventHandlerFactory eventHandlerFactory,
+ final IOReactorConfig ioReactorConfig,
+ final Decorator<IOSession> ioSessionDecorator,
+ final Callback<Exception> exceptionCallback,
+ final IOSessionListener sessionListener) {
+ this(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener, null);
}
public Future<ListenerEndpoint> listen(
@@ -68,7 +88,13 @@ public class HttpAsyncServer extends AsyncServer {
final URIScheme scheme,
final Object attachment,
final FutureCallback<ListenerEndpoint> callback) {
- return super.listen(address, new EndpointParameters(scheme.id, attachment), callback);
+ final InetSocketAddress inetSocketAddress = (InetSocketAddress) address;
+ final EndpointParameters parameters = new EndpointParameters(
+ scheme.id,
+ canonicalName != null ? canonicalName : "localhost",
+ inetSocketAddress.getPort(),
+ attachment);
+ return super.listen(address, parameters, callback);
}
public Future<ListenerEndpoint> listen(
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
index 7d63e08..c5e63d4 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
@@ -51,6 +51,7 @@ import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.config.Http1Config;
+import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.impl.BasicEndpointDetails;
import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
@@ -70,16 +71,19 @@ import org.apache.hc.core5.http.nio.command.ShutdownCommand;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
import org.apache.hc.core5.reactor.Command;
+import org.apache.hc.core5.reactor.EndpointParameters;
import org.apache.hc.core5.reactor.EventMask;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.reactor.ProtocolLayer;
+import org.apache.hc.core5.reactor.ProtocolUpgradeHandler;
import org.apache.hc.core5.reactor.ssl.TlsDetails;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Identifiable;
import org.apache.hc.core5.util.Timeout;
abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, OutgoingMessage extends HttpMessage>
- implements Identifiable, HttpConnection {
+ implements ProtocolLayer, Identifiable, HttpConnection {
private enum ConnectionState { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
@@ -96,6 +100,7 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
private final ContentLengthStrategy outgoingContentStrategy;
private final ByteBuffer contentBuffer;
private final AtomicInteger outputRequests;
+ private final Lookup<ProtocolUpgradeHandler> protocolUpgradeHandlerLookup;
private volatile Message<IncomingMessage, ContentDecoder> incomingMessage;
private volatile Message<OutgoingMessage, ContentEncoder> outgoingMessage;
@@ -112,7 +117,8 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
final NHttpMessageParser<IncomingMessage> incomingMessageParser,
final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter,
final ContentLengthStrategy incomingContentStrategy,
- final ContentLengthStrategy outgoingContentStrategy) {
+ final ContentLengthStrategy outgoingContentStrategy,
+ final Lookup<ProtocolUpgradeHandler> protocolUpgradeHandlerLookup) {
this.ioSession = Args.notNull(ioSession, "I/O session");
this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
final int bufferSize = this.http1Config.getBufferSize();
@@ -130,6 +136,7 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
DefaultContentLengthStrategy.INSTANCE;
this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
DefaultContentLengthStrategy.INSTANCE;
+ this.protocolUpgradeHandlerLookup = protocolUpgradeHandlerLookup;
this.contentBuffer = ByteBuffer.allocate(this.http1Config.getBufferSize());
this.outputRequests = new AtomicInteger(0);
this.connState = ConnectionState.READY;
@@ -579,6 +586,26 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
return tlsDetails != null ? tlsDetails.getSSLSession() : null;
}
+ @Override
+ public void upgrade(final ProtocolUpgradeHandler upgradeHandler,
+ final EndpointParameters parameters) throws UnsupportedOperationException {
+ Args.notNull(upgradeHandler, "Protocol upgrade handler");
+ Args.notNull(parameters, "Endpoint parameters");
+ upgradeHandler.upgrade(ioSession, parameters, null);
+ }
+
+ @Override
+ public void upgrade(final String id,
+ final EndpointParameters parameters) throws UnsupportedOperationException {
+ Args.notNull(id, "Protocol id");
+ Args.notNull(parameters, "Endpoint parameters");
+ final ProtocolUpgradeHandler upgradeHandler = protocolUpgradeHandlerLookup.lookup(id);
+ if (upgradeHandler == null) {
+ throw new IllegalArgumentException("Unsupported protocol: " + id);
+ }
+ upgradeHandler.upgrade(ioSession, parameters, null);
+ }
+
void appendState(final StringBuilder buf) {
buf.append("connState=").append(connState)
.append(", inbuf=").append(inbuf)
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandlerFactory.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandlerFactory.java
index 2a8b571..2c39255 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandlerFactory.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandlerFactory.java
@@ -33,7 +33,7 @@ import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
-import org.apache.hc.core5.net.NamedEndpoint;
+import org.apache.hc.core5.reactor.EndpointParameters;
import org.apache.hc.core5.reactor.IOEventHandler;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.ProtocolIOSession;
@@ -66,15 +66,14 @@ public class ClientHttp1IOEventHandlerFactory implements IOEventHandlerFactory {
public IOEventHandler createHandler(final ProtocolIOSession ioSession, final Object attachment) {
if (attachment instanceof EndpointParameters) {
final EndpointParameters params = (EndpointParameters) attachment;
- final NamedEndpoint endpoint = ioSession.getInitialEndpoint();
- if (tlsStrategy != null && endpoint != null && URIScheme.HTTPS.same(params.scheme)) {
- final HttpHost host = new HttpHost(params.scheme, endpoint.getHostName(), endpoint.getPort());
+ if (tlsStrategy != null && URIScheme.HTTPS.same(params.getScheme())) {
+ final HttpHost host = new HttpHost(params.getScheme(), params.getHostName(), params.getPort());
tlsStrategy.upgrade(
ioSession,
host,
ioSession.getLocalAddress(),
ioSession.getRemoteAddress(),
- params.attachment,
+ params.getAttachment(),
handshakeTimeout);
}
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java
index 36cb7e4..ca6c9a2 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java
@@ -48,6 +48,7 @@ import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.LengthRequiredException;
import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.config.Http1Config;
+import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
@@ -66,6 +67,7 @@ import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.reactor.ProtocolUpgradeHandler;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Asserts;
import org.apache.hc.core5.util.Timeout;
@@ -101,7 +103,27 @@ public class ClientHttp1StreamDuplexer extends AbstractHttp1StreamDuplexer<HttpR
final ContentLengthStrategy incomingContentStrategy,
final ContentLengthStrategy outgoingContentStrategy,
final Http1StreamListener streamListener) {
- super(ioSession, http1Config, charCodingConfig, incomingMessageParser, outgoingMessageWriter, incomingContentStrategy, outgoingContentStrategy);
+ this(ioSession, httpProcessor, http1Config, charCodingConfig, connectionReuseStrategy, incomingMessageParser,
+ outgoingMessageWriter, incomingContentStrategy, outgoingContentStrategy, null, streamListener);
+ }
+
+ /**
+ * @since 5.1
+ */
+ public ClientHttp1StreamDuplexer(
+ final ProtocolIOSession ioSession,
+ final HttpProcessor httpProcessor,
+ final Http1Config http1Config,
+ final CharCodingConfig charCodingConfig,
+ final ConnectionReuseStrategy connectionReuseStrategy,
+ final NHttpMessageParser<HttpResponse> incomingMessageParser,
+ final NHttpMessageWriter<HttpRequest> outgoingMessageWriter,
+ final ContentLengthStrategy incomingContentStrategy,
+ final ContentLengthStrategy outgoingContentStrategy,
+ final Lookup<ProtocolUpgradeHandler> protocolUpgradeHandlerLookup,
+ final Http1StreamListener streamListener) {
+ super(ioSession, http1Config, charCodingConfig, incomingMessageParser, outgoingMessageWriter,
+ incomingContentStrategy, outgoingContentStrategy, protocolUpgradeHandlerLookup);
this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexerFactory.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexerFactory.java
index 12544e8..616f6a6 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexerFactory.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexerFactory.java
@@ -36,6 +36,7 @@ import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.config.Http1Config;
+import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
import org.apache.hc.core5.http.impl.Http1StreamListener;
@@ -43,6 +44,7 @@ import org.apache.hc.core5.http.nio.NHttpMessageParserFactory;
import org.apache.hc.core5.http.nio.NHttpMessageWriterFactory;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.reactor.ProtocolUpgradeHandler;
import org.apache.hc.core5.util.Args;
/**
@@ -62,8 +64,12 @@ public final class ClientHttp1StreamDuplexerFactory {
private final NHttpMessageWriterFactory<HttpRequest> requestWriterFactory;
private final ContentLengthStrategy incomingContentStrategy;
private final ContentLengthStrategy outgoingContentStrategy;
+ private final Lookup<ProtocolUpgradeHandler> protocolUpgradeHandlerLookup;
private final Http1StreamListener streamListener;
+ /**
+ * @since 5.1
+ */
public ClientHttp1StreamDuplexerFactory(
final HttpProcessor httpProcessor,
final Http1Config http1Config,
@@ -73,6 +79,7 @@ public final class ClientHttp1StreamDuplexerFactory {
final NHttpMessageWriterFactory<HttpRequest> requestWriterFactory,
final ContentLengthStrategy incomingContentStrategy,
final ContentLengthStrategy outgoingContentStrategy,
+ final Lookup<ProtocolUpgradeHandler> protocolUpgradeHandlerLookup,
final Http1StreamListener streamListener) {
this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
@@ -87,6 +94,7 @@ public final class ClientHttp1StreamDuplexerFactory {
DefaultContentLengthStrategy.INSTANCE;
this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
DefaultContentLengthStrategy.INSTANCE;
+ this.protocolUpgradeHandlerLookup = protocolUpgradeHandlerLookup;
this.streamListener = streamListener;
}
@@ -97,9 +105,23 @@ public final class ClientHttp1StreamDuplexerFactory {
final ConnectionReuseStrategy connectionReuseStrategy,
final NHttpMessageParserFactory<HttpResponse> responseParserFactory,
final NHttpMessageWriterFactory<HttpRequest> requestWriterFactory,
+ final ContentLengthStrategy incomingContentStrategy,
+ final ContentLengthStrategy outgoingContentStrategy,
+ final Http1StreamListener streamListener) {
+ this(httpProcessor, http1Config, charCodingConfig, connectionReuseStrategy, responseParserFactory,
+ requestWriterFactory, incomingContentStrategy, outgoingContentStrategy, null, streamListener);
+ }
+
+ public ClientHttp1StreamDuplexerFactory(
+ final HttpProcessor httpProcessor,
+ final Http1Config http1Config,
+ final CharCodingConfig charCodingConfig,
+ final ConnectionReuseStrategy connectionReuseStrategy,
+ final NHttpMessageParserFactory<HttpResponse> responseParserFactory,
+ final NHttpMessageWriterFactory<HttpRequest> requestWriterFactory,
final Http1StreamListener streamListener) {
this(httpProcessor, http1Config, charCodingConfig, connectionReuseStrategy,
- responseParserFactory, requestWriterFactory, null, null, streamListener);
+ responseParserFactory, requestWriterFactory, null, null, null, streamListener);
}
public ClientHttp1StreamDuplexerFactory(
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1IOEventHandlerFactory.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1IOEventHandlerFactory.java
index 09ece66..5d9668a 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1IOEventHandlerFactory.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1IOEventHandlerFactory.java
@@ -32,6 +32,7 @@ import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
+import org.apache.hc.core5.reactor.EndpointParameters;
import org.apache.hc.core5.reactor.IOEventHandler;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.ProtocolIOSession;
@@ -65,14 +66,14 @@ public class ServerHttp1IOEventHandlerFactory implements IOEventHandlerFactory {
String endpointScheme = URIScheme.HTTP.id;
if (attachment instanceof EndpointParameters) {
final EndpointParameters params = (EndpointParameters) attachment;
- endpointScheme = params.scheme;
+ endpointScheme = params.getScheme();
if (tlsStrategy != null && URIScheme.HTTPS.same(endpointScheme)) {
tlsStrategy.upgrade(
ioSession,
null,
ioSession.getLocalAddress(),
ioSession.getRemoteAddress(),
- params.attachment,
+ params.getAttachment(),
handshakeTimeout);
}
} else {
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexer.java
index 9409940..aab360e 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexer.java
@@ -47,6 +47,7 @@ import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.config.Http1Config;
+import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
@@ -65,6 +66,7 @@ import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.reactor.ProtocolUpgradeHandler;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Asserts;
import org.apache.hc.core5.util.Timeout;
@@ -104,7 +106,30 @@ public class ServerHttp1StreamDuplexer extends AbstractHttp1StreamDuplexer<HttpR
final ContentLengthStrategy incomingContentStrategy,
final ContentLengthStrategy outgoingContentStrategy,
final Http1StreamListener streamListener) {
- super(ioSession, http1Config, charCodingConfig, incomingMessageParser, outgoingMessageWriter, incomingContentStrategy, outgoingContentStrategy);
+ this(ioSession, httpProcessor, exchangeHandlerFactory, scheme, http1Config, charCodingConfig,
+ connectionReuseStrategy, incomingMessageParser, outgoingMessageWriter, incomingContentStrategy,
+ outgoingContentStrategy, null, streamListener);
+ }
+
+ /**
+ * @since 5.1
+ */
+ public ServerHttp1StreamDuplexer(
+ final ProtocolIOSession ioSession,
+ final HttpProcessor httpProcessor,
+ final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
+ final String scheme,
+ final Http1Config http1Config,
+ final CharCodingConfig charCodingConfig,
+ final ConnectionReuseStrategy connectionReuseStrategy,
+ final NHttpMessageParser<HttpRequest> incomingMessageParser,
+ final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
+ final ContentLengthStrategy incomingContentStrategy,
+ final ContentLengthStrategy outgoingContentStrategy,
+ final Lookup<ProtocolUpgradeHandler> protocolUpgradeHandlerLookup,
+ final Http1StreamListener streamListener) {
+ super(ioSession, http1Config, charCodingConfig, incomingMessageParser, outgoingMessageWriter,
+ incomingContentStrategy, outgoingContentStrategy, protocolUpgradeHandlerLookup);
this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
this.exchangeHandlerFactory = Args.notNull(exchangeHandlerFactory, "Exchange handler factory");
this.scheme = scheme;
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexerFactory.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexerFactory.java
index 5d1c646..c724dbc 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexerFactory.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexerFactory.java
@@ -36,6 +36,7 @@ import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.config.Http1Config;
+import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
import org.apache.hc.core5.http.impl.Http1StreamListener;
@@ -45,6 +46,7 @@ import org.apache.hc.core5.http.nio.NHttpMessageParserFactory;
import org.apache.hc.core5.http.nio.NHttpMessageWriterFactory;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.reactor.ProtocolUpgradeHandler;
import org.apache.hc.core5.util.Args;
/**
@@ -65,8 +67,12 @@ public final class ServerHttp1StreamDuplexerFactory {
private final NHttpMessageWriterFactory<HttpResponse> responseWriterFactory;
private final ContentLengthStrategy incomingContentStrategy;
private final ContentLengthStrategy outgoingContentStrategy;
+ private final Lookup<ProtocolUpgradeHandler> protocolUpgradeHandlerLookup;
private final Http1StreamListener streamListener;
+ /**
+ * @since 5.1
+ */
public ServerHttp1StreamDuplexerFactory(
final HttpProcessor httpProcessor,
final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
@@ -77,6 +83,7 @@ public final class ServerHttp1StreamDuplexerFactory {
final NHttpMessageWriterFactory<HttpResponse> responseWriterFactory,
final ContentLengthStrategy incomingContentStrategy,
final ContentLengthStrategy outgoingContentStrategy,
+ final Lookup<ProtocolUpgradeHandler> protocolUpgradeHandlerLookup,
final Http1StreamListener streamListener) {
this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
this.exchangeHandlerFactory = Args.notNull(exchangeHandlerFactory, "Exchange handler factory");
@@ -92,6 +99,7 @@ public final class ServerHttp1StreamDuplexerFactory {
DefaultContentLengthStrategy.INSTANCE;
this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
DefaultContentLengthStrategy.INSTANCE;
+ this.protocolUpgradeHandlerLookup = protocolUpgradeHandlerLookup;
this.streamListener = streamListener;
}
@@ -103,10 +111,26 @@ public final class ServerHttp1StreamDuplexerFactory {
final ConnectionReuseStrategy connectionReuseStrategy,
final NHttpMessageParserFactory<HttpRequest> requestParserFactory,
final NHttpMessageWriterFactory<HttpResponse> responseWriterFactory,
+ final ContentLengthStrategy incomingContentStrategy,
+ final ContentLengthStrategy outgoingContentStrategy,
+ final Http1StreamListener streamListener) {
+ this(httpProcessor, exchangeHandlerFactory, http1Config, charCodingConfig, connectionReuseStrategy,
+ requestParserFactory, responseWriterFactory, incomingContentStrategy, outgoingContentStrategy,
+ null, streamListener);
+ }
+
+ public ServerHttp1StreamDuplexerFactory(
+ final HttpProcessor httpProcessor,
+ final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
+ final Http1Config http1Config,
+ final CharCodingConfig charCodingConfig,
+ final ConnectionReuseStrategy connectionReuseStrategy,
+ final NHttpMessageParserFactory<HttpRequest> requestParserFactory,
+ final NHttpMessageWriterFactory<HttpResponse> responseWriterFactory,
final Http1StreamListener streamListener) {
this(httpProcessor, exchangeHandlerFactory, http1Config, charCodingConfig,
connectionReuseStrategy, requestParserFactory, responseWriterFactory,
- null, null, streamListener);
+ null, null, null, streamListener);
}
public ServerHttp1StreamDuplexerFactory(
@@ -128,6 +152,7 @@ public final class ServerHttp1StreamDuplexerFactory {
responseWriterFactory.create(),
incomingContentStrategy,
outgoingContentStrategy,
+ protocolUpgradeHandlerLookup,
streamListener);
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/ssl/TlsStrategy.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/ssl/TlsStrategy.java
index aa7b488..5ebee69 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/ssl/TlsStrategy.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/ssl/TlsStrategy.java
@@ -44,7 +44,7 @@ public interface TlsStrategy {
* Secures current session layer with TLS security.
*
* @param sessionLayer the session layer
- * @param host the name of the opposite endpoint when givem or {@code null} otherwise.
+ * @param host the name of the opposite endpoint when given or {@code null} otherwise.
* @param localAddress the address of the local endpoint.
* @param remoteAddress the address of the remote endpoint.
* @param attachment arbitrary object passes to the TLS session initialization code.
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/EndpointParameters.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/EndpointParameters.java
similarity index 52%
copy from httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/EndpointParameters.java
copy to httpcore5/src/main/java/org/apache/hc/core5/reactor/EndpointParameters.java
index 2a3d54d..a2b2f37 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/EndpointParameters.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/EndpointParameters.java
@@ -24,10 +24,13 @@
* <http://www.apache.org/>.
*
*/
-package org.apache.hc.core5.http.impl.nio;
+package org.apache.hc.core5.reactor;
import org.apache.hc.core5.annotation.Internal;
-import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.net.NamedEndpoint;
+import org.apache.hc.core5.net.Ports;
+import org.apache.hc.core5.util.Args;
/**
* Endpoint initialization parameters
@@ -35,20 +38,51 @@ import org.apache.hc.core5.http.URIScheme;
* @since 5.1
*/
@Internal
-public final class EndpointParameters {
+public final class EndpointParameters implements NamedEndpoint {
- public final String scheme;
- public final Object attachment;
+ private final String scheme;
+ private final String hostName;
+ private final int port;
+ private final Object attachment;
- public EndpointParameters(final String scheme, final Object attachment) {
- this.scheme = scheme != null ? scheme : URIScheme.HTTP.id;
+ public EndpointParameters(final String scheme, final String hostName, final int port, final Object attachment) {
+ this.scheme = Args.notBlank(scheme, "Protocol scheme");
+ this.hostName = Args.notBlank(hostName, "Endpoint name");
+ this.port = Ports.checkWithDefault(port);
this.attachment = attachment;
}
+ public EndpointParameters(final HttpHost host, final Object attachment) {
+ Args.notNull(host, "HTTP host");
+ this.scheme = host.getSchemeName();
+ this.hostName = host.getHostName();
+ this.port = host.getPort();
+ this.attachment = attachment;
+ }
+
+ public String getScheme() {
+ return scheme;
+ }
+
+ public String getHostName() {
+ return hostName;
+ }
+
+ @Override
+ public int getPort() {
+ return port;
+ }
+
+ public Object getAttachment() {
+ return attachment;
+ }
+
@Override
public String toString() {
return "EndpointParameters{" +
- "scheme=" + scheme +
+ "scheme='" + scheme + '\'' +
+ ", name='" + hostName + '\'' +
+ ", port=" + port +
", attachment=" + attachment +
'}';
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
index 12ee7c6..ed9be48 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
@@ -32,6 +32,7 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
+import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -39,7 +40,9 @@ import java.util.concurrent.locks.Lock;
import javax.net.ssl.SSLContext;
+import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.net.NamedEndpoint;
import org.apache.hc.core5.reactor.ssl.SSLBufferMode;
@@ -51,13 +54,14 @@ import org.apache.hc.core5.reactor.ssl.TlsDetails;
import org.apache.hc.core5.util.Asserts;
import org.apache.hc.core5.util.Timeout;
-final class InternalDataChannel extends InternalChannel implements ProtocolIOSession {
+final class InternalDataChannel extends InternalChannel implements ProtocolIOSession, TransportSecurityLayerEx {
private final IOSession ioSession;
private final NamedEndpoint initialEndpoint;
private final IOSessionListener sessionListener;
private final AtomicReference<SSLIOSession> tlsSessionRef;
private final Queue<InternalDataChannel> closedSessions;
+ private final Queue<FutureCallback<ProtocolIOSession>> callbackQueue;
private final AtomicBoolean closed;
InternalDataChannel(
@@ -70,6 +74,7 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
this.closedSessions = closedSessions;
this.sessionListener = sessionListener;
this.tlsSessionRef = new AtomicReference<>(null);
+ this.callbackQueue = new LinkedList<>();
this.closed = new AtomicBoolean(false);
}
@@ -162,18 +167,21 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
if (handler != null) {
handler.exception(this, cause);
}
+ notifySubscribers(cause);
}
- void onTLSSessionStart() {
+ void onTLSSessionStart(final SSLIOSession sslSession) {
if (sessionListener != null) {
sessionListener.connected(this);
}
+ notifySubscribers();
}
void onTLSSessionEnd() {
if (closed.compareAndSet(false, true)) {
closedSessions.add(this);
}
+ cancelSubscribers();
}
void disconnected() {
@@ -208,7 +216,7 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
@Override
public void execute(final SSLIOSession sslSession) {
- onTLSSessionStart();
+ onTLSSessionStart(sslSession);
}
},
@@ -237,6 +245,53 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
}
@Override
+ public void subscribe(final FutureCallback<ProtocolIOSession> callback) {
+ if (callback == null) {
+ return;
+ }
+ synchronized (callbackQueue) {
+ if (getSessionImpl().getStatus() != Status.ACTIVE) {
+ callback.failed(new ConnectionClosedException());
+ return;
+ }
+ final SSLIOSession sslIoSession = tlsSessionRef.get();
+ final TlsDetails tlsDetails = sslIoSession != null ? sslIoSession.getTlsDetails() : null;
+ if (tlsDetails != null) {
+ callback.completed(this);
+ } else {
+ callbackQueue.add(callback);
+ }
+ }
+ }
+
+ void notifySubscribers() {
+ synchronized (callbackQueue) {
+ FutureCallback<ProtocolIOSession> callback;
+ while ((callback = callbackQueue.poll()) != null) {
+ callback.completed(this);
+ }
+ }
+ }
+
+ void notifySubscribers(final Exception ex) {
+ synchronized (callbackQueue) {
+ FutureCallback<ProtocolIOSession> callback;
+ while ((callback = callbackQueue.poll()) != null) {
+ callback.failed(ex);
+ }
+ }
+ }
+
+ void cancelSubscribers() {
+ synchronized (callbackQueue) {
+ FutureCallback<ProtocolIOSession> callback;
+ while ((callback = callbackQueue.poll()) != null) {
+ callback.cancelled();
+ }
+ }
+ }
+
+ @Override
public Lock getLock() {
return ioSession.getLock();
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/EndpointParameters.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolLayer.java
similarity index 67%
copy from httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/EndpointParameters.java
copy to httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolLayer.java
index 2a3d54d..3b6533c 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/EndpointParameters.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolLayer.java
@@ -24,33 +24,27 @@
* <http://www.apache.org/>.
*
*/
-package org.apache.hc.core5.http.impl.nio;
+
+package org.apache.hc.core5.reactor;
import org.apache.hc.core5.annotation.Internal;
-import org.apache.hc.core5.http.URIScheme;
/**
- * Endpoint initialization parameters
+ * Application protocol layer interface.
*
* @since 5.1
*/
@Internal
-public final class EndpointParameters {
-
- public final String scheme;
- public final Object attachment;
+public interface ProtocolLayer {
- public EndpointParameters(final String scheme, final Object attachment) {
- this.scheme = scheme != null ? scheme : URIScheme.HTTP.id;
- this.attachment = attachment;
- }
+ /**
+ * Switches to the given application protocol.
+ */
+ void upgrade(ProtocolUpgradeHandler handler, EndpointParameters parameters) throws UnsupportedOperationException;
- @Override
- public String toString() {
- return "EndpointParameters{" +
- "scheme=" + scheme +
- ", attachment=" + attachment +
- '}';
- }
+ /**
+ * Switches to the application protocol with the given protocol id.
+ */
+ void upgrade(String id, EndpointParameters parameters) throws UnsupportedOperationException;
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/EndpointParameters.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolUpgradeHandler.java
similarity index 67%
copy from httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/EndpointParameters.java
copy to httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolUpgradeHandler.java
index 2a3d54d..5582484 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/EndpointParameters.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolUpgradeHandler.java
@@ -24,33 +24,23 @@
* <http://www.apache.org/>.
*
*/
-package org.apache.hc.core5.http.impl.nio;
+
+package org.apache.hc.core5.reactor;
import org.apache.hc.core5.annotation.Internal;
-import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.concurrent.FutureCallback;
/**
- * Endpoint initialization parameters
+ * Application protocol upgrade handler.
*
* @since 5.1
*/
@Internal
-public final class EndpointParameters {
-
- public final String scheme;
- public final Object attachment;
-
- public EndpointParameters(final String scheme, final Object attachment) {
- this.scheme = scheme != null ? scheme : URIScheme.HTTP.id;
- this.attachment = attachment;
- }
+public interface ProtocolUpgradeHandler {
- @Override
- public String toString() {
- return "EndpointParameters{" +
- "scheme=" + scheme +
- ", attachment=" + attachment +
- '}';
- }
+ /**
+ * Upgrades application protocol of the given I/O session.
+ */
+ void upgrade(ProtocolIOSession ioSession, EndpointParameters parameters, FutureCallback<ProtocolIOSession> callback);
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/EndpointParameters.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/TransportSecurityLayerEx.java
similarity index 67%
rename from httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/EndpointParameters.java
rename to httpcore5/src/main/java/org/apache/hc/core5/reactor/TransportSecurityLayerEx.java
index 2a3d54d..a41b907 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/EndpointParameters.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/TransportSecurityLayerEx.java
@@ -24,33 +24,24 @@
* <http://www.apache.org/>.
*
*/
-package org.apache.hc.core5.http.impl.nio;
+
+package org.apache.hc.core5.reactor;
import org.apache.hc.core5.annotation.Internal;
-import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
/**
- * Endpoint initialization parameters
+ * Application protocol layer interface.
*
* @since 5.1
*/
@Internal
-public final class EndpointParameters {
-
- public final String scheme;
- public final Object attachment;
-
- public EndpointParameters(final String scheme, final Object attachment) {
- this.scheme = scheme != null ? scheme : URIScheme.HTTP.id;
- this.attachment = attachment;
- }
+public interface TransportSecurityLayerEx extends TransportSecurityLayer {
- @Override
- public String toString() {
- return "EndpointParameters{" +
- "scheme=" + scheme +
- ", attachment=" + attachment +
- '}';
- }
+ /**
+ * Adds subscription to the TLS handshake completion event.
+ */
+ void subscribe(FutureCallback<ProtocolIOSession> callback);
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
index e6a40b4..8530858 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
@@ -93,7 +93,7 @@ public class SSLIOSession implements IOSession {
private volatile Status status = Status.ACTIVE;
private volatile boolean initialized;
private volatile Timeout socketTimeout;
- private TlsDetails tlsDetails;
+ private volatile TlsDetails tlsDetails;
/**
* Creates new instance of {@code SSLIOSession} class.