You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/10 15:33:40 UTC

[1/4] flink git commit: [FLINK-5766] [distributed coordination] Unify the handling of NoResourceAvailableException

Repository: flink
Updated Branches:
  refs/heads/master 66305135b -> e29dfb840


[FLINK-5766] [distributed coordination] Unify the handling of NoResourceAvailableException


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3bde6ffb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3bde6ffb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3bde6ffb

Branch: refs/heads/master
Commit: 3bde6ffb6f55ec7ff807633ab1e79d9238e5a942
Parents: ef77c25
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 9 19:12:32 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 10 16:28:30 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |  4 +-
 .../executiongraph/ExecutionJobVertex.java      |  4 +-
 .../runtime/executiongraph/ExecutionVertex.java |  3 +-
 .../flink/runtime/instance/SlotProvider.java    |  5 +-
 .../runtime/jobmanager/scheduler/Scheduler.java | 28 ++++++-----
 .../ScheduleWithCoLocationHintTest.java         | 16 ++++---
 .../scheduler/SchedulerIsolatedTasksTest.java   | 11 +++--
 .../scheduler/SchedulerSlotSharingTest.java     | 49 ++++++++++----------
 8 files changed, 63 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3bde6ffb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index e29e5b6..60e5575 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -40,7 +40,6 @@ import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -248,9 +247,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 *               immediately deploy it.
 	 * 
 	 * @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is the only state that permits scheduling.
-	 * @throws NoResourceAvailableException Thrown is no queued scheduling is allowed and no resources are currently available.
 	 */
