You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jo...@apache.org on 2021/08/02 03:40:53 UTC

[mina] 08/15: Adds SSL2 flow control

This is an automated email from the ASF dual-hosted git repository.

johnnyv pushed a commit to branch bugfix/DIRMINA1132
in repository https://gitbox.apache.org/repos/asf/mina.git

commit d64c8c7fd46cc14cf37aa5505860913925311d17
Author: Jonathan Valliere <jo...@apache.org>
AuthorDate: Sat Jul 24 13:33:57 2021 -0400

    Adds SSL2 flow control
---
 .../mina/filter/ssl2/EncryptedWriteRequest.java    |  23 +---
 .../org/apache/mina/filter/ssl2/SSL2Filter.java    |  83 ++++++++-------
 .../org/apache/mina/filter/ssl2/SSL2Handler.java   |  20 +++-
 .../org/apache/mina/filter/ssl2/SSL2HandlerG0.java | 116 ++++++++++++++-------
 .../apache/mina/filter/ssl2/SSL2SimpleTest.java    |  21 ++--
 5 files changed, 157 insertions(+), 106 deletions(-)

diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl2/EncryptedWriteRequest.java b/mina-core/src/main/java/org/apache/mina/filter/ssl2/EncryptedWriteRequest.java
index 91fabc7..caf32d7 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl2/EncryptedWriteRequest.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl2/EncryptedWriteRequest.java
@@ -1,32 +1,19 @@
 package org.apache.mina.filter.ssl2;
 
