You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/22 08:48:46 UTC

[2/4] flink git commit: [FLINK-5073] Use Executor to run ZooKeeper callbacks in ZooKeeperStateHandleStore

[FLINK-5073] Use Executor to run ZooKeeper callbacks in ZooKeeperStateHandleStore

Use a dedicated Executor to run ZooKeeper callbacks in ZooKeeperStateHandleStore instead
of running them in the ZooKeeper client's thread. A callback can block because it
discards state, which might entail deleting files from disk.

Add TestExecutors

Introduce a dedicated Executor for blocking I/O operations


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2e4c193
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2e4c193
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2e4c193

Branch: refs/heads/release-1.1
Commit: f2e4c193e1fb6b0cf26861bc01c2f3d6bcd4d8f6
Parents: 7fb71c5
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 15 22:45:04 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 22 09:47:57 2016 +0100

----------------------------------------------------------------------
 .../BackPressureStatsTrackerITCase.java         |  6 +-
 .../StackTraceSampleCoordinatorITCase.java      |  6 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |  1 +
 .../ZooKeeperCheckpointRecoveryFactory.java     | 12 +++-
 .../ZooKeeperCompletedCheckpointStore.java      |  7 ++-
 .../ZooKeeperSubmittedJobGraphStore.java        |  7 ++-
 .../flink/runtime/util/ZooKeeperUtils.java      | 22 +++++---
 .../runtime/zookeeper/StateStorageHelper.java   |  2 +-
 .../zookeeper/ZooKeeperStateHandleStore.java    | 13 ++++-
 .../flink/runtime/jobmanager/JobManager.scala   | 59 +++++++++++++-------
 .../runtime/minicluster/FlinkMiniCluster.scala  |  9 ++-
 .../minicluster/LocalFlinkMiniCluster.scala     |  3 +-
 ...ZooKeeperCompletedCheckpointStoreITCase.java |  3 +-
 .../runtime/jobmanager/JobManagerTest.java      |  4 ++
 .../flink/runtime/jobmanager/JobSubmitTest.java |  1 +
 .../ZooKeeperSubmittedJobGraphsStoreITCase.java | 15 ++---
 .../resourcemanager/ClusterShutdownITCase.java  | 14 ++++-
 .../resourcemanager/ResourceManagerITCase.java  | 14 ++++-
 ...askManagerComponentsStartupShutdownTest.java |  1 +
 .../TaskManagerProcessReapingTestBase.java      |  1 +
 .../TaskManagerRegistrationTest.java            |  8 ++-
 .../flink/runtime/util/TestExecutors.java       | 40 +++++++++++++
 .../ZooKeeperStateHandleStoreITCase.java        | 31 +++++-----
 .../jobmanager/JobManagerRegistrationTest.scala |  1 +
 .../runtime/testingUtils/TestingCluster.scala   |  5 +-
 .../runtime/testingUtils/TestingUtils.scala     | 35 ++++++++----
 .../test/util/ForkableFlinkMiniCluster.scala    |  3 +-
 ...ctTaskManagerProcessFailureRecoveryTest.java |  1 +
 .../recovery/ProcessFailureCancelingITCase.java |  1 +
 .../flink/yarn/YarnApplicationMasterRunner.java | 12 +++-
 30 files changed, 248 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 9fbbd90..ee6a411 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -120,7 +120,11 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 			}
 
 			try {
-				jobManger = TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), new Configuration());
+				jobManger = TestingUtils.createJobManager(
+					testActorSystem,
+					testActorSystem.dispatcher(),
+					testActorSystem.dispatcher(),
+					new Configuration());
 
 				Configuration config = new Configuration();
 				config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index 868dae1..cb13f02 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -90,7 +90,11 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
 			ActorGateway taskManager = null;
 
 			try {
-				jobManger = TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), new Configuration());
+				jobManger = TestingUtils.createJobManager(
+					testActorSystem,
+					testActorSystem.dispatcher(),
+					testActorSystem.dispatcher(),
+					new Configuration());
 
 				Configuration config = new Configuration();
 				config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index a6b958a..6c836c8 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -179,6 +179,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 					jmConfig,
 					jobManagerSystem[i],
 					jobManagerSystem[i].dispatcher(),
