You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jo...@apache.org on 2021/11/23 13:50:19 UTC
[mina] branch 2.2.X updated: Store captured errors in async tasks
This is an automated email from the ASF dual-hosted git repository.
johnnyv pushed a commit to branch 2.2.X
in repository https://gitbox.apache.org/repos/asf/mina.git
The following commit(s) were added to refs/heads/2.2.X by this push:
new 3b5e22e Store captured errors in async tasks
3b5e22e is described below
commit 3b5e22ec41d02ff975d1138b94e29df7e8883006
Author: Jonathan Valliere <jo...@apache.org>
AuthorDate: Tue Nov 23 08:50:17 2021 -0500
Store captured errors in async tasks
Errors captured during async tasks are now stored and re-thrown during the
next valid filter-chain operation. This allows the filter chain to capture
the exceptions. I may need to disable ENABLE_ASYNC_TASKS later to ensure
that the filter chain captures all exceptions quickly.
---
.../java/org/apache/mina/filter/ssl/SSLFilter.java | 13 +-
.../org/apache/mina/filter/ssl/SSLHandlerG0.java | 140 +++++++++++++--------
2 files changed, 93 insertions(+), 60 deletions(-)
diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLFilter.java b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLFilter.java
index 400920a..2904c90 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLFilter.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLFilter.java
@@ -40,8 +40,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A SSL processor which performs flow control of encrypted information
- * on the filter-chain.
+ * A SSL processor which performs flow control of encrypted information on the
+ * filter-chain.
* <p>
* The initial handshake is automatically enabled for "client" sessions once the
* filter is added to the filter-chain and the session is connected.
@@ -51,8 +51,7 @@ import org.slf4j.LoggerFactory;
*/
public class SSLFilter extends IoFilterAdapter {
/**
- * The presence of this attribute in a session indicates that the session is
- * secured.
+ * SSLSession object when the session is secured, otherwise null.
*/
static public final AttributeKey SSL_SECURED = new AttributeKey(SSLFilter.class, "status");
@@ -73,8 +72,8 @@ public class SSLFilter extends IoFilterAdapter {
new LinkedBlockingDeque<Runnable>(), new BasicThreadFactory("ssl-exec", true));
protected final SSLContext mContext;
- protected boolean mNeedClientAuth;
- protected boolean mWantClientAuth;
+ protected boolean mNeedClientAuth = false;
+ protected boolean mWantClientAuth = false;
protected String[] mEnabledCipherSuites;
protected String[] mEnabledProtocols;
@@ -230,7 +229,7 @@ public class SSLFilter extends IoFilterAdapter {
* Customization handler for creating the engine
*
* @param session source session
- * @param addr socket address used for fast reconnect
+ * @param addr socket address used for fast reconnect
* @return an SSLEngine
*/
protected SSLEngine createEngine(IoSession session, InetSocketAddress addr) {
diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java
index f6072d9..db007b3 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java
@@ -90,6 +90,11 @@ public class SSLHandlerG0 extends SSLHandler {
protected Thread mDecodeThread = null;
/**
+ * Captured error state
+ */
+ protected SSLException mPendingError = null;
+
+ /**
* Instantiates a new handler
*
* @param p engine
@@ -127,7 +132,7 @@ public class SSLHandlerG0 extends SSLHandler {
LOGGER.debug("{} open() - begin handshaking", toString());
}
this.mEngine.beginHandshake();
- this.write(next);
+ this.write_handshake(next);
}
}
}
@@ -143,7 +148,7 @@ public class SSLHandlerG0 extends SSLHandler {
this.mDecodeThread = Thread.currentThread();
final IoBuffer source = resume_decode_buffer(message);
try {
- this.qreceive(next, source);
+ this.receive_loop(next, source);
} finally {
suspend_decode_buffer(source);
this.mDecodeThread = null;
@@ -152,8 +157,10 @@ public class SSLHandlerG0 extends SSLHandler {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} receive() - recursion", toString());
}
- this.qreceive(next, this.mDecodeBuffer);
+ this.receive_loop(next, this.mDecodeBuffer);
}
+
+ this.throw_pending_error();
}
/**
@@ -165,9 +172,9 @@ public class SSLHandlerG0 extends SSLHandler {
* @throws SSLException
*/
@SuppressWarnings("incomplete-switch")
- protected void qreceive(final NextFilter next, final IoBuffer message) throws SSLException {
+ protected void receive_loop(final NextFilter next, final IoBuffer message) throws SSLException {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} qreceive() - source {}", toString(), message);
+ LOGGER.debug("{} receive_loop() - source {}", toString(), message);
}
final IoBuffer source = message;
@@ -176,7 +183,7 @@ public class SSLHandlerG0 extends SSLHandler {
final SSLEngineResult result = mEngine.unwrap(source.buf(), dest.buf());
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} qreceive() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(),
+ LOGGER.debug("{} receive_loop() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(),
result.bytesConsumed(), result.bytesProduced(), result.getStatus(), result.getHandshakeStatus());
}
@@ -185,7 +192,7 @@ public class SSLHandlerG0 extends SSLHandler {
} else {
dest.flip();
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} qreceive() - result {}", toString(), dest);
+ LOGGER.debug("{} receive_loop() - result {}", toString(), dest);
}
next.messageReceived(this.mSession, dest);
}
@@ -194,35 +201,35 @@ public class SSLHandlerG0 extends SSLHandler {
case NEED_UNWRAP:
if (result.bytesConsumed() != 0 && message.hasRemaining()) {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} qreceive() - handshake needs unwrap, looping", toString());
+ LOGGER.debug("{} receive_loop() - handshake needs unwrap, looping", toString());
}
- this.qreceive(next, message);
+ this.receive_loop(next, message);
}
break;
case NEED_TASK:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} qreceive() - handshake needs task, scheduling", toString());
+ LOGGER.debug("{} receive_loop() - handshake needs task, scheduling", toString());
}
this.schedule_task(next);
break;
case NEED_WRAP:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} qreceive() - handshake needs wrap, invoking write", toString());
+ LOGGER.debug("{} receive_loop() - handshake needs wrap, invoking write", toString());
}
- this.write(next);
+ this.write_handshake(next);
break;
case FINISHED:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} qreceive() - handshake finished, flushing queue", toString());
+ LOGGER.debug("{} receive_loop() - handshake finished, flushing queue", toString());
}
- this.lfinish(next);
+ this.finish_handshake(next);
break;
case NOT_HANDSHAKING:
if ((result.bytesProduced() != 0 || result.bytesConsumed() != 0) && message.hasRemaining()) {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} qreceive() - trying to decode more messages, looping", toString());
+ LOGGER.debug("{} receive_loop() - trying to decode more messages, looping", toString());
}
- this.qreceive(next, message);
+ this.receive_loop(next, message);
}
break;
}
@@ -241,6 +248,8 @@ public class SSLHandlerG0 extends SSLHandler {
}
this.flush(next);
}
+
+ this.throw_pending_error();
}
/**
@@ -257,7 +266,7 @@ public class SSLHandlerG0 extends SSLHandler {
}
if (this.mEncodeQueue.isEmpty()) {
- if (this.qwrite(next, request) == false) {
+ if (this.write_user_loop(next, request) == false) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(),
request);
@@ -276,6 +285,8 @@ public class SSLHandlerG0 extends SSLHandler {
}
this.mEncodeQueue.add(request);
}
+
+ this.throw_pending_error();
}
/**
@@ -290,9 +301,10 @@ public class SSLHandlerG0 extends SSLHandler {
* @throws SSLException
*/
@SuppressWarnings("incomplete-switch")
- synchronized protected boolean qwrite(final NextFilter next, final WriteRequest request) throws SSLException {
+ synchronized protected boolean write_user_loop(final NextFilter next, final WriteRequest request)
+ throws SSLException {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} qwrite() - source {}", toString(), request);
+ LOGGER.debug("{} write_user_loop() - source {}", toString(), request);
}
final IoBuffer source = IoBuffer.class.cast(request.getMessage());
@@ -301,8 +313,9 @@ public class SSLHandlerG0 extends SSLHandler {
final SSLEngineResult result = this.mEngine.wrap(source.buf(), dest.buf());
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} qwrite() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(),
- result.bytesConsumed(), result.bytesProduced(), result.getStatus(), result.getHandshakeStatus());
+ LOGGER.debug("{} write_user_loop() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}",
+ toString(), result.bytesConsumed(), result.bytesProduced(), result.getStatus(),
+ result.getHandshakeStatus());
}
if (result.bytesProduced() == 0) {
@@ -312,7 +325,7 @@ public class SSLHandlerG0 extends SSLHandler {
// an handshaking message must have been produced
EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null);
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} qwrite() - result {}", toString(), encrypted);
+ LOGGER.debug("{} write_user_loop() - result {}", toString(), encrypted);
}
next.filterWrite(this.mSession, encrypted);
// do not return because we want to enter the handshake switch
@@ -323,18 +336,18 @@ public class SSLHandlerG0 extends SSLHandler {
EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null);
this.mAckQueue.add(encrypted);
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} qwrite() - result {}", toString(), encrypted);
+ LOGGER.debug("{} write_user_loop() - result {}", toString(), encrypted);
}
next.filterWrite(this.mSession, encrypted);
if (this.mAckQueue.size() < MAX_UNACK_MESSAGES) {
- return qwrite(next, request); // write additional chunks
+ return write_user_loop(next, request); // write additional chunks
}
return false;
} else {
EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, request);
this.mAckQueue.add(encrypted);
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} qwrite() - result {}", toString(), encrypted);
+ LOGGER.debug("{} write_user_loop() - result {}", toString(), encrypted);
}
next.filterWrite(this.mSession, encrypted);
return true;
@@ -346,21 +359,21 @@ public class SSLHandlerG0 extends SSLHandler {
switch (result.getHandshakeStatus()) {
case NEED_TASK:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} qwrite() - handshake needs task, scheduling", toString());
+ LOGGER.debug("{} write_user_loop() - handshake needs task, scheduling", toString());
}
this.schedule_task(next);
break;
case NEED_WRAP:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} qwrite() - handshake needs wrap, looping", toString());
+ LOGGER.debug("{} write_user_loop() - handshake needs wrap, looping", toString());
}
- return this.qwrite(next, request);
+ return this.write_user_loop(next, request);
case FINISHED:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} qwrite() - handshake finished, flushing queue", toString());
+ LOGGER.debug("{} write_user_loop() - handshake finished, flushing queue", toString());
}
- this.lfinish(next);
- return this.qwrite(next, request);
+ this.finish_handshake(next);
+ return this.write_user_loop(next, request);
}
return false;
@@ -375,14 +388,14 @@ public class SSLHandlerG0 extends SSLHandler {
*
* @throws SSLException
*/
- synchronized public boolean write(NextFilter next) throws SSLException {
+ synchronized protected boolean write_handshake(NextFilter next) throws SSLException {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} write() - internal", toString());
+ LOGGER.debug("{} write_handshake() - internal", toString());
}
final IoBuffer source = ZERO;
final IoBuffer dest = allocate_encode_buffer(source.remaining());
- return lwrite(next, source, dest);
+ return write_handshake_loop(next, source, dest);
}
/**
@@ -400,7 +413,7 @@ public class SSLHandlerG0 extends SSLHandler {
* @throws SSLException
*/
@SuppressWarnings("incomplete-switch")
- protected boolean lwrite(NextFilter next, IoBuffer source, IoBuffer dest) throws SSLException {
+ protected boolean write_handshake_loop(NextFilter next, IoBuffer source, IoBuffer dest) throws SSLException {
if (this.mOutboundClosing && this.mEngine.isOutboundDone()) {
return false;
}
@@ -408,8 +421,9 @@ public class SSLHandlerG0 extends SSLHandler {
final SSLEngineResult result = this.mEngine.wrap(source.buf(), dest.buf());
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lwrite() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(),
- result.bytesConsumed(), result.bytesProduced(), result.getStatus(), result.getHandshakeStatus());
+ LOGGER.debug("{} write_handshake_loop() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}",
+ toString(), result.bytesConsumed(), result.bytesProduced(), result.getStatus(),
+ result.getHandshakeStatus());
}
if (ENABLE_FAST_HANDSHAKE) {
@@ -428,9 +442,10 @@ public class SSLHandlerG0 extends SSLHandler {
switch (result.getStatus()) {
case OK:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lwrite() - handshake needs wrap, fast looping", toString());
+ LOGGER.debug("{} write_handshake_loop() - handshake needs wrap, fast looping",
+ toString());
}
- return lwrite(next, source, dest);
+ return write_handshake_loop(next, source, dest);
}
break;
}
@@ -443,7 +458,7 @@ public class SSLHandlerG0 extends SSLHandler {
} else {
dest.flip();
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lwrite() - result {}", toString(), dest);
+ LOGGER.debug("{} write_handshake_loop() - result {}", toString(), dest);
}
final EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null);
next.filterWrite(this.mSession, encrypted);
@@ -452,27 +467,28 @@ public class SSLHandlerG0 extends SSLHandler {
switch (result.getHandshakeStatus()) {
case NEED_UNWRAP:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lwrite() - handshake needs unwrap, invoking receive", toString());
+ LOGGER.debug("{} lwrwrite_handshake_loopite() - handshake needs unwrap, invoking receive",
+ toString());
}
this.receive(next, ZERO);
break;
case NEED_WRAP:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lwrite() - handshake needs wrap, looping", toString());
+ LOGGER.debug("{} write_handshake_loop() - handshake needs wrap, looping", toString());
}
- this.write(next);
+ this.write_handshake(next);
break;
case NEED_TASK:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lwrite() - handshake needs task, scheduling", toString());
+ LOGGER.debug("{} write_handshake_loop() - handshake needs task, scheduling", toString());
}
this.schedule_task(next);
break;
case FINISHED:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} lwrite() - handshake finished, flushing queue", toString());
+ LOGGER.debug("{} write_handshake_loop() - handshake finished, flushing queue", toString());
}
- this.lfinish(next);
+ this.finish_handshake(next);
break;
}
@@ -485,7 +501,7 @@ public class SSLHandlerG0 extends SSLHandler {
* @param next
* @throws SSLException
*/
- synchronized protected void lfinish(final NextFilter next) throws SSLException {
+ synchronized protected void finish_handshake(final NextFilter next) throws SSLException {
if (this.mHandshakeComplete == false) {
this.mHandshakeComplete = true;
this.mSession.setAttribute(SSLFilter.SSL_SECURED, this.mEngine.getSession());
@@ -522,7 +538,7 @@ public class SSLHandlerG0 extends SSLHandler {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} flush() - {}", toString(), current);
}
- if (this.qwrite(next, current) == false) {
+ if (this.write_user_loop(next, current) == false) {
this.mEncodeQueue.addFirst(current);
break;
}
@@ -530,7 +546,7 @@ public class SSLHandlerG0 extends SSLHandler {
if (this.mOutboundClosing && this.mEncodeQueue.size() == 0) {
this.mEngine.closeOutbound();
- this.write(next);
+ this.write_handshake(next);
}
}
@@ -544,7 +560,7 @@ public class SSLHandlerG0 extends SSLHandler {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} close() - closing session", toString());
}
-
+
if (this.mHandshakeComplete) {
next.event(this.mSession, SSLEvent.UNSECURED);
}
@@ -559,12 +575,27 @@ public class SSLHandlerG0 extends SSLHandler {
this.mEncodeQueue.clear();
}
this.mEngine.closeOutbound();
- this.write(next);
+ this.write_handshake(next);
} else {
this.flush(next);
}
}
+ synchronized protected void throw_pending_error() throws SSLException {
+ final SSLException e = this.mPendingError;
+ if (e != null) {
+ this.mPendingError = null;
+ throw e;
+ }
+ }
+
+ synchronized protected void store_pending_error(SSLException e) {
+ SSLException x = this.mPendingError;
+ if (x == null) {
+ this.mPendingError = e;
+ }
+ }
+
protected void schedule_task(final NextFilter next) {
if (ENABLE_ASYNC_TASKS) {
if (this.mExecutor == null) {
@@ -596,9 +627,12 @@ public class SSLHandlerG0 extends SSLHandler {
LOGGER.debug("{} task() - writing handshake messages", toString());
}
- write(next);
+ write_handshake(next);
} catch (SSLException e) {
- e.printStackTrace();
+ this.store_pending_error(e);
+ if (LOGGER.isErrorEnabled()) {
+ LOGGER.error("{} task() - storing error {}", toString(), e);
+ }
}
}
}