-import org.apache.mina.core.future.WriteFuture;
 import org.apache.mina.core.write.DefaultWriteRequest;
 import org.apache.mina.core.write.WriteRequest;
 
 public class EncryptedWriteRequest extends DefaultWriteRequest {
 
 	// The original message
-	private WriteRequest parentRequest;
+	private WriteRequest originalRequest;
 
 	public EncryptedWriteRequest(Object encodedMessage, WriteRequest parent) {
-		super(encodedMessage, null);
+		super(encodedMessage, parent != null ? parent.getFuture() : null);
+		this.originalRequest = parent != null ? parent : this;
 	}
 
-	/**
-	 * {@inheritDoc}
-	 */
-	@Override
-	public boolean isEncoded() {
-		return true;
-	}
-
-	public WriteRequest getParentRequest() {
-		return this.parentRequest;
-	}
-
-	@Override
-	public WriteFuture getFuture() {
-		return (this.getParentRequest() != null) ? this.getParentRequest().getFuture() : super.getFuture();
+	public WriteRequest getOriginalRequest() {
+		return this.originalRequest;
 	}
 }
\ No newline at end of file
diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Filter.java b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Filter.java
index 052f806..80e5688 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Filter.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Filter.java
@@ -19,6 +19,7 @@
  */
 package org.apache.mina.filter.ssl2;
 
+import java.net.InetSocketAddress;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -38,16 +39,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * An SSL filter that encrypts and decrypts the data exchanged in the session.
- * Adding this filter triggers SSL handshake procedure immediately by sending a
- * SSL 'hello' message, so you don't need to call {@link #startSsl(IoSession)}
- * manually unless you are implementing StartTLS (see below). If you don't want
- * the handshake procedure to start immediately, please specify {@code false} as
- * {@code autoStart} parameter in the constructor.
+ * An SSL Filter which simplifies and controls the flow of encrypted information
+ * on the filter-chain.
  * <p>
- * This filter uses an {@link SSLEngine} which was introduced in Java 5, so Java
- * version 5 or above is mandatory to use this filter. And please note that this
- * filter only works for TCP/IP connections.
+ * The initial handshake is automatically enabled for "client" sessions once the
+ * filter is added to the filter-chain and the session is connected.
  *
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
@@ -160,15 +156,6 @@ public class SSL2Filter extends IoFilterAdapter {
 		this.mEnabledProtocols = protocols;
 	}
 
-	/**
-	 * Executed just before the filter is added into the chain, we do :
-	 * <ul>
-	 * <li>check that we don't have a SSL filter already present
-	 * <li>we update the next filter
-	 * <li>we create the SSL handler helper class
-	 * <li>and we store it into the session's Attributes
-	 * </ul>
-	 */
 	@Override
 	public void onPreAdd(IoFilterChain parent, String name, NextFilter next) throws Exception {
 		// Check that we don't have a SSL filter already present in the chain
@@ -184,34 +171,47 @@ public class SSL2Filter extends IoFilterAdapter {
 	}
 
 	@Override
-	public void onPreRemove(IoFilterChain parent, String name, NextFilter next) throws Exception {
+	public void onPostAdd(IoFilterChain parent, String name, NextFilter next) throws Exception {
 		IoSession session = parent.getSession();
-		session.removeAttribute(SSL_HANDLER);
+		if (session.isConnected()) {
+			this.sessionConnected(next, session);
+		}
+		super.onPostAdd(parent, name, next);
 	}
 
 	@Override
-	public void sessionOpened(NextFilter next, IoSession session) throws Exception {
-
-		LOGGER.debug("session openend {}", session);
+	public void onPreRemove(IoFilterChain parent, String name, NextFilter next) throws Exception {
+		IoSession session = parent.getSession();
+		SSL2Handler x = SSL2Handler.class.cast(session.removeAttribute(SSL_HANDLER));
+		if (x != null) {
+			x.close(next);
+		}
+	}
 
+	protected void sessionConnected(NextFilter next, IoSession session) throws Exception {
 		SSL2Handler x = SSL2Handler.class.cast(session.getAttribute(SSL_HANDLER));
 
 		if (x == null) {
-			SSLEngine e = mContext.createSSLEngine();
-
+			InetSocketAddress s = InetSocketAddress.class.cast(session.getRemoteAddress());
+			SSLEngine e = mContext.createSSLEngine(s.getHostString(), s.getPort());
 			e.setNeedClientAuth(mNeedClientAuth);
 			e.setWantClientAuth(mWantClientAuth);
 			e.setEnabledCipherSuites(mEnabledCipherSuites);
 			e.setEnabledProtocols(mEnabledProtocols);
 			e.setUseClientMode(!session.isServer());
-
 			x = new SSL2HandlerG0(e, EXECUTOR, session);
-
 			session.setAttribute(SSL_HANDLER, x);
 		}
 
 		x.open(next);
-		
+	}
+
+	@Override
+	public void sessionOpened(NextFilter next, IoSession session) throws Exception {
+		if (LOGGER.isDebugEnabled())
+			LOGGER.debug("session {} openend", session);
+
+		this.sessionConnected(next, session);
 		super.sessionOpened(next, session);
 	}
 
@@ -222,29 +222,32 @@ public class SSL2Filter extends IoFilterAdapter {
 	}
 
 	@Override
-	public void messageSent(NextFilter next, IoSession session, WriteRequest writeRequest) throws Exception {
-		if (writeRequest instanceof EncryptedWriteRequest) {
-			EncryptedWriteRequest e = EncryptedWriteRequest.class.cast(writeRequest);
+	public void messageSent(NextFilter next, IoSession session, WriteRequest request) throws Exception {
+		if (LOGGER.isDebugEnabled())
+			LOGGER.debug("session {} sent {}", session, request);
+
+		if (request instanceof EncryptedWriteRequest) {
+			EncryptedWriteRequest e = EncryptedWriteRequest.class.cast(request);
 			SSL2Handler x = SSL2Handler.class.cast(session.getAttribute(SSL_HANDLER));
-			x.ack(next, writeRequest);
-			if (e.getParentRequest() != null) {
-				next.messageSent(session, e.getParentRequest());
+			x.ack(next, request);
+			if (e.getOriginalRequest() != e) {
+				next.messageSent(session, e.getOriginalRequest());
 			}
 		} else {
-			super.messageSent(next, session, writeRequest);
+			super.messageSent(next, session, request);
 		}
 	}
 
 	@Override
-	public void filterWrite(NextFilter next, IoSession session, WriteRequest writeRequest) throws Exception {
+	public void filterWrite(NextFilter next, IoSession session, WriteRequest request) throws Exception {
 
-		LOGGER.debug("session write {}", session);
+		LOGGER.debug("session {} write {}", session, request);
 
-		if (writeRequest instanceof EncryptedWriteRequest) {
-			super.filterWrite(next, session, writeRequest);
+		if (request instanceof EncryptedWriteRequest) {
+			super.filterWrite(next, session, request);
 		} else {
 			SSL2Handler x = SSL2Handler.class.cast(session.getAttribute(SSL_HANDLER));
-			x.write(next, writeRequest);
+			x.write(next, request);
 		}
 	}
 }
diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Handler.java b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Handler.java
index 1e8e59b..d8eb1eb 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Handler.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Handler.java
@@ -18,6 +18,21 @@ import org.slf4j.LoggerFactory;
 public abstract class SSL2Handler {
 
 	/**
+	 * Minimum size of encoder buffer in packets
+	 */
+	static protected final int MIN_ENCODER_PACKETS = 2;
+
+	/**
+	 * Maximum size of encoder buffer in packets
+	 */
+	static protected final int MAX_ENCODER_PACKETS = 8;
+
+	/**
+	 * Zero length buffer used to prime the ssl engine
+	 */
+	static protected final IoBuffer ZERO = IoBuffer.allocate(0, true);
+
+	/**
 	 * Static logger
 	 */
 	static protected final Logger LOGGER = LoggerFactory.getLogger(SSL2Handler.class);
@@ -25,7 +40,7 @@ public abstract class SSL2Handler {
 	/**
 	 * Write Requests which are enqueued prior to the completion of the handshaking
 	 */
-	protected final Deque<WriteRequest> mWriteQueue = new ConcurrentLinkedDeque<>();
+	protected final Deque<WriteRequest> mEncodeQueue = new ConcurrentLinkedDeque<>();
 
 	/**
 	 * Requests which have been sent to the socket and waiting acknowledgment
@@ -200,7 +215,8 @@ public abstract class SSL2Handler {
 		SSLSession session = this.mEngine.getHandshakeSession();
 		if (session == null)
 			session = this.mEngine.getSession();
-		int packets = Math.max(2, Math.min(16, 1 + (estimate / session.getApplicationBufferSize())));
+		int packets = Math.max(MIN_ENCODER_PACKETS,
+				Math.min(MAX_ENCODER_PACKETS, 1 + (estimate / session.getApplicationBufferSize())));
 		return IoBuffer.allocate(packets * session.getPacketBufferSize());
 	}
 
diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java
index 803927f..9961a32 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java
@@ -4,7 +4,6 @@ import java.util.concurrent.Executor;
 
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLEngineResult.HandshakeStatus;
 import javax.net.ssl.SSLException;
 
 import org.apache.mina.core.buffer.IoBuffer;
@@ -14,36 +13,42 @@ import org.apache.mina.core.write.WriteRequest;
 
 public class SSL2HandlerG0 extends SSL2Handler {
 
+	/**
+	 * Maximum number of messages waiting acknowledgement
+	 */
+	static protected final int MAX_UNACK_MESSAGES = 6;
+
 	public SSL2HandlerG0(SSLEngine p, Executor e, IoSession s) {
 		super(p, e, s);
 	}
 
+	/**
+	 * {@inheritDoc}
+	 */
 	synchronized public void open(final NextFilter next) throws SSLException {
 		if (this.mEngine.getUseClientMode()) {
-
 			if (LOGGER.isDebugEnabled()) {
 				LOGGER.debug("{} open() - begin handshaking", toString());
 			}
-
 			this.mEngine.beginHandshake();
 			this.lwrite(next);
 		}
 	}
 
+	/**
+	 * {@inheritDoc}
+	 */
 	synchronized public void receive(final NextFilter next, final IoBuffer message) throws SSLException {
-
+		final IoBuffer source = resume_decode_buffer(message);
 		if (LOGGER.isDebugEnabled()) {
-			LOGGER.debug("{} receive() - source {}", toString(), message);
+			LOGGER.debug("{} receive() - source {}", toString(), source);
 		}
-
-		final IoBuffer input = resume_decode_buffer(message);
-
 		try {
-			while (lreceive(next, input) && message.hasRemaining()) {
-				// spin
+			while (lreceive(next, source) && message.hasRemaining()) {
+				// loop until the message is consumed
 			}
 		} finally {
-			save_decode_buffer(input);
+			save_decode_buffer(source);
 		}
 	}
 
@@ -63,7 +68,7 @@ public class SSL2HandlerG0 extends SSL2Handler {
 			LOGGER.debug("{} lreceive() - source {}", toString(), message);
 		}
 
-		final IoBuffer source = message == null ? IoBuffer.allocate(0) : message;
+		final IoBuffer source = message == null ? ZERO : message;
 		final IoBuffer dest = allocate_app_buffer(source.remaining());
 
 		final SSLEngineResult result = mEngine.unwrap(source.buf(), dest.buf());
@@ -77,11 +82,9 @@ public class SSL2HandlerG0 extends SSL2Handler {
 			dest.free();
 		} else {
 			dest.flip();
-
 			if (LOGGER.isDebugEnabled()) {
 				LOGGER.debug("{} lreceive() - result {}", toString(), dest);
 			}
-
 			next.messageReceived(this.mSession, dest);
 		}
 
@@ -109,30 +112,42 @@ public class SSL2HandlerG0 extends SSL2Handler {
 		return result.bytesConsumed() > 0;
 	}
 
+	/**
+	 * {@inheritDoc}
+	 */
 	synchronized public void ack(final NextFilter next, final WriteRequest request) throws SSLException {
-
+		if (this.mAckQueue.remove(request)) {
+			if (LOGGER.isDebugEnabled()) {
+				LOGGER.debug("{} ack() - {}", toString(), request);
+			}
+			if (LOGGER.isDebugEnabled()) {
+				LOGGER.debug("{} ack() - checking to see if any messages can be flushed", toString(), request);
+			}
+			this.lflush(next);
+		}
 	}
 
+	/**
+	 * {@inheritDoc}
+	 */
 	synchronized public void write(final NextFilter next, final WriteRequest request) throws SSLException {
-
 		if (LOGGER.isDebugEnabled()) {
 			LOGGER.debug("{} write() - source {}", toString(), request);
 		}
 
-		if (this.mWriteQueue.isEmpty()) {
+		if (this.mEncodeQueue.isEmpty()) {
 			if (lwrite(next, request) == false) {
 				if (LOGGER.isDebugEnabled()) {
-					LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(), request);
+					LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(),
+							request);
 				}
-
-				this.mWriteQueue.add(request);
+				this.mEncodeQueue.add(request);
 			}
 		} else {
 			if (LOGGER.isDebugEnabled()) {
 				LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(), request);
 			}
-
-			this.mWriteQueue.add(request);
+			this.mEncodeQueue.add(request);
 		}
 	}
 
@@ -142,7 +157,8 @@ public class SSL2HandlerG0 extends SSL2Handler {
 	 * @param request
 	 * @param session
 	 * @param next
-	 * @return {@code true} if the WriteRequest was successfully written
+	 * @return {@code true} if the WriteRequest was fully consumed; otherwise
+	 *         {@code false}
 	 * @throws SSLException
 	 */
 	@SuppressWarnings("incomplete-switch")
@@ -166,19 +182,35 @@ public class SSL2HandlerG0 extends SSL2Handler {
 			dest.free();
 		} else {
 			if (result.bytesConsumed() == 0) {
-				next.filterWrite(this.mSession, new EncryptedWriteRequest(dest, null));
+				EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null);
+				if (LOGGER.isDebugEnabled()) {
+					LOGGER.debug("{} lwrite() - result {}", toString(), encrypted);
+				}
+				next.filterWrite(this.mSession, encrypted);
 			} else {
 				// then we probably consumed some data
 				dest.flip();
 				if (source.hasRemaining()) {
-					next.filterWrite(this.mSession, new EncryptedWriteRequest(dest, null));
-					lwrite(next, request); // write additional chunks
+					EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null);
+					this.mAckQueue.add(encrypted);
+					if (LOGGER.isDebugEnabled()) {
+						LOGGER.debug("{} lwrite() - result {}", toString(), encrypted);
+					}
+					next.filterWrite(this.mSession, encrypted);
+					if (this.mAckQueue.size() < MAX_UNACK_MESSAGES) {
+						return lwrite(next, request); // write additional chunks
+					}
+					return false;
 				} else {
 					source.rewind();
-					next.filterWrite(this.mSession, new EncryptedWriteRequest(dest, request));
+					EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, request);
+					this.mAckQueue.add(encrypted);
+					if (LOGGER.isDebugEnabled()) {
+						LOGGER.debug("{} lwrite() - result {}", toString(), encrypted);
+					}
+					next.filterWrite(this.mSession, encrypted);
+					return true;
 				}
-
-				return true;
 			}
 		}
 
@@ -206,7 +238,6 @@ public class SSL2HandlerG0 extends SSL2Handler {
 		}
 
 		return false;
-
 	}
 
 	/**
@@ -224,7 +255,7 @@ public class SSL2HandlerG0 extends SSL2Handler {
 			LOGGER.debug("{} lwrite() - internal", toString());
 		}
 
-		final IoBuffer source = IoBuffer.allocate(0);
+		final IoBuffer source = ZERO;
 		final IoBuffer dest = allocate_encode_buffer(source.remaining());
 
 		final SSLEngineResult result = this.mEngine.wrap(source.buf(), dest.buf());
@@ -234,15 +265,13 @@ public class SSL2HandlerG0 extends SSL2Handler {
 					result.bytesProduced());
 		}
 
-		if (dest.position() == 0) {
+		if (result.bytesProduced() == 0) {
 			dest.free();
 		} else {
 			dest.flip();
-
 			if (LOGGER.isDebugEnabled()) {
 				LOGGER.debug("{} lwrite() - result {}", toString(), dest);
 			}
-
 			final EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null);
 			next.filterWrite(this.mSession, encrypted);
 		}
@@ -271,8 +300,14 @@ public class SSL2HandlerG0 extends SSL2Handler {
 		return result.bytesProduced() > 0;
 	}
 
+	/**
+	 * Flushes the encode queue
+	 * 
+	 * @param next
+	 * @throws SSLException
+	 */
 	synchronized protected void lflush(final NextFilter next) throws SSLException {
-		if (this.mWriteQueue.isEmpty()) {
+		if (this.mEncodeQueue.isEmpty()) {
 			if (LOGGER.isDebugEnabled()) {
 				LOGGER.debug("{} flush() - no saved messages", toString());
 			}
@@ -280,15 +315,20 @@ public class SSL2HandlerG0 extends SSL2Handler {
 		}
 
 		WriteRequest current = null;
-
-		while ((current = this.mWriteQueue.poll()) != null) {
+		while ((this.mAckQueue.size() < MAX_UNACK_MESSAGES) && (current = this.mEncodeQueue.poll()) != null) {
+			if (LOGGER.isDebugEnabled()) {
+				LOGGER.debug("{} flush() - {}", toString(), current);
+			}
 			if (lwrite(next, current) == false) {
-				this.mWriteQueue.addFirst(current);
+				this.mEncodeQueue.addFirst(current);
 				break;
 			}
 		}
 	}
 
+	/**
+	 * {@inheritDoc}
+	 */
 	synchronized public void close(final NextFilter next) throws SSLException {
 		if (mEngine.isOutboundDone())
 			return;
diff --git a/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java b/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java
index beb6577..aef4ead 100644
--- a/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java
+++ b/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java
@@ -77,13 +77,7 @@ public class SSL2SimpleTest {
 //
 //		}
 
-		client_socket.write(createWriteRequest());
-
-		try {
-			Thread.sleep(2000);
-		} catch (InterruptedException e) {
-
-		}
+		client_socket.write(createMosaicRequest()).awaitUninterruptibly();
 
 		client_socket.closeNow();
 
@@ -104,7 +98,18 @@ public class SSL2SimpleTest {
 		}
 	}
 
-	public static IoBuffer createWriteRequest() {
+	public static IoBuffer createMosaicRequest() {
+		// HTTP request
+		IoBuffer message = IoBuffer.allocate(100 * 1024);
+		while (message.hasRemaining()) {
+			message.putInt(0xFF332211);
+		}
+		message.flip();
+
+		return message;
+	}
+
+	public static IoBuffer createHttpRequest() {
 		// HTTP request
 		StringBuilder http = new StringBuilder();
 		http.append("GET / HTTP/1.0\r\n");