You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/19 20:26:10 UTC

flink git commit: [FLINK-4625] [core] Add a safety net to forcibly terminate the JVM if clean shutdown freezes.

Repository: flink
Updated Branches:
  refs/heads/master 4b1a9c72e -> 5066125f9


[FLINK-4625] [core] Add a safety net to forcibly terminate the JVM if clean shutdown freezes.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5066125f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5066125f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5066125f

Branch: refs/heads/master
Commit: 5066125f9a377d232f77f6fbcac3c22ebea66b39
Parents: 4b1a9c7
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Sep 15 19:27:06 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 19 20:00:53 2016 +0200

----------------------------------------------------------------------
 .../MesosApplicationMasterRunner.java           |   2 +
 .../MesosTaskManagerRunner.java                 |   5 +-
 .../runtime/util/JvmShutdownSafeguard.java      | 126 ++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   |   1 +
 .../flink/runtime/taskmanager/TaskManager.scala |   1 +
 .../runtime/testutils/CommonTestUtils.java      |  10 +-
 .../flink/runtime/testutils/TestJvmProcess.java | 163 +++++++++----
 .../runtime/util/BlockingShutdownTest.java      | 229 +++++++++++++++++++
 .../cassandra/CassandraConnectorITCase.java     |  19 +-
 .../flink/core/testutils/CommonTestUtils.java   |  26 +++
 .../flink/yarn/TestingApplicationMaster.java    |   2 +
 .../flink/yarn/YarnApplicationMasterRunner.java |   2 +
 .../flink/yarn/YarnTaskManagerRunner.java       |   5 +-
 13 files changed, 525 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 9916a87..8fb6af4 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
