You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/09 12:02:32 UTC
[7/9] flink git commit: [FLINK-5830] [distributed coordination]
Handle OutOfMemory and fatal errors during process async message in akka rpc
actor
[FLINK-5830] [distributed coordination] Handle OutOfMemory and fatal errors during process async message in akka rpc actor
This closes #3360
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/527eabdd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/527eabdd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/527eabdd
Branch: refs/heads/master
Commit: 527eabdd4fb3e34b0698b53ec9a7fb1348882791
Parents: 8b49ee5
Author: 淘江 <ta...@alibaba-inc.com>
Authored: Mon Feb 20 17:54:54 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 9 13:00:56 2017 +0100
----------------------------------------------------------------------
.../java/org/apache/flink/util/ExceptionUtils.java | 13 +++++++++++++
.../apache/flink/runtime/rpc/akka/AkkaRpcActor.java | 6 ++++--
2 files changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/527eabdd/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 7167a0b..ca81465 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -113,6 +113,19 @@ public final class ExceptionUtils {
}
/**
+ * Rethrows the given {@code Throwable}, if it represents an error that is fatal to the JVM
+ * or an out-of-memory error. See {@link ExceptionUtils#isJvmFatalError(Throwable)} for a
+ * definition of fatal errors.
+ *
+ * @param t The Throwable to check and rethrow.
+ */
+ public static void rethrowIfFatalErrorOrOOM(Throwable t) {
+ if (isJvmFatalError(t) || t instanceof OutOfMemoryError) {
+ throw (Error) t;
+ }
+ }
+
+ /**
* Adds a new exception as a {@link Throwable#addSuppressed(Throwable) suppressed exception}
* to a prior exception, or returns the new exception, if no prior exception exists.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/527eabdd/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 264ba96..99f8211 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
+import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -271,8 +272,9 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
// run immediately
try {
runAsync.getRunnable().run();
- } catch (final Throwable e) {
- LOG.error("Caught exception while executing runnable in main thread.", e);
+ } catch (Throwable t) {
+ LOG.error("Caught exception while executing runnable in main thread.", t);
+ ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
}
}
else {