+					jobManagerSystem[i].dispatcher(),
 					JobManager.class,
 					MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index dcd6260..df713d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -24,6 +24,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 
+import java.util.concurrent.Executor;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -35,9 +37,15 @@ public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFac
 
 	private final Configuration config;
 
-	public ZooKeeperCheckpointRecoveryFactory(CuratorFramework client, Configuration config) {
+	private final Executor executor;
+
+	public ZooKeeperCheckpointRecoveryFactory(
+			CuratorFramework client,
+			Configuration config,
+			Executor executor) {
 		this.client = checkNotNull(client, "Curator client");
 		this.config = checkNotNull(config, "Configuration");
+		this.executor = checkNotNull(executor, "Executor");
 	}
 
 	@Override
@@ -55,7 +63,7 @@ public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFac
 			throws Exception {
 
 		return ZooKeeperUtils.createCompletedCheckpoints(client, config, jobId,
-				NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, userClassLoader);
+				NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, userClassLoader, executor);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 541629d..6570d00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -35,6 +35,7 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
 import java.util.List;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -94,6 +95,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	 *                                       start with a '/')
 	 * @param stateStorage                   State storage to be used to persist the completed
 	 *                                       checkpoint
+	 * @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks
 	 * @throws Exception
 	 */
 	public ZooKeeperCompletedCheckpointStore(
@@ -101,7 +103,8 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 			ClassLoader userClassLoader,
 			CuratorFramework client,
 			String checkpointsPath,
-			StateStorageHelper<CompletedCheckpoint> stateStorage) throws Exception {
+			StateStorageHelper<CompletedCheckpoint> stateStorage,
+			Executor executor) throws Exception {
 
 		checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
 		checkNotNull(stateStorage, "State storage");
@@ -119,7 +122,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 		// All operations will have the path as root
 		this.client = client.usingNamespace(client.getNamespace() + checkpointsPath);
 
-		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage);
+		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage, executor);
 
 		this.checkpointStateHandles = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 128db83..a1dd14b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -39,6 +39,7 @@ import java.util.ConcurrentModificationException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -93,12 +94,14 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	 * @param client ZooKeeper client
 	 * @param currentJobsPath ZooKeeper path for current job graphs
 	 * @param stateStorage State storage used to persist the submitted jobs
+	 * @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks
 	 * @throws Exception
 	 */
 	public ZooKeeperSubmittedJobGraphStore(
 			CuratorFramework client,
 			String currentJobsPath,
-			StateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
+			StateStorageHelper<SubmittedJobGraph> stateStorage,
+			Executor executor) throws Exception {
 
 		checkNotNull(currentJobsPath, "Current jobs path");
 		checkNotNull(stateStorage, "State storage");
@@ -114,7 +117,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		// All operations will have the path as root
 		CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);
 