@@ -109,6 +110,7 @@ public class MesosApplicationMasterRunner {
 	public static void main(String[] args) {
 		EnvironmentInformation.logEnvironmentInfo(LOG, "Mesos AppMaster", args);
 		SignalHandler.register(LOG);
+		JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
 		// run and exit with the proper return code
 		int returnCode = new MesosApplicationMasterRunner().run(args);

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index 94a9e99..ddc2097 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -29,6 +29,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -49,7 +51,8 @@ public class MesosTaskManagerRunner {
 
 	public static void runTaskManager(String[] args, final Class<? extends TaskManager> taskManager) throws IOException {
 		EnvironmentInformation.logEnvironmentInfo(LOG, taskManager.getSimpleName(), args);
-		org.apache.flink.runtime.util.SignalHandler.register(LOG);
+		SignalHandler.register(LOG);
+		JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
 		// try to parse the command line arguments
 		final Configuration configuration;

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java
new file mode 100644
index 0000000..e8e378e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.slf4j.Logger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A utility that guards against blocking shutdown hooks that block JVM shutdown.
+ * 
+ * <p>When the JVM shuts down cleanly (<i>SIGTERM</i> or {@link System#exit(int)}) it runs
+ * all installed shutdown hooks. It is possible that any of the shutdown hooks blocks,
+ * which causes the JVM to get stuck and not exit at all.
+ * 
+ * <p>This utility installs a shutdown hook that forcibly terminates the JVM if it is still alive
+ * a certain time after clean shutdown was initiated. Even if some shutdown hooks block, the JVM will
+ * terminate within a certain time.
+ */
+public class JvmShutdownSafeguard extends Thread {
+
+	/** Default delay to wait after clean shutdown was stared, before forcibly terminating the JVM */  
+	private static final long DEFAULT_DELAY = 5000L;
+
+	/** The exit code returned by the JVM process if it is killed by the safeguard */
+	private static final int EXIT_CODE = -17;
+
+	/** The thread that actually does the termination */
+	private final Thread terminator;
+	
+	private JvmShutdownSafeguard(long delayMillis) {
+		setName("JVM Terminator Launcher");
+
+		this.terminator = new Thread(new DelayedTerminator(delayMillis), "Jvm Terminator");
+		this.terminator.setDaemon(true);
+	}
+
+	@Override
+	public void run() {
+		// Because this thread is registered as a shutdown hook, we cannot
+		// wait here and then call for termination. That would always delay the JVM shutdown.
+		// Instead, we spawn a non shutdown hook thread from here. 
+		// That thread is a daemon, so it does not keep the JVM alive.
+		terminator.start();
+	}
+
+	// ------------------------------------------------------------------------
+	//  The actual Shutdown thread
+	// ------------------------------------------------------------------------
+
+	private static class DelayedTerminator implements Runnable {
+
+		private final long delayMillis;
+
+		private DelayedTerminator(long delayMillis) {
+			this.delayMillis = delayMillis;
+		}
+
+		@Override
+		public void run() {
+			try {
+				Thread.sleep(delayMillis);
+			}
+			catch (Throwable t) {
+				// catch all, including thread death, etc
+			}
+
+			Runtime.getRuntime().halt(EXIT_CODE);
+		}
+	} 
+
+	// ------------------------------------------------------------------------
+	//  Installing as a shutdown hook
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Installs the safeguard shutdown hook. The maximum time that the JVM is allowed to spend
+	 * on shutdown before being killed is five seconds.
+	 * 
+	 * @param logger The logger to log errors to.
+	 */
+	public static void installAsShutdownHook(Logger logger) {
+		installAsShutdownHook(logger, DEFAULT_DELAY);
+	}
+
+	/**
+	 * Installs the safeguard shutdown hook. The maximum time that the JVM is allowed to spend
+	 * on shutdown before being killed is the given number of milliseconds.
+	 * 
+	 * @param logger      The logger to log errors to.
+	 * @param delayMillis The delay (in milliseconds) to wait after clean shutdown was stared,
+	 *                    before forcibly terminating the JVM.
+	 */
+	public static void installAsShutdownHook(Logger logger, long delayMillis) {
+		checkArgument(delayMillis >= 0, "delay must be >= 0");
+
+		// install the blocking shutdown hook
+		Thread shutdownHook = new JvmShutdownSafeguard(delayMillis);
+		try {
+			// Add JVM shutdown hook to call shutdown of service
+			Runtime.getRuntime().addShutdownHook(shutdownHook);
+		}
+		catch (IllegalStateException ignored) {
+			// JVM is already shutting down. No need to do this.
+		}
+		catch (Throwable t) {
+			logger.error("Cannot install JVM Shutdown Safeguard against blocked shutdown hooks");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1c68874..9c844ba 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2019,6 +2019,7 @@ object JobManager {
     // startup checks and logging
     EnvironmentInformation.logEnvironmentInfo(LOG.logger, "JobManager", args)
     SignalHandler.register(LOG.logger)
+    JvmShutdownSafeguard.installAsShutdownHook(LOG.logger)
 
     // parsing the command line arguments
     val (configuration: Configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index c882631..63a64a0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1497,6 +1497,7 @@ object TaskManager {
     // startup checks and logging
     EnvironmentInformation.logEnvironmentInfo(LOG.logger, "TaskManager", args)
     SignalHandler.register(LOG.logger)
+    JvmShutdownSafeguard.installAsShutdownHook(LOG.logger)
 
     val maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit()
     if (maxOpenFileHandles != -1) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index 59c37b7..2a787f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -71,8 +71,8 @@ public class CommonTestUtils {
 	 * Create a temporary log4j configuration for the test.
 	 */
 	public static File createTemporaryLog4JProperties() throws IOException {
-		File log4jProps = File.createTempFile(FileUtils.getRandomFilename(""), "-log4j" +
-				".properties");
+		File log4jProps = File.createTempFile(
+				FileUtils.getRandomFilename(""), "-log4j.properties");
 		log4jProps.deleteOnExit();
 		CommonTestUtils.printLog4jDebugConfig(log4jProps);
 
@@ -137,9 +137,7 @@ public class CommonTestUtils {
 	}
 
 	public static void printLog4jDebugConfig(File file) throws IOException {
-		try (FileWriter fw = new FileWriter(file)) {
-			PrintWriter writer = new PrintWriter(fw);
-
+		try (PrintWriter writer = new PrintWriter(new FileWriter(file))) {
 			writer.println("log4j.rootLogger=DEBUG, console");
 			writer.println("log4j.appender.console=org.apache.log4j.ConsoleAppender");
 			writer.println("log4j.appender.console.target = System.err");
@@ -147,9 +145,7 @@ public class CommonTestUtils {
 			writer.println("log4j.appender.console.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n");
 			writer.println("log4j.logger.org.eclipse.jetty.util.log=OFF");
 			writer.println("log4j.logger.org.apache.zookeeper=OFF");
-
 			writer.flush();
-			writer.close();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
index 73a0088..5954ee5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
@@ -26,6 +26,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.StringWriter;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.Arrays;
 
 import static org.apache.flink.runtime.testutils.CommonTestUtils.createTemporaryLog4JProperties;
@@ -33,6 +35,7 @@ import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClass
 import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.fail;
 
 /**
@@ -58,11 +61,15 @@ public abstract class TestJvmProcess {
 	private int jvmMemoryInMb = 80;
 
 	/** The JVM process */
-	private Process process;
+	private volatile Process process;
 
 	/** Writer for the process output */
 	private volatile StringWriter processOutput;
 
+	/** flag to mark the process as already destroyed */
+	private volatile boolean destroyed;
+
+
 	public TestJvmProcess() throws Exception {
 		this(getJavaCommandPath(), createTemporaryLog4JProperties().getPath());
 	}
@@ -111,7 +118,9 @@ public abstract class TestJvmProcess {
 	 * @param jvmMemoryInMb Amount of memory in Megabytes for the JVM (>= 80).
 	 */
 	public void setJVMMemory(int jvmMemoryInMb) {
-		checkArgument(jvmMemoryInMb >= 80, "JobManager JVM Requires at least 80 MBs of memory.");
+		checkArgument(jvmMemoryInMb >= 80, "Process JVM Requires at least 80 MBs of memory.");
+		checkState(process == null, "Cannot set memory after process was started");
+
 		this.jvmMemoryInMb = jvmMemoryInMb;
 	}
 
@@ -139,35 +148,30 @@ public abstract class TestJvmProcess {
 		}
 
 		synchronized (createDestroyLock) {
-			if (process == null) {
-				LOG.debug("Running command '{}'.", Arrays.toString(cmd));
-				this.process = new ProcessBuilder(cmd).start();
+			checkState(process == null, "process already started");
 
-				// Forward output
-				this.processOutput = new StringWriter();
-				new CommonTestUtils.PipeForwarder(process.getErrorStream(), processOutput);
+			LOG.debug("Running command '{}'.", Arrays.toString(cmd));
+			this.process = new ProcessBuilder(cmd).start();
 
-				try {
-					// Add JVM shutdown hook to call shutdown of service
-					Runtime.getRuntime().addShutdownHook(shutdownHook);
-				}
-				catch (IllegalStateException ignored) {
-					// JVM is already shutting down. No need to do this.
-				}
-				catch (Throwable t) {
-					LOG.error("Cannot register process cleanup shutdown hook.", t);
-				}
+			// Forward output
+			this.processOutput = new StringWriter();
+			new CommonTestUtils.PipeForwarder(process.getErrorStream(), processOutput);
+
+			try {
+				// Add JVM shutdown hook to call shutdown of service
+				Runtime.getRuntime().addShutdownHook(shutdownHook);
 			}
-			else {
-				throw new IllegalStateException("Already running.");
+			catch (IllegalStateException ignored) {
+				// JVM is already shutting down. No need to do this.
+			}
+			catch (Throwable t) {
+				LOG.error("Cannot register process cleanup shutdown hook.", t);
 			}
 		}
 	}
 
 	public void printProcessLog() {
-		if (processOutput == null) {
-			throw new IllegalStateException("Not started");
-		}
+		checkState(processOutput != null, "not started");
 
 		System.out.println("-----------------------------------------");
 		System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + getName());
@@ -188,28 +192,53 @@ public abstract class TestJvmProcess {
 
 	public void destroy() {
 		synchronized (createDestroyLock) {
-			if (process != null) {
-				LOG.debug("Destroying " + getName() + " process.");
+			checkState(process != null, "process not started");
+
+			if (destroyed) {
+				// already done
+				return;
+			}
+
+			LOG.info("Destroying " + getName() + " process.");
 
+			try {
+				// try to call "destroyForcibly()" on Java 8
+				boolean destroyed = false;
 				try {
-					process.destroy();
+					Method m = process.getClass().getMethod("destroyForcibly");
+					m.setAccessible(true);
+					m.invoke(process);
+					destroyed = true;
+				}
+				catch (NoSuchMethodException ignored) {
+					// happens on Java 7
 				}
 				catch (Throwable t) {
-					LOG.error("Error while trying to destroy process.", t);
+					LOG.error("Failed to forcibly destroy process", t);
+				}
+
+				// if it was not destroyed, call the regular destroy method
+				if (!destroyed) {
+					try {
+						process.destroy();
+					}
+					catch (Throwable t) {
+						LOG.error("Error while trying to destroy process.", t);
+					}
 				}
-				finally {
-					process = null;
-
-					if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
-						try {
-							Runtime.getRuntime().removeShutdownHook(shutdownHook);
-						}
-						catch (IllegalStateException ignored) {
-							// JVM is in shutdown already, we can safely ignore this.
-						}
-						catch (Throwable t) {
-							LOG.warn("Exception while unregistering prcess cleanup shutdown hook.");
-						}
+			}
+			finally {
+				destroyed = true;
+
+				if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
+					try {
+						Runtime.getRuntime().removeShutdownHook(shutdownHook);
+					}
+					catch (IllegalStateException ignored) {
+						// JVM is in shutdown already, we can safely ignore this.
+					}
+					catch (Throwable t) {
+						LOG.warn("Exception while unregistering process cleanup shutdown hook.");
 					}
 				}
 			}
@@ -225,6 +254,47 @@ public abstract class TestJvmProcess {
 		}
 	}
 
+	/**
+	 * Gets the process ID, if possible. This method currently only work on UNIX-based
+	 * operating systems. On others, it returns {@code -1}.
+	 * 
+	 * @return The process ID, or -1, if the ID cannot be determined.
+	 */
+	public long getProcessId() {
+		checkState(process != null, "process not started");
+
+		try {
+			Class<? extends Process> clazz = process.getClass();
+			if (clazz.getName().equals("java.lang.UNIXProcess")) {
+				Field pidField = clazz.getDeclaredField("pid");
+				pidField.setAccessible(true);
+				return pidField.getLong(process);
+			} else {
+				return -1;
+			}
+		}
+		catch (Throwable ignored) {
+			return -1;
+		}
+	}
+
+	/**
+	 * Checks if the process was started, not destroyed via {@link #destroy()}, and not exited.
+	 */
+	public boolean isAlive() {
+		final Process proc = process;
+		if (proc == null || destroyed) {
+			return false;
+		}
+		try {
+			proc.exitValue(); // throws IllegalThreadStateException while the process is running
+			return false;
+		}
+		catch (IllegalThreadStateException ignored) {
+			return true;
+		}
+	}
+
 	// ---------------------------------------------------------------------------------------------
 	// File based synchronization utilities
 	// ---------------------------------------------------------------------------------------------
@@ -238,6 +308,19 @@ public abstract class TestJvmProcess {
 		}
 	}
 
+	/** Waits until the given file exists; fails the test if it does not appear within the timeout. */
+	public static void waitForMarkerFile(File file, long timeoutMillis) throws InterruptedException {
+		final long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
+		boolean exists;
+		// compare by difference: System.nanoTime() values may be negative or wrap around
+		while (!(exists = file.exists()) && deadline - System.nanoTime() > 0) {
+			Thread.sleep(10);
+		}
+		if (!exists) {
+			fail("The marker file was not found within " + timeoutMillis + " msecs");
+		}
+	}
+
 	public static void waitForMarkerFiles(File basedir, String prefix, int num, long timeout) {
 		long now = System.currentTimeMillis();
 		final long deadline = now + timeout;

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingShutdownTest.java
new file mode 100644
index 0000000..f22f42f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingShutdownTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.TestJvmProcess;
+import org.apache.flink.util.OperatingSystem;
+
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Test that verifies the behavior of blocking shutdown hooks and of the
+ * {@link JvmShutdownSafeguard} that guards against it.
+ */
+public class BlockingShutdownTest {
+
+	@Test
+	public void testProcessShutdownBlocking() throws Exception {
+		// this test works only on linux
+		assumeTrue(OperatingSystem.isLinux());
+
+		// this test leaves remaining processes if not executed with Java 8
+		CommonTestUtils.assumeJava8();
+
+		final File markerFile = new File(
+				EnvironmentInformation.getTemporaryFileDirectory(), UUID.randomUUID() + ".marker");
+
+		final BlockingShutdownProcess blockingProcess =
+				new BlockingShutdownProcess(markerFile.getAbsolutePath(), 0, false);
+
+		try {
+			blockingProcess.startProcess();
+			long pid = blockingProcess.getProcessId();
+			assertTrue("Cannot determine process ID", pid != -1);
+
+			// wait for the marker file to appear, which means the process is up properly
+			TestJvmProcess.waitForMarkerFile(markerFile, 30000);
+
+			// send it a regular kill command (SIG_TERM), passing the arguments explicitly
+			Process kill = new ProcessBuilder("kill", String.valueOf(pid)).start();
+			kill.waitFor();
+			assertEquals("failed to send SIG_TERM to process", 0, kill.exitValue());
+
+			// give the Java Process object a moment to notice if the process is gone. This does
+			// not reliably fail the test if the process does exit, but it fails often enough to
+			// reveal a broken blocking hook. Not ideal, but the best we can do without severely
+			// prolonging the test
+			Thread.sleep(50);
+
+			// the process should not go away by itself
+			assertTrue("Test broken, process shutdown blocking does not work", blockingProcess.isAlive());
+		}
+		finally {
+			blockingProcess.destroy();
+
+			//noinspection ResultOfMethodCallIgnored
+			markerFile.delete();
+		}
+	}
+
+	@Test
+	public void testProcessExitsDespiteBlockingShutdownHook() throws Exception {
+		// this test works only on linux
+		assumeTrue(OperatingSystem.isLinux());
+
+		final File markerFile = new File(
+				EnvironmentInformation.getTemporaryFileDirectory(), UUID.randomUUID() + ".marker");
+
+		final BlockingShutdownProcess blockingProcess =
+				new BlockingShutdownProcess(markerFile.getAbsolutePath(), 100, true);
+
+		try {
+			blockingProcess.startProcess();
+			long pid = blockingProcess.getProcessId();
+			assertTrue("Cannot determine process ID", pid != -1);
+
+			// wait for the marker file to appear, which means the process is up properly
+			TestJvmProcess.waitForMarkerFile(markerFile, 30000);
+
+			// send it a regular kill command (SIG_TERM), passing the arguments explicitly
+			Process kill = new ProcessBuilder("kill", String.valueOf(pid)).start();
+			kill.waitFor();
+			assertEquals("failed to send SIG_TERM to process", 0, kill.exitValue());
+
+			// the safeguard should halt the process shortly after the kill; poll for up to 30 secs
+			final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs in nanos
+			while (blockingProcess.isAlive() && deadline - System.nanoTime() > 0) {
+				Thread.sleep(50);
+			}
+
+			assertFalse("shutdown blocking process does not properly terminate itself", blockingProcess.isAlive());
+		}
+		finally {
+			blockingProcess.destroy();
+
+			//noinspection ResultOfMethodCallIgnored
+			markerFile.delete();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	// a method that blocks indefinitely
+	static void parkForever() {
+		// interrupts are deliberately ignored: this must never return
+		final Object lock = new Object();
+		//noinspection InfiniteLoopStatement
+		while (true) {
+			try {
+				//noinspection SynchronizationOnLocalVariableOrMethodParameter
+				synchronized (lock) {
+					lock.wait();
+				}
+			} catch (InterruptedException ignored) {}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Blocking Process Implementation
+	// ------------------------------------------------------------------------
+
+	private static final class BlockingShutdownProcess extends TestJvmProcess {
+
+		private final String tempFilePath;
+		private final long selfKillDelay;
+		private final boolean installSafeguard; // whether the child installs JvmShutdownSafeguard
+
+		public BlockingShutdownProcess(String tempFilePath, long selfKillDelay, boolean installSafeguard)
+				throws Exception {
+
+			this.tempFilePath = tempFilePath;
+			this.selfKillDelay = selfKillDelay;
+			this.installSafeguard = installSafeguard;
+		}
+
+		@Override
+		public String getName() {
+			return "BlockingShutdownProcess";
+		}
+
+		@Override
+		public String[] getJvmArgs() {
+			return new String[] { tempFilePath, String.valueOf(installSafeguard), String.valueOf(selfKillDelay) };
+		}
+
+		@Override
+		public String getEntryPointClassName() {
+			return BlockingShutdownProcessEntryPoint.class.getName();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	public static final class BlockingShutdownProcessEntryPoint {
+
+		private static final Logger LOG = LoggerFactory.getLogger(BlockingShutdownProcessEntryPoint.class);
+
+		public static void main(String[] args) throws Exception {
+			File touchFile = new File(args[0]);
+			boolean installSafeguard = Boolean.parseBoolean(args[1]);
+			long killDelay = Long.parseLong(args[2]);
+
+			// install the blocking shutdown hook
+			Thread shutdownHook = new Thread(new BlockingRunnable(), "Blocking ShutdownHook");
+			try {
+				// this hook never returns, so a clean JVM shutdown blocks on it
+				Runtime.getRuntime().addShutdownHook(shutdownHook);
+			}
+			catch (IllegalStateException ignored) {
+				// JVM is already shutting down. No need to do this.
+			}
+			catch (Throwable t) {
+				System.err.println("Cannot register process cleanup shutdown hook.");
+				t.printStackTrace();
+			}
+
+			// install the jvm terminator, if we want it
+			if (installSafeguard) {
+				JvmShutdownSafeguard.installAsShutdownHook(LOG, killDelay);
+			}
+
+			System.err.println("signaling process started");
+			TestJvmProcess.touchFile(touchFile);
+
+			System.err.println("parking the main thread");
+			parkForever();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	static final class BlockingRunnable implements Runnable {
+
+		@Override
+		public void run() {
+			System.err.println("starting shutdown hook");
+			parkForever();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index cc4a527..d58b0af 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -47,18 +47,16 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
-
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.TestEnvironment;
+
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.internal.AssumptionViolatedException;
 import org.junit.runner.RunWith;
 
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -144,20 +142,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	public static void startCassandra() throws IOException {
 
 		// check if we should run this test, current Cassandra version requires Java >= 1.8
-		try {
-			String javaVersionString = System.getProperty("java.runtime.version").substring(0, 3);
-			float javaVersion = Float.parseFloat(javaVersionString);
-			Assume.assumeTrue(javaVersion >= 1.8f);
-		}
-		catch (AssumptionViolatedException e) {
-			System.out.println("Skipping CassandraConnectorITCase, because the JDK is < Java 8+");
-			throw e;
-		}
-		catch (Exception e) {
-			LOG.error("Cannot determine Java version", e);
-			e.printStackTrace();
-			fail("Cannot determine Java version");
-		}
+		org.apache.flink.core.testutils.CommonTestUtils.assumeJava8();
 
 		// generate temporary files
 		tmpDir = CommonTestUtils.createTempDirectory();

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
index a50a106..d318a3c 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.core.testutils;
 
+import org.junit.Assume;
+import org.junit.internal.AssumptionViolatedException;
+
 import java.io.BufferedWriter;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -27,6 +30,8 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 
+import static org.junit.Assert.fail;
+
 /**
  * This class contains reusable utility methods for unit tests.
  */
@@ -88,4 +93,25 @@ public class CommonTestUtils {
 		}
 		return f.toURI().toString();
 	}
+
+	/**
+	 * Checks whether this code runs in a Java 8 (Java 1.8) or newer JVM; if not, this throws a
+	 * {@link AssumptionViolatedException}, which causes JUnit to skip the calling test. Parses
+	 * "java.specification.version" ("1.8", "9", "11"), as runtime versions like "9+181" are no floats.
+	 */
+	public static void assumeJava8() {
+		try {
+			String spec = System.getProperty("java.specification.version").trim();
+			int major = Integer.parseInt(spec.startsWith("1.") ? spec.substring(2) : spec.split("\\.")[0]);
+			Assume.assumeTrue(major >= 8);
+		}
+		catch (AssumptionViolatedException e) {
+			System.out.println("Skipping test, because the JDK is older than Java 8");
+			throw e;
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Cannot determine Java version: " + e);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java
index b0757f5..785dff9 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
 import org.apache.flink.runtime.testutils.TestingResourceManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 
 /**
@@ -55,6 +56,7 @@ public class TestingApplicationMaster extends YarnApplicationMasterRunner {
 	public static void main(String[] args) {
 		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / JobManager", args);
 		SignalHandler.register(LOG);
+		JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
 		// run and exit with the proper return code
 		int returnCode = new TestingApplicationMaster().run(args);

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 7453344..6619633 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
@@ -107,6 +108,7 @@ public class YarnApplicationMasterRunner {
 	public static void main(String[] args) {
 		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / JobManager", args);
 		SignalHandler.register(LOG);
+		JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
 		// run and exit with the proper return code
 		int returnCode = new YarnApplicationMasterRunner().run(args);

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index 6839bb5..9638137 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -46,7 +48,8 @@ public class YarnTaskManagerRunner {
 
 	public static void runYarnTaskManager(String[] args, final Class<? extends YarnTaskManager> taskManager) throws IOException {
 		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskManager", args);
-		org.apache.flink.runtime.util.SignalHandler.register(LOG);
+		SignalHandler.register(LOG);
+		JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
 		// try to parse the command line arguments
 		final Configuration configuration;