You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jo...@apache.org on 2021/08/02 03:40:55 UTC

[mina] 10/15: Improves loop encode/decode functions

This is an automated email from the ASF dual-hosted git repository.

johnnyv pushed a commit to branch bugfix/DIRMINA1132
in repository https://gitbox.apache.org/repos/asf/mina.git

commit 2bd87b6250741ab1d1df161d3a801ffda1641b8e
Author: Jonathan Valliere <jo...@apache.org>
AuthorDate: Sun Jul 25 12:02:46 2021 -0400

    Improves loop encode/decode functions
---
 .../org/apache/mina/filter/ssl2/SSL2HandlerG0.java | 209 +++++++++++++++------
 .../apache/mina/filter/ssl2/SSL2SimpleTest.java    |  22 +--
 2 files changed, 160 insertions(+), 71 deletions(-)

diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java
index 2f8300d..8f4e8d6 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java
@@ -20,10 +20,25 @@ public class SSL2HandlerG0 extends SSL2Handler {
 	static protected final int MAX_UNACK_MESSAGES = 6;
 
 	/**
+	 * Enable aggregation of handshake messages
+	 */
+	static protected final boolean ENABLE_FAST_HANDSHAKE = true;
+
+	/**
+	 * Enable asynchronous tasks
+	 */
+	static protected final boolean ENABLE_ASYNC_TASKS = true;
+
+	/**
 	 * Indicates whether the first handshake was completed
 	 */
 	protected boolean mHandshakeComplete = false;
 
+	/**
+	 * Indicates whether the first handshake was started
+	 */
+	protected boolean mHandshakeStarted = false;
+
 	public SSL2HandlerG0(SSLEngine p, Executor e, IoSession s) {
 		super(p, e, s);
 	}
@@ -48,12 +63,15 @@ public class SSL2HandlerG0 extends SSL2Handler {
 	 * {@inheritDoc}
 	 */
 	synchronized public void open(final NextFilter next) throws SSLException {
-		if (this.mEngine.getUseClientMode()) {
-			if (LOGGER.isDebugEnabled()) {
-				LOGGER.debug("{} open() - begin handshaking", toString());
+		if (this.mHandshakeStarted == false) {
+			this.mHandshakeStarted = true;
+			if (this.mEngine.getUseClientMode()) {
+				if (LOGGER.isDebugEnabled()) {
+					LOGGER.debug("{} open() - begin handshaking", toString());
+				}
+				this.mEngine.beginHandshake();
+				this.qwrite(next);
 			}
-			this.mEngine.beginHandshake();
-			this.lwrite(next);
 		}
 	}
 
@@ -61,13 +79,13 @@ public class SSL2HandlerG0 extends SSL2Handler {
 	 * {@inheritDoc}
 	 */
 	synchronized public void receive(final NextFilter next, final IoBuffer message) throws SSLException {
-		final IoBuffer source = resume_decode_buffer(message);
 		if (LOGGER.isDebugEnabled()) {
-			LOGGER.debug("{} receive() - source {}", toString(), source);
+			LOGGER.debug("{} receive() - message {}", toString(), message);
 		}
+		final IoBuffer source = resume_decode_buffer(message);
 		try {
-			while (lreceive(next, source) && message.hasRemaining()) {
-				// loop until the message is consumed
+			if (source.hasRemaining()) {
+				this.qreceive(next, source);
 			}
 		} finally {
 			save_decode_buffer(source);
@@ -80,14 +98,12 @@ public class SSL2HandlerG0 extends SSL2Handler {
 	 * @param message received data
 	 * @param session user session
 	 * @param next    filter
-	 * @return {@code true} if some of the message was consumed
 	 * @throws SSLException
 	 */
 	@SuppressWarnings("incomplete-switch")
-	protected boolean lreceive(final NextFilter next, final IoBuffer message) throws SSLException {
-
+	protected void qreceive(final NextFilter next, final IoBuffer message) throws SSLException {
 		if (LOGGER.isDebugEnabled()) {
-			LOGGER.debug("{} lreceive() - source {}", toString(), message);
+			LOGGER.debug("{} qreceive() - source {}", toString(), message);
 		}
 
 		final IoBuffer source = message == null ? ZERO : message;
@@ -96,43 +112,60 @@ public class SSL2HandlerG0 extends SSL2Handler {
 		final SSLEngineResult result = mEngine.unwrap(source.buf(), dest.buf());
 
 		if (LOGGER.isDebugEnabled()) {
-			LOGGER.debug("{} lreceive() - bytes-consumed {}, bytes-produced {}, status {}", toString(),
-					result.bytesConsumed(), result.bytesProduced(), result.getStatus());
+			LOGGER.debug("{} qreceive() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(),
+					result.bytesConsumed(), result.bytesProduced(), result.getStatus(), result.getHandshakeStatus());
 		}
 
+		final boolean success = result.bytesConsumed() != 0;
+
 		if (result.bytesProduced() == 0) {
 			dest.free();
 		} else {
 			dest.flip();
 			if (LOGGER.isDebugEnabled()) {
-				LOGGER.debug("{} lreceive() - result {}", toString(), dest);
+				LOGGER.debug("{} qreceive() - result {}", toString(), dest);
 			}
 			next.messageReceived(this.mSession, dest);
 		}
 
 		switch (result.getHandshakeStatus()) {
+			case NEED_UNWRAP:
+			case NEED_UNWRAP_AGAIN:
+				if (success && source.hasRemaining()) {
+					if (LOGGER.isDebugEnabled()) {
+						LOGGER.debug("{} qreceive() - handshake needs unwrap, looping", toString());
+					}
+					this.qreceive(next, message);
+				}
+				break;
 			case NEED_TASK:
 				if (LOGGER.isDebugEnabled()) {
-					LOGGER.debug("{} lreceive() - handshake needs task, scheduling tasks", toString());
+					LOGGER.debug("{} qreceive() - handshake needs task, scheduling", toString());
 				}
 				this.schedule_task(next);
 				break;
 			case NEED_WRAP:
 				if (LOGGER.isDebugEnabled()) {
-					LOGGER.debug("{} lreceive() - handshake needs to write a new message", toString());
+					LOGGER.debug("{} qreceive() - handshake needs wrap, invoking write", toString());
 				}
-				this.lwrite(next);
+				this.qwrite(next);
 				break;
 			case FINISHED:
 				if (LOGGER.isDebugEnabled()) {
-					LOGGER.debug("{} lreceive() - handshake finished, flushing pending requests", toString());
+					LOGGER.debug("{} qreceive() - handshake finished, flushing queue", toString());
 				}
 				this.lfinish(next);
 				this.lflush(next);
 				break;
+			case NOT_HANDSHAKING:
+				if (success && message.hasRemaining()) {
+					if (LOGGER.isDebugEnabled()) {
+						LOGGER.debug("{} qreceive() - trying to decode more messages, looping", toString());
+					}
+					this.qreceive(next, message);
+				}
+				break;
 		}
-
-		return result.bytesConsumed() > 0;
 	}
 
 	/**
@@ -159,7 +192,7 @@ public class SSL2HandlerG0 extends SSL2Handler {
 		}
 
 		if (this.mEncodeQueue.isEmpty()) {
-			if (lwrite(next, request) == false) {
+			if (qwrite(next, request) == false) {
 				if (LOGGER.isDebugEnabled()) {
 					LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(),
 							request);
@@ -185,9 +218,9 @@ public class SSL2HandlerG0 extends SSL2Handler {
 	 * @throws SSLException
 	 */
 	@SuppressWarnings("incomplete-switch")
-	synchronized protected boolean lwrite(final NextFilter next, final WriteRequest request) throws SSLException {
+	synchronized protected boolean qwrite(final NextFilter next, final WriteRequest request) throws SSLException {
 		if (LOGGER.isDebugEnabled()) {
-			LOGGER.debug("{} lwrite() - source {}", toString(), request);
+			LOGGER.debug("{} qwrite() - source {}", toString(), request);
 		}
 
 		final IoBuffer source = IoBuffer.class.cast(request.getMessage());
@@ -196,7 +229,7 @@ public class SSL2HandlerG0 extends SSL2Handler {
 		final SSLEngineResult result = this.mEngine.wrap(source.buf(), dest.buf());
 
 		if (LOGGER.isDebugEnabled()) {
-			LOGGER.debug("{} lwrite() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(),
+			LOGGER.debug("{} qwrite() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(),
 					result.bytesConsumed(), result.bytesProduced(), result.getStatus(), result.getHandshakeStatus());
 		}
 
@@ -206,7 +239,7 @@ public class SSL2HandlerG0 extends SSL2Handler {
 			if (result.bytesConsumed() == 0) {
 				EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null);
 				if (LOGGER.isDebugEnabled()) {
-					LOGGER.debug("{} lwrite() - result {}", toString(), encrypted);
+					LOGGER.debug("{} qwrite() - result {}", toString(), encrypted);
 				}
 				next.filterWrite(this.mSession, encrypted);
 			} else {
@@ -216,11 +249,11 @@ public class SSL2HandlerG0 extends SSL2Handler {
 					EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null);
 					this.mAckQueue.add(encrypted);
 					if (LOGGER.isDebugEnabled()) {
-						LOGGER.debug("{} lwrite() - result {}", toString(), encrypted);
+						LOGGER.debug("{} qwrite() - result {}", toString(), encrypted);
 					}
 					next.filterWrite(this.mSession, encrypted);
 					if (this.mAckQueue.size() < MAX_UNACK_MESSAGES) {
-						return lwrite(next, request); // write additional chunks
+						return qwrite(next, request); // write additional chunks
 					}
 					return false;
 				} else {
@@ -228,7 +261,7 @@ public class SSL2HandlerG0 extends SSL2Handler {
 					EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, request);
 					this.mAckQueue.add(encrypted);
 					if (LOGGER.isDebugEnabled()) {
-						LOGGER.debug("{} lwrite() - result {}", toString(), encrypted);
+						LOGGER.debug("{} qwrite() - result {}", toString(), encrypted);
 					}
 					next.filterWrite(this.mSession, encrypted);
 					return true;
@@ -239,21 +272,21 @@ public class SSL2HandlerG0 extends SSL2Handler {
 		switch (result.getHandshakeStatus()) {
 			case NEED_TASK:
 				if (LOGGER.isDebugEnabled()) {
-					LOGGER.debug("{} lwrite() - handshake needs task, scheduling tasks", toString());
+					LOGGER.debug("{} qwrite() - handshake needs task, scheduling", toString());
 				}
 				this.schedule_task(next);
 				break;
 			case NEED_WRAP:
 				if (LOGGER.isDebugEnabled()) {
-					LOGGER.debug("{} lwrite() - handshake needs to encode a message", toString());
+					LOGGER.debug("{} qwrite() - handshake needs wrap, looping", toString());
 				}
-				return this.lwrite(next, request);
+				return this.qwrite(next, request);
 			case FINISHED:
 				if (LOGGER.isDebugEnabled()) {
-					LOGGER.debug("{} lwrite() - handshake finished, flushing pending requests", toString());
+					LOGGER.debug("{} qwrite() - handshake finished, flushing queue", toString());
 				}
 				this.lfinish(next);
-				if (this.lwrite(next, request)) {
+				if (this.qwrite(next, request)) {
 					this.lflush(next);
 					return true;
 				}
@@ -271,24 +304,64 @@ public class SSL2HandlerG0 extends SSL2Handler {
 	 * @return {@code true} if a message was generated and written
 	 * @throws SSLException
 	 */
-	@SuppressWarnings("incomplete-switch")
-	synchronized protected boolean lwrite(NextFilter next) throws SSLException {
-
+	synchronized protected boolean qwrite(NextFilter next) throws SSLException {
 		if (LOGGER.isDebugEnabled()) {
-			LOGGER.debug("{} lwrite() - internal", toString());
+			LOGGER.debug("{} qwrite() - internal", toString());
 		}
 
 		final IoBuffer source = ZERO;
 		final IoBuffer dest = allocate_encode_buffer(source.remaining());
 
+		return lwrite(next, source, dest);
+	}
+
+	/**
+	 * Attempts to generate a handshake message and write the data to the IoSession.
+	 * <p>
+	 * If ENABLE_FAST_HANDSHAKE is set, this method will recursively loop in order to
+	 * combine multiple messages into one buffer.
+	 * 
+	 * @param session
+	 * @param next
+	 * @return {@code true} if a message was generated and written
+	 * @throws SSLException
+	 */
+	@SuppressWarnings("incomplete-switch")
+	protected boolean lwrite(NextFilter next, IoBuffer source, IoBuffer dest) throws SSLException {
 		final SSLEngineResult result = this.mEngine.wrap(source.buf(), dest.buf());
 
 		if (LOGGER.isDebugEnabled()) {
-			LOGGER.debug("{} lwrite() - bytes-consumed {}, bytes-produced {}", toString(), result.bytesConsumed(),
-					result.bytesProduced());
+			LOGGER.debug("{} lwrite() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(),
+					result.bytesConsumed(), result.bytesProduced(), result.getStatus(), result.getHandshakeStatus());
 		}
 
-		if (result.bytesProduced() == 0) {
+		if (ENABLE_FAST_HANDSHAKE) {
+			/**
+			 * Fast handshaking allows multiple handshake messages to be written to a single
+			 * buffer. This reduces the number of network messages used during the handshake
+			 * process.
+			 * 
+			 * Additional handshake messages are only written if a message was produced in
+			 * the last loop; otherwise, any additional messages required by NEED_WRAP
+			 * are handled in the standard routine below, which allocates a new
+			 * buffer.
+			 */
+			switch (result.getHandshakeStatus()) {
+				case NEED_WRAP:
+					switch (result.getStatus()) {
+						case OK:
+							if (LOGGER.isDebugEnabled()) {
+								LOGGER.debug("{} lwrite() - handshake needs wrap, fast looping", toString());
+							}
+							return lwrite(next, source, dest);
+					}
+					break;
+			}
+		}
+
+		final boolean success = dest.position() != 0;
+
+		if (success == false) {
 			dest.free();
 		} else {
 			dest.flip();
@@ -300,30 +373,42 @@ public class SSL2HandlerG0 extends SSL2Handler {
 		}
 
 		switch (result.getHandshakeStatus()) {
-			case NEED_TASK:
+			case NEED_UNWRAP:
+			case NEED_UNWRAP_AGAIN:
 				if (LOGGER.isDebugEnabled()) {
-					LOGGER.debug("{} lwrite() - handshake needs task, scheduling tasks", toString());
+					LOGGER.debug("{} lwrite() - handshake needs unwrap, invoking receive", toString());
 				}
-				this.schedule_task(next);
+				this.receive(next, ZERO);
 				break;
 			case NEED_WRAP:
 				if (LOGGER.isDebugEnabled()) {
-					LOGGER.debug("{} lwrite() - handshake needs to encode a message", toString());
+					LOGGER.debug("{} lwrite() - handshake needs wrap, looping", toString());
+				}
+				this.qwrite(next);
+				break;
+			case NEED_TASK:
+				if (LOGGER.isDebugEnabled()) {
+					LOGGER.debug("{} lwrite() - handshake needs task, scheduling", toString());
 				}
-				this.lwrite(next);
+				this.schedule_task(next);
 				break;
 			case FINISHED:
 				if (LOGGER.isDebugEnabled()) {
-					LOGGER.debug("{} lwrite() - handshake finished, flushing pending requests", toString());
+					LOGGER.debug("{} lwrite() - handshake finished, flushing queue", toString());
 				}
 				this.lfinish(next);
 				this.lflush(next);
 				break;
 		}
 
-		return result.bytesProduced() > 0;
+		return success;
 	}
 
+	/**
+	 * Marks the handshake as complete and emits any signals
+	 * 
+	 * @param next
+	 */
 	synchronized protected void lfinish(final NextFilter next) {
 		this.mHandshakeComplete = true;
 		next.event(this.mSession, SslEvent.SECURED);
@@ -348,7 +433,7 @@ public class SSL2HandlerG0 extends SSL2Handler {
 			if (LOGGER.isDebugEnabled()) {
 				LOGGER.debug("{} flush() - {}", toString(), current);
 			}
-			if (lwrite(next, current) == false) {
+			if (qwrite(next, current) == false) {
 				this.mEncodeQueue.addFirst(current);
 				break;
 			}
@@ -367,19 +452,23 @@ public class SSL2HandlerG0 extends SSL2Handler {
 		}
 
 		mEngine.closeOutbound();
-		this.lwrite(next);
+		this.qwrite(next);
 	}
 
 	protected void schedule_task(final NextFilter next) {
-		if (this.mExecutor == null) {
-			this.execute_task(next);
+		if (ENABLE_ASYNC_TASKS) {
+			if (this.mExecutor == null) {
+				this.execute_task(next);
+			} else {
+				this.mExecutor.execute(new Runnable() {
+					@Override
+					public void run() {
+						SSL2HandlerG0.this.execute_task(next);
+					}
+				});
+			}
 		} else {
-			this.mExecutor.execute(new Runnable() {
-				@Override
-				public void run() {
-					SSL2HandlerG0.this.execute_task(next);
-				}
-			});
+			this.execute_task(next);
 		}
 	}
 
@@ -397,7 +486,7 @@ public class SSL2HandlerG0 extends SSL2Handler {
 					LOGGER.debug("{} task() - writing handshake messages", toString());
 				}
 
-				lwrite(next);
+				qwrite(next);
 			} catch (SSLException e) {
 				e.printStackTrace();
 			}
diff --git a/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java b/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java
index aef4ead..8896875 100644
--- a/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java
+++ b/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java
@@ -2,6 +2,7 @@ package org.apache.mina.filter.ssl2;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.security.KeyManagementException;
 import java.security.KeyStore;
 import java.security.KeyStoreException;
@@ -20,8 +21,6 @@ import org.apache.mina.core.service.IoAcceptor;
 import org.apache.mina.core.service.IoConnector;
 import org.apache.mina.core.service.IoHandlerAdapter;
 import org.apache.mina.core.session.IoSession;
-import org.apache.mina.core.write.DefaultWriteRequest;
-import org.apache.mina.core.write.WriteRequest;
 import org.apache.mina.filter.ssl.SslDIRMINA937Test;
 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
 import org.apache.mina.transport.socket.nio.NioSocketConnector;
@@ -63,23 +62,24 @@ public class SSL2SimpleTest {
 		socket_connector.getFilterChain().addFirst("ssl", filter);
 		socket_connector.setHandler(new DebugFilter());
 
-		final InetSocketAddress server_address = new InetSocketAddress("0.0.0.0", 53301);
-		socket_acceptor.bind(server_address);
+		socket_acceptor.bind(new InetSocketAddress("0.0.0.0", 0));
+
+		final SocketAddress server_address = socket_acceptor.getLocalAddress();
 
 		final IoFuture connect_future = socket_connector.connect(server_address);
 		connect_future.awaitUninterruptibly();
 
 		final IoSession client_socket = connect_future.getSession();
 
-//		try {
-//			Thread.sleep(1000);
-//		} catch (InterruptedException e) {
-//
-//		}
-
 		client_socket.write(createMosaicRequest()).awaitUninterruptibly();
 
-		client_socket.closeNow();
+		try {
+			Thread.sleep(250);
+		} catch (InterruptedException e) {
+			// ignore
+		}
+
+		client_socket.closeNow().awaitUninterruptibly();
 
 		socket_connector.dispose();