-		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage);
+		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage, executor);
 
 		this.pathCache = new PathChildrenCache(facade, "/", false);
 		pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener());

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 3986fed..472a2fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -188,11 +189,13 @@ public class ZooKeeperUtils {
 	 *
 	 * @param client        The {@link CuratorFramework} ZooKeeper client to use
 	 * @param configuration {@link Configuration} object
+	 * @param executor to run ZooKeeper callbacks
 	 * @return {@link ZooKeeperSubmittedJobGraphStore} instance
 	 */
 	public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs(
 			CuratorFramework client,
-			Configuration configuration) throws Exception {
+			Configuration configuration,
+			Executor executor) throws Exception {
 
 		checkNotNull(configuration, "Configuration");
 
@@ -204,7 +207,7 @@ public class ZooKeeperUtils {
 				ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
 
 		return new ZooKeeperSubmittedJobGraphStore(
-				client, zooKeeperSubmittedJobsPath, stateStorage);
+				client, zooKeeperSubmittedJobsPath, stateStorage, executor);
 	}
 
 	/**
@@ -215,6 +218,7 @@ public class ZooKeeperUtils {
 	 * @param jobId                          ID of job to create the instance for
 	 * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain
 	 * @param userClassLoader                User code class loader
+	 * @param executor to run ZooKeeper callbacks
 	 * @return {@link ZooKeeperCompletedCheckpointStore} instance
 	 */
 	public static CompletedCheckpointStore createCompletedCheckpoints(
@@ -222,7 +226,8 @@ public class ZooKeeperUtils {
 			Configuration configuration,
 			JobID jobId,
 			int maxNumberOfCheckpointsToRetain,
-			ClassLoader userClassLoader) throws Exception {
+			ClassLoader userClassLoader,
+			Executor executor) throws Exception {
 
 		checkNotNull(configuration, "Configuration");
 
@@ -237,11 +242,12 @@ public class ZooKeeperUtils {
 		checkpointsPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
 
 		return new ZooKeeperCompletedCheckpointStore(
-				maxNumberOfCheckpointsToRetain,
-				userClassLoader,
-				client,
-				checkpointsPath,
-				stateStorage);
+			maxNumberOfCheckpointsToRetain,
+			userClassLoader,
+			client,
+			checkpointsPath,
+			stateStorage,
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java
index 36fb849..ce47462 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.state.StateHandle;
 import java.io.Serializable;
 
 /**
- * State storage helper which is used by {@link ZooKeeperStateHandleStore} to persiste state before
+ * State storage helper which is used by {@link ZooKeeperStateHandleStore} to persist state before
  * the state handle is written to ZooKeeper.
  *
  * @param <T>

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index dea3452..6576ff8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -30,10 +30,10 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -74,6 +74,8 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 
 	private final StateStorageHelper<T> storage;
 
+	private final Executor executor;
+
 	/**
 	 * Creates a {@link ZooKeeperStateHandleStore}.
 	 *
@@ -81,13 +83,18 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	 *                            expected that the client's namespace ensures that the root
 	 *                            path is exclusive for all state handles managed by this
 	 *                            instance, e.g. <code>client.usingNamespace("/stateHandles")</code>
+	 * @param storage to persist the actual state and whose returned state handle is then written
+	 *                to ZooKeeper
+	 * @param executor to run the ZooKeeper callbacks
 	 */
 	public ZooKeeperStateHandleStore(
 		CuratorFramework client,
-		StateStorageHelper storage) throws IOException {
+		StateStorageHelper storage,
+		Executor executor) {
 
 		this.client = checkNotNull(client, "Curator client");
 		this.storage = checkNotNull(storage, "State storage");
+		this.executor = checkNotNull(executor);
 	}
 
 	/**
@@ -350,7 +357,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 		checkNotNull(callback, "Background callback");
 
-		client.delete().deletingChildrenIfNeeded().inBackground(callback).forPath(pathInZooKeeper);
+		client.delete().deletingChildrenIfNeeded().inBackground(callback, executor).forPath(pathInZooKeeper);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index c6e18e9..ad998de 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2012,24 +2012,30 @@ object JobManager {
 
     val numberProcessors = Hardware.getNumberCPUCores()
 
-    val executor = Executors.newFixedThreadPool(
+    val futureExecutor = Executors.newFixedThreadPool(
       numberProcessors,
       new NamedThreadFactory("jobmanager-future-", "-thread-"))
 
+    val ioExecutor = Executors.newFixedThreadPool(
+      numberProcessors,
+      new NamedThreadFactory("jobmanager-io-", "-thread-")
+    )
+
     val (jobManagerSystem, _, _, webMonitorOption, _) = try {
       startActorSystemAndJobManagerActors(
         configuration,
         executionMode,
         listeningAddress,
         listeningPort,
-        executor,
+        futureExecutor,
+        ioExecutor,
         classOf[JobManager],
         classOf[MemoryArchivist],
         Option(classOf[StandaloneResourceManager])
       )
     } catch {
       case t: Throwable =>
-          executor.shutdownNow()
+          futureExecutor.shutdownNow()
 
         throw t
     }
@@ -2047,7 +2053,8 @@ object JobManager {
         }
     }
 
-    executor.shutdownNow()
+    futureExecutor.shutdownNow()
+    ioExecutor.shutdownNow()
   }
 
   /**
@@ -2155,7 +2162,8 @@ object JobManager {
     *                      additional TaskManager in the same process.
     * @param listeningAddress The hostname where the JobManager should listen for messages.
     * @param listeningPort The port where the JobManager should listen for messages
-    * @param executor to run the JobManager's futures
+    * @param futureExecutor to run the JobManager's futures
+    * @param ioExecutor to run blocking io operations
     * @param jobManagerClass The class of the JobManager to be started
     * @param archiveClass The class of the Archivist to be started
     * @param resourceManagerClass Optional class of resource manager if one should be started
@@ -2167,7 +2175,8 @@ object JobManager {
       executionMode: JobManagerMode,
       listeningAddress: String,
       listeningPort: Int,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist],
       resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]])
@@ -2236,7 +2245,8 @@ object JobManager {
       val (jobManager, archive) = startJobManagerActors(
         configuration,
         jobManagerSystem,
-        executor,
+        futureExecutor,
+        ioExecutor,
         jobManagerClass,
         archiveClass)
 
@@ -2440,14 +2450,16 @@ object JobManager {
    *              delayBetweenRetries, timeout)
    *
    * @param configuration The configuration from which to parse the config values.
-   * @param executor to run JobManager's futures
+   * @param futureExecutor to run JobManager's futures
+   * @param ioExecutor to run blocking io operations
    * @param leaderElectionServiceOption LeaderElectionService which shall be returned if the option
    *                                    is defined
    * @return The members for a default JobManager.
    */
   def createJobManagerComponents(
       configuration: Configuration,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       leaderElectionServiceOption: Option[LeaderElectionService]) :
     (InstanceManager,
     FlinkScheduler,
@@ -2479,11 +2491,11 @@ object JobManager {
     var instanceManager: InstanceManager = null
     var scheduler: FlinkScheduler = null
     var libraryCacheManager: BlobLibraryCacheManager = null
-    
+
     try {
       blobServer = new BlobServer(configuration)
       instanceManager = new InstanceManager()
-      scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(executor))
+      scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(futureExecutor))
       libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval)
 
       instanceManager.addInstanceListener(scheduler)
@@ -2528,8 +2540,8 @@ object JobManager {
           }
 
           (leaderElectionService,
-            ZooKeeperUtils.createSubmittedJobGraphs(client, configuration),
-            new ZooKeeperCheckpointRecoveryFactory(client, configuration))
+            ZooKeeperUtils.createSubmittedJobGraphs(client, configuration, ioExecutor),
+            new ZooKeeperCheckpointRecoveryFactory(client, configuration, ioExecutor))
       }
 
     val savepointStore = SavepointStoreFactory.createFromConfig(configuration)
@@ -2576,14 +2588,17 @@ object JobManager {
    *
    * @param configuration The configuration for the JobManager
    * @param actorSystem The actor system running the JobManager
+   * @param futureExecutor to run JobManager's futures
+   * @param ioExecutor to run blocking io operations
    * @param jobManagerClass The class of the JobManager to be started
    * @param archiveClass The class of the MemoryArchivist to be started
-    * @return A tuple of references (JobManager Ref, Archiver Ref)
+   * @return A tuple of references (JobManager Ref, Archiver Ref)
    */
   def startJobManagerActors(
       configuration: Configuration,
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist])
     : (ActorRef, ActorRef) = {
@@ -2591,7 +2606,8 @@ object JobManager {
     startJobManagerActors(
       configuration,
       actorSystem,
-      executor,
+      futureExecutor,
+      ioExecutor,
       Some(JOB_MANAGER_NAME),
       Some(ARCHIVE_NAME),
       jobManagerClass,
@@ -2604,7 +2620,8 @@ object JobManager {
    *
    * @param configuration The configuration for the JobManager
    * @param actorSystem The actor system running the JobManager
-   * @param executor to run JobManager's futures
+   * @param futureExecutor to run JobManager's futures
+   * @param ioExecutor to run blocking io operations
    * @param jobManagerActorName Optionally the name of the JobManager actor. If none is given,
    *                          the actor will have the name generated by the actor system.
    * @param archiveActorName Optionally the name of the archive actor. If none is given,
@@ -2616,7 +2633,8 @@ object JobManager {
   def startJobManagerActors(
       configuration: Configuration,
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       jobManagerActorName: Option[String],
       archiveActorName: Option[String],
       jobManagerClass: Class[_ <: JobManager],
@@ -2636,7 +2654,8 @@ object JobManager {
     jobRecoveryTimeout, 
     metricsRegistry) = createJobManagerComponents(
       configuration,
-      executor,
+      futureExecutor,
+      ioExecutor,
       None)
 
     val archiveProps = Props(archiveClass, archiveCount)
@@ -2650,7 +2669,7 @@ object JobManager {
     val jobManagerProps = Props(
       jobManagerClass,
       configuration,
-      executor,
+      futureExecutor,
       instanceManager,
       scheduler,
       libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 271535e..57f0a83 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -106,7 +106,11 @@ abstract class FlinkMiniCluster(
 
   private var isRunning = false
 
-  val executor = Executors.newFixedThreadPool(
+  val futureExecutor = Executors.newFixedThreadPool(
+    Hardware.getNumberCPUCores(),
+    new NamedThreadFactory("mini-cluster-future-", "-thread"))
+
+  val ioExecutor = Executors.newFixedThreadPool(
     Hardware.getNumberCPUCores(),
     new NamedThreadFactory("mini-cluster-future-", "-thread-"))
 
@@ -374,7 +378,8 @@ abstract class FlinkMiniCluster(
     jobManagerLeaderRetrievalService.foreach(_.stop())
     isRunning = false
 
-    executor.shutdownNow
+    futureExecutor.shutdownNow()
+    ioExecutor.shutdownNow()
   }
 
   protected def shutdown(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 594997c..ba9639b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -82,7 +82,8 @@ class LocalFlinkMiniCluster(
     val (jobManager, _) = JobManager.startJobManagerActors(
       config,
       system,
-      executor,
+      futureExecutor,
+      ioExecutor,
       Some(jobManagerName),
       Some(archiveName),
       classOf[JobManager],

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 380ba2c..44ffbb6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.TestExecutors;
 import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.junit.AfterClass;
@@ -66,7 +67,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 			public StateHandle<CompletedCheckpoint> store(CompletedCheckpoint state) throws Exception {
 				return new LocalStateHandle<>(state);
 			}
-		});
+		}, TestExecutors.directExecutor());
 	}
 
 	// ---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 148e88f..b56bf29 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -366,6 +366,8 @@ public class JobManagerTest extends TestLogger {
 	}
 
 	/**
+					system.dispatcher(),
+				actorSystem.dispatcher(),
 	 * Tests that we can trigger a
 	 *
 	 * @throws Exception
@@ -391,6 +393,7 @@ public class JobManagerTest extends TestLogger {
 				config,
 				actorSystem,
 				actorSystem.dispatcher(),
+				actorSystem.dispatcher(),
 				Option.apply("jm"),
 				Option.apply("arch"),
 				TestingJobManager.class,
@@ -485,6 +488,7 @@ public class JobManagerTest extends TestLogger {
 				new Configuration(),
 				actorSystem,
 				actorSystem.dispatcher(),
+				actorSystem.dispatcher(),
 				Option.apply("jm"),
 				Option.apply("arch"),
 				TestingJobManager.class,

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 42ed25b..31ecb46 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -81,6 +81,7 @@ public class JobSubmitTest {
 				config,
 			jobManagerSystem,
 			jobManagerSystem.dispatcher(),
+			jobManagerSystem.dispatcher(),
 			JobManager.class,
 			MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index c71bd35..8eaecd0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorRef;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -27,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener;
 import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.TestExecutors;
 import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.util.TestLogger;
@@ -82,7 +82,8 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
 			ZooKeeper.createClient(),
 			"/testPutAndRemoveJobGraph",
-			localStateStorage);
+			localStateStorage,
+			TestExecutors.directExecutor());
 
 		try {
 			SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
@@ -134,7 +135,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 	@Test
 	public void testRecoverJobGraphs() throws Exception {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage);
+				ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage, TestExecutors.directExecutor());
 
 		try {
 			SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
@@ -184,10 +185,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 
 		try {
 			jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
+					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage, TestExecutors.directExecutor());
 
 			otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
+					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage, TestExecutors.directExecutor());
 
 
 			SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
@@ -243,10 +244,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 	@Test(expected = IllegalStateException.class)
 	public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
+				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, TestExecutors.directExecutor());
 
 		ZooKeeperSubmittedJobGraphStore otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
+				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, TestExecutors.directExecutor());
 
 		jobGraphs.start(null);
 		otherJobGraphs.start(null);

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
index 8530ce6..fa7841a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
@@ -72,7 +72,12 @@ public class ClusterShutdownITCase extends TestLogger {
 
 			// start job manager which doesn't shutdown the actor system
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, system.dispatcher(), config, "jobmanager1");
+				TestingUtils.createJobManager(
+					system,
+					system.dispatcher(),
+					system.dispatcher(),
+					config,
+					"jobmanager1");
 
 			// Tell the JobManager to inform us of shutdown actions
 			jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
@@ -114,7 +119,12 @@ public class ClusterShutdownITCase extends TestLogger {
 
 			// start job manager which doesn't shutdown the actor system
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, system.dispatcher(), config, "jobmanager2");
+				TestingUtils.createJobManager(
+					system,
+					system.dispatcher(),
+					system.dispatcher(),
+					config,
+					"jobmanager2");
 
 			// Tell the JobManager to inform us of shutdown actions
 			jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
index bfc6abe..3a8a200 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
@@ -72,7 +72,12 @@ public class ResourceManagerITCase extends TestLogger {
 		protected void run() {
 
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, system.dispatcher(), config, "ReconciliationTest");
+				TestingUtils.createJobManager(
+					system,
+					system.dispatcher(),
+					system.dispatcher(),
+					config,
+					"ReconciliationTest");
 			ActorGateway me =
 				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
 
@@ -125,7 +130,12 @@ public class ResourceManagerITCase extends TestLogger {
 		protected void run() {
 
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, system.dispatcher(), config, "RegTest");
+				TestingUtils.createJobManager(
+					system,
+					system.dispatcher(),
+					system.dispatcher(),
+					config,
+					"RegTest");
 			ActorGateway me =
 				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 46bc7a5..04f7fdb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -81,6 +81,7 @@ public class TaskManagerComponentsStartupShutdownTest {
 				config,
 				actorSystem,
 				actorSystem.dispatcher(),
+				actorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index 63c1b29..dead732 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -103,6 +103,7 @@ public abstract class TaskManagerProcessReapingTestBase {
 				new Configuration(),
 				jmActorSystem,
 				jmActorSystem.dispatcher(),
+				jmActorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 52d500d..88e549e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -112,7 +112,11 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
 			try {
 				// a simple JobManager
-				jobManager = createJobManager(actorSystem, actorSystem.dispatcher(), config);
+				jobManager = createJobManager(
+					actorSystem,
+					actorSystem.dispatcher(),
+					actorSystem.dispatcher(),
+					config);
 				startResourceManager(config, jobManager.actor());
 
 				// start two TaskManagers. it will automatically try to register
@@ -195,6 +199,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				jobManager = createJobManager(
 					actorSystem,
 					actorSystem.dispatcher(),
+					actorSystem.dispatcher(),
 					new Configuration());
 
 				startResourceManager(config, jobManager.actor());
@@ -700,6 +705,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 			configuration,
 			actorSystem,
 			actorSystem.dispatcher(),
+			actorSystem.dispatcher(),
 			NONE_STRING,
 			NONE_STRING,
 			JobManager.class,

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestExecutors.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestExecutors.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestExecutors.java
new file mode 100644
index 0000000..703593c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestExecutors.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.util.concurrent.Executor;
+/** Test-side factory for {@link Executor} instances; currently offers only a direct executor. */
+public class TestExecutors {
+	/** Returns the shared executor that invokes {@code command.run()} inside {@code execute}. */
+	public static Executor directExecutor() {
+		return DirectExecutor.INSTANCE;
+	}
+	/** {@link Executor} that runs each command inline instead of handing it to another thread. */
+	private static final class DirectExecutor implements Executor {
+		/** Single shared instance; the private constructor keeps it the only one created here. */
+		public static final DirectExecutor INSTANCE = new DirectExecutor();
+
+		private DirectExecutor() {}
+		// Calls run() directly, so the command completes on the calling thread before returning.
+		@Override
+		public void execute(Runnable command) {
+			command.run();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
index 7505bfc..5a2b337 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
@@ -23,6 +23,7 @@ import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.TestExecutors;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 import org.apache.zookeeper.CreateMode;
@@ -85,7 +86,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 	public void testAdd() throws Exception {
 		LongStateStorage longStateStorage = new LongStateStorage();
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
-				ZooKeeper.getClient(), longStateStorage);
+				ZooKeeper.getClient(), longStateStorage, TestExecutors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testAdd";
@@ -120,7 +121,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 	public void testAddWithCreateMode() throws Exception {
 		LongStateStorage longStateStorage = new LongStateStorage();
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
-				ZooKeeper.getClient(), longStateStorage);
+				ZooKeeper.getClient(), longStateStorage, TestExecutors.directExecutor());
 
 		// Config
 		Long state = 3457347234L;
@@ -182,7 +183,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, TestExecutors.directExecutor());
 
 		ZooKeeper.getClient().create().forPath("/testAddAlreadyExistingPath");
 
@@ -201,7 +202,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		when(client.create()).thenThrow(new RuntimeException("Expected test Exception."));
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				client, stateHandleProvider);
+				client, stateHandleProvider, TestExecutors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure";
@@ -231,7 +232,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, TestExecutors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testReplace";
@@ -270,7 +271,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		StateStorageHelper<Long> stateStorage = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateStorage);
+				ZooKeeper.getClient(), stateStorage, TestExecutors.directExecutor());
 
 		store.replace("/testReplaceNonExistingPath", 0, 1L);
 	}
@@ -287,7 +288,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		when(client.setData()).thenThrow(new RuntimeException("Expected test Exception."));
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				client, stateHandleProvider);
+				client, stateHandleProvider, TestExecutors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testReplaceDiscardStateHandleAfterFailure";
@@ -329,7 +330,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, TestExecutors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testGetAndExists";
@@ -354,7 +355,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, TestExecutors.directExecutor());
 
 		store.get("/testGetNonExistingPath");
 	}
@@ -368,7 +369,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, TestExecutors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testGetAll";
@@ -399,7 +400,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, TestExecutors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testGetAllSortedByName";
@@ -429,7 +430,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, TestExecutors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testRemove";
@@ -453,7 +454,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, TestExecutors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testRemoveWithCallback";
@@ -492,7 +493,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, TestExecutors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testDiscard";
@@ -514,7 +515,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, TestExecutors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testDiscardAll";

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index b35cdb4..7174bc8 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -169,6 +169,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
       new Configuration(),
       _system,
       _system.dispatcher,
+      _system.dispatcher,
       None,
       None,
       classOf[JobManager],

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index c3f846e..c7c141a 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -109,7 +109,8 @@ class TestingCluster(
     jobRecoveryTimeout,
     metricRegistry) = JobManager.createJobManagerComponents(
       config,
-      executor,
+      futureExecutor,
+      ioExecutor,
       createLeaderElectionService())
 
     val testArchiveProps = Props(new TestingMemoryArchivist(archiveCount))
@@ -118,7 +119,7 @@ class TestingCluster(
     val jobManagerProps = Props(
       new TestingJobManager(
         configuration,
-        executor,
+        futureExecutor,
         instanceManager,
         scheduler,
         libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 576993d..fadef28 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -303,18 +303,21 @@ object TestingUtils {
   /** Creates a testing JobManager using the default recovery mode (standalone)
     *
     * @param actorSystem The ActorSystem to use
-    * @param executor to run the JobManager's futures
+    * @param futureExecutor to run the JobManager's futures
+    * @param ioExecutor to run blocking io operations
     * @param configuration The Flink configuration
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       configuration: Configuration)
     : ActorGateway = {
     createJobManager(
       actorSystem,
-      executor,
+      futureExecutor,
+      ioExecutor,
       configuration,
       classOf[TestingJobManager],
       ""
@@ -325,20 +328,23 @@ object TestingUtils {
     * Additional prefix can be supplied for the Actor system names
     *
     * @param actorSystem The ActorSystem to use
-    * @param executor to run the JobManager's futures
+    * @param futureExecutor to run the JobManager's futures
+    * @param ioExecutor to run blocking io operations
     * @param configuration The Flink configuration
     * @param prefix The prefix for the actor names
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       configuration: Configuration,
       prefix: String)
     : ActorGateway = {
     createJobManager(
       actorSystem,
-      executor,
+      futureExecutor,
+      ioExecutor,
       configuration,
       classOf[TestingJobManager],
       prefix
@@ -349,19 +355,21 @@ object TestingUtils {
     * Creates a JobManager of the given class using the default recovery mode (standalone)
     *
     * @param actorSystem ActorSystem to use
-    * @param executor to run the JobManager's futures
+    * @param futureExecutor to run the JobManager's futures
+    * @param ioExecutor to run blocking io operations
     * @param configuration Configuration to use
     * @param jobManagerClass JobManager class to instantiate
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       configuration: Configuration,
       jobManagerClass: Class[_ <: JobManager])
     : ActorGateway = {
 
-    createJobManager(actorSystem, executor, configuration, jobManagerClass, "")
+    createJobManager(actorSystem, futureExecutor, ioExecutor, configuration, jobManagerClass, "")
   }
 
   /**
@@ -369,7 +377,8 @@ object TestingUtils {
     * Additional prefix for the Actor names can be added.
     *
     * @param actorSystem ActorSystem to use
-    * @param executor to run the JobManager's futures
+    * @param futureExecutor to run the JobManager's futures
+    * @param ioExecutor to run blocking io operations
     * @param configuration Configuration to use
     * @param jobManagerClass JobManager class to instantiate
     * @param prefix The prefix to use for the Actor names
@@ -378,7 +387,8 @@ object TestingUtils {
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       configuration: Configuration,
       jobManagerClass: Class[_ <: JobManager],
       prefix: String)
@@ -389,7 +399,8 @@ object TestingUtils {
       val (actor, _) = JobManager.startJobManagerActors(
         configuration,
         actorSystem,
-        executor,
+        futureExecutor,
+        ioExecutor,
         Some(prefix + JobManager.JOB_MANAGER_NAME),
         Some(prefix + JobManager.ARCHIVE_NAME),
         jobManagerClass,

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index f2a4c5c..65af576 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -103,7 +103,8 @@ class ForkableFlinkMiniCluster(
     val (jobManager, _) = JobManager.startJobManagerActors(
       config,
       actorSystem,
-      executor,
+      futureExecutor,
+      ioExecutor,
       Some(jobManagerName),
       Some(archiveName),
       classOf[TestingJobManager],

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index af86983..0ff2e78 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -130,6 +130,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 				jmConfig,
 				jmActorSystem,
 				jmActorSystem.dispatcher(),
+				jmActorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index f72ef34..8243e97 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -105,6 +105,7 @@ public class ProcessFailureCancelingITCase {
 				jmConfig,
 				jmActorSystem,
 				jmActorSystem.dispatcher(),
+				jmActorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index eb00992..70894b0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -180,10 +180,14 @@ public class YarnApplicationMasterRunner {
 
 		int numberProcessors = Hardware.getNumberCPUCores();
 
-		final ExecutorService executor = Executors.newFixedThreadPool(
+		final ExecutorService futureExecutor = Executors.newFixedThreadPool(
 			numberProcessors,
 			new NamedThreadFactory("yarn-jobmanager-future-", "-thread-"));
 
+		final ExecutorService ioExecutor = Executors.newFixedThreadPool(
+			numberProcessors,
+			new NamedThreadFactory("yarn-jobmanager-io-", "-thread-"));
+
 		try {
 			// ------- (1) load and parse / validate all configurations -------
 
@@ -289,7 +293,8 @@ public class YarnApplicationMasterRunner {
 			ActorRef jobManager = JobManager.startJobManagerActors(
 				config,
 				actorSystem,
-				executor,
+				futureExecutor,
+				ioExecutor,
 				new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
 				scala.Option.<String>empty(),
 				getJobManagerClass(),
@@ -377,7 +382,8 @@ public class YarnApplicationMasterRunner {
 			}
 		}
 
-		executor.shutdownNow();
+		futureExecutor.shutdownNow();
+		ioExecutor.shutdownNow();
 
 		return 0;
 	}