You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jo...@apache.org on 2021/08/02 03:40:53 UTC
[mina] 08/15: Adds SSL2 flow control
This is an automated email from the ASF dual-hosted git repository.
johnnyv pushed a commit to branch bugfix/DIRMINA1132
in repository https://gitbox.apache.org/repos/asf/mina.git
commit d64c8c7fd46cc14cf37aa5505860913925311d17
Author: Jonathan Valliere <jo...@apache.org>
AuthorDate: Sat Jul 24 13:33:57 2021 -0400
Adds SSL2 flow control
---
.../mina/filter/ssl2/EncryptedWriteRequest.java | 23 +---
.../org/apache/mina/filter/ssl2/SSL2Filter.java | 83 ++++++++-------
.../org/apache/mina/filter/ssl2/SSL2Handler.java | 20 +++-
.../org/apache/mina/filter/ssl2/SSL2HandlerG0.java | 116 ++++++++++++++-------
.../apache/mina/filter/ssl2/SSL2SimpleTest.java | 21 ++--
5 files changed, 157 insertions(+), 106 deletions(-)
diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl2/EncryptedWriteRequest.java b/mina-core/src/main/java/org/apache/mina/filter/ssl2/EncryptedWriteRequest.java
index 91fabc7..caf32d7 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl2/EncryptedWriteRequest.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl2/EncryptedWriteRequest.java
@@ -1,32 +1,19 @@
package org.apache.mina.filter.ssl2;
-import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.write.DefaultWriteRequest;
import org.apache.mina.core.write.WriteRequest;
public class EncryptedWriteRequest extends DefaultWriteRequest {
// The original message
- private WriteRequest parentRequest;
+ private WriteRequest originalRequest;
public EncryptedWriteRequest(Object encodedMessage, WriteRequest parent) {
- super(encodedMessage, null);
+ super(encodedMessage, parent != null ? parent.getFuture() : null);
+ this.originalRequest = parent != null ? parent : this;
}
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isEncoded() {
- return true;
- }
-
- public WriteRequest getParentRequest() {
- return this.parentRequest;
- }
-
- @Override
- public WriteFuture getFuture() {
- return (this.getParentRequest() != null) ? this.getParentRequest().getFuture() : super.getFuture();
+ public WriteRequest getOriginalRequest() {
+ return this.originalRequest;
}
}
\ No newline at end of file
diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Filter.java b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Filter.java
index 052f806..80e5688 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Filter.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Filter.java
@@ -19,6 +19,7 @@
*/
package org.apache.mina.filter.ssl2;
+import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
@@ -38,16 +39,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * An SSL filter that encrypts and decrypts the data exchanged in the session.
- * Adding this filter triggers SSL handshake procedure immediately by sending a
- * SSL 'hello' message, so you don't need to call {@link #startSsl(IoSession)}
- * manually unless you are implementing StartTLS (see below). If you don't want
- * the handshake procedure to start immediately, please specify {@code false} as
- * {@code autoStart} parameter in the constructor.
+ * An SSL Filter which simplifies and controls the flow of encrypted information
+ * on the filter-chain.
* <p>
- * This filter uses an {@link SSLEngine} which was introduced in Java 5, so Java
- * version 5 or above is mandatory to use this filter. And please note that this
- * filter only works for TCP/IP connections.
+ * The initial handshake is automatically enabled for "client" sessions once the
+ * filter is added to the filter-chain and the session is connected.
*
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
@@ -160,15 +156,6 @@ public class SSL2Filter extends IoFilterAdapter {
this.mEnabledProtocols = protocols;
}
- /**
- * Executed just before the filter is added into the chain, we do :
- * <ul>
- * <li>check that we don't have a SSL filter already present
- * <li>we update the next filter
- * <li>we create the SSL handler helper class
- * <li>and we store it into the session's Attributes
- * </ul>
- */
@Override
public void onPreAdd(IoFilterChain parent, String name, NextFilter next) throws Exception {
// Check that we don't have a SSL filter already present in the chain
@@ -184,34 +171,47 @@ public class SSL2Filter extends IoFilterAdapter {
}
@Override
- public void onPreRemove(IoFilterChain parent, String name, NextFilter next) throws Exception {
+ public void onPostAdd(IoFilterChain parent, String name, NextFilter next) throws Exception {
IoSession session = parent.getSession();
- session.removeAttribute(SSL_HANDLER);
+ if (session.isConnected()) {
+ this.sessionConnected(next, session);
+ }
+ super.onPostAdd(parent, name, next);
}
@Override
- public void sessionOpened(NextFilter next, IoSession session) throws Exception {
-
- LOGGER.debug("session openend {}", session);
+ public void onPreRemove(IoFilterChain parent, String name, NextFilter next) throws Exception {
+ IoSession session = parent.getSession();
+ SSL2Handler x = SSL2Handler.class.cast(session.removeAttribute(SSL_HANDLER));
+ if (x != null) {
+ x.close(next);
+ }
+ }
+ protected void sessionConnected(NextFilter next, IoSession session) throws Exception {
SSL2Handler x = SSL2Handler.class.cast(session.getAttribute(SSL_HANDLER));
if (x == null) {
- SSLEngine e = mContext.createSSLEngine();
-
+ InetSocketAddress s = InetSocketAddress.class.cast(session.getRemoteAddress());
+ SSLEngine e = mContext.createSSLEngine(s.getHostString(), s.getPort());
e.setNeedClientAuth(mNeedClientAuth);
e.setWantClientAuth(mWantClientAuth);
e.setEnabledCipherSuites(mEnabledCipherSuites);
e.setEnabledProtocols(mEnabledProtocols);
e.setUseClientMode(!session.isServer());
-
x = new SSL2HandlerG0(e, EXECUTOR, session);
-
session.setAttribute(SSL_HANDLER, x);
}
x.open(next);
-
+ }
+
+ @Override
+ public void sessionOpened(NextFilter next, IoSession session) throws Exception {
+ if (LOGGER.isDebugEnabled())
+ LOGGER.debug("session {} openend", session);
+
+ this.sessionConnected(next, session);
super.sessionOpened(next, session);
}
@@ -222,29 +222,32 @@ public class SSL2Filter extends IoFilterAdapter {
}
@Override
- public void messageSent(NextFilter next, IoSession session, WriteRequest writeRequest) throws Exception {
- if (writeRequest instanceof EncryptedWriteRequest) {
- EncryptedWriteRequest e = EncryptedWriteRequest.class.cast(writeRequest);
+ public void messageSent(NextFilter next, IoSession session, WriteRequest request) throws Exception {
+ if (LOGGER.isDebugEnabled())
+ LOGGER.debug("session {} sent {}", session, request);
+
+ if (request instanceof EncryptedWriteRequest) {
+ EncryptedWriteRequest e = EncryptedWriteRequest.class.cast(request);
SSL2Handler x = SSL2Handler.class.cast(session.getAttribute(SSL_HANDLER));
- x.ack(next, writeRequest);
- if (e.getParentRequest() != null) {
- next.messageSent(session, e.getParentRequest());
+ x.ack(next, request);
+ if (e.getOriginalRequest() != e) {
+ next.messageSent(session, e.getOriginalRequest());
}
} else {
- super.messageSent(next, session, writeRequest);
+ super.messageSent(next, session, request);
}
}
@Override
- public void filterWrite(NextFilter next, IoSession session, WriteRequest writeRequest) throws Exception {
+ public void filterWrite(NextFilter next, IoSession session, WriteRequest request) throws Exception {
- LOGGER.debug("session write {}", session);
+ LOGGER.debug("session {} write {}", session, request);
- if (writeRequest instanceof EncryptedWriteRequest) {
- super.filterWrite(next, session, writeRequest);
+ if (request instanceof EncryptedWriteRequest) {
+ super.filterWrite(next, session, request);
} else {
SSL2Handler x = SSL2Handler.class.cast(session.getAttribute(SSL_HANDLER));
- x.write(next, writeRequest);
+ x.write(next, request);
}
}
}
diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Handler.java b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Handler.java
index 1e8e59b..d8eb1eb 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Handler.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Handler.java
@@ -18,6 +18,21 @@ import org.slf4j.LoggerFactory;
public abstract class SSL2Handler {
/**
+ * Minimum size of encoder buffer in packets
+ */
+ static protected final int MIN_ENCODER_PACKETS = 2;
+
+ /**
+ * Maximum size of encoder buffer in packets
+ */
+ static protected final int MAX_ENCODER_PACKETS = 8;
+
+ /**
+ * Zero length buffer used to prime the ssl engine
+ */
+ static protected final IoBuffer ZERO = IoBuffer.allocate(0, true);
+
+ /**
* Static logger
*/
static protected final Logger LOGGER = LoggerFactory.getLogger(SSL2Handler.class);
@@ -25,7 +40,7 @@ public abstract class SSL2Handler {
/**
* Write Requests which are enqueued prior to the completion of the handshaking
*/
- protected final Deque<WriteRequest> mWriteQueue = new ConcurrentLinkedDeque<>();
+ protected final Deque<WriteRequest> mEncodeQueue = new ConcurrentLinkedDeque<>();
/**
* Requests which have been sent to the socket and waiting acknowledgment
@@ -200,7 +215,8 @@ public abstract class SSL2Handler {
SSLSession session = this.mEngine.getHandshakeSession();
if (session == null)
session = this.mEngine.getSession();
- int packets = Math.max(2, Math.min(16, 1 + (estimate / session.getApplicationBufferSize())));
+ int packets = Math.max(MIN_ENCODER_PACKETS,
+ Math.min(MAX_ENCODER_PACKETS, 1 + (estimate / session.getApplicationBufferSize())));
return IoBuffer.allocate(packets * session.getPacketBufferSize());
}
diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java
index 803927f..9961a32 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java
@@ -4,7 +4,6 @@ import java.util.concurrent.Executor;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLException;
import org.apache.mina.core.buffer.IoBuffer;
@@ -14,36 +13,42 @@ import org.apache.mina.core.write.WriteRequest;
public class SSL2HandlerG0 extends SSL2Handler {
+ /**
+ * Maximum number of messages waiting acknowledgement
+ */
+ static protected final int MAX_UNACK_MESSAGES = 6;
+
public SSL2HandlerG0(SSLEngine p, Executor e, IoSession s) {
super(p, e, s);
}
+ /**
+ * {@inheritDoc}
+ */
synchronized public void open(final NextFilter next) throws SSLException {
if (this.mEngine.getUseClientMode()) {
-
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} open() - begin handshaking", toString());
}
-
this.mEngine.beginHandshake();
this.lwrite(next);
}
}
+ /**
+ * {@inheritDoc}
+ */
synchronized public void receive(final NextFilter next, final IoBuffer message) throws SSLException {
-
+ final IoBuffer source = resume_decode_buffer(message);
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} receive() - source {}", toString(), message);
+ LOGGER.debug("{} receive() - source {}", toString(), source);
}
-
- final IoBuffer input = resume_decode_buffer(message);
-
try {
- while (lreceive(next, input) && message.hasRemaining()) {
- // spin
+ while (lreceive(next, source) && message.hasRemaining()) {
+ // loop until the message is consumed
}
} finally {
- save_decode_buffer(input);
+ save_decode_buffer(source);
}
}
@@ -63,7 +68,7 @@ public class SSL2HandlerG0 extends SSL2Handler {
LOGGER.debug("{} lreceive() - source {}", toString(), message);
}
- final IoBuffer source = message == null ? IoBuffer.allocate(0) : message;
+ final IoBuffer source = message == null ? ZERO : message;
final IoBuffer dest = allocate_app_buffer(source.remaining());
final SSLEngineResult result = mEngine.unwrap(source.buf(), dest.buf());
@@ -77,11 +82,9 @@ public class SSL2HandlerG0 extends SSL2Handler {
dest.free();
} else {
dest.flip();
-
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} lreceive() - result {}", toString(), dest);
}
-
next.messageReceived(this.mSession, dest);
}
@@ -109,30 +112,42 @@ public class SSL2HandlerG0 extends SSL2Handler {
return result.bytesConsumed() > 0;
}
+ /**
+ * {@inheritDoc}
+ */
synchronized public void ack(final NextFilter next, final WriteRequest request) throws SSLException {
-
+ if (this.mAckQueue.remove(request)) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} ack() - {}", toString(), request);
+ }
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} ack() - checking to see if any messages can be flushed", toString(), request);
+ }
+ this.lflush(next);
+ }
}
+ /**
+ * {@inheritDoc}
+ */
synchronized public void write(final NextFilter next, final WriteRequest request) throws SSLException {
-
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} write() - source {}", toString(), request);
}
- if (this.mWriteQueue.isEmpty()) {
+ if (this.mEncodeQueue.isEmpty()) {
if (lwrite(next, request) == false) {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(), request);
+ LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(),
+ request);
}
-
- this.mWriteQueue.add(request);
+ this.mEncodeQueue.add(request);
}
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(), request);
}
-
- this.mWriteQueue.add(request);
+ this.mEncodeQueue.add(request);
}
}
@@ -142,7 +157,8 @@ public class SSL2HandlerG0 extends SSL2Handler {
* @param request
* @param session
* @param next
- * @return {@code true} if the WriteRequest was successfully written
+ * @return {@code true} if the WriteRequest was fully consumed; otherwise
+ * {@code false}
* @throws SSLException
*/
@SuppressWarnings("incomplete-switch")
@@ -166,19 +182,35 @@ public class SSL2HandlerG0 extends SSL2Handler {
dest.free();
} else {
if (result.bytesConsumed() == 0) {
- next.filterWrite(this.mSession, new EncryptedWriteRequest(dest, null));
+ EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} lwrite() - result {}", toString(), encrypted);
+ }
+ next.filterWrite(this.mSession, encrypted);
} else {
// then we probably consumed some data
dest.flip();
if (source.hasRemaining()) {
- next.filterWrite(this.mSession, new EncryptedWriteRequest(dest, null));
- lwrite(next, request); // write additional chunks
+ EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null);
+ this.mAckQueue.add(encrypted);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} lwrite() - result {}", toString(), encrypted);
+ }
+ next.filterWrite(this.mSession, encrypted);
+ if (this.mAckQueue.size() < MAX_UNACK_MESSAGES) {
+ return lwrite(next, request); // write additional chunks
+ }
+ return false;
} else {
source.rewind();
- next.filterWrite(this.mSession, new EncryptedWriteRequest(dest, request));
+ EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, request);
+ this.mAckQueue.add(encrypted);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} lwrite() - result {}", toString(), encrypted);
+ }
+ next.filterWrite(this.mSession, encrypted);
+ return true;
}
-
- return true;
}
}
@@ -206,7 +238,6 @@ public class SSL2HandlerG0 extends SSL2Handler {
}
return false;
-
}
/**
@@ -224,7 +255,7 @@ public class SSL2HandlerG0 extends SSL2Handler {
LOGGER.debug("{} lwrite() - internal", toString());
}
- final IoBuffer source = IoBuffer.allocate(0);
+ final IoBuffer source = ZERO;
final IoBuffer dest = allocate_encode_buffer(source.remaining());
final SSLEngineResult result = this.mEngine.wrap(source.buf(), dest.buf());
@@ -234,15 +265,13 @@ public class SSL2HandlerG0 extends SSL2Handler {
result.bytesProduced());
}
- if (dest.position() == 0) {
+ if (result.bytesProduced() == 0) {
dest.free();
} else {
dest.flip();
-
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} lwrite() - result {}", toString(), dest);
}
-
final EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null);
next.filterWrite(this.mSession, encrypted);
}
@@ -271,8 +300,14 @@ public class SSL2HandlerG0 extends SSL2Handler {
return result.bytesProduced() > 0;
}
+ /**
+ * Flushes the encode queue
+ *
+ * @param next
+ * @throws SSLException
+ */
synchronized protected void lflush(final NextFilter next) throws SSLException {
- if (this.mWriteQueue.isEmpty()) {
+ if (this.mEncodeQueue.isEmpty()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} flush() - no saved messages", toString());
}
@@ -280,15 +315,20 @@ public class SSL2HandlerG0 extends SSL2Handler {
}
WriteRequest current = null;
-
- while ((current = this.mWriteQueue.poll()) != null) {
+ while ((this.mAckQueue.size() < MAX_UNACK_MESSAGES) && (current = this.mEncodeQueue.poll()) != null) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} flush() - {}", toString(), current);
+ }
if (lwrite(next, current) == false) {
- this.mWriteQueue.addFirst(current);
+ this.mEncodeQueue.addFirst(current);
break;
}
}
}
+ /**
+ * {@inheritDoc}
+ */
synchronized public void close(final NextFilter next) throws SSLException {
if (mEngine.isOutboundDone())
return;
diff --git a/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java b/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java
index beb6577..aef4ead 100644
--- a/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java
+++ b/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java
@@ -77,13 +77,7 @@ public class SSL2SimpleTest {
//
// }
- client_socket.write(createWriteRequest());
-
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
-
- }
+ client_socket.write(createMosaicRequest()).awaitUninterruptibly();
client_socket.closeNow();
@@ -104,7 +98,18 @@ public class SSL2SimpleTest {
}
}
- public static IoBuffer createWriteRequest() {
+ public static IoBuffer createMosaicRequest() {
+ // 100 KiB payload filled with a repeating 4-byte pattern
+ IoBuffer message = IoBuffer.allocate(100 * 1024);
+ while (message.hasRemaining()) {
+ message.putInt(0xFF332211);
+ }
+ message.flip();
+
+ return message;
+ }
+
+ public static IoBuffer createHttpRequest() {
// HTTP request
StringBuilder http = new StringBuilder();
http.append("GET / HTTP/1.0\r\n");