You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@hc.apache.org by ol...@apache.org on 2019/09/26 20:38:03 UTC

[httpcomponents-core] branch master updated (44c3aa3 -> dac5470)

This is an automated email from the ASF dual-hosted git repository.

olegk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git.


    from 44c3aa3  HTTPCLIENT-2016, regression: Tab chars are replaced by question marks in header values
     new 043de30  Added mechanism for low level i/o event handlers to push input data up the protocol processing chain instead of expecting the protocol handlers to consume it from the i/o session buffers
     new 8273dcb  Redesign of SSL/TLS async I/O event handling
     new dac5470  Simplification of HTTP/1.1 read event handling logic; better fix for HTTPCORE-599

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../http2/impl/nio/AbstractH2IOEventHandler.java   |   7 +-
 .../impl/nio/AbstractH2StreamMultiplexer.java      |  10 +-
 .../impl/nio/ClientHttpProtocolNegotiator.java     |  18 +-
 .../impl/nio/H2OnlyClientProtocolNegotiator.java   |   2 +-
 .../impl/nio/ServerHttpProtocolNegotiator.java     |  34 +-
 .../testing/nio/TestDefaultListeningIOReactor.java |   3 +-
 .../http/impl/nio/AbstractHttp1IOEventHandler.java |   7 +-
 .../http/impl/nio/AbstractHttp1StreamDuplexer.java | 110 +++--
 .../hc/core5/http/impl/nio/BufferedData.java       |  20 +
 .../apache/hc/core5/reactor/IOEventHandler.java    |   3 +-
 .../org/apache/hc/core5/reactor/IOSessionImpl.java |   3 -
 .../hc/core5/reactor/InternalDataChannel.java      |   2 +-
 .../core5/reactor/SocksProxyProtocolHandler.java   |  10 +-
 .../apache/hc/core5/reactor/ssl/SSLIOSession.java  | 474 ++++++++-------------
 14 files changed, 314 insertions(+), 389 deletions(-)


[httpcomponents-core] 01/03: Added mechanism for low level i/o event handlers to push input data up the protocol processing chain instead of expecting the protocol handlers to consume it from the i/o session buffers

Posted by ol...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git

commit 043de30cccfac419baa6abd26353db0a8d6844fb
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Fri Sep 13 11:31:00 2019 +0200

    Added mechanism for low level i/o event handlers to push input data up the protocol processing chain instead of expecting the protocol handlers to consume it from the i/o session buffers
---
 .../hc/core5/http2/impl/nio/AbstractH2IOEventHandler.java    |  7 ++++---
 .../hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java | 10 +++++-----
 .../core5/http2/impl/nio/ClientHttpProtocolNegotiator.java   |  2 +-
 .../core5/http2/impl/nio/H2OnlyClientProtocolNegotiator.java |  2 +-
 .../core5/http2/impl/nio/ServerHttpProtocolNegotiator.java   | 12 ++++++------
 .../hc/core5/testing/nio/TestDefaultListeningIOReactor.java  |  3 ++-
 .../hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java  |  7 ++++---
 .../hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java  | 10 +++++-----
 .../java/org/apache/hc/core5/reactor/IOEventHandler.java     |  3 ++-
 .../org/apache/hc/core5/reactor/InternalDataChannel.java     |  2 +-
 .../apache/hc/core5/reactor/SocksProxyProtocolHandler.java   | 10 +++++++++-
 .../java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java   |  5 ++---
 12 files changed, 42 insertions(+), 31 deletions(-)

diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2IOEventHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2IOEventHandler.java
index 0195bb0..9dcd38e 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2IOEventHandler.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2IOEventHandler.java
@@ -29,6 +29,7 @@ package org.apache.hc.core5.http2.impl.nio;
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.nio.ByteBuffer;
 
 import javax.net.ssl.SSLSession;
 
