You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2020/10/29 22:06:35 UTC

[httpcomponents-core] 01/01: Revised HTTP protocol negotiation for non-blocking I/O sessions

This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch tls_upgrade_redesign
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git

commit 92c0e5e00f7252802ab6c5a306edcbce3a21d9f7
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Thu Oct 29 15:43:45 2020 +0100

    Revised HTTP protocol negotiation for non-blocking I/O sessions
---
 .../impl/nio/ClientHttpProtocolNegotiator.java     | 144 +++++++++------
 .../impl/nio/ProtocolNegotiationException.java     |  49 +++++
 .../impl/nio/ServerHttpProtocolNegotiator.java     | 205 ++++++++++++---------
 3 files changed, 259 insertions(+), 139 deletions(-)

diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
index 30e6117..df80e96 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
@@ -32,16 +32,18 @@ import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.ByteChannel;
 import java.nio.channels.SelectionKey;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.ssl.SSLSession;
 
 import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.ConnectionClosedException;
 import org.apache.hc.core5.http.EndpointDetails;
 import org.apache.hc.core5.http.ProtocolVersion;
 import org.apache.hc.core5.http.impl.nio.BufferedData;
 import org.apache.hc.core5.http.impl.nio.ClientHttp1IOEventHandler;
-import org.apache.hc.core5.http.impl.nio.ClientHttp1StreamDuplexer;
 import org.apache.hc.core5.http.impl.nio.ClientHttp1StreamDuplexerFactory;
 import org.apache.hc.core5.http.impl.nio.HttpConnectionEventHandler;
 import org.apache.hc.core5.http.nio.command.CommandSupport;
@@ -76,6 +78,9 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
     private final ClientH2StreamMultiplexerFactory http2StreamHandlerFactory;
     private final HttpVersionPolicy versionPolicy;
     private final AtomicReference<HttpConnectionEventHandler> protocolHandlerRef;
+    private final FutureCallback<HttpConnectionEventHandler> resultCallback;
+    private final AtomicBoolean initialized;
+    private final AtomicBoolean completed;
 
     private volatile ByteBuffer preface;
     private volatile BufferedData inBuf;
@@ -85,62 +90,50 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
             final ClientHttp1StreamDuplexerFactory http1StreamHandlerFactory,
             final ClientH2StreamMultiplexerFactory http2StreamHandlerFactory,
             final HttpVersionPolicy versionPolicy) {
+        this(ioSession, http1StreamHandlerFactory, http2StreamHandlerFactory, versionPolicy, null);
+    }
+
+    /**
+     * @since 5.1
+     */
+    public ClientHttpProtocolNegotiator(
+            final ProtocolIOSession ioSession,
+            final ClientHttp1StreamDuplexerFactory http1StreamHandlerFactory,
+            final ClientH2StreamMultiplexerFactory http2StreamHandlerFactory,
+            final HttpVersionPolicy versionPolicy,
+            final FutureCallback<HttpConnectionEventHandler> resultCallback) {
         this.ioSession = Args.notNull(ioSession, "I/O session");
         this.http1StreamHandlerFactory = Args.notNull(http1StreamHandlerFactory, "HTTP/1.1 stream handler factory");
         this.http2StreamHandlerFactory = Args.notNull(http2StreamHandlerFactory, "HTTP/2 stream handler factory");
         this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
         this.protocolHandlerRef = new AtomicReference<>(null);
+        this.resultCallback = resultCallback;
+        this.initialized = new AtomicBoolean();
+        this.completed = new AtomicBoolean();
     }
 
