You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2020/10/29 22:06:35 UTC
[httpcomponents-core] 01/01: Revised HTTP protocol negotiation for
non-blocking I/O sessions
This is an automated email from the ASF dual-hosted git repository.
olegk pushed a commit to branch tls_upgrade_redesign
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git
commit 92c0e5e00f7252802ab6c5a306edcbce3a21d9f7
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Thu Oct 29 15:43:45 2020 +0100
Revised HTTP protocol negotiation for non-blocking I/O sessions
---
.../impl/nio/ClientHttpProtocolNegotiator.java | 144 +++++++++------
.../impl/nio/ProtocolNegotiationException.java | 49 +++++
.../impl/nio/ServerHttpProtocolNegotiator.java | 205 ++++++++++++---------
3 files changed, 259 insertions(+), 139 deletions(-)
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
index 30e6117..df80e96 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
@@ -32,16 +32,18 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLSession;
import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.http.EndpointDetails;
import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.impl.nio.BufferedData;
import org.apache.hc.core5.http.impl.nio.ClientHttp1IOEventHandler;
-import org.apache.hc.core5.http.impl.nio.ClientHttp1StreamDuplexer;
import org.apache.hc.core5.http.impl.nio.ClientHttp1StreamDuplexerFactory;
import org.apache.hc.core5.http.impl.nio.HttpConnectionEventHandler;
import org.apache.hc.core5.http.nio.command.CommandSupport;
@@ -76,6 +78,9 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
private final ClientH2StreamMultiplexerFactory http2StreamHandlerFactory;
private final HttpVersionPolicy versionPolicy;
private final AtomicReference<HttpConnectionEventHandler> protocolHandlerRef;
+ private final FutureCallback<HttpConnectionEventHandler> resultCallback;
+ private final AtomicBoolean initialized;
+ private final AtomicBoolean completed;
private volatile ByteBuffer preface;
private volatile BufferedData inBuf;
@@ -85,62 +90,50 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
final ClientHttp1StreamDuplexerFactory http1StreamHandlerFactory,
final ClientH2StreamMultiplexerFactory http2StreamHandlerFactory,
final HttpVersionPolicy versionPolicy) {
+ this(ioSession, http1StreamHandlerFactory, http2StreamHandlerFactory, versionPolicy, null);
+ }
+
+ /**
+ * @since 5.1
+ */
+ public ClientHttpProtocolNegotiator(
+ final ProtocolIOSession ioSession,
+ final ClientHttp1StreamDuplexerFactory http1StreamHandlerFactory,
+ final ClientH2StreamMultiplexerFactory http2StreamHandlerFactory,
+ final HttpVersionPolicy versionPolicy,
+ final FutureCallback<HttpConnectionEventHandler> resultCallback) {
this.ioSession = Args.notNull(ioSession, "I/O session");
this.http1StreamHandlerFactory = Args.notNull(http1StreamHandlerFactory, "HTTP/1.1 stream handler factory");
this.http2StreamHandlerFactory = Args.notNull(http2StreamHandlerFactory, "HTTP/2 stream handler factory");
this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
this.protocolHandlerRef = new AtomicReference<>(null);
+ this.resultCallback = resultCallback;
+ this.initialized = new AtomicBoolean();
+ this.completed = new AtomicBoolean();
}
- private void startHttp1(final IOSession session) {
- final ClientHttp1StreamDuplexer http1StreamHandler = http1StreamHandlerFactory.create(ioSession);
- final HttpConnectionEventHandler protocolHandler = new ClientHttp1IOEventHandler(http1StreamHandler);
- try {
- ioSession.upgrade(protocolHandler);
- protocolHandlerRef.set(protocolHandler);
- protocolHandler.connected(session);
- if (inBuf != null) {
- protocolHandler.inputReady(session, inBuf.data());
- inBuf.clear();
- }
- } catch (final Exception ex) {
- protocolHandler.exception(session, ex);
- session.close(CloseMode.IMMEDIATE);
+ /**
+ * Installs the negotiated protocol handler on the I/O session, signals
+ * {@code connected} to it, replays any input held in {@code inBuf} and
+ * notifies the optional result callback of the outcome.
+ */
+ private void startProtocol(final HttpConnectionEventHandler protocolHandler) throws IOException {
+ protocolHandlerRef.set(protocolHandler);
+ ioSession.upgrade(protocolHandler);
+ protocolHandler.connected(ioSession);
+ if (inBuf != null) {
+ protocolHandler.inputReady(ioSession, inBuf.data());
+ inBuf.clear();
+ }
+ // 'completed' starts out false: flip it to true exactly once so the callback fires a single time.
+ // The callback is null when the negotiator is created through the four-argument constructor.
+ if (completed.compareAndSet(false, true) && resultCallback != null) {
+ resultCallback.completed(protocolHandler);
}
}
- private void startHttp2(final IOSession session) {
- final ClientH2StreamMultiplexer streamMultiplexer = http2StreamHandlerFactory.create(ioSession);
- final HttpConnectionEventHandler protocolHandler = new ClientH2IOEventHandler(streamMultiplexer);
- try {
- ioSession.upgrade(protocolHandler);
- protocolHandlerRef.set(protocolHandler);
- protocolHandler.connected(session);
- if (inBuf != null) {
- protocolHandler.inputReady(session, inBuf.data());
- inBuf.clear();
- }
- } catch (final Exception ex) {
- protocolHandler.exception(session, ex);
- session.close(CloseMode.IMMEDIATE);
- }
+ private void startHttp1() throws IOException {
+ startProtocol(new ClientHttp1IOEventHandler(http1StreamHandlerFactory.create(ioSession)));
}
- private void writeOutPreface(final IOSession session) throws IOException {
- if (preface.hasRemaining()) {
- final ByteChannel channel = session;
- channel.write(preface);
- }
- if (!preface.hasRemaining()) {
- session.clearEvent(SelectionKey.OP_WRITE);
- startHttp2(session);
- } else {
- session.setEvent(SelectionKey.OP_WRITE);
- }
+ private void startHttp2() throws IOException {
+ startProtocol(new ClientH2IOEventHandler(http2StreamHandlerFactory.create(ioSession)));
}
- @Override
- public void connected(final IOSession session) throws IOException {
+ private void initialize() throws IOException {
switch (versionPolicy) {
case NEGOTIATE:
final TlsDetails tlsDetails = ioSession.getTlsDetails();
@@ -156,8 +149,30 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
break;
}
if (preface == null) {
- startHttp1(session);
+ startHttp1();
} else {
+ ioSession.setEvent(SelectionKey.OP_WRITE);
+ }
+ }
+
+ private void writeOutPreface(final IOSession session) throws IOException {
+ if (preface.hasRemaining()) {
+ final ByteChannel channel = session;
+ channel.write(preface);
+ }
+ if (!preface.hasRemaining()) {
+ session.clearEvent(SelectionKey.OP_WRITE);
+ startHttp2();
+ preface = null;
+ }
+ }
+
+ @Override
+ public void connected(final IOSession session) throws IOException {
+ if (initialized.compareAndSet(false, true)) {
+ initialize();
+ }
+ if (preface != null) {
writeOutPreface(session);
}
}
@@ -170,15 +185,22 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
}
inBuf.put(src);
}
- outputReady(session);
+ if (preface != null) {
+ writeOutPreface(session);
+ } else {
+ throw new ProtocolNegotiationException("Unexpected input");
+ }
}
@Override
public void outputReady(final IOSession session) throws IOException {
+ if (initialized.compareAndSet(false, true)) {
+ initialize();
+ }
if (preface != null) {
writeOutPreface(session);
} else {
- session.close(CloseMode.GRACEFUL);
+ throw new ProtocolNegotiationException("Unexpected output");
}
}
@@ -189,22 +211,34 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
@Override
public void exception(final IOSession session, final Exception cause) {
- session.close(CloseMode.IMMEDIATE);
final HttpConnectionEventHandler protocolHandler = protocolHandlerRef.get();
- if (protocolHandler != null) {
- protocolHandler.exception(session, cause);
- } else {
- CommandSupport.failCommands(session, cause);
+ try {
+ session.close(CloseMode.IMMEDIATE);
+ if (protocolHandler != null) {
+ protocolHandler.exception(session, cause);
+ } else {
+ CommandSupport.failCommands(session, cause);
+ }
+ } catch (final Exception ex) {
+ // Report the secondary failure at most once, and only if a callback was supplied.
+ if (completed.compareAndSet(false, true) && resultCallback != null) {
+ resultCallback.failed(ex);
+ }
}
}
@Override
public void disconnected(final IOSession session) {
final HttpConnectionEventHandler protocolHandler = protocolHandlerRef.getAndSet(null);
- if (protocolHandler != null) {
- protocolHandler.disconnected(ioSession);
- } else {
- CommandSupport.cancelCommands(session);
+ try {
+ if (protocolHandler != null) {
+ protocolHandler.disconnected(ioSession);
+ } else {
+ CommandSupport.cancelCommands(session);
+ }
+ } finally {
+ // Fail the callback only if no outcome was reported yet (flag still false) and a callback exists.
+ if (completed.compareAndSet(false, true) && resultCallback != null) {
+ resultCallback.failed(new ConnectionClosedException());
+ }
}
}
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ProtocolNegotiationException.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ProtocolNegotiationException.java
new file mode 100644
index 0000000..b984db2
--- /dev/null
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ProtocolNegotiationException.java
@@ -0,0 +1,49 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http2.impl.nio;
+
+import java.io.IOException;
+
+/**
+ * Signals a protocol error in HTTP protocol negotiation.
+ *
+ * @since 5.1
+ */
+public class ProtocolNegotiationException extends IOException {
+
+ /**
+ * Creates a ProtocolNegotiationException with the specified detail message.
+ *
+ * @param message The exception detail message
+ */
+ public ProtocolNegotiationException(final String message) {
+ super(message);
+ }
+
+
+}
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
index 7bb65d2..a19f2b7 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
@@ -30,14 +30,15 @@ package org.apache.hc.core5.http2.impl.nio;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLSession;
import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.http.EndpointDetails;
-import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.impl.nio.BufferedData;
@@ -74,6 +75,9 @@ public class ServerHttpProtocolNegotiator implements HttpConnectionEventHandler
private final HttpVersionPolicy versionPolicy;
private final BufferedData inBuf;
private final AtomicReference<HttpConnectionEventHandler> protocolHandlerRef;
+ private final FutureCallback<HttpConnectionEventHandler> resultCallback;
+ private final AtomicBoolean initialized;
+ private final AtomicBoolean completed;
private volatile boolean expectValidH2Preface;
@@ -82,102 +86,122 @@ public class ServerHttpProtocolNegotiator implements HttpConnectionEventHandler
final ServerHttp1StreamDuplexerFactory http1StreamHandlerFactory,
final ServerH2StreamMultiplexerFactory http2StreamHandlerFactory,
final HttpVersionPolicy versionPolicy) {
+ this(ioSession, http1StreamHandlerFactory, http2StreamHandlerFactory, versionPolicy, null);
+ }
+
+ /**
+ * @since 5.1
+ */
+ public ServerHttpProtocolNegotiator(
+ final ProtocolIOSession ioSession,
+ final ServerHttp1StreamDuplexerFactory http1StreamHandlerFactory,
+ final ServerH2StreamMultiplexerFactory http2StreamHandlerFactory,
+ final HttpVersionPolicy versionPolicy,
+ final FutureCallback<HttpConnectionEventHandler> resultCallback) {
this.ioSession = Args.notNull(ioSession, "I/O session");
this.http1StreamHandlerFactory = Args.notNull(http1StreamHandlerFactory, "HTTP/1.1 stream handler factory");
this.http2StreamHandlerFactory = Args.notNull(http2StreamHandlerFactory, "HTTP/2 stream handler factory");
this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
this.inBuf = BufferedData.allocate(1024);
this.protocolHandlerRef = new AtomicReference<>(null);
+ this.resultCallback = resultCallback;
+ this.initialized = new AtomicBoolean();
+ this.completed = new AtomicBoolean();
+ }
+
+ /**
+ * Installs the negotiated protocol handler on the I/O session, signals
+ * {@code connected} to it, hands over the given input (when {@code data}
+ * is not null) and notifies the optional result callback of the outcome.
+ */
+ private void startProtocol(final HttpConnectionEventHandler protocolHandler, final ByteBuffer data) throws IOException {
+ protocolHandlerRef.set(protocolHandler);
+ ioSession.upgrade(protocolHandler);
+ protocolHandler.connected(ioSession);
+ if (data != null) {
+ protocolHandler.inputReady(ioSession, data.hasRemaining() ? data : null);
+ }
+ // 'completed' starts out false: flip it to true exactly once so the callback fires a single time.
+ // The callback is null when the negotiator is created through the four-argument constructor.
+ if (completed.compareAndSet(false, true) && resultCallback != null) {
+ resultCallback.completed(protocolHandler);
+ }
+ }
+
+ private void startHttp1(final TlsDetails tlsDetails, final ByteBuffer data) throws IOException {
+ final ServerHttp1StreamDuplexer http1StreamHandler = http1StreamHandlerFactory.create(
+ tlsDetails != null ? URIScheme.HTTPS.id : URIScheme.HTTP.id,
+ ioSession);
+ startProtocol(new ServerHttp1IOEventHandler(http1StreamHandler), data);
+ }
+
+ private void startHttp2(final ByteBuffer data) throws IOException {
+ startProtocol(new ServerH2IOEventHandler(http2StreamHandlerFactory.create(ioSession)), data);
+ }
+
+ private void initialize() throws IOException {
+ final TlsDetails tlsDetails = ioSession.getTlsDetails();
+ switch (versionPolicy) {
+ case NEGOTIATE:
+ if (tlsDetails != null &&
+ ApplicationProtocol.HTTP_2.id.equals(tlsDetails.getApplicationProtocol())) {
+ expectValidH2Preface = true;
+ }
+ break;
+ case FORCE_HTTP_2:
+ if (tlsDetails == null ||
+ !ApplicationProtocol.HTTP_1_1.id.equals(tlsDetails.getApplicationProtocol())) {
+ expectValidH2Preface = true;
+ }
+ break;
+ case FORCE_HTTP_1:
+ startHttp1(tlsDetails, null);
+ break;
+ }
}
@Override
- public void connected(final IOSession session) {
- try {
- final TlsDetails tlsDetails = ioSession.getTlsDetails();
- switch (versionPolicy) {
- case NEGOTIATE:
- if (tlsDetails != null &&
- ApplicationProtocol.HTTP_2.id.equals(tlsDetails.getApplicationProtocol())) {
- expectValidH2Preface = true;
- }
- break;
- case FORCE_HTTP_2:
- if (tlsDetails == null ||
- !ApplicationProtocol.HTTP_1_1.id.equals(tlsDetails.getApplicationProtocol())) {
- expectValidH2Preface = true;
- }
- break;
- case FORCE_HTTP_1:
- final ServerHttp1StreamDuplexer http1StreamHandler = http1StreamHandlerFactory.create(
- tlsDetails != null ? URIScheme.HTTPS.id : URIScheme.HTTP.id,
- ioSession);
- final HttpConnectionEventHandler protocolHandler = new ServerHttp1IOEventHandler(http1StreamHandler);
- ioSession.upgrade(protocolHandler);
- protocolHandlerRef.set(protocolHandler);
- http1StreamHandler.onConnect();
- break;
- }
- } catch (final Exception ex) {
- exception(session, ex);
+ public void connected(final IOSession session) throws IOException {
+ if (initialized.compareAndSet(false, true)) {
+ initialize();
}
}
@Override
- public void inputReady(final IOSession session, final ByteBuffer src) {
- try {
- if (src != null) {
- inBuf.put(src);
- }
- boolean endOfStream = false;
- if (inBuf.length() < PREFACE.length) {
- final int bytesRead = inBuf.readFrom(session);
- if (bytesRead == -1) {
- endOfStream = true;
- }
+ public void inputReady(final IOSession session, final ByteBuffer src) throws IOException {
+ if (src != null) {
+ inBuf.put(src);
+ }
+ boolean endOfStream = false;
+ if (inBuf.length() < PREFACE.length) {
+ final int bytesRead = inBuf.readFrom(session);
+ if (bytesRead == -1) {
+ endOfStream = true;
}
- final ByteBuffer data = inBuf.data();
- if (data.remaining() >= PREFACE.length) {
- boolean validH2Preface = true;
- for (int i = 0; i < PREFACE.length; i++) {
- if (data.get() != PREFACE[i]) {
- if (expectValidH2Preface) {
- throw new HttpException("Unexpected HTTP/2 preface");
- }
- validH2Preface = false;
+ }
+ final ByteBuffer data = inBuf.data();
+ if (data.remaining() >= PREFACE.length) {
+ boolean validH2Preface = true;
+ for (int i = 0; i < PREFACE.length; i++) {
+ if (data.get() != PREFACE[i]) {
+ if (expectValidH2Preface) {
+ throw new ProtocolNegotiationException("Unexpected HTTP/2 preface");
}
+ validH2Preface = false;
}
- if (validH2Preface) {
- final ServerH2StreamMultiplexer http2StreamHandler = http2StreamHandlerFactory.create(ioSession);
- final HttpConnectionEventHandler protocolHandler = new ServerH2IOEventHandler(http2StreamHandler);
- ioSession.upgrade(protocolHandler);
- protocolHandlerRef.set(protocolHandler);
- http2StreamHandler.onConnect();
- http2StreamHandler.onInput(data.hasRemaining() ? data : null);
- } else {
- final TlsDetails tlsDetails = ioSession.getTlsDetails();
- final ServerHttp1StreamDuplexer http1StreamHandler = http1StreamHandlerFactory.create(
- tlsDetails != null ? URIScheme.HTTPS.id : URIScheme.HTTP.id,
- ioSession);
- final HttpConnectionEventHandler protocolHandler = new ServerHttp1IOEventHandler(http1StreamHandler);
- ioSession.upgrade(protocolHandler);
- protocolHandlerRef.set(protocolHandler);
- data.rewind();
- http1StreamHandler.onConnect();
- http1StreamHandler.onInput(data);
- }
+ }
+ if (validH2Preface) {
+ startHttp2(data.hasRemaining() ? data : null);
} else {
- if (endOfStream) {
- throw new ConnectionClosedException();
- }
+ data.rewind();
+ startHttp1(ioSession.getTlsDetails(), data);
+ }
+ } else {
+ if (endOfStream) {
+ throw new ConnectionClosedException();
}
- data.clear();
- } catch (final Exception ex) {
- exception(session, ex);
}
+ data.clear();
}
@Override
- public void outputReady(final IOSession session) {
+ public void outputReady(final IOSession session) throws IOException {
+ if (initialized.compareAndSet(false, true)) {
+ initialize();
+ }
}
@Override
@@ -187,25 +211,38 @@ public class ServerHttpProtocolNegotiator implements HttpConnectionEventHandler
@Override
public void exception(final IOSession session, final Exception cause) {
- session.close(CloseMode.IMMEDIATE);
final HttpConnectionEventHandler protocolHandler = protocolHandlerRef.get();
- if (protocolHandler != null) {
- protocolHandler.exception(session, cause);
- } else {
- CommandSupport.failCommands(session, cause);
+ try {
+ session.close(CloseMode.IMMEDIATE);
+ if (protocolHandler != null) {
+ protocolHandler.exception(session, cause);
+ } else {
+ CommandSupport.failCommands(session, cause);
+ }
+ } catch (final Exception ex) {
+ // Report the secondary failure at most once, and only if a callback was supplied.
+ if (completed.compareAndSet(false, true) && resultCallback != null) {
+ resultCallback.failed(ex);
+ }
}
}
@Override
public void disconnected(final IOSession session) {
final HttpConnectionEventHandler protocolHandler = protocolHandlerRef.getAndSet(null);
- if (protocolHandler != null) {
- protocolHandler.disconnected(ioSession);
- } else {
- CommandSupport.cancelCommands(session);
+ try {
+ if (protocolHandler != null) {
+ protocolHandler.disconnected(ioSession);
+ } else {
+ CommandSupport.cancelCommands(session);
+ }
+ } finally {
+ // Fail the callback only if no outcome was reported yet (flag still false) and a callback exists.
+ if (completed.compareAndSet(false, true) && resultCallback != null) {
+ resultCallback.failed(new ConnectionClosedException());
+ }
}
}
+
@Override
public SSLSession getSSLSession() {
final TlsDetails tlsDetails = ioSession.getTlsDetails();