You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jo...@apache.org on 2021/11/23 13:50:19 UTC

[mina] branch 2.2.X updated: Store captured errors in async tasks

This is an automated email from the ASF dual-hosted git repository.

johnnyv pushed a commit to branch 2.2.X
in repository https://gitbox.apache.org/repos/asf/mina.git


The following commit(s) were added to refs/heads/2.2.X by this push:
     new 3b5e22e  Store captured errors in async tasks
3b5e22e is described below

commit 3b5e22ec41d02ff975d1138b94e29df7e8883006
Author: Jonathan Valliere <jo...@apache.org>
AuthorDate: Tue Nov 23 08:50:17 2021 -0500

    Store captured errors in async tasks
    
    Errors captured during async tasks are now stored and re-thrown during
    the next valid filter-chain operation.  This allows the filter chain to
    capture those exceptions.  I may need to disable ENABLE_ASYNC_TASKS later
    to ensure that the filter chain captures all exceptions quickly.
---
 .../java/org/apache/mina/filter/ssl/SSLFilter.java |  13 +-
 .../org/apache/mina/filter/ssl/SSLHandlerG0.java   | 140 +++++++++++++--------
 2 files changed, 93 insertions(+), 60 deletions(-)

diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLFilter.java b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLFilter.java
index 400920a..2904c90 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLFilter.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLFilter.java
@@ -40,8 +40,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A SSL processor which performs flow control of encrypted information
- * on the filter-chain.
+ * An SSL processor which performs flow control of encrypted information on
+ * the filter-chain.
  * <p>
  * The initial handshake is automatically enabled for "client" sessions once the
  * filter is added to the filter-chain and the session is connected.