-    private void startHttp1(final IOSession session) {
-        final ClientHttp1StreamDuplexer http1StreamHandler = http1StreamHandlerFactory.create(ioSession);
-        final HttpConnectionEventHandler protocolHandler = new ClientHttp1IOEventHandler(http1StreamHandler);
-        try {
-            ioSession.upgrade(protocolHandler);
-            protocolHandlerRef.set(protocolHandler);
-            protocolHandler.connected(session);
-            if (inBuf != null) {
-                protocolHandler.inputReady(session, inBuf.data());
-                inBuf.clear();
-            }
-        } catch (final Exception ex) {
-            protocolHandler.exception(session, ex);
-            session.close(CloseMode.IMMEDIATE);
+    private void startProtocol(final HttpConnectionEventHandler protocolHandler) throws IOException {
+        protocolHandlerRef.set(protocolHandler);
+        ioSession.upgrade(protocolHandler);
+        protocolHandler.connected(ioSession);
+        if (inBuf != null) {
+            protocolHandler.inputReady(ioSession, inBuf.data());
+            inBuf.clear();
+        }
+        if (completed.compareAndSet(true, false)) {
+            resultCallback.completed(protocolHandler);
         }
     }
 
-    private void startHttp2(final IOSession session) {
-        final ClientH2StreamMultiplexer streamMultiplexer = http2StreamHandlerFactory.create(ioSession);
-        final HttpConnectionEventHandler protocolHandler = new ClientH2IOEventHandler(streamMultiplexer);
-        try {
-            ioSession.upgrade(protocolHandler);
-            protocolHandlerRef.set(protocolHandler);
-            protocolHandler.connected(session);
-            if (inBuf != null) {
-                protocolHandler.inputReady(session, inBuf.data());
-                inBuf.clear();
-            }
-        } catch (final Exception ex) {
-            protocolHandler.exception(session, ex);
-            session.close(CloseMode.IMMEDIATE);
-        }
+    private void startHttp1() throws IOException {
+        startProtocol(new ClientHttp1IOEventHandler(http1StreamHandlerFactory.create(ioSession)));
     }
 
-    private void writeOutPreface(final IOSession session) throws IOException {
-        if (preface.hasRemaining()) {
-            final ByteChannel channel = session;
-            channel.write(preface);
-        }
-        if (!preface.hasRemaining()) {
-            session.clearEvent(SelectionKey.OP_WRITE);
-            startHttp2(session);
-        } else {
-            session.setEvent(SelectionKey.OP_WRITE);
-        }
+    private void startHttp2() throws IOException {
+        startProtocol(new ClientH2IOEventHandler(http2StreamHandlerFactory.create(ioSession)));
     }
 
-    @Override
-    public void connected(final IOSession session) throws IOException {
+    private void initialize() throws IOException {
         switch (versionPolicy) {
             case NEGOTIATE:
                 final TlsDetails tlsDetails = ioSession.getTlsDetails();
@@ -156,8 +149,30 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
                 break;
         }
         if (preface == null) {
-            startHttp1(session);
+            startHttp1();
         } else {
+            ioSession.setEvent(SelectionKey.OP_WRITE);
+        }
+    }
+
+    private void writeOutPreface(final IOSession session) throws IOException {
+        if (preface.hasRemaining()) {
+            final ByteChannel channel = session;
+            channel.write(preface);
+        }
+        if (!preface.hasRemaining()) {
+            session.clearEvent(SelectionKey.OP_WRITE);
+            startHttp2();
+            preface = null;
+        }
+    }
+
+    @Override
+    public void connected(final IOSession session) throws IOException {
+        if (initialized.compareAndSet(false, true)) {
+            initialize();
+        }
+        if (preface != null) {
             writeOutPreface(session);
         }
     }
@@ -170,15 +185,22 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
             }
             inBuf.put(src);
         }
-        outputReady(session);
+        if (preface != null) {
+            writeOutPreface(session);
+        } else {
+            throw new ProtocolNegotiationException("Unexpected input");
+        }
     }
 
     @Override
     public void outputReady(final IOSession session) throws IOException {
+        if (initialized.compareAndSet(false, true)) {
+            initialize();
+        }
         if (preface != null) {
             writeOutPreface(session);
         } else {
-            session.close(CloseMode.GRACEFUL);
+            throw new ProtocolNegotiationException("Unexpected output");
         }
     }
 
@@ -189,22 +211,34 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
 
     @Override
     public void exception(final IOSession session, final Exception cause) {
-        session.close(CloseMode.IMMEDIATE);
         final HttpConnectionEventHandler protocolHandler = protocolHandlerRef.get();
-        if (protocolHandler != null) {
-            protocolHandler.exception(session, cause);
-        } else {
-            CommandSupport.failCommands(session, cause);
+        try {
+            session.close(CloseMode.IMMEDIATE);
+            if (protocolHandler != null) {
+                protocolHandler.exception(session, cause);
+            } else {
+                CommandSupport.failCommands(session, cause);
+            }
+        } catch (final Exception ex) {
+            if (completed.compareAndSet(true, false)) {
+                resultCallback.failed(ex);
+            }
         }
     }
 
     @Override
     public void disconnected(final IOSession session) {
         final HttpConnectionEventHandler protocolHandler = protocolHandlerRef.getAndSet(null);
-        if (protocolHandler != null) {
-            protocolHandler.disconnected(ioSession);
-        } else {
-            CommandSupport.cancelCommands(session);
+        try {
+            if (protocolHandler != null) {
+                protocolHandler.disconnected(ioSession);
+            } else {
+                CommandSupport.cancelCommands(session);
+            }
+        } finally {
+            if (completed.compareAndSet(true, false)) {
+                resultCallback.failed(new ConnectionClosedException());
+            }
         }
     }
 
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ProtocolNegotiationException.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ProtocolNegotiationException.java
new file mode 100644
index 0000000..b984db2
--- /dev/null
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ProtocolNegotiationException.java
@@ -0,0 +1,49 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http2.impl.nio;
+
+import java.io.IOException;
+
+/**
+ * Signals a protocol error in HTTP protocol negotiation.
+ *
+ * @since 5.1
+ */
+public class ProtocolNegotiationException extends IOException {
+
+    /**
+     * Creates a MessageConstraintException with the specified detail message.
+     *
+     * @param message The exception detail message
+     */
+    public ProtocolNegotiationException(final String message) {
+        super(message);
+    }
+
+
+}
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
index 7bb65d2..a19f2b7 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
@@ -30,14 +30,15 @@ package org.apache.hc.core5.http2.impl.nio;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.ssl.SSLSession;
 
 import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.ConnectionClosedException;
 import org.apache.hc.core5.http.EndpointDetails;
-import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.ProtocolVersion;
 import org.apache.hc.core5.http.URIScheme;
 import org.apache.hc.core5.http.impl.nio.BufferedData;
@@ -74,6 +75,9 @@ public class ServerHttpProtocolNegotiator implements HttpConnectionEventHandler
     private final HttpVersionPolicy versionPolicy;
     private final BufferedData inBuf;
     private final AtomicReference<HttpConnectionEventHandler> protocolHandlerRef;
+    private final FutureCallback<HttpConnectionEventHandler> resultCallback;
+    private final AtomicBoolean initialized;
+    private final AtomicBoolean completed;
 
     private volatile boolean expectValidH2Preface;
 
@@ -82,102 +86,122 @@ public class ServerHttpProtocolNegotiator implements HttpConnectionEventHandler
             final ServerHttp1StreamDuplexerFactory http1StreamHandlerFactory,
             final ServerH2StreamMultiplexerFactory http2StreamHandlerFactory,
             final HttpVersionPolicy versionPolicy) {
+        this(ioSession, http1StreamHandlerFactory, http2StreamHandlerFactory, versionPolicy, null);
+    }
+
+    /**
+     * @since 5.1
+     */
+    public ServerHttpProtocolNegotiator(
+            final ProtocolIOSession ioSession,
+            final ServerHttp1StreamDuplexerFactory http1StreamHandlerFactory,
+            final ServerH2StreamMultiplexerFactory http2StreamHandlerFactory,
+            final HttpVersionPolicy versionPolicy,
+            final FutureCallback<HttpConnectionEventHandler> resultCallback) {
         this.ioSession = Args.notNull(ioSession, "I/O session");
         this.http1StreamHandlerFactory = Args.notNull(http1StreamHandlerFactory, "HTTP/1.1 stream handler factory");
         this.http2StreamHandlerFactory = Args.notNull(http2StreamHandlerFactory, "HTTP/2 stream handler factory");
         this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
         this.inBuf = BufferedData.allocate(1024);
         this.protocolHandlerRef = new AtomicReference<>(null);
+        this.resultCallback = resultCallback;
+        this.initialized = new AtomicBoolean();
+        this.completed = new AtomicBoolean();
+    }
+
+    private void startProtocol(final HttpConnectionEventHandler protocolHandler, final ByteBuffer data) throws IOException {
+        protocolHandlerRef.set(protocolHandler);
+        ioSession.upgrade(protocolHandler);
+        protocolHandler.connected(ioSession);
+        if (data != null) {
+            protocolHandler.inputReady(ioSession, data.hasRemaining() ? data : null);
+        }
+        if (completed.compareAndSet(true, false)) {
+            resultCallback.completed(protocolHandler);
+        }
+    }
+
+    private void startHttp1(final TlsDetails tlsDetails, final ByteBuffer data) throws IOException {
+        final ServerHttp1StreamDuplexer http1StreamHandler = http1StreamHandlerFactory.create(
+                tlsDetails != null ? URIScheme.HTTPS.id : URIScheme.HTTP.id,
+                ioSession);
+        startProtocol(new ServerHttp1IOEventHandler(http1StreamHandler), data);
+    }
+
+    private void startHttp2(final ByteBuffer data) throws IOException {
+        startProtocol(new ServerH2IOEventHandler(http2StreamHandlerFactory.create(ioSession)), data);
+    }
+
+    private void initialize() throws IOException {
+        final TlsDetails tlsDetails = ioSession.getTlsDetails();
+        switch (versionPolicy) {
+            case NEGOTIATE:
+                if (tlsDetails != null &&
+                        ApplicationProtocol.HTTP_2.id.equals(tlsDetails.getApplicationProtocol())) {
+                    expectValidH2Preface = true;
+                }
+                break;
+            case FORCE_HTTP_2:
+                if (tlsDetails == null ||
+                        !ApplicationProtocol.HTTP_1_1.id.equals(tlsDetails.getApplicationProtocol())) {
+                    expectValidH2Preface = true;
+                }
+                break;
+            case FORCE_HTTP_1:
+                startHttp1(tlsDetails, null);
+                break;
+        }
     }
 
     @Override
-    public void connected(final IOSession session) {
-        try {
-            final TlsDetails tlsDetails = ioSession.getTlsDetails();
-            switch (versionPolicy) {
-                case NEGOTIATE:
-                    if (tlsDetails != null &&
-                            ApplicationProtocol.HTTP_2.id.equals(tlsDetails.getApplicationProtocol())) {
-                        expectValidH2Preface = true;
-                    }
-                    break;
-                case FORCE_HTTP_2:
-                    if (tlsDetails == null ||
-                            !ApplicationProtocol.HTTP_1_1.id.equals(tlsDetails.getApplicationProtocol())) {
-                        expectValidH2Preface = true;
-                    }
-                    break;
-                case FORCE_HTTP_1:
-                    final ServerHttp1StreamDuplexer http1StreamHandler = http1StreamHandlerFactory.create(
-                            tlsDetails != null ? URIScheme.HTTPS.id : URIScheme.HTTP.id,
-                            ioSession);
-                    final HttpConnectionEventHandler protocolHandler = new ServerHttp1IOEventHandler(http1StreamHandler);
-                    ioSession.upgrade(protocolHandler);
-                    protocolHandlerRef.set(protocolHandler);
-                    http1StreamHandler.onConnect();
-                    break;
-            }
-        } catch (final Exception ex) {
-            exception(session, ex);
+    public void connected(final IOSession session) throws IOException {
+        if (initialized.compareAndSet(false, true)) {
+            initialize();
         }
     }
 
     @Override
-    public void inputReady(final IOSession session, final ByteBuffer src) {
-        try {
-            if (src != null) {
-                inBuf.put(src);
-            }
-            boolean endOfStream = false;
-            if (inBuf.length() < PREFACE.length) {
-                final int bytesRead = inBuf.readFrom(session);
-                if (bytesRead == -1) {
-                    endOfStream = true;
-                }
+    public void inputReady(final IOSession session, final ByteBuffer src) throws IOException {
+        if (src != null) {
+            inBuf.put(src);
+        }
+        boolean endOfStream = false;
+        if (inBuf.length() < PREFACE.length) {
+            final int bytesRead = inBuf.readFrom(session);
+            if (bytesRead == -1) {
+                endOfStream = true;
             }
-            final ByteBuffer data = inBuf.data();
-            if (data.remaining() >= PREFACE.length) {
-                boolean validH2Preface = true;
-                for (int i = 0; i < PREFACE.length; i++) {
-                    if (data.get() != PREFACE[i]) {
-                        if (expectValidH2Preface) {
-                            throw new HttpException("Unexpected HTTP/2 preface");
-                        }
-                        validH2Preface = false;
+        }
+        final ByteBuffer data = inBuf.data();
+        if (data.remaining() >= PREFACE.length) {
+            boolean validH2Preface = true;
+            for (int i = 0; i < PREFACE.length; i++) {
+                if (data.get() != PREFACE[i]) {
+                    if (expectValidH2Preface) {
+                        throw new ProtocolNegotiationException("Unexpected HTTP/2 preface");
                     }
+                    validH2Preface = false;
                 }
-                if (validH2Preface) {
-                    final ServerH2StreamMultiplexer http2StreamHandler = http2StreamHandlerFactory.create(ioSession);
-                    final HttpConnectionEventHandler protocolHandler = new ServerH2IOEventHandler(http2StreamHandler);
-                    ioSession.upgrade(protocolHandler);
-                    protocolHandlerRef.set(protocolHandler);
-                    http2StreamHandler.onConnect();
-                    http2StreamHandler.onInput(data.hasRemaining() ? data : null);
-                } else {
-                    final TlsDetails tlsDetails = ioSession.getTlsDetails();
-                    final ServerHttp1StreamDuplexer http1StreamHandler = http1StreamHandlerFactory.create(
-                            tlsDetails != null ? URIScheme.HTTPS.id : URIScheme.HTTP.id,
-                            ioSession);
-                    final HttpConnectionEventHandler protocolHandler = new ServerHttp1IOEventHandler(http1StreamHandler);
-                    ioSession.upgrade(protocolHandler);
-                    protocolHandlerRef.set(protocolHandler);
-                    data.rewind();
-                    http1StreamHandler.onConnect();
-                    http1StreamHandler.onInput(data);
-                }
+            }
+            if (validH2Preface) {
+                startHttp2(data.hasRemaining() ? data : null);
             } else {
-                if (endOfStream) {
-                    throw new ConnectionClosedException();
-                }
+                data.rewind();
+                startHttp1(ioSession.getTlsDetails(), data);
+            }
+        } else {
+            if (endOfStream) {
+                throw new ConnectionClosedException();
             }
-            data.clear();
-        } catch (final Exception ex) {
-            exception(session, ex);
         }
+        data.clear();
     }
 
     @Override
-    public void outputReady(final IOSession session) {
+    public void outputReady(final IOSession session) throws IOException {
+        if (initialized.compareAndSet(false, true)) {
+            initialize();
+        }
     }
 
     @Override
@@ -187,25 +211,38 @@ public class ServerHttpProtocolNegotiator implements HttpConnectionEventHandler
 
     @Override
     public void exception(final IOSession session, final Exception cause) {
-        session.close(CloseMode.IMMEDIATE);
         final HttpConnectionEventHandler protocolHandler = protocolHandlerRef.get();
-        if (protocolHandler != null) {
-            protocolHandler.exception(session, cause);
-        } else {
-            CommandSupport.failCommands(session, cause);
+        try {
+            session.close(CloseMode.IMMEDIATE);
+            if (protocolHandler != null) {
+                protocolHandler.exception(session, cause);
+            } else {
+                CommandSupport.failCommands(session, cause);
+            }
+        } catch (final Exception ex) {
+            if (completed.compareAndSet(true, false)) {
+                resultCallback.failed(ex);
+            }
         }
     }
 
     @Override
     public void disconnected(final IOSession session) {
         final HttpConnectionEventHandler protocolHandler = protocolHandlerRef.getAndSet(null);
-        if (protocolHandler != null) {
-            protocolHandler.disconnected(ioSession);
-        } else {
-            CommandSupport.cancelCommands(session);
+        try {
+            if (protocolHandler != null) {
+                protocolHandler.disconnected(ioSession);
+            } else {
+                CommandSupport.cancelCommands(session);
+            }
+        } finally {
+            if (completed.compareAndSet(true, false)) {
+                resultCallback.failed(new ConnectionClosedException());
+            }
         }
     }
 
+
     @Override
     public SSLSession getSSLSession() {
         final TlsDetails tlsDetails = ioSession.getTlsDetails();