Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/22 22:09:40 UTC

[1/4] flink git commit: [FLINK-5073] Use Executor to run ZooKeeper callbacks in ZooKeeperStateHandleStore

Repository: flink
Updated Branches:
  refs/heads/master 698e53e47 -> c590912c9


[FLINK-5073] Use Executor to run ZooKeeper callbacks in ZooKeeperStateHandleStore

Use a dedicated Executor to run ZooKeeper callbacks in ZooKeeperStateHandleStore instead
of running them in the ZooKeeper client's thread. The callbacks can block because they
discard state, which might entail deleting files from disk.

Introduce a dedicated Executor for blocking I/O operations.

This closes #2815.
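
For reference, a minimal standalone sketch of the Curator pattern this commit relies on:
passing an Executor as the second argument to inBackground(...) hands the background
callback off to that executor instead of running it on the ZooKeeper client's event
thread, so blocking work (such as discarding state on disk) does not stall other
ZooKeeper notifications. This sketch is not part of the commit; the connection string,
ZNode path, and pool size below are placeholders.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.api.BackgroundCallback;
    import org.apache.curator.framework.api.CuratorEvent;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    public class BackgroundDeleteSketch {

        public static void main(String[] args) throws Exception {
            // Dedicated executor for potentially blocking callback work,
            // e.g. discarding state handles that delete files from disk.
            ExecutorService ioExecutor = Executors.newFixedThreadPool(4);

            CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
            client.start();

            BackgroundCallback callback = new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework cf, CuratorEvent event) throws Exception {
                    // Runs on ioExecutor, not on the ZooKeeper client's event thread.
                    System.out.println("Deleted " + event.getPath()
                        + ", result code " + event.getResultCode());
                }
            };

            // The two-argument inBackground(callback, executor) is the call that
            // ZooKeeperStateHandleStore now uses for its delete callback.
            client.delete()
                .deletingChildrenIfNeeded()
                .inBackground(callback, ioExecutor)
                .forPath("/example/state/handle");
        }
    }

In the change below, the same idea appears in two places: a dedicated "io" thread pool is
created next to the existing "future" pool (JobManager, MesosApplicationMasterRunner,
FlinkMiniCluster), and that pool is threaded through ZooKeeperUtils into
ZooKeeperStateHandleStore, whose delete call becomes inBackground(callback, executor).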


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fb92d86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fb92d86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fb92d86

Branch: refs/heads/master
Commit: 3fb92d8687f03c1fac8b87396b2b5a7ff29f6dd6
Parents: ae4b274
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 15 22:45:04 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 22 23:00:16 2016 +0100

----------------------------------------------------------------------
 .../MesosApplicationMasterRunner.java           | 12 +++-
 .../BackPressureStatsTrackerITCase.java         |  6 +-
 .../StackTraceSampleCoordinatorITCase.java      |  6 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |  1 +
 .../ZooKeeperCheckpointRecoveryFactory.java     | 12 +++-
 .../ZooKeeperCompletedCheckpointStore.java      |  7 ++-
 .../ZooKeeperSubmittedJobGraphStore.java        |  7 ++-
 .../flink/runtime/util/ZooKeeperUtils.java      | 20 ++++---
 .../RetrievableStateStorageHelper.java          |  2 +-
 .../zookeeper/ZooKeeperStateHandleStore.java    | 13 ++++-
 .../flink/runtime/jobmanager/JobManager.scala   | 59 +++++++++++++-------
 .../runtime/minicluster/FlinkMiniCluster.scala  |  9 ++-
 .../minicluster/LocalFlinkMiniCluster.scala     |  5 +-
 ...ZooKeeperCompletedCheckpointStoreITCase.java |  3 +-
 .../runtime/jobmanager/JobManagerTest.java      |  4 ++
 .../flink/runtime/jobmanager/JobSubmitTest.java |  1 +
 .../ZooKeeperSubmittedJobGraphsStoreITCase.java | 14 +++--
 .../resourcemanager/ClusterShutdownITCase.java  | 14 ++++-
 .../resourcemanager/ResourceManagerITCase.java  | 14 ++++-
 ...askManagerComponentsStartupShutdownTest.java |  1 +
 .../TaskManagerProcessReapingTestBase.java      |  1 +
 .../TaskManagerRegistrationTest.java            |  8 ++-
 .../ZooKeeperStateHandleStoreITCase.java        | 31 +++++-----
 .../jobmanager/JobManagerRegistrationTest.scala |  1 +
 .../runtime/testingUtils/TestingUtils.scala     | 35 ++++++++----
 ...ctTaskManagerProcessFailureRecoveryTest.java |  1 +
 .../recovery/ProcessFailureCancelingITCase.java |  1 +
 .../flink/yarn/YarnApplicationMasterRunner.java | 12 +++-
 28 files changed, 212 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 09f87bd..166218f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -174,10 +174,14 @@ public class MesosApplicationMasterRunner {
 
 		int numberProcessors = Hardware.getNumberCPUCores();
 
-		final ExecutorService executor = Executors.newFixedThreadPool(
+		final ExecutorService futureExecutor = Executors.newFixedThreadPool(
 			numberProcessors,
 			new NamedThreadFactory("mesos-jobmanager-future-", "-thread-"));
 
+		final ExecutorService ioExecutor = Executors.newFixedThreadPool(
+			numberProcessors,
+			new NamedThreadFactory("mesos-jobmanager-io-", "-thread-"));
+
 		try {
 			// ------- (1) load and parse / validate all configurations -------
 
@@ -293,7 +297,8 @@ public class MesosApplicationMasterRunner {
 			ActorRef jobManager = JobManager.startJobManagerActors(
 				config,
 				actorSystem,
-				executor,
+				futureExecutor,
+				ioExecutor,
 				new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
 				scala.Option.<String>empty(),
 				getJobManagerClass(),
@@ -399,7 +404,8 @@ public class MesosApplicationMasterRunner {
 			LOG.error("Failed to stop the artifact server", t);
 		}
 
-		executor.shutdownNow();
+		futureExecutor.shutdownNow();
+		ioExecutor.shutdownNow();
 
 		return 0;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index d01e0cf..357301a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -120,7 +120,11 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 			}
 
 			try {
-				jobManger = TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), new Configuration());
+				jobManger = TestingUtils.createJobManager(
+					testActorSystem,
+					testActorSystem.dispatcher(),
+					testActorSystem.dispatcher(),
+					new Configuration());
 
 				final Configuration config = new Configuration();
 				config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index 4b5bd2f..f31f41f 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -90,7 +90,11 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
 			ActorGateway taskManager = null;
 
 			try {
-				jobManger = TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), new Configuration());
+				jobManger = TestingUtils.createJobManager(
+					testActorSystem,
+					testActorSystem.dispatcher(),
+					testActorSystem.dispatcher(),
+					new Configuration());
 
 				final Configuration config = new Configuration();
 				config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index fcdf94d..853ef14 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -180,6 +180,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 					jmConfig,
 					jobManagerSystem[i],
 					jobManagerSystem[i].dispatcher(),
+					jobManagerSystem[i].dispatcher(),
 					JobManager.class,
 					MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index f47012d..09bfa8c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -24,6 +24,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 
+import java.util.concurrent.Executor;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -35,9 +37,15 @@ public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFac
 
 	private final Configuration config;
 
-	public ZooKeeperCheckpointRecoveryFactory(CuratorFramework client, Configuration config) {
+	private final Executor executor;
+
+	public ZooKeeperCheckpointRecoveryFactory(
+			CuratorFramework client,
+			Configuration config,
+			Executor executor) {
 		this.client = checkNotNull(client, "Curator client");
 		this.config = checkNotNull(config, "Configuration");
+		this.executor = checkNotNull(executor, "Executor");
 	}
 
 	@Override
@@ -55,7 +63,7 @@ public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFac
 			throws Exception {
 
 		return ZooKeeperUtils.createCompletedCheckpoints(client, config, jobId,
-				NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
+				NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, executor);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 4f67921..4add504 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -92,13 +93,15 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	 *                                       start with a '/')
 	 * @param stateStorage                   State storage to be used to persist the completed
 	 *                                       checkpoint
+	 * @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks
 	 * @throws Exception
 	 */
 	public ZooKeeperCompletedCheckpointStore(
 			int maxNumberOfCheckpointsToRetain,
 			CuratorFramework client,
 			String checkpointsPath,
-			RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage) throws Exception {
+			RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage,
+			Executor executor) throws Exception {
 
 		checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
 		checkNotNull(stateStorage, "State storage");
@@ -115,7 +118,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 		// All operations will have the path as root
 		this.client = client.usingNamespace(client.getNamespace() + checkpointsPath);
 
-		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage);
+		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage, executor);
 
 		this.checkpointStateHandles = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index ec05f1e..b241712 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -39,6 +39,7 @@ import java.util.ConcurrentModificationException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -93,12 +94,14 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	 * @param client ZooKeeper client
 	 * @param currentJobsPath ZooKeeper path for current job graphs
 	 * @param stateStorage State storage used to persist the submitted jobs
+	 * @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks
 	 * @throws Exception
 	 */
 	public ZooKeeperSubmittedJobGraphStore(
 			CuratorFramework client,
 			String currentJobsPath,
-			RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
+			RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage,
+			Executor executor) throws Exception {
 
 		checkNotNull(currentJobsPath, "Current jobs path");
 		checkNotNull(stateStorage, "State storage");
@@ -114,7 +117,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		// All operations will have the path as root
 		CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);
 
-		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage);
+		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage, executor);
 
 		this.pathCache = new PathChildrenCache(facade, "/", false);
 		pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener());

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index a9887a6..70ac6c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -216,12 +217,14 @@ public class ZooKeeperUtils {
 	 *
 	 * @param client        The {@link CuratorFramework} ZooKeeper client to use
 	 * @param configuration {@link Configuration} object
+	 * @param executor to run ZooKeeper callbacks
 	 * @return {@link ZooKeeperSubmittedJobGraphStore} instance
 	 * @throws Exception if the submitted job graph store cannot be created
 	 */
 	public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs(
 			CuratorFramework client,
-			Configuration configuration) throws Exception {
+			Configuration configuration,
+			Executor executor) throws Exception {
 
 		checkNotNull(configuration, "Configuration");
 
@@ -235,7 +238,7 @@ public class ZooKeeperUtils {
 				ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH);
 
 		return new ZooKeeperSubmittedJobGraphStore(
-				client, zooKeeperSubmittedJobsPath, stateStorage);
+				client, zooKeeperSubmittedJobsPath, stateStorage, executor);
 	}
 
 	/**
@@ -245,6 +248,7 @@ public class ZooKeeperUtils {
 	 * @param configuration                  {@link Configuration} object
 	 * @param jobId                          ID of job to create the instance for
 	 * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain
+	 * @param executor to run ZooKeeper callbacks
 	 * @return {@link ZooKeeperCompletedCheckpointStore} instance
 	 * @throws Exception if the completed checkpoint store cannot be created
 	 */
@@ -252,7 +256,8 @@ public class ZooKeeperUtils {
 			CuratorFramework client,
 			Configuration configuration,
 			JobID jobId,
-			int maxNumberOfCheckpointsToRetain) throws Exception {
+			int maxNumberOfCheckpointsToRetain,
+			Executor executor) throws Exception {
 
 		checkNotNull(configuration, "Configuration");
 
@@ -269,10 +274,11 @@ public class ZooKeeperUtils {
 		checkpointsPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
 
 		return new ZooKeeperCompletedCheckpointStore(
-				maxNumberOfCheckpointsToRetain,
-				client,
-				checkpointsPath,
-				stateStorage);
+			maxNumberOfCheckpointsToRetain,
+			client,
+			checkpointsPath,
+			stateStorage,
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java
index 1434f74..f6acea3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.state.RetrievableStateHandle;
 import java.io.Serializable;
 
 /**
- * State storage helper which is used by {@link ZooKeeperStateHandleStore} to persiste state before
+ * State storage helper which is used by {@link ZooKeeperStateHandleStore} to persist state before
  * the state handle is written to ZooKeeper.
  *
  * @param <T> The type of the data that can be stored by this storage helper.

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index 5623715..14d9f6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -30,10 +30,10 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -75,6 +75,8 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 
 	private final RetrievableStateStorageHelper<T> storage;
 
+	private final Executor executor;
+
 	/**
 	 * Creates a {@link ZooKeeperStateHandleStore}.
 	 *
@@ -82,13 +84,18 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	 *                            expected that the client's namespace ensures that the root
 	 *                            path is exclusive for all state handles managed by this
 	 *                            instance, e.g. <code>client.usingNamespace("/stateHandles")</code>
+	 * @param storage to persist the actual state and whose returned state handle is then written
+	 *                to ZooKeeper
+	 * @param executor to run the ZooKeeper callbacks
 	 */
 	public ZooKeeperStateHandleStore(
 		CuratorFramework client,
-		RetrievableStateStorageHelper<T> storage) throws IOException {
+		RetrievableStateStorageHelper<T> storage,
+		Executor executor) {
 
 		this.client = checkNotNull(client, "Curator client");
 		this.storage = checkNotNull(storage, "State storage");
+		this.executor = checkNotNull(executor);
 	}
 
 	/**
@@ -348,7 +355,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 		checkNotNull(callback, "Background callback");
 
-		client.delete().deletingChildrenIfNeeded().inBackground(callback).forPath(pathInZooKeeper);
+		client.delete().deletingChildrenIfNeeded().inBackground(callback, executor).forPath(pathInZooKeeper);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index df80d72..08ed0a4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1970,24 +1970,30 @@ object JobManager {
 
     val numberProcessors = Hardware.getNumberCPUCores()
 
-    val executor = Executors.newFixedThreadPool(
+    val futureExecutor = Executors.newFixedThreadPool(
       numberProcessors,
       new NamedThreadFactory("jobmanager-future-", "-thread-"))
 
+    val ioExecutor = Executors.newFixedThreadPool(
+      numberProcessors,
+      new NamedThreadFactory("jobmanager-io-", "-thread-")
+    )
+
     val (jobManagerSystem, _, _, webMonitorOption, _) = try {
       startActorSystemAndJobManagerActors(
         configuration,
         executionMode,
         listeningAddress,
         listeningPort,
-        executor,
+        futureExecutor,
+        ioExecutor,
         classOf[JobManager],
         classOf[MemoryArchivist],
         Option(classOf[StandaloneResourceManager])
       )
     } catch {
       case t: Throwable =>
-          executor.shutdownNow()
+          futureExecutor.shutdownNow()
 
         throw t
     }
@@ -2005,7 +2011,8 @@ object JobManager {
         }
     }
 
-    executor.shutdownNow()
+    futureExecutor.shutdownNow()
+    ioExecutor.shutdownNow()
   }
 
   /**
@@ -2113,7 +2120,8 @@ object JobManager {
     *                      additional TaskManager in the same process.
     * @param listeningAddress The hostname where the JobManager should listen for messages.
     * @param listeningPort The port where the JobManager should listen for messages
-    * @param executor to run the JobManager's futures
+    * @param futureExecutor to run the JobManager's futures
+    * @param ioExecutor to run blocking io operations
     * @param jobManagerClass The class of the JobManager to be started
     * @param archiveClass The class of the Archivist to be started
     * @param resourceManagerClass Optional class of resource manager if one should be started
@@ -2125,7 +2133,8 @@ object JobManager {
       executionMode: JobManagerMode,
       listeningAddress: String,
       listeningPort: Int,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist],
       resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]])
@@ -2194,7 +2203,8 @@ object JobManager {
       val (jobManager, archive) = startJobManagerActors(
         configuration,
         jobManagerSystem,
-        executor,
+        futureExecutor,
+        ioExecutor,
         jobManagerClass,
         archiveClass)
 
@@ -2395,14 +2405,16 @@ object JobManager {
    *              delayBetweenRetries, timeout)
    *
    * @param configuration The configuration from which to parse the config values.
-   * @param executor to run JobManager's futures
+   * @param futureExecutor to run JobManager's futures
+   * @param ioExecutor to run blocking io operations
    * @param leaderElectionServiceOption LeaderElectionService which shall be returned if the option
    *                                    is defined
    * @return The members for a default JobManager.
    */
   def createJobManagerComponents(
       configuration: Configuration,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       leaderElectionServiceOption: Option[LeaderElectionService]) :
     (InstanceManager,
     FlinkScheduler,
@@ -2433,11 +2445,11 @@ object JobManager {
     var instanceManager: InstanceManager = null
     var scheduler: FlinkScheduler = null
     var libraryCacheManager: BlobLibraryCacheManager = null
-    
+
     try {
       blobServer = new BlobServer(configuration)
       instanceManager = new InstanceManager()
-      scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(executor))
+      scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(futureExecutor))
       libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval)
 
       instanceManager.addInstanceListener(scheduler)
@@ -2482,8 +2494,8 @@ object JobManager {
           }
 
           (leaderElectionService,
-            ZooKeeperUtils.createSubmittedJobGraphs(client, configuration),
-            new ZooKeeperCheckpointRecoveryFactory(client, configuration))
+            ZooKeeperUtils.createSubmittedJobGraphs(client, configuration, ioExecutor),
+            new ZooKeeperCheckpointRecoveryFactory(client, configuration, ioExecutor))
       }
 
     val jobRecoveryTimeoutStr = configuration.getValue(HighAvailabilityOptions.HA_JOB_DELAY)
@@ -2527,14 +2539,17 @@ object JobManager {
    *
    * @param configuration The configuration for the JobManager
    * @param actorSystem The actor system running the JobManager
+   * @param futureExecutor to run JobManager's futures
+   * @param ioExecutor to run blocking io operations
    * @param jobManagerClass The class of the JobManager to be started
    * @param archiveClass The class of the MemoryArchivist to be started
-    * @return A tuple of references (JobManager Ref, Archiver Ref)
+   * @return A tuple of references (JobManager Ref, Archiver Ref)
    */
   def startJobManagerActors(
       configuration: Configuration,
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist])
     : (ActorRef, ActorRef) = {
@@ -2542,7 +2557,8 @@ object JobManager {
     startJobManagerActors(
       configuration,
       actorSystem,
-      executor,
+      futureExecutor,
+      ioExecutor,
       Some(JOB_MANAGER_NAME),
       Some(ARCHIVE_NAME),
       jobManagerClass,
@@ -2555,7 +2571,8 @@ object JobManager {
    *
    * @param configuration The configuration for the JobManager
    * @param actorSystem The actor system running the JobManager
-   * @param executor to run JobManager's futures
+   * @param futureExecutor to run JobManager's futures
+   * @param ioExecutor to run blocking io operations
    * @param jobManagerActorName Optionally the name of the JobManager actor. If none is given,
    *                          the actor will have the name generated by the actor system.
    * @param archiveActorName Optionally the name of the archive actor. If none is given,
@@ -2567,7 +2584,8 @@ object JobManager {
   def startJobManagerActors(
       configuration: Configuration,
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       jobManagerActorName: Option[String],
       archiveActorName: Option[String],
       jobManagerClass: Class[_ <: JobManager],
@@ -2586,7 +2604,8 @@ object JobManager {
     jobRecoveryTimeout,
     metricsRegistry) = createJobManagerComponents(
       configuration,
-      executor,
+      futureExecutor,
+      ioExecutor,
       None)
 
     val archiveProps = getArchiveProps(archiveClass, archiveCount)
@@ -2600,7 +2619,7 @@ object JobManager {
     val jobManagerProps = getJobManagerProps(
       jobManagerClass,
       configuration,
-      executor,
+      futureExecutor,
       instanceManager,
       scheduler,
       libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index d9a208d..4367442 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -105,7 +105,11 @@ abstract class FlinkMiniCluster(
 
   private var isRunning = false
 
-  val executor = Executors.newFixedThreadPool(
+  val futureExecutor = Executors.newFixedThreadPool(
+    Hardware.getNumberCPUCores(),
+    new NamedThreadFactory("mini-cluster-future-", "-thread"))
+
+  val ioExecutor = Executors.newFixedThreadPool(
     Hardware.getNumberCPUCores(),
     new NamedThreadFactory("mini-cluster-future-", "-thread"))
 
@@ -405,7 +409,8 @@ abstract class FlinkMiniCluster(
     jobManagerLeaderRetrievalService.foreach(_.stop())
     isRunning = false
 
-    executor.shutdownNow
+    futureExecutor.shutdownNow()
+    ioExecutor.shutdownNow()
   }
 
   protected def shutdown(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 59dd399..b2aedf7 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -126,7 +126,8 @@ class LocalFlinkMiniCluster(
     jobRecoveryTimeout,
     metricsRegistry) = JobManager.createJobManagerComponents(
       config,
-      executor,
+      futureExecutor,
+      ioExecutor,
       createLeaderElectionService())
 
     val archive = system.actorOf(
@@ -139,7 +140,7 @@ class LocalFlinkMiniCluster(
       getJobManagerProps(
         jobManagerClass,
         config,
-        executor,
+        futureExecutor,
         instanceManager,
         scheduler,
         libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 2e44ecd..f46f7d2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
@@ -68,7 +69,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 			public RetrievableStateHandle<CompletedCheckpoint> store(CompletedCheckpoint state) throws Exception {
 				return new HeapRetrievableStateHandle<CompletedCheckpoint>(state);
 			}
-		});
+		}, Executors.directExecutor());
 	}
 
 	// ---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index aeb1ae1..f941c24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -400,6 +400,7 @@ public class JobManagerTest {
 					config,
 					system,
 					system.dispatcher(),
+					system.dispatcher(),
 					TestingJobManager.class,
 					MemoryArchivist.class)._1(),
 				leaderSessionId);
@@ -607,6 +608,7 @@ public class JobManagerTest {
 				config,
 				actorSystem,
 				actorSystem.dispatcher(),
+				actorSystem.dispatcher(),
 				Option.apply("jm"),
 				Option.apply("arch"),
 				TestingJobManager.class,
@@ -734,6 +736,7 @@ public class JobManagerTest {
 				config,
 				actorSystem,
 				actorSystem.dispatcher(),
+				actorSystem.dispatcher(),
 				Option.apply("jm"),
 				Option.apply("arch"),
 				TestingJobManager.class,
@@ -831,6 +834,7 @@ public class JobManagerTest {
 				new Configuration(),
 				actorSystem,
 				actorSystem.dispatcher(),
+				actorSystem.dispatcher(),
 				Option.apply("jm"),
 				Option.apply("arch"),
 				TestingJobManager.class,

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 4f26e68..1b8f0c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -83,6 +83,7 @@ public class JobSubmitTest {
 			jmConfig,
 			jobManagerSystem,
 			jobManagerSystem.dispatcher(),
+			jobManagerSystem.dispatcher(),
 			JobManager.class,
 			MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index 7d21cfd..d156f02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager;
 import akka.actor.ActorRef;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener;
@@ -89,7 +90,8 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
 			ZooKeeper.createClient(),
 			"/testPutAndRemoveJobGraph",
-			localStateStorage);
+			localStateStorage,
+			Executors.directExecutor());
 
 		try {
 			SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
@@ -141,7 +143,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 	@Test
 	public void testRecoverJobGraphs() throws Exception {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage);
+				ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage, Executors.directExecutor());
 
 		try {
 			SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
@@ -191,10 +193,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 
 		try {
 			jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
+					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage, Executors.directExecutor());
 
 			otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
+					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage, Executors.directExecutor());
 
 
 			SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
@@ -250,10 +252,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 	@Test(expected = IllegalStateException.class)
 	public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
+				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, Executors.directExecutor());
 
 		ZooKeeperSubmittedJobGraphStore otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
+				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, Executors.directExecutor());
 
 		jobGraphs.start(null);
 		otherJobGraphs.start(null);

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
index 3c8ea75..ddcb4e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
@@ -72,7 +72,12 @@ public class ClusterShutdownITCase extends TestLogger {
 
 			// start job manager which doesn't shutdown the actor system
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, system.dispatcher(), config, "jobmanager1");
+				TestingUtils.createJobManager(
+					system,
+					system.dispatcher(),
+					system.dispatcher(),
+					config,
+					"jobmanager1");
 
 			// Tell the JobManager to inform us of shutdown actions
 			jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
@@ -114,7 +119,12 @@ public class ClusterShutdownITCase extends TestLogger {
 
 			// start job manager which doesn't shutdown the actor system
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, system.dispatcher(), config, "jobmanager2");
+				TestingUtils.createJobManager(
+					system,
+					system.dispatcher(),
+					system.dispatcher(),
+					config,
+					"jobmanager2");
 
 			// Tell the JobManager to inform us of shutdown actions
 			jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
index 5a98c8d..e7828fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
@@ -75,7 +75,12 @@ public class ResourceManagerITCase extends TestLogger {
 		protected void run() {
 
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, system.dispatcher(), config, "ReconciliationTest");
+				TestingUtils.createJobManager(
+					system,
+					system.dispatcher(),
+					system.dispatcher(),
+					config,
+					"ReconciliationTest");
 			ActorGateway me =
 				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
 
@@ -129,7 +134,12 @@ public class ResourceManagerITCase extends TestLogger {
 		protected void run() {
 
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, system.dispatcher(), config, "RegTest");
+				TestingUtils.createJobManager(
+					system,
+					system.dispatcher(),
+					system.dispatcher(),
+					config,
+					"RegTest");
 			ActorGateway me =
 				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 83eaddb..83983d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -86,6 +86,7 @@ public class TaskManagerComponentsStartupShutdownTest {
 				config,
 				actorSystem,
 				actorSystem.dispatcher(),
+				actorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index 63c1b29..dead732 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -103,6 +103,7 @@ public abstract class TaskManagerProcessReapingTestBase {
 				new Configuration(),
 				jmActorSystem,
 				jmActorSystem.dispatcher(),
+				jmActorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index b21eba0..5753349 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -106,7 +106,11 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
 			try {
 				// a simple JobManager
-				jobManager = createJobManager(actorSystem, actorSystem.dispatcher(), config);
+				jobManager = createJobManager(
+					actorSystem,
+					actorSystem.dispatcher(),
+					actorSystem.dispatcher(),
+					config);
 				startResourceManager(config, jobManager.actor());
 
 				// start two TaskManagers. it will automatically try to register
@@ -189,6 +193,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				jobManager = createJobManager(
 					actorSystem,
 					actorSystem.dispatcher(),
+					actorSystem.dispatcher(),
 					new Configuration());
 
 				startResourceManager(config, jobManager.actor());
@@ -631,6 +636,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 			configuration,
 			actorSystem,
 			actorSystem.dispatcher(),
+			actorSystem.dispatcher(),
 			NONE_STRING,
 			NONE_STRING,
 			JobManager.class,

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
index 8f9c932..5db3557 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
@@ -22,6 +22,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
@@ -84,7 +85,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 	public void testAdd() throws Exception {
 		LongStateStorage longStateStorage = new LongStateStorage();
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
-				ZooKeeper.getClient(), longStateStorage);
+				ZooKeeper.getClient(), longStateStorage, Executors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testAdd";
@@ -119,7 +120,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 	public void testAddWithCreateMode() throws Exception {
 		LongStateStorage longStateStorage = new LongStateStorage();
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
-				ZooKeeper.getClient(), longStateStorage);
+				ZooKeeper.getClient(), longStateStorage, Executors.directExecutor());
 
 		// Config
 		Long state = 3457347234L;
@@ -181,7 +182,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
 
 		ZooKeeper.getClient().create().forPath("/testAddAlreadyExistingPath");
 
@@ -200,7 +201,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		when(client.create()).thenThrow(new RuntimeException("Expected test Exception."));
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				client, stateHandleProvider);
+				client, stateHandleProvider, Executors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure";
@@ -230,7 +231,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testReplace";
@@ -269,7 +270,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		RetrievableStateStorageHelper<Long> stateStorage = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateStorage);
+				ZooKeeper.getClient(), stateStorage, Executors.directExecutor());
 
 		store.replace("/testReplaceNonExistingPath", 0, 1L);
 	}
@@ -286,7 +287,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		when(client.setData()).thenThrow(new RuntimeException("Expected test Exception."));
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				client, stateHandleProvider);
+				client, stateHandleProvider, Executors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testReplaceDiscardStateHandleAfterFailure";
@@ -328,7 +329,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testGetAndExists";
@@ -353,7 +354,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
 
 		store.get("/testGetNonExistingPath");
 	}
@@ -367,7 +368,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testGetAll";
@@ -398,7 +399,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testGetAllSortedByName";
@@ -428,7 +429,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testRemove";
@@ -452,7 +453,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testRemoveWithCallback";
@@ -491,7 +492,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testDiscard";
@@ -513,7 +514,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testDiscardAll";

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 4485b65..cf00206 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -173,6 +173,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
       new Configuration(),
       _system,
       _system.dispatcher,
+      _system.dispatcher,
       None,
       None,
       classOf[JobManager],

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index b57a9dc..75891ed 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -303,18 +303,21 @@ object TestingUtils {
   /** Creates a testing JobManager using the default recovery mode (standalone)
     *
     * @param actorSystem The ActorSystem to use
-    * @param executor to run the JobManager's futures
+    * @param futureExecutor to run the JobManager's futures
+    * @param ioExecutor to run blocking io operations
     * @param configuration The Flink configuration
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       configuration: Configuration)
     : ActorGateway = {
     createJobManager(
       actorSystem,
-      executor,
+      futureExecutor,
+      ioExecutor,
       configuration,
       classOf[TestingJobManager],
       ""
@@ -325,20 +328,23 @@ object TestingUtils {
     * Additional prefix can be supplied for the Actor system names
     *
     * @param actorSystem The ActorSystem to use
-    * @param executor to run the JobManager's futures
+    * @param futureExecutor to run the JobManager's futures
+    * @param ioExecutor to run blocking io operations
     * @param configuration The Flink configuration
     * @param prefix The prefix for the actor names
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       configuration: Configuration,
       prefix: String)
     : ActorGateway = {
     createJobManager(
       actorSystem,
-      executor,
+      futureExecutor,
+      ioExecutor,
       configuration,
       classOf[TestingJobManager],
       prefix
@@ -349,19 +355,21 @@ object TestingUtils {
     * Creates a JobManager of the given class using the default recovery mode (standalone)
     *
     * @param actorSystem ActorSystem to use
-    * @param executor to run the JobManager's futures
+    * @param futureExecutor to run the JobManager's futures
+    * @param ioExecutor to run blocking io operations
     * @param configuration Configuration to use
     * @param jobManagerClass JobManager class to instantiate
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       configuration: Configuration,
       jobManagerClass: Class[_ <: JobManager])
     : ActorGateway = {
 
-    createJobManager(actorSystem, executor, configuration, jobManagerClass, "")
+    createJobManager(actorSystem, futureExecutor, ioExecutor, configuration, jobManagerClass, "")
   }
 
   /**
@@ -369,7 +377,8 @@ object TestingUtils {
     * Additional prefix for the Actor names can be added.
     *
     * @param actorSystem ActorSystem to use
-    * @param executor to run the JobManager's futures
+    * @param futureExecutor to run the JobManager's futures
+    * @param ioExecutor to run blocking io operations
     * @param configuration Configuration to use
     * @param jobManagerClass JobManager class to instantiate
     * @param prefix The prefix to use for the Actor names
@@ -377,7 +386,8 @@ object TestingUtils {
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       configuration: Configuration,
       jobManagerClass: Class[_ <: JobManager],
       prefix: String)
@@ -390,7 +400,8 @@ object TestingUtils {
       val (actor, _) = JobManager.startJobManagerActors(
         configuration,
         actorSystem,
-        executor,
+        futureExecutor,
+        ioExecutor,
         Some(prefix + JobManager.JOB_MANAGER_NAME),
         Some(prefix + JobManager.ARCHIVE_NAME),
         jobManagerClass,

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index af86983..0ff2e78 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -130,6 +130,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 				jmConfig,
 				jmActorSystem,
 				jmActorSystem.dispatcher(),
+				jmActorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index f72ef34..8243e97 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -105,6 +105,7 @@ public class ProcessFailureCancelingITCase {
 				jmConfig,
 				jmActorSystem,
 				jmActorSystem.dispatcher(),
+				jmActorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 002e162..da5959b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -215,10 +215,14 @@ public class YarnApplicationMasterRunner {
 
 		int numberProcessors = Hardware.getNumberCPUCores();
 
-		final ExecutorService executor = Executors.newFixedThreadPool(
+		final ExecutorService futureExecutor = Executors.newFixedThreadPool(
 			numberProcessors,
 			new NamedThreadFactory("yarn-jobmanager-future-", "-thread-"));
 
+		final ExecutorService ioExecutor = Executors.newFixedThreadPool(
+			numberProcessors,
+			new NamedThreadFactory("yarn-jobmanager-io-", "-thread-"));
+
 		try {
 			// ------- (1) load and parse / validate all configurations -------
 
@@ -333,7 +337,8 @@ public class YarnApplicationMasterRunner {
 			ActorRef jobManager = JobManager.startJobManagerActors(
 				config,
 				actorSystem,
-				executor,
+				futureExecutor,
+				ioExecutor,
 				new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
 				scala.Option.<String>empty(),
 				getJobManagerClass(),
@@ -427,7 +432,8 @@ public class YarnApplicationMasterRunner {
 			}
 		}
 
-		executor.shutdownNow();
+		futureExecutor.shutdownNow();
+		ioExecutor.shutdownNow();
 
 		return 0;
 	}


[2/4] flink git commit: [FLINK-5082] Pull ExecutorService lifecycle management out of the JobManager

Posted by tr...@apache.org.
[FLINK-5082] Pull ExecutorService lifecycle management out of the JobManager

The provided ExecutorService is no longer shut down by the JobManager. Instead, its
lifecycle is managed by the component that created it. This gives cleaner behaviour
because it better separates responsibilities.

This closes #2820.
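
As a rough sketch of the resulting usage pattern (pieced together from the runner changes in
the diffs below; the surrounding setup of `config` and `actorSystem` is assumed, and the exact
signatures shown are illustrative rather than authoritative), the creator of the executor now
owns its shutdown:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.flink.runtime.jobmanager.JobManager;
    import org.apache.flink.runtime.jobmanager.MemoryArchivist;
    import org.apache.flink.runtime.util.Hardware;
    import org.apache.flink.runtime.util.NamedThreadFactory;

    // Assumes a Flink Configuration 'config' and an Akka ActorSystem 'actorSystem'
    // have already been created by the caller (e.g. an application master runner).
    final ExecutorService futureExecutor = Executors.newFixedThreadPool(
        Hardware.getNumberCPUCores(),
        new NamedThreadFactory("jobmanager-future-", "-thread-"));

    try {
        // The executor is handed to the JobManager actors, ...
        JobManager.startJobManagerActors(
            config,
            actorSystem,
            futureExecutor,
            JobManager.class,
            MemoryArchivist.class);

        actorSystem.awaitTermination();
    } finally {
        // ... but the JobManager no longer shuts it down on postStop();
        // the creator is responsible for that.
        futureExecutor.shutdownNow();
    }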


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae4b274a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae4b274a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae4b274a

Branch: refs/heads/master
Commit: ae4b274a9919d01a236df4e819a0a07c5d8543ac
Parents: 698e53e
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 16 18:33:54 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 22 23:00:16 2016 +0100

----------------------------------------------------------------------
 .../MesosApplicationMasterRunner.java           | 18 ++++-
 .../clusterframework/MesosJobManager.scala      |  8 +--
 .../BackPressureStatsTrackerITCase.java         |  2 +-
 .../StackTraceSampleCoordinatorITCase.java      |  2 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |  1 +
 .../flink/runtime/util/NamedThreadFactory.java  | 58 ++++++++++++++++
 .../ContaineredJobManager.scala                 |  8 +--
 .../flink/runtime/jobmanager/JobManager.scala   | 73 ++++++++++++--------
 .../runtime/minicluster/FlinkMiniCluster.scala  |  9 ++-
 .../minicluster/LocalFlinkMiniCluster.scala     | 10 +--
 .../runtime/jobmanager/JobManagerTest.java      | 48 +++++++------
 .../flink/runtime/jobmanager/JobSubmitTest.java |  9 +--
 .../resourcemanager/ClusterShutdownITCase.java  |  4 +-
 .../resourcemanager/ResourceManagerITCase.java  |  4 +-
 ...askManagerComponentsStartupShutdownTest.java |  1 +
 .../TaskManagerProcessReapingTestBase.java      |  1 +
 .../TaskManagerRegistrationTest.java            |  8 ++-
 .../jobmanager/JobManagerRegistrationTest.scala |  1 +
 .../testingUtils/TestingJobManager.scala        |  6 +-
 .../runtime/testingUtils/TestingUtils.scala     | 60 ++++------------
 ...ctTaskManagerProcessFailureRecoveryTest.java |  1 +
 .../recovery/ProcessFailureCancelingITCase.java |  1 +
 .../flink/yarn/TestingYarnJobManager.scala      |  9 ++-
 .../flink/yarn/YarnApplicationMasterRunner.java | 19 ++++-
 .../org/apache/flink/yarn/YarnJobManager.scala  |  8 +--
 25 files changed, 229 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 5ec39c2..09f87bd 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -45,8 +45,10 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.NamedThreadFactory;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 
@@ -66,6 +68,8 @@ import java.net.URL;
 import java.security.PrivilegedAction;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.mesos.Utils.uri;
@@ -75,7 +79,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * This class is the executable entry point for the Mesos Application Master.
- * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * It starts actor system and the actors for {@link JobManager}
  * and {@link MesosFlinkResourceManager}.
  *
  * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container
@@ -168,6 +172,12 @@ public class MesosApplicationMasterRunner {
 		WebMonitor webMonitor = null;
 		MesosArtifactServer artifactServer = null;
 
+		int numberProcessors = Hardware.getNumberCPUCores();
+
+		final ExecutorService executor = Executors.newFixedThreadPool(
+			numberProcessors,
+			new NamedThreadFactory("mesos-jobmanager-future-", "-thread-"));
+
 		try {
 			// ------- (1) load and parse / validate all configurations -------
 
@@ -281,7 +291,9 @@ public class MesosApplicationMasterRunner {
 
 			// we start the JobManager with its standard name
 			ActorRef jobManager = JobManager.startJobManagerActors(
-				config, actorSystem,
+				config,
+				actorSystem,
+				executor,
 				new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
 				scala.Option.<String>empty(),
 				getJobManagerClass(),
@@ -387,6 +399,8 @@ public class MesosApplicationMasterRunner {
 			LOG.error("Failed to stop the artifact server", t);
 		}
 
+		executor.shutdownNow();
+
 		return 0;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
index 113ab85..300539c 100644
--- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
+++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.mesos.runtime.clusterframework
 
-import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executor
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
@@ -37,7 +37,7 @@ import scala.concurrent.duration._
 /** JobManager actor for execution on Mesos. .
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executorService Execution context which is used to execute concurrent tasks in the
+  * @param executor Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
@@ -49,7 +49,7 @@ import scala.concurrent.duration._
   * @param leaderElectionService LeaderElectionService to participate in the leader election
   */
 class MesosJobManager(flinkConfiguration: FlinkConfiguration,
-                      executorService: ExecutorService,
+                      executor: Executor,
                       instanceManager: InstanceManager,
                       scheduler: FlinkScheduler,
                       libraryCacheManager: BlobLibraryCacheManager,
@@ -63,7 +63,7 @@ class MesosJobManager(flinkConfiguration: FlinkConfiguration,
                       metricsRegistry: Option[FlinkMetricRegistry])
   extends ContaineredJobManager(
     flinkConfiguration,
-    executorService,
+    executor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 9d099e3..d01e0cf 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -120,7 +120,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 			}
 
 			try {
-				jobManger = TestingUtils.createJobManager(testActorSystem, new Configuration());
+				jobManger = TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), new Configuration());
 
 				final Configuration config = new Configuration();
 				config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index e012d0b..4b5bd2f 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -90,7 +90,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
 			ActorGateway taskManager = null;
 
 			try {
-				jobManger = TestingUtils.createJobManager(testActorSystem, new Configuration());
+				jobManger = TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), new Configuration());
 
 				final Configuration config = new Configuration();
 				config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 1ae776c..fcdf94d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -179,6 +179,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 				jobManager[i] = JobManager.startJobManagerActors(
 					jmConfig,
 					jobManagerSystem[i],
+					jobManagerSystem[i].dispatcher(),
 					JobManager.class,
 					MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
new file mode 100644
index 0000000..bd97963
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Thread factory which allows to specify a thread pool name and a thread name.
+ *
+ * The code is based on {@link java.util.concurrent.Executors.DefaultThreadFactory}.
+ */
+public class NamedThreadFactory implements ThreadFactory {
+	private static final AtomicInteger poolNumber = new AtomicInteger(1);
+	private final ThreadGroup group;
+	private final AtomicInteger threadNumber = new AtomicInteger(1);
+	private final String namePrefix;
+
+	public NamedThreadFactory(final String poolName, final String threadName) {
+		SecurityManager securityManager = System.getSecurityManager();
+		group = (securityManager != null) ? securityManager.getThreadGroup() :
+			Thread.currentThread().getThreadGroup();
+
+		namePrefix = poolName +
+			poolNumber.getAndIncrement() +
+			threadName;
+	}
+
+	@Override
+	public Thread newThread(Runnable runnable) {
+		Thread t = new Thread(group, runnable,
+			namePrefix + threadNumber.getAndIncrement(),
+			0);
+		if (t.isDaemon()) {
+			t.setDaemon(false);
+		}
+		if (t.getPriority() != Thread.NORM_PRIORITY) {
+			t.setPriority(Thread.NORM_PRIORITY);
+		}
+		return t;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
index 72df671..0f31eba 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.clusterframework
 
-import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executor
 
 import akka.actor.ActorRef
 import org.apache.flink.api.common.JobID
@@ -45,7 +45,7 @@ import scala.language.postfixOps
   * to start/administer/stop the session.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executorService Execution context which is used to execute concurrent tasks in the
+  * @param executor Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
@@ -58,7 +58,7 @@ import scala.language.postfixOps
   */
 abstract class ContaineredJobManager(
     flinkConfiguration: Configuration,
-    executorService: ExecutorService,
+    executor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -72,7 +72,7 @@ abstract class ContaineredJobManager(
     metricsRegistry: Option[FlinkMetricRegistry])
   extends JobManager(
     flinkConfiguration,
-    executorService,
+    executor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index b2e1002..df80d72 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager
 import java.io.{File, IOException}
 import java.net._
 import java.util.UUID
-import java.util.concurrent.{ExecutorService, ForkJoinPool, TimeUnit, TimeoutException}
+import java.util.concurrent.{TimeUnit, Future => _, TimeoutException => _, _}
 
 import akka.actor.Status.{Failure, Success}
 import akka.actor._
@@ -118,7 +118,7 @@ import scala.language.postfixOps
  */
 class JobManager(
     protected val flinkConfiguration: Configuration,
-    protected val executorService: ExecutorService,
+    protected val executor: Executor,
     protected val instanceManager: InstanceManager,
     protected val scheduler: FlinkScheduler,
     protected val libraryCacheManager: BlobLibraryCacheManager,
@@ -272,9 +272,6 @@ class JobManager(
       case e: IOException => log.error("Could not properly shutdown the library cache manager.", e)
     }
 
-    // shut down the extra thread pool for futures
-    executorService.shutdown()
-
     // failsafe shutdown of the metrics registry
     try {
       metricsRegistry.foreach(_.shutdown())
@@ -1250,7 +1247,7 @@ class JobManager(
           executionGraph,
           jobGraph,
           flinkConfiguration,
-          executorService,
+          executor,
           userCodeLoader,
           checkpointRecoveryFactory,
           Time.of(timeout.length, timeout.unit),
@@ -1971,15 +1968,29 @@ object JobManager {
       listeningPort: Int)
     : Unit = {
 
-    val (jobManagerSystem, _, _, webMonitorOption, _) = startActorSystemAndJobManagerActors(
-      configuration,
-      executionMode,
-      listeningAddress,
-      listeningPort,
-      classOf[JobManager],
-      classOf[MemoryArchivist],
-      Option(classOf[StandaloneResourceManager])
-    )
+    val numberProcessors = Hardware.getNumberCPUCores()
+
+    val executor = Executors.newFixedThreadPool(
+      numberProcessors,
+      new NamedThreadFactory("jobmanager-future-", "-thread-"))
+
+    val (jobManagerSystem, _, _, webMonitorOption, _) = try {
+      startActorSystemAndJobManagerActors(
+        configuration,
+        executionMode,
+        listeningAddress,
+        listeningPort,
+        executor,
+        classOf[JobManager],
+        classOf[MemoryArchivist],
+        Option(classOf[StandaloneResourceManager])
+      )
+    } catch {
+      case t: Throwable =>
+          executor.shutdownNow()
+
+        throw t
+    }
 
     // block until everything is shut down
     jobManagerSystem.awaitTermination()
@@ -1993,6 +2004,8 @@ object JobManager {
             LOG.warn("Could not properly stop the web monitor.", t)
         }
     }
+
+    executor.shutdownNow()
   }
 
   /**
@@ -2100,6 +2113,7 @@ object JobManager {
     *                      additional TaskManager in the same process.
     * @param listeningAddress The hostname where the JobManager should listen for messages.
     * @param listeningPort The port where the JobManager should listen for messages
+    * @param executor to run the JobManager's futures
     * @param jobManagerClass The class of the JobManager to be started
     * @param archiveClass The class of the Archivist to be started
     * @param resourceManagerClass Optional class of resource manager if one should be started
@@ -2111,6 +2125,7 @@ object JobManager {
       executionMode: JobManagerMode,
       listeningAddress: String,
       listeningPort: Int,
+      executor: Executor,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist],
       resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]])
@@ -2179,6 +2194,7 @@ object JobManager {
       val (jobManager, archive) = startJobManagerActors(
         configuration,
         jobManagerSystem,
+        executor,
         jobManagerClass,
         archiveClass)
 
@@ -2379,15 +2395,16 @@ object JobManager {
    *              delayBetweenRetries, timeout)
    *
    * @param configuration The configuration from which to parse the config values.
+   * @param executor to run JobManager's futures
    * @param leaderElectionServiceOption LeaderElectionService which shall be returned if the option
    *                                    is defined
    * @return The members for a default JobManager.
    */
   def createJobManagerComponents(
       configuration: Configuration,
+      executor: Executor,
       leaderElectionServiceOption: Option[LeaderElectionService]) :
-    (ExecutorService,
-    InstanceManager,
+    (InstanceManager,
     FlinkScheduler,
     BlobLibraryCacheManager,
     RestartStrategyFactory,
@@ -2416,13 +2433,11 @@ object JobManager {
     var instanceManager: InstanceManager = null
     var scheduler: FlinkScheduler = null
     var libraryCacheManager: BlobLibraryCacheManager = null
-
-    val executorService: ExecutorService = new ForkJoinPool()
     
     try {
       blobServer = new BlobServer(configuration)
       instanceManager = new InstanceManager()
-      scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(executorService))
+      scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(executor))
       libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval)
 
       instanceManager.addInstanceListener(scheduler)
@@ -2441,7 +2456,6 @@ object JobManager {
         if (blobServer != null) {
           blobServer.shutdown()
         }
-        executorService.shutdownNow()
         
         throw t
     }
@@ -2494,8 +2508,7 @@ object JobManager {
         None
     }
 
-    (executorService,
-      instanceManager,
+    (instanceManager,
       scheduler,
       libraryCacheManager,
       restartStrategy,
@@ -2521,6 +2534,7 @@ object JobManager {
   def startJobManagerActors(
       configuration: Configuration,
       actorSystem: ActorSystem,
+      executor: Executor,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist])
     : (ActorRef, ActorRef) = {
@@ -2528,6 +2542,7 @@ object JobManager {
     startJobManagerActors(
       configuration,
       actorSystem,
+      executor,
       Some(JOB_MANAGER_NAME),
       Some(ARCHIVE_NAME),
       jobManagerClass,
@@ -2540,6 +2555,7 @@ object JobManager {
    *
    * @param configuration The configuration for the JobManager
    * @param actorSystem The actor system running the JobManager
+   * @param executor to run JobManager's futures
    * @param jobManagerActorName Optionally the name of the JobManager actor. If none is given,
    *                          the actor will have the name generated by the actor system.
    * @param archiveActorName Optionally the name of the archive actor. If none is given,
@@ -2551,14 +2567,14 @@ object JobManager {
   def startJobManagerActors(
       configuration: Configuration,
       actorSystem: ActorSystem,
+      executor: Executor,
       jobManagerActorName: Option[String],
       archiveActorName: Option[String],
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist])
     : (ActorRef, ActorRef) = {
 
-    val (executorService: ExecutorService,
-    instanceManager,
+    val (instanceManager,
     scheduler,
     libraryCacheManager,
     restartStrategy,
@@ -2570,6 +2586,7 @@ object JobManager {
     jobRecoveryTimeout,
     metricsRegistry) = createJobManagerComponents(
       configuration,
+      executor,
       None)
 
     val archiveProps = getArchiveProps(archiveClass, archiveCount)
@@ -2583,7 +2600,7 @@ object JobManager {
     val jobManagerProps = getJobManagerProps(
       jobManagerClass,
       configuration,
-      executorService,
+      executor,
       instanceManager,
       scheduler,
       libraryCacheManager,
@@ -2617,7 +2634,7 @@ object JobManager {
   def getJobManagerProps(
     jobManagerClass: Class[_ <: JobManager],
     configuration: Configuration,
-    executorService: ExecutorService,
+    executor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -2633,7 +2650,7 @@ object JobManager {
     Props(
       jobManagerClass,
       configuration,
-      executorService,
+      executor,
       instanceManager,
       scheduler,
       libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 048b013..d9a208d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.minicluster
 
 import java.net.InetAddress
 import java.util.UUID
+import java.util.concurrent.{Executors, ForkJoinPool}
 
 import akka.pattern.Patterns.gracefulStop
 import akka.pattern.ask
@@ -34,7 +35,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService, StandaloneLeaderRetrievalService}
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
-import org.apache.flink.runtime.util.ZooKeeperUtils
+import org.apache.flink.runtime.util.{Hardware, NamedThreadFactory, ZooKeeperUtils}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.slf4j.LoggerFactory
 
@@ -104,6 +105,10 @@ abstract class FlinkMiniCluster(
 
   private var isRunning = false
 
+  val executor = Executors.newFixedThreadPool(
+    Hardware.getNumberCPUCores(),
+    new NamedThreadFactory("mini-cluster-future-", "-thread"))
+
   def configuration: Configuration = {
     if (originalConfiguration.getInteger(
       ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
@@ -399,6 +404,8 @@ abstract class FlinkMiniCluster(
 
     jobManagerLeaderRetrievalService.foreach(_.stop())
     isRunning = false
+
+    executor.shutdownNow
   }
 
   protected def shutdown(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 43ccda9..59dd399 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -114,8 +114,7 @@ class LocalFlinkMiniCluster(
       config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
     }
 
-    val (executorService,
-    instanceManager,
+    val (instanceManager,
     scheduler,
     libraryCacheManager,
     restartStrategyFactory,
@@ -125,7 +124,10 @@ class LocalFlinkMiniCluster(
     submittedJobGraphStore,
     checkpointRecoveryFactory,
     jobRecoveryTimeout,
-    metricsRegistry) = JobManager.createJobManagerComponents(config, createLeaderElectionService())
+    metricsRegistry) = JobManager.createJobManagerComponents(
+      config,
+      executor,
+      createLeaderElectionService())
 
     val archive = system.actorOf(
       getArchiveProps(
@@ -137,7 +139,7 @@ class LocalFlinkMiniCluster(
       getJobManagerProps(
         jobManagerClass,
         config,
-        executorService,
+        executor,
         instanceManager,
         scheduler,
         libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index ff604f1..aeb1ae1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -397,10 +397,11 @@ public class JobManagerTest {
 		UUID leaderSessionId = null;
 		ActorGateway jobManager = new AkkaActorGateway(
 				JobManager.startJobManagerActors(
-						config,
-						system,
-						TestingJobManager.class,
-						MemoryArchivist.class)._1(),
+					config,
+					system,
+					system.dispatcher(),
+					TestingJobManager.class,
+					MemoryArchivist.class)._1(),
 				leaderSessionId);
 
 		LeaderRetrievalService leaderRetrievalService = new StandaloneLeaderRetrievalService(
@@ -603,12 +604,13 @@ public class JobManagerTest {
 			actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
 
 			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
-					config,
-					actorSystem,
-					Option.apply("jm"),
-					Option.apply("arch"),
-					TestingJobManager.class,
-					TestingMemoryArchivist.class);
+				config,
+				actorSystem,
+				actorSystem.dispatcher(),
+				Option.apply("jm"),
+				Option.apply("arch"),
+				TestingJobManager.class,
+				TestingMemoryArchivist.class);
 
 			jobManager = new AkkaActorGateway(master._1(), null);
 			archiver = new AkkaActorGateway(master._2(), null);
@@ -729,12 +731,13 @@ public class JobManagerTest {
 			actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
 
 			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
-					config,
-					actorSystem,
-					Option.apply("jm"),
-					Option.apply("arch"),
-					TestingJobManager.class,
-					TestingMemoryArchivist.class);
+				config,
+				actorSystem,
+				actorSystem.dispatcher(),
+				Option.apply("jm"),
+				Option.apply("arch"),
+				TestingJobManager.class,
+				TestingMemoryArchivist.class);
 
 			jobManager = new AkkaActorGateway(master._1(), null);
 			archiver = new AkkaActorGateway(master._2(), null);
@@ -825,12 +828,13 @@ public class JobManagerTest {
 			actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
 
 			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
-					new Configuration(),
-					actorSystem,
-					Option.apply("jm"),
-					Option.apply("arch"),
-					TestingJobManager.class,
-					TestingMemoryArchivist.class);
+				new Configuration(),
+				actorSystem,
+				actorSystem.dispatcher(),
+				Option.apply("jm"),
+				Option.apply("arch"),
+				TestingJobManager.class,
+				TestingMemoryArchivist.class);
 
 			jobManager = new AkkaActorGateway(master._1(), null);
 			archiver = new AkkaActorGateway(master._2(), null);

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 9aeea3d..4f26e68 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -80,10 +80,11 @@ public class JobSubmitTest {
 
 		// only start JobManager (no ResourceManager)
 		JobManager.startJobManagerActors(
-				jmConfig,
-				jobManagerSystem,
-				JobManager.class,
-				MemoryArchivist.class)._1();
+			jmConfig,
+			jobManagerSystem,
+			jobManagerSystem.dispatcher(),
+			JobManager.class,
+			MemoryArchivist.class)._1();
 
 		try {
 			LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(jmConfig);

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
index e0763f3..3c8ea75 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
@@ -72,7 +72,7 @@ public class ClusterShutdownITCase extends TestLogger {
 
 			// start job manager which doesn't shutdown the actor system
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, config, "jobmanager1");
+				TestingUtils.createJobManager(system, system.dispatcher(), config, "jobmanager1");
 
 			// Tell the JobManager to inform us of shutdown actions
 			jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
@@ -114,7 +114,7 @@ public class ClusterShutdownITCase extends TestLogger {
 
 			// start job manager which doesn't shutdown the actor system
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, config, "jobmanager2");
+				TestingUtils.createJobManager(system, system.dispatcher(), config, "jobmanager2");
 
 			// Tell the JobManager to inform us of shutdown actions
 			jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
index c83ce58..5a98c8d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
@@ -75,7 +75,7 @@ public class ResourceManagerITCase extends TestLogger {
 		protected void run() {
 
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, config, "ReconciliationTest");
+				TestingUtils.createJobManager(system, system.dispatcher(), config, "ReconciliationTest");
 			ActorGateway me =
 				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
 
@@ -129,7 +129,7 @@ public class ResourceManagerITCase extends TestLogger {
 		protected void run() {
 
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, config, "RegTest");
+				TestingUtils.createJobManager(system, system.dispatcher(), config, "RegTest");
 			ActorGateway me =
 				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index f9434e2..83eaddb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -85,6 +85,7 @@ public class TaskManagerComponentsStartupShutdownTest {
 			final ActorRef jobManager = JobManager.startJobManagerActors(
 				config,
 				actorSystem,
+				actorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index c7913f7..63c1b29 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -102,6 +102,7 @@ public abstract class TaskManagerProcessReapingTestBase {
 			ActorRef jmActor = JobManager.startJobManagerActors(
 				new Configuration(),
 				jmActorSystem,
+				jmActorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 53fa7c1..b21eba0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -106,7 +106,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
 			try {
 				// a simple JobManager
-				jobManager = createJobManager(actorSystem, config);
+				jobManager = createJobManager(actorSystem, actorSystem.dispatcher(), config);
 				startResourceManager(config, jobManager.actor());
 
 				// start two TaskManagers. it will automatically try to register
@@ -187,8 +187,9 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
 				// now start the JobManager, with the regular akka URL
 				jobManager = createJobManager(
-						actorSystem,
-						new Configuration());
+					actorSystem,
+					actorSystem.dispatcher(),
+					new Configuration());
 
 				startResourceManager(config, jobManager.actor());
 
@@ -629,6 +630,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 		return JobManager.startJobManagerActors(
 			configuration,
 			actorSystem,
+			actorSystem.dispatcher(),
 			NONE_STRING,
 			NONE_STRING,
 			JobManager.class,

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index f9c9b63..4485b65 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -172,6 +172,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
     val (jm: ActorRef, _) = JobManager.startJobManagerActors(
       new Configuration(),
       _system,
+      _system.dispatcher,
       None,
       None,
       classOf[JobManager],

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index e9bdb99..c6fd923 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import java.util.concurrent.ExecutorService
+import java.util.concurrent.{Executor, ExecutorService}
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
@@ -39,7 +39,7 @@ import scala.language.postfixOps
   */
 class TestingJobManager(
     flinkConfiguration: Configuration,
-    executorService: ExecutorService,
+    executor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -53,7 +53,7 @@ class TestingJobManager(
     metricRegistry : Option[MetricRegistry])
   extends JobManager(
     flinkConfiguration,
-    executorService,
+    executor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 73fb928..b57a9dc 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -303,15 +303,18 @@ object TestingUtils {
   /** Creates a testing JobManager using the default recovery mode (standalone)
     *
     * @param actorSystem The ActorSystem to use
+    * @param executor to run the JobManager's futures
     * @param configuration The Flink configuration
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
+      executor: Executor,
       configuration: Configuration)
     : ActorGateway = {
     createJobManager(
       actorSystem,
+      executor,
       configuration,
       classOf[TestingJobManager],
       ""
@@ -322,85 +325,43 @@ object TestingUtils {
     * Additional prefix can be supplied for the Actor system names
     *
     * @param actorSystem The ActorSystem to use
+    * @param executor to run the JobManager's futures
     * @param configuration The Flink configuration
     * @param prefix The prefix for the actor names
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
+      executor: Executor,
       configuration: Configuration,
       prefix: String)
     : ActorGateway = {
     createJobManager(
       actorSystem,
+      executor,
       configuration,
       classOf[TestingJobManager],
       prefix
     )
   }
 
-  def createJobManager(
-      actorSystem: ActorSystem,
-      configuration: Configuration,
-      executionContext: ExecutionContext)
-    : ActorGateway = {
-
-    val (_,
-    instanceManager,
-    scheduler,
-    libraryCacheManager,
-    restartStrategy,
-    timeout,
-    archiveCount,
-    leaderElectionService,
-    submittedJobGraphs,
-    checkpointRecoveryFactory,
-    jobRecoveryTimeout,
-    metricsRegistry) = JobManager.createJobManagerComponents(
-      configuration,
-      None
-    )
-
-    val archiveProps = Props(classOf[TestingMemoryArchivist], archiveCount)
-
-    val archive: ActorRef = actorSystem.actorOf(archiveProps, JobManager.ARCHIVE_NAME)
-
-    val jobManagerProps = Props(
-      classOf[TestingJobManager],
-      configuration,
-      executionContext,
-      instanceManager,
-      scheduler,
-      libraryCacheManager,
-      archive,
-      restartStrategy,
-      timeout,
-      leaderElectionService,
-      submittedJobGraphs,
-      checkpointRecoveryFactory,
-      jobRecoveryTimeout,
-      metricsRegistry)
-
-    val jobManager: ActorRef = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
-
-    new AkkaActorGateway(jobManager, null)
-  }
-
   /**
     * Creates a JobManager of the given class using the default recovery mode (standalone)
     *
     * @param actorSystem ActorSystem to use
+    * @param executor to run the JobManager's futures
     * @param configuration Configuration to use
     * @param jobManagerClass JobManager class to instantiate
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
+      executor: Executor,
       configuration: Configuration,
       jobManagerClass: Class[_ <: JobManager])
     : ActorGateway = {
 
-    createJobManager(actorSystem, configuration, jobManagerClass, "")
+    createJobManager(actorSystem, executor, configuration, jobManagerClass, "")
   }
 
   /**
@@ -408,6 +369,7 @@ object TestingUtils {
     * Additional prefix for the Actor names can be added.
     *
     * @param actorSystem ActorSystem to use
+    * @param executor to run the JobManager's futures
     * @param configuration Configuration to use
     * @param jobManagerClass JobManager class to instantiate
     * @param prefix The prefix to use for the Actor names
@@ -415,6 +377,7 @@ object TestingUtils {
     */
   def createJobManager(
       actorSystem: ActorSystem,
+      executor: Executor,
       configuration: Configuration,
       jobManagerClass: Class[_ <: JobManager],
       prefix: String)
@@ -427,6 +390,7 @@ object TestingUtils {
       val (actor, _) = JobManager.startJobManagerActors(
         configuration,
         actorSystem,
+        executor,
         Some(prefix + JobManager.JOB_MANAGER_NAME),
         Some(prefix + JobManager.ARCHIVE_NAME),
         jobManagerClass,

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index b6eb7ba..af86983 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -129,6 +129,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 			ActorRef jmActor = JobManager.startJobManagerActors(
 				jmConfig,
 				jmActorSystem,
+				jmActorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index b66fb5d..f72ef34 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -104,6 +104,7 @@ public class ProcessFailureCancelingITCase {
 			ActorRef jmActor = JobManager.startJobManagerActors(
 				jmConfig,
 				jmActorSystem,
+				jmActorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index aef2604..aabc19d 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -18,12 +18,11 @@
 
 package org.apache.flink.yarn
 
-import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executor
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.instance.InstanceManager
@@ -41,7 +40,7 @@ import scala.concurrent.duration.FiniteDuration
   * instead of an anonymous class with the respective mixin to obtain a more readable logger name.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executorService Execution context which is used to execute concurrent tasks in the
+  * @param executor Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
@@ -54,7 +53,7 @@ import scala.concurrent.duration.FiniteDuration
   */
 class TestingYarnJobManager(
     flinkConfiguration: Configuration,
-    executorService: ExecutorService,
+    executor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -68,7 +67,7 @@ class TestingYarnJobManager(
     metricRegistry : Option[MetricRegistry])
   extends YarnJobManager(
     flinkConfiguration,
-    executorService,
+    executor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 8e3418c..002e162 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -37,8 +37,10 @@ import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.NamedThreadFactory;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 
@@ -67,13 +69,15 @@ import java.util.Map;
 import java.util.HashMap;
 import java.util.UUID;
 import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
 
 /**
  * This class is the executable entry point for the YARN application master.
- * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * It starts the actor system and the actors for {@link JobManager}
  * and {@link YarnFlinkResourceManager}.
  * 
  * The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container
@@ -209,6 +213,12 @@ public class YarnApplicationMasterRunner {
 		ActorSystem actorSystem = null;
 		WebMonitor webMonitor = null;
 
+		int numberProcessors = Hardware.getNumberCPUCores();
+
+		final ExecutorService executor = Executors.newFixedThreadPool(
+			numberProcessors,
+			new NamedThreadFactory("yarn-jobmanager-future-", "-thread-"));
+
 		try {
 			// ------- (1) load and parse / validate all configurations -------
 
@@ -321,7 +331,9 @@ public class YarnApplicationMasterRunner {
 
 			// we start the JobManager with its standard name
 			ActorRef jobManager = JobManager.startJobManagerActors(
-				config, actorSystem,
+				config,
+				actorSystem,
+				executor,
 				new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
 				scala.Option.<String>empty(),
 				getJobManagerClass(),
@@ -414,6 +426,9 @@ public class YarnApplicationMasterRunner {
 				LOG.error("Failed to stop the web frontend", t);
 			}
 		}
+
+		executor.shutdownNow();
+
 		return 0;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 2df78c2..a81e6cf 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.yarn
 
-import java.util.concurrent.{ExecutorService, TimeUnit}
+import java.util.concurrent.{Executor, TimeUnit}
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.{ConfigConstants, Configuration => FlinkConfiguration}
@@ -40,7 +40,7 @@ import scala.language.postfixOps
   * to start/administer/stop the Yarn session.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executorService Execution context which is used to execute concurrent tasks in the
+  * @param executor Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
@@ -53,7 +53,7 @@ import scala.language.postfixOps
   */
 class YarnJobManager(
     flinkConfiguration: FlinkConfiguration,
-    executorService: ExecutorService,
+    executor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -67,7 +67,7 @@ class YarnJobManager(
     metricsRegistry: Option[MetricRegistry])
   extends ContaineredJobManager(
     flinkConfiguration,
-    executorService,
+    executor,
     instanceManager,
     scheduler,
     libraryCacheManager,


[4/4] flink git commit: [FLINK-5085] Execute CheckpointCoordinator's state discard calls asynchronously

Posted by tr...@apache.org.
[FLINK-5085] Execute CheckpointCoordinator's state discard calls asynchronously

The CheckpointCoordinator is now given an Executor which is used to execute the state discard
calls asynchronously. This prevents blocking operations from being executed in the
calling thread.

Shut down ExecutorServices gracefully

This closes #2825.
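
As a hedged illustration of the pattern described above, here is a minimal sketch in plain Java.
The class and method names (AsyncStateDiscarder, discardAsync) are made up for this example; only
the idea that StateObject#discardState() may do blocking work (e.g. deleting files) mirrors the
Flink interfaces touched by this commit. The point is that the potentially blocking discard runs
on a dedicated Executor, never on the calling thread.

import java.util.concurrent.Executor;

class AsyncStateDiscarder {

	/** Mirrors the shape of org.apache.flink.runtime.state.StateObject#discardState(). */
	interface StateObject {
		void discardState() throws Exception;
	}

	private final Executor ioExecutor;

	AsyncStateDiscarder(Executor ioExecutor) {
		this.ioExecutor = ioExecutor;
	}

	/** Hands the (possibly blocking) discard to the I/O executor instead of running it in the caller. */
	void discardAsync(final StateObject state) {
		ioExecutor.execute(new Runnable() {
			@Override
			public void run() {
				try {
					state.discardState();
				} catch (Exception e) {
					// Swallow clean-up failures here; the real code logs them.
					// They must not affect the calling thread.
				}
			}
		});
	}
}

In the diff below, CheckpointCoordinator#discardState and PendingCheckpoint#dispose follow this
shape, using the ioExecutor that the ExecutionGraph now threads through to them.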


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c590912c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c590912c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c590912c

Branch: refs/heads/master
Commit: c590912c93a4059b40452dfa6cffbdd4d58cac13
Parents: 3fb92d8
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Nov 17 15:39:11 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 22 23:00:17 2016 +0100

----------------------------------------------------------------------
 .../MesosApplicationMasterRunner.java           |  69 +-
 .../clusterframework/MesosJobManager.scala      |  34 +-
 .../webmonitor/BackPressureStatsTracker.java    |   2 +-
 .../BackPressureStatsTrackerTest.java           |   2 +-
 .../checkpoint/CheckpointCoordinator.java       | 104 ++-
 .../runtime/checkpoint/PendingCheckpoint.java   |  31 +-
 .../flink/runtime/concurrent/Executors.java     |  49 ++
 .../runtime/executiongraph/ExecutionGraph.java  |  51 +-
 .../executiongraph/ExecutionGraphBuilder.java   |  28 +-
 .../runtime/executiongraph/ExecutionVertex.java |   4 +-
 .../restart/FailureRateRestartStrategy.java     |   2 +-
 .../restart/FixedDelayRestartStrategy.java      |   2 +-
 .../ContaineredJobManager.scala                 |   9 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  30 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |  11 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |  38 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 679 ++++++++++---------
 .../checkpoint/CheckpointStateRestoreTest.java  |  83 +--
 ...ExecutionGraphCheckpointCoordinatorTest.java |   1 +
 .../checkpoint/PendingCheckpointTest.java       |  11 +-
 .../executiongraph/AllVerticesIteratorTest.java |   2 +-
 .../ArchivedExecutionGraphTest.java             |   1 +
 .../ExecutionGraphConstructionTest.java         |  24 +-
 .../ExecutionGraphDeploymentTest.java           |   7 +-
 .../ExecutionGraphMetricsTest.java              |   1 +
 .../ExecutionGraphRestartTest.java              |  17 +-
 .../ExecutionGraphSignalsTest.java              |   1 +
 .../executiongraph/ExecutionGraphTestUtils.java |   1 +
 .../ExecutionStateProgressTest.java             |   3 +-
 .../executiongraph/PointwisePatternTest.java    |  21 +-
 .../TerminalStateDeadlockTest.java              |   1 +
 .../executiongraph/VertexSlotSharingTest.java   |  15 +-
 .../restart/FixedDelayRestartStrategyTest.java  |   2 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |  38 +-
 .../JobManagerLeaderElectionTest.java           |  29 +-
 .../TaskManagerLossFailsTasksTest.scala         |   1 +
 .../runtime/testingUtils/TestingCluster.scala   |   8 +-
 .../testingUtils/TestingJobManager.scala        |   6 +-
 .../partitioner/RescalePartitionerTest.java     |   1 +
 .../flink/yarn/TestingYarnJobManager.scala      |   9 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |  17 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  |   9 +-
 42 files changed, 834 insertions(+), 620 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 166218f..3695578 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -65,6 +65,7 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.File;
 import java.net.InetAddress;
 import java.net.URL;
+import java.net.UnknownHostException;
 import java.security.PrivilegedAction;
 import java.util.Map;
 import java.util.UUID;
@@ -172,43 +173,51 @@ public class MesosApplicationMasterRunner {
 		WebMonitor webMonitor = null;
 		MesosArtifactServer artifactServer = null;
 
-		int numberProcessors = Hardware.getNumberCPUCores();
+		// ------- (1) load and parse / validate all configurations -------
 
-		final ExecutorService futureExecutor = Executors.newFixedThreadPool(
-			numberProcessors,
-			new NamedThreadFactory("mesos-jobmanager-future-", "-thread-"));
+		// loading all config values here has the advantage that the program fails fast, if any
+		// configuration problem occurs
 
-		final ExecutorService ioExecutor = Executors.newFixedThreadPool(
-			numberProcessors,
-			new NamedThreadFactory("mesos-jobmanager-io-", "-thread-"));
+		final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
+		checkState(workingDir != null, "Sandbox directory variable (%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX);
+
+		final String sessionID = ENV.get(MesosConfigKeys.ENV_SESSION_ID);
+		checkState(sessionID != null, "Session ID (%s) not set", MesosConfigKeys.ENV_SESSION_ID);
+
+		// Note that we use the "appMasterHostname" given by the system, to make sure
+		// we use the hostnames consistently throughout akka.
+		// for akka "localhost" and "localhost.localdomain" are different actors.
+		final String appMasterHostname;
 
 		try {
-			// ------- (1) load and parse / validate all configurations -------
+			appMasterHostname = InetAddress.getLocalHost().getHostName();
+		} catch (UnknownHostException uhe) {
+			LOG.error("Could not retrieve the local hostname.", uhe);
 
-			// loading all config values here has the advantage that the program fails fast, if any
-			// configuration problem occurs
+			return INIT_ERROR_EXIT_CODE;
+		}
 
-			final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
-			checkState(workingDir != null, "Sandbox directory variable (%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX);
+		// Flink configuration
+		final Configuration dynamicProperties =
+			FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
+		LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
 
-			final String sessionID = ENV.get(MesosConfigKeys.ENV_SESSION_ID);
-			checkState(sessionID != null, "Session ID (%s) not set", MesosConfigKeys.ENV_SESSION_ID);
+		final Configuration config = createConfiguration(workingDir, dynamicProperties);
 
-			// Note that we use the "appMasterHostname" given by the system, to make sure
-			// we use the hostnames consistently throughout akka.
-			// for akka "localhost" and "localhost.localdomain" are different actors.
-			final String appMasterHostname = InetAddress.getLocalHost().getHostName();
+		// Mesos configuration
+		final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname);
 
-			// Flink configuration
-			final Configuration dynamicProperties =
-				FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
-			LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
+		int numberProcessors = Hardware.getNumberCPUCores();
 
-			final Configuration config = createConfiguration(workingDir, dynamicProperties);
+		final ExecutorService futureExecutor = Executors.newFixedThreadPool(
+			numberProcessors,
+			new NamedThreadFactory("mesos-jobmanager-future-", "-thread-"));
 
-			// Mesos configuration
-			final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname);
+		final ExecutorService ioExecutor = Executors.newFixedThreadPool(
+			numberProcessors,
+			new NamedThreadFactory("mesos-jobmanager-io-", "-thread-"));
 
+		try {
 			// environment values related to TM
 			final int taskManagerContainerMemory;
 			final int numInitialTaskManagers;
@@ -380,6 +389,9 @@ public class MesosApplicationMasterRunner {
 				}
 			}
 
+			futureExecutor.shutdownNow();
+			ioExecutor.shutdownNow();
+
 			return INIT_ERROR_EXIT_CODE;
 		}
 
@@ -404,8 +416,11 @@ public class MesosApplicationMasterRunner {
 			LOG.error("Failed to stop the artifact server", t);
 		}
 
-		futureExecutor.shutdownNow();
-		ioExecutor.shutdownNow();
+		org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(
+			AkkaUtils.getTimeout(config).toMillis(),
+			TimeUnit.MILLISECONDS,
+			futureExecutor,
+			ioExecutor);
 
 		return 0;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
index 300539c..38886f8 100644
--- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
+++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
@@ -37,8 +37,9 @@ import scala.concurrent.duration._
 /** JobManager actor for execution on Mesos.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executor Execution context which is used to execute concurrent tasks in the
+  * @param futureExecutor Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+  * @param ioExecutor Executor used for blocking I/O operations
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
   * @param scheduler Scheduler to schedule Flink jobs
@@ -48,22 +49,25 @@ import scala.concurrent.duration._
   * @param timeout Timeout for futures
   * @param leaderElectionService LeaderElectionService to participate in the leader election
   */
-class MesosJobManager(flinkConfiguration: FlinkConfiguration,
-                      executor: Executor,
-                      instanceManager: InstanceManager,
-                      scheduler: FlinkScheduler,
-                      libraryCacheManager: BlobLibraryCacheManager,
-                      archive: ActorRef,
-                      restartStrategyFactory: RestartStrategyFactory,
-                      timeout: FiniteDuration,
-                      leaderElectionService: LeaderElectionService,
-                      submittedJobGraphs : SubmittedJobGraphStore,
-                      checkpointRecoveryFactory : CheckpointRecoveryFactory,
-                      jobRecoveryTimeout: FiniteDuration,
-                      metricsRegistry: Option[FlinkMetricRegistry])
+class MesosJobManager(
+    flinkConfiguration: FlinkConfiguration,
+    futureExecutor: Executor,
+    ioExecutor: Executor,
+    instanceManager: InstanceManager,
+    scheduler: FlinkScheduler,
+    libraryCacheManager: BlobLibraryCacheManager,
+    archive: ActorRef,
+    restartStrategyFactory: RestartStrategyFactory,
+    timeout: FiniteDuration,
+    leaderElectionService: LeaderElectionService,
+    submittedJobGraphs : SubmittedJobGraphStore,
+    checkpointRecoveryFactory : CheckpointRecoveryFactory,
+    jobRecoveryTimeout: FiniteDuration,
+    metricsRegistry: Option[FlinkMetricRegistry])
   extends ContaineredJobManager(
     flinkConfiguration,
-    executor,
+    futureExecutor,
+    ioExecutor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
index 3702eb4..97de89b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
@@ -165,7 +165,7 @@ public class BackPressureStatsTracker {
 			if (!pendingStats.contains(vertex) &&
 					!vertex.getGraph().getState().isGloballyTerminalState()) {
 
-				Executor executor = vertex.getGraph().getExecutor();
+				Executor executor = vertex.getGraph().getFutureExecutor();
 
 				// Only trigger if still active job
 				if (executor != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
index 7ac2a69..c7e303d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
@@ -66,7 +66,7 @@ public class BackPressureStatsTrackerTest {
 		when(graph.getState()).thenReturn(JobStatus.RUNNING);
 
 		// Same Thread execution context
-		when(graph.getExecutor()).thenReturn(new Executor() {
+		when(graph.getFutureExecutor()).thenReturn(new Executor() {
 
 			@Override
 			public void execute(Runnable runnable) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 886409d..638e0a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,6 +48,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -150,6 +152,8 @@ public class CheckpointCoordinator {
 	/** Default checkpoint properties **/
 	private final CheckpointProperties checkpointProperties;
 
+	private final Executor executor;
+
 	// --------------------------------------------------------------------------------------------
 
 	public CheckpointCoordinator(
@@ -165,7 +169,8 @@ public class CheckpointCoordinator {
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore completedCheckpointStore,
 			String checkpointDirectory,
-			CheckpointStatsTracker statsTracker) {
+			CheckpointStatsTracker statsTracker,
+			Executor executor) {
 
 		// sanity checks
 		checkArgument(baseInterval > 0, "Checkpoint timeout must be larger than zero");
@@ -216,6 +221,8 @@ public class CheckpointCoordinator {
 		} catch (Throwable t) {
 			throw new RuntimeException("Failed to start checkpoint ID counter: " + t.getMessage(), t);
 		}
+
+		this.executor = checkNotNull(executor);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -296,7 +303,7 @@ public class CheckpointCoordinator {
 	 * the checkpoint will be declined.
 	 * @return <code>true</code> if triggering the checkpoint succeeded.
 	 */
-	public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) throws Exception {
+	public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
 		return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, isPeriodic).isSuccess();
 	}
 
@@ -305,7 +312,7 @@ public class CheckpointCoordinator {
 			long timestamp,
 			CheckpointProperties props,
 			String targetDirectory,
-			boolean isPeriodic) throws Exception {
+			boolean isPeriodic) {
 
 		// Sanity check
 		if (props.externalizeCheckpoint() && targetDirectory == null) {
@@ -415,36 +422,32 @@ public class CheckpointCoordinator {
 			}
 
 			final PendingCheckpoint checkpoint = new PendingCheckpoint(
-					job,
-					checkpointID,
-					timestamp,
-					ackTasks,
-					isPeriodic,
-					props,
-					targetDirectory);
+				job,
+				checkpointID,
+				timestamp,
+				ackTasks,
+				isPeriodic,
+				props,
+				targetDirectory,
+				executor);
 
 			// schedule the timer that will clean up the expired checkpoints
 			TimerTask canceller = new TimerTask() {
 				@Override
 				public void run() {
-					try {
-						synchronized (lock) {
-							// only do the work if the checkpoint is not discarded anyways
-							// note that checkpoint completion discards the pending checkpoint object
-							if (!checkpoint.isDiscarded()) {
-								LOG.info("Checkpoint " + checkpointID + " expired before completing.");
-
-								checkpoint.abortExpired();
-								pendingCheckpoints.remove(checkpointID);
-								rememberRecentCheckpointId(checkpointID);
-
-								triggerQueuedRequests();
-							}
+					synchronized (lock) {
+						// only do the work if the checkpoint is not discarded anyways
+						// note that checkpoint completion discards the pending checkpoint object
+						if (!checkpoint.isDiscarded()) {
+							LOG.info("Checkpoint " + checkpointID + " expired before completing.");
+
+							checkpoint.abortExpired();
+							pendingCheckpoints.remove(checkpointID);
+							rememberRecentCheckpointId(checkpointID);
+
+							triggerQueuedRequests();
 						}
 					}
-					catch (Throwable t) {
-						LOG.error("Exception while handling checkpoint timeout", t);
-					}
 				}
 			};
 
@@ -531,7 +534,7 @@ public class CheckpointCoordinator {
 	 *
 	 * @param message Checkpoint decline from the task manager
 	 */
-	public void receiveDeclineMessage(DeclineCheckpoint message) throws Exception {
+	public void receiveDeclineMessage(DeclineCheckpoint message) {
 		if (shutdown || message == null) {
 			return;
 		}
@@ -675,12 +678,8 @@ public class CheckpointCoordinator {
 								"the state handle to avoid lingering state.", message.getCheckpointId(),
 							message.getTaskExecutionId(), message.getJob());
 
-						try {
-							message.getSubtaskState().discardState();
-						} catch (Exception e) {
-							LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.",
-								message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
-						}
+						discardState(message.getSubtaskState());
+
 						break;
 					case DISCARDED:
 						LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
@@ -688,12 +687,7 @@ public class CheckpointCoordinator {
 								"state handle tp avoid lingering state.",
 							message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
 
-						try {
-							message.getSubtaskState().discardState();
-						} catch (Exception e) {
-							LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.",
-								message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
-						}
+						discardState(message.getSubtaskState());
 				}
 			}
 			else if (checkpoint != null) {
@@ -712,13 +706,8 @@ public class CheckpointCoordinator {
 					isPendingCheckpoint = false;
 				}
 
-				try {
-					// try to discard the state so that we don't have lingering state lying around
-					message.getSubtaskState().discardState();
-				} catch (Exception e) {
-					LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.",
-						message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
-				}
+				// try to discard the state so that we don't have lingering state lying around
+				discardState(message.getSubtaskState());
 			}
 		}
 
@@ -747,7 +736,7 @@ public class CheckpointCoordinator {
 		recentPendingCheckpoints.addLast(id);
 	}
 
-	private void dropSubsumedCheckpoints(long checkpointId) throws Exception {
+	private void dropSubsumedCheckpoints(long checkpointId) {
 		Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
 
 		while (entries.hasNext()) {
@@ -766,7 +755,7 @@ public class CheckpointCoordinator {
 	 *
 	 * <p>NOTE: The caller of this method must hold the lock when invoking the method!
 	 */
-	private void triggerQueuedRequests() throws Exception {
+	private void triggerQueuedRequests() {
 		if (triggerRequestQueued) {
 			triggerRequestQueued = false;
 
@@ -915,11 +904,7 @@ public class CheckpointCoordinator {
 			}
 
 			for (PendingCheckpoint p : pendingCheckpoints.values()) {
-				try {
-					p.abortError(new Exception("Checkpoint Coordinator is suspending."));
-				} catch (Throwable t) {
-					LOG.error("Error while disposing pending checkpoint", t);
-				}
+				p.abortError(new Exception("Checkpoint Coordinator is suspending."));
 			}
 
 			pendingCheckpoints.clear();
@@ -959,4 +944,17 @@ public class CheckpointCoordinator {
 			}
 		}
 	}
+
+	private void discardState(final StateObject stateObject) {
+		executor.execute(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					stateObject.discardState();
+				} catch (Exception e) {
+					LOG.warn("Could not properly discard state object.", e);
+				}
+			}
+		});
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 5034502..cfb59f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,6 +39,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -84,6 +86,8 @@ public class PendingCheckpoint {
 	/** The promise to fulfill once the checkpoint has been completed. */
 	private final FlinkCompletableFuture<CompletedCheckpoint> onCompletionPromise = new FlinkCompletableFuture<>();
 
+	private final Executor executor;
+
 	private int numAcknowledgedTasks;
 
 	private boolean discarded;
@@ -97,7 +101,8 @@ public class PendingCheckpoint {
 			Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm,
 			boolean isPeriodic,
 			CheckpointProperties props,
-			String targetDirectory) {
+			String targetDirectory,
+			Executor executor) {
 		this.jobId = checkNotNull(jobId);
 		this.checkpointId = checkpointId;
 		this.checkpointTimestamp = checkpointTimestamp;
@@ -106,6 +111,7 @@ public class PendingCheckpoint {
 		this.taskStates = new HashMap<>();
 		this.props = checkNotNull(props);
 		this.targetDirectory = targetDirectory;
+		this.executor = Preconditions.checkNotNull(executor);
 
 		// Sanity check
 		if (props.externalizeCheckpoint() && targetDirectory == null) {
@@ -324,7 +330,7 @@ public class PendingCheckpoint {
 	/**
 	 * Aborts a checkpoint because it expired (took too long).
 	 */
-	public void abortExpired() throws Exception {
+	public void abortExpired() {
 		try {
 			onCompletionPromise.completeExceptionally(new Exception("Checkpoint expired before completing"));
 		} finally {
@@ -335,7 +341,7 @@ public class PendingCheckpoint {
 	/**
 	 * Aborts the pending checkpoint because a newer completed checkpoint subsumed it.
 	 */
-	public void abortSubsumed() throws Exception {
+	public void abortSubsumed() {
 		try {
 			if (props.forceCheckpoint()) {
 				onCompletionPromise.completeExceptionally(new Exception("Bug: forced checkpoints must never be subsumed"));
@@ -349,7 +355,7 @@ public class PendingCheckpoint {
 		}
 	}
 
-	public void abortDeclined() throws Exception {
+	public void abortDeclined() {
 		try {
 			onCompletionPromise.completeExceptionally(new Exception("Checkpoint was declined (tasks not ready)"));
 		} finally {
@@ -361,7 +367,7 @@ public class PendingCheckpoint {
 	 * Aborts the pending checkpoint due to an error.
 	 * @param cause The error's exception.
 	 */
-	public void abortError(Throwable cause) throws Exception {
+	public void abortError(Throwable cause) {
 		try {
 			onCompletionPromise.completeExceptionally(new Exception("Checkpoint failed: " + cause.getMessage(), cause));
 		} finally {
@@ -369,13 +375,24 @@ public class PendingCheckpoint {
 		}
 	}
 
-	private void dispose(boolean releaseState) throws Exception {
+	private void dispose(boolean releaseState) {
 		synchronized (lock) {
 			try {
 				discarded = true;
 				numAcknowledgedTasks = -1;
 				if (releaseState) {
-					StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
+					executor.execute(new Runnable() {
+						@Override
+						public void run() {
+							try {
+								StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
+							} catch (Exception e) {
+								LOG.warn("Could not properly dispose the pending checkpoint " +
+									"{} of job {}.", checkpointId, jobId, e);
+							}
+						}
+					});
+
 				}
 			} finally {
 				taskStates.clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
index 1832d70..391f233 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -18,13 +18,20 @@
 
 package org.apache.flink.runtime.concurrent;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Collection of {@link Executor} implementations
  */
 public class Executors {
 
+	private static final Logger LOG = LoggerFactory.getLogger(Executors.class);
+
 	/**
 	 * Return a direct executor. The direct executor directly executes the runnable in the calling
 	 * thread.
@@ -49,4 +56,46 @@ public class Executors {
 			command.run();
 		}
 	}
+
+	/**
+	 * Gracefully shuts down the given {@link ExecutorService}s. The call waits up to the given
+	 * timeout for all ExecutorServices to terminate. If the ExecutorServices do not terminate in time,
+	 * they will be shut down hard.
+	 *
+	 * @param timeout to wait for the termination of all ExecutorServices
+	 * @param unit of the timeout
+	 * @param executorServices to shut down
+	 */
+	public static void gracefulShutdown(long timeout, TimeUnit unit, ExecutorService... executorServices) {
+		for (ExecutorService executorService: executorServices) {
+			executorService.shutdown();
+		}
+
+		boolean wasInterrupted = false;
+		final long endTime = unit.toMillis(timeout) + System.currentTimeMillis();
+		long timeLeft = unit.toMillis(timeout);
+		boolean hasTimeLeft = timeLeft > 0L;
+
+		for (ExecutorService executorService: executorServices) {
+			if (wasInterrupted || !hasTimeLeft) {
+				executorService.shutdownNow();
+			} else {
+				try {
+					if (!executorService.awaitTermination(timeLeft, TimeUnit.MILLISECONDS)) {
+						LOG.warn("ExecutorService did not terminate in time. Shutting it down now.");
+						executorService.shutdownNow();
+					}
+				} catch (InterruptedException e) {
+					LOG.warn("Interrupted while shutting down executor services. Shutting all " +
+						"remaining ExecutorServices down now.", e);
+					executorService.shutdownNow();
+
+					wasInterrupted = true;
+				}
+
+				timeLeft = endTime - System.currentTimeMillis();
+				hasTimeLeft = timeLeft > 0L;
+			}
+		}
+	}
 }
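
For reference, a hedged usage sketch of the gracefulShutdown helper added above. The class name,
pool sizes and the 10 second deadline are illustrative; the gracefulShutdown signature is the one
from the hunk, and the example assumes flink-runtime is on the classpath.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class GracefulShutdownExample {
	public static void main(String[] args) {
		// java.util.concurrent.Executors is fully qualified to avoid clashing with
		// org.apache.flink.runtime.concurrent.Executors.
		ExecutorService futureExecutor = java.util.concurrent.Executors.newFixedThreadPool(4);
		ExecutorService ioExecutor = java.util.concurrent.Executors.newFixedThreadPool(4);

		// ... submit work to both pools ...

		// Waits up to 10s for both pools to terminate; pools that miss the deadline
		// (or an interrupted wait) are shut down hard via shutdownNow().
		org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(
			10000L, TimeUnit.MILLISECONDS, futureExecutor, ioExecutor);
	}
}

The call sites in this commit (MesosApplicationMasterRunner, JobManager.scala and
FlinkMiniCluster.scala) use the Akka ask timeout as the shutdown deadline instead of a fixed value.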

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f8e894a..cbb4c7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -58,6 +58,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -203,7 +204,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	private CheckpointStatsTracker checkpointStatsTracker;
 
 	/** The executor which is used to execute futures. */
-	private Executor executor;
+	private final Executor futureExecutor;
+
+	/** The executor which is used to execute blocking io operations */
+	private final Executor ioExecutor;
 
 	/** Registered KvState instances reported by the TaskManagers. */
 	private KvStateLocationRegistry kvStateLocationRegistry;
@@ -219,7 +223,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * This constructor is for tests only, because it does not include class loading information.
 	 */
 	ExecutionGraph(
-			Executor executor,
+			Executor futureExecutor,
+			Executor ioExecutor,
 			JobID jobId,
 			String jobName,
 			Configuration jobConfig,
@@ -227,7 +232,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			Time timeout,
 			RestartStrategy restartStrategy) throws IOException {
 		this(
-			executor,
+			futureExecutor,
+			ioExecutor,
 			jobId,
 			jobName,
 			jobConfig,
@@ -242,7 +248,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	}
 
 	public ExecutionGraph(
-			Executor executor,
+			Executor futureExecutor,
+			Executor ioExecutor,
 			JobID jobId,
 			String jobName,
 			Configuration jobConfig,
@@ -254,7 +261,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			ClassLoader userClassLoader,
 			MetricGroup metricGroup) throws IOException {
 
-		checkNotNull(executor);
+		checkNotNull(futureExecutor);
 		checkNotNull(jobId);
 		checkNotNull(jobName);
 		checkNotNull(jobConfig);
@@ -271,7 +278,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		// serialize the job information to do the serialisation work only once
 		this.serializedJobInformation = new SerializedValue<>(jobInformation);
 
-		this.executor = executor;
+		this.futureExecutor = Preconditions.checkNotNull(futureExecutor);
+		this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
 
 		this.userClassLoader = userClassLoader;
 
@@ -365,19 +373,20 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 		// create the coordinator that triggers and commits checkpoints and holds the state
 		checkpointCoordinator = new CheckpointCoordinator(
-				jobInformation.getJobId(),
-				interval,
-				checkpointTimeout,
-				minPauseBetweenCheckpoints,
-				maxConcurrentCheckpoints,
-				externalizeSettings,
-				tasksToTrigger,
-				tasksToWaitFor,
-				tasksToCommitTo,
-				checkpointIDCounter,
-				checkpointStore,
-				checkpointDir,
-				checkpointStatsTracker);
+			jobInformation.getJobId(),
+			interval,
+			checkpointTimeout,
+			minPauseBetweenCheckpoints,
+			maxConcurrentCheckpoints,
+			externalizeSettings,
+			tasksToTrigger,
+			tasksToWaitFor,
+			tasksToCommitTo,
+			checkpointIDCounter,
+			checkpointStore,
+			checkpointDir,
+			checkpointStatsTracker,
+			ioExecutor);
 
 		// interval of max long value indicates disable periodic checkpoint,
 		// the CheckpointActivatorDeactivator should be created only if the interval is not max value
@@ -589,8 +598,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 *
 	 * @return ExecutionContext associated with this ExecutionGraph
 	 */
-	public Executor getExecutor() {
-		return executor;
+	public Executor getFutureExecutor() {
+		return futureExecutor;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 3be1d56..a1d7385 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -53,7 +53,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * Utility class to encapsulate the logic of building an {@link ExecutionGraph} from a {@link JobGraph}.
  */
 public class ExecutionGraphBuilder {
-		/**
+	/**
 	 * Builds the ExecutionGraph from the JobGraph.
 	 * If a prior execution graph exists, the JobGraph will be attached. If no prior execution
 	 * graph exists, then the JobGraph will be attached to a new empty execution graph.
@@ -62,7 +62,8 @@ public class ExecutionGraphBuilder {
 			@Nullable ExecutionGraph prior,
 			JobGraph jobGraph,
 			Configuration jobManagerConfig,
-			Executor executor,
+			Executor futureExecutor,
+			Executor ioExecutor,
 			ClassLoader classLoader,
 			CheckpointRecoveryFactory recoveryFactory,
 			Time timeout,
@@ -83,17 +84,18 @@ public class ExecutionGraphBuilder {
 		try {
 			executionGraph = (prior != null) ? prior :
 					new ExecutionGraph(
-							executor,
-							jobId,
-							jobName,
-							jobGraph.getJobConfiguration(),
-							jobGraph.getSerializedExecutionConfig(),
-							timeout,
-							restartStrategy,
-							jobGraph.getUserJarBlobKeys(),
-							jobGraph.getClasspaths(),
-							classLoader,
-							metrics);
+						futureExecutor,
+						ioExecutor,
+						jobId,
+						jobName,
+						jobGraph.getJobConfiguration(),
+						jobGraph.getSerializedExecutionConfig(),
+						timeout,
+						restartStrategy,
+						jobGraph.getUserJarBlobKeys(),
+						jobGraph.getClasspaths(),
+						classLoader,
+						metrics);
 		} catch (IOException e) {
 			throw new JobException("Could not create the execution graph.", e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 01e8660..39c60d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -128,7 +128,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		this.priorExecutions = new CopyOnWriteArrayList<Execution>();
 
 		this.currentExecution = new Execution(
-			getExecutionGraph().getExecutor(),
+			getExecutionGraph().getFutureExecutor(),
 			this,
 			0,
 			createTimestamp,
@@ -435,7 +435,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			if (state == FINISHED || state == CANCELED || state == FAILED) {
 				priorExecutions.add(execution);
 				currentExecution = new Execution(
-					getExecutionGraph().getExecutor(),
+					getExecutionGraph().getFutureExecutor(),
 					this,
 					execution.getAttemptNumber()+1,
 					System.currentTimeMillis(),

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
index 3962e91..10546a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
@@ -70,7 +70,7 @@ public class FailureRateRestartStrategy implements RestartStrategy {
 			restartTimestampsDeque.remove();
 		}
 		restartTimestampsDeque.add(System.currentTimeMillis());
-		FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayInterval.toMilliseconds()), executionGraph.getExecutor());
+		FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayInterval.toMilliseconds()), executionGraph.getFutureExecutor());
 	}
 
 	private boolean isRestartTimestampsQueueFull() {

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
index 5337c6a..f51ea7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -58,7 +58,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 	@Override
 	public void restart(final ExecutionGraph executionGraph) {
 		currentRestartAttempt++;
-		FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayBetweenRestartAttempts), executionGraph.getExecutor());
+		FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayBetweenRestartAttempts), executionGraph.getFutureExecutor());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
index 0f31eba..cbe80f1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
@@ -45,8 +45,9 @@ import scala.language.postfixOps
   * to start/administer/stop the session.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executor Execution context which is used to execute concurrent tasks in the
+  * @param futureExecutor Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+  * @param ioExecutor Executor used to execute blocking I/O operations
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
   * @param scheduler Scheduler to schedule Flink jobs
@@ -58,7 +59,8 @@ import scala.language.postfixOps
   */
 abstract class ContaineredJobManager(
     flinkConfiguration: Configuration,
-    executor: Executor,
+    futureExecutor: Executor,
+    ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -72,7 +74,8 @@ abstract class ContaineredJobManager(
     metricsRegistry: Option[FlinkMetricRegistry])
   extends JobManager(
     flinkConfiguration,
-    executor,
+    futureExecutor,
+    ioExecutor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 08ed0a4..197456f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -44,7 +44,7 @@ import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.messages._
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.concurrent.{AcceptFunction, BiFunction}
+import org.apache.flink.runtime.concurrent.{AcceptFunction, BiFunction, Executors => FlinkExecutors}
 import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -118,7 +118,8 @@ import scala.language.postfixOps
  */
 class JobManager(
     protected val flinkConfiguration: Configuration,
-    protected val executor: Executor,
+    protected val futureExecutor: Executor,
+    protected val ioExecutor: Executor,
     protected val instanceManager: InstanceManager,
     protected val scheduler: FlinkScheduler,
     protected val libraryCacheManager: BlobLibraryCacheManager,
@@ -1247,7 +1248,8 @@ class JobManager(
           executionGraph,
           jobGraph,
           flinkConfiguration,
-          executor,
+          futureExecutor,
+          ioExecutor,
           userCodeLoader,
           checkpointRecoveryFactory,
           Time.of(timeout.length, timeout.unit),
@@ -1976,8 +1978,9 @@ object JobManager {
 
     val ioExecutor = Executors.newFixedThreadPool(
       numberProcessors,
-      new NamedThreadFactory("jobmanager-io-", "-thread-")
-    )
+      new NamedThreadFactory("jobmanager-io-", "-thread-"))
+
+    val timeout = AkkaUtils.getTimeout(configuration)
 
     val (jobManagerSystem, _, _, webMonitorOption, _) = try {
       startActorSystemAndJobManagerActors(
@@ -1993,7 +1996,8 @@ object JobManager {
       )
     } catch {
       case t: Throwable =>
-          futureExecutor.shutdownNow()
+        futureExecutor.shutdownNow()
+        ioExecutor.shutdownNow()
 
         throw t
     }
@@ -2011,8 +2015,11 @@ object JobManager {
         }
     }
 
-    futureExecutor.shutdownNow()
-    ioExecutor.shutdownNow()
+    FlinkExecutors.gracefulShutdown(
+      timeout.toMillis,
+      TimeUnit.MILLISECONDS,
+      futureExecutor,
+      ioExecutor)
   }
 
   /**
@@ -2620,6 +2627,7 @@ object JobManager {
       jobManagerClass,
       configuration,
       futureExecutor,
+      ioExecutor,
       instanceManager,
       scheduler,
       libraryCacheManager,
@@ -2653,7 +2661,8 @@ object JobManager {
   def getJobManagerProps(
     jobManagerClass: Class[_ <: JobManager],
     configuration: Configuration,
-    executor: Executor,
+    futureExecutor: Executor,
+    ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -2669,7 +2678,8 @@ object JobManager {
     Props(
       jobManagerClass,
       configuration,
-      executor,
+      futureExecutor,
+      ioExecutor,
       instanceManager,
       scheduler,
       libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 4367442..dc59048 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -20,16 +20,18 @@ package org.apache.flink.runtime.minicluster
 
 import java.net.InetAddress
 import java.util.UUID
-import java.util.concurrent.{Executors, ForkJoinPool}
+import java.util.concurrent.{Executors, TimeUnit}
 
 import akka.pattern.Patterns.gracefulStop
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
+import org.apache.flink.runtime.concurrent.{Executors => FlinkExecutors}
 import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
 import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
@@ -409,8 +411,11 @@ abstract class FlinkMiniCluster(
     jobManagerLeaderRetrievalService.foreach(_.stop())
     isRunning = false
 
-    futureExecutor.shutdownNow()
-    ioExecutor.shutdownNow()
+    FlinkExecutors.gracefulShutdown(
+      timeout.toMillis,
+      TimeUnit.MILLISECONDS,
+      futureExecutor,
+      ioExecutor)
   }
 
   protected def shutdown(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index b2aedf7..09deadc 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.minicluster
 
-import java.util.concurrent.ExecutorService
+import java.util.concurrent.{Executor, ExecutorService}
 
 import akka.actor.{ActorRef, ActorSystem, Props}
 import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.io.FileOutputFormat
-import org.apache.flink.configuration.{QueryableStateOptions, ConfigConstants, Configuration}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, QueryableStateOptions}
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
@@ -141,6 +141,7 @@ class LocalFlinkMiniCluster(
         jobManagerClass,
         config,
         futureExecutor,
+        ioExecutor,
         instanceManager,
         scheduler,
         libraryCacheManager,
@@ -242,25 +243,28 @@ class LocalFlinkMiniCluster(
   }
 
   def getJobManagerProps(
-    jobManagerClass: Class[_ <: JobManager],
-    configuration: Configuration,
-    executorService: ExecutorService,
-    instanceManager: InstanceManager,
-    scheduler: Scheduler,
-    libraryCacheManager: BlobLibraryCacheManager,
-    archive: ActorRef,
-    restartStrategyFactory: RestartStrategyFactory,
-    timeout: FiniteDuration,
-    leaderElectionService: LeaderElectionService,
-    submittedJobGraphStore: SubmittedJobGraphStore,
-    checkpointRecoveryFactory: CheckpointRecoveryFactory,
-    jobRecoveryTimeout: FiniteDuration,
-    metricsRegistry: Option[MetricRegistry]): Props = {
+      jobManagerClass: Class[_ <: JobManager],
+      configuration: Configuration,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
+      instanceManager: InstanceManager,
+      scheduler: Scheduler,
+      libraryCacheManager: BlobLibraryCacheManager,
+      archive: ActorRef,
+      restartStrategyFactory: RestartStrategyFactory,
+      timeout: FiniteDuration,
+      leaderElectionService: LeaderElectionService,
+      submittedJobGraphStore: SubmittedJobGraphStore,
+      checkpointRecoveryFactory: CheckpointRecoveryFactory,
+      jobRecoveryTimeout: FiniteDuration,
+      metricsRegistry: Option[MetricRegistry])
+    : Props = {
 
     JobManager.getJobManagerProps(
       jobManagerClass,
       configuration,
-      executorService,
+      futureExecutor,
+      ioExecutor,
       instanceManager,
       scheduler,
       libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index a59ffa2..8e46f4c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -116,19 +117,20 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid,
-					600000,
-					600000,
-					0,
-					Integer.MAX_VALUE,
-					ExternalizedCheckpointSettings.none(),
-					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
-					new ExecutionVertex[] { ackVertex1, ackVertex2 },
-					new ExecutionVertex[] {},
-					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(1),
-					null,
-					new DisabledCheckpointStatsTracker());
+				jid,
+				600000,
+				600000,
+				0,
+				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
+				new ExecutionVertex[] { ackVertex1, ackVertex2 },
+				new ExecutionVertex[] {},
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1),
+				null,
+				new DisabledCheckpointStatsTracker(),
+				Executors.directExecutor());
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -169,19 +171,20 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid,
-					600000,
-					600000,
-					0,
-					Integer.MAX_VALUE,
-					ExternalizedCheckpointSettings.none(),
-					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
-					new ExecutionVertex[] { ackVertex1, ackVertex2 },
-					new ExecutionVertex[] {},
-					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(1),
-					null,
-					new DisabledCheckpointStatsTracker());
+				jid,
+				600000,
+				600000,
+				0,
+				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
+				new ExecutionVertex[] { ackVertex1, ackVertex2 },
+				new ExecutionVertex[] {},
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1),
+				null,
+				new DisabledCheckpointStatsTracker(),
+				Executors.directExecutor());
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -220,19 +223,20 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid,
-					600000,
-					600000,
-					0,
-					Integer.MAX_VALUE,
-					ExternalizedCheckpointSettings.none(),
-					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
-					new ExecutionVertex[] { ackVertex1, ackVertex2 },
-					new ExecutionVertex[] {},
-					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(1),
-					null,
-					new DisabledCheckpointStatsTracker());
+				jid,
+				600000,
+				600000,
+				0,
+				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
+				new ExecutionVertex[] { ackVertex1, ackVertex2 },
+				new ExecutionVertex[] {},
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1),
+				null,
+				new DisabledCheckpointStatsTracker(),
+				Executors.directExecutor());
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -272,19 +276,20 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid,
-					600000,
-					600000,
-					0,
-					Integer.MAX_VALUE,
-					ExternalizedCheckpointSettings.none(),
-					new ExecutionVertex[] { vertex1, vertex2 },
-					new ExecutionVertex[] { vertex1, vertex2 },
-					new ExecutionVertex[] { vertex1, vertex2 },
-					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(1),
-					null,
-					new DisabledCheckpointStatsTracker());
+				jid,
+				600000,
+				600000,
+				0,
+				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[] { vertex1, vertex2 },
+				new ExecutionVertex[] { vertex1, vertex2 },
+				new ExecutionVertex[] { vertex1, vertex2 },
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1),
+				null,
+				new DisabledCheckpointStatsTracker(),
+				Executors.directExecutor());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -370,19 +375,20 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid,
-					600000,
-					600000,
-					0,
-					Integer.MAX_VALUE,
-					ExternalizedCheckpointSettings.none(),
-					new ExecutionVertex[] { vertex1, vertex2 },
-					new ExecutionVertex[] { vertex1, vertex2 },
-					new ExecutionVertex[] { vertex1, vertex2 },
-					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(1),
-					null,
-					new DisabledCheckpointStatsTracker());
+				jid,
+				600000,
+				600000,
+				0,
+				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[] { vertex1, vertex2 },
+				new ExecutionVertex[] { vertex1, vertex2 },
+				new ExecutionVertex[] { vertex1, vertex2 },
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1),
+				null,
+				new DisabledCheckpointStatsTracker(),
+				Executors.directExecutor());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -487,19 +493,20 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid,
-					600000,
-					600000,
-					0,
-					Integer.MAX_VALUE,
-					ExternalizedCheckpointSettings.none(),
-					new ExecutionVertex[] { vertex1, vertex2 },
-					new ExecutionVertex[] { vertex1, vertex2 },
-					new ExecutionVertex[] { vertex1, vertex2 },
-					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(1),
-					null,
-					new DisabledCheckpointStatsTracker());
+				jid,
+				600000,
+				600000,
+				0,
+				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[] { vertex1, vertex2 },
+				new ExecutionVertex[] { vertex1, vertex2 },
+				new ExecutionVertex[] { vertex1, vertex2 },
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1),
+				null,
+				new DisabledCheckpointStatsTracker(),
+				Executors.directExecutor());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -633,19 +640,20 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid,
-					600000,
-					600000,
-					0,
-					Integer.MAX_VALUE,
-					ExternalizedCheckpointSettings.none(),
-					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
-					new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
-					new ExecutionVertex[] { commitVertex },
-					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(2),
-					null,
-					new DisabledCheckpointStatsTracker());
+				jid,
+				600000,
+				600000,
+				0,
+				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
+				new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
+				new ExecutionVertex[] { commitVertex },
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(2),
+				null,
+				new DisabledCheckpointStatsTracker(),
+				Executors.directExecutor());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -767,19 +775,20 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid,
-					600000,
-					600000,
-					0,
-					Integer.MAX_VALUE,
-					ExternalizedCheckpointSettings.none(),
-					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
-					new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
-					new ExecutionVertex[] { commitVertex },
-					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(10),
-					null,
-					new DisabledCheckpointStatsTracker());
+				jid,
+				600000,
+				600000,
+				0,
+				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
+				new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
+				new ExecutionVertex[] { commitVertex },
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(10),
+				null,
+				new DisabledCheckpointStatsTracker(),
+				Executors.directExecutor());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -888,19 +897,20 @@ public class CheckpointCoordinatorTest {
 			// the timeout for the checkpoint is a 200 milliseconds
 
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid,
-					600000,
-					200,
-					0,
-					Integer.MAX_VALUE,
-					ExternalizedCheckpointSettings.none(),
-					new ExecutionVertex[] { triggerVertex },
-					new ExecutionVertex[] { ackVertex1, ackVertex2 },
-					new ExecutionVertex[] { commitVertex },
-					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(2),
-					null,
-					new DisabledCheckpointStatsTracker());
+				jid,
+				600000,
+				200,
+				0,
+				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[] { triggerVertex },
+				new ExecutionVertex[] { ackVertex1, ackVertex2 },
+				new ExecutionVertex[] { commitVertex },
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(2),
+				null,
+				new DisabledCheckpointStatsTracker(),
+				Executors.directExecutor());
 
 			// trigger a checkpoint, partially acknowledged
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -956,19 +966,20 @@ public class CheckpointCoordinatorTest {
 			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
 
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid,
-					200000,
-					200000,
-					0,
-					Integer.MAX_VALUE,
-					ExternalizedCheckpointSettings.none(),
-					new ExecutionVertex[] { triggerVertex },
-					new ExecutionVertex[] { ackVertex1, ackVertex2 },
-					new ExecutionVertex[] { commitVertex },
-					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(2),
-					null,
-					new DisabledCheckpointStatsTracker());
+				jid,
+				200000,
+				200000,
+				0,
+				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[] { triggerVertex },
+				new ExecutionVertex[] { ackVertex1, ackVertex2 },
+				new ExecutionVertex[] { commitVertex },
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(2),
+				null,
+				new DisabledCheckpointStatsTracker(),
+				Executors.directExecutor());
 
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -1033,7 +1044,8 @@ public class CheckpointCoordinatorTest {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			new DisabledCheckpointStatsTracker());
+			new DisabledCheckpointStatsTracker(),
+			Executors.directExecutor());
 
 		assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -1145,19 +1157,20 @@ public class CheckpointCoordinatorTest {
 			}).when(execution).triggerCheckpoint(anyLong(), anyLong());
 			
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid,
-					10,        // periodic interval is 10 ms
-					200000,    // timeout is very long (200 s)
-					0,
-					Integer.MAX_VALUE,
-					ExternalizedCheckpointSettings.none(),
-					new ExecutionVertex[] { triggerVertex },
-					new ExecutionVertex[] { ackVertex },
-					new ExecutionVertex[] { commitVertex },
-					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(2),
-					null,
-					new DisabledCheckpointStatsTracker());
+				jid,
+				10,        // periodic interval is 10 ms
+				200000,    // timeout is very long (200 s)
+				0,
+				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[] { triggerVertex },
+				new ExecutionVertex[] { ackVertex },
+				new ExecutionVertex[] { commitVertex },
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(2),
+				null,
+				new DisabledCheckpointStatsTracker(),
+				Executors.directExecutor());
 
 			
 			coord.startCheckpointScheduler();
@@ -1236,19 +1249,20 @@ public class CheckpointCoordinatorTest {
 			}).when(execution).triggerCheckpoint(anyLong(), anyLong());
 
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid,
-					10,        // periodic interval is 10 ms
-					200000,    // timeout is very long (200 s)
-					500,    // 500ms delay between checkpoints
-					10,
-					ExternalizedCheckpointSettings.none(),
-					new ExecutionVertex[] { vertex1 },
-					new ExecutionVertex[] { vertex1 },
-					new ExecutionVertex[] { vertex1 },
-					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(2),
-					null,
-					new DisabledCheckpointStatsTracker());
+				jid,
+				10,        // periodic interval is 10 ms
+				200000,    // timeout is very long (200 s)
+				500,    // 500ms delay between checkpoints
+				10,
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[] { vertex1 },
+				new ExecutionVertex[] { vertex1 },
+				new ExecutionVertex[] { vertex1 },
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(2),
+				null,
+				new DisabledCheckpointStatsTracker(),
+				Executors.directExecutor());
 
 			coord.startCheckpointScheduler();
 
@@ -1321,19 +1335,20 @@ public class CheckpointCoordinatorTest {
 
 		// set up the coordinator and validate the initial state
 		CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				600000,
-				600000,
-				0,
-				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.none(),
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1),
-				null,
-				new DisabledCheckpointStatsTracker());
+			jid,
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			ExternalizedCheckpointSettings.none(),
+			new ExecutionVertex[] { vertex1, vertex2 },
+			new ExecutionVertex[] { vertex1, vertex2 },
+			new ExecutionVertex[] { vertex1, vertex2 },
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(1),
+			null,
+			new DisabledCheckpointStatsTracker(),
+			Executors.directExecutor());
 
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
 		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1456,19 +1471,20 @@ public class CheckpointCoordinatorTest {
 
 		// set up the coordinator and validate the initial state
 		CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				600000,
-				600000,
-				0,
-				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.none(),
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new ExecutionVertex[] { vertex1, vertex2 },
-				counter,
-				new StandaloneCompletedCheckpointStore(10),
-				null,
-				new DisabledCheckpointStatsTracker());
+			jid,
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			ExternalizedCheckpointSettings.none(),
+			new ExecutionVertex[] { vertex1, vertex2 },
+			new ExecutionVertex[] { vertex1, vertex2 },
+			new ExecutionVertex[] { vertex1, vertex2 },
+			counter,
+			new StandaloneCompletedCheckpointStore(10),
+			null,
+			new DisabledCheckpointStatsTracker(),
+			Executors.directExecutor());
 
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
@@ -1559,19 +1575,20 @@ public class CheckpointCoordinatorTest {
 			}).when(execution).notifyCheckpointComplete(anyLong(), anyLong());
 
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid,
-					10,        // periodic interval is 10 ms
-					200000,    // timeout is very long (200 s)
-					0L,        // no extra delay
-					maxConcurrentAttempts,
-					ExternalizedCheckpointSettings.none(),
-					new ExecutionVertex[] { triggerVertex },
-					new ExecutionVertex[] { ackVertex },
-					new ExecutionVertex[] { commitVertex },
-					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(2),
-					null,
-					new DisabledCheckpointStatsTracker());
+				jid,
+				10,        // periodic interval is 10 ms
+				200000,    // timeout is very long (200 s)
+				0L,        // no extra delay
+				maxConcurrentAttempts,
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[] { triggerVertex },
+				new ExecutionVertex[] { ackVertex },
+				new ExecutionVertex[] { commitVertex },
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(2),
+				null,
+				new DisabledCheckpointStatsTracker(),
+				Executors.directExecutor());
 
 			coord.startCheckpointScheduler();
 
@@ -1632,19 +1649,20 @@ public class CheckpointCoordinatorTest {
 			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
 
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid,
-					10,        // periodic interval is 10 ms
-					200000,    // timeout is very long (200 s)
-					0L,        // no extra delay
-					maxConcurrentAttempts, // max two concurrent checkpoints
-					ExternalizedCheckpointSettings.none(),
-					new ExecutionVertex[] { triggerVertex },
-					new ExecutionVertex[] { ackVertex },
-					new ExecutionVertex[] { commitVertex },
-					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(2),
-					null,
-					new DisabledCheckpointStatsTracker());
+				jid,
+				10,        // periodic interval is 10 ms
+				200000,    // timeout is very long (200 s)
+				0L,        // no extra delay
+				maxConcurrentAttempts, // max two concurrent checkpoints
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[] { triggerVertex },
+				new ExecutionVertex[] { ackVertex },
+				new ExecutionVertex[] { commitVertex },
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(2),
+				null,
+				new DisabledCheckpointStatsTracker(),
+				Executors.directExecutor());
 
 			coord.startCheckpointScheduler();
 
@@ -1714,19 +1732,20 @@ public class CheckpointCoordinatorTest {
 					});
 			
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid,
-					10,        // periodic interval is 10 ms
-					200000,    // timeout is very long (200 s)
-					0L,        // no extra delay
-					2, // max two concurrent checkpoints
-					ExternalizedCheckpointSettings.none(),
-					new ExecutionVertex[] { triggerVertex },
-					new ExecutionVertex[] { ackVertex },
-					new ExecutionVertex[] { commitVertex },
-					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(2),
-					null,
-					new DisabledCheckpointStatsTracker());
+				jid,
+				10,        // periodic interval is 10 ms
+				200000,    // timeout is very long (200 s)
+				0L,        // no extra delay
+				2, // max two concurrent checkpoints
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[] { triggerVertex },
+				new ExecutionVertex[] { ackVertex },
+				new ExecutionVertex[] { commitVertex },
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(2),
+				null,
+				new DisabledCheckpointStatsTracker(),
+				Executors.directExecutor());
 			
 			coord.startCheckpointScheduler();
 
@@ -1766,19 +1785,20 @@ public class CheckpointCoordinatorTest {
 		StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter();
 
 		CheckpointCoordinator coord = new CheckpointCoordinator(
-				jobId,
-				100000,
-				200000,
-				0L,
-				1, // max one checkpoint at a time => should not affect savepoints
-				ExternalizedCheckpointSettings.none(),
-				new ExecutionVertex[] { vertex1 },
-				new ExecutionVertex[] { vertex1 },
-				new ExecutionVertex[] { vertex1 },
-				checkpointIDCounter,
-				new StandaloneCompletedCheckpointStore(2),
-				null,
-				new DisabledCheckpointStatsTracker());
+			jobId,
+			100000,
+			200000,
+			0L,
+			1, // max one checkpoint at a time => should not affect savepoints
+			ExternalizedCheckpointSettings.none(),
+			new ExecutionVertex[] { vertex1 },
+			new ExecutionVertex[] { vertex1 },
+			new ExecutionVertex[] { vertex1 },
+			checkpointIDCounter,
+			new StandaloneCompletedCheckpointStore(2),
+			null,
+			new DisabledCheckpointStatsTracker(),
+			Executors.directExecutor());
 
 		List<Future<CompletedCheckpoint>> savepointFutures = new ArrayList<>();
 
@@ -1819,19 +1839,20 @@ public class CheckpointCoordinatorTest {
 		ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
 
 		CheckpointCoordinator coord = new CheckpointCoordinator(
-				jobId,
-				100000,
-				200000,
-				100000000L, // very long min delay => should not affect savepoints
-				1,
-				ExternalizedCheckpointSettings.none(),
-				new ExecutionVertex[] { vertex1 },
-				new ExecutionVertex[] { vertex1 },
-				new ExecutionVertex[] { vertex1 },
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(2),
-				null,
-				new DisabledCheckpointStatsTracker());
+			jobId,
+			100000,
+			200000,
+			100000000L, // very long min delay => should not affect savepoints
+			1,
+			ExternalizedCheckpointSettings.none(),
+			new ExecutionVertex[] { vertex1 },
+			new ExecutionVertex[] { vertex1 },
+			new ExecutionVertex[] { vertex1 },
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(2),
+			null,
+			new DisabledCheckpointStatsTracker(),
+			Executors.directExecutor());
 
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
@@ -1879,19 +1900,20 @@ public class CheckpointCoordinatorTest {
 
 		// set up the coordinator and validate the initial state
 		CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				600000,
-				600000,
-				0,
-				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.none(),
-				arrayExecutionVertices,
-				arrayExecutionVertices,
-				arrayExecutionVertices,
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1),
-				null,
-				new DisabledCheckpointStatsTracker());
+			jid,
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			ExternalizedCheckpointSettings.none(),
+			arrayExecutionVertices,
+			arrayExecutionVertices,
+			arrayExecutionVertices,
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(1),
+			null,
+			new DisabledCheckpointStatsTracker(),
+			Executors.directExecutor());
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp, false);
@@ -1984,19 +2006,20 @@ public class CheckpointCoordinatorTest {
 
 		// set up the coordinator and validate the initial state
 		CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				600000,
-				600000,
-				0,
-				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.none(),
-				arrayExecutionVertices,
-				arrayExecutionVertices,
-				arrayExecutionVertices,
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1),
-				null,
-				new DisabledCheckpointStatsTracker());
+			jid,
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			ExternalizedCheckpointSettings.none(),
+			arrayExecutionVertices,
+			arrayExecutionVertices,
+			arrayExecutionVertices,
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(1),
+			null,
+			new DisabledCheckpointStatsTracker(),
+			Executors.directExecutor());
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp, false);
@@ -2099,19 +2122,20 @@ public class CheckpointCoordinatorTest {
 
 		// set up the coordinator and validate the initial state
 		CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				600000,
-				600000,
-				0,
-				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.none(),
-				arrayExecutionVertices,
-				arrayExecutionVertices,
-				arrayExecutionVertices,
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1),
-				null,
-				new DisabledCheckpointStatsTracker());
+			jid,
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			ExternalizedCheckpointSettings.none(),
+			arrayExecutionVertices,
+			arrayExecutionVertices,
+			arrayExecutionVertices,
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(1),
+			null,
+			new DisabledCheckpointStatsTracker(),
+			Executors.directExecutor());
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp, false);
@@ -2234,19 +2258,20 @@ public class CheckpointCoordinatorTest {
 
 		// set up the coordinator and validate the initial state
 		CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				600000,
-				600000,
-				0,
-				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.none(),
-				arrayExecutionVertices,
-				arrayExecutionVertices,
-				arrayExecutionVertices,
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1),
-				null,
-				new DisabledCheckpointStatsTracker());
+			jid,
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			ExternalizedCheckpointSettings.none(),
+			arrayExecutionVertices,
+			arrayExecutionVertices,
+			arrayExecutionVertices,
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(1),
+			null,
+			new DisabledCheckpointStatsTracker(),
+			Executors.directExecutor());
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp, false);
@@ -2365,19 +2390,20 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid,
-					600000,
-					600000,
-					0,
-					Integer.MAX_VALUE,
-					ExternalizedCheckpointSettings.externalizeCheckpoints(true),
-					new ExecutionVertex[] { vertex1 },
-					new ExecutionVertex[] { vertex1 },
-					new ExecutionVertex[] { vertex1 },
-					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(1),
-					"fake-directory",
-					new DisabledCheckpointStatsTracker());
+				jid,
+				600000,
+				600000,
+				0,
+				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.externalizeCheckpoints(true),
+				new ExecutionVertex[] { vertex1 },
+				new ExecutionVertex[] { vertex1 },
+				new ExecutionVertex[] { vertex1 },
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1),
+				"fake-directory",
+				new DisabledCheckpointStatsTracker(),
+				Executors.directExecutor());
 
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -2741,19 +2767,20 @@ public class CheckpointCoordinatorTest {
 
 		// set up the coordinator and validate the initial state
 		CheckpointCoordinator coord = new CheckpointCoordinator(
-				new JobID(),
-				600000,
-				600000,
-				0,
-				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.none(),
-				new ExecutionVertex[] { vertex1 },
-				new ExecutionVertex[] { vertex1 },
-				new ExecutionVertex[] { vertex1 },
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1),
-				null,
-				new DisabledCheckpointStatsTracker());
+			new JobID(),
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			ExternalizedCheckpointSettings.none(),
+			new ExecutionVertex[] { vertex1 },
+			new ExecutionVertex[] { vertex1 },
+			new ExecutionVertex[] { vertex1 },
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(1),
+			null,
+			new DisabledCheckpointStatsTracker(),
+			Executors.directExecutor());
 
 		// Periodic
 		CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 6e5279b..7cea130 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -40,7 +41,6 @@ import org.hamcrest.Description;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -97,19 +97,20 @@ public class CheckpointStateRestoreTest {
 			map.put(statelessId, stateless);
 
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid,
-					200000L,
-					200000L,
-					0,
-					Integer.MAX_VALUE,
-					ExternalizedCheckpointSettings.none(),
-					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
-					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
-					new ExecutionVertex[0],
-					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(1),
-					null,
-					new DisabledCheckpointStatsTracker());
+				jid,
+				200000L,
+				200000L,
+				0,
+				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
+				new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
+				new ExecutionVertex[0],
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1),
+				null,
+				new DisabledCheckpointStatsTracker(),
+				Executors.directExecutor());
 
 			// create ourselves a checkpoint with state
 			final long timestamp = 34623786L;
@@ -172,19 +173,20 @@ public class CheckpointStateRestoreTest {
 	public void testNoCheckpointAvailable() {
 		try {
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					new JobID(),
-					200000L,
-					200000L,
-					0,
-					Integer.MAX_VALUE,
-					ExternalizedCheckpointSettings.none(),
-					new ExecutionVertex[] { mock(ExecutionVertex.class) },
-					new ExecutionVertex[] { mock(ExecutionVertex.class) },
-					new ExecutionVertex[0],
-					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(1),
-					null,
-					new DisabledCheckpointStatsTracker());
+				new JobID(),
+				200000L,
+				200000L,
+				0,
+				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[] { mock(ExecutionVertex.class) },
+				new ExecutionVertex[] { mock(ExecutionVertex.class) },
+				new ExecutionVertex[0],
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1),
+				null,
+				new DisabledCheckpointStatsTracker(),
+				Executors.directExecutor());
 
 			try {
 				coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false);
@@ -227,19 +229,20 @@ public class CheckpointStateRestoreTest {
 		tasks.put(jobVertexId2, jobVertex2);
 
 		CheckpointCoordinator coord = new CheckpointCoordinator(
-				new JobID(),
-				Integer.MAX_VALUE,
-				Integer.MAX_VALUE,
-				0,
-				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.none(),
-				new ExecutionVertex[] {},
-				new ExecutionVertex[] {},
-				new ExecutionVertex[] {},
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1),
-				null,
-				new DisabledCheckpointStatsTracker());
+			new JobID(),
+			Integer.MAX_VALUE,
+			Integer.MAX_VALUE,
+			0,
+			Integer.MAX_VALUE,
+			ExternalizedCheckpointSettings.none(),
+			new ExecutionVertex[] {},
+			new ExecutionVertex[] {},
+			new ExecutionVertex[] {},
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(1),
+			null,
+			new DisabledCheckpointStatsTracker(),
+			Executors.directExecutor());
 
 		ChainedStateHandle<StreamStateHandle> serializedState = CheckpointCoordinatorTest
 				.generateChainedStateHandle(new SerializableObject());

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index c8c9350..c6c7ae5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -97,6 +97,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 			CompletedCheckpointStore store) throws Exception {
 		ExecutionGraph executionGraph = new ExecutionGraph(
 			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			new JobID(),
 			"test",
 			new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 84f0e1f..e918965 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -210,7 +211,15 @@ public class PendingCheckpointTest {
 
 	private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, String targetDirectory) {
 		Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS);
-		return new PendingCheckpoint(new JobID(), 0, 1, ackTasks, false, props, targetDirectory);
+		return new PendingCheckpoint(
+			new JobID(),
+			0,
+			1,
+			ackTasks,
+			false,
+			props,
+			targetDirectory,
+			Executors.directExecutor());
 	}
 
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
index 34043eb..0223a2e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
@@ -50,7 +50,7 @@ public class AllVerticesIteratorTest {
 			v4.setParallelism(2);
 			
 			ExecutionGraph eg = Mockito.mock(ExecutionGraph.class);
-			Mockito.when(eg.getExecutor()).thenReturn(TestingUtils.directExecutionContext());
+			Mockito.when(eg.getFutureExecutor()).thenReturn(TestingUtils.directExecutionContext());
 					
 			ExecutionJobVertex ejv1 = new ExecutionJobVertex(eg, v1, 1,
 					AkkaUtils.getDefaultTimeout());


[3/4] flink git commit: [FLINK-5085] Execute CheckpointCoordinator's state discard calls asynchronously

Posted by tr...@apache.org.
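For readers skimming the diffs in this part: the test hunks above now pass the CheckpointCoordinator an extra Executor argument (Executors.directExecutor()), and the PendingCheckpoint constructor gains the same parameter, so state-discard callbacks can run on an injected executor rather than on the caller's thread. The following is a minimal, self-contained sketch of that pattern only; Coordinator and DiscardableState are hypothetical names invented for illustration, not Flink classes, and the "direct" executor here is just Runnable::run, analogous in behavior to the directExecutor() the tests pass in.

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch, not Flink code: a coordinator-like component that
// delegates blocking discard work to an injected Executor.
public class AsyncDiscardSketch {

	/** Stands in for completed-checkpoint state that lives on disk. */
	interface DiscardableState {
		void discard() throws Exception; // potentially blocking I/O, e.g. deleting files
	}

	/** Minimal stand-in for a coordinator that must clean up subsumed state. */
	static class Coordinator {
		private final Executor ioExecutor;

		Coordinator(Executor ioExecutor) {
			this.ioExecutor = ioExecutor;
		}

		/** Schedules the blocking discard instead of running it on the caller's thread. */
		void discardAsync(DiscardableState state) {
			ioExecutor.execute(() -> {
				try {
					state.discard();
				} catch (Exception e) {
					// real code would log; cleanup failures must not kill the executor thread
					System.err.println("Could not discard state: " + e);
				}
			});
		}
	}

	public static void main(String[] args) throws Exception {
		// Production-like setup: a dedicated pool absorbs the blocking deletions.
		ExecutorService ioPool = Executors.newFixedThreadPool(2);
		Coordinator coordinator = new Coordinator(ioPool);
		coordinator.discardAsync(
			() -> System.out.println("discarding on " + Thread.currentThread().getName()));

		// Test-like setup: a direct executor runs the callback synchronously in the caller.
		Executor direct = Runnable::run;
		Coordinator testCoordinator = new Coordinator(direct);
		testCoordinator.discardAsync(
			() -> System.out.println("discarding on " + Thread.currentThread().getName()));

		ioPool.shutdown();
	}
}

Passing the direct variant in tests, as the hunks above do, presumably keeps discards synchronous within the test thread so assertions remain deterministic, while production code can supply a separate pool for the blocking work.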
http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 88f9ce0..4cba4e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -98,6 +98,7 @@ public class ArchivedExecutionGraphTest {
 
 		runtimeGraph = new ExecutionGraph(
 			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			new JobID(),
 			"test job",
 			new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index 6f6fcd0..bf3a17c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -112,7 +112,8 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -161,7 +162,8 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -235,7 +237,8 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -494,7 +497,8 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -558,7 +562,8 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v5, v4));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -626,7 +631,8 @@ public class ExecutionGraphConstructionTest {
 			List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
 			ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(), 
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
 				jobId, 
 				jobName, 
 				cfg,
@@ -672,7 +678,8 @@ public class ExecutionGraphConstructionTest {
 			List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
 			ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(), 
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
 				jobId, 
 				jobName,
 				cfg,
@@ -753,7 +760,8 @@ public class ExecutionGraphConstructionTest {
 			JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, v4, v5, v6, v7, v8);
 			
 			ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(), 
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
 				jobId, 
 				jobName, 
 				cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index d4acd8c..ef4f74c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -89,7 +89,8 @@ public class ExecutionGraphDeploymentTest {
 			v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
 
 			ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(), 
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
 				jobId, 
 				"some job", 
 				new Configuration(),
@@ -313,6 +314,7 @@ public class ExecutionGraphDeploymentTest {
 		// execution graph that executes actions synchronously
 		ExecutionGraph eg = new ExecutionGraph(
 			TestingUtils.directExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId,
 			"failing test job",
 			new Configuration(),
@@ -356,7 +358,8 @@ public class ExecutionGraphDeploymentTest {
 
 		// execution graph that executes actions synchronously
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.directExecutionContext(), 
+			TestingUtils.directExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			"some job", 
 			new Configuration(), 

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 09e7c3e..d8d8e24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -147,6 +147,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 
 			ExecutionGraph executionGraph = new ExecutionGraph(
 				executor,
+				executor,
 				jobGraph.getJobID(),
 				jobGraph.getName(),
 				jobConfig,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index d6770a6..52bfc96 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -232,6 +232,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		// Blocking program
 		ExecutionGraph executionGraph = new ExecutionGraph(
 			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			new JobID(),
 			"TestJob",
 			new Configuration(),
@@ -547,6 +548,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		ExecutionGraph eg = new ExecutionGraph(
 			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			new JobID(),
 			"Test job",
 			new Configuration(),
@@ -679,13 +681,14 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws IOException {
 		return new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				new JobID(),
-				"Test job",
-				new Configuration(),
-				new SerializedValue<>(new ExecutionConfig()),
-				AkkaUtils.getDefaultTimeout(),
-				restartStrategy);
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
+			new JobID(),
+			"Test job",
+			new Configuration(),
+			new SerializedValue<>(new ExecutionConfig()),
+			AkkaUtils.getDefaultTimeout(),
+			restartStrategy);
 	}
 
 	private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index de4a026..fde967e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -133,6 +133,7 @@ public class ExecutionGraphSignalsTest {
 
 		eg = new ExecutionGraph(
 			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId,
 			jobName,
 			cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 28dff02..71ae3b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -174,6 +174,7 @@ public class ExecutionGraphTestUtils {
 
 		ExecutionGraph graph = new ExecutionGraph(
 			executor,
+			executor,
 			new JobID(), 
 			"test job", 
 			new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 8434ed7..19e2d6d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -52,7 +52,8 @@ public class ExecutionStateProgressTest {
 			ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
 
 			ExecutionGraph graph = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(), 
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
 				jid, 
 				"test job", 
 				new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 7a28b4a..0e147e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -66,7 +66,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -111,7 +112,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -157,7 +159,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -204,7 +207,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName,
 			cfg,
@@ -249,7 +253,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -314,7 +319,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -370,7 +376,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index 4459970..5b1a03e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -181,6 +181,7 @@ public class TerminalStateDeadlockTest {
 		TestExecGraph(JobID jobId) throws IOException {
 			super(
 				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
 				jobId,
 				"test graph",
 				EMPTY_CONFIG,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index 0c95695..27708a2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -80,13 +80,14 @@ public class VertexSlotSharingTest {
 			List<JobVertex> vertices = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 			
 			ExecutionGraph eg = new ExecutionGraph(
-					TestingUtils.defaultExecutionContext(),
-					new JobID(),
-					"test job",
-					new Configuration(),
-					new SerializedValue<>(new ExecutionConfig()),
-					AkkaUtils.getDefaultTimeout(),
-					new NoRestartStrategy());
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
+				new JobID(),
+				"test job",
+				new Configuration(),
+				new SerializedValue<>(new ExecutionConfig()),
+				AkkaUtils.getDefaultTimeout(),
+				new NoRestartStrategy());
 			eg.attachJobGraph(vertices);
 			
 			// verify that the vertices are all in the same slot sharing group

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
index c0d59fe..4beedb0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
@@ -39,7 +39,7 @@ public class FixedDelayRestartStrategyTest {
 			restartDelay);
 
 		ExecutionGraph executionGraph = mock(ExecutionGraph.class);
-		when(executionGraph.getExecutor())
+		when(executionGraph.getFutureExecutor())
 			.thenReturn(ExecutionContext$.MODULE$.fromExecutor(MoreExecutors.directExecutor()));
 
 		while(fixedDelayRestartStrategy.canRestart()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 7d9c521..69aac31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -90,6 +90,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 
@@ -135,6 +136,8 @@ public class JobManagerHARecoveryTest {
 		flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
 		flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
 
+		ExecutorService executor = null;
+
 		try {
 			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 
@@ -152,21 +155,24 @@ public class JobManagerHARecoveryTest {
 					MemoryArchivist.class,
 					10), "archive");
 
+			executor = new ForkJoinPool();
+
 			Props jobManagerProps = Props.create(
-					TestingJobManager.class,
-					flinkConfiguration,
-					new ForkJoinPool(),
-					instanceManager,
-					scheduler,
-					new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000),
-					archive,
-					new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
-					timeout,
-					myLeaderElectionService,
-					mySubmittedJobGraphStore,
-					checkpointStateFactory,
-					jobRecoveryTimeout,
-					Option.apply(null));
+				TestingJobManager.class,
+				flinkConfiguration,
+				executor,
+				executor,
+				instanceManager,
+				scheduler,
+				new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000),
+				archive,
+				new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
+				timeout,
+				myLeaderElectionService,
+				mySubmittedJobGraphStore,
+				checkpointStateFactory,
+				jobRecoveryTimeout,
+				Option.apply(null));
 
 			jobManager = system.actorOf(jobManagerProps, "jobmanager");
 			ActorGateway gateway = new AkkaActorGateway(jobManager, leaderSessionID);
@@ -282,6 +288,10 @@ public class JobManagerHARecoveryTest {
 			if (taskManager != null) {
 				taskManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
 			}
+
+			if (executor != null) {
+				executor.shutdownNow();
+			}
 		}
 	}
 

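The JobManagerHARecoveryTest change above replaces the inline new ForkJoinPool() with a named executor so that the test can shut it down in its finally block and not leak threads on failure. As a general pattern (placeholder names, not code from this commit), the lifecycle looks like this:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;

public class ExecutorLifecycleSketch {

	public static void runTestScenario() {
		ExecutorService executor = null;
		try {
			executor = new ForkJoinPool();
			// ... hand 'executor' to the component under test and run assertions ...
		} finally {
			if (executor != null) {
				// interrupt whatever is still running; leftover work is irrelevant once the test ends
				executor.shutdownNow();
			}
		}
	}

	public static void main(String[] args) {
		runTestScenario();
	}
}
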
http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index 082b1de..f051281 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -183,20 +183,21 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 		CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
 
 		return Props.create(
-				TestingJobManager.class,
-				configuration,
-				executor,
-				new InstanceManager(),
-				new Scheduler(TestingUtils.defaultExecutionContext()),
-				new BlobLibraryCacheManager(new BlobServer(configuration), 10L),
-				ActorRef.noSender(),
-				new NoRestartStrategy.NoRestartStrategyFactory(),
-				AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
-				leaderElectionService,
-				submittedJobGraphStore,
-				checkpointRecoveryFactory,
-				AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
-				Option.apply(null)
+			TestingJobManager.class,
+			configuration,
+			executor,
+			executor,
+			new InstanceManager(),
+			new Scheduler(TestingUtils.defaultExecutionContext()),
+			new BlobLibraryCacheManager(new BlobServer(configuration), 10L),
+			ActorRef.noSender(),
+			new NoRestartStrategy.NoRestartStrategyFactory(),
+			AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
+			leaderElectionService,
+			submittedJobGraphStore,
+			checkpointRecoveryFactory,
+			AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
+			Option.apply(null)
 		);
 	}
 }

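In the leader-election test above the same executor is passed twice, once per role. A minimal sketch of that shortcut (all names are placeholders): sharing one pool is acceptable in tests that do not exercise heavy blocking I/O, whereas production code wires two separate pools.

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SharedPoolSketch {

	// stands in for a component that, like the JobManager, takes both executors
	static void start(Executor futureExecutor, Executor ioExecutor) {
		futureExecutor.execute(() -> System.out.println("future callback"));
		ioExecutor.execute(() -> System.out.println("blocking io task"));
	}

	public static void main(String[] args) {
		ExecutorService shared = Executors.newFixedThreadPool(2);
		try {
			// the test shortcut: one pool plays both roles
			start(shared, shared);
		} finally {
			shared.shutdown(); // let the submitted tasks finish, then release the threads
		}
	}
}
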
http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index 68cb668..1cbd605 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -58,6 +58,7 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
 
         val eg = new ExecutionGraph(
           TestingUtils.defaultExecutionContext,
+          TestingUtils.defaultExecutionContext,
           new JobID(),
           "test job",
           new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 50a5559..269a66f 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
+import java.util.concurrent.{Executor, ExecutorService, TimeUnit, TimeoutException}
 
 import akka.actor.{ActorRef, ActorSystem, Props}
 import akka.pattern.Patterns._
@@ -80,7 +80,8 @@ class TestingCluster(
   override def getJobManagerProps(
     jobManagerClass: Class[_ <: JobManager],
     configuration: Configuration,
-    executorService: ExecutorService,
+    futureExecutor: Executor,
+    ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -96,7 +97,8 @@ class TestingCluster(
     val props = super.getJobManagerProps(
       jobManagerClass,
       configuration,
-      executorService,
+      futureExecutor,
+      ioExecutor,
       instanceManager,
       scheduler,
       libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index c6fd923..39c7a53 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -39,7 +39,8 @@ import scala.language.postfixOps
   */
 class TestingJobManager(
     flinkConfiguration: Configuration,
-    executor: Executor,
+    futureExecutor: Executor,
+    ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -53,7 +54,8 @@ class TestingJobManager(
     metricRegistry : Option[MetricRegistry])
   extends JobManager(
     flinkConfiguration,
-    executor,
+    futureExecutor,
+    ioExecutor,
     instanceManager,
     scheduler,
     libraryCacheManager,

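TestingJobManager, like TestingYarnJobManager and YarnJobManager further below, simply grows a second constructor parameter and forwards both executors to its parent unchanged. Expressed in Java with placeholder class names (this is not Flink's code), the forwarding pattern is roughly:

import java.util.concurrent.Executor;

public class ForwardingSketch {

	// stands in for the parent JobManager class, which owns both executors
	static class BaseManager {
		protected final Executor futureExecutor;
		protected final Executor ioExecutor;

		BaseManager(Executor futureExecutor, Executor ioExecutor) {
			this.futureExecutor = futureExecutor;
			this.ioExecutor = ioExecutor;
		}
	}

	// stands in for the testing subclass: adds nothing, just forwards both pools
	static class TestingManager extends BaseManager {
		TestingManager(Executor futureExecutor, Executor ioExecutor) {
			super(futureExecutor, ioExecutor);
		}
	}

	public static void main(String[] args) {
		Executor direct = Runnable::run; // direct executor, good enough for the sketch
		TestingManager manager = new TestingManager(direct, direct);
		manager.futureExecutor.execute(() -> System.out.println("forwarded future executor in use"));
	}
}
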
http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 37ea68a..e77cbb3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -139,6 +139,7 @@ public class RescalePartitionerTest extends TestLogger {
 
 		ExecutionGraph eg = new ExecutionGraph(
 			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId,
 			jobName,
 			cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index aabc19d..5244124 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -40,8 +40,9 @@ import scala.concurrent.duration.FiniteDuration
   * instead of an anonymous class with the respective mixin to obtain a more readable logger name.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executor Execution context which is used to execute concurrent tasks in the
+  * @param futureExecutor Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+  * @param ioExecutor Executor which is used to run blocking IO operations

   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
   * @param scheduler Scheduler to schedule Flink jobs
@@ -53,7 +54,8 @@ import scala.concurrent.duration.FiniteDuration
   */
 class TestingYarnJobManager(
     flinkConfiguration: Configuration,
-    executor: Executor,
+    futureExecutor: Executor,
+    ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -67,7 +69,8 @@ class TestingYarnJobManager(
     metricRegistry : Option[MetricRegistry])
   extends YarnJobManager(
     flinkConfiguration,
-    executor,
+    futureExecutor,
+    ioExecutor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index da5959b..8f2cc33 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -60,6 +60,8 @@ import org.apache.hadoop.yarn.util.Records;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.Option;
+import scala.Some;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
@@ -339,8 +341,8 @@ public class YarnApplicationMasterRunner {
 				actorSystem,
 				futureExecutor,
 				ioExecutor,
-				new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
-				scala.Option.<String>empty(),
+				new Some<>(JobManager.JOB_MANAGER_NAME()),
+				Option.<String>empty(),
 				getJobManagerClass(),
 				getArchivistClass())._1();
 
@@ -379,7 +381,6 @@ public class YarnApplicationMasterRunner {
 
 			ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);
 
-
 			// 4: Process reapers
 			// The process reapers ensure that upon unexpected actor death, the process exits
 			// and does not stay lingering around unresponsive
@@ -414,6 +415,9 @@ public class YarnApplicationMasterRunner {
 				}
 			}
 
+			futureExecutor.shutdownNow();
+			ioExecutor.shutdownNow();
+
 			return INIT_ERROR_EXIT_CODE;
 		}
 
@@ -432,8 +436,11 @@ public class YarnApplicationMasterRunner {
 			}
 		}
 
-		futureExecutor.shutdownNow();
-		ioExecutor.shutdownNow();
+		org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(
+			AkkaUtils.getTimeout(config).toMillis(),
+			TimeUnit.MILLISECONDS,
+			futureExecutor,
+			ioExecutor);
 
 		return 0;
 	}

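On the error path the runner still calls shutdownNow() on both pools directly, while the normal exit path now delegates to org.apache.flink.runtime.concurrent.Executors.gracefulShutdown with the Akka timeout. The helper's body is not shown in this diff; the sketch below only illustrates what a graceful shutdown of several executors typically involves (orderly shutdown, bounded wait, then force) and is not Flink's implementation.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public final class GracefulShutdownSketch {

	public static void gracefulShutdown(long timeout, TimeUnit unit, ExecutorService... executors) {
		for (ExecutorService executor : executors) {
			executor.shutdown(); // stop accepting new tasks, let running tasks finish
		}

		long deadline = System.nanoTime() + unit.toNanos(timeout);

		for (ExecutorService executor : executors) {
			try {
				long remaining = deadline - System.nanoTime();
				if (!executor.awaitTermination(Math.max(remaining, 0L), TimeUnit.NANOSECONDS)) {
					executor.shutdownNow(); // shared deadline exceeded: cancel what is still running
				}
			} catch (InterruptedException e) {
				executor.shutdownNow();
				Thread.currentThread().interrupt(); // restore the interrupt flag
			}
		}
	}
}
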
http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index a81e6cf..db4eea8 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -40,8 +40,9 @@ import scala.language.postfixOps
   * to start/administer/stop the Yarn session.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executor Execution context which is used to execute concurrent tasks in the
+  * @param futureExecutor Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+  * @param ioExecutor Executor which is used to run blocking IO operations
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
   * @param scheduler Scheduler to schedule Flink jobs
@@ -53,7 +54,8 @@ import scala.language.postfixOps
   */
 class YarnJobManager(
     flinkConfiguration: FlinkConfiguration,
-    executor: Executor,
+    futureExecutor: Executor,
+    ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -67,7 +69,8 @@ class YarnJobManager(
     metricsRegistry: Option[MetricRegistry])
   extends ContaineredJobManager(
     flinkConfiguration,
-    executor,
+    futureExecutor,
+    ioExecutor,
     instanceManager,
     scheduler,
     libraryCacheManager,