@@ -51,8 +51,7 @@ import org.slf4j.LoggerFactory;
  */
 public class SSLFilter extends IoFilterAdapter {
 	/**
-	 * The presence of this attribute in a session indicates that the session is
-	 * secured.
+	 * SSLSession object when the session is secured, otherwise null.
 	 */
 	static public final AttributeKey SSL_SECURED = new AttributeKey(SSLFilter.class, "status");
 
@@ -73,8 +72,8 @@ public class SSLFilter extends IoFilterAdapter {
 			new LinkedBlockingDeque<Runnable>(), new BasicThreadFactory("ssl-exec", true));
 
 	protected final SSLContext mContext;
-	protected boolean mNeedClientAuth;
-	protected boolean mWantClientAuth;
+	protected boolean mNeedClientAuth = false;
+	protected boolean mWantClientAuth = false;
 	protected String[] mEnabledCipherSuites;
 	protected String[] mEnabledProtocols;
 
@@ -230,7 +229,7 @@ public class SSLFilter extends IoFilterAdapter {
 	 * Customization handler for creating the engine
 	 * 
 	 * @param session source session
-	 * @param addr socket address used for fast reconnect
+	 * @param addr    socket address used for fast reconnect
 	 * @return an SSLEngine
 	 */
 	protected SSLEngine createEngine(IoSession session, InetSocketAddress addr) {
diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java
index f6072d9..db007b3 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java
@@ -90,6 +90,11 @@ public class SSLHandlerG0 extends SSLHandler {
 	protected Thread mDecodeThread = null;
 
 	/**
+	 * Captured error state
+	 */
+	protected SSLException mPendingError = null;
+
+	/**
 	 * Instantiates a new handler
 	 * 
 	 * @param p engine
@@ -127,7 +132,7 @@ public class SSLHandlerG0 extends SSLHandler {
 					LOGGER.debug("{} open() - begin handshaking", toString());
 				}
 				this.mEngine.beginHandshake();
-				this.write(next);
+				this.write_handshake(next);
 			}
 		}
 	}
@@ -143,7 +148,7 @@ public class SSLHandlerG0 extends SSLHandler {
 			this.mDecodeThread = Thread.currentThread();
 			final IoBuffer source = resume_decode_buffer(message);
 			try {
-				this.qreceive(next, source);
+				this.receive_loop(next, source);
 			} finally {
 				suspend_decode_buffer(source);
 				this.mDecodeThread = null;
@@ -152,8 +157,10 @@ public class SSLHandlerG0 extends SSLHandler {
 			if (LOGGER.isDebugEnabled()) {
 				LOGGER.debug("{} receive() - recursion", toString());
 			}
-			this.qreceive(next, this.mDecodeBuffer);
+			this.receive_loop(next, this.mDecodeBuffer);
 		}
+
+		this.throw_pending_error();
 	}
 
 	/**
@@ -165,9 +172,9 @@ public class SSLHandlerG0 extends SSLHandler {
 	 * @throws SSLException
 	 */
 	@SuppressWarnings("incomplete-switch")
-	protected void qreceive(final NextFilter next, final IoBuffer message) throws SSLException {
+	protected void receive_loop(final NextFilter next, final IoBuffer message) throws SSLException {
 		if (LOGGER.isDebugEnabled()) {
-			LOGGER.debug("{} qreceive() - source {}", toString(), message);
+			LOGGER.debug("{} receive_loop() - source {}", toString(), message);
 		}
 
 		final IoBuffer source = message;
@@ -176,7 +183,7 @@ public class SSLHandlerG0 extends SSLHandler {
 		final SSLEngineResult result = mEngine.unwrap(source.buf(), dest.buf());
 
 		if (LOGGER.isDebugEnabled()) {
-			LOGGER.debug("{} qreceive() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(),
+			LOGGER.debug("{} receive_loop() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(),
 					result.bytesConsumed(), result.bytesProduced(), result.getStatus(), result.getHandshakeStatus());
 		}
 
@@ -185,7 +192,7 @@ public class SSLHandlerG0 extends SSLHandler {
 		} else {
 			dest.flip();
 			if (LOGGER.isDebugEnabled()) {
-				LOGGER.debug("{} qreceive() - result {}", toString(), dest);
+				LOGGER.debug("{} receive_loop() - result {}", toString(), dest);
 			}
 			next.messageReceived(this.mSession, dest);
 		}
@@ -194,35 +201,35 @@ public class SSLHandlerG0 extends SSLHandler {
 			case NEED_UNWRAP:
 				if (result.bytesConsumed() != 0 && message.hasRemaining()) {
 					if (LOGGER.isDebugEnabled()) {
-						LOGGER.debug("{} qreceive() - handshake needs unwrap, looping", toString());
+						LOGGER.debug("{} receive_loop() - handshake needs unwrap, looping", toString());
 					}
-					this.qreceive(next, message);
+					this.receive_loop(next, message);
 				}
 				break;
 			case NEED_TASK:
 				if (LOGGER.isDebugEnabled()) {
-					LOGGER.debug("{} qreceive() - handshake needs task, scheduling", toString());
+					LOGGER.debug("{} receive_loop() - handshake needs task, scheduling", toString());
 				}
 				this.schedule_task(next);
 				break;
 			case NEED_WRAP:
 				if (LOGGER.isDebugEnabled()) {
-					LOGGER.debug("{} qreceive() - handshake needs wrap, invoking write", toString());
+					LOGGER.debug("{} receive_loop() - handshake needs wrap, invoking write", toString());
 				}
-				this.write(next);
+				this.write_handshake(next);
 				break;
 			case FINISHED:
 				if (LOGGER.isDebugEnabled()) {
-					LOGGER.debug("{} qreceive() - handshake finished, flushing queue", toString());
+					LOGGER.debug("{} receive_loop() - handshake finished, flushing queue", toString());
 				}
-				this.lfinish(next);
+				this.finish_handshake(next);
 				break;
 			case NOT_HANDSHAKING:
 				if ((result.bytesProduced() != 0 || result.bytesConsumed() != 0) && message.hasRemaining()) {
 					if (LOGGER.isDebugEnabled()) {
-						LOGGER.debug("{} qreceive() - trying to decode more messages, looping", toString());
+						LOGGER.debug("{} receive_loop() - trying to decode more messages, looping", toString());
 					}
-					this.qreceive(next, message);
+					this.receive_loop(next, message);
 				}
 				break;
 		}
@@ -241,6 +248,8 @@ public class SSLHandlerG0 extends SSLHandler {
 			}
 			this.flush(next);
 		}
+
+		this.throw_pending_error();
 	}
 
 	/**
@@ -257,7 +266,7 @@ public class SSLHandlerG0 extends SSLHandler {
 		}
 
 		if (this.mEncodeQueue.isEmpty()) {
-			if (this.qwrite(next, request) == false) {
+			if (this.write_user_loop(next, request) == false) {
 				if (LOGGER.isDebugEnabled()) {
 					LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(),
 							request);
@@ -276,6 +285,8 @@ public class SSLHandlerG0 extends SSLHandler {
 			}
 			this.mEncodeQueue.add(request);
 		}
+
+		this.throw_pending_error();
 	}
 
 	/**
@@ -290,9 +301,10 @@ public class SSLHandlerG0 extends SSLHandler {
 	 * @throws SSLException
 	 */
 	@SuppressWarnings("incomplete-switch")
-	synchronized protected boolean qwrite(final NextFilter next, final WriteRequest request) throws SSLException {
+	synchronized protected boolean write_user_loop(final NextFilter next, final WriteRequest request)
+			throws SSLException {
 		if (LOGGER.isDebugEnabled()) {
-			LOGGER.debug("{} qwrite() - source {}", toString(), request);
+			LOGGER.debug("{} write_user_loop() - source {}", toString(), request);
 		}
 
 		final IoBuffer source = IoBuffer.class.cast(request.getMessage());
@@ -301,8 +313,9 @@ public class SSLHandlerG0 extends SSLHandler {
 		final SSLEngineResult result = this.mEngine.wrap(source.buf(), dest.buf());
 
 		if (LOGGER.isDebugEnabled()) {
-			LOGGER.debug("{} qwrite() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(),
-					result.bytesConsumed(), result.bytesProduced(), result.getStatus(), result.getHandshakeStatus());
+			LOGGER.debug("{} write_user_loop() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}",
+					toString(), result.bytesConsumed(), result.bytesProduced(), result.getStatus(),
+					result.getHandshakeStatus());
 		}
 
 		if (result.bytesProduced() == 0) {
@@ -312,7 +325,7 @@ public class SSLHandlerG0 extends SSLHandler {
 				// an handshaking message must have been produced
 				EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null);
 				if (LOGGER.isDebugEnabled()) {
-					LOGGER.debug("{} qwrite() - result {}", toString(), encrypted);
+					LOGGER.debug("{} write_user_loop() - result {}", toString(), encrypted);
 				}
 				next.filterWrite(this.mSession, encrypted);
 				// do not return because we want to enter the handshake switch
@@ -323,18 +336,18 @@ public class SSLHandlerG0 extends SSLHandler {
 					EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null);
 					this.mAckQueue.add(encrypted);
 					if (LOGGER.isDebugEnabled()) {
-						LOGGER.debug("{} qwrite() - result {}", toString(), encrypted);
+						LOGGER.debug("{} write_user_loop() - result {}", toString(), encrypted);
 					}
 					next.filterWrite(this.mSession, encrypted);
 					if (this.mAckQueue.size() < MAX_UNACK_MESSAGES) {
-						return qwrite(next, request); // write additional chunks
+						return write_user_loop(next, request); // write additional chunks
 					}
 					return false;
 				} else {
 					EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, request);
 					this.mAckQueue.add(encrypted);
 					if (LOGGER.isDebugEnabled()) {
-						LOGGER.debug("{} qwrite() - result {}", toString(), encrypted);
+						LOGGER.debug("{} write_user_loop() - result {}", toString(), encrypted);
 					}
 					next.filterWrite(this.mSession, encrypted);
 					return true;
@@ -346,21 +359,21 @@ public class SSLHandlerG0 extends SSLHandler {
 		switch (result.getHandshakeStatus()) {
 			case NEED_TASK:
 				if (LOGGER.isDebugEnabled()) {
-					LOGGER.debug("{} qwrite() - handshake needs task, scheduling", toString());
+					LOGGER.debug("{} write_user_loop() - handshake needs task, scheduling", toString());
 				}
 				this.schedule_task(next);
 				break;
 			case NEED_WRAP:
 				if (LOGGER.isDebugEnabled()) {
-					LOGGER.debug("{} qwrite() - handshake needs wrap, looping", toString());
+					LOGGER.debug("{} write_user_loop() - handshake needs wrap, looping", toString());
 				}
-				return this.qwrite(next, request);
+				return this.write_user_loop(next, request);
 			case FINISHED:
 				if (LOGGER.isDebugEnabled()) {
-					LOGGER.debug("{} qwrite() - handshake finished, flushing queue", toString());
+					LOGGER.debug("{} write_user_loop() - handshake finished, flushing queue", toString());
 				}
-				this.lfinish(next);
-				return this.qwrite(next, request);
+				this.finish_handshake(next);
+				return this.write_user_loop(next, request);
 		}
 
 		return false;
@@ -375,14 +388,14 @@ public class SSLHandlerG0 extends SSLHandler {
 	 * 
 	 * @throws SSLException
 	 */
-	synchronized public boolean write(NextFilter next) throws SSLException {
+	synchronized protected boolean write_handshake(NextFilter next) throws SSLException {
 		if (LOGGER.isDebugEnabled()) {
-			LOGGER.debug("{} write() - internal", toString());
+			LOGGER.debug("{} write_handshake() - internal", toString());
 		}
 
 		final IoBuffer source = ZERO;
 		final IoBuffer dest = allocate_encode_buffer(source.remaining());
-		return lwrite(next, source, dest);
+		return write_handshake_loop(next, source, dest);
 	}
 
 	/**
@@ -400,7 +413,7 @@ public class SSLHandlerG0 extends SSLHandler {
 	 * @throws SSLException
 	 */
 	@SuppressWarnings("incomplete-switch")
-	protected boolean lwrite(NextFilter next, IoBuffer source, IoBuffer dest) throws SSLException {
+	protected boolean write_handshake_loop(NextFilter next, IoBuffer source, IoBuffer dest) throws SSLException {
 		if (this.mOutboundClosing && this.mEngine.isOutboundDone()) {
 			return false;
 		}
@@ -408,8 +421,9 @@ public class SSLHandlerG0 extends SSLHandler {
 		final SSLEngineResult result = this.mEngine.wrap(source.buf(), dest.buf());
 
 		if (LOGGER.isDebugEnabled()) {
-			LOGGER.debug("{} lwrite() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(),
-					result.bytesConsumed(), result.bytesProduced(), result.getStatus(), result.getHandshakeStatus());
+			LOGGER.debug("{} write_handshake_loop() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}",
+					toString(), result.bytesConsumed(), result.bytesProduced(), result.getStatus(),
+					result.getHandshakeStatus());
 		}
 
 		if (ENABLE_FAST_HANDSHAKE) {
@@ -428,9 +442,10 @@ public class SSLHandlerG0 extends SSLHandler {
 					switch (result.getStatus()) {
 						case OK:
 							if (LOGGER.isDebugEnabled()) {
-								LOGGER.debug("{} lwrite() - handshake needs wrap, fast looping", toString());
+								LOGGER.debug("{} write_handshake_loop() - handshake needs wrap, fast looping",
+										toString());
 							}
-							return lwrite(next, source, dest);
+							return write_handshake_loop(next, source, dest);
 					}
 					break;
 			}
@@ -443,7 +458,7 @@ public class SSLHandlerG0 extends SSLHandler {
 		} else {
 			dest.flip();
 			if (LOGGER.isDebugEnabled()) {
-				LOGGER.debug("{} lwrite() - result {}", toString(), dest);
+				LOGGER.debug("{} write_handshake_loop() - result {}", toString(), dest);
 			}
 			final EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null);
 			next.filterWrite(this.mSession, encrypted);
@@ -452,27 +467,28 @@ public class SSLHandlerG0 extends SSLHandler {
 		switch (result.getHandshakeStatus()) {
 			case NEED_UNWRAP:
 				if (LOGGER.isDebugEnabled()) {
-					LOGGER.debug("{} lwrite() - handshake needs unwrap, invoking receive", toString());
+					LOGGER.debug("{} write_handshake_loop() - handshake needs unwrap, invoking receive",
+							toString());
 				}
 				this.receive(next, ZERO);
 				break;
 			case NEED_WRAP:
 				if (LOGGER.isDebugEnabled()) {
-					LOGGER.debug("{} lwrite() - handshake needs wrap, looping", toString());
+					LOGGER.debug("{} write_handshake_loop() - handshake needs wrap, looping", toString());
 				}
-				this.write(next);
+				this.write_handshake(next);
 				break;
 			case NEED_TASK:
 				if (LOGGER.isDebugEnabled()) {
-					LOGGER.debug("{} lwrite() - handshake needs task, scheduling", toString());
+					LOGGER.debug("{} write_handshake_loop() - handshake needs task, scheduling", toString());
 				}
 				this.schedule_task(next);
 				break;
 			case FINISHED:
 				if (LOGGER.isDebugEnabled()) {
-					LOGGER.debug("{} lwrite() - handshake finished, flushing queue", toString());
+					LOGGER.debug("{} write_handshake_loop() - handshake finished, flushing queue", toString());
 				}
-				this.lfinish(next);
+				this.finish_handshake(next);
 				break;
 		}
 
@@ -485,7 +501,7 @@ public class SSLHandlerG0 extends SSLHandler {
 	 * @param next
 	 * @throws SSLException
 	 */
-	synchronized protected void lfinish(final NextFilter next) throws SSLException {
+	synchronized protected void finish_handshake(final NextFilter next) throws SSLException {
 		if (this.mHandshakeComplete == false) {
 			this.mHandshakeComplete = true;
 			this.mSession.setAttribute(SSLFilter.SSL_SECURED, this.mEngine.getSession());
@@ -522,7 +538,7 @@ public class SSLHandlerG0 extends SSLHandler {
 			if (LOGGER.isDebugEnabled()) {
 				LOGGER.debug("{} flush() - {}", toString(), current);
 			}
-			if (this.qwrite(next, current) == false) {
+			if (this.write_user_loop(next, current) == false) {
 				this.mEncodeQueue.addFirst(current);
 				break;
 			}
@@ -530,7 +546,7 @@ public class SSLHandlerG0 extends SSLHandler {
 
 		if (this.mOutboundClosing && this.mEncodeQueue.size() == 0) {
 			this.mEngine.closeOutbound();
-			this.write(next);
+			this.write_handshake(next);
 		}
 	}
 
@@ -544,7 +560,7 @@ public class SSLHandlerG0 extends SSLHandler {
 		if (LOGGER.isDebugEnabled()) {
 			LOGGER.debug("{} close() - closing session", toString());
 		}
-		
+
 		if (this.mHandshakeComplete) {
 			next.event(this.mSession, SSLEvent.UNSECURED);
 		}
@@ -559,12 +575,27 @@ public class SSLHandlerG0 extends SSLHandler {
 				this.mEncodeQueue.clear();
 			}
 			this.mEngine.closeOutbound();
-			this.write(next);
+			this.write_handshake(next);
 		} else {
 			this.flush(next);
 		}
 	}
 
+	synchronized protected void throw_pending_error() throws SSLException {
+		final SSLException e = this.mPendingError;
+		if (e != null) {
+			this.mPendingError = null;
+			throw e;
+		}
+	}
+
+	synchronized protected void store_pending_error(SSLException e) {
+		SSLException x = this.mPendingError;
+		if (x == null) {
+			this.mPendingError = e;
+		}
+	}
+
 	protected void schedule_task(final NextFilter next) {
 		if (ENABLE_ASYNC_TASKS) {
 			if (this.mExecutor == null) {
@@ -596,9 +627,12 @@ public class SSLHandlerG0 extends SSLHandler {
 					LOGGER.debug("{} task() - writing handshake messages", toString());
 				}
 
-				write(next);
+				write_handshake(next);
 			} catch (SSLException e) {
-				e.printStackTrace();
+				this.store_pending_error(e);
+				if (LOGGER.isErrorEnabled()) {
+					LOGGER.error("{} task() - storing error {}", toString(), e);
+				}
 			}
 		}
 	}