You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2019/09/12 14:02:08 UTC
[httpcomponents-core] 01/01: Moved SSL/TLS specific i/o event handling logic to SSLIOSession
This is an automated email from the ASF dual-hosted git repository.
olegk pushed a commit to branch tls-nio
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git
commit 22aa20e8915b58bb06e59f6bac9a2131f622d5fa
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Thu Sep 12 10:53:48 2019 +0200
Moved SSL/TLS specific i/o event handling logic to SSLIOSession
---
.../testing/nio/LoggingIOSessionListener.java | 21 ----
.../org/apache/hc/core5/reactor/IOSession.java | 14 +++
.../org/apache/hc/core5/reactor/IOSessionImpl.java | 13 +++
.../apache/hc/core5/reactor/IOSessionListener.java | 6 -
.../hc/core5/reactor/InternalDataChannel.java | 130 +++++++--------------
.../apache/hc/core5/reactor/ProtocolIOSession.java | 2 +-
.../org/apache/hc/core5/reactor/ProtocolLayer.java | 44 -------
.../apache/hc/core5/reactor/ssl/SSLIOSession.java | 102 +++++++++++++---
8 files changed, 159 insertions(+), 173 deletions(-)
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSessionListener.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSessionListener.java
index 09aae51..33597b1 100644
--- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSessionListener.java
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSessionListener.java
@@ -43,27 +43,6 @@ public class LoggingIOSessionListener implements IOSessionListener {
}
@Override
- public void tlsStarted(final IOSession session) {
- if (connLog.isDebugEnabled()) {
- connLog.debug(session + " TLS session started");
- }
- }
-
- @Override
- public void tlsInbound(final IOSession session) {
- if (connLog.isDebugEnabled()) {
- connLog.debug(session + " TLS inbound");
- }
- }
-
- @Override
- public void tlsOutbound(final IOSession session) {
- if (connLog.isDebugEnabled()) {
- connLog.debug(session + " TLS outbound");
- }
- }
-
- @Override
public void connected(final IOSession session) {
if (connLog.isDebugEnabled()) {
connLog.debug(session + " connected");
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java
index ec8948e..26cfca6 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java
@@ -58,6 +58,20 @@ public interface IOSession extends ByteChannel, SocketModalCloseable, Identifiab
int CLOSED = Integer.MAX_VALUE;
/**
+ * Returns event handler associated with the session.
+ *
+ * @since 5.0
+ */
+ IOEventHandler getHandler();
+
+ /**
+ * Upgrades event handler associated with the session.
+ *
+ * @since 5.0
+ */
+ void upgrade(IOEventHandler handler);
+
+ /**
* Returns session lock that should be used by I/O event handlers
* to synchronize access to the session.
*
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
index 6a5ce53..930acde 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
@@ -38,6 +38,7 @@ import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -55,6 +56,7 @@ class IOSessionImpl implements IOSession {
private final Deque<Command> commandQueue;
private final Lock lock;
private final String id;
+ private final AtomicReference<IOEventHandler> handlerRef;
private final AtomicInteger status;
private volatile Timeout socketTimeout;
@@ -76,6 +78,7 @@ class IOSessionImpl implements IOSession {
this.lock = new ReentrantLock();
this.socketTimeout = Timeout.DISABLED;
this.id = String.format("i/o-%08X", COUNT.getAndIncrement());
+ this.handlerRef = new AtomicReference<>();
this.status = new AtomicInteger(ACTIVE);
final long currentTimeMillis = System.currentTimeMillis();
this.lastReadTime = currentTimeMillis;
@@ -89,6 +92,16 @@ class IOSessionImpl implements IOSession {
}
@Override
+ public IOEventHandler getHandler() {
+ return handlerRef.get();
+ }
+
+ @Override
+ public void upgrade(final IOEventHandler handler) {
+ handlerRef.set(handler);
+ }
+
+ @Override
public Lock getLock() {
return lock;
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionListener.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionListener.java
index ae2bd9f..badf4fa 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionListener.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionListener.java
@@ -34,12 +34,6 @@ package org.apache.hc.core5.reactor;
*/
public interface IOSessionListener {
- void tlsStarted(IOSession session);
-
- void tlsInbound(IOSession session);
-
- void tlsOutbound(IOSession session);
-
void connected(IOSession session);
void inputReady(IOSession session);
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
index f2c0fe3..356202a 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
@@ -58,7 +58,6 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
private final IOSessionListener sessionListener;
private final AtomicReference<SSLIOSession> tlsSessionRef;
private final Queue<InternalDataChannel> closedSessions;
- private final AtomicReference<IOEventHandler> handlerRef;
private final AtomicBoolean connected;
private final AtomicBoolean closed;
@@ -72,7 +71,6 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
this.closedSessions = closedSessions;
this.sessionListener = sessionListener;
this.tlsSessionRef = new AtomicReference<>(null);
- this.handlerRef = new AtomicReference<>(null);
this.connected = new AtomicBoolean(false);
this.closed = new AtomicBoolean(false);
}
@@ -83,22 +81,22 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
}
@Override
- public IOEventHandler getHandler() {
- return handlerRef.get();
+ public NamedEndpoint getInitialEndpoint() {
+ return initialEndpoint;
}
@Override
- public NamedEndpoint getInitialEndpoint() {
- return initialEndpoint;
+ public IOEventHandler getHandler() {
+ return ioSession.getHandler();
}
@Override
public void upgrade(final IOEventHandler handler) {
- handlerRef.set(handler);
+ ioSession.upgrade(handler);
}
- private IOEventHandler ensureHandler() {
- final IOEventHandler handler = handlerRef.get();
+ private IOEventHandler ensureHandler(final IOSession session) {
+ final IOEventHandler handler = session.getHandler();
Asserts.notNull(handler, "IO event handler");
return handler;
}
@@ -106,75 +104,33 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
@Override
void onIOEvent(final int readyOps) throws IOException {
final SSLIOSession tlsSession = tlsSessionRef.get();
- if (tlsSession != null) {
- if (!tlsSession.isInitialized()) {
- tlsSession.initialize();
+ final IOSession currentSession = tlsSession != null ? tlsSession : ioSession;
+ if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
+ currentSession.clearEvent(SelectionKey.OP_CONNECT);
+ if (tlsSession == null && connected.compareAndSet(false, true)) {
if (sessionListener != null) {
- sessionListener.tlsStarted(tlsSession);
- }
- }
- if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
- tlsSession.clearEvent(SelectionKey.OP_CONNECT);
- }
- if ((readyOps & SelectionKey.OP_READ) != 0) {
- ioSession.updateReadTime();
- do {
- tlsSession.resetReadCount();
- if (tlsSession.isAppInputReady()) {
- if (sessionListener != null) {
- sessionListener.inputReady(this);
- }
- final IOEventHandler handler = ensureHandler();
- handler.inputReady(this);
- }
- tlsSession.inboundTransport();
- if (sessionListener != null) {
- sessionListener.tlsInbound(tlsSession);
- }
- } while (tlsSession.getReadCount() > 0);
- }
- if ((readyOps & SelectionKey.OP_WRITE) != 0
- || (ioSession.getEventMask() & SelectionKey.OP_WRITE) != 0) {
- ioSession.updateWriteTime();
- if (tlsSession.isAppOutputReady()) {
- if (sessionListener != null) {
- sessionListener.outputReady(this);
- }
- final IOEventHandler handler = ensureHandler();
- handler.outputReady(this);
- }
- tlsSession.outboundTransport();
- if (sessionListener != null) {
- sessionListener.tlsOutbound(tlsSession);
- }
- }
- } else {
- if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
- ioSession.clearEvent(SelectionKey.OP_CONNECT);
- if (connected.compareAndSet(false, true)) {
- if (sessionListener != null) {
- sessionListener.connected(this);
- }
- final IOEventHandler handler = ensureHandler();
- handler.connected(this);
+ sessionListener.connected(this);
}
+ final IOEventHandler handler = ensureHandler(currentSession);
+ handler.connected(this);
}
- if ((readyOps & SelectionKey.OP_READ) != 0) {
- ioSession.updateReadTime();
- if (sessionListener != null) {
- sessionListener.inputReady(this);
- }
- final IOEventHandler handler = ensureHandler();
- handler.inputReady(this);
+ }
+ if ((readyOps & SelectionKey.OP_READ) != 0) {
+ currentSession.updateReadTime();
+ if (sessionListener != null) {
+ sessionListener.inputReady(this);
}
- if ((readyOps & SelectionKey.OP_WRITE) != 0) {
- ioSession.updateWriteTime();
- if (sessionListener != null) {
- sessionListener.outputReady(this);
- }
- final IOEventHandler handler = ensureHandler();
- handler.outputReady(this);
+ final IOEventHandler handler = ensureHandler(currentSession);
+ handler.inputReady(this);
+ }
+ if ((readyOps & SelectionKey.OP_WRITE) != 0
+ || (ioSession.getEventMask() & SelectionKey.OP_WRITE) != 0) {
+ currentSession.updateWriteTime();
+ if (sessionListener != null) {
+ sessionListener.outputReady(this);
}
+ final IOEventHandler handler = ensureHandler(currentSession);
+ handler.outputReady(this);
}
}
@@ -185,15 +141,13 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
@Override
void onTimeout(final Timeout timeout) throws IOException {
- final IOEventHandler handler = ensureHandler();
- handler.timeout(this, timeout);
- final SSLIOSession tlsSession = tlsSessionRef.get();
- if (tlsSession != null) {
- if (tlsSession.isOutboundDone() && !tlsSession.isInboundDone()) {
- // The session failed to terminate cleanly
- tlsSession.close(CloseMode.IMMEDIATE);
- }
+ if (sessionListener != null) {
+ sessionListener.timeout(this);
}
+ final SSLIOSession tlsSession = tlsSessionRef.get();
+ final IOSession currentSession = tlsSession != null ? tlsSession : ioSession;
+ final IOEventHandler handler = ensureHandler(currentSession);
+ handler.timeout(this, timeout);
}
@Override
@@ -201,7 +155,9 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
if (sessionListener != null) {
sessionListener.exception(this, cause);
}
- final IOEventHandler handler = handlerRef.get();
+ final SSLIOSession tlsSession = tlsSessionRef.get();
+ final IOSession currentSession = tlsSession != null ? tlsSession : ioSession;
+ final IOEventHandler handler = currentSession.getHandler();
if (handler != null) {
handler.exception(this, cause);
}
@@ -211,8 +167,12 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
if (sessionListener != null) {
sessionListener.disconnected(this);
}
- final IOEventHandler handler = ensureHandler();
- handler.disconnected(this);
+ final SSLIOSession tlsSession = tlsSessionRef.get();
+ final IOSession currentSession = tlsSession != null ? tlsSession : ioSession;
+ final IOEventHandler handler = currentSession.getHandler();
+ if (handler != null) {
+ handler.disconnected(this);
+ }
}
@Override
@@ -236,7 +196,7 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
@Override
public void execute(final SSLIOSession sslSession) {
if (connected.compareAndSet(false, true)) {
- final IOEventHandler handler = ensureHandler();
+ final IOEventHandler handler = ensureHandler(ioSession);
try {
if (sessionListener != null) {
sessionListener.connected(InternalDataChannel.this);
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java
index 2f561d5..35e8e29 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolIOSession.java
@@ -35,7 +35,7 @@ import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
*
* @since 5.0
*/
-public interface ProtocolIOSession extends IOSession, ProtocolLayer, TransportSecurityLayer {
+public interface ProtocolIOSession extends IOSession, TransportSecurityLayer {
NamedEndpoint getInitialEndpoint();
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolLayer.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolLayer.java
deleted file mode 100644
index 8b5c2df..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ProtocolLayer.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-
-package org.apache.hc.core5.reactor;
-
-import org.apache.hc.core5.annotation.Internal;
-
-/**
- * Represents application protocol layer.
- *
- * @since 5.0
- */
-@Internal
-public interface ProtocolLayer {
-
- IOEventHandler getHandler();
-
- void upgrade(IOEventHandler handler);
-
-}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
index 6b54fcb..661e5af 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
@@ -53,6 +53,7 @@ import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.net.NamedEndpoint;
import org.apache.hc.core5.reactor.Command;
import org.apache.hc.core5.reactor.EventMask;
+import org.apache.hc.core5.reactor.IOEventHandler;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Asserts;
@@ -155,6 +156,83 @@ public class SSLIOSession implements IOSession {
return session.getId();
}
+ private IOEventHandler ensureHandler() {
+ final IOEventHandler handler = session.getHandler();
+ Asserts.notNull(handler, "IO event handler");
+ return handler;
+ }
+
+ @Override
+ public IOEventHandler getHandler() {
+ return new IOEventHandler() {
+
+ private void ensureInitialized() throws IOException {
+ if (!isInitialized()) {
+ initialize();
+ }
+ }
+
+ @Override
+ public void connected(final IOSession protocolSession) throws IOException {
+ ensureInitialized();
+ }
+
+ @Override
+ public void inputReady(final IOSession protocolSession) throws IOException {
+ ensureInitialized();
+ do {
+ bytesReadCount.set(0L);
+ if (isAppInputReady()) {
+ final IOEventHandler handler = ensureHandler();
+ handler.inputReady(protocolSession);
+ }
+ inboundTransport();
+ } while (bytesReadCount.get() > 0);
+ }
+
+ @Override
+ public void outputReady(final IOSession protocolSession) throws IOException {
+ ensureInitialized();
+ if (isAppOutputReady()) {
+ final IOEventHandler handler = ensureHandler();
+ handler.outputReady(protocolSession);
+ }
+ outboundTransport();
+ }
+
+ @Override
+ public void timeout(final IOSession protocolSession, final Timeout timeout) throws IOException {
+ if (isOutboundDone() && !isInboundDone()) {
+ // The session failed to terminate cleanly
+ close(CloseMode.IMMEDIATE);
+ }
+ ensureHandler().timeout(protocolSession, timeout);
+ }
+
+ @Override
+ public void exception(final IOSession protocolSession, final Exception cause) {
+ final IOEventHandler handler = session.getHandler();
+ if (handler != null) {
+ handler.exception(protocolSession, cause);
+ }
+ }
+
+ @Override
+ public void disconnected(final IOSession protocolSession) {
+ final IOEventHandler handler = session.getHandler();
+ if (handler != null) {
+ handler.disconnected(protocolSession);
+ }
+ }
+
+ };
+ }
+
+ @Override
+ public void upgrade(final IOEventHandler handler) {
+ this.session.upgrade(handler);
+ }
+
@Override
public Lock getLock() {
return this.session.getLock();
@@ -164,7 +242,7 @@ public class SSLIOSession implements IOSession {
* Returns {@code true} is the session has been fully initialized,
* {@code false} otherwise.
*/
- public boolean isInitialized() {
+ private boolean isInitialized() {
return this.initialized;
}
@@ -176,7 +254,7 @@ public class SSLIOSession implements IOSession {
* @throws SSLException in case of a SSL protocol exception.
* @throws IllegalStateException if the session has already been initialized.
*/
- public void initialize() throws SSLException {
+ private void initialize() throws SSLException {
Asserts.check(!this.initialized, "SSL I/O session already initialized");
// Save the initial socketTimeout of the underlying IOSession, to be restored after the handshake is finished
@@ -495,7 +573,7 @@ public class SSLIOSession implements IOSession {
*
* @throws IOException in case of an I/O error.
*/
- public boolean isAppInputReady() throws IOException {
+ private boolean isAppInputReady() throws IOException {
this.session.getLock().lock();
try {
do {
@@ -519,7 +597,7 @@ public class SSLIOSession implements IOSession {
*
* @throws IOException - not thrown currently
*/
- public boolean isAppOutputReady() throws IOException {
+ private boolean isAppOutputReady() throws IOException {
this.session.getLock().lock();
try {
return (this.appEventMask & SelectionKey.OP_WRITE) > 0
@@ -535,7 +613,7 @@ public class SSLIOSession implements IOSession {
*
* @throws IOException - not thrown currently
*/
- public void inboundTransport() throws IOException {
+ private void inboundTransport() throws IOException {
this.session.getLock().lock();
try {
updateEventMask();
@@ -549,7 +627,7 @@ public class SSLIOSession implements IOSession {
*
* @throws IOException in case of an I/O error.
*/
- public void outboundTransport() throws IOException {
+ private void outboundTransport() throws IOException {
this.session.getLock().lock();
try {
if (!this.session.isOpen()) {
@@ -566,14 +644,14 @@ public class SSLIOSession implements IOSession {
/**
* Returns whether the session will produce any more inbound data.
*/
- public boolean isInboundDone() {
+ private boolean isInboundDone() {
return this.sslEngine.isInboundDone();
}
/**
* Returns whether the session will accept any more outbound data.
*/
- public boolean isOutboundDone() {
+ private boolean isOutboundDone() {
return this.sslEngine.isOutboundDone();
}
@@ -635,14 +713,6 @@ public class SSLIOSession implements IOSession {
}
}
- public void resetReadCount() {
- bytesReadCount.set(0L);
- }
-
- public long getReadCount() {
- return bytesReadCount.get();
- }
-
@Override
public boolean isOpen() {
return this.status == ACTIVE && this.session.isOpen();