You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jo...@apache.org on 2021/08/02 03:40:55 UTC
[mina] 10/15: Improves loop encode/decode functions
This is an automated email from the ASF dual-hosted git repository.
johnnyv pushed a commit to branch bugfix/DIRMINA1132
in repository https://gitbox.apache.org/repos/asf/mina.git
commit 2bd87b6250741ab1d1df161d3a801ffda1641b8e
Author: Jonathan Valliere <jo...@apache.org>
AuthorDate: Sun Jul 25 12:02:46 2021 -0400
Improves loop encode/decode functions
---
.../org/apache/mina/filter/ssl2/SSL2HandlerG0.java | 209 +++++++++++++++------
.../apache/mina/filter/ssl2/SSL2SimpleTest.java | 22 +--
2 files changed, 160 insertions(+), 71 deletions(-)
diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java
index 2f8300d..8f4e8d6 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java
@@ -20,10 +20,25 @@ public class SSL2HandlerG0 extends SSL2Handler {
static protected final int MAX_UNACK_MESSAGES = 6;
/**
+ * Enable aggregation of handshake messages
+ */
+ static protected final boolean ENABLE_FAST_HANDSHAKE = true;
+
+ /**
+ * Enable asynchronous tasks
+ */
+ static protected final boolean ENABLE_ASYNC_TASKS = true;
+
+ /**
* Indicates whether the first handshake was completed
*/
protected boolean mHandshakeComplete = false;
+ /**
+ * Indicates whether the first handshake was started
+ */
+ protected boolean mHandshakeStarted = false;
+
public SSL2HandlerG0(SSLEngine p, Executor e, IoSession s) {
super(p, e, s);
}
@@ -48,12 +63,15 @@ public class SSL2HandlerG0 extends SSL2Handler {
* {@inheritDoc}
*/
synchronized public void open(final NextFilter next) throws SSLException {
- if (this.mEngine.getUseClientMode()) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} open() - begin handshaking", toString());
+ if (this.mHandshakeStarted == false) {
+ this.mHandshakeStarted = true;
+ if (this.mEngine.getUseClientMode()) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} open() - begin handshaking", toString());
+ }
+ this.mEngine.beginHandshake();
+ this.qwrite(next);
}
- this.mEngine.beginHandshake();
- this.lwrite(next);
}
}
@@ -61,13 +79,13 @@ public class SSL2HandlerG0 extends SSL2Handler {
* {@inheritDoc}
*/
synchronized public void receive(final NextFilter next, final IoBuffer message) throws SSLException {
- final IoBuffer source = resume_decode_buffer(message);
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} receive() - source {}", toString(), source);
+ LOGGER.debug("{} receive() - message {}", toString(), message);
}
+ final IoBuffer source = resume_decode_buffer(message);
try {
- while (lreceive(next, source) && message.hasRemaining()) {
- // loop until the message is consumed
+ if (source.hasRemaining()) {
+ this.qreceive(next, source);
}
} finally {
save_decode_buffer(source);
@@ -80,14 +98,12 @@ public class SSL2HandlerG0 extends SSL2Handler {
* @param message received data
* @param session user session
* @param next filter
- * @return {@code true} if some of the message was consumed
* @throws SSLException
*/
@SuppressWarnings("incomplete-switch")
- protected boolean lreceive(final NextFilter next, final IoBuffer message) throws SSLException {
-
+ protected void qreceive(final NextFilter next, final IoBuffer message) throws SSLException {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lreceive() - source {}", toString(), message);
+ LOGGER.debug("{} qreceive() - source {}", toString(), message);
}
final IoBuffer source = message == null ? ZERO : message;
@@ -96,43 +112,60 @@ public class SSL2HandlerG0 extends SSL2Handler {
final SSLEngineResult result = mEngine.unwrap(source.buf(), dest.buf());
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lreceive() - bytes-consumed {}, bytes-produced {}, status {}", toString(),
- result.bytesConsumed(), result.bytesProduced(), result.getStatus());
+ LOGGER.debug("{} qreceive() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(),
+ result.bytesConsumed(), result.bytesProduced(), result.getStatus(), result.getHandshakeStatus());
}
+ final boolean success = result.bytesConsumed() != 0;
+
if (result.bytesProduced() == 0) {
dest.free();
} else {
dest.flip();
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lreceive() - result {}", toString(), dest);
+ LOGGER.debug("{} qreceive() - result {}", toString(), dest);
}
next.messageReceived(this.mSession, dest);
}
switch (result.getHandshakeStatus()) {
+ case NEED_UNWRAP:
+ case NEED_UNWRAP_AGAIN:
+ if (success && source.hasRemaining()) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} qreceive() - handshake needs unwrap, looping", toString());
+ }
+ this.qreceive(next, message);
+ }
+ break;
case NEED_TASK:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lreceive() - handshake needs task, scheduling tasks", toString());
+ LOGGER.debug("{} qreceive() - handshake needs task, scheduling", toString());
}
this.schedule_task(next);
break;
case NEED_WRAP:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lreceive() - handshake needs to write a new message", toString());
+ LOGGER.debug("{} qreceive() - handshake needs wrap, invoking write", toString());
}
- this.lwrite(next);
+ this.qwrite(next);
break;
case FINISHED:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lreceive() - handshake finished, flushing pending requests", toString());
+ LOGGER.debug("{} qreceive() - handshake finished, flushing queue", toString());
}
this.lfinish(next);
this.lflush(next);
break;
+ case NOT_HANDSHAKING:
+ if (success && message.hasRemaining()) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} qreceive() - trying to decode more messages, looping", toString());
+ }
+ this.qreceive(next, message);
+ }
+ break;
}
-
- return result.bytesConsumed() > 0;
}
/**
@@ -159,7 +192,7 @@ public class SSL2HandlerG0 extends SSL2Handler {
}
if (this.mEncodeQueue.isEmpty()) {
- if (lwrite(next, request) == false) {
+ if (qwrite(next, request) == false) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(),
request);
@@ -185,9 +218,9 @@ public class SSL2HandlerG0 extends SSL2Handler {
* @throws SSLException
*/
@SuppressWarnings("incomplete-switch")
- synchronized protected boolean lwrite(final NextFilter next, final WriteRequest request) throws SSLException {
+ synchronized protected boolean qwrite(final NextFilter next, final WriteRequest request) throws SSLException {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lwrite() - source {}", toString(), request);
+ LOGGER.debug("{} qwrite() - source {}", toString(), request);
}
final IoBuffer source = IoBuffer.class.cast(request.getMessage());
@@ -196,7 +229,7 @@ public class SSL2HandlerG0 extends SSL2Handler {
final SSLEngineResult result = this.mEngine.wrap(source.buf(), dest.buf());
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lwrite() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(),
+ LOGGER.debug("{} qwrite() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(),
result.bytesConsumed(), result.bytesProduced(), result.getStatus(), result.getHandshakeStatus());
}
@@ -206,7 +239,7 @@ public class SSL2HandlerG0 extends SSL2Handler {
if (result.bytesConsumed() == 0) {
EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null);
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lwrite() - result {}", toString(), encrypted);
+ LOGGER.debug("{} qwrite() - result {}", toString(), encrypted);
}
next.filterWrite(this.mSession, encrypted);
} else {
@@ -216,11 +249,11 @@ public class SSL2HandlerG0 extends SSL2Handler {
EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null);
this.mAckQueue.add(encrypted);
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lwrite() - result {}", toString(), encrypted);
+ LOGGER.debug("{} qwrite() - result {}", toString(), encrypted);
}
next.filterWrite(this.mSession, encrypted);
if (this.mAckQueue.size() < MAX_UNACK_MESSAGES) {
- return lwrite(next, request); // write additional chunks
+ return qwrite(next, request); // write additional chunks
}
return false;
} else {
@@ -228,7 +261,7 @@ public class SSL2HandlerG0 extends SSL2Handler {
EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, request);
this.mAckQueue.add(encrypted);
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lwrite() - result {}", toString(), encrypted);
+ LOGGER.debug("{} qwrite() - result {}", toString(), encrypted);
}
next.filterWrite(this.mSession, encrypted);
return true;
@@ -239,21 +272,21 @@ public class SSL2HandlerG0 extends SSL2Handler {
switch (result.getHandshakeStatus()) {
case NEED_TASK:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lwrite() - handshake needs task, scheduling tasks", toString());
+ LOGGER.debug("{} qwrite() - handshake needs task, scheduling", toString());
}
this.schedule_task(next);
break;
case NEED_WRAP:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lwrite() - handshake needs to encode a message", toString());
+ LOGGER.debug("{} qwrite() - handshake needs wrap, looping", toString());
}
- return this.lwrite(next, request);
+ return this.qwrite(next, request);
case FINISHED:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lwrite() - handshake finished, flushing pending requests", toString());
+ LOGGER.debug("{} qwrite() - handshake finished, flushing queue", toString());
}
this.lfinish(next);
- if (this.lwrite(next, request)) {
+ if (this.qwrite(next, request)) {
this.lflush(next);
return true;
}
@@ -271,24 +304,64 @@ public class SSL2HandlerG0 extends SSL2Handler {
* @return {@code true} if a message was generated and written
* @throws SSLException
*/
- @SuppressWarnings("incomplete-switch")
- synchronized protected boolean lwrite(NextFilter next) throws SSLException {
-
+ synchronized protected boolean qwrite(NextFilter next) throws SSLException {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lwrite() - internal", toString());
+ LOGGER.debug("{} qwrite() - internal", toString());
}
final IoBuffer source = ZERO;
final IoBuffer dest = allocate_encode_buffer(source.remaining());
+ return lwrite(next, source, dest);
+ }
+
+ /**
+ * Attempts to generate a handshake message and write the data to the IoSession.
+ * <p>
+ * If {@code ENABLE_FAST_HANDSHAKE} is set, this method loops recursively in order to
+ * combine multiple handshake messages into one buffer.
+ *
+ * @param session
+ * @param next
+ * @return {@code true} if a message was generated and written
+ * @throws SSLException
+ */
+ @SuppressWarnings("incomplete-switch")
+ protected boolean lwrite(NextFilter next, IoBuffer source, IoBuffer dest) throws SSLException {
final SSLEngineResult result = this.mEngine.wrap(source.buf(), dest.buf());
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lwrite() - bytes-consumed {}, bytes-produced {}", toString(), result.bytesConsumed(),
- result.bytesProduced());
+ LOGGER.debug("{} lwrite() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(),
+ result.bytesConsumed(), result.bytesProduced(), result.getStatus(), result.getHandshakeStatus());
}
- if (result.bytesProduced() == 0) {
+ if (ENABLE_FAST_HANDSHAKE) {
+ /**
+ * Fast handshaking allows multiple handshake messages to be written to a single
+ * buffer. This reduces the number of network messages used during the handshake
+ * process.
+ *
+ * Additional handshake messages are only written to this buffer if a message
+ * was produced in the last loop; otherwise, any further messages required by
+ * NEED_WRAP are handled by the standard routine below, which allocates a new
+ * buffer.
+ */
+ switch (result.getHandshakeStatus()) {
+ case NEED_WRAP:
+ switch (result.getStatus()) {
+ case OK:
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} lwrite() - handshake needs wrap, fast looping", toString());
+ }
+ return lwrite(next, source, dest);
+ }
+ break;
+ }
+ }
+
+ final boolean success = dest.position() != 0;
+
+ if (success == false) {
dest.free();
} else {
dest.flip();
@@ -300,30 +373,42 @@ public class SSL2HandlerG0 extends SSL2Handler {
}
switch (result.getHandshakeStatus()) {
- case NEED_TASK:
+ case NEED_UNWRAP:
+ case NEED_UNWRAP_AGAIN:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lwrite() - handshake needs task, scheduling tasks", toString());
+ LOGGER.debug("{} lwrite() - handshake needs unwrap, invoking receive", toString());
}
- this.schedule_task(next);
+ this.receive(next, ZERO);
break;
case NEED_WRAP:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lwrite() - handshake needs to encode a message", toString());
+ LOGGER.debug("{} lwrite() - handshake needs wrap, looping", toString());
+ }
+ this.qwrite(next);
+ break;
+ case NEED_TASK:
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} lwrite() - handshake needs task, scheduling", toString());
}
- this.lwrite(next);
+ this.schedule_task(next);
break;
case FINISHED:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lwrite() - handshake finished, flushing pending requests", toString());
+ LOGGER.debug("{} lwrite() - handshake finished, flushing queue", toString());
}
this.lfinish(next);
this.lflush(next);
break;
}
- return result.bytesProduced() > 0;
+ return success;
}
+ /**
+ * Marks the handshake as complete and emits any signals
+ *
+ * @param next
+ */
synchronized protected void lfinish(final NextFilter next) {
this.mHandshakeComplete = true;
next.event(this.mSession, SslEvent.SECURED);
@@ -348,7 +433,7 @@ public class SSL2HandlerG0 extends SSL2Handler {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} flush() - {}", toString(), current);
}
- if (lwrite(next, current) == false) {
+ if (qwrite(next, current) == false) {
this.mEncodeQueue.addFirst(current);
break;
}
@@ -367,19 +452,23 @@ public class SSL2HandlerG0 extends SSL2Handler {
}
mEngine.closeOutbound();
- this.lwrite(next);
+ this.qwrite(next);
}
protected void schedule_task(final NextFilter next) {
- if (this.mExecutor == null) {
- this.execute_task(next);
+ if (ENABLE_ASYNC_TASKS) {
+ if (this.mExecutor == null) {
+ this.execute_task(next);
+ } else {
+ this.mExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ SSL2HandlerG0.this.execute_task(next);
+ }
+ });
+ }
} else {
- this.mExecutor.execute(new Runnable() {
- @Override
- public void run() {
- SSL2HandlerG0.this.execute_task(next);
- }
- });
+ this.execute_task(next);
}
}
@@ -397,7 +486,7 @@ public class SSL2HandlerG0 extends SSL2Handler {
LOGGER.debug("{} task() - writing handshake messages", toString());
}
- lwrite(next);
+ qwrite(next);
} catch (SSLException e) {
e.printStackTrace();
}
diff --git a/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java b/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java
index aef4ead..8896875 100644
--- a/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java
+++ b/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java
@@ -2,6 +2,7 @@ package org.apache.mina.filter.ssl2;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
@@ -20,8 +21,6 @@ import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
-import org.apache.mina.core.write.DefaultWriteRequest;
-import org.apache.mina.core.write.WriteRequest;
import org.apache.mina.filter.ssl.SslDIRMINA937Test;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
@@ -63,23 +62,24 @@ public class SSL2SimpleTest {
socket_connector.getFilterChain().addFirst("ssl", filter);
socket_connector.setHandler(new DebugFilter());
- final InetSocketAddress server_address = new InetSocketAddress("0.0.0.0", 53301);
- socket_acceptor.bind(server_address);
+ socket_acceptor.bind(new InetSocketAddress("0.0.0.0", 0));
+
+ final SocketAddress server_address = socket_acceptor.getLocalAddress();
final IoFuture connect_future = socket_connector.connect(server_address);
connect_future.awaitUninterruptibly();
final IoSession client_socket = connect_future.getSession();
-// try {
-// Thread.sleep(1000);
-// } catch (InterruptedException e) {
-//
-// }
-
client_socket.write(createMosaicRequest()).awaitUninterruptibly();
- client_socket.closeNow();
+ try {
+ Thread.sleep(250);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+
+ client_socket.closeNow().awaitUninterruptibly();
socket_connector.dispose();