You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2019/09/22 10:56:55 UTC

[httpcomponents-core] branch async-io-handling created (now 6dc3c65)

This is an automated email from the ASF dual-hosted git repository.

olegk pushed a change to branch async-io-handling
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git.


      at 6dc3c65  Moved SSL/TLS specific i/o event handling logic to SSLIOSession

This branch includes the following new commits:

     new 6dc3c65  Moved SSL/TLS specific i/o event handling logic to SSLIOSession

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[httpcomponents-core] 01/01: Moved SSL/TLS specific i/o event handling logic to SSLIOSession

Posted by ol...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch async-io-handling
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git

commit 6dc3c65d02a0d3eb66e5a258c71be66a5570b301
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Thu Sep 12 10:53:48 2019 +0200

    Moved SSL/TLS specific i/o event handling logic to SSLIOSession
---
 .../testing/nio/LoggingIOSessionListener.java      |  21 ----
 .../org/apache/hc/core5/reactor/IOSession.java     |  14 +++
 .../org/apache/hc/core5/reactor/IOSessionImpl.java |  13 +++
 .../apache/hc/core5/reactor/IOSessionListener.java |   6 -
 .../hc/core5/reactor/InternalDataChannel.java      | 130 +++++++--------------
 .../apache/hc/core5/reactor/ProtocolIOSession.java |   2 +-
 .../org/apache/hc/core5/reactor/ProtocolLayer.java |  44 -------
 .../apache/hc/core5/reactor/ssl/SSLIOSession.java  | 102 +++++++++++++---
 8 files changed, 159 insertions(+), 173 deletions(-)

diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSessionListener.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSessionListener.java
index 09aae51..33597b1 100644
--- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSessionListener.java
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSessionListener.java
@@ -43,27 +43,6 @@ public class LoggingIOSessionListener implements IOSessionListener {
     }
 
     @Override
