You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2019/09/22 14:17:43 UTC

[httpcomponents-core] 02/02: Redesign of SSL/TLS async I/O event handling

This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch tls-nio
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git

commit d5aacdacb3885a345bdf1f814f580b900a6b908d
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Sat Sep 21 16:34:18 2019 +0200

    Redesign of SSL/TLS async I/O event handling
---
 .../impl/nio/ClientHttpProtocolNegotiator.java     |  16 +
 .../impl/nio/ServerHttpProtocolNegotiator.java     |  26 +-
 .../hc/core5/http/impl/nio/BufferedData.java       |  20 +
 .../apache/hc/core5/reactor/ssl/SSLIOSession.java  | 468 ++++++++-------------
 4 files changed, 224 insertions(+), 306 deletions(-)

diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
index b1da774..22a9b4f 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
@@ -39,6 +39,7 @@ import javax.net.ssl.SSLSession;
 import org.apache.hc.core5.annotation.Internal;
 import org.apache.hc.core5.http.EndpointDetails;
 import org.apache.hc.core5.http.ProtocolVersion;
+import org.apache.hc.core5.http.impl.nio.BufferedData;
 import org.apache.hc.core5.http.impl.nio.ClientHttp1IOEventHandler;
 import org.apache.hc.core5.http.impl.nio.ClientHttp1StreamDuplexer;
 import org.apache.hc.core5.http.impl.nio.ClientHttp1StreamDuplexerFactory;
@@ -77,6 +78,7 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
     private final AtomicReference<HttpConnectionEventHandler> protocolHandlerRef;
 
     private volatile ByteBuffer preface;
+    private volatile BufferedData inBuf;
 
     public ClientHttpProtocolNegotiator(
             final ProtocolIOSession ioSession,
@@ -97,6 +99,10 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
             ioSession.upgrade(protocolHandler);
             protocolHandlerRef.set(protocolHandler);
             protocolHandler.connected(session);
+            if (inBuf != null) {
+                protocolHandler.inputReady(session, inBuf.data());
+                inBuf.clear();
+            }
         } catch (final Exception ex) {
             protocolHandler.exception(session, ex);
             session.close(CloseMode.IMMEDIATE);
@@ -110,6 +116,10 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
             ioSession.upgrade(protocolHandler);
             protocolHandlerRef.set(protocolHandler);
             protocolHandler.connected(session);
+            if (inBuf != null) {
+                protocolHandler.inputReady(session, inBuf.data());
+                inBuf.clear();
+            }
         } catch (final Exception ex) {
             protocolHandler.exception(session, ex);
             session.close(CloseMode.IMMEDIATE);
@@ -154,6 +164,12 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
 
     @Override
     public void inputReady(final IOSession session, final ByteBuffer src) throws IOException  {
+        if (src != null) {
+            if (inBuf == null) {
+                inBuf = BufferedData.allocate(src.remaining());
+            }
+            inBuf.put(src);
+        }
         outputReady(session);
     }
 
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
index 4a85534..174366f 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
@@ -40,6 +40,7 @@ import org.apache.hc.core5.http.EndpointDetails;
 import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.ProtocolVersion;
 import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.http.impl.nio.BufferedData;
 import org.apache.hc.core5.http.impl.nio.HttpConnectionEventHandler;
 import org.apache.hc.core5.http.impl.nio.ServerHttp1IOEventHandler;
 import org.apache.hc.core5.http.impl.nio.ServerHttp1StreamDuplexer;
@@ -71,7 +72,7 @@ public class ServerHttpProtocolNegotiator implements HttpConnectionEventHandler
     private final ServerHttp1StreamDuplexerFactory http1StreamHandlerFactory;
     private final ServerH2StreamMultiplexerFactory http2StreamHandlerFactory;
     private final HttpVersionPolicy versionPolicy;
-    private final ByteBuffer bytebuf;
+    private final BufferedData inBuf;
     private final AtomicReference<HttpConnectionEventHandler> protocolHandlerRef;
 
     private volatile boolean expectValidH2Preface;
@@ -85,7 +86,7 @@ public class ServerHttpProtocolNegotiator implements HttpConnectionEventHandler
         this.http1StreamHandlerFactory = Args.notNull(http1StreamHandlerFactory, "HTTP/1.1 stream handler factory");
         this.http2StreamHandlerFactory = Args.notNull(http2StreamHandlerFactory, "HTTP/2 stream handler factory");
         this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
-        this.bytebuf = ByteBuffer.allocate(1024);
+        this.inBuf = BufferedData.allocate(1024);
         this.protocolHandlerRef = new AtomicReference<>(null);
     }
 
