You are viewing a plain text version of this content. The link to its canonical version is not preserved in this plain text copy.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/15 07:00:04 UTC

[GitHub] asfgit closed pull request #6334: [FLINK-5232] Add a Thread default uncaught exception handler on the JobManager

asfgit closed pull request #6334: [FLINK-5232] Add a Thread default uncaught exception handler on the JobManager
URL: https://github.com/apache/flink/pull/6334
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 57ca9d4160a..613bb4605c3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -103,7 +103,8 @@ object AkkaUtils {
   def createActorSystem(akkaConfig: Config): ActorSystem = {
     // Initialize slf4j as logger of Akka's Netty instead of java.util.logging (FLINK-1650)
     InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory)
-    ActorSystem.create("flink", akkaConfig)
+    // Constructing a RobustActorSystem does not start it, so start it before returning.
+    new RobustActorSystem("flink", akkaConfig).start()
   }
 
   /**
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/RobustActorSystem.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/RobustActorSystem.scala
new file mode 100644
index 00000000000..5a326de2fda
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/RobustActorSystem.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package akka.actor
+
+import akka.actor.RobustActorSystem.LOG
+import akka.actor.setup.ActorSystemSetup
+import com.typesafe.config.{Config, ConfigFactory}
+import grizzled.slf4j.Logger
+
+/**
+  * An [[ActorSystem]] that overrides Akka's uncaught-exception handler: any uncaught
+  * exception handed to it is logged together with its stack trace, and the JVM is then
+  * halted with exit code -1.
+  *
+  * Constructing an instance does not start it. Use `RobustActorSystem.create` or
+  * `RobustActorSystem.apply`, or call `start()` on the new instance.
+  *
+  * @param name the actor system name.
+  * @param applicationConfig the actor system configuration.
+  * @param classLoader the class loader handed to the underlying `ActorSystemImpl`.
+  */
+class RobustActorSystem(name: String, applicationConfig: Config, classLoader: ClassLoader)
+  extends ActorSystemImpl(name, applicationConfig, classLoader,
+    None, guardianProps = None, ActorSystemSetup.empty) {
+
+  /** Uses the class loader returned by `ActorSystem.findClassLoader()`. */
+  def this(name: String, applicationConfig: Config) =
+    this(name, applicationConfig, ActorSystem.findClassLoader())
+
+  // Keep this a `def`: the superclass may call it during its own construction, before any
+  // `val` declared in this class would be initialized.
+  override protected def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler =
+    new Thread.UncaughtExceptionHandler() {
+
+      override def uncaughtException(t: Thread, e: Throwable): Unit = {
+        // Halting in `finally` ends the process even if logging itself throws.
+        try {
+          LOG.error(
+            s"Thread ${t.getName} died due to an uncaught exception. Killing process.", e)
+        } finally {
+          Runtime.getRuntime.halt(-1)
+        }
+      }
+
+    }
+}
+
+object RobustActorSystem {
+
+  private val LOG = Logger(classOf[RobustActorSystem])
+
+  /** Creates and starts a robust actor system configured by `ConfigFactory.load`. */
+  def apply(name: String = "default"): ActorSystem = {
+    val classLoader = ActorSystem.findClassLoader()
+    apply(name, ConfigFactory.load(classLoader), classLoader)
+  }
+
+  /** Creates and starts a robust actor system using `ActorSystem.findClassLoader()`. */
+  def create(name: String, config: Config): ActorSystem =
+    apply(name, config, ActorSystem.findClassLoader())
+
+  /**
+    * Creates and starts a robust actor system.
+    *
+    * @param name the actor system name.
+    * @param config the actor system configuration.
+    * @param classLoader the class loader handed to the actor system.
+    */
+  def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem =
+    new RobustActorSystem(name, config, classLoader).start()
+
+}
+
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
index 00a84755daa..1390e3e310a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
@@ -26,6 +26,7 @@
 import akka.actor.ActorSystem;
 import akka.actor.Kill;
 import akka.actor.Props;
+import akka.actor.RobustActorSystem;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import org.junit.AfterClass;
@@ -46,7 +47,7 @@
 
 	@BeforeClass
 	public static void setup() {
-		actorSystem = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
+		actorSystem = new RobustActorSystem("TestingActorSystem", TestingUtils.testConfig()).start();
 	}
 
 	@AfterClass
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index 5d7a520ae10..b1d860e3ea2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.instance;
 
 import akka.actor.ActorSystem;
+import akka.actor.RobustActorSystem;
 import akka.testkit.JavaTestKit;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -55,7 +56,7 @@
 
 	@BeforeClass
 	public static void setup(){
-		system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
+		system = new RobustActorSystem("TestingActorSystem", TestingUtils.testConfig()).start();
 	}
 
 	@AfterClass
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index 703cd0bf085..e5e845b0ded 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -47,6 +47,7 @@
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.actor.RobustActorSystem;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
@@ -78,7 +79,7 @@
 
 	@BeforeClass
 	public static void setup() throws Exception {
-		actorSystem = ActorSystem.create("TestingActorSystem");
+		actorSystem = new RobustActorSystem("TestingActorSystem", TestingUtils.getDefaultTestingActorSystemConfig()).start();
 		testingServer = new TestingServer();
 	}
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index ce750d3b067..59eb1df4880 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.recovery;
 
+import akka.actor.RobustActorSystem;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
@@ -349,7 +350,7 @@ private void testCheckpointedStreamingProgram(AbstractStateBackend stateBackend)
 		final int sequenceEnd = 5000;
 		final long expectedSum = Parallelism * sequenceEnd * (sequenceEnd + 1) / 2;
 
-		final ActorSystem system = ActorSystem.create("Test", AkkaUtils.getDefaultAkkaConfig());
+		final ActorSystem system = new RobustActorSystem("Test", AkkaUtils.getDefaultAkkaConfig());
 		final TestingServer testingServer = new TestingServer();
 		final TemporaryFolder temporaryFolder = new TemporaryFolder();
 		temporaryFolder.create();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
index aa2f38d2619..a35db05a37e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
@@ -31,6 +31,7 @@
 import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorSystem;
+import akka.actor.RobustActorSystem;
 import akka.testkit.JavaTestKit;
 import org.junit.Test;
 
@@ -64,7 +65,7 @@
 	@Test
 	public void testLocalFlinkMiniClusterWithMultipleTaskManagers() {
 
-		final ActorSystem system = ActorSystem.create("Testkit", AkkaUtils.getDefaultAkkaConfig());
+		final ActorSystem system = new RobustActorSystem("Testkit", AkkaUtils.getDefaultAkkaConfig());
 		LocalFlinkMiniCluster miniCluster = null;
 
 		final int numTMs = 3;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services