You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/10 15:33:41 UTC
[2/4] flink git commit: [FLINK-5759] [jobmanager] Set UncaughtExceptionHandlers for JobManager's Future and I/O thread pools
[FLINK-5759] [jobmanager] Set UncaughtExceptionHandlers for JobManager's Future and I/O thread pools
This closes #3290
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef77c254
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef77c254
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef77c254
Branch: refs/heads/master
Commit: ef77c254dadbe4c04810681fe765f5ec7d2a7400
Parents: 6630513
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 9 14:04:17 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 10 16:28:30 2017 +0100
----------------------------------------------------------------------
.../MesosApplicationMasterRunner.java | 10 +-
.../flink/runtime/filecache/FileCache.java | 3 +-
.../runtime/jobmaster/JobManagerServices.java | 6 +-
.../runtime/util/ExecutorThreadFactory.java | 123 ++++++++++++++-----
.../flink/runtime/util/NamedThreadFactory.java | 58 ---------
.../flink/runtime/jobmanager/JobManager.scala | 4 +-
.../runtime/minicluster/FlinkMiniCluster.scala | 10 +-
.../flink/yarn/YarnApplicationMasterRunner.java | 8 +-
8 files changed, 119 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 5033692..a23c9f6 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -22,10 +22,12 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
+
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -52,15 +54,17 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.util.NamedThreadFactory;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.mesos.Protos;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
@@ -216,11 +220,11 @@ public class MesosApplicationMasterRunner {
futureExecutor = Executors.newScheduledThreadPool(
numberProcessors,
- new NamedThreadFactory("mesos-jobmanager-future-", "-thread-"));
+ new ExecutorThreadFactory("mesos-jobmanager-future"));
ioExecutor = Executors.newFixedThreadPool(
numberProcessors,
- new NamedThreadFactory("mesos-jobmanager-io-", "-thread-"));
+ new ExecutorThreadFactory("mesos-jobmanager-io"));
mesosServices = MesosServicesUtils.createMesosServices(config);
http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index 21456de..4f2166f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -99,7 +99,8 @@ public class FileCache {
this.shutdownHook = createShutdownHook(this, LOG);
this.entries = new HashMap<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>>();
- this.executorService = Executors.newScheduledThreadPool(10, ExecutorThreadFactory.INSTANCE);
+ this.executorService = Executors.newScheduledThreadPool(10,
+ new ExecutorThreadFactory("flink-file-cache"));
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
index 95500e5..8cda0f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -120,8 +120,12 @@ public class JobManagerServices {
throw new IllegalConfigurationException(AkkaUtils.formatDurationParingErrorMessage());
}
+ final ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool(
+ Hardware.getNumberCPUCores(),
+ new ExecutorThreadFactory("jobmanager-future"));
+
return new JobManagerServices(
- Executors.newScheduledThreadPool(Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE),
+ futureExecutor,
libraryCacheManager,
RestartStrategyFactory.createRestartStrategyFactory(config),
Time.of(timeout.length(), timeout.unit()));
http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
index 2fb5972..4a79db3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
@@ -18,49 +18,114 @@
package org.apache.flink.runtime.util;
+import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A thread factory intended for use by critical thread pools. Critical thread pools here
+ * mean thread pools that support Flink's core coordination and processing work, and which
+ * must not simply cause unnoticed errors.
+ *
+ * <p>The thread factory can be given an {@link UncaughtExceptionHandler} for the threads.
+ * If no handler is explicitly given, the default handler for uncaught exceptions will log
+ * the exceptions and kill the process afterwards. That guarantees that critical exceptions are
+ * not accidentally lost and leave the system running in an inconsistent state.
+ *
+ * <p>Threads created by this factory are all called '(pool-name)-thread-n', where
+ * <i>(pool-name)</i> is configurable, and <i>n</i> is an incrementing number.
+ *
+ * <p>All threads created by this factory are daemon threads and have the default (normal)
+ * priority.
+ */
public class ExecutorThreadFactory implements ThreadFactory {
-
- private static final Logger LOG = LoggerFactory.getLogger(ExecutorThreadFactory.class);
-
-
- private static final String THREAD_NAME_PREFIX = "Flink Executor Thread - ";
-
- private static final AtomicInteger COUNTER = new AtomicInteger(1);
-
- private static final ThreadGroup THREAD_GROUP = new ThreadGroup("Flink Executor Threads");
-
- private static final Thread.UncaughtExceptionHandler EXCEPTION_HANDLER = new LoggingExceptionHander();
-
-
- public static final ExecutorThreadFactory INSTANCE = new ExecutorThreadFactory();
-
- // --------------------------------------------------------------------------------------------
-
- private ExecutorThreadFactory() {}
-
-
- public Thread newThread(Runnable target) {
- Thread t = new Thread(THREAD_GROUP, target, THREAD_NAME_PREFIX + COUNTER.getAndIncrement());
+
+ /** The thread pool name used when no explicit pool name has been specified */
+ private static final String DEFAULT_POOL_NAME = "flink-executor-pool";
+
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+
+ private final ThreadGroup group;
+
+ private final String namePrefix;
+
+ private final UncaughtExceptionHandler exceptionHandler;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new thread factory using the default thread pool name ('flink-executor-pool')
+ * and the default uncaught exception handler (log exception and kill process).
+ */
+ public ExecutorThreadFactory() {
+ this(DEFAULT_POOL_NAME);
+ }
+
+ /**
+ * Creates a new thread factory using the given thread pool name and the default
+ * uncaught exception handler (log exception and kill process).
+ *
+ * @param poolName The pool name, used as the threads' name prefix
+ */
+ public ExecutorThreadFactory(String poolName) {
+ this(poolName, FatalExitExceptionHandler.INSTANCE);
+ }
+
+ /**
+ * Creates a new thread factory using the given thread pool name and the given
+ * uncaught exception handler.
+ *
+ * @param poolName The pool name, used as the threads' name prefix
+ * @param exceptionHandler The uncaught exception handler for the threads
+ */
+ public ExecutorThreadFactory(String poolName, UncaughtExceptionHandler exceptionHandler) {
+ checkNotNull(poolName, "poolName");
+
+ SecurityManager securityManager = System.getSecurityManager();
+ this.group = (securityManager != null) ? securityManager.getThreadGroup() :
+ Thread.currentThread().getThreadGroup();
+
+ this.namePrefix = poolName + "-thread-";
+ this.exceptionHandler = exceptionHandler;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public Thread newThread(Runnable runnable) {
+ Thread t = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement());
t.setDaemon(true);
- t.setUncaughtExceptionHandler(EXCEPTION_HANDLER);
+
+ // normalize the priority
+ if (t.getPriority() != Thread.NORM_PRIORITY) {
+ t.setPriority(Thread.NORM_PRIORITY);
+ }
+
+ // optional handler for uncaught exceptions
+ if (exceptionHandler != null) {
+ t.setUncaughtExceptionHandler(exceptionHandler);
+ }
+
return t;
}
-
+
// --------------------------------------------------------------------------------------------
-
- private static final class LoggingExceptionHander implements Thread.UncaughtExceptionHandler {
+
+ private static final class FatalExitExceptionHandler implements UncaughtExceptionHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FatalExitExceptionHandler.class);
+
+ static final FatalExitExceptionHandler INSTANCE = new FatalExitExceptionHandler();
@Override
public void uncaughtException(Thread t, Throwable e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Thread '" + t.getName() + "' produced an uncaught exception.", e);
- }
+ LOG.error("FATAL: Thread '" + t.getName() + "' produced an uncaught exception. Stopping the process...", e);
+ System.exit(-17);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
deleted file mode 100644
index bd97963..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Thread factory which allows to specify a thread pool name and a thread name.
- *
- * The code is based on {@link java.util.concurrent.Executors.DefaultThreadFactory}.
- */
-public class NamedThreadFactory implements ThreadFactory {
- private static final AtomicInteger poolNumber = new AtomicInteger(1);
- private final ThreadGroup group;
- private final AtomicInteger threadNumber = new AtomicInteger(1);
- private final String namePrefix;
-
- public NamedThreadFactory(final String poolName, final String threadName) {
- SecurityManager securityManager = System.getSecurityManager();
- group = (securityManager != null) ? securityManager.getThreadGroup() :
- Thread.currentThread().getThreadGroup();
-
- namePrefix = poolName +
- poolNumber.getAndIncrement() +
- threadName;
- }
-
- @Override
- public Thread newThread(Runnable runnable) {
- Thread t = new Thread(group, runnable,
- namePrefix + threadNumber.getAndIncrement(),
- 0);
- if (t.isDaemon()) {
- t.setDaemon(false);
- }
- if (t.getPriority() != Thread.NORM_PRIORITY) {
- t.setPriority(Thread.NORM_PRIORITY);
- }
- return t;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d575f68..a335916 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2023,11 +2023,11 @@ object JobManager {
val futureExecutor = Executors.newScheduledThreadPool(
numberProcessors,
- new NamedThreadFactory("jobmanager-future-", "-thread-"))
+ new ExecutorThreadFactory("jobmanager-future"))
val ioExecutor = Executors.newFixedThreadPool(
numberProcessors,
- new NamedThreadFactory("jobmanager-io-", "-thread-"))
+ new ExecutorThreadFactory("jobmanager-io"))
val timeout = AkkaUtils.getTimeout(configuration)
http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 64cc97d..07fb996 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.minicluster
-import java.net.InetAddress
import java.util.UUID
import java.util.concurrent.{Executors, TimeUnit}
@@ -26,7 +25,7 @@ import akka.pattern.Patterns.gracefulStop
import akka.pattern.ask
import akka.actor.{ActorRef, ActorSystem}
import com.typesafe.config.Config
-import org.apache.flink.api.common.time.Time
+
import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.AkkaUtils
@@ -37,8 +36,9 @@ import org.apache.flink.runtime.jobgraph.JobGraph
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService, StandaloneLeaderRetrievalService}
import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
-import org.apache.flink.runtime.util.{Hardware, NamedThreadFactory, ZooKeeperUtils}
+import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware, ZooKeeperUtils}
import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
+
import org.slf4j.LoggerFactory
import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -109,11 +109,11 @@ abstract class FlinkMiniCluster(
val futureExecutor = Executors.newScheduledThreadPool(
Hardware.getNumberCPUCores(),
- new NamedThreadFactory("mini-cluster-future-", "-thread"))
+ new ExecutorThreadFactory("mini-cluster-future"))
val ioExecutor = Executors.newFixedThreadPool(
Hardware.getNumberCPUCores(),
- new NamedThreadFactory("mini-cluster-io-", "-thread"))
+ new ExecutorThreadFactory("mini-cluster-io"))
def configuration: Configuration = {
if (originalConfiguration.getInteger(
http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 29f1827..5cc51e4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -38,14 +38,14 @@ import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.util.NamedThreadFactory;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
-
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -230,11 +230,11 @@ public class YarnApplicationMasterRunner {
final ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool(
numberProcessors,
- new NamedThreadFactory("yarn-jobmanager-future-", "-thread-"));
+ new ExecutorThreadFactory("yarn-jobmanager-future"));
final ExecutorService ioExecutor = Executors.newFixedThreadPool(
numberProcessors,
- new NamedThreadFactory("yarn-jobmanager-io-", "-thread-"));
+ new ExecutorThreadFactory("yarn-jobmanager-io"));
try {
// ------- (1) load and parse / validate all configurations -------