@@ -52,16 +53,16 @@ class AbstractH2IOEventHandler implements HttpConnectionEventHandler {
     @Override
     public void connected(final IOSession session) throws IOException {
         try {
-            streamMultiplexer.onConnect(null);
+            streamMultiplexer.onConnect();
         } catch (final HttpException ex) {
             streamMultiplexer.onException(ex);
         }
     }
 
     @Override
-    public void inputReady(final IOSession session) throws IOException {
+    public void inputReady(final IOSession session, final ByteBuffer src) throws IOException {
         try {
-            streamMultiplexer.onInput();
+            streamMultiplexer.onInput(src);
         } catch (final HttpException ex) {
             streamMultiplexer.onException(ex);
         }
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java
index fdfa55d..831114b 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java
@@ -398,10 +398,7 @@ abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnecti
         }
     }
 
-    public final void onConnect(final ByteBuffer prefeed) throws HttpException, IOException {
-        if (prefeed != null) {
-            inputBuffer.put(prefeed);
-        }
+    public final void onConnect() throws HttpException, IOException {
         connState = ConnectionHandshake.ACTIVE;
         final RawFrame settingsFrame = frameFactory.createSettings(
                 new H2Setting(H2Param.HEADER_TABLE_SIZE, localConfig.getHeaderTableSize()),
@@ -415,10 +412,13 @@ abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnecti
         localSettingState = SettingsHandshake.TRANSMITTED;
     }
 
-    public final void onInput() throws HttpException, IOException {
+    public final void onInput(final ByteBuffer src) throws HttpException, IOException {
         if (connState == ConnectionHandshake.SHUTDOWN) {
             ioSession.clearEvent(SelectionKey.OP_READ);
         } else {
+            if (src != null) {
+                inputBuffer.put(src);
+            }
             RawFrame frame;
             while ((frame = inputBuffer.read(ioSession)) != null) {
                 if (streamListener != null) {
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
index 8e6e346..b1da774 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
@@ -153,7 +153,7 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
     }
 
     @Override
-    public void inputReady(final IOSession session) throws IOException  {
+    public void inputReady(final IOSession session, final ByteBuffer src) throws IOException  {
         outputReady(session);
     }
 
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2OnlyClientProtocolNegotiator.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2OnlyClientProtocolNegotiator.java
index 0decbf0..94a145f 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2OnlyClientProtocolNegotiator.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2OnlyClientProtocolNegotiator.java
@@ -114,7 +114,7 @@ public class H2OnlyClientProtocolNegotiator implements HttpConnectionEventHandle
     }
 
     @Override
-    public void inputReady(final IOSession session) {
+    public void inputReady(final IOSession session, final ByteBuffer src) {
         outputReady(session);
     }
 
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
index 157efe1..4a85534 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
@@ -113,7 +113,7 @@ public class ServerHttpProtocolNegotiator implements HttpConnectionEventHandler
                     final HttpConnectionEventHandler protocolHandler = new ServerHttp1IOEventHandler(http1StreamHandler);
                     ioSession.upgrade(protocolHandler);
                     protocolHandlerRef.set(protocolHandler);
-                    http1StreamHandler.onConnect(null);
+                    http1StreamHandler.onConnect();
                     break;
             }
         } catch (final Exception ex) {
@@ -122,7 +122,7 @@ public class ServerHttpProtocolNegotiator implements HttpConnectionEventHandler
     }
 
     @Override
-    public void inputReady(final IOSession session) {
+    public void inputReady(final IOSession session, final ByteBuffer src) {
         try {
             boolean endOfStream = false;
             if (bytebuf.position() < PREFACE.length) {
@@ -148,8 +148,8 @@ public class ServerHttpProtocolNegotiator implements HttpConnectionEventHandler
                     final HttpConnectionEventHandler protocolHandler = new ServerH2IOEventHandler(http2StreamHandler);
                     ioSession.upgrade(protocolHandler);
                     protocolHandlerRef.set(protocolHandler);
-                    http2StreamHandler.onConnect(bytebuf.hasRemaining() ? bytebuf : null);
-                    http2StreamHandler.onInput();
+                    http2StreamHandler.onConnect();
+                    http2StreamHandler.onInput(bytebuf.hasRemaining() ? bytebuf : null);
                 } else {
                     final TlsDetails tlsDetails = ioSession.getTlsDetails();
                     final ServerHttp1StreamDuplexer http1StreamHandler = http1StreamHandlerFactory.create(
@@ -159,8 +159,8 @@ public class ServerHttpProtocolNegotiator implements HttpConnectionEventHandler
                     ioSession.upgrade(protocolHandler);
                     protocolHandlerRef.set(protocolHandler);
                     bytebuf.rewind();
-                    http1StreamHandler.onConnect(bytebuf);
-                    http1StreamHandler.onInput();
+                    http1StreamHandler.onConnect();
+                    http1StreamHandler.onInput(bytebuf);
                 }
             } else {
                 if (endOfStream) {
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
index 73fc912..0bfea82 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
@@ -28,6 +28,7 @@
 package org.apache.hc.core5.testing.nio;
 
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -66,7 +67,7 @@ public class TestDefaultListeningIOReactor {
                 }
 
                 @Override
-                public void inputReady(final IOSession session) {
+                public void inputReady(final IOSession session, final ByteBuffer src) {
                 }
 
                 @Override
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java
index 81211fa..8f89d80 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java
@@ -29,6 +29,7 @@ package org.apache.hc.core5.http.impl.nio;
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.nio.ByteBuffer;
 
 import javax.net.ssl.SSLSession;
 
@@ -51,16 +52,16 @@ class AbstractHttp1IOEventHandler implements HttpConnectionEventHandler {
     @Override
     public void connected(final IOSession session) throws IOException {
         try {
-            streamDuplexer.onConnect(null);
+            streamDuplexer.onConnect();
         } catch (final HttpException ex) {
             streamDuplexer.onException(ex);
         }
     }
 
     @Override
-    public void inputReady(final IOSession session) throws IOException {
+    public void inputReady(final IOSession session, final ByteBuffer src) throws IOException {
         try {
-            streamDuplexer.onInput();
+            streamDuplexer.onInput(src);
         } catch (final HttpException ex) {
             streamDuplexer.onException(ex);
         }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
index 3b21997..6860bd3 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
@@ -229,15 +229,15 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
         }
     }
 
-    public final void onConnect(final ByteBuffer prefeed) throws HttpException, IOException {
-        if (prefeed != null) {
-            inbuf.put(prefeed);
-        }
+    public final void onConnect() throws HttpException, IOException {
         connState = ConnectionState.ACTIVE;
         processCommands();
     }
 
-    public final void onInput() throws HttpException, IOException {
+    public final void onInput(final ByteBuffer src) throws HttpException, IOException {
+        if (src != null) {
+            inbuf.put(src);
+        }
         while (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
             int totalBytesRead = 0;
             int messagesReceived = 0;
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandler.java
index 52cfb5c..b747ef2 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandler.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandler.java
@@ -28,6 +28,7 @@
 package org.apache.hc.core5.reactor;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.hc.core5.annotation.Internal;
 import org.apache.hc.core5.util.Timeout;
@@ -56,7 +57,7 @@ public interface IOEventHandler {
      *
      * @param session the I/O session.
      */
-    void inputReady(IOSession session) throws IOException;
+    void inputReady(IOSession session, ByteBuffer src) throws IOException;
 
     /**
      * Triggered when the given session is ready for output.
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
index 356202a..7fce8aa 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
@@ -121,7 +121,7 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
                 sessionListener.inputReady(this);
             }
             final IOEventHandler handler = ensureHandler(currentSession);
-            handler.inputReady(this);
+            handler.inputReady(this, null);
         }
         if ((readyOps & SelectionKey.OP_WRITE) != 0
                 || (ioSession.getEventMask() & SelectionKey.OP_WRITE) != 0) {
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SocksProxyProtocolHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SocksProxyProtocolHandler.java
index ed55892..73fd246 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SocksProxyProtocolHandler.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SocksProxyProtocolHandler.java
@@ -32,6 +32,7 @@ import java.net.Inet4Address;
 import java.net.Inet6Address;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ByteChannel;
 import java.nio.channels.SelectionKey;
@@ -138,7 +139,14 @@ final class SocksProxyProtocolHandler implements IOEventHandler {
     }
 
     @Override
-    public void inputReady(final IOSession session) throws IOException {
+    public void inputReady(final IOSession session, final ByteBuffer src) throws IOException {
+        if (src != null) {
+            try {
+                this.buffer.put(src);
+            } catch (final BufferOverflowException ex) {
+                throw new IOException("Unexpected input data");
+            }
+        }
         switch (this.state) {
             case RECEIVE_AUTH_METHOD:
                 if (fillBuffer(session)) {
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
index 5574639..7197429 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
@@ -180,13 +180,13 @@ public class SSLIOSession implements IOSession {
             }
 
             @Override
-            public void inputReady(final IOSession protocolSession) throws IOException {
+            public void inputReady(final IOSession protocolSession, final ByteBuffer src) throws IOException {
                 ensureInitialized();
                 do {
                     bytesReadCount.set(0L);
                     if (isAppInputReady()) {
                         final IOEventHandler handler = ensureHandler();
-                        handler.inputReady(protocolSession);
+                        handler.inputReady(protocolSession, src);
                     }
                     inboundTransport();
                 } while (bytesReadCount.get() > 0);
@@ -286,7 +286,6 @@ public class SSLIOSession implements IOSession {
 
             this.inEncrypted.release();
             this.outEncrypted.release();
-            this.inPlain.release();
             doHandshake();
         } finally {
             this.session.getLock().unlock();


[httpcomponents-core] 02/03: Redesign of SSL/TLS async I/O event handling

Posted by ol...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git

commit 8273dcbb6480c872eeb8d976df55089d32cb50a1
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Sat Sep 21 16:34:18 2019 +0200

    Redesign of SSL/TLS async I/O event handling
---
 .../impl/nio/ClientHttpProtocolNegotiator.java     |  16 +
 .../impl/nio/ServerHttpProtocolNegotiator.java     |  26 +-
 .../hc/core5/http/impl/nio/BufferedData.java       |  20 +
 .../apache/hc/core5/reactor/ssl/SSLIOSession.java  | 471 ++++++++-------------
 4 files changed, 227 insertions(+), 306 deletions(-)

diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
index b1da774..22a9b4f 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
@@ -39,6 +39,7 @@ import javax.net.ssl.SSLSession;
 import org.apache.hc.core5.annotation.Internal;
 import org.apache.hc.core5.http.EndpointDetails;
 import org.apache.hc.core5.http.ProtocolVersion;
+import org.apache.hc.core5.http.impl.nio.BufferedData;
 import org.apache.hc.core5.http.impl.nio.ClientHttp1IOEventHandler;
 import org.apache.hc.core5.http.impl.nio.ClientHttp1StreamDuplexer;
 import org.apache.hc.core5.http.impl.nio.ClientHttp1StreamDuplexerFactory;
@@ -77,6 +78,7 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
     private final AtomicReference<HttpConnectionEventHandler> protocolHandlerRef;
 
     private volatile ByteBuffer preface;
+    private volatile BufferedData inBuf;
 
     public ClientHttpProtocolNegotiator(
             final ProtocolIOSession ioSession,
@@ -97,6 +99,10 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
             ioSession.upgrade(protocolHandler);
             protocolHandlerRef.set(protocolHandler);
             protocolHandler.connected(session);
+            if (inBuf != null) {
+                protocolHandler.inputReady(session, inBuf.data());
+                inBuf.clear();
+            }
         } catch (final Exception ex) {
             protocolHandler.exception(session, ex);
             session.close(CloseMode.IMMEDIATE);
@@ -110,6 +116,10 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
             ioSession.upgrade(protocolHandler);
             protocolHandlerRef.set(protocolHandler);
             protocolHandler.connected(session);
+            if (inBuf != null) {
+                protocolHandler.inputReady(session, inBuf.data());
+                inBuf.clear();
+            }
         } catch (final Exception ex) {
             protocolHandler.exception(session, ex);
             session.close(CloseMode.IMMEDIATE);
@@ -154,6 +164,12 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
 
     @Override
     public void inputReady(final IOSession session, final ByteBuffer src) throws IOException  {
+        if (src != null) {
+            if (inBuf == null) {
+                inBuf = BufferedData.allocate(src.remaining());
+            }
+            inBuf.put(src);
+        }
         outputReady(session);
     }
 
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
index 4a85534..174366f 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
@@ -40,6 +40,7 @@ import org.apache.hc.core5.http.EndpointDetails;
 import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.ProtocolVersion;
 import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.http.impl.nio.BufferedData;
 import org.apache.hc.core5.http.impl.nio.HttpConnectionEventHandler;
 import org.apache.hc.core5.http.impl.nio.ServerHttp1IOEventHandler;
 import org.apache.hc.core5.http.impl.nio.ServerHttp1StreamDuplexer;
@@ -71,7 +72,7 @@ public class ServerHttpProtocolNegotiator implements HttpConnectionEventHandler
     private final ServerHttp1StreamDuplexerFactory http1StreamHandlerFactory;
     private final ServerH2StreamMultiplexerFactory http2StreamHandlerFactory;
     private final HttpVersionPolicy versionPolicy;
-    private final ByteBuffer bytebuf;
+    private final BufferedData inBuf;
     private final AtomicReference<HttpConnectionEventHandler> protocolHandlerRef;
 
     private volatile boolean expectValidH2Preface;
@@ -85,7 +86,7 @@ public class ServerHttpProtocolNegotiator implements HttpConnectionEventHandler
         this.http1StreamHandlerFactory = Args.notNull(http1StreamHandlerFactory, "HTTP/1.1 stream handler factory");
         this.http2StreamHandlerFactory = Args.notNull(http2StreamHandlerFactory, "HTTP/2 stream handler factory");
         this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
-        this.bytebuf = ByteBuffer.allocate(1024);
+        this.inBuf = BufferedData.allocate(1024);
         this.protocolHandlerRef = new AtomicReference<>(null);
     }
 
@@ -124,19 +125,21 @@ public class ServerHttpProtocolNegotiator implements HttpConnectionEventHandler
     @Override
     public void inputReady(final IOSession session, final ByteBuffer src) {
         try {
+            if (src != null) {
+                inBuf.put(src);
+            }
             boolean endOfStream = false;
-            if (bytebuf.position() < PREFACE.length) {
-                final int bytesRead = session.read(bytebuf);
+            if (inBuf.length() < PREFACE.length) {
+                final int bytesRead = inBuf.readFrom(session);
                 if (bytesRead == -1) {
                     endOfStream = true;
                 }
             }
-            if (bytebuf.position() >= PREFACE.length) {
-                bytebuf.flip();
-
+            final ByteBuffer data = inBuf.data();
+            if (data.remaining() >= PREFACE.length) {
                 boolean validH2Preface = true;
                 for (int i = 0; i < PREFACE.length; i++) {
-                    if (bytebuf.get() != PREFACE[i]) {
+                    if (data.get() != PREFACE[i]) {
                         if (expectValidH2Preface) {
                             throw new HttpException("Unexpected HTTP/2 preface");
                         }
@@ -149,7 +152,7 @@ public class ServerHttpProtocolNegotiator implements HttpConnectionEventHandler
                     ioSession.upgrade(protocolHandler);
                     protocolHandlerRef.set(protocolHandler);
                     http2StreamHandler.onConnect();
-                    http2StreamHandler.onInput(bytebuf.hasRemaining() ? bytebuf : null);
+                    http2StreamHandler.onInput(data.hasRemaining() ? data : null);
                 } else {
                     final TlsDetails tlsDetails = ioSession.getTlsDetails();
                     final ServerHttp1StreamDuplexer http1StreamHandler = http1StreamHandlerFactory.create(
@@ -158,15 +161,16 @@ public class ServerHttpProtocolNegotiator implements HttpConnectionEventHandler
                     final HttpConnectionEventHandler protocolHandler = new ServerHttp1IOEventHandler(http1StreamHandler);
                     ioSession.upgrade(protocolHandler);
                     protocolHandlerRef.set(protocolHandler);
-                    bytebuf.rewind();
+                    data.rewind();
                     http1StreamHandler.onConnect();
-                    http1StreamHandler.onInput(bytebuf);
+                    http1StreamHandler.onInput(data);
                 }
             } else {
                 if (endOfStream) {
                     throw new ConnectionClosedException();
                 }
             }
+            data.clear();
         } catch (final Exception ex) {
             exception(session, ex);
         }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/BufferedData.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/BufferedData.java
index 8b37751..214d3ab 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/BufferedData.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/BufferedData.java
@@ -27,7 +27,10 @@
 
 package org.apache.hc.core5.http.impl.nio;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
 
 import org.apache.hc.core5.util.Args;
 
@@ -77,6 +80,23 @@ public class BufferedData extends ExpandableBuffer {
         buffer().put(src);
     }
 
+    public final int readFrom(final ReadableByteChannel channel) throws IOException {
+        Args.notNull(channel, "Channel");
+        setInputMode();
+        if (!buffer().hasRemaining()) {
+            expand();
+        }
+        return channel.read(buffer());
+    }
+
+    public final int writeTo(final WritableByteChannel dst) throws IOException {
+        if (dst == null) {
+            return 0;
+        }
+        setOutputMode();
+        return dst.write(buffer());
+    }
+
     public final ByteBuffer data() {
         setOutputMode();
         return buffer();
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
index 7197429..a996610 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
@@ -35,7 +35,6 @@ import java.nio.channels.CancelledKeyException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectionKey;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 
 import javax.net.ssl.SSLContext;
@@ -84,13 +83,12 @@ public class SSLIOSession implements IOSession {
     private final SSLSessionVerifier verifier;
     private final Callback<SSLIOSession> connectedCallback;
     private final Callback<SSLIOSession> disconnectedCallback;
-    private final AtomicLong bytesReadCount;
     private final Timeout connectTimeout;
     private final SSLMode sslMode;
     private final AtomicInteger outboundClosedCount;
     private int appEventMask;
 
-    private boolean endOfStream;
+    private volatile boolean endOfStream;
     private volatile int status;
     private volatile boolean initialized;
     private volatile Timeout socketTimeout;
@@ -149,15 +147,9 @@ public class SSLIOSession implements IOSession {
         final int appBufferSize = sslSession.getApplicationBufferSize();
         this.inPlain = SSLManagedBuffer.create(sslBufferMode, appBufferSize);
         this.outboundClosedCount = new AtomicInteger(0);
-        this.bytesReadCount = new AtomicLong(0);
         this.connectTimeout = connectTimeout;
     }
 
-    @Override
-    public String getId() {
-        return session.getId();
-    }
-
     private IOEventHandler ensureHandler() {
         final IOEventHandler handler = session.getHandler();
         Asserts.notNull(handler, "IO event handler");
@@ -168,54 +160,49 @@ public class SSLIOSession implements IOSession {
     public IOEventHandler getHandler() {
         return new IOEventHandler() {
 
-            private void ensureInitialized() throws IOException {
-                if (!isInitialized()) {
-                    initialize();
-                }
-            }
-
             @Override
             public void connected(final IOSession protocolSession) throws IOException {
-                ensureInitialized();
+                if (!initialized) {
+                    initialize();
+                }
             }
 
             @Override
             public void inputReady(final IOSession protocolSession, final ByteBuffer src) throws IOException {
-                ensureInitialized();
-                do {
-                    bytesReadCount.set(0L);
-                    if (isAppInputReady()) {
-                        final IOEventHandler handler = ensureHandler();
-                        handler.inputReady(protocolSession, src);
-                    }
-                    inboundTransport();
-                } while (bytesReadCount.get() > 0);
+                if (!initialized) {
+                    initialize();
+                }
+                receiveEncryptedData();
+                doHandshake();
+                decryptData();
+                updateEventMask();
             }
 
             @Override
             public void outputReady(final IOSession protocolSession) throws IOException {
-                ensureInitialized();
-                if (isAppOutputReady()) {
-                    final IOEventHandler handler = ensureHandler();
-                    handler.outputReady(protocolSession);
+                if (!initialized) {
+                    initialize();
                 }
-                outboundTransport();
+                encryptData();
+                sendEncryptedData();
+                doHandshake();
+                updateEventMask();
             }
 
             @Override
             public void timeout(final IOSession protocolSession, final Timeout timeout) throws IOException {
-                if (isOutboundDone() && !isInboundDone()) {
+                if (sslEngine.isOutboundDone() && !sslEngine.isInboundDone()) {
                     // The session failed to terminate cleanly
                     close(CloseMode.IMMEDIATE);
                 }
-                ensureHandler().timeout(protocolSession, timeout);
+                ensureHandler().timeout(SSLIOSession.this, timeout);
             }
 
             @Override
             public void exception(final IOSession protocolSession, final Exception cause) {
                 final IOEventHandler handler = session.getHandler();
                 if (handler != null) {
-                    handler.exception(protocolSession, cause);
+                    handler.exception(SSLIOSession.this, cause);
                 }
             }
 
@@ -223,39 +210,13 @@ public class SSLIOSession implements IOSession {
             public void disconnected(final IOSession protocolSession) {
                 final IOEventHandler handler = session.getHandler();
                 if (handler != null) {
-                    handler.disconnected(protocolSession);
+                    handler.disconnected(SSLIOSession.this);
                 }
             }
 
         };
     }
 
-    @Override
-    public void upgrade(final IOEventHandler handler) {
-        this.session.upgrade(handler);
-    }
-
-    @Override
-    public Lock getLock() {
-        return this.session.getLock();
-    }
-
-    /**
-     * Returns {@code true} is the session has been fully initialized,
-     * {@code false} otherwise.
-     */
-     private boolean isInitialized() {
-        return this.initialized;
-    }
-
-    /**
-     * Initializes the session. This method invokes the {@link
-     * SSLSessionInitializer#initialize(NamedEndpoint, SSLEngine)} callback
-     * if an instance of {@link SSLSessionInitializer} was specified at the construction time.
-     *
-     * @throws SSLException in case of a SSL protocol exception.
-     * @throws IllegalStateException if the session has already been initialized.
-     */
     private void initialize() throws SSLException {
         Asserts.check(!this.initialized, "SSL I/O session already initialized");
 
@@ -292,10 +253,6 @@ public class SSLIOSession implements IOSession {
         }
     }
 
-    public TlsDetails getTlsDetails() {
-        return tlsDetails;
-    }
-
     // A works-around for exception handling craziness in Sun/Oracle's SSLEngine
     // implementation.
     //
@@ -354,16 +311,21 @@ public class SSLIOSession implements IOSession {
             case NEED_WRAP:
                 // Generate outgoing handshake data
 
-                // Acquire buffers
-                final ByteBuffer outEncryptedBuf = this.outEncrypted.acquire();
+                this.session.getLock().lock();
+                try {
+                    // Acquire buffers
+                    final ByteBuffer outEncryptedBuf = this.outEncrypted.acquire();
 
-                // Just wrap an empty buffer because there is no data to write.
-                result = doWrap(EMPTY_BUFFER, outEncryptedBuf);
+                    // Just wrap an empty buffer because there is no data to write.
+                    result = doWrap(EMPTY_BUFFER, outEncryptedBuf);
 
-                if (result.getStatus() != Status.OK || result.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) {
-                    handshaking = false;
+                    if (result.getStatus() != Status.OK || result.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) {
+                        handshaking = false;
+                    }
+                    break;
+                } finally {
+                    this.session.getLock().unlock();
                 }
-                break;
             case NEED_UNWRAP:
                 // Process incoming handshake data
 
@@ -426,93 +388,103 @@ public class SSLIOSession implements IOSession {
     }
 
     private void updateEventMask() {
-        // Graceful session termination
-        if (this.status == ACTIVE
-                && (this.endOfStream || this.sslEngine.isInboundDone())) {
-            this.status = CLOSING;
-        }
-        if (this.status == CLOSING && !this.outEncrypted.hasData()) {
-            this.sslEngine.closeOutbound();
-            this.outboundClosedCount.incrementAndGet();
-        }
-        if (this.status == CLOSING && this.sslEngine.isOutboundDone()
-                && (this.endOfStream || this.sslEngine.isInboundDone())) {
-            this.status = CLOSED;
-        }
-        // Abnormal session termination
-        if (this.status <= CLOSING && this.endOfStream
-                && this.sslEngine.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP) {
-            this.status = CLOSED;
-        }
-        if (this.status == CLOSED) {
-            this.session.close();
-            if (disconnectedCallback != null) {
-                disconnectedCallback.execute(this);
+        this.session.getLock().lock();
+        try {
+            // Graceful session termination
+            if (this.status == ACTIVE
+                    && (this.endOfStream || this.sslEngine.isInboundDone())) {
+                this.status = CLOSING;
+            }
+            if (this.status == CLOSING && !this.outEncrypted.hasData()) {
+                this.sslEngine.closeOutbound();
+                this.outboundClosedCount.incrementAndGet();
+            }
+            if (this.status == CLOSING && this.sslEngine.isOutboundDone()
+                    && (this.endOfStream || this.sslEngine.isInboundDone())) {
+                this.status = CLOSED;
+            }
+            // Abnormal session termination
+            if (this.status <= CLOSING && this.endOfStream
+                    && this.sslEngine.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP) {
+                this.status = CLOSED;
+            }
+            if (this.status == CLOSED) {
+                this.session.close();
+                if (disconnectedCallback != null) {
+                    disconnectedCallback.execute(this);
+                }
+                return;
+            }
+            // Need to toggle the event mask for this channel?
+            final int oldMask = this.session.getEventMask();
+            int newMask = oldMask;
+            switch (this.sslEngine.getHandshakeStatus()) {
+                case NEED_WRAP:
+                    newMask = EventMask.READ_WRITE;
+                    break;
+                case NEED_UNWRAP:
+                    newMask = EventMask.READ;
+                    break;
+                case NOT_HANDSHAKING:
+                    newMask = this.appEventMask;
+                    break;
+                case NEED_TASK:
+                    break;
+                case FINISHED:
+                    break;
             }
-            return;
-        }
-        // Need to toggle the event mask for this channel?
-        final int oldMask = this.session.getEventMask();
-        int newMask = oldMask;
-        switch (this.sslEngine.getHandshakeStatus()) {
-        case NEED_WRAP:
-            newMask = EventMask.READ_WRITE;
-            break;
-        case NEED_UNWRAP:
-            newMask = EventMask.READ;
-            break;
-        case NOT_HANDSHAKING:
-            newMask = this.appEventMask;
-            break;
-        case NEED_TASK:
-            break;
-        case FINISHED:
-            break;
-        }
 
-        if (this.endOfStream && !this.inPlain.hasData()) {
-            newMask = newMask & ~EventMask.READ;
-        }
+            if (this.endOfStream && !this.inPlain.hasData()) {
+                newMask = newMask & ~EventMask.READ;
+            }
 
-        // Do we have encrypted data ready to be sent?
-        if (this.outEncrypted.hasData()) {
-            newMask = newMask | EventMask.WRITE;
-        }
+            // Do we have encrypted data ready to be sent?
+            if (this.outEncrypted.hasData()) {
+                newMask = newMask | EventMask.WRITE;
+            }
 
-        // Update the mask if necessary
-        if (oldMask != newMask) {
-            this.session.setEventMask(newMask);
+            // Update the mask if necessary
+            if (oldMask != newMask) {
+                this.session.setEventMask(newMask);
+            }
+        } finally {
+            this.session.getLock().unlock();
         }
     }
 
     private int sendEncryptedData() throws IOException {
-        if (!this.outEncrypted.hasData()) {
-            // If the buffer isn't acquired or is empty, call write() with an empty buffer.
-            // This will ensure that tests performed by write() still take place without
-            // having to acquire and release an empty buffer (e.g. connection closed,
-            // interrupted thread, etc..)
-            return this.session.write(EMPTY_BUFFER);
-        }
+        this.session.getLock().lock();
+        try {
+            if (!this.outEncrypted.hasData()) {
+                // If the buffer isn't acquired or is empty, call write() with an empty buffer.
+                // This will ensure that tests performed by write() still take place without
+                // having to acquire and release an empty buffer (e.g. connection closed,
+                // interrupted thread, etc..)
+                return this.session.write(EMPTY_BUFFER);
+            }
 
-        // Acquire buffer
-        final ByteBuffer outEncryptedBuf = this.outEncrypted.acquire();
+            // Acquire buffer
+            final ByteBuffer outEncryptedBuf = this.outEncrypted.acquire();
 
-        // Perform operation
-        int bytesWritten = 0;
-        if (outEncryptedBuf.position() > 0) {
-            outEncryptedBuf.flip();
-            try {
-                bytesWritten = this.session.write(outEncryptedBuf);
-            } finally {
-                outEncryptedBuf.compact();
+            // Perform operation
+            int bytesWritten = 0;
+            if (outEncryptedBuf.position() > 0) {
+                outEncryptedBuf.flip();
+                try {
+                    bytesWritten = this.session.write(outEncryptedBuf);
+                } finally {
+                    outEncryptedBuf.compact();
+                }
             }
-        }
 
-        // Release if empty
-        if (outEncryptedBuf.position() == 0) {
-            this.outEncrypted.release();
+            // Release if empty
+            if (outEncryptedBuf.position() == 0) {
+                this.outEncrypted.release();
+            }
+            return bytesWritten;
+        } finally {
+            this.session.getLock().unlock();
         }
-        return bytesWritten;
     }
 
     private int receiveEncryptedData() throws IOException {
@@ -536,136 +508,66 @@ public class SSLIOSession implements IOSession {
         return bytesRead;
     }
 
-    private boolean decryptData() throws SSLException {
-        boolean decrypted = false;
-        while (this.inEncrypted.hasData()) {
-            // Get buffers
-            final ByteBuffer inEncryptedBuf = this.inEncrypted.acquire();
-            final ByteBuffer inPlainBuf = this.inPlain.acquire();
-
-            final SSLEngineResult result;
-            // Perform operations
+    private void decryptData() throws IOException {
+        final HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus();
+        if ((handshakeStatus == HandshakeStatus.NOT_HANDSHAKING || handshakeStatus == HandshakeStatus.FINISHED)
+                && inEncrypted.hasData()) {
+            final ByteBuffer inEncryptedBuf = inEncrypted.acquire();
             inEncryptedBuf.flip();
             try {
-                result = doUnwrap(inEncryptedBuf, inPlainBuf);
-            } finally {
-                inEncryptedBuf.compact();
-            }
-
-            try {
-                if (!inEncryptedBuf.hasRemaining() && result.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP) {
-                    throw new SSLException("Unable to complete SSL handshake");
-                }
-                final Status status = result.getStatus();
-                if (status == Status.OK) {
-                    decrypted = true;
-                } else {
-                    if (status == Status.BUFFER_UNDERFLOW && this.endOfStream) {
-                        throw new SSLException("Unable to decrypt incoming data due to unexpected end of stream");
+                while (inEncryptedBuf.hasRemaining()) {
+                    final ByteBuffer inPlainBuf = inPlain.acquire();
+                    try {
+                        final SSLEngineResult result = doUnwrap(inEncryptedBuf, inPlainBuf);
+                        if (!inEncryptedBuf.hasRemaining() && result.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP) {
+                            throw new SSLException("Unable to complete SSL handshake");
+                        }
+                        if (sslEngine.isInboundDone()) {
+                            endOfStream = true;
+                        }
+                        if (inPlainBuf.position() > 0) {
+                            inPlainBuf.flip();
+                            try {
+                                ensureHandler().inputReady(this, inPlainBuf.hasRemaining() ? inPlainBuf : null);
+                            } finally {
+                                inPlainBuf.clear();
+                            }
+                        }
+                        if (result.getStatus() != Status.OK) {
+                            if (result.getStatus() == Status.BUFFER_UNDERFLOW && endOfStream) {
+                                throw new SSLException("Unable to decrypt incoming data due to unexpected end of stream");
+                            }
+                            break;
+                        }
+                    } finally {
+                        inPlain.release();
                     }
-                    break;
                 }
             } finally {
+                inEncryptedBuf.compact();
                 // Release inEncrypted if empty
-                if (this.inEncrypted.acquire().position() == 0) {
-                    this.inEncrypted.release();
+                if (inEncryptedBuf.position() == 0) {
+                    inEncrypted.release();
                 }
             }
         }
-        if (this.sslEngine.isInboundDone()) {
-            this.endOfStream = true;
-        }
-        return decrypted;
     }
 
-    /**
-     * Reads encrypted data and returns whether the channel associated with
-     * this session has any decrypted inbound data available for reading.
-     *
-     * @throws IOException in case of an I/O error.
-     */
-    private boolean isAppInputReady() throws IOException {
+    private void encryptData() throws IOException {
+        final boolean appReady;
         this.session.getLock().lock();
         try {
-            do {
-                receiveEncryptedData();
-                doHandshake();
-                final HandshakeStatus status = this.sslEngine.getHandshakeStatus();
-                if (status == HandshakeStatus.NOT_HANDSHAKING || status == HandshakeStatus.FINISHED) {
-                    decryptData();
-                }
-            } while (this.sslEngine.getHandshakeStatus() == HandshakeStatus.NEED_TASK);
-            // Some decrypted data is available or at the end of stream
-            return this.inPlain.hasData() || (this.endOfStream && this.status == ACTIVE);
-        } finally {
-            this.session.getLock().unlock();
-        }
-    }
-
-    /**
-     * Returns whether the channel associated with this session is ready to
-     * accept outbound unecrypted data for writing.
-     *
-     * @throws IOException - not thrown currently
-     */
-    private boolean isAppOutputReady() throws IOException {
-        this.session.getLock().lock();
-        try {
-            return (this.appEventMask & SelectionKey.OP_WRITE) > 0
+            appReady = (this.appEventMask & SelectionKey.OP_WRITE) > 0
                     && this.status == ACTIVE
                     && this.sslEngine.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING;
         } finally {
             this.session.getLock().unlock();
         }
-    }
-
-    /**
-     * Executes inbound SSL transport operations.
-     *
-     * @throws IOException - not thrown currently
-     */
-    private void inboundTransport() throws IOException {
-        this.session.getLock().lock();
-        try {
-            updateEventMask();
-        } finally {
-            this.session.getLock().unlock();
-        }
-    }
-
-    /**
-     * Sends encrypted data and executes outbound SSL transport operations.
-     *
-     * @throws IOException in case of an I/O error.
-     */
-    private void outboundTransport() throws IOException {
-        this.session.getLock().lock();
-        try {
-            if (!this.session.isOpen()) {
-                return;
-            }
-            sendEncryptedData();
-            doHandshake();
-            updateEventMask();
-        } finally {
-            this.session.getLock().unlock();
+        if (appReady) {
+            ensureHandler().outputReady(this);
         }
     }
 
-    /**
-     * Returns whether the session will produce any more inbound data.
-     */
-    private boolean isInboundDone() {
-        return this.sslEngine.isInboundDone();
-    }
-
-    /**
-     * Returns whether the session will accept any more outbound data.
-     */
-    private boolean isOutboundDone() {
-        return this.sslEngine.isOutboundDone();
-    }
-
     @Override
     public int write(final ByteBuffer src) throws IOException {
         Args.notNull(src, "Byte buffer");
@@ -679,9 +581,6 @@ public class SSLIOSession implements IOSession {
             }
             final ByteBuffer outEncryptedBuf = this.outEncrypted.acquire();
             final SSLEngineResult result = doWrap(src, outEncryptedBuf);
-            if (result.getStatus() == Status.CLOSED) {
-                this.status = CLOSED;
-            }
             return result.bytesConsumed();
         } finally {
             this.session.getLock().unlock();
@@ -690,38 +589,26 @@ public class SSLIOSession implements IOSession {
 
     @Override
     public int read(final ByteBuffer dst) {
-        Args.notNull(dst, "Byte buffer");
-        this.session.getLock().lock();
-        try {
-            if (!this.initialized) {
-                return 0;
-            }
-            if (this.inPlain.hasData()) {
-                // Acquire buffer
-                final ByteBuffer inPlainBuf = this.inPlain.acquire();
+        return endOfStream ? -1 : 0;
+    }
 
-                // Perform operations
-                inPlainBuf.flip();
-                final int n = Math.min(inPlainBuf.remaining(), dst.remaining());
-                for (int i = 0; i < n; i++) {
-                    dst.put(inPlainBuf.get());
-                }
-                inPlainBuf.compact();
+    @Override
+    public String getId() {
+        return session.getId();
+    }
 
-                // Release if empty
-                if (inPlainBuf.position() == 0) {
-                    this.inPlain.release();
-                }
-                bytesReadCount.addAndGet(n);
-                return n;
-            }
-            if (this.endOfStream) {
-                return -1;
-            }
-            return 0;
-        } finally {
-            this.session.getLock().unlock();
-        }
+    @Override
+    public Lock getLock() {
+        return this.session.getLock();
+    }
+
+    @Override
+    public void upgrade(final IOEventHandler handler) {
+        this.session.upgrade(handler);
+    }
+
+    public TlsDetails getTlsDetails() {
+        return tlsDetails;
     }
 
     @Override
@@ -870,14 +757,8 @@ public class SSLIOSession implements IOSession {
     @Override
     public void setSocketTimeout(final Timeout timeout) {
         this.socketTimeout = timeout;
-
-        this.session.getLock().lock();
-        try {
-            if (this.sslEngine.getHandshakeStatus() == HandshakeStatus.FINISHED) {
-                this.session.setSocketTimeout(timeout);
-            }
-        } finally {
-            this.session.getLock().unlock();
+        if (this.sslEngine.getHandshakeStatus() == HandshakeStatus.FINISHED) {
+            this.session.setSocketTimeout(timeout);
         }
     }
 


[httpcomponents-core] 03/03: Simplification of HTTP/1.1 read event handling logic; better fix for HTTPCORE-599

Posted by ol...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git

commit dac54708766ece837f2755b7928ef83e12f9e384
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Tue Sep 24 19:56:46 2019 +0200

    Simplification of HTTP/1.1 read event handling logic; better fix for HTTPCORE-599
---
 .../http/impl/nio/AbstractHttp1StreamDuplexer.java | 100 ++++++++++-----------
 .../org/apache/hc/core5/reactor/IOSessionImpl.java |   3 -
 2 files changed, 48 insertions(+), 55 deletions(-)

diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
index 6860bd3..4fe2a88 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
@@ -238,61 +238,51 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
         if (src != null) {
             inbuf.put(src);
         }
-        while (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
-            int totalBytesRead = 0;
-            int messagesReceived = 0;
+
+        if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inbuf.hasData() && inputIdle()) {
+            ioSession.clearEvent(SelectionKey.OP_READ);
+            return;
+        }
+
+        boolean endOfStream = false;
+        if (incomingMessage == null) {
+            final int bytesRead = inbuf.fill(ioSession);
+            if (bytesRead > 0) {
+                inTransportMetrics.incrementBytesTransferred(bytesRead);
+            }
+            endOfStream = bytesRead == -1;
+        }
+
+        do {
             if (incomingMessage == null) {
 
-                if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle()) {
-                    ioSession.clearEvent(SelectionKey.OP_READ);
-                    return;
-                }
+                final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, endOfStream);
+                if (messageHead != null) {
+                    incomingMessageParser.reset();
+
+                    this.version = messageHead.getVersion();
 
-                int bytesRead;
-                do {
-                    bytesRead = inbuf.fill(ioSession);
-                    if (bytesRead > 0) {
-                        totalBytesRead += bytesRead;
-                        inTransportMetrics.incrementBytesTransferred(bytesRead);
+                    updateInputMetrics(messageHead, connMetrics);
+                    final ContentDecoder contentDecoder;
+                    if (handleIncomingMessage(messageHead)) {
+                        final long len = incomingContentStrategy.determineLength(messageHead);
+                        contentDecoder = createContentDecoder(len, ioSession, inbuf, inTransportMetrics);
+                        consumeHeader(messageHead, contentDecoder != null ? new IncomingEntityDetails(messageHead, len) : null);
+                    } else {
+                        consumeHeader(messageHead, null);
+                        contentDecoder = null;
                     }
-                    final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, bytesRead == -1);
-                    if (messageHead != null) {
-                        messagesReceived++;
-                        incomingMessageParser.reset();
-
-                        this.version = messageHead.getVersion();
-
-                        updateInputMetrics(messageHead, connMetrics);
-                        final ContentDecoder contentDecoder;
-                        if (handleIncomingMessage(messageHead)) {
-                            final long len = incomingContentStrategy.determineLength(messageHead);
-                            contentDecoder = createContentDecoder(len, ioSession, inbuf, inTransportMetrics);
-                            consumeHeader(messageHead, contentDecoder != null ? new IncomingEntityDetails(messageHead, len) : null);
-                        } else {
-                            consumeHeader(messageHead, null);
-                            contentDecoder = null;
-                        }
-                        capacityWindow = new CapacityWindow(http1Config.getInitialWindowSize(), ioSession);
-                        if (contentDecoder != null) {
-                            incomingMessage = new Message<>(messageHead, contentDecoder);
-                            break;
-                        }
+                    capacityWindow = new CapacityWindow(http1Config.getInitialWindowSize(), ioSession);
+                    if (contentDecoder != null) {
+                        incomingMessage = new Message<>(messageHead, contentDecoder);
+                    } else {
                         inputEnd();
                         if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
                             ioSession.setEvent(SelectionKey.OP_READ);
-                        } else {
-                            break;
                         }
                     }
-                } while (bytesRead > 0);
-
-                if (bytesRead == -1 && !inbuf.hasData()) {
-                    if (outputIdle() && inputIdle()) {
-                        requestShutdown(CloseMode.GRACEFUL);
-                    } else {
-                        shutdownSession(new ConnectionClosedException("Connection closed by peer"));
-                    }
-                    return;
+                } else {
+                    break;
                 }
             }
 
@@ -303,9 +293,8 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
                 // over its declared capacity in order to avoid having
                 // unprocessed message body content stuck in the session
                 // input buffer
-                int bytesRead;
-                while ((bytesRead = contentDecoder.read(contentBuffer)) > 0) {
-                    totalBytesRead += bytesRead;
+                final int bytesRead = contentDecoder.read(contentBuffer);
+                if (bytesRead > 0) {
                     contentBuffer.flip();
                     consumeData(contentBuffer);
                     contentBuffer.clear();
@@ -314,7 +303,6 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
                         if (!contentDecoder.isCompleted()) {
                             updateCapacity(capacityWindow);
                         }
-                        break;
                     }
                 }
                 if (contentDecoder.isCompleted()) {
@@ -324,9 +312,17 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
                     ioSession.setEvent(SelectionKey.OP_READ);
                     inputEnd();
                 }
+                if (bytesRead == 0) {
+                    break;
+                }
             }
-            if (totalBytesRead == 0 && messagesReceived == 0) {
-                break;
+        } while (inbuf.hasData());
+
+        if (endOfStream && !inbuf.hasData()) {
+            if (outputIdle() && inputIdle()) {
+                requestShutdown(CloseMode.GRACEFUL);
+            } else {
+                shutdownSession(new ConnectionClosedException("Connection closed by peer"));
             }
         }
     }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
index 930acde..6cd742a 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
@@ -196,9 +196,6 @@ class IOSessionImpl implements IOSession {
 
     @Override
     public int read(final ByteBuffer dst) throws IOException {
-        if ((this.key.interestOps() & SelectionKey.OP_READ) == 0) {
-            return 0;
-        }
         return this.channel.read(dst);
     }