You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/19 20:26:10 UTC
flink git commit: [FLINK-4625] [core] Add a safety net to forcibly
terminate the JVM if clean shutdown freezes.
Repository: flink
Updated Branches:
refs/heads/master 4b1a9c72e -> 5066125f9
[FLINK-4625] [core] Add a safety net to forcibly terminate the JVM if clean shutdown freezes.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5066125f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5066125f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5066125f
Branch: refs/heads/master
Commit: 5066125f9a377d232f77f6fbcac3c22ebea66b39
Parents: 4b1a9c7
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Sep 15 19:27:06 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 19 20:00:53 2016 +0200
----------------------------------------------------------------------
.../MesosApplicationMasterRunner.java | 2 +
.../MesosTaskManagerRunner.java | 5 +-
.../runtime/util/JvmShutdownSafeguard.java | 126 ++++++++++
.../flink/runtime/jobmanager/JobManager.scala | 1 +
.../flink/runtime/taskmanager/TaskManager.scala | 1 +
.../runtime/testutils/CommonTestUtils.java | 10 +-
.../flink/runtime/testutils/TestJvmProcess.java | 163 +++++++++----
.../runtime/util/BlockingShutdownTest.java | 229 +++++++++++++++++++
.../cassandra/CassandraConnectorITCase.java | 19 +-
.../flink/core/testutils/CommonTestUtils.java | 26 +++
.../flink/yarn/TestingApplicationMaster.java | 2 +
.../flink/yarn/YarnApplicationMasterRunner.java | 2 +
.../flink/yarn/YarnTaskManagerRunner.java | 5 +-
13 files changed, 525 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 9916a87..8fb6af4 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
@@ -109,6 +110,7 @@ public class MesosApplicationMasterRunner {
public static void main(String[] args) {
EnvironmentInformation.logEnvironmentInfo(LOG, "Mesos AppMaster", args);
SignalHandler.register(LOG);
+ JvmShutdownSafeguard.installAsShutdownHook(LOG);
// run and exit with the proper return code
int returnCode = new MesosApplicationMasterRunner().run(args);
http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index 94a9e99..ddc2097 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -29,6 +29,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -49,7 +51,8 @@ public class MesosTaskManagerRunner {
public static void runTaskManager(String[] args, final Class<? extends TaskManager> taskManager) throws IOException {
EnvironmentInformation.logEnvironmentInfo(LOG, taskManager.getSimpleName(), args);
- org.apache.flink.runtime.util.SignalHandler.register(LOG);
+ SignalHandler.register(LOG);
+ JvmShutdownSafeguard.installAsShutdownHook(LOG);
// try to parse the command line arguments
final Configuration configuration;
http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java
new file mode 100644
index 0000000..e8e378e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.slf4j.Logger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A utility that guards against blocking shutdown hooks that block JVM shutdown.
+ *
+ * <p>When the JVM shuts down cleanly (<i>SIGTERM</i> or {@link System#exit(int)}) it runs
+ * all installed shutdown hooks. It is possible that any of the shutdown hooks blocks,
+ * which causes the JVM to get stuck and not exit at all.
+ *
+ * <p>This utility installs a shutdown hook that forcibly terminates the JVM if it is still alive
+ * a certain time after clean shutdown was initiated. Even if some shutdown hooks block, the JVM will
+ * terminate within a certain time.
+ */
+public class JvmShutdownSafeguard extends Thread {
+
+ /** Default delay to wait after clean shutdown was started, before forcibly terminating the JVM */
+ private static final long DEFAULT_DELAY = 5000L;
+
+ /** The exit code passed to Runtime#halt when the safeguard forcibly terminates the JVM */
+ private static final int EXIT_CODE = -17;
+
+ /** The daemon thread that sleeps for the delay and then halts the JVM */
+ private final Thread terminator;
+
+ private JvmShutdownSafeguard(long delayMillis) {
+ setName("JVM Terminator Launcher");
+
+ this.terminator = new Thread(new DelayedTerminator(delayMillis), "Jvm Terminator");
+ this.terminator.setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ // Because this thread is registered as a shutdown hook, we cannot
+ // wait here and then call for termination. That would always delay the JVM shutdown.
+ // Instead, we spawn a non shutdown hook thread from here.
+ // That thread is a daemon, so it does not keep the JVM alive.
+ terminator.start();
+ }
+
+ // ------------------------------------------------------------------------
+ // The actual Shutdown thread
+ // ------------------------------------------------------------------------
+
+ private static class DelayedTerminator implements Runnable {
+
+ private final long delayMillis;
+
+ private DelayedTerminator(long delayMillis) {
+ this.delayMillis = delayMillis;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(delayMillis);
+ }
+ catch (Throwable t) {
+ // catch all (interruption, ThreadDeath, ...): the JVM is halted below in any case
+ }
+
+ Runtime.getRuntime().halt(EXIT_CODE);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Installing as a shutdown hook
+ // ------------------------------------------------------------------------
+
+ /**
+ * Installs the safeguard shutdown hook. The maximum time that the JVM is allowed to spend
+ * on shutdown before being killed is five seconds.
+ *
+ * @param logger The logger to log errors to.
+ */
+ public static void installAsShutdownHook(Logger logger) {
+ installAsShutdownHook(logger, DEFAULT_DELAY);
+ }
+
+ /**
+ * Installs the safeguard shutdown hook. The maximum time that the JVM is allowed to spend
+ * on shutdown before being killed is the given number of milliseconds.
+ *
+ * @param logger The logger to log errors to.
+ * @param delayMillis The delay (in milliseconds) to wait after clean shutdown was started,
+ * before forcibly terminating the JVM.
+ */
+ public static void installAsShutdownHook(Logger logger, long delayMillis) {
+ checkArgument(delayMillis >= 0, "delay must be >= 0");
+
+ // install the blocking shutdown hook
+ Thread shutdownHook = new JvmShutdownSafeguard(delayMillis);
+ try {
+ // Add JVM shutdown hook to call shutdown of service
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+ }
+ catch (IllegalStateException ignored) {
+ // JVM is already shutting down. No need to do this.
+ }
+ catch (Throwable t) {
+ logger.error("Cannot install JVM Shutdown Safeguard against blocked shutdown hooks", t);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1c68874..9c844ba 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2019,6 +2019,7 @@ object JobManager {
// startup checks and logging
EnvironmentInformation.logEnvironmentInfo(LOG.logger, "JobManager", args)
SignalHandler.register(LOG.logger)
+ JvmShutdownSafeguard.installAsShutdownHook(LOG.logger)
// parsing the command line arguments
val (configuration: Configuration,
http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index c882631..63a64a0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1497,6 +1497,7 @@ object TaskManager {
// startup checks and logging
EnvironmentInformation.logEnvironmentInfo(LOG.logger, "TaskManager", args)
SignalHandler.register(LOG.logger)
+ JvmShutdownSafeguard.installAsShutdownHook(LOG.logger)
val maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit()
if (maxOpenFileHandles != -1) {
http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index 59c37b7..2a787f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -71,8 +71,8 @@ public class CommonTestUtils {
* Create a temporary log4j configuration for the test.
*/
public static File createTemporaryLog4JProperties() throws IOException {
- File log4jProps = File.createTempFile(FileUtils.getRandomFilename(""), "-log4j" +
- ".properties");
+ File log4jProps = File.createTempFile(
+ FileUtils.getRandomFilename(""), "-log4j.properties");
log4jProps.deleteOnExit();
CommonTestUtils.printLog4jDebugConfig(log4jProps);
@@ -137,9 +137,7 @@ public class CommonTestUtils {
}
public static void printLog4jDebugConfig(File file) throws IOException {
- try (FileWriter fw = new FileWriter(file)) {
- PrintWriter writer = new PrintWriter(fw);
-
+ try (PrintWriter writer = new PrintWriter(new FileWriter(file))) {
writer.println("log4j.rootLogger=DEBUG, console");
writer.println("log4j.appender.console=org.apache.log4j.ConsoleAppender");
writer.println("log4j.appender.console.target = System.err");
@@ -147,9 +145,7 @@ public class CommonTestUtils {
writer.println("log4j.appender.console.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n");
writer.println("log4j.logger.org.eclipse.jetty.util.log=OFF");
writer.println("log4j.logger.org.apache.zookeeper=OFF");
-
writer.flush();
- writer.close();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
index 73a0088..5954ee5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
@@ -26,6 +26,8 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.StringWriter;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.util.Arrays;
import static org.apache.flink.runtime.testutils.CommonTestUtils.createTemporaryLog4JProperties;
@@ -33,6 +35,7 @@ import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClass
import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.fail;
/**
@@ -58,11 +61,15 @@ public abstract class TestJvmProcess {
private int jvmMemoryInMb = 80;
/** The JVM process */
- private Process process;
+ private volatile Process process;
/** Writer for the process output */
private volatile StringWriter processOutput;
+ /** flag to mark the process as already destroyed */
+ private volatile boolean destroyed;
+
+
public TestJvmProcess() throws Exception {
this(getJavaCommandPath(), createTemporaryLog4JProperties().getPath());
}
@@ -111,7 +118,9 @@ public abstract class TestJvmProcess {
* @param jvmMemoryInMb Amount of memory in Megabytes for the JVM (>= 80).
*/
public void setJVMMemory(int jvmMemoryInMb) {
- checkArgument(jvmMemoryInMb >= 80, "JobManager JVM Requires at least 80 MBs of memory.");
+ checkArgument(jvmMemoryInMb >= 80, "Process JVM Requires at least 80 MBs of memory.");
+ checkState(process == null, "Cannot set memory after process was started");
+
this.jvmMemoryInMb = jvmMemoryInMb;
}
@@ -139,35 +148,30 @@ public abstract class TestJvmProcess {
}
synchronized (createDestroyLock) {
- if (process == null) {
- LOG.debug("Running command '{}'.", Arrays.toString(cmd));
- this.process = new ProcessBuilder(cmd).start();
+ checkState(process == null, "process already started");
- // Forward output
- this.processOutput = new StringWriter();
- new CommonTestUtils.PipeForwarder(process.getErrorStream(), processOutput);
+ LOG.debug("Running command '{}'.", Arrays.toString(cmd));
+ this.process = new ProcessBuilder(cmd).start();
- try {
- // Add JVM shutdown hook to call shutdown of service
- Runtime.getRuntime().addShutdownHook(shutdownHook);
- }
- catch (IllegalStateException ignored) {
- // JVM is already shutting down. No need to do this.
- }
- catch (Throwable t) {
- LOG.error("Cannot register process cleanup shutdown hook.", t);
- }
+ // Forward output
+ this.processOutput = new StringWriter();
+ new CommonTestUtils.PipeForwarder(process.getErrorStream(), processOutput);
+
+ try {
+ // Add JVM shutdown hook to call shutdown of service
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
}
- else {
- throw new IllegalStateException("Already running.");
+ catch (IllegalStateException ignored) {
+ // JVM is already shutting down. No need to do this.
+ }
+ catch (Throwable t) {
+ LOG.error("Cannot register process cleanup shutdown hook.", t);
}
}
}
public void printProcessLog() {
- if (processOutput == null) {
- throw new IllegalStateException("Not started");
- }
+ checkState(processOutput != null, "not started");
System.out.println("-----------------------------------------");
System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + getName());
@@ -188,28 +192,53 @@ public abstract class TestJvmProcess {
public void destroy() {
synchronized (createDestroyLock) {
- if (process != null) {
- LOG.debug("Destroying " + getName() + " process.");
+ checkState(process != null, "process not started");
+
+ if (destroyed) {
+ // already done
+ return;
+ }
+
+ LOG.info("Destroying " + getName() + " process.");
+ try {
+ // try to call "destroyForcibly()" on Java 8
+ boolean destroyed = false;
try {
- process.destroy();
+ Method m = process.getClass().getMethod("destroyForcibly");
+ m.setAccessible(true);
+ m.invoke(process);
+ destroyed = true;
+ }
+ catch (NoSuchMethodException ignored) {
+ // happens on Java 7
}
catch (Throwable t) {
- LOG.error("Error while trying to destroy process.", t);
+ LOG.error("Failed to forcibly destroy process", t);
+ }
+
+ // if it was not destroyed, call the regular destroy method
+ if (!destroyed) {
+ try {
+ process.destroy();
+ }
+ catch (Throwable t) {
+ LOG.error("Error while trying to destroy process.", t);
+ }
}
- finally {
- process = null;
-
- if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
- try {
- Runtime.getRuntime().removeShutdownHook(shutdownHook);
- }
- catch (IllegalStateException ignored) {
- // JVM is in shutdown already, we can safely ignore this.
- }
- catch (Throwable t) {
- LOG.warn("Exception while unregistering prcess cleanup shutdown hook.");
- }
+ }
+ finally {
+ destroyed = true;
+
+ if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
+ try {
+ Runtime.getRuntime().removeShutdownHook(shutdownHook);
+ }
+ catch (IllegalStateException ignored) {
+ // JVM is in shutdown already, we can safely ignore this.
+ }
+ catch (Throwable t) {
+ LOG.warn("Exception while unregistering process cleanup shutdown hook.");
}
}
}
@@ -225,6 +254,47 @@ public abstract class TestJvmProcess {
}
}
+ /**
+ * Returns the operating-system ID of the spawned process, where it can be determined.
+ * This currently only works on UNIX-based operating systems; elsewhere it returns {@code -1}.
+ *
+ * @return The process ID, or -1, if the ID cannot be determined.
+ */
+ public long getProcessId() {
+ checkState(process != null, "process not started");
+
+ try {
+ final Class<? extends Process> processClass = process.getClass();
+ if (!"java.lang.UNIXProcess".equals(processClass.getName())) {
+ // not the UNIX process implementation: the PID is not accessible
+ return -1;
+ }
+ // read the private 'pid' field of java.lang.UNIXProcess via reflection
+ final Field pid = processClass.getDeclaredField("pid");
+ pid.setAccessible(true);
+ return pid.getLong(process);
+ }
+ catch (Throwable ignored) {
+ return -1;
+ }
+ }
+
+ /** Checks whether the started process has neither been destroyed nor exited on its own. */
+ public boolean isAlive() {
+ checkState(process != null, "process not started");
+ if (destroyed) {
+ return false;
+ }
+ try {
+ // exitValue() throws IllegalThreadStateException as long as the process is alive
+ process.exitValue();
+ return false;
+ }
+ catch (IllegalThreadStateException ignored) {
+ // no exit value yet, so the process is still running
+ return true;
+ }
+ }
+
// ---------------------------------------------------------------------------------------------
// File based synchronization utilities
// ---------------------------------------------------------------------------------------------
@@ -238,6 +308,19 @@ public abstract class TestJvmProcess {
}
}
+ public static void waitForMarkerFile(File file, long timeoutMillis) throws InterruptedException {
+ // poll every 10 ms until the marker file exists or the timeout elapses
+ final long deadlineNanos = System.nanoTime() + timeoutMillis * 1_000_000;
+ boolean found = file.exists();
+ while (!found && System.nanoTime() < deadlineNanos) {
+ Thread.sleep(10);
+ found = file.exists();
+ }
+ if (!found) {
+ fail("The marker file was not found within " + timeoutMillis + " msecs");
+ }
+ }
+
public static void waitForMarkerFiles(File basedir, String prefix, int num, long timeout) {
long now = System.currentTimeMillis();
final long deadline = now + timeout;
http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingShutdownTest.java
new file mode 100644
index 0000000..f22f42f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingShutdownTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.TestJvmProcess;
+import org.apache.flink.util.OperatingSystem;
+
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Test that verifies the behavior of blocking shutdown hooks and of the
+ * {@link JvmShutdownSafeguard} that guards against it.
+ */
+public class BlockingShutdownTest {
+
+ @Test
+ public void testProcessShutdownBlocking() throws Exception {
+ // this test works only on Linux (it sends SIGTERM via the 'kill' command to the child's PID)
+ assumeTrue(OperatingSystem.isLinux());
+
+ // requires Java 8: only destroyForcibly() can kill the blocked child; otherwise it would linger
+ CommonTestUtils.assumeJava8();
+
+ final File markerFile = new File(
+ EnvironmentInformation.getTemporaryFileDirectory(), UUID.randomUUID() + ".marker");
+
+ final BlockingShutdownProcess blockingProcess =
+ new BlockingShutdownProcess(markerFile.getAbsolutePath(), 0, false); // child without JvmShutdownSafeguard
+
+ try {
+ blockingProcess.startProcess();
+ long pid = blockingProcess.getProcessId();
+ assertTrue("Cannot determine process ID", pid != -1);
+
+ // wait for the marker file to appear, which means the process is up properly
+ TestJvmProcess.waitForMarkerFile(markerFile, 30000);
+
+ // send it a regular kill command (SIG_TERM)
+ Process kill = Runtime.getRuntime().exec("kill " + pid);
+ kill.waitFor();
+ assertEquals("failed to send SIG_TERM to process", 0, kill.exitValue());
+
+ // give the Process object a short moment to notice if the child exited. This does not
+ // reliably catch a child that is still on its way out (it may be alive after 50 ms), but
+ // a non-blocking shutdown would still cause frequent failures. Not ideal, but the best we
+ // can do without severely prolonging the test.
+ Thread.sleep(50);
+
+ // the process should not go away by itself
+ assertTrue("Test broken, process shutdown blocking does not work", blockingProcess.isAlive());
+ }
+ finally {
+ blockingProcess.destroy();
+
+ //noinspection ResultOfMethodCallIgnored
+ markerFile.delete();
+ }
+ }
+
+ @Test
+ public void testProcessExitsDespiteBlockingShutdownHook() throws Exception {
+ // this test works only on Linux (it sends SIGTERM via the 'kill' command to the child's PID)
+ assumeTrue(OperatingSystem.isLinux());
+
+ final File markerFile = new File(
+ EnvironmentInformation.getTemporaryFileDirectory(), UUID.randomUUID() + ".marker");
+
+ final BlockingShutdownProcess blockingProcess =
+ new BlockingShutdownProcess(markerFile.getAbsolutePath(), 100, true); // safeguard halts the child 100 ms after shutdown starts
+
+ try {
+ blockingProcess.startProcess();
+ long pid = blockingProcess.getProcessId();
+ assertTrue("Cannot determine process ID", pid != -1);
+
+ // wait for the marker file to appear, which means the process is up properly
+ TestJvmProcess.waitForMarkerFile(markerFile, 30000);
+
+ // send it a regular kill command (SIG_TERM)
+ Process kill = Runtime.getRuntime().exec("kill " + pid);
+ kill.waitFor();
+ assertEquals("failed to send SIG_TERM to process", 0, kill.exitValue());
+
+ // despite the blocking hook, the safeguard should halt the process; poll for up to 30 secs
+ final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs in nanos
+ while (blockingProcess.isAlive() && System.nanoTime() < deadline) {
+ Thread.sleep(50);
+ }
+
+ assertFalse("shutdown blocking process does not properly terminate itself", blockingProcess.isAlive());
+ }
+ finally {
+ blockingProcess.destroy();
+
+ //noinspection ResultOfMethodCallIgnored
+ markerFile.delete();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ // a method that blocks the calling thread indefinitely (it never returns)
+ static void parkForever() {
+ // park this forever
+ final Object lock = new Object();
+ //noinspection InfiniteLoopStatement
+ while (true) {
+ try {
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (lock) {
+ lock.wait();
+ }
+ } catch (InterruptedException ignored) {} // keep parking, even when interrupted
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Blocking Process Implementation
+ // ------------------------------------------------------------------------
+
+ private static final class BlockingShutdownProcess extends TestJvmProcess {
+
+ private final String tempFilePath; // marker file the child touches once it is up
+ private final long selfKillDelay; // delay (ms) the child's JvmShutdownSafeguard waits before halting
+ private final boolean installSignalHandler; // if true, the child installs JvmShutdownSafeguard (not a signal handler)
+
+ public BlockingShutdownProcess(String tempFilePath, long selfKillDelay, boolean installSignalHandler)
+ throws Exception {
+
+ this.tempFilePath = tempFilePath;
+ this.selfKillDelay = selfKillDelay;
+ this.installSignalHandler = installSignalHandler;
+ }
+
+ @Override
+ public String getName() {
+ return "BlockingShutdownProcess";
+ }
+
+ @Override
+ public String[] getJvmArgs() {
+ return new String[] { tempFilePath, String.valueOf(installSignalHandler), String.valueOf(selfKillDelay) }; // read as args[0..2] by the entry point
+ }
+
+ @Override
+ public String getEntryPointClassName() {
+ return BlockingShutdownProcessEntryPoint.class.getName();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ public static final class BlockingShutdownProcessEntryPoint {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BlockingShutdownProcessEntryPoint.class);
+
+ public static void main(String[] args) throws Exception {
+ File touchFile = new File(args[0]);
+ boolean installHandler = Boolean.parseBoolean(args[1]);
+ long killDelay = Long.parseLong(args[2]);
+
+ // install the blocking shutdown hook
+ Thread shutdownHook = new Thread(new BlockingRunnable(), "Blocking ShutdownHook");
+ try {
+ // register a hook that parks forever, so that a clean JVM shutdown never completes
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+ }
+ catch (IllegalStateException ignored) {
+ // JVM is already shutting down. No need to do this.
+ }
+ catch (Throwable t) {
+ System.err.println("Cannot register process cleanup shutdown hook.");
+ t.printStackTrace();
+ }
+
+ // install the JvmShutdownSafeguard (halts the JVM killDelay ms after shutdown starts), if requested
+ if (installHandler) {
+ JvmShutdownSafeguard.installAsShutdownHook(LOG, killDelay);
+ }
+
+ System.err.println("signaling process started");
+ TestJvmProcess.touchFile(touchFile);
+
+ System.err.println("parking the main thread");
+ parkForever();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ static final class BlockingRunnable implements Runnable {
+
+ @Override
+ public void run() {
+ System.err.println("starting shutdown hook");
+ parkForever();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index cc4a527..d58b0af 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -47,18 +47,16 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
-
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.TestEnvironment;
+
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
-import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.internal.AssumptionViolatedException;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -144,20 +142,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
public static void startCassandra() throws IOException {
// check if we should run this test, current Cassandra version requires Java >= 1.8
- try {
- String javaVersionString = System.getProperty("java.runtime.version").substring(0, 3);
- float javaVersion = Float.parseFloat(javaVersionString);
- Assume.assumeTrue(javaVersion >= 1.8f);
- }
- catch (AssumptionViolatedException e) {
- System.out.println("Skipping CassandraConnectorITCase, because the JDK is < Java 8+");
- throw e;
- }
- catch (Exception e) {
- LOG.error("Cannot determine Java version", e);
- e.printStackTrace();
- fail("Cannot determine Java version");
- }
+ org.apache.flink.core.testutils.CommonTestUtils.assumeJava8();
// generate temporary files
tmpDir = CommonTestUtils.createTempDirectory();
http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
index a50a106..d318a3c 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
@@ -18,6 +18,9 @@
package org.apache.flink.core.testutils;
+import org.junit.Assume;
+import org.junit.internal.AssumptionViolatedException;
+
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -27,6 +30,8 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import static org.junit.Assert.fail;
+
/**
* This class contains reusable utility methods for unit tests.
*/
@@ -88,4 +93,54 @@ public class CommonTestUtils {
}
return f.toURI().toString();
}
+
+	/**
+	 * Checks whether this code runs in a Java 8 (Java 1.8) or newer JVM. If not, this throws a
+	 * {@link AssumptionViolatedException}, which causes JUnit to skip the test that
+	 * called this method.
+	 *
+	 * <p>The major version is read from the {@code java.specification.version} system
+	 * property, which is "1.x" up to Java 8 and a plain major number ("9", "11", ...) from
+	 * Java 9 on. Unlike parsing a prefix of {@code java.runtime.version}, this also works for
+	 * runtime version strings such as "9+181" or "10+46".
+	 *
+	 * @throws AssumptionViolatedException if the JVM is older than Java 8
+	 */
+	public static void assumeJava8() {
+		final int majorVersion;
+		try {
+			majorVersion = parseJavaMajorVersion(System.getProperty("java.specification.version"));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Cannot determine Java version: " + e.getMessage());
+			// fail() always throws; the return only satisfies definite assignment
+			return;
+		}
+
+		if (majorVersion < 8) {
+			System.out.println("Skipping test, because the JDK is older than Java 8");
+		}
+		Assume.assumeTrue(majorVersion >= 8);
+	}
+
+	/**
+	 * Extracts the major Java version from a {@code java.specification.version} value,
+	 * e.g. "1.7" -> 7, "1.8" -> 8, "9" -> 9, "11" -> 11.
+	 *
+	 * @param specVersion the value of the {@code java.specification.version} property
+	 * @return the major Java version
+	 * @throws IllegalArgumentException if the value is null or not a recognized version
+	 */
+	private static int parseJavaMajorVersion(String specVersion) {
+		if (specVersion == null) {
+			throw new IllegalArgumentException("System property 'java.specification.version' is not set");
+		}
+		final String version = specVersion.trim();
+		// Java 8 and older report "1.<major>", Java 9 and newer report "<major>" directly
+		final String majorPart = version.startsWith("1.") ? version.substring(2) : version;
+		final int dot = majorPart.indexOf('.');
+		// Integer.parseInt throws NumberFormatException, a subclass of IllegalArgumentException
+		return Integer.parseInt(dot >= 0 ? majorPart.substring(0, dot) : majorPart);
+	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java
index b0757f5..785dff9 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
import org.apache.flink.runtime.testutils.TestingResourceManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
/**
@@ -55,6 +56,7 @@ public class TestingApplicationMaster extends YarnApplicationMasterRunner {
public static void main(String[] args) {
EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / JobManager", args);
SignalHandler.register(LOG);
+ JvmShutdownSafeguard.installAsShutdownHook(LOG);
// run and exit with the proper return code
int returnCode = new TestingApplicationMaster().run(args);
http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 7453344..6619633 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
@@ -107,6 +108,7 @@ public class YarnApplicationMasterRunner {
public static void main(String[] args) {
EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / JobManager", args);
SignalHandler.register(LOG);
+ JvmShutdownSafeguard.installAsShutdownHook(LOG);
// run and exit with the proper return code
int returnCode = new YarnApplicationMasterRunner().run(args);
http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index 6839bb5..9638137 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -46,7 +48,8 @@ public class YarnTaskManagerRunner {
public static void runYarnTaskManager(String[] args, final Class<? extends YarnTaskManager> taskManager) throws IOException {
EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskManager", args);
- org.apache.flink.runtime.util.SignalHandler.register(LOG);
+ SignalHandler.register(LOG);
+ JvmShutdownSafeguard.installAsShutdownHook(LOG);
// try to parse the command line arguments
final Configuration configuration;