You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/09 12:02:32 UTC

[7/9] flink git commit: [FLINK-5830] [distributed coordination] Handle OutOfMemory and fatal errors during process async message in akka rpc actor

[FLINK-5830] [distributed coordination] Handle OutOfMemory and fatal errors during process async message in akka rpc actor

This closes #3360


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/527eabdd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/527eabdd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/527eabdd

Branch: refs/heads/master
Commit: 527eabdd4fb3e34b0698b53ec9a7fb1348882791
Parents: 8b49ee5
Author: 淘江 <ta...@alibaba-inc.com>
Authored: Mon Feb 20 17:54:54 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 9 13:00:56 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/util/ExceptionUtils.java     | 13 +++++++++++++
 .../apache/flink/runtime/rpc/akka/AkkaRpcActor.java    |  6 ++++--
 2 files changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/527eabdd/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 7167a0b..ca81465 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -113,6 +113,19 @@ public final class ExceptionUtils {
 	}
 
 	/**
+	 * Rethrows the given {@code Throwable}, if it represents an error that is fatal to the JVM
+	 * or an out-of-memory error. See {@link ExceptionUtils#isJvmFatalError(Throwable)} for a
+	 * definition of fatal errors.
+	 *
+	 * @param t The Throwable to check and rethrow.
+	 */
+	public static void rethrowIfFatalErrorOrOOM(Throwable t) {
+		if (isJvmFatalError(t) || t instanceof OutOfMemoryError) {
+			throw (Error) t; // NOTE(review): cast assumes isJvmFatalError(t) is true only for Error subtypes - confirm
+		}
+	}
+
+	/**
 	 * Adds a new exception as a {@link Throwable#addSuppressed(Throwable) suppressed exception}
 	 * to a prior exception, or returns the new exception, if no prior exception exists.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/527eabdd/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 264ba96..99f8211 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
 
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
+import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -271,8 +272,9 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
 			// run immediately
 			try {
 				runAsync.getRunnable().run();
-			} catch (final Throwable e) {
-				LOG.error("Caught exception while executing runnable in main thread.", e);
+			} catch (Throwable t) {
+				LOG.error("Caught exception while executing runnable in main thread.", t);
+				ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
 			}
 		}
 		else {