You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/10 15:33:41 UTC

[2/4] flink git commit: [FLINK-5759] [jobmanager] Set UncaughtExceptionHandlers for JobManager's Future and I/O thread pools

[FLINK-5759] [jobmanager] Set UncaughtExceptionHandlers for JobManager's Future and I/O thread pools

This closes #3290


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef77c254
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef77c254
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef77c254

Branch: refs/heads/master
Commit: ef77c254dadbe4c04810681fe765f5ec7d2a7400
Parents: 6630513
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 9 14:04:17 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 10 16:28:30 2017 +0100

----------------------------------------------------------------------
 .../MesosApplicationMasterRunner.java           |  10 +-
 .../flink/runtime/filecache/FileCache.java      |   3 +-
 .../runtime/jobmaster/JobManagerServices.java   |   6 +-
 .../runtime/util/ExecutorThreadFactory.java     | 123 ++++++++++++++-----
 .../flink/runtime/util/NamedThreadFactory.java  |  58 ---------
 .../flink/runtime/jobmanager/JobManager.scala   |   4 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |  10 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |   8 +-
 8 files changed, 119 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 5033692..a23c9f6 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -22,10 +22,12 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
+
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -52,15 +54,17 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.util.NamedThreadFactory;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.mesos.Protos;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -216,11 +220,11 @@ public class MesosApplicationMasterRunner {
 
 			futureExecutor = Executors.newScheduledThreadPool(
 				numberProcessors,
-				new NamedThreadFactory("mesos-jobmanager-future-", "-thread-"));
+				new ExecutorThreadFactory("mesos-jobmanager-future"));
 
 			ioExecutor = Executors.newFixedThreadPool(
 				numberProcessors,
-				new NamedThreadFactory("mesos-jobmanager-io-", "-thread-"));
+				new ExecutorThreadFactory("mesos-jobmanager-io"));
 
 			mesosServices = MesosServicesUtils.createMesosServices(config);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index 21456de..4f2166f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -99,7 +99,8 @@ public class FileCache {
 		this.shutdownHook = createShutdownHook(this, LOG);
 
 		this.entries = new HashMap<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>>();
-		this.executorService = Executors.newScheduledThreadPool(10, ExecutorThreadFactory.INSTANCE);
+		this.executorService = Executors.newScheduledThreadPool(10, 
+				new ExecutorThreadFactory("flink-file-cache"));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
index 95500e5..8cda0f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -120,8 +120,12 @@ public class JobManagerServices {
 			throw new IllegalConfigurationException(AkkaUtils.formatDurationParingErrorMessage());
 		}
 
+		final ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool(
+				Hardware.getNumberCPUCores(),
+				new ExecutorThreadFactory("jobmanager-future"));
+
 		return new JobManagerServices(
-			Executors.newScheduledThreadPool(Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE),
+			futureExecutor,
 			libraryCacheManager,
 			RestartStrategyFactory.createRestartStrategyFactory(config),
 			Time.of(timeout.length(), timeout.unit()));

http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
index 2fb5972..4a79db3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
@@ -18,49 +18,114 @@
 
 package org.apache.flink.runtime.util;
 
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A thread factory intended for use by critical thread pools. Critical thread pools here
+ * mean thread pools that support Flink's core coordination and processing work, and which
+ * must not simply cause unnoticed errors.
+ * 
+ * <p>The thread factory can be given an {@link UncaughtExceptionHandler} for the threads.
+ * If no handler is explicitly given, the default handler for uncaught exceptions will log
+ * the exceptions and kill the process afterwards. That guarantees that critical exceptions are
+ * not accidentally lost and leave the system running in an inconsistent state.
+ * 
+ * <p>Threads created by this factory are all called '(pool-name)-thread-n', where
+ * <i>(pool-name)</i> is configurable, and <i>n</i> is an incrementing number.
+ * 
+ * <p>All threads created by this factory are daemon threads and have the default (normal)
+ * priority.
+ */
 public class ExecutorThreadFactory implements ThreadFactory {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(ExecutorThreadFactory.class);
-	
-	
-	private static final String THREAD_NAME_PREFIX = "Flink Executor Thread - ";
-	
-	private static final AtomicInteger COUNTER = new AtomicInteger(1);
-	
-	private static final ThreadGroup THREAD_GROUP = new ThreadGroup("Flink Executor Threads");
-	
-	private static final Thread.UncaughtExceptionHandler EXCEPTION_HANDLER = new LoggingExceptionHander();
-	
-	
-	public static final ExecutorThreadFactory INSTANCE = new ExecutorThreadFactory();
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private ExecutorThreadFactory() {}
-	
-	
-	public Thread newThread(Runnable target) {
-		Thread t = new Thread(THREAD_GROUP, target, THREAD_NAME_PREFIX + COUNTER.getAndIncrement());
+
+	/** The thread pool name used when no explicit pool name has been specified */ 
+	private static final String DEFAULT_POOL_NAME = "flink-executor-pool";
+
+	private final AtomicInteger threadNumber = new AtomicInteger(1);
+
+	private final ThreadGroup group;
+
+	private final String namePrefix;
+
+	private final UncaughtExceptionHandler exceptionHandler;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new thread factory using the default thread pool name ('flink-executor-pool')
+	 * and the default uncaught exception handler (log exception and kill process).
+	 */
+	public ExecutorThreadFactory() {
+		this(DEFAULT_POOL_NAME);
+	}
+
+	/**
+	 * Creates a new thread factory using the given thread pool name and the default
+	 * uncaught exception handler (log exception and kill process).
+	 * 
+	 * @param poolName The pool name, used as the threads' name prefix
+	 */
+	public ExecutorThreadFactory(String poolName) {
+		this(poolName, FatalExitExceptionHandler.INSTANCE);
+	}
+
+	/**
+	 * Creates a new thread factory using the given thread pool name and the given
+	 * uncaught exception handler.
+	 * 
+	 * @param poolName The pool name, used as the threads' name prefix
+	 * @param exceptionHandler The uncaught exception handler for the threads
+	 */
+	public ExecutorThreadFactory(String poolName, UncaughtExceptionHandler exceptionHandler) {
+		checkNotNull(poolName, "poolName");
+
+		SecurityManager securityManager = System.getSecurityManager();
+		this.group = (securityManager != null) ? securityManager.getThreadGroup() :
+				Thread.currentThread().getThreadGroup();
+
+		this.namePrefix = poolName + "-thread-";
+		this.exceptionHandler = exceptionHandler;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Thread newThread(Runnable runnable) {
+		Thread t = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement());
 		t.setDaemon(true);
-		t.setUncaughtExceptionHandler(EXCEPTION_HANDLER);
+
+		// normalize the priority
+		if (t.getPriority() != Thread.NORM_PRIORITY) {
+			t.setPriority(Thread.NORM_PRIORITY);
+		}
+
+		// optional handler for uncaught exceptions
+		if (exceptionHandler != null) {
+			t.setUncaughtExceptionHandler(exceptionHandler);
+		}
+
 		return t;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
-	private static final class LoggingExceptionHander implements Thread.UncaughtExceptionHandler {
+
+	private static final class FatalExitExceptionHandler implements UncaughtExceptionHandler {
+
+		private static final Logger LOG = LoggerFactory.getLogger(FatalExitExceptionHandler.class);
+
+		static final FatalExitExceptionHandler INSTANCE = new FatalExitExceptionHandler(); 
 
 		@Override
 		public void uncaughtException(Thread t, Throwable e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Thread '" + t.getName() + "' produced an uncaught exception.", e);
-			}
+			LOG.error("FATAL: Thread '" + t.getName() + "' produced an uncaught exception. Stopping the process...", e);
+			System.exit(-17);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
deleted file mode 100644
index bd97963..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Thread factory which allows to specify a thread pool name and a thread name.
- *
- * The code is based on {@link java.util.concurrent.Executors.DefaultThreadFactory}.
- */
-public class NamedThreadFactory implements ThreadFactory {
-	private static final AtomicInteger poolNumber = new AtomicInteger(1);
-	private final ThreadGroup group;
-	private final AtomicInteger threadNumber = new AtomicInteger(1);
-	private final String namePrefix;
-
-	public NamedThreadFactory(final String poolName, final String threadName) {
-		SecurityManager securityManager = System.getSecurityManager();
-		group = (securityManager != null) ? securityManager.getThreadGroup() :
-			Thread.currentThread().getThreadGroup();
-
-		namePrefix = poolName +
-			poolNumber.getAndIncrement() +
-			threadName;
-	}
-
-	@Override
-	public Thread newThread(Runnable runnable) {
-		Thread t = new Thread(group, runnable,
-			namePrefix + threadNumber.getAndIncrement(),
-			0);
-		if (t.isDaemon()) {
-			t.setDaemon(false);
-		}
-		if (t.getPriority() != Thread.NORM_PRIORITY) {
-			t.setPriority(Thread.NORM_PRIORITY);
-		}
-		return t;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d575f68..a335916 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2023,11 +2023,11 @@ object JobManager {
 
     val futureExecutor = Executors.newScheduledThreadPool(
       numberProcessors,
-      new NamedThreadFactory("jobmanager-future-", "-thread-"))
+      new ExecutorThreadFactory("jobmanager-future"))
 
     val ioExecutor = Executors.newFixedThreadPool(
       numberProcessors,
-      new NamedThreadFactory("jobmanager-io-", "-thread-"))
+      new ExecutorThreadFactory("jobmanager-io"))
 
     val timeout = AkkaUtils.getTimeout(configuration)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 64cc97d..07fb996 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.minicluster
 
-import java.net.InetAddress
 import java.util.UUID
 import java.util.concurrent.{Executors, TimeUnit}
 
@@ -26,7 +25,7 @@ import akka.pattern.Patterns.gracefulStop
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
-import org.apache.flink.api.common.time.Time
+
 import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
@@ -37,8 +36,9 @@ import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService, StandaloneLeaderRetrievalService}
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
-import org.apache.flink.runtime.util.{Hardware, NamedThreadFactory, ZooKeeperUtils}
+import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware, ZooKeeperUtils}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
+
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -109,11 +109,11 @@ abstract class FlinkMiniCluster(
 
   val futureExecutor = Executors.newScheduledThreadPool(
     Hardware.getNumberCPUCores(),
-    new NamedThreadFactory("mini-cluster-future-", "-thread"))
+    new ExecutorThreadFactory("mini-cluster-future"))
 
   val ioExecutor = Executors.newFixedThreadPool(
     Hardware.getNumberCPUCores(),
-    new NamedThreadFactory("mini-cluster-io-", "-thread"))
+    new ExecutorThreadFactory("mini-cluster-io"))
 
   def configuration: Configuration = {
     if (originalConfiguration.getInteger(

http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 29f1827..5cc51e4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -38,14 +38,14 @@ import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.util.NamedThreadFactory;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
-
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -230,11 +230,11 @@ public class YarnApplicationMasterRunner {
 
 		final ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool(
 			numberProcessors,
-			new NamedThreadFactory("yarn-jobmanager-future-", "-thread-"));
+			new ExecutorThreadFactory("yarn-jobmanager-future"));
 
 		final ExecutorService ioExecutor = Executors.newFixedThreadPool(
 			numberProcessors,
-			new NamedThreadFactory("yarn-jobmanager-io-", "-thread-"));
+			new ExecutorThreadFactory("yarn-jobmanager-io"));
 
 		try {
 			// ------- (1) load and parse / validate all configurations -------