-	public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
+	public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) {
 		if (slotProvider == null) {
 			throw new IllegalArgumentException("Cannot send null Scheduler when scheduling execution.");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/3bde6ffb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index e8664f7..3828fc9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -39,11 +39,11 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
+
 import org.slf4j.Logger;
 
 import java.io.IOException;
@@ -386,7 +386,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	//  Actions
 	//---------------------------------------------------------------------------------------------
 	
-	public void scheduleAll(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
+	public void scheduleAll(SlotProvider slotProvider, boolean queued) {
 		
 		ExecutionVertex[] vertices = this.taskVertices;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3bde6ffb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index cb2e177..92327fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -39,7 +39,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -528,7 +527,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		}
 	}
 
-	public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
+	public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) {
 		return this.currentExecution.scheduleForExecution(slotProvider, queued);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3bde6ffb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
index 49e6d9f..919f6a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.instance;
 
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 
 /**
@@ -41,8 +40,6 @@ public interface SlotProvider {
 	 * @param task         The task to allocate the slot for
 	 * @param allowQueued  Whether allow the task be queued if we do not have enough resource
 	 * @return The future of the allocation
-	 * 
-	 * @throws NoResourceAvailableException
 	 */
-	Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) throws NoResourceAvailableException;
+	Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3bde6ffb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index dc82440..72f9789 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -145,19 +145,25 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 
 
 	@Override
-	public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued)
-			throws NoResourceAvailableException {
+	public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+		try {
+			final Object ret = scheduleTask(task, allowQueued);
 
-		final Object ret = scheduleTask(task, allowQueued);
-		if (ret instanceof SimpleSlot) {
-			return FlinkCompletableFuture.completed((SimpleSlot) ret);
-		}
-		else if (ret instanceof Future) {
-			return (Future<SimpleSlot>) ret;
+			if (ret instanceof SimpleSlot) {
+				return FlinkCompletableFuture.completed((SimpleSlot) ret);
+			}
+			else if (ret instanceof Future) {
+				@SuppressWarnings("unchecked")
+				Future<SimpleSlot> typed = (Future<SimpleSlot>) ret;
+				return typed;
+			}
+			else {
+				// this should never happen, simply guard this case with an exception
+				throw new RuntimeException();
+			}
 		}
-		else {
-			// this should never happen, simply guard this case with an exception
-			throw new RuntimeException();
+		catch (NoResourceAvailableException e) {
+			return FlinkCompletableFuture.completedExceptionally(e);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3bde6ffb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index b803702..afb9dac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -36,6 +36,8 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import org.junit.Test;
 
+import java.util.concurrent.ExecutionException;
+
 public class ScheduleWithCoLocationHintTest {
 
 	@Test
@@ -241,8 +243,8 @@ public class ScheduleWithCoLocationHintTest {
 			try {
 				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false).get();
 				fail("Scheduled even though no resource was available.");
-			} catch (NoResourceAvailableException e) {
-				// expected
+			} catch (ExecutionException e) {
+				assertTrue(e.getCause() instanceof NoResourceAvailableException);
 			}
 
 			assertEquals(0, scheduler.getNumberOfLocalizedAssignments());
@@ -509,11 +511,13 @@ public class ScheduleWithCoLocationHintTest {
 
 			try {
 				scheduler.allocateSlot(
-						new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false);
+						new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false).get();
 				fail("should not be able to find a resource");
-			} catch (NoResourceAvailableException e) {
-				// good
-			} catch (Exception e) {
+			}
+			catch (ExecutionException e) {
+				assertTrue(e.getCause() instanceof NoResourceAvailableException);
+			}
+			catch (Exception e) {
 				fail("wrong exception");
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3bde6ffb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index 9c21533..643efae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -31,6 +31,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.areAllDistinct;
@@ -133,11 +134,11 @@ public class SchedulerIsolatedTasksTest {
 			assertTrue(areAllDistinct(s1, s2, s3, s4, s5));
 			
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false);
+				scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
 				fail("Scheduler accepted scheduling request without available resource.");
 			}
-			catch (NoResourceAvailableException e) {
-				// pass!
+			catch (ExecutionException e) {
+				assertTrue(e.getCause() instanceof NoResourceAvailableException);
 			}
 			
 			// release some slots again
@@ -313,8 +314,8 @@ public class SchedulerIsolatedTasksTest {
 				scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
 				fail("Scheduler served a slot from a dead instance");
 			}
-			catch (NoResourceAvailableException e) {
-				// fine
+			catch (ExecutionException e) {
+				assertTrue(e.getCause() instanceof NoResourceAvailableException);
 			}
 			catch (Exception e) {
 				fail("Wrong exception type.");

http://git-wip-us.apache.org/repos/asf/flink/blob/3bde6ffb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index c4121f6..5238e95 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 
 import java.util.Random;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -79,8 +80,8 @@ public class SchedulerSlotSharingTest {
 				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
-			catch (NoResourceAvailableException e) {
-				// good!
+			catch (ExecutionException e) {
+				assertTrue(e.getCause() instanceof NoResourceAvailableException);
 			}
 			catch (Exception e) {
 				fail("Wrong exception.");
@@ -164,8 +165,8 @@ public class SchedulerSlotSharingTest {
 				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
-			catch (NoResourceAvailableException e) {
-				// good!
+			catch (ExecutionException e) {
+				assertTrue(e.getCause() instanceof NoResourceAvailableException);
 			}
 			catch (Exception e) {
 				fail("Wrong exception.");
@@ -187,8 +188,8 @@ public class SchedulerSlotSharingTest {
 				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
-			catch (NoResourceAvailableException e) {
-				// good!
+			catch (ExecutionException e) {
+				assertTrue(e.getCause() instanceof NoResourceAvailableException);
 			}
 			catch (Exception e) {
 				fail("Wrong exception.");
@@ -208,8 +209,8 @@ public class SchedulerSlotSharingTest {
 				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
-			catch (NoResourceAvailableException e) {
-				// good!
+			catch (ExecutionException e) {
+				assertTrue(e.getCause() instanceof NoResourceAvailableException);
 			}
 			catch (Exception e) {
 				fail("Wrong exception.");
@@ -376,8 +377,8 @@ public class SchedulerSlotSharingTest {
 				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
-			catch (NoResourceAvailableException e) {
-				// good!
+			catch (ExecutionException e) {
+				assertTrue(e.getCause() instanceof NoResourceAvailableException);
 			}
 			catch (Exception e) {
 				fail("Wrong exception.");
@@ -519,8 +520,8 @@ public class SchedulerSlotSharingTest {
 				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
-			catch (NoResourceAvailableException e) {
-				// good!
+			catch (ExecutionException e) {
+				assertTrue(e.getCause() instanceof NoResourceAvailableException);
 			}
 			catch (Exception e) {
 				fail("Wrong exception.");
@@ -530,30 +531,30 @@ public class SchedulerSlotSharingTest {
 				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
-			catch (NoResourceAvailableException e) {
-				// good!
+			catch (ExecutionException e) {
+				assertTrue(e.getCause() instanceof NoResourceAvailableException);
 			}
 			catch (Exception e) {
 				fail("Wrong exception.");
 			}
 			
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false);
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
-			catch (NoResourceAvailableException e) {
-				// good!
+			catch (ExecutionException e) {
+				assertTrue(e.getCause() instanceof NoResourceAvailableException);
 			}
 			catch (Exception e) {
 				fail("Wrong exception.");
 			}
 			
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1)), false);
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1)), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
-			catch (NoResourceAvailableException e) {
-				// good!
+			catch (ExecutionException e) {
+				assertTrue(e.getCause() instanceof NoResourceAvailableException);
 			}
 			catch (Exception e) {
 				fail("Wrong exception.");
@@ -1059,13 +1060,13 @@ public class SchedulerSlotSharingTest {
 			SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false).get();
 			
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup), false);
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup), false).get();
 				fail("should throw an exception");
 			}
-			catch (NoResourceAvailableException e) {
-				// expected
+			catch (ExecutionException e) {
+				assertTrue(e.getCause() instanceof NoResourceAvailableException);
 			}
-			
+
 			assertEquals(0, scheduler.getNumberOfAvailableSlots());
 			
 			s3_0.releaseSlot();


[3/4] flink git commit: [FLINK-5718] [core] TaskManagers exit the JVM on fatal exceptions.

Posted by se...@apache.org.
[FLINK-5718] [core] TaskManagers exit the JVM on fatal exceptions.

This closes #3276


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dfc6fba5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dfc6fba5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dfc6fba5

Branch: refs/heads/master
Commit: dfc6fba5b9830e6a7804a6a0c9f69b36bf772730
Parents: 3bde6ff
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 6 15:52:39 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 10 16:28:31 2017 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            |   4 +-
 .../flink/configuration/TaskManagerOptions.java |   5 +
 .../org/apache/flink/util/ExceptionUtils.java   |  37 +++
 .../taskexecutor/TaskManagerConfiguration.java  |  19 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  14 +
 .../taskmanager/TaskManagerRuntimeInfo.java     |   8 +
 ...askManagerComponentsStartupShutdownTest.java |   3 +-
 .../flink/runtime/testutils/TestJvmProcess.java |   9 +
 .../runtime/util/JvmExitOnFatalErrorTest.java   | 259 +++++++++++++++++++
 .../util/TestingTaskManagerRuntimeInfo.java     |   6 +
 .../flink/core/testutils/CommonTestUtils.java   |  25 ++
 11 files changed, 385 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 2accdc2..b86c534 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -86,7 +86,7 @@ The default fraction for managed memory can be adjusted using the `taskmanager.m
 
 - `taskmanager.memory.segment-size`: The size of memory buffers used by the memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)).
 
-- `taskmanager.memory.preallocate`: Can be either of `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false). When `taskmanager.memory.off-heap` is set to `true`, then it is advised that this configuration is also set to `true`.  If this configuration is set to `false` cleaning up of the allocated offheap memory happens only when the configured JVM parameter MaxDirectMemorySize is reached by triggering a full GC.
+- `taskmanager.memory.preallocate`: Can be either of `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false). When `taskmanager.memory.off-heap` is set to `true`, then it is advised that this configuration is also set to `true`.  If this configuration is set to `false` cleaning up of the allocated off-heap memory happens only when the configured JVM parameter MaxDirectMemorySize is reached by triggering a full GC. **Note:** For streaming setups, we highly recommend setting this value to `false` as the core state backends currently do not use the managed memory.
 
 ### Memory and Performance Debugging
 
@@ -265,6 +265,8 @@ The following parameters configure Flink's JobManager and TaskManagers.
 
 - `taskmanager.refused-registration-pause`: The pause after a registration has been refused by the job manager before retrying to connect. The refused registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. "5 s"). (DEFAULT: **10 s**)
 
+- `taskmanager.jvm-exit-on-oom`: Indicates that the TaskManager should immediately terminate the JVM if the task thread throws an `OutOfMemoryError` (DEFAULT: **false**).
+
 - `blob.fetch.retries`: The number of retries for the TaskManager to download BLOBs (such as JAR files) from the JobManager (DEFAULT: **50**).
 
 - `blob.fetch.num-concurrent`: The number concurrent BLOB fetches (such as JAR file downloads) that the JobManager serves (DEFAULT: **50**).

http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 6f6238b..b7ee20a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -33,6 +33,11 @@ public class TaskManagerOptions {
 	// ------------------------------------------------------------------------
 
 	// @TODO Migrate 'taskmanager.*' config options from ConfigConstants
+	
+	/** Whether to kill the TaskManager when the task thread throws an OutOfMemoryError. */
+	public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
+			key("taskmanager.jvm-exit-on-oom")
+			.defaultValue(false);
 
 	// ------------------------------------------------------------------------
 	//  Network Options

http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 1069f2d..6ba9ef6 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -33,8 +33,13 @@ import java.io.StringWriter;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
+/**
+ * A collection of utility functions for dealing with exceptions and exception workflows.
+ */
 @Internal
 public final class ExceptionUtils {
+
+	/** The stringified representation of a null exception reference. */
 	public static final String STRINGIFIED_NULL_EXCEPTION = "(null)";
 
 	/**
@@ -64,6 +69,38 @@ public final class ExceptionUtils {
 	}
 
 	/**
+	 * Checks whether the given exception indicates a situation that may leave the
+	 * JVM in a corrupted state, meaning a state where continued normal operation can only be
+	 * guaranteed via a clean process restart.
+	 *
+	 * <p>Currently considered fatal exceptions are Virtual Machine errors indicating
+	 * that the JVM is corrupted, like {@link InternalError}, {@link UnknownError},
+	 * and {@link java.util.zip.ZipError} (a special case of InternalError).
+	 *
+	 * @param t The exception to check.
+	 * @return True, if the exception is considered fatal to the JVM, false otherwise.
+	 */
+	public static boolean isJvmFatalError(Throwable t) {
+		return (t instanceof InternalError) || (t instanceof UnknownError);
+	}
+
+	/**
+	 * Checks whether the given exception indicates a situation that may leave the
+	 * JVM in a corrupted state, or an out-of-memory error.
+	 * 
+	 * <p>See {@link ExceptionUtils#isJvmFatalError(Throwable)} for a list of fatal JVM errors.
+	 * This method additionally classifies the {@link OutOfMemoryError} as fatal, because it
+	 * may occur in any thread (not necessarily the one that allocated the majority of the memory) and thus
+	 * is often not recoverable by destroying the particular thread that threw the exception.
+	 * 
+	 * @param t The exception to check.
+	 * @return True, if the exception is fatal to the JVM or an OutOfMemoryError, false otherwise.
+	 */
+	public static boolean isJvmFatalOrOutOfMemoryError(Throwable t) {
+		return isJvmFatalError(t) || t instanceof OutOfMemoryError;
+	}
+
+	/**
 	 * Adds a new exception as a {@link Throwable#addSuppressed(Throwable) suppressed exception}
 	 * to a prior exception, or returns the new exception, if no prior exception exists.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index 1d1e732..a6e4748 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -21,12 +21,15 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.duration.Duration;
 
 import java.io.File;
@@ -53,6 +56,8 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 
 	private final UnmodifiableConfiguration configuration;
 
+	private final boolean exitJvmOnOutOfMemory;
+
 	public TaskManagerConfiguration(
 		int numberSlots,
 		String[] tmpDirectories,
@@ -62,7 +67,8 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 		Time maxRegistrationPause,
 		Time refusedRegistrationPause,
 		long cleanupInterval,
-		Configuration configuration) {
+		Configuration configuration,
+		boolean exitJvmOnOutOfMemory) {
 
 		this.numberSlots = numberSlots;
 		this.tmpDirectories = Preconditions.checkNotNull(tmpDirectories);
@@ -73,6 +79,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 		this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause);
 		this.cleanupInterval = Preconditions.checkNotNull(cleanupInterval);
 		this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
+		this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
 	}
 
 	public int getNumberSlots() {
@@ -113,6 +120,11 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 		return tmpDirectories;
 	}
 
+	@Override
+	public boolean shouldExitJvmOnOutOfMemoryError() {
+		return exitJvmOnOutOfMemory;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Static factory methods
 	// --------------------------------------------------------------------------------------------
@@ -205,6 +217,8 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 				ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
 		}
 
+		final boolean exitOnOom = configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY);
+
 		return new TaskManagerConfiguration(
 			numberSlots,
 			tmpDirPaths,
@@ -214,6 +228,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 			maxRegistrationPause,
 			refusedRegistrationPause,
 			cleanupInterval,
-			configuration);
+			configuration,
+			exitOnOom);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index c2e6d09..64a83c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
@@ -700,6 +701,19 @@ public class Task implements Runnable, TaskActions {
 			// ----------------------------------------------------------------
 
 			try {
+				// check if the exception is unrecoverable
+				if (ExceptionUtils.isJvmFatalError(t) || 
+					(t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError()))
+				{
+					// terminate the JVM immediately
+					// don't attempt a clean shutdown, because we cannot expect the clean shutdown to complete
+					try {
+						LOG.error("Encountered fatal error {} - terminating the JVM", t.getClass().getName(), t);
+					} finally {
+						Runtime.getRuntime().halt(-1);
+					}
+				}
+
 				// transition into our final state. we should be either in DEPLOYING, RUNNING, CANCELING, or FAILED
 				// loop for multiple retries during concurrent state changes via calls to cancel() or
 				// to failExternally()

http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
index d1efe34..dbf8c73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
@@ -38,4 +38,12 @@ public interface TaskManagerRuntimeInfo {
 	 * @return The list of temporary file directories.
 	 */
 	String[] getTmpDirectories();
+
+	/**
+	 * Checks whether the TaskManager should exit the JVM when the task thread throws
+	 * an OutOfMemoryError.
+	 * 
+	 * @return True to terminate the JVM on an OutOfMemoryError, false otherwise.
+	 */
+	boolean shouldExitJvmOnOutOfMemoryError();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index a6e1a2b..e26e176 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -109,7 +109,8 @@ public class TaskManagerComponentsStartupShutdownTest {
 				Time.seconds(30),
 				Time.seconds(10),
 				1000000, // cleanup interval
-				config);
+				config,
+				false); // exit-jvm-on-fatal-error
 
 			final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
 					32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, 0, 0, null);

http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
index 5954ee5..4578edf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
@@ -295,6 +295,15 @@ public abstract class TestJvmProcess {
 		}
 	}
 
+	public void waitFor() throws InterruptedException {
+		Process process = this.process;
+		if (process != null) {
+			process.waitFor();
+		} else {
+			throw new IllegalStateException("process not started");
+		}
+	}
+
 	// ---------------------------------------------------------------------------------------------
 	// File based synchronization utilities
 	// ---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
new file mode 100644
index 0000000..10f4303
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerActions;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.testutils.TestJvmProcess;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.*;
+
+/**
+ * Test that verifies that a {@link Task} halts the JVM when its invokable throws an
+ * {@link OutOfMemoryError} and {@link TaskManagerOptions#KILL_ON_OUT_OF_MEMORY} is enabled.
+ */
+public class JvmExitOnFatalErrorTest {
+
+	@Test
+	public void testExitJvmOnOutOfMemory() throws Exception {
+		// this test works only on linux
+		assumeTrue(OperatingSystem.isLinux());
+
+		// this test leaves remaining processes if not executed with Java 8
+		CommonTestUtils.assumeJava8();
+
+		// to check what went wrong (when the test hangs) uncomment this line 
+//		ProcessEntryPoint.main(new String[0]);
+
+		final KillOnFatalErrorProcess testProcess = new KillOnFatalErrorProcess();
+
+		try {
+			testProcess.startProcess();
+			testProcess.waitFor();
+		}
+		finally {
+			testProcess.destroy();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Test Process Implementation
+	// ------------------------------------------------------------------------
+
+	private static final class KillOnFatalErrorProcess extends TestJvmProcess {
+
+		public KillOnFatalErrorProcess() throws Exception {}
+
+		@Override
+		public String getName() {
+			return "KillOnFatalErrorProcess";
+		}
+
+		@Override
+		public String[] getJvmArgs() {
+			return new String[0];
+		}
+
+		@Override
+		public String getEntryPointClassName() {
+			return ProcessEntryPoint.class.getName();
+		}
+	} 
+
+	// ------------------------------------------------------------------------
+
+	public static final class ProcessEntryPoint {
+
+		public static void main(String[] args) throws Exception {
+
+			System.err.println("creating task");
+
+			// we catch all errors during setup so that a programming error cannot make the
+			// process exit, which the test would otherwise mistake for the expected JVM kill
+			try {
+				final Configuration taskManagerConfig = new Configuration();
+				taskManagerConfig.setBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY, true);
+
+				final JobID jid = new JobID();
+				final JobVertexID jobVertexId = new JobVertexID();
+				final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
+				final AllocationID slotAllocationId = new AllocationID();
+
+				final SerializedValue<ExecutionConfig> execConfig = new SerializedValue<>(new ExecutionConfig());
+
+				final JobInformation jobInformation = new JobInformation(
+						jid, "Test Job", execConfig, new Configuration(),
+						Collections.<BlobKey>emptyList(), Collections.<URL>emptyList());
+
+				final TaskInformation taskInformation = new TaskInformation(
+						jobVertexId, "Test Task", 1, 1, OomInvokable.class.getName(), new Configuration());
+
+				final MemoryManager memoryManager = new MemoryManager(1024 * 1024, 1);
+				final IOManager ioManager = new IOManagerAsync();
+
+				final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
+				when(networkEnvironment.createKvStateTaskRegistry(jid, jobVertexId)).thenReturn(mock(TaskKvStateRegistry.class));
+
+				final TaskManagerRuntimeInfo tmInfo = TaskManagerConfiguration.fromConfiguration(taskManagerConfig);
+
+				final Executor executor = Executors.newCachedThreadPool();
+
+				Task task = new Task(
+						jobInformation,
+						taskInformation,
+						executionAttemptID,
+						slotAllocationId,
+						0,       // subtaskIndex
+						0,       // attemptNumber
+						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+						Collections.<InputGateDeploymentDescriptor>emptyList(),
+						0,       // targetSlotNumber
+						null,    // taskStateHandles,
+						memoryManager,
+						ioManager,
+						networkEnvironment,
+						new BroadcastVariableManager(),
+						new NoOpTaskManagerActions(),
+						new NoOpInputSplitProvider(),
+						new NoOpCheckpointResponder(),
+						new FallbackLibraryCacheManager(),
+						new FileCache(tmInfo.getTmpDirectories()),
+						tmInfo,
+						new UnregisteredTaskMetricsGroup(),
+						new NoOpResultPartitionConsumableNotifier(),
+						new NoOpPartitionProducerStateChecker(),
+						executor);
+
+				System.err.println("starting task thread");
+
+				task.startTaskThread();
+			}
+			catch (Throwable t) {
+				System.err.println("ERROR STARTING TASK");
+				t.printStackTrace();
+			}
+
+			System.err.println("parking the main thread");
+			CommonTestUtils.blockForeverNonInterruptibly();
+		}
+
+		public static final class OomInvokable extends AbstractInvokable {
+
+			@Override
+			public void invoke() throws Exception {
+				throw new OutOfMemoryError();
+			}
+		}
+
+		private static final class NoOpTaskManagerActions implements TaskManagerActions {
+
+			@Override
+			public void notifyFinalState(ExecutionAttemptID executionAttemptID) {}
+
+			@Override
+			public void notifyFatalError(String message, Throwable cause) {}
+
+			@Override
+			public void failTask(ExecutionAttemptID executionAttemptID, Throwable cause) {}
+
+			@Override
+			public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {}
+		}
+
+		private static final class NoOpInputSplitProvider implements InputSplitProvider {
+
+			@Override
+			public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) {
+				return null;
+			}
+		}
+
+		private static final class NoOpCheckpointResponder implements CheckpointResponder {
+
+			@Override
+			public void acknowledgeCheckpoint(JobID j, ExecutionAttemptID e, CheckpointMetaData c, SubtaskState s) {}
+
+			@Override
+			public void declineCheckpoint(JobID j, ExecutionAttemptID e, long l, Throwable t) {}
+		}
+
+		private static final class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
+
+			@Override
+			public void notifyPartitionConsumable(JobID j, ResultPartitionID p, TaskActions t) {}
+		}
+
+		private static final class NoOpPartitionProducerStateChecker implements PartitionProducerStateChecker {
+
+			@Override
+			public Future<ExecutionState> requestPartitionProducerState(
+					JobID jobId, IntermediateDataSetID intermediateDataSetId, ResultPartitionID r) {
+				return null;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java
index 0d0363f..4183152 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java
@@ -59,4 +59,10 @@ public class TestingTaskManagerRuntimeInfo implements TaskManagerRuntimeInfo {
 	public String[] getTmpDirectories() {
 		return tmpDirectories;
 	}
+
+	@Override
+	public boolean shouldExitJvmOnOutOfMemoryError() {
+		// never kill the JVM in tests
+		return false;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
index 2eb18c1..639b065 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
@@ -97,6 +97,27 @@ public class CommonTestUtils {
 	}
 
 	/**
+	 * Permanently blocks the current thread. The thread cannot be woken
+	 * up via {@link Thread#interrupt()}.
+	 */
+	public static void blockForeverNonInterruptibly() {
+		final Object lock = new Object();
+		//noinspection InfiniteLoopStatement
+		while (true) {
+			try {
+				//noinspection SynchronizationOnLocalVariableOrMethodParameter
+				synchronized (lock) {
+					lock.wait();
+				}
+			} catch (InterruptedException ignored) {}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Preconditions on the test environment
+	// ------------------------------------------------------------------------
+
+	/**
 	 * Checks whether this code runs in a Java 8 (Java 1.8) JVM. If not, this throws a
 	 * {@link AssumptionViolatedException}, which causes JUnit to skip the test that
 	 * called this method.
@@ -117,6 +138,10 @@ public class CommonTestUtils {
 		}
 	}
 
+	// ------------------------------------------------------------------------
+	//  Manipulation of environment
+	// ------------------------------------------------------------------------
+
 	public static void setEnv(Map<String, String> newenv) {
 		setEnv(newenv, true);
 	}


[4/4] flink git commit: [hotfix] [dist] Add notice about memory pre-allocation to default 'flink-conf.yaml'

Posted by se...@apache.org.
[hotfix] [dist] Add notice about memory pre-allocation to default 'flink-conf.yaml'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e29dfb84
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e29dfb84
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e29dfb84

Branch: refs/heads/master
Commit: e29dfb840495e0c1fd01e87a4af1abbf98103fa4
Parents: dfc6fba
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 10 14:53:58 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 10 16:28:31 2017 +0100

----------------------------------------------------------------------
 flink-dist/src/main/resources/flink-conf.yaml | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e29dfb84/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 0f30595..72acbeb 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -53,6 +53,8 @@ taskmanager.numberOfTaskSlots: 1
 
 # Specify whether TaskManager memory should be allocated when starting up (true) or when
 # memory is required in the memory manager (false)
+# Important Note: For pure streaming setups, we highly recommend setting this value to `false`
+# because the default state backends currently do not use managed memory.
 
 taskmanager.memory.preallocate: false
 


[2/4] flink git commit: [FLINK-5759] [jobmanager] Set UncaughtExceptionHandlers for JobManager's Future and I/O thread pools

Posted by se...@apache.org.
[FLINK-5759] [jobmanager] Set UncaughtExceptionHandlers for JobManager's Future and I/O thread pools

This closes #3290


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef77c254
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef77c254
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef77c254

Branch: refs/heads/master
Commit: ef77c254dadbe4c04810681fe765f5ec7d2a7400
Parents: 6630513
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 9 14:04:17 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 10 16:28:30 2017 +0100

----------------------------------------------------------------------
 .../MesosApplicationMasterRunner.java           |  10 +-
 .../flink/runtime/filecache/FileCache.java      |   3 +-
 .../runtime/jobmaster/JobManagerServices.java   |   6 +-
 .../runtime/util/ExecutorThreadFactory.java     | 123 ++++++++++++++-----
 .../flink/runtime/util/NamedThreadFactory.java  |  58 ---------
 .../flink/runtime/jobmanager/JobManager.scala   |   4 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |  10 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |   8 +-
 8 files changed, 119 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 5033692..a23c9f6 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -22,10 +22,12 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
+
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -52,15 +54,17 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.util.NamedThreadFactory;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.mesos.Protos;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -216,11 +220,11 @@ public class MesosApplicationMasterRunner {
 
 			futureExecutor = Executors.newScheduledThreadPool(
 				numberProcessors,
-				new NamedThreadFactory("mesos-jobmanager-future-", "-thread-"));
+				new ExecutorThreadFactory("mesos-jobmanager-future"));
 
 			ioExecutor = Executors.newFixedThreadPool(
 				numberProcessors,
-				new NamedThreadFactory("mesos-jobmanager-io-", "-thread-"));
+				new ExecutorThreadFactory("mesos-jobmanager-io"));
 
 			mesosServices = MesosServicesUtils.createMesosServices(config);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index 21456de..4f2166f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -99,7 +99,8 @@ public class FileCache {
 		this.shutdownHook = createShutdownHook(this, LOG);
 
 		this.entries = new HashMap<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>>();
-		this.executorService = Executors.newScheduledThreadPool(10, ExecutorThreadFactory.INSTANCE);
+		this.executorService = Executors.newScheduledThreadPool(10, 
+				new ExecutorThreadFactory("flink-file-cache"));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
index 95500e5..8cda0f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -120,8 +120,12 @@ public class JobManagerServices {
 			throw new IllegalConfigurationException(AkkaUtils.formatDurationParingErrorMessage());
 		}
 
+		final ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool(
+				Hardware.getNumberCPUCores(),
+				new ExecutorThreadFactory("jobmanager-future"));
+
 		return new JobManagerServices(
-			Executors.newScheduledThreadPool(Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE),
+			futureExecutor,
 			libraryCacheManager,
 			RestartStrategyFactory.createRestartStrategyFactory(config),
 			Time.of(timeout.length(), timeout.unit()));

http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
index 2fb5972..4a79db3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
@@ -18,49 +18,114 @@
 
 package org.apache.flink.runtime.util;
 
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A thread factory intended for use by critical thread pools. Critical thread pools here
+ * mean thread pools that support Flink's core coordination and processing work, and in which
+ * errors must not go unnoticed.
+ * 
+ * <p>The thread factory can be given an {@link UncaughtExceptionHandler} for the threads.
+ * If no handler is explicitly given, the default handler for uncaught exceptions will log
+ * the exceptions and kill the process afterwards. That guarantees that critical exceptions are
+ * not accidentally lost and leave the system running in an inconsistent state.
+ * 
+ * <p>Threads created by this factory are all called '(pool-name)-thread-n', where
+ * <i>(pool-name)</i> is configurable, and <i>n</i> is an incrementing number.
+ * 
+ * <p>All threads created by this factory are daemon threads and have the default (normal)
+ * priority.
+ */
 public class ExecutorThreadFactory implements ThreadFactory {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(ExecutorThreadFactory.class);
-	
-	
-	private static final String THREAD_NAME_PREFIX = "Flink Executor Thread - ";
-	
-	private static final AtomicInteger COUNTER = new AtomicInteger(1);
-	
-	private static final ThreadGroup THREAD_GROUP = new ThreadGroup("Flink Executor Threads");
-	
-	private static final Thread.UncaughtExceptionHandler EXCEPTION_HANDLER = new LoggingExceptionHander();
-	
-	
-	public static final ExecutorThreadFactory INSTANCE = new ExecutorThreadFactory();
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private ExecutorThreadFactory() {}
-	
-	
-	public Thread newThread(Runnable target) {
-		Thread t = new Thread(THREAD_GROUP, target, THREAD_NAME_PREFIX + COUNTER.getAndIncrement());
+
+	/** The thread pool name used when no explicit pool name has been specified */ 
+	private static final String DEFAULT_POOL_NAME = "flink-executor-pool";
+
+	private final AtomicInteger threadNumber = new AtomicInteger(1);
+
+	private final ThreadGroup group;
+
+	private final String namePrefix;
+
+	private final UncaughtExceptionHandler exceptionHandler;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new thread factory using the default thread pool name ('flink-executor-pool')
+	 * and the default uncaught exception handler (log exception and kill process).
+	 */
+	public ExecutorThreadFactory() {
+		this(DEFAULT_POOL_NAME);
+	}
+
+	/**
+	 * Creates a new thread factory using the given thread pool name and the default
+	 * uncaught exception handler (log exception and kill process).
+	 * 
+	 * @param poolName The pool name, used as the threads' name prefix
+	 */
+	public ExecutorThreadFactory(String poolName) {
+		this(poolName, FatalExitExceptionHandler.INSTANCE);
+	}
+
+	/**
+	 * Creates a new thread factory using the given thread pool name and the given
+	 * uncaught exception handler.
+	 * 
+	 * @param poolName The pool name, used as the threads' name prefix
+	 * @param exceptionHandler The uncaught exception handler for the threads
+	 */
+	public ExecutorThreadFactory(String poolName, UncaughtExceptionHandler exceptionHandler) {
+		checkNotNull(poolName, "poolName");
+
+		SecurityManager securityManager = System.getSecurityManager();
+		this.group = (securityManager != null) ? securityManager.getThreadGroup() :
+				Thread.currentThread().getThreadGroup();
+
+		this.namePrefix = poolName + "-thread-";
+		this.exceptionHandler = exceptionHandler;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Thread newThread(Runnable runnable) {
+		Thread t = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement());
 		t.setDaemon(true);
-		t.setUncaughtExceptionHandler(EXCEPTION_HANDLER);
+
+		// normalize the priority
+		if (t.getPriority() != Thread.NORM_PRIORITY) {
+			t.setPriority(Thread.NORM_PRIORITY);
+		}
+
+		// optional handler for uncaught exceptions
+		if (exceptionHandler != null) {
+			t.setUncaughtExceptionHandler(exceptionHandler);
+		}
+
 		return t;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
-	private static final class LoggingExceptionHander implements Thread.UncaughtExceptionHandler {
+
+	private static final class FatalExitExceptionHandler implements UncaughtExceptionHandler {
+
+		private static final Logger LOG = LoggerFactory.getLogger(FatalExitExceptionHandler.class);
+
+		static final FatalExitExceptionHandler INSTANCE = new FatalExitExceptionHandler(); 
 
 		@Override
 		public void uncaughtException(Thread t, Throwable e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Thread '" + t.getName() + "' produced an uncaught exception.", e);
-			}
+			LOG.error("FATAL: Thread '" + t.getName() + "' produced an uncaught exception. Stopping the process...", e);
+			System.exit(-17);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
deleted file mode 100644
index bd97963..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Thread factory which allows to specify a thread pool name and a thread name.
- *
- * The code is based on {@link java.util.concurrent.Executors.DefaultThreadFactory}.
- */
-public class NamedThreadFactory implements ThreadFactory {
-	private static final AtomicInteger poolNumber = new AtomicInteger(1);
-	private final ThreadGroup group;
-	private final AtomicInteger threadNumber = new AtomicInteger(1);
-	private final String namePrefix;
-
-	public NamedThreadFactory(final String poolName, final String threadName) {
-		SecurityManager securityManager = System.getSecurityManager();
-		group = (securityManager != null) ? securityManager.getThreadGroup() :
-			Thread.currentThread().getThreadGroup();
-
-		namePrefix = poolName +
-			poolNumber.getAndIncrement() +
-			threadName;
-	}
-
-	@Override
-	public Thread newThread(Runnable runnable) {
-		Thread t = new Thread(group, runnable,
-			namePrefix + threadNumber.getAndIncrement(),
-			0);
-		if (t.isDaemon()) {
-			t.setDaemon(false);
-		}
-		if (t.getPriority() != Thread.NORM_PRIORITY) {
-			t.setPriority(Thread.NORM_PRIORITY);
-		}
-		return t;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d575f68..a335916 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2023,11 +2023,11 @@ object JobManager {
 
     val futureExecutor = Executors.newScheduledThreadPool(
       numberProcessors,
-      new NamedThreadFactory("jobmanager-future-", "-thread-"))
+      new ExecutorThreadFactory("jobmanager-future"))
 
     val ioExecutor = Executors.newFixedThreadPool(
       numberProcessors,
-      new NamedThreadFactory("jobmanager-io-", "-thread-"))
+      new ExecutorThreadFactory("jobmanager-io"))
 
     val timeout = AkkaUtils.getTimeout(configuration)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 64cc97d..07fb996 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.minicluster
 
-import java.net.InetAddress
 import java.util.UUID
 import java.util.concurrent.{Executors, TimeUnit}
 
@@ -26,7 +25,7 @@ import akka.pattern.Patterns.gracefulStop
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
-import org.apache.flink.api.common.time.Time
+
 import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
@@ -37,8 +36,9 @@ import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService, StandaloneLeaderRetrievalService}
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
-import org.apache.flink.runtime.util.{Hardware, NamedThreadFactory, ZooKeeperUtils}
+import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware, ZooKeeperUtils}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
+
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -109,11 +109,11 @@ abstract class FlinkMiniCluster(
 
   val futureExecutor = Executors.newScheduledThreadPool(
     Hardware.getNumberCPUCores(),
-    new NamedThreadFactory("mini-cluster-future-", "-thread"))
+    new ExecutorThreadFactory("mini-cluster-future"))
 
   val ioExecutor = Executors.newFixedThreadPool(
     Hardware.getNumberCPUCores(),
-    new NamedThreadFactory("mini-cluster-io-", "-thread"))
+    new ExecutorThreadFactory("mini-cluster-io"))
 
   def configuration: Configuration = {
     if (originalConfiguration.getInteger(

http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 29f1827..5cc51e4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -38,14 +38,14 @@ import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.util.NamedThreadFactory;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
-
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -230,11 +230,11 @@ public class YarnApplicationMasterRunner {
 
 		final ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool(
 			numberProcessors,
-			new NamedThreadFactory("yarn-jobmanager-future-", "-thread-"));
+			new ExecutorThreadFactory("yarn-jobmanager-future"));
 
 		final ExecutorService ioExecutor = Executors.newFixedThreadPool(
 			numberProcessors,
-			new NamedThreadFactory("yarn-jobmanager-io-", "-thread-"));
+			new ExecutorThreadFactory("yarn-jobmanager-io"));
 
 		try {
 			// ------- (1) load and parse / validate all configurations -------