-    public void tlsStarted(final IOSession session) {
-        if (connLog.isDebugEnabled()) {
-            connLog.debug(session + " TLS session started");
-        }
-    }
-
-    @Override
-    public void tlsInbound(final IOSession session) {
-        if (connLog.isDebugEnabled()) {
-            connLog.debug(session + " TLS inbound");
-        }
-    }
-
-    @Override
-    public void tlsOutbound(final IOSession session) {
-        if (connLog.isDebugEnabled()) {
-            connLog.debug(session + " TLS outbound");
-        }
-    }
-
-    @Override
     public void connected(final IOSession session) {
         if (connLog.isDebugEnabled()) {
             connLog.debug(session + " connected");
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java
index ec8948e..26cfca6 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java
@@ -58,6 +58,20 @@ public interface IOSession extends ByteChannel, SocketModalCloseable, Identifiab
     int CLOSED       = Integer.MAX_VALUE;
 
     /**
+     * Returns event handler associated with the session.
+     *
+     * @since 5.0
+     */
+    IOEventHandler getHandler();
+
+    /**
+     * Upgrades event handler associated with the session.
+     *
+     * @since 5.0
+     */
+    void upgrade(IOEventHandler handler);
+
+    /**
      * Returns session lock that should be used by I/O event handlers
      * to synchronize access to the session.
      *
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
index 6a5ce53..930acde 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
@@ -38,6 +38,7 @@ import java.util.Deque;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -55,6 +56,7 @@ class IOSessionImpl implements IOSession {
     private final Deque<Command> commandQueue;
     private final Lock lock;
     private final String id;
+    private final AtomicReference<IOEventHandler> handlerRef;
     private final AtomicInteger status;
 
     private volatile Timeout socketTimeout;
@@ -76,6 +78,7 @@ class IOSessionImpl implements IOSession {
         this.lock = new ReentrantLock();
         this.socketTimeout = Timeout.DISABLED;
         this.id = String.format("i/o-%08X", COUNT.getAndIncrement());
+        this.handlerRef = new AtomicReference<>();
         this.status = new AtomicInteger(ACTIVE);
         final long currentTimeMillis = System.currentTimeMillis();
         this.lastReadTime = currentTimeMillis;
@@ -89,6 +92,16 @@ class IOSessionImpl implements IOSession {
     }
 
     @Override
+    public IOEventHandler getHandler() {
+        return handlerRef.get();
+    }
+
+    @Override
+    public void upgrade(final IOEventHandler handler) {
+        handlerRef.set(handler);
+    }
+
+    @Override
     public Lock getLock() {
         return lock;
     }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionListener.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionListener.java
index ae2bd9f..badf4fa 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionListener.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionListener.java
@@ -34,12 +34,6 @@ package org.apache.hc.core5.reactor;
  */
 public interface IOSessionListener {
 
-    void tlsStarted(IOSession session);
-
-    void tlsInbound(IOSession session);
-
-    void tlsOutbound(IOSession session);
-
     void connected(IOSession session);
 
     void inputReady(IOSession session);
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
index f2c0fe3..356202a 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
@@ -58,7 +58,6 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
     private final IOSessionListener sessionListener;
     private final AtomicReference<SSLIOSession> tlsSessionRef;
     private final Queue<InternalDataChannel> closedSessions;
-    private final AtomicReference<IOEventHandler> handlerRef;
     private final AtomicBoolean connected;
     private final AtomicBoolean closed;
 
@@ -72,7 +71,6 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
         this.closedSessions = closedSessions;
         this.sessionListener = sessionListener;
         this.tlsSessionRef = new AtomicReference<>(null);
-        this.handlerRef = new AtomicReference<>(null);
         this.connected = new AtomicBoolean(false);
         this.closed = new AtomicBoolean(false);
     }
@@ -83,22 +81,22 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
     }
 
     @Override
-    public IOEventHandler getHandler() {
-        return handlerRef.get();
+    public NamedEndpoint getInitialEndpoint() {
+        return initialEndpoint;
     }
 
     @Override
-    public NamedEndpoint getInitialEndpoint() {
-        return initialEndpoint;
+    public IOEventHandler getHandler() {
+        return ioSession.getHandler();
     }
 
     @Override
     public void upgrade(final IOEventHandler handler) {
-        handlerRef.set(handler);
+        ioSession.upgrade(handler);
     }
 
-    private IOEventHandler ensureHandler() {
-        final IOEventHandler handler = handlerRef.get();
+    private IOEventHandler ensureHandler(final IOSession session) {
+        final IOEventHandler handler = session.getHandler();
         Asserts.notNull(handler, "IO event handler");
         return handler;
     }
@@ -106,75 +104,33 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
     @Override
     void onIOEvent(final int readyOps) throws IOException {
         final SSLIOSession tlsSession = tlsSessionRef.get();
-        if (tlsSession != null) {
-            if (!tlsSession.isInitialized()) {
-                tlsSession.initialize();
+        final IOSession currentSession = tlsSession != null ? tlsSession : ioSession;
+        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
+            currentSession.clearEvent(SelectionKey.OP_CONNECT);
+            if (tlsSession == null && connected.compareAndSet(false, true)) {
                 if (sessionListener != null) {
-                    sessionListener.tlsStarted(tlsSession);
-                }
-            }
-            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
-                tlsSession.clearEvent(SelectionKey.OP_CONNECT);
-            }
-            if ((readyOps & SelectionKey.OP_READ) != 0) {
-                ioSession.updateReadTime();
-                do {
-                    tlsSession.resetReadCount();
-                    if (tlsSession.isAppInputReady()) {
-                        if (sessionListener != null) {
-                            sessionListener.inputReady(this);
-                        }
-                        final IOEventHandler handler = ensureHandler();
-                        handler.inputReady(this);
-                    }
-                    tlsSession.inboundTransport();
-                    if (sessionListener != null) {
-                        sessionListener.tlsInbound(tlsSession);
-                    }
-                } while (tlsSession.getReadCount() > 0);
-            }
-            if ((readyOps & SelectionKey.OP_WRITE) != 0
-                    || (ioSession.getEventMask() & SelectionKey.OP_WRITE) != 0) {
-                ioSession.updateWriteTime();
-                if (tlsSession.isAppOutputReady()) {
-                    if (sessionListener != null) {
-                        sessionListener.outputReady(this);
-                    }
-                    final IOEventHandler handler = ensureHandler();
-                    handler.outputReady(this);
-                }
-                tlsSession.outboundTransport();
-                if (sessionListener != null) {
-                    sessionListener.tlsOutbound(tlsSession);
-                }
-            }
-        } else {
-            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
-                ioSession.clearEvent(SelectionKey.OP_CONNECT);
-                if (connected.compareAndSet(false, true)) {
-                    if (sessionListener != null) {
-                        sessionListener.connected(this);
-                    }
-                    final IOEventHandler handler = ensureHandler();
-                    handler.connected(this);
+                    sessionListener.connected(this);
                 }
+                final IOEventHandler handler = ensureHandler(currentSession);
+                handler.connected(this);
             }
-            if ((readyOps & SelectionKey.OP_READ) != 0) {
-                ioSession.updateReadTime();
-                if (sessionListener != null) {
-                    sessionListener.inputReady(this);
-                }
-                final IOEventHandler handler = ensureHandler();
-                handler.inputReady(this);
+        }
+        if ((readyOps & SelectionKey.OP_READ) != 0) {
+            currentSession.updateReadTime();
+            if (sessionListener != null) {
+                sessionListener.inputReady(this);
             }
-            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
-                ioSession.updateWriteTime();
-                if (sessionListener != null) {
-                    sessionListener.outputReady(this);
-                }
-                final IOEventHandler handler = ensureHandler();
-                handler.outputReady(this);
+            final IOEventHandler handler = ensureHandler(currentSession);
+            handler.inputReady(this);
+        }
+        if ((readyOps & SelectionKey.OP_WRITE) != 0
+                || (ioSession.getEventMask() & SelectionKey.OP_WRITE) != 0) {
+            currentSession.updateWriteTime();
+            if (sessionListener != null) {
+                sessionListener.outputReady(this);
             }
+            final IOEventHandler handler = ensureHandler(currentSession);
+            handler.outputReady(this);
         }
     }
 
@@ -185,15 +141,13 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
 
     @Override
     void onTimeout(final Timeout timeout) throws IOException {
-        final IOEventHandler handler = ensureHandler();
-        handler.timeout(this, timeout);
-        final SSLIOSession tlsSession = tlsSessionRef.get();
-        if (tlsSession != null) {
-            if (tlsSession.isOutboundDone() && !tlsSession.isInboundDone()) {
-                // The session failed to terminate cleanly
-                tlsSession.close(CloseMode.IMMEDIATE);
-            }
+        if (sessionListener != null) {
+            sessionListener.timeout(this);
         }
+        final SSLIOSession tlsSession = tlsSessionRef.get();
+        final IOSession currentSession = tlsSession != null ? tlsSession : ioSession;
+        final IOEventHandler handler = ensureHandler(currentSession);
+        handler.timeout(this, timeout);
     }
 
     @Override
@@ -201,7 +155,9 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
         if (sessionListener != null) {
             sessionListener.exception(this, cause);
         }
-        final IOEventHandler handler = handlerRef.get();
+        final SSLIOSession tlsSession = tlsSessionRef.get();
+        final IOSession currentSession = tlsSession != null ? tlsSession : ioSession;
+        final IOEventHandler handler = currentSession.getHandler();
         if (handler != null) {
             handler.exception(this, cause);
         }
@@ -211,8 +167,12 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
         if (sessionListener != null) {
             sessionListener.disconnected(this);
         }
-        final IOEventHandler handler = ensureHandler();
-        handler.disconnected(this);
+        final SSLIOSession tlsSession = tlsSessionRef.get();
+        final IOSession currentSession = tlsSession != null ? tlsSession : ioSession;
+        final IOEventHandler handler = currentSession.getHandler();
+        if (handler != null) {
+            handler.disconnected(this);
+        }
     }
 
     @Override
@@ -236,7 +196,7 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
                     @Override
                     public void execute(final SSLIOSession sslSession) {
                         if (connected.compareAndSet(false, true)) {
-                            final IOEventHandler handler = ensureHandler();
+                            final IOEventHandler handler = ensureHandler(ioSession);
                             try {
                                 if (sessionListener != null) {
                                     sessionListener.connected(InternalDataChannel.this);
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java
index 2f561d5..35e8e29 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java
@@ -35,7 +35,7 @@ import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
  *
  * @since 5.0
  */
-public interface ProtocolIOSession extends IOSession, ProtocolLayer, TransportSecurityLayer {
+public interface ProtocolIOSession extends IOSession, TransportSecurityLayer {
 
     NamedEndpoint getInitialEndpoint();
 
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolLayer.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolLayer.java
deleted file mode 100644
index 8b5c2df..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolLayer.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation.  For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-
-package org.apache.hc.core5.reactor;
-
-import org.apache.hc.core5.annotation.Internal;
-
-/**
- * Represents application protocol layer.
- *
- * @since 5.0
- */
-@Internal
-public interface ProtocolLayer {
-
-    IOEventHandler getHandler();
-
-    void upgrade(IOEventHandler handler);
-
-}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
index d79582b..5574639 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
@@ -54,6 +54,7 @@ import org.apache.hc.core5.io.CloseMode;
 import org.apache.hc.core5.net.NamedEndpoint;
 import org.apache.hc.core5.reactor.Command;
 import org.apache.hc.core5.reactor.EventMask;
+import org.apache.hc.core5.reactor.IOEventHandler;
 import org.apache.hc.core5.reactor.IOSession;
 import org.apache.hc.core5.util.Args;
 import org.apache.hc.core5.util.Asserts;
@@ -157,6 +158,83 @@ public class SSLIOSession implements IOSession {
         return session.getId();
     }
 
+    private IOEventHandler ensureHandler() {
+        final IOEventHandler handler = session.getHandler();
+        Asserts.notNull(handler, "IO event handler");
+        return handler;
+    }
+
+    @Override
+    public IOEventHandler getHandler() {
+        return new IOEventHandler() {
+
+            private void ensureInitialized() throws IOException {
+                if (!isInitialized()) {
+                    initialize();
+                }
+            }
+
+            @Override
+            public void connected(final IOSession protocolSession) throws IOException {
+                ensureInitialized();
+            }
+
+            @Override
+            public void inputReady(final IOSession protocolSession) throws IOException {
+                ensureInitialized();
+                do {
+                    bytesReadCount.set(0L);
+                    if (isAppInputReady()) {
+                        final IOEventHandler handler = ensureHandler();
+                        handler.inputReady(protocolSession);
+                    }
+                    inboundTransport();
+                } while (bytesReadCount.get() > 0);
+            }
+
+            @Override
+            public void outputReady(final IOSession protocolSession) throws IOException {
+                ensureInitialized();
+                if (isAppOutputReady()) {
+                    final IOEventHandler handler = ensureHandler();
+                    handler.outputReady(protocolSession);
+                }
+                outboundTransport();
+            }
+
+            @Override
+            public void timeout(final IOSession protocolSession, final Timeout timeout) throws IOException {
+                if (isOutboundDone() && !isInboundDone()) {
+                    // The session failed to terminate cleanly
+                    close(CloseMode.IMMEDIATE);
+                }
+                ensureHandler().timeout(protocolSession, timeout);
+            }
+
+            @Override
+            public void exception(final IOSession protocolSession, final Exception cause) {
+                final IOEventHandler handler = session.getHandler();
+                if (handler != null) {
+                    handler.exception(protocolSession, cause);
+                }
+            }
+
+            @Override
+            public void disconnected(final IOSession protocolSession) {
+                final IOEventHandler handler = session.getHandler();
+                if (handler != null) {
+                    handler.disconnected(protocolSession);
+                }
+            }
+
+        };
+    }
+
+    @Override
+    public void upgrade(final IOEventHandler handler) {
+        this.session.upgrade(handler);
+    }
+
     @Override
     public Lock getLock() {
         return this.session.getLock();
@@ -166,7 +244,7 @@ public class SSLIOSession implements IOSession {
      * Returns {@code true} is the session has been fully initialized,
      * {@code false} otherwise.
      */
-    public boolean isInitialized() {
+     private boolean isInitialized() {
         return this.initialized;
     }
 
@@ -178,7 +256,7 @@ public class SSLIOSession implements IOSession {
      * @throws SSLException in case of a SSL protocol exception.
      * @throws IllegalStateException if the session has already been initialized.
      */
-    public void initialize() throws SSLException {
+    private void initialize() throws SSLException {
         Asserts.check(!this.initialized, "SSL I/O session already initialized");
 
         // Save the initial socketTimeout of the underlying IOSession, to be restored after the handshake is finished
@@ -507,7 +585,7 @@ public class SSLIOSession implements IOSession {
      *
      * @throws IOException in case of an I/O error.
      */
-    public boolean isAppInputReady() throws IOException {
+    private boolean isAppInputReady() throws IOException {
         this.session.getLock().lock();
         try {
             do {
@@ -531,7 +609,7 @@ public class SSLIOSession implements IOSession {
      *
      * @throws IOException - not thrown currently
      */
-    public boolean isAppOutputReady() throws IOException {
+    private boolean isAppOutputReady() throws IOException {
         this.session.getLock().lock();
         try {
             return (this.appEventMask & SelectionKey.OP_WRITE) > 0
@@ -547,7 +625,7 @@ public class SSLIOSession implements IOSession {
      *
      * @throws IOException - not thrown currently
      */
-    public void inboundTransport() throws IOException {
+    private void inboundTransport() throws IOException {
         this.session.getLock().lock();
         try {
             updateEventMask();
@@ -561,7 +639,7 @@ public class SSLIOSession implements IOSession {
      *
      * @throws IOException in case of an I/O error.
      */
-    public void outboundTransport() throws IOException {
+    private void outboundTransport() throws IOException {
         this.session.getLock().lock();
         try {
             if (!this.session.isOpen()) {
@@ -578,14 +656,14 @@ public class SSLIOSession implements IOSession {
     /**
      * Returns whether the session will produce any more inbound data.
      */
-    public boolean isInboundDone() {
+    private boolean isInboundDone() {
         return this.sslEngine.isInboundDone();
     }
 
     /**
      * Returns whether the session will accept any more outbound data.
      */
-    public boolean isOutboundDone() {
+    private boolean isOutboundDone() {
         return this.sslEngine.isOutboundDone();
     }
 
@@ -647,14 +725,6 @@ public class SSLIOSession implements IOSession {
         }
     }
 
-    public void resetReadCount() {
-        bytesReadCount.set(0L);
-    }
-
-    public long getReadCount() {
-        return bytesReadCount.get();
-    }
-
     @Override
     public boolean isOpen() {
         return this.status == ACTIVE && this.session.isOpen();