@@ -124,19 +125,21 @@ public class ServerHttpProtocolNegotiator implements HttpConnectionEventHandler
     @Override
     public void inputReady(final IOSession session, final ByteBuffer src) {
         try {
+            if (src != null) {
+                inBuf.put(src);
+            }
             boolean endOfStream = false;
-            if (bytebuf.position() < PREFACE.length) {
-                final int bytesRead = session.read(bytebuf);
+            if (inBuf.length() < PREFACE.length) {
+                final int bytesRead = inBuf.readFrom(session);
                 if (bytesRead == -1) {
                     endOfStream = true;
                 }
             }
-            if (bytebuf.position() >= PREFACE.length) {
-                bytebuf.flip();
-
+            final ByteBuffer data = inBuf.data();
+            if (data.remaining() >= PREFACE.length) {
                 boolean validH2Preface = true;
                 for (int i = 0; i < PREFACE.length; i++) {
-                    if (bytebuf.get() != PREFACE[i]) {
+                    if (data.get() != PREFACE[i]) {
                         if (expectValidH2Preface) {
                             throw new HttpException("Unexpected HTTP/2 preface");
                         }
@@ -149,7 +152,7 @@ public class ServerHttpProtocolNegotiator implements HttpConnectionEventHandler
                     ioSession.upgrade(protocolHandler);
                     protocolHandlerRef.set(protocolHandler);
                     http2StreamHandler.onConnect();
-                    http2StreamHandler.onInput(bytebuf.hasRemaining() ? bytebuf : null);
+                    http2StreamHandler.onInput(data.hasRemaining() ? data : null);
                 } else {
                     final TlsDetails tlsDetails = ioSession.getTlsDetails();
                     final ServerHttp1StreamDuplexer http1StreamHandler = http1StreamHandlerFactory.create(
@@ -158,15 +161,16 @@ public class ServerHttpProtocolNegotiator implements HttpConnectionEventHandler
                     final HttpConnectionEventHandler protocolHandler = new ServerHttp1IOEventHandler(http1StreamHandler);
                     ioSession.upgrade(protocolHandler);
                     protocolHandlerRef.set(protocolHandler);
-                    bytebuf.rewind();
+                    data.rewind();
                     http1StreamHandler.onConnect();
-                    http1StreamHandler.onInput(bytebuf);
+                    http1StreamHandler.onInput(data);
                 }
             } else {
                 if (endOfStream) {
                     throw new ConnectionClosedException();
                 }
             }
+            data.clear();
         } catch (final Exception ex) {
             exception(session, ex);
         }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/BufferedData.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/BufferedData.java
index 8b37751..214d3ab 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/BufferedData.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/BufferedData.java
@@ -27,7 +27,10 @@
 
 package org.apache.hc.core5.http.impl.nio;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
 
 import org.apache.hc.core5.util.Args;
 
@@ -77,6 +80,23 @@ public class BufferedData extends ExpandableBuffer {
         buffer().put(src);
     }
 
+    public final int readFrom(final ReadableByteChannel channel) throws IOException {
+        Args.notNull(channel, "Channel");
+        setInputMode();
+        if (!buffer().hasRemaining()) {
+            expand();
+        }
+        return channel.read(buffer());
+    }
+
+    public final int writeTo(final WritableByteChannel dst) throws IOException {
+        if (dst == null) {
+            return 0;
+        }
+        setOutputMode();
+        return dst.write(buffer());
+    }
+
     public final ByteBuffer data() {
         setOutputMode();
         return buffer();
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
index 7197429..8d5db39 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
@@ -35,7 +35,6 @@ import java.nio.channels.CancelledKeyException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectionKey;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 
 import javax.net.ssl.SSLContext;
@@ -84,13 +83,12 @@ public class SSLIOSession implements IOSession {
     private final SSLSessionVerifier verifier;
     private final Callback<SSLIOSession> connectedCallback;
     private final Callback<SSLIOSession> disconnectedCallback;
-    private final AtomicLong bytesReadCount;
     private final Timeout connectTimeout;
     private final SSLMode sslMode;
     private final AtomicInteger outboundClosedCount;
     private int appEventMask;
 
-    private boolean endOfStream;
+    private volatile boolean endOfStream;
     private volatile int status;
     private volatile boolean initialized;
     private volatile Timeout socketTimeout;
@@ -149,15 +147,9 @@ public class SSLIOSession implements IOSession {
         final int appBufferSize = sslSession.getApplicationBufferSize();
         this.inPlain = SSLManagedBuffer.create(sslBufferMode, appBufferSize);
         this.outboundClosedCount = new AtomicInteger(0);
-        this.bytesReadCount = new AtomicLong(0);
         this.connectTimeout = connectTimeout;
     }
 
-    @Override
-    public String getId() {
-        return session.getId();
-    }
-
     private IOEventHandler ensureHandler() {
         final IOEventHandler handler = session.getHandler();
         Asserts.notNull(handler, "IO event handler");
@@ -168,54 +160,49 @@ public class SSLIOSession implements IOSession {
     public IOEventHandler getHandler() {
         return new IOEventHandler() {
 
-            private void ensureInitialized() throws IOException {
-                if (!isInitialized()) {
-                    initialize();
-                }
-            }
-
             @Override
             public void connected(final IOSession protocolSession) throws IOException {
-                ensureInitialized();
+                if (!initialized) {
+                    initialize();
+                }
             }
 
             @Override
             public void inputReady(final IOSession protocolSession, final ByteBuffer src) throws IOException {
-                ensureInitialized();
-                do {
-                    bytesReadCount.set(0L);
-                    if (isAppInputReady()) {
-                        final IOEventHandler handler = ensureHandler();
-                        handler.inputReady(protocolSession, src);
-                    }
-                    inboundTransport();
-                } while (bytesReadCount.get() > 0);
+                if (!initialized) {
+                    initialize();
+                }
+                receiveEncryptedData();
+                doHandshake();
+                decryptData();
+                updateEventMask();
             }
 
             @Override
             public void outputReady(final IOSession protocolSession) throws IOException {
-                ensureInitialized();
-                if (isAppOutputReady()) {
-                    final IOEventHandler handler = ensureHandler();
-                    handler.outputReady(protocolSession);
+                if (!initialized) {
+                    initialize();
                 }
-                outboundTransport();
+                encryptData();
+                sendEncryptedData();
+                doHandshake();
+                updateEventMask();
             }
 
             @Override
             public void timeout(final IOSession protocolSession, final Timeout timeout) throws IOException {
-                if (isOutboundDone() && !isInboundDone()) {
+                if (sslEngine.isInboundDone() && !sslEngine.isInboundDone()) {
                     // The session failed to terminate cleanly
                     close(CloseMode.IMMEDIATE);
                 }
-                ensureHandler().timeout(protocolSession, timeout);
+                ensureHandler().timeout(SSLIOSession.this, timeout);
             }
 
             @Override
             public void exception(final IOSession protocolSession, final Exception cause) {
                 final IOEventHandler handler = session.getHandler();
                 if (handler != null) {
-                    handler.exception(protocolSession, cause);
+                    handler.exception(SSLIOSession.this, cause);
                 }
             }
 
@@ -223,39 +210,13 @@ public class SSLIOSession implements IOSession {
             public void disconnected(final IOSession protocolSession) {
                 final IOEventHandler handler = session.getHandler();
                 if (handler != null) {
-                    handler.disconnected(protocolSession);
+                    handler.disconnected(SSLIOSession.this);
                 }
             }
 
         };
     }
 
-    @Override
-    public void upgrade(final IOEventHandler handler) {
-        this.session.upgrade(handler);
-    }
-
-    @Override
-    public Lock getLock() {
-        return this.session.getLock();
-    }
-
-    /**
-     * Returns {@code true} is the session has been fully initialized,
-     * {@code false} otherwise.
-     */
-     private boolean isInitialized() {
-        return this.initialized;
-    }
-
-    /**
-     * Initializes the session. This method invokes the {@link
-     * SSLSessionInitializer#initialize(NamedEndpoint, SSLEngine)} callback
-     * if an instance of {@link SSLSessionInitializer} was specified at the construction time.
-     *
-     * @throws SSLException in case of a SSL protocol exception.
-     * @throws IllegalStateException if the session has already been initialized.
-     */
     private void initialize() throws SSLException {
         Asserts.check(!this.initialized, "SSL I/O session already initialized");
 
@@ -292,10 +253,6 @@ public class SSLIOSession implements IOSession {
         }
     }
 
-    public TlsDetails getTlsDetails() {
-        return tlsDetails;
-    }
-
     // A works-around for exception handling craziness in Sun/Oracle's SSLEngine
     // implementation.
     //
@@ -354,16 +311,21 @@ public class SSLIOSession implements IOSession {
             case NEED_WRAP:
                 // Generate outgoing handshake data
 
-                // Acquire buffers
-                final ByteBuffer outEncryptedBuf = this.outEncrypted.acquire();
+                this.session.getLock().lock();
+                try {
+                    // Acquire buffers
+                    final ByteBuffer outEncryptedBuf = this.outEncrypted.acquire();
 
-                // Just wrap an empty buffer because there is no data to write.
-                result = doWrap(EMPTY_BUFFER, outEncryptedBuf);
+                    // Just wrap an empty buffer because there is no data to write.
+                    result = doWrap(EMPTY_BUFFER, outEncryptedBuf);
 
-                if (result.getStatus() != Status.OK || result.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) {
-                    handshaking = false;
+                    if (result.getStatus() != Status.OK || result.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) {
+                        handshaking = false;
+                    }
+                    break;
+                } finally {
+                    this.session.getLock().unlock();
                 }
-                break;
             case NEED_UNWRAP:
                 // Process incoming handshake data
 
@@ -426,93 +388,103 @@ public class SSLIOSession implements IOSession {
     }
 
     private void updateEventMask() {
-        // Graceful session termination
-        if (this.status == ACTIVE
-                && (this.endOfStream || this.sslEngine.isInboundDone())) {
-            this.status = CLOSING;
-        }
-        if (this.status == CLOSING && !this.outEncrypted.hasData()) {
-            this.sslEngine.closeOutbound();
-            this.outboundClosedCount.incrementAndGet();
-        }
-        if (this.status == CLOSING && this.sslEngine.isOutboundDone()
-                && (this.endOfStream || this.sslEngine.isInboundDone())) {
-            this.status = CLOSED;
-        }
-        // Abnormal session termination
-        if (this.status <= CLOSING && this.endOfStream
-                && this.sslEngine.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP) {
-            this.status = CLOSED;
-        }
-        if (this.status == CLOSED) {
-            this.session.close();
-            if (disconnectedCallback != null) {
-                disconnectedCallback.execute(this);
+        this.session.getLock().lock();
+        try {
+            // Graceful session termination
+            if (this.status == ACTIVE
+                    && (this.endOfStream || this.sslEngine.isInboundDone())) {
+                this.status = CLOSING;
+            }
+            if (this.status == CLOSING && !this.outEncrypted.hasData()) {
+                this.sslEngine.closeOutbound();
+                this.outboundClosedCount.incrementAndGet();
+            }
+            if (this.status == CLOSING && this.sslEngine.isOutboundDone()
+                    && (this.endOfStream || this.sslEngine.isInboundDone())) {
+                this.status = CLOSED;
+            }
+            // Abnormal session termination
+            if (this.status <= CLOSING && this.endOfStream
+                    && this.sslEngine.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP) {
+                this.status = CLOSED;
+            }
+            if (this.status == CLOSED) {
+                this.session.close();
+                if (disconnectedCallback != null) {
+                    disconnectedCallback.execute(this);
+                }
+                return;
+            }
+            // Need to toggle the event mask for this channel?
+            final int oldMask = this.session.getEventMask();
+            int newMask = oldMask;
+            switch (this.sslEngine.getHandshakeStatus()) {
+                case NEED_WRAP:
+                    newMask = EventMask.READ_WRITE;
+                    break;
+                case NEED_UNWRAP:
+                    newMask = EventMask.READ;
+                    break;
+                case NOT_HANDSHAKING:
+                    newMask = this.appEventMask;
+                    break;
+                case NEED_TASK:
+                    break;
+                case FINISHED:
+                    break;
             }
-            return;
-        }
-        // Need to toggle the event mask for this channel?
-        final int oldMask = this.session.getEventMask();
-        int newMask = oldMask;
-        switch (this.sslEngine.getHandshakeStatus()) {
-        case NEED_WRAP:
-            newMask = EventMask.READ_WRITE;
-            break;
-        case NEED_UNWRAP:
-            newMask = EventMask.READ;
-            break;
-        case NOT_HANDSHAKING:
-            newMask = this.appEventMask;
-            break;
-        case NEED_TASK:
-            break;
-        case FINISHED:
-            break;
-        }
 
-        if (this.endOfStream && !this.inPlain.hasData()) {
-            newMask = newMask & ~EventMask.READ;
-        }
+            if (this.endOfStream && !this.inPlain.hasData()) {
+                newMask = newMask & ~EventMask.READ;
+            }
 
-        // Do we have encrypted data ready to be sent?
-        if (this.outEncrypted.hasData()) {
-            newMask = newMask | EventMask.WRITE;
-        }
+            // Do we have encrypted data ready to be sent?
+            if (this.outEncrypted.hasData()) {
+                newMask = newMask | EventMask.WRITE;
+            }
 
-        // Update the mask if necessary
-        if (oldMask != newMask) {
-            this.session.setEventMask(newMask);
+            // Update the mask if necessary
+            if (oldMask != newMask) {
+                this.session.setEventMask(newMask);
+            }
+        } finally {
+            this.session.getLock().unlock();
         }
     }
 
     private int sendEncryptedData() throws IOException {
-        if (!this.outEncrypted.hasData()) {
-            // If the buffer isn't acquired or is empty, call write() with an empty buffer.
-            // This will ensure that tests performed by write() still take place without
-            // having to acquire and release an empty buffer (e.g. connection closed,
-            // interrupted thread, etc..)
-            return this.session.write(EMPTY_BUFFER);
-        }
+        this.session.getLock().lock();
+        try {
+            if (!this.outEncrypted.hasData()) {
+                // If the buffer isn't acquired or is empty, call write() with an empty buffer.
+                // This will ensure that tests performed by write() still take place without
+                // having to acquire and release an empty buffer (e.g. connection closed,
+                // interrupted thread, etc..)
+                return this.session.write(EMPTY_BUFFER);
+            }
 
-        // Acquire buffer
-        final ByteBuffer outEncryptedBuf = this.outEncrypted.acquire();
+            // Acquire buffer
+            final ByteBuffer outEncryptedBuf = this.outEncrypted.acquire();
 
-        // Perform operation
-        int bytesWritten = 0;
-        if (outEncryptedBuf.position() > 0) {
-            outEncryptedBuf.flip();
-            try {
-                bytesWritten = this.session.write(outEncryptedBuf);
-            } finally {
-                outEncryptedBuf.compact();
+            // Perform operation
+            int bytesWritten = 0;
+            if (outEncryptedBuf.position() > 0) {
+                outEncryptedBuf.flip();
+                try {
+                    bytesWritten = this.session.write(outEncryptedBuf);
+                } finally {
+                    outEncryptedBuf.compact();
+                }
             }
-        }
 
-        // Release if empty
-        if (outEncryptedBuf.position() == 0) {
-            this.outEncrypted.release();
+            // Release if empty
+            if (outEncryptedBuf.position() == 0) {
+                this.outEncrypted.release();
+            }
+            return bytesWritten;
+        } finally {
+            this.session.getLock().unlock();
         }
-        return bytesWritten;
     }
 
     private int receiveEncryptedData() throws IOException {
@@ -536,136 +508,63 @@ public class SSLIOSession implements IOSession {
         return bytesRead;
     }
 
-    private boolean decryptData() throws SSLException {
-        boolean decrypted = false;
-        while (this.inEncrypted.hasData()) {
-            // Get buffers
-            final ByteBuffer inEncryptedBuf = this.inEncrypted.acquire();
-            final ByteBuffer inPlainBuf = this.inPlain.acquire();
-
-            final SSLEngineResult result;
-            // Perform operations
+    private void decryptData() throws IOException {
+        final HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus();
+        if ((handshakeStatus == HandshakeStatus.NOT_HANDSHAKING || handshakeStatus == HandshakeStatus.FINISHED)
+                && inEncrypted.hasData()) {
+            final ByteBuffer inEncryptedBuf = inEncrypted.acquire();
             inEncryptedBuf.flip();
             try {
-                result = doUnwrap(inEncryptedBuf, inPlainBuf);
-            } finally {
-                inEncryptedBuf.compact();
-            }
-
-            try {
-                if (!inEncryptedBuf.hasRemaining() && result.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP) {
-                    throw new SSLException("Unable to complete SSL handshake");
-                }
-                final Status status = result.getStatus();
-                if (status == Status.OK) {
-                    decrypted = true;
-                } else {
-                    if (status == Status.BUFFER_UNDERFLOW && this.endOfStream) {
-                        throw new SSLException("Unable to decrypt incoming data due to unexpected end of stream");
+                while (inEncryptedBuf.hasRemaining()) {
+                    final ByteBuffer inPlainBuf = inPlain.acquire();
+                    try {
+                        final SSLEngineResult result = doUnwrap(inEncryptedBuf, inPlainBuf);
+                        if (!inEncryptedBuf.hasRemaining() && result.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP) {
+                            throw new SSLException("Unable to complete SSL handshake");
+                        }
+                        if (inPlainBuf.hasRemaining()) {
+                            inPlainBuf.flip();
+                            try {
+                                ensureHandler().inputReady(this, inPlainBuf);
+                            } finally {
+                                inPlainBuf.clear();
+                            }
+                        }
+                        if (result.getStatus() != Status.OK) {
+                            if (result.getStatus() == Status.BUFFER_UNDERFLOW && endOfStream) {
+                                throw new SSLException("Unable to decrypt incoming data due to unexpected end of stream");
+                            }
+                            break;
+                        }
+                    } finally {
+                        inPlain.release();
                     }
-                    break;
                 }
             } finally {
+                inEncryptedBuf.compact();
                 // Release inEncrypted if empty
-                if (this.inEncrypted.acquire().position() == 0) {
-                    this.inEncrypted.release();
+                if (inEncryptedBuf.position() == 0) {
+                    inEncrypted.release();
                 }
             }
         }
-        if (this.sslEngine.isInboundDone()) {
-            this.endOfStream = true;
-        }
-        return decrypted;
     }
 
-    /**
-     * Reads encrypted data and returns whether the channel associated with
-     * this session has any decrypted inbound data available for reading.
-     *
-     * @throws IOException in case of an I/O error.
-     */
-    private boolean isAppInputReady() throws IOException {
+    private void encryptData() throws IOException {
+        final boolean appReady;
         this.session.getLock().lock();
         try {
-            do {
-                receiveEncryptedData();
-                doHandshake();
-                final HandshakeStatus status = this.sslEngine.getHandshakeStatus();
-                if (status == HandshakeStatus.NOT_HANDSHAKING || status == HandshakeStatus.FINISHED) {
-                    decryptData();
-                }
-            } while (this.sslEngine.getHandshakeStatus() == HandshakeStatus.NEED_TASK);
-            // Some decrypted data is available or at the end of stream
-            return this.inPlain.hasData() || (this.endOfStream && this.status == ACTIVE);
-        } finally {
-            this.session.getLock().unlock();
-        }
-    }
-
-    /**
-     * Returns whether the channel associated with this session is ready to
-     * accept outbound unecrypted data for writing.
-     *
-     * @throws IOException - not thrown currently
-     */
-    private boolean isAppOutputReady() throws IOException {
-        this.session.getLock().lock();
-        try {
-            return (this.appEventMask & SelectionKey.OP_WRITE) > 0
+            appReady = (this.appEventMask & SelectionKey.OP_WRITE) > 0
                     && this.status == ACTIVE
                     && this.sslEngine.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING;
         } finally {
             this.session.getLock().unlock();
         }
-    }
-
-    /**
-     * Executes inbound SSL transport operations.
-     *
-     * @throws IOException - not thrown currently
-     */
-    private void inboundTransport() throws IOException {
-        this.session.getLock().lock();
-        try {
-            updateEventMask();
-        } finally {
-            this.session.getLock().unlock();
-        }
-    }
-
-    /**
-     * Sends encrypted data and executes outbound SSL transport operations.
-     *
-     * @throws IOException in case of an I/O error.
-     */
-    private void outboundTransport() throws IOException {
-        this.session.getLock().lock();
-        try {
-            if (!this.session.isOpen()) {
-                return;
-            }
-            sendEncryptedData();
-            doHandshake();
-            updateEventMask();
-        } finally {
-            this.session.getLock().unlock();
+        if (appReady) {
+            ensureHandler().outputReady(this);
         }
     }
 
-    /**
-     * Returns whether the session will produce any more inbound data.
-     */
-    private boolean isInboundDone() {
-        return this.sslEngine.isInboundDone();
-    }
-
-    /**
-     * Returns whether the session will accept any more outbound data.
-     */
-    private boolean isOutboundDone() {
-        return this.sslEngine.isOutboundDone();
-    }
-
     @Override
     public int write(final ByteBuffer src) throws IOException {
         Args.notNull(src, "Byte buffer");
@@ -679,9 +578,6 @@ public class SSLIOSession implements IOSession {
             }
             final ByteBuffer outEncryptedBuf = this.outEncrypted.acquire();
             final SSLEngineResult result = doWrap(src, outEncryptedBuf);
-            if (result.getStatus() == Status.CLOSED) {
-                this.status = CLOSED;
-            }
             return result.bytesConsumed();
         } finally {
             this.session.getLock().unlock();
@@ -690,38 +586,26 @@ public class SSLIOSession implements IOSession {
 
     @Override
     public int read(final ByteBuffer dst) {
-        Args.notNull(dst, "Byte buffer");
-        this.session.getLock().lock();
-        try {
-            if (!this.initialized) {
-                return 0;
-            }
-            if (this.inPlain.hasData()) {
-                // Acquire buffer
-                final ByteBuffer inPlainBuf = this.inPlain.acquire();
+        return endOfStream || sslEngine.isInboundDone() ? -1 : 0;
+    }
 
-                // Perform operations
-                inPlainBuf.flip();
-                final int n = Math.min(inPlainBuf.remaining(), dst.remaining());
-                for (int i = 0; i < n; i++) {
-                    dst.put(inPlainBuf.get());
-                }
-                inPlainBuf.compact();
+    @Override
+    public String getId() {
+        return session.getId();
+    }
 
-                // Release if empty
-                if (inPlainBuf.position() == 0) {
-                    this.inPlain.release();
-                }
-                bytesReadCount.addAndGet(n);
-                return n;
-            }
-            if (this.endOfStream) {
-                return -1;
-            }
-            return 0;
-        } finally {
-            this.session.getLock().unlock();
-        }
+    @Override
+    public Lock getLock() {
+        return this.session.getLock();
+    }
+
+    @Override
+    public void upgrade(final IOEventHandler handler) {
+        this.session.upgrade(handler);
+    }
+
+    public TlsDetails getTlsDetails() {
+        return tlsDetails;
     }
 
     @Override
@@ -870,14 +754,8 @@ public class SSLIOSession implements IOSession {
     @Override
     public void setSocketTimeout(final Timeout timeout) {
         this.socketTimeout = timeout;
-
-        this.session.getLock().lock();
-        try {
-            if (this.sslEngine.getHandshakeStatus() == HandshakeStatus.FINISHED) {
-                this.session.setSocketTimeout(timeout);
-            }
-        } finally {
-            this.session.getLock().unlock();
+        if (this.sslEngine.getHandshakeStatus() == HandshakeStatus.FINISHED) {
+            this.session.setSocketTimeout(timeout);
         }
     }