You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/22 08:48:45 UTC

[1/4] flink git commit: [FLINK-5082] Pull ExecutorService lifecycle management out of the JobManager

Repository: flink
Updated Branches:
  refs/heads/release-1.1 68585d145 -> cf4b22127


[FLINK-5082] Pull ExecutorService lifecycle management out of the JobManager

The provided ExecutorService will no longer be closed by the JobManager. Instead the
lifecycle is managed outside of it where it was created. This will give a nicer behaviour,
because it better separates responsibilities.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7fb71c5b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7fb71c5b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7fb71c5b

Branch: refs/heads/release-1.1
Commit: 7fb71c5bf1aab85250bf29bd0ea0654079cea48f
Parents: 68585d1
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 16 18:33:54 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Nov 21 15:53:19 2016 +0100

----------------------------------------------------------------------
 .../BackPressureStatsTrackerITCase.java         |  2 +-
 .../StackTraceSampleCoordinatorITCase.java      |  2 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |  1 +
 .../flink/runtime/util/NamedThreadFactory.java  | 58 ++++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   | 71 +++++++++++--------
 .../runtime/minicluster/FlinkMiniCluster.scala  | 25 ++++---
 .../minicluster/LocalFlinkMiniCluster.scala     |  1 +
 .../testingUtils/TestingJobManager.scala        |  8 +--
 .../runtime/jobmanager/JobManagerTest.java      | 26 +++----
 .../flink/runtime/jobmanager/JobSubmitTest.java |  7 +-
 .../resourcemanager/ClusterShutdownITCase.java  |  4 +-
 .../resourcemanager/ResourceManagerITCase.java  |  4 +-
 ...askManagerComponentsStartupShutdownTest.java |  1 +
 .../TaskManagerProcessReapingTestBase.java      |  1 +
 .../TaskManagerRegistrationTest.java            |  8 ++-
 .../jobmanager/JobManagerRegistrationTest.scala |  1 +
 .../runtime/testingUtils/TestingCluster.scala   |  6 +-
 .../runtime/testingUtils/TestingUtils.scala     | 72 +++++---------------
 .../test/util/ForkableFlinkMiniCluster.scala    |  1 +
 ...ctTaskManagerProcessFailureRecoveryTest.java |  1 +
 .../recovery/ProcessFailureCancelingITCase.java |  1 +
 .../flink/yarn/TestingYarnJobManager.scala      |  8 +--
 .../flink/yarn/YarnApplicationMasterRunner.java | 19 +++++-
 .../org/apache/flink/yarn/YarnJobManager.scala  | 15 ++--
 24 files changed, 205 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 25dc189..9fbbd90 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -120,7 +120,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 			}
 
 			try {
-				jobManger = TestingUtils.createJobManager(testActorSystem, new Configuration());
+				jobManger = TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), new Configuration());
 
 				Configuration config = new Configuration();
 				config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index 9b1f608..868dae1 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -90,7 +90,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
 			ActorGateway taskManager = null;
 
 			try {
-				jobManger = TestingUtils.createJobManager(testActorSystem, new Configuration());
+				jobManger = TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), new Configuration());
 
 				Configuration config = new Configuration();
 				config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 677ff54..a6b958a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -178,6 +178,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 				jobManager[i] = JobManager.startJobManagerActors(
 					jmConfig,
 					jobManagerSystem[i],
+					jobManagerSystem[i].dispatcher(),
 					JobManager.class,
 					MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
new file mode 100644
index 0000000..bd97963
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Thread factory naming each thread {@code <poolName><poolNumber><threadName><threadNumber>};
+ * threads are created non-daemon, with normal priority, in the group captured at construction.
+ * The code is based on {@link java.util.concurrent.Executors.DefaultThreadFactory}.
+ */
+public class NamedThreadFactory implements ThreadFactory {
+	private static final AtomicInteger poolNumber = new AtomicInteger(1); // shared by all factories
+	private final ThreadGroup group;
+	private final AtomicInteger threadNumber = new AtomicInteger(1); // per-factory thread counter
+	private final String namePrefix;
+
+	public NamedThreadFactory(final String poolName, final String threadName) {
+		SecurityManager securityManager = System.getSecurityManager();
+		group = (securityManager != null) ? securityManager.getThreadGroup() : // prefer its group
+			Thread.currentThread().getThreadGroup(); // otherwise the constructing thread's group
+
+		namePrefix = poolName +
+			poolNumber.getAndIncrement() + // each new factory consumes the next pool number
+			threadName;
+	}
+
+	@Override
+	public Thread newThread(Runnable runnable) {
+		Thread t = new Thread(group, runnable,
+			namePrefix + threadNumber.getAndIncrement(),
+			0); // stack size 0: the JVM ignores this parameter and uses its default
+		if (t.isDaemon()) { // a new thread inherits the daemon flag of its creator; reset it
+			t.setDaemon(false);
+		}
+		if (t.getPriority() != Thread.NORM_PRIORITY) { // likewise normalize an inherited priority
+			t.setPriority(Thread.NORM_PRIORITY);
+		}
+		return t;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 41218c9..c6e18e9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -22,7 +22,7 @@ import java.io.{File, IOException}
 import java.lang.management.ManagementFactory
 import java.net._
 import java.util.UUID
-import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
+import java.util.concurrent.{TimeUnit, Future => _, TimeoutException => _, _}
 import javax.management.ObjectName
 
 import akka.actor.Status.Failure
@@ -50,7 +50,7 @@ import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionGraphException, ExecutionJobVertex}
-import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
+import org.apache.flink.runtime.instance.{AkkaActorGateway, Hardware, InstanceManager}
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
@@ -114,7 +114,7 @@ import scala.language.postfixOps
  */
 class JobManager(
     protected val flinkConfiguration: Configuration,
-    protected val executorService: ExecutorService,
+    protected val executor: Executor,
     protected val instanceManager: InstanceManager,
     protected val scheduler: FlinkScheduler,
     protected val libraryCacheManager: BlobLibraryCacheManager,
@@ -137,7 +137,7 @@ class JobManager(
 
   /** The extra execution context, for futures, with a custom logging reporter */
   protected val executionContext: ExecutionContext = ExecutionContext.fromExecutor(
-    executorService,
+    executor,
     (t: Throwable) => {
       if (!context.system.isTerminated) {
         log.error("Executor could not execute task", t)
@@ -277,9 +277,6 @@ class JobManager(
       case e: IOException => log.error("Could not properly shutdown the library cache manager.", e)
     }
 
-    // shut down the extra thread pool for futures
-    executorService.shutdown()
-
     // failsafe shutdown of the metrics registry
     try {
       metricsRegistry.map(_.shutdown())
@@ -2013,15 +2010,29 @@ object JobManager {
       listeningPort: Int)
     : Unit = {
 
-    val (jobManagerSystem, _, _, webMonitorOption, _) = startActorSystemAndJobManagerActors(
-      configuration,
-      executionMode,
-      listeningAddress,
-      listeningPort,
-      classOf[JobManager],
-      classOf[MemoryArchivist],
-      Option(classOf[StandaloneResourceManager])
-    )
+    val numberProcessors = Hardware.getNumberCPUCores()
+
+    val executor = Executors.newFixedThreadPool(
+      numberProcessors,
+      new NamedThreadFactory("jobmanager-future-", "-thread-"))
+
+    val (jobManagerSystem, _, _, webMonitorOption, _) = try {
+      startActorSystemAndJobManagerActors(
+        configuration,
+        executionMode,
+        listeningAddress,
+        listeningPort,
+        executor,
+        classOf[JobManager],
+        classOf[MemoryArchivist],
+        Option(classOf[StandaloneResourceManager])
+      )
+    } catch {
+      case t: Throwable =>
+          executor.shutdownNow()
+
+        throw t
+    }
 
     // block until everything is shut down
     jobManagerSystem.awaitTermination()
@@ -2035,6 +2046,8 @@ object JobManager {
             LOG.warn("Could not properly stop the web monitor.", t)
         }
     }
+
+    executor.shutdownNow()
   }
 
   /**
@@ -2142,6 +2155,7 @@ object JobManager {
     *                      additional TaskManager in the same process.
     * @param listeningAddress The hostname where the JobManager should listen for messages.
     * @param listeningPort The port where the JobManager should listen for messages
+    * @param executor to run the JobManager's futures
     * @param jobManagerClass The class of the JobManager to be started
     * @param archiveClass The class of the Archivist to be started
     * @param resourceManagerClass Optional class of resource manager if one should be started
@@ -2153,6 +2167,7 @@ object JobManager {
       executionMode: JobManagerMode,
       listeningAddress: String,
       listeningPort: Int,
+      executor: Executor,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist],
       resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]])
@@ -2221,6 +2236,7 @@ object JobManager {
       val (jobManager, archive) = startJobManagerActors(
         configuration,
         jobManagerSystem,
+        executor,
         jobManagerClass,
         archiveClass)
 
@@ -2424,15 +2440,16 @@ object JobManager {
    *              delayBetweenRetries, timeout)
    *
    * @param configuration The configuration from which to parse the config values.
+   * @param executor to run JobManager's futures
    * @param leaderElectionServiceOption LeaderElectionService which shall be returned if the option
    *                                    is defined
    * @return The members for a default JobManager.
    */
   def createJobManagerComponents(
       configuration: Configuration,
+      executor: Executor,
       leaderElectionServiceOption: Option[LeaderElectionService]) :
-    (ExecutorService,
-    InstanceManager,
+    (InstanceManager,
     FlinkScheduler,
     BlobLibraryCacheManager,
     RestartStrategyFactory,
@@ -2462,13 +2479,11 @@ object JobManager {
     var instanceManager: InstanceManager = null
     var scheduler: FlinkScheduler = null
     var libraryCacheManager: BlobLibraryCacheManager = null
-
-    val executorService: ExecutorService = new ForkJoinPool()
     
     try {
       blobServer = new BlobServer(configuration)
       instanceManager = new InstanceManager()
-      scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(executorService))
+      scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(executor))
       libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval)
 
       instanceManager.addInstanceListener(scheduler)
@@ -2487,7 +2502,6 @@ object JobManager {
         if (blobServer != null) {
           blobServer.shutdown()
         }
-        executorService.shutdownNow()
         
         throw t
     }
@@ -2542,8 +2556,7 @@ object JobManager {
         None
     }
 
-    (executorService,
-      instanceManager,
+    (instanceManager,
       scheduler,
       libraryCacheManager,
       restartStrategy,
@@ -2570,6 +2583,7 @@ object JobManager {
   def startJobManagerActors(
       configuration: Configuration,
       actorSystem: ActorSystem,
+      executor: Executor,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist])
     : (ActorRef, ActorRef) = {
@@ -2577,6 +2591,7 @@ object JobManager {
     startJobManagerActors(
       configuration,
       actorSystem,
+      executor,
       Some(JOB_MANAGER_NAME),
       Some(ARCHIVE_NAME),
       jobManagerClass,
@@ -2589,6 +2604,7 @@ object JobManager {
    *
    * @param configuration The configuration for the JobManager
    * @param actorSystem The actor system running the JobManager
+   * @param executor to run JobManager's futures
    * @param jobManagerActorName Optionally the name of the JobManager actor. If none is given,
    *                          the actor will have the name generated by the actor system.
    * @param archiveActorName Optionally the name of the archive actor. If none is given,
@@ -2600,14 +2616,14 @@ object JobManager {
   def startJobManagerActors(
       configuration: Configuration,
       actorSystem: ActorSystem,
+      executor: Executor,
       jobManagerActorName: Option[String],
       archiveActorName: Option[String],
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist])
     : (ActorRef, ActorRef) = {
 
-    val (executorService: ExecutorService,
-    instanceManager,
+    val (instanceManager,
     scheduler,
     libraryCacheManager,
     restartStrategy,
@@ -2620,6 +2636,7 @@ object JobManager {
     jobRecoveryTimeout, 
     metricsRegistry) = createJobManagerComponents(
       configuration,
+      executor,
       None)
 
     val archiveProps = Props(archiveClass, archiveCount)
@@ -2633,7 +2650,7 @@ object JobManager {
     val jobManagerProps = Props(
       jobManagerClass,
       configuration,
-      executorService,
+      executor,
       instanceManager,
       scheduler,
       libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 5074b8c..271535e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -20,26 +20,23 @@ package org.apache.flink.runtime.minicluster
 
 import java.net.InetAddress
 import java.util.UUID
+import java.util.concurrent.Executors
 
 import akka.pattern.Patterns.gracefulStop
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
-
 import com.typesafe.config.Config
-
-import org.apache.flink.api.common.{JobID, JobExecutionResult, JobSubmissionResult}
+import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.client.{JobExecutionException, JobClient}
-import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway}
+import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
+import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway, Hardware}
 import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.RecoveryMode
-import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalService, LeaderRetrievalListener,
-StandaloneLeaderRetrievalService}
+import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService, StandaloneLeaderRetrievalService}
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
-import org.apache.flink.runtime.util.ZooKeeperUtils
-import org.apache.flink.runtime.webmonitor.{WebMonitorUtils, WebMonitor}
-
+import org.apache.flink.runtime.util.{NamedThreadFactory, ZooKeeperUtils}
+import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -109,6 +106,12 @@ abstract class FlinkMiniCluster(
 
   private var isRunning = false
 
+  val executor = Executors.newFixedThreadPool(
+    Hardware.getNumberCPUCores(),
+    new NamedThreadFactory("mini-cluster-future-", "-thread-"))
+
+
+
   // --------------------------------------------------------------------------
   //                           Abstract Methods
   // --------------------------------------------------------------------------
@@ -370,6 +373,8 @@ abstract class FlinkMiniCluster(
 
     jobManagerLeaderRetrievalService.foreach(_.stop())
     isRunning = false
+
+    executor.shutdownNow
   }
 
   protected def shutdown(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 5bebd48..594997c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -82,6 +82,7 @@ class LocalFlinkMiniCluster(
     val (jobManager, _) = JobManager.startJobManagerActors(
       config,
       system,
+      executor,
       Some(jobManagerName),
       Some(archiveName),
       classOf[JobManager],

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 16331ac..e9db9b0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.testingUtils
 
 import akka.actor.ActorRef
-
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
@@ -33,15 +32,14 @@ import org.apache.flink.runtime.metrics.MetricRegistry
 
 import scala.concurrent.duration._
 import scala.language.postfixOps
-
-import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executor
 
 /** JobManager implementation extended by testing messages
   *
   */
 class TestingJobManager(
     flinkConfiguration: Configuration,
-    executorService: ExecutorService,
+    executor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -56,7 +54,7 @@ class TestingJobManager(
     metricRegistry : Option[MetricRegistry])
   extends JobManager(
     flinkConfiguration,
-      executorService,
+    executor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index d60e060..148e88f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -388,12 +388,13 @@ public class JobManagerTest extends TestLogger {
 			actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
 
 			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
-					config,
-					actorSystem,
-					Option.apply("jm"),
-					Option.apply("arch"),
-					TestingJobManager.class,
-					TestingMemoryArchivist.class);
+				config,
+				actorSystem,
+				actorSystem.dispatcher(),
+				Option.apply("jm"),
+				Option.apply("arch"),
+				TestingJobManager.class,
+				TestingMemoryArchivist.class);
 
 			jobManager = new AkkaActorGateway(master._1(), null);
 			archiver = new AkkaActorGateway(master._2(), null);
@@ -481,12 +482,13 @@ public class JobManagerTest extends TestLogger {
 			actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
 
 			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
-					new Configuration(),
-					actorSystem,
-					Option.apply("jm"),
-					Option.apply("arch"),
-					TestingJobManager.class,
-					TestingMemoryArchivist.class);
+				new Configuration(),
+				actorSystem,
+				actorSystem.dispatcher(),
+				Option.apply("jm"),
+				Option.apply("arch"),
+				TestingJobManager.class,
+				TestingMemoryArchivist.class);
 
 			jobManager = new AkkaActorGateway(master._1(), null);
 			archiver = new AkkaActorGateway(master._2(), null);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 536b729..42ed25b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -79,9 +79,10 @@ public class JobSubmitTest {
 		// only start JobManager (no ResourceManager)
 		JobManager.startJobManagerActors(
 				config,
-				jobManagerSystem,
-				JobManager.class,
-				MemoryArchivist.class)._1();
+			jobManagerSystem,
+			jobManagerSystem.dispatcher(),
+			JobManager.class,
+			MemoryArchivist.class)._1();
 
 		try {
 			LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
index 32c6cac..8530ce6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
@@ -72,7 +72,7 @@ public class ClusterShutdownITCase extends TestLogger {
 
 			// start job manager which doesn't shutdown the actor system
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, config, "jobmanager1");
+				TestingUtils.createJobManager(system, system.dispatcher(), config, "jobmanager1");
 
 			// Tell the JobManager to inform us of shutdown actions
 			jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
@@ -114,7 +114,7 @@ public class ClusterShutdownITCase extends TestLogger {
 
 			// start job manager which doesn't shutdown the actor system
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, config, "jobmanager2");
+				TestingUtils.createJobManager(system, system.dispatcher(), config, "jobmanager2");
 
 			// Tell the JobManager to inform us of shutdown actions
 			jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
index ca09634..bfc6abe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
@@ -72,7 +72,7 @@ public class ResourceManagerITCase extends TestLogger {
 		protected void run() {
 
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, config, "ReconciliationTest");
+				TestingUtils.createJobManager(system, system.dispatcher(), config, "ReconciliationTest");
 			ActorGateway me =
 				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
 
@@ -125,7 +125,7 @@ public class ResourceManagerITCase extends TestLogger {
 		protected void run() {
 
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, config, "RegTest");
+				TestingUtils.createJobManager(system, system.dispatcher(), config, "RegTest");
 			ActorGateway me =
 				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index b4c456c..46bc7a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -80,6 +80,7 @@ public class TaskManagerComponentsStartupShutdownTest {
 			final ActorRef jobManager = JobManager.startJobManagerActors(
 				config,
 				actorSystem,
+				actorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index c7913f7..63c1b29 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -102,6 +102,7 @@ public abstract class TaskManagerProcessReapingTestBase {
 			ActorRef jmActor = JobManager.startJobManagerActors(
 				new Configuration(),
 				jmActorSystem,
+				jmActorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index e23aba7..52d500d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -112,7 +112,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
 			try {
 				// a simple JobManager
-				jobManager = createJobManager(actorSystem, config);
+				jobManager = createJobManager(actorSystem, actorSystem.dispatcher(), config);
 				startResourceManager(config, jobManager.actor());
 
 				// start two TaskManagers. it will automatically try to register
@@ -193,8 +193,9 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
 				// now start the JobManager, with the regular akka URL
 				jobManager = createJobManager(
-						actorSystem,
-						new Configuration());
+					actorSystem,
+					actorSystem.dispatcher(),
+					new Configuration());
 
 				startResourceManager(config, jobManager.actor());
 
@@ -698,6 +699,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 		return JobManager.startJobManagerActors(
 			configuration,
 			actorSystem,
+			actorSystem.dispatcher(),
 			NONE_STRING,
 			NONE_STRING,
 			JobManager.class,

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 7feb949..b35cdb4 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -168,6 +168,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
     val (jm: ActorRef, _) = JobManager.startJobManagerActors(
       new Configuration(),
       _system,
+      _system.dispatcher,
       None,
       None,
       classOf[JobManager],

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index b4ba40b..c3f846e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -96,8 +96,7 @@ class TestingCluster(
       config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
     }
 
-    val (executionContext,
-    instanceManager,
+    val (instanceManager,
     scheduler,
     libraryCacheManager,
     restartStrategyFactory,
@@ -110,6 +109,7 @@ class TestingCluster(
     jobRecoveryTimeout,
     metricRegistry) = JobManager.createJobManagerComponents(
       config,
+      executor,
       createLeaderElectionService())
 
     val testArchiveProps = Props(new TestingMemoryArchivist(archiveCount))
@@ -118,7 +118,7 @@ class TestingCluster(
     val jobManagerProps = Props(
       new TestingJobManager(
         configuration,
-        executionContext,
+        executor,
         instanceManager,
         scheduler,
         libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 02a0fec..576993d 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -19,26 +19,25 @@
 package org.apache.flink.runtime.testingUtils
 
 import java.util.UUID
+import java.util.concurrent.Executor
 
-import akka.actor.{Props, Kill, ActorSystem, ActorRef}
+import akka.actor.{ActorRef, ActorSystem, Kill, Props}
 import akka.pattern.ask
 import com.google.common.util.concurrent.MoreExecutors
-
 import com.typesafe.config.ConfigFactory
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobExecutionResult
-
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.client.JobClient
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
+import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
 import org.apache.flink.runtime.testutils.TestingResourceManager
 import org.apache.flink.runtime.util.LeaderRetrievalUtils
-import org.apache.flink.runtime.{LogMessages, LeaderSessionMessageFilter, FlinkActor}
+import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway}
+import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.taskmanager.TaskManager
@@ -304,15 +303,18 @@ object TestingUtils {
   /** Creates a testing JobManager using the default recovery mode (standalone)
     *
     * @param actorSystem The ActorSystem to use
+    * @param executor to run the JobManager's futures
     * @param configuration The Flink configuration
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
+      executor: Executor,
       configuration: Configuration)
     : ActorGateway = {
     createJobManager(
       actorSystem,
+      executor,
       configuration,
       classOf[TestingJobManager],
       ""
@@ -323,86 +325,43 @@ object TestingUtils {
     * Additional prefix can be supplied for the Actor system names
     *
     * @param actorSystem The ActorSystem to use
+    * @param executor to run the JobManager's futures
     * @param configuration The Flink configuration
     * @param prefix The prefix for the actor names
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
+      executor: Executor,
       configuration: Configuration,
       prefix: String)
     : ActorGateway = {
     createJobManager(
       actorSystem,
+      executor,
       configuration,
       classOf[TestingJobManager],
       prefix
     )
   }
 
-  def createJobManager(
-      actorSystem: ActorSystem,
-      configuration: Configuration,
-      executionContext: ExecutionContext)
-    : ActorGateway = {
-
-    val (_,
-    instanceManager,
-    scheduler,
-    libraryCacheManager,
-    restartStrategy,
-    timeout,
-    archiveCount,
-    leaderElectionService,
-    submittedJobGraphs,
-    checkpointRecoveryFactory,
-    savepointStore,
-    jobRecoveryTimeout,
-    metricsRegistry) = JobManager.createJobManagerComponents(
-      configuration,
-      None
-    )
-
-    val archiveProps = Props(classOf[TestingMemoryArchivist], archiveCount)
-
-    val archive: ActorRef = actorSystem.actorOf(archiveProps, JobManager.ARCHIVE_NAME)
-
-    val jobManagerProps = Props(
-      classOf[TestingJobManager],
-      configuration,
-      executionContext,
-      instanceManager,
-      scheduler,
-      libraryCacheManager,
-      archive,
-      restartStrategy,
-      timeout,
-      leaderElectionService,
-      submittedJobGraphs,
-      checkpointRecoveryFactory,
-      jobRecoveryTimeout,
-      metricsRegistry)
-
-    val jobManager: ActorRef = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
-
-    new AkkaActorGateway(jobManager, null)
-  }
-
   /**
     * Creates a JobManager of the given class using the default recovery mode (standalone)
     *
     * @param actorSystem ActorSystem to use
+    * @param executor to run the JobManager's futures
     * @param configuration Configuration to use
     * @param jobManagerClass JobManager class to instantiate
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
+      executor: Executor,
       configuration: Configuration,
       jobManagerClass: Class[_ <: JobManager])
     : ActorGateway = {
 
-    createJobManager(actorSystem, configuration, jobManagerClass, "")
+    createJobManager(actorSystem, executor, configuration, jobManagerClass, "")
   }
 
   /**
@@ -410,6 +369,7 @@ object TestingUtils {
     * Additional prefix for the Actor names can be added.
     *
     * @param actorSystem ActorSystem to use
+    * @param executor to run the JobManager's futures
     * @param configuration Configuration to use
     * @param jobManagerClass JobManager class to instantiate
     * @param prefix The prefix to use for the Actor names
@@ -418,6 +378,7 @@ object TestingUtils {
     */
   def createJobManager(
       actorSystem: ActorSystem,
+      executor: Executor,
       configuration: Configuration,
       jobManagerClass: Class[_ <: JobManager],
       prefix: String)
@@ -428,6 +389,7 @@ object TestingUtils {
       val (actor, _) = JobManager.startJobManagerActors(
         configuration,
         actorSystem,
+        executor,
         Some(prefix + JobManager.JOB_MANAGER_NAME),
         Some(prefix + JobManager.ARCHIVE_NAME),
         jobManagerClass,

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 79c5a25..f2a4c5c 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -103,6 +103,7 @@ class ForkableFlinkMiniCluster(
     val (jobManager, _) = JobManager.startJobManagerActors(
       config,
       actorSystem,
+      executor,
       Some(jobManagerName),
       Some(archiveName),
       classOf[TestingJobManager],

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index b6eb7ba..af86983 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -129,6 +129,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 			ActorRef jmActor = JobManager.startJobManagerActors(
 				jmConfig,
 				jmActorSystem,
+				jmActorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index b66fb5d..f72ef34 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -104,6 +104,7 @@ public class ProcessFailureCancelingITCase {
 			ActorRef jmActor = JobManager.startJobManagerActors(
 				jmConfig,
 				jmActorSystem,
+				jmActorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index 7ca9c3e..0ed0d83 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.yarn
 
-import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executor
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
@@ -41,7 +41,7 @@ import scala.concurrent.duration.FiniteDuration
   * instead of an anonymous class with the respective mixin to obtain a more readable logger name.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executorService Execution context which is used to execute concurrent tasks in the
+  * @param executor Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
@@ -54,7 +54,7 @@ import scala.concurrent.duration.FiniteDuration
   */
 class TestingYarnJobManager(
     flinkConfiguration: Configuration,
-    executorService: ExecutorService,
+    executor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -69,7 +69,7 @@ class TestingYarnJobManager(
     metricRegistry : Option[MetricRegistry])
   extends YarnJobManager(
     flinkConfiguration,
-    executorService,
+    executor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index d19ddde..eb00992 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.instance.Hardware;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -35,6 +36,7 @@ import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.NamedThreadFactory;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 
@@ -64,13 +66,15 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
 
 /**
  * This class is the executable entry point for the YARN application master.
- * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * It starts actor system and the actors for {@link JobManager}
  * and {@link YarnFlinkResourceManager}.
  * 
  * The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container
@@ -174,6 +178,12 @@ public class YarnApplicationMasterRunner {
 		ActorSystem actorSystem = null;
 		WebMonitor webMonitor = null;
 
+		int numberProcessors = Hardware.getNumberCPUCores();
+
+		final ExecutorService executor = Executors.newFixedThreadPool(
+			numberProcessors,
+			new NamedThreadFactory("yarn-jobmanager-future-", "-thread-"));
+
 		try {
 			// ------- (1) load and parse / validate all configurations -------
 
@@ -277,7 +287,9 @@ public class YarnApplicationMasterRunner {
 
 			// we start the JobManager with its standard name
 			ActorRef jobManager = JobManager.startJobManagerActors(
-				config, actorSystem,
+				config,
+				actorSystem,
+				executor,
 				new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
 				scala.Option.<String>empty(),
 				getJobManagerClass(),
@@ -364,6 +376,9 @@ public class YarnApplicationMasterRunner {
 				LOG.error("Failed to stop the web frontend", t);
 			}
 		}
+
+		executor.shutdownNow();
+
 		return 0;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 94ad9f2..d7df66a 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -18,21 +18,20 @@
 
 package org.apache.flink.yarn
 
-import java.util.concurrent.{TimeUnit, ExecutorService}
+import java.util.concurrent.{Executor, TimeUnit}
 
 import akka.actor.ActorRef
-
 import org.apache.flink.api.common.JobID
-import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants}
+import org.apache.flink.configuration.{ConfigConstants, Configuration => FlinkConfiguration}
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.clusterframework.ApplicationStatus
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.clusterframework.messages._
 import org.apache.flink.runtime.jobgraph.JobStatus
-import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager}
+import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, JobNotFound}
+import org.apache.flink.runtime.messages.JobManagerMessages.{CurrentJobStatus, JobNotFound, RequestJobStatus}
 import org.apache.flink.runtime.messages.Messages.Acknowledge
 import org.apache.flink.runtime.metrics.MetricRegistry
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -47,7 +46,7 @@ import scala.language.postfixOps
   * to start/administer/stop the Yarn session.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executorService Execution context which is used to execute concurrent tasks in the
+  * @param executor Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
@@ -60,7 +59,7 @@ import scala.language.postfixOps
   */
 class YarnJobManager(
     flinkConfiguration: FlinkConfiguration,
-    executorService: ExecutorService,
+    executor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -75,7 +74,7 @@ class YarnJobManager(
     metricsRegistry: Option[MetricRegistry])
   extends JobManager(
     flinkConfiguration,
-    executorService,
+    executor,
     instanceManager,
     scheduler,
     libraryCacheManager,


[4/4] flink git commit: [FLINK-5085] Execute CheckpointCoordinator's state discard calls asynchronously

Posted by tr...@apache.org.
[FLINK-5085] Execute CheckpointCoordinator's state discard calls asynchronously

The CheckpointCoordinator is now given an Executor which is used to execute the state discard
calls asynchronously. This prevents blocking operations from being executed within the
calling thread.

Shut down ExecutorServices gracefully


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf4b2212
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf4b2212
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf4b2212

Branch: refs/heads/release-1.1
Commit: cf4b221270cff3541bea318f907f9d8207b2fa4d
Parents: f2e4c19
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Nov 17 15:39:11 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 22 09:48:00 2016 +0100

----------------------------------------------------------------------
 .../webmonitor/BackPressureStatsTracker.java    |  2 +-
 .../BackPressureStatsTrackerTest.java           |  5 +-
 .../checkpoint/CheckpointCoordinator.java       | 90 +++++++++++---------
 .../runtime/checkpoint/PendingCheckpoint.java   | 40 +++++++--
 .../savepoint/SavepointCoordinator.java         | 31 ++++---
 .../runtime/executiongraph/ExecutionGraph.java  | 66 ++++++++------
 .../runtime/executiongraph/ExecutionVertex.java |  4 +-
 .../restart/FailureRateRestartStrategy.java     |  2 +-
 .../restart/FixedDelayRestartStrategy.java      |  4 +-
 .../flink/runtime/util/ExecutorUtils.java       | 72 ++++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   | 24 ++++--
 .../runtime/minicluster/FlinkMiniCluster.scala  | 12 ++-
 .../testingUtils/TestingJobManager.scala        |  6 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 49 +++++++----
 .../checkpoint/CheckpointStateRestoreTest.java  | 14 +--
 ...ExecutionGraphCheckpointCoordinatorTest.java |  1 +
 .../SavepointCoordinatorRestoreTest.java        | 24 +++---
 .../savepoint/SavepointCoordinatorTest.java     | 24 +++---
 .../executiongraph/AllVerticesIteratorTest.java |  2 +-
 .../ExecutionGraphConstructionTest.java         | 24 ++++--
 .../ExecutionGraphDeploymentTest.java           |  7 +-
 .../ExecutionGraphMetricsTest.java              |  6 +-
 .../ExecutionGraphRestartTest.java              | 17 ++--
 .../ExecutionGraphSignalsTest.java              |  1 +
 .../executiongraph/ExecutionGraphTestUtils.java |  6 +-
 .../ExecutionStateProgressTest.java             |  3 +-
 .../executiongraph/LocalInputSplitsTest.java    |  4 +-
 .../executiongraph/PointwisePatternTest.java    | 21 +++--
 .../TerminalStateDeadlockTest.java              |  1 +
 .../VertexLocationConstraintTest.java           | 20 +++--
 .../executiongraph/VertexSlotSharingTest.java   | 15 ++--
 .../restart/FixedDelayRestartStrategyTest.java  |  6 +-
 .../jobmanager/JobManagerHARecoveryTest.java    | 12 ++-
 .../JobManagerLeaderElectionTest.java           | 31 +++----
 .../flink/runtime/util/TestExecutors.java       | 49 +++++++++++
 .../TaskManagerLossFailsTasksTest.scala         |  1 +
 .../runtime/testingUtils/TestingCluster.scala   |  3 +-
 .../runtime/testingUtils/TestingUtils.scala     |  4 +-
 .../partitioner/RescalePartitionerTest.java     |  1 +
 .../flink/yarn/TestingYarnJobManager.scala      |  9 +-
 .../flink/yarn/YarnApplicationMasterRunner.java | 59 +++++++------
 .../org/apache/flink/yarn/YarnJobManager.scala  |  9 +-
 42 files changed, 527 insertions(+), 254 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
index f890106..aae22a1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
@@ -165,7 +165,7 @@ public class BackPressureStatsTracker {
 			if (!pendingStats.contains(vertex) &&
 					!vertex.getGraph().getState().isGloballyTerminalState()) {
 
-				ExecutionContext executionContext = vertex.getGraph().getExecutionContext();
+				ExecutionContext executionContext = vertex.getGraph().getFutureExecutionContext();
 
 				// Only trigger if still active job
 				if (executionContext != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
index b2b0afd..2dee200 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
@@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.eq;
@@ -67,7 +66,7 @@ public class BackPressureStatsTrackerTest {
 		when(graph.getState()).thenReturn(JobStatus.RUNNING);
 
 		// Same Thread execution context
-		when(graph.getExecutionContext()).thenReturn(new ExecutionContext() {
+		when(graph.getFutureExecutionContext()).thenReturn(new ExecutionContext() {
 
 			@Override
 			public void execute(Runnable runnable) {
@@ -76,7 +75,7 @@ public class BackPressureStatsTrackerTest {
 
 			@Override
 			public void reportFailure(Throwable t) {
-				fail();
+				// do nothing
 			}
 
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 5c517b2..6e7b4b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -54,6 +54,7 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.UUID;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -153,6 +154,8 @@ public class CheckpointCoordinator {
 
 	protected final int numberKeyGroups;
 
+	private final Executor executor;
+
 	// --------------------------------------------------------------------------------------------
 
 	public CheckpointCoordinator(
@@ -166,12 +169,13 @@ public class CheckpointCoordinator {
 			ClassLoader userClassLoader,
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore completedCheckpointStore,
-			RecoveryMode recoveryMode) throws Exception {
+			RecoveryMode recoveryMode,
+			Executor executor) {
 
 		this(job, baseInterval, checkpointTimeout, 0L, Integer.MAX_VALUE, numberKeyGroups,
 				tasksToTrigger, tasksToWaitFor, tasksToCommitTo,
 				userClassLoader, checkpointIDCounter, completedCheckpointStore, recoveryMode,
-				new DisabledCheckpointStatsTracker());
+				new DisabledCheckpointStatsTracker(), executor);
 	}
 
 	public CheckpointCoordinator(
@@ -188,7 +192,8 @@ public class CheckpointCoordinator {
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore completedCheckpointStore,
 			RecoveryMode recoveryMode,
-			CheckpointStatsTracker statsTracker) throws Exception {
+			CheckpointStatsTracker statsTracker,
+			Executor executor) {
 
 		// Sanity check
 		checkArgument(baseInterval > 0, "Checkpoint timeout must be larger than zero");
@@ -249,6 +254,8 @@ public class CheckpointCoordinator {
 		}
 
 		this.numberKeyGroups = numberKeyGroups;
+
+		this.executor = checkNotNull(executor);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -386,7 +393,7 @@ public class CheckpointCoordinator {
 	 * @param nextCheckpointId The checkpoint ID to use for this checkpoint or <code>-1</code> if
 	 *                         the checkpoint ID counter should be queried.
 	 */
-	public boolean triggerCheckpoint(long timestamp, long nextCheckpointId) throws Exception {
+	public boolean triggerCheckpoint(long timestamp, long nextCheckpointId) {
 		// make some eager pre-checks
 		synchronized (lock) {
 			// abort if the coordinator has been shutdown in the meantime
@@ -473,32 +480,32 @@ public class CheckpointCoordinator {
 
 		LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
 
-		final PendingCheckpoint checkpoint = new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);
+		final PendingCheckpoint checkpoint = new PendingCheckpoint(
+			job,
+			checkpointID,
+			timestamp,
+			ackTasks,
+			executor);
 
 		// schedule the timer that will clean up the expired checkpoints
 		TimerTask canceller = new TimerTask() {
 			@Override
 			public void run() {
-				try {
-					synchronized (lock) {
-						// only do the work if the checkpoint is not discarded anyways
-						// note that checkpoint completion discards the pending checkpoint object
-						if (!checkpoint.isDiscarded()) {
-							LOG.info("Checkpoint " + checkpointID + " expired before completing.");
+				synchronized (lock) {
+					// only do the work if the checkpoint is not discarded anyways
+					// note that checkpoint completion discards the pending checkpoint object
+					if (!checkpoint.isDiscarded()) {
+						LOG.info("Checkpoint " + checkpointID + " expired before completing.");
 
-							checkpoint.discard(userClassLoader);
-							pendingCheckpoints.remove(checkpointID);
-							rememberRecentCheckpointId(checkpointID);
+						checkpoint.discard(userClassLoader);
+						pendingCheckpoints.remove(checkpointID);
+						rememberRecentCheckpointId(checkpointID);
 
-							onCancelCheckpoint(checkpointID);
+						onCancelCheckpoint(checkpointID);
 
-							triggerQueuedRequests();
-						}
+						triggerQueuedRequests();
 					}
 				}
-				catch (Throwable t) {
-					LOG.error("Exception while handling checkpoint timeout", t);
-				}
 			}
 		};
 
@@ -565,7 +572,7 @@ public class CheckpointCoordinator {
 	 * @return Flag indicating whether the declined checkpoint was associated
 	 * with a pending checkpoint.
 	 */
-	public boolean receiveDeclineMessage(DeclineCheckpoint message) throws Exception {
+	public boolean receiveDeclineMessage(DeclineCheckpoint message) {
 		if (shutdown || message == null) {
 			return false;
 		}
@@ -714,12 +721,7 @@ public class CheckpointCoordinator {
 								"the state handle to avoid lingering state.", message.getCheckpointId(),
 							message.getTaskExecutionId(), message.getJob());
 
-						try {
-							message.getState().deserializeValue(userClassLoader).discardState();
-						} catch (Exception e) {
-							LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.",
-								message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
-						}
+						discardState(message.getState());
 						break;
 					case DISCARDED:
 						LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
@@ -727,12 +729,7 @@ public class CheckpointCoordinator {
 								"state handle tp avoid lingering state.",
 							message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
 
-						try {
-							message.getState().deserializeValue(userClassLoader).discardState();
-						} catch (Exception e) {
-							LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.",
-								message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
-						}
+						discardState(message.getState());
 				}
 			}
 			else if (checkpoint != null) {
@@ -751,13 +748,8 @@ public class CheckpointCoordinator {
 					isPendingCheckpoint = false;
 				}
 
-				try {
-					// try to discard the state so that we don't have lingering state lying around
-					message.getState().deserializeValue(userClassLoader).discardState();
-				} catch (Exception e) {
-					LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.",
-						message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
-				}
+				// try to discard the state so that we don't have lingering state lying around
+				discardState(message.getState());
 			}
 		}
 
@@ -788,7 +780,7 @@ public class CheckpointCoordinator {
 		recentPendingCheckpoints.addLast(id);
 	}
 
-	private void dropSubsumedCheckpoints(long timestamp) throws Exception {
+	private void dropSubsumedCheckpoints(long timestamp) {
 		Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
 		while (entries.hasNext()) {
 			PendingCheckpoint p = entries.next().getValue();
@@ -809,7 +801,7 @@ public class CheckpointCoordinator {
 	 *
 	 * <p>NOTE: The caller of this method must hold the lock when invoking the method!
 	 */
-	private void triggerQueuedRequests() throws Exception {
+	private void triggerQueuedRequests() {
 		if (triggerRequestQueued) {
 			triggerRequestQueued = false;
 
@@ -1057,4 +1049,18 @@ public class CheckpointCoordinator {
 			}
 		}
 	}
+
+	private void discardState(final SerializedValue<StateHandle<?>> stateObject) {
+		// Run deserialization and discard via the executor; any exception is logged, not rethrown.
+		executor.execute(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					stateObject.deserializeValue(userClassLoader).discardState();
+				} catch (Exception e) {
+					LOG.warn("Could not properly discard state object.", e);
+				}
+			}
+		});
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index e81bb09..22ba9f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -27,8 +27,13 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.Executor;
 
 /**
  * A pending checkpoint is a checkpoint that has been started, but has not been
@@ -39,6 +44,8 @@ import java.util.Set;
  * state handles always as serialized values, never as actual values.</p>
  */
 public class PendingCheckpoint {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PendingCheckpoint.class);
 		
 	private final Object lock = new Object();
 	
@@ -55,15 +62,20 @@ public class PendingCheckpoint {
 	/** Set of acknowledged tasks */
 	private final Set<ExecutionAttemptID> acknowledgedTasks;
 
+	private final Executor executor;
+
 	private int numAcknowledgedTasks;
 	
 	private boolean discarded;
 	
 	// --------------------------------------------------------------------------------------------
 	
-	public PendingCheckpoint(JobID jobId, long checkpointId, long checkpointTimestamp,
-							Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm)
-	{
+	public PendingCheckpoint(
+			JobID jobId,
+			long checkpointId,
+			long checkpointTimestamp,
+			Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm,
+			Executor executor) {
 		if (jobId == null || verticesToConfirm == null) {
 			throw new NullPointerException();
 		}
@@ -74,6 +86,7 @@ public class PendingCheckpoint {
 		this.jobId = jobId;
 		this.checkpointId = checkpointId;
 		this.checkpointTimestamp = checkpointTimestamp;
+		this.executor = Preconditions.checkNotNull(executor);
 		
 		this.notYetAcknowledgedTasks = verticesToConfirm;
 		this.taskStates = new HashMap<>();
@@ -218,19 +231,30 @@ public class PendingCheckpoint {
 	/**
 	 * Discards the pending checkpoint, releasing all held resources.
 	 */
-	public void discard(ClassLoader userClassLoader) throws Exception {
+	public void discard(ClassLoader userClassLoader) {
 		dispose(userClassLoader, true);
 	}
 
-	private void dispose(ClassLoader userClassLoader, boolean releaseState) throws Exception {
+	private void dispose(final ClassLoader userClassLoader, boolean releaseState) {
 		synchronized (lock) {
 			discarded = true;
 			numAcknowledgedTasks = -1;
 			try {
 				if (releaseState) {
-					for (TaskState taskState : taskStates.values()) {
-						taskState.discard(userClassLoader);
-					}
+					executor.execute(new Runnable() {
+						@Override
+						public void run() {
+							try {
+								for (TaskState taskState: taskStates.values()) {
+									taskState.discard(userClassLoader);
+								}
+							} catch (Exception e) {
+								LOG.warn("Could not properly dispose the pending checkpoint " +
+									"{} of job {}.", checkpointId, jobId, e);
+							}
+						}
+					});
+
 				}
 			} finally {
 				taskStates.clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
index 2cb8636..a365795 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
@@ -49,6 +49,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -96,22 +97,24 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 			ClassLoader userClassLoader,
 			CheckpointIDCounter checkpointIDCounter,
 			SavepointStore savepointStore,
-			CheckpointStatsTracker statsTracker) throws Exception {
+			CheckpointStatsTracker statsTracker,
+			Executor executor) {
 
 		super(jobId,
-				baseInterval,
-				checkpointTimeout,
-				0L,
-				Integer.MAX_VALUE,
-				numberKeyGroups,
-				tasksToTrigger,
-				tasksToWaitFor,
-				tasksToCommitTo,
-				userClassLoader,
-				checkpointIDCounter,
-				IgnoreCheckpointsStore.INSTANCE,
-				RecoveryMode.STANDALONE,
-				statsTracker);
+			baseInterval,
+			checkpointTimeout,
+			0L,
+			Integer.MAX_VALUE,
+			numberKeyGroups,
+			tasksToTrigger,
+			tasksToWaitFor,
+			tasksToCommitTo,
+			userClassLoader,
+			checkpointIDCounter,
+			IgnoreCheckpointsStore.INSTANCE,
+			RecoveryMode.STANDALONE,
+			statsTracker,
+			executor);
 
 		this.savepointStore = checkNotNull(savepointStore);
 		this.savepointPromises = new ConcurrentHashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index de337ab..bfd93e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -59,10 +59,12 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
@@ -82,6 +84,7 @@ import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -221,8 +224,12 @@ public class ExecutionGraph implements Serializable {
 	private CheckpointStatsTracker checkpointStatsTracker;
 
 	/** The execution context which is used to execute futures. */
-	@SuppressWarnings("NonSerializableFieldInSerializableClass")
-	private ExecutionContext executionContext;
+	private final transient Executor futureExecutor;
+
+	private final transient ExecutionContext futureExecutionContext;
+
+	/** The executor which is used to execute blocking io operations */
+	private final transient Executor ioExecutor;
 
 	// ------ Fields that are only relevant for archived execution graphs ------------
 	private String jsonPlan;
@@ -238,7 +245,8 @@ public class ExecutionGraph implements Serializable {
 	 * This constructor is for tests only, because it does not include class loading information.
 	 */
 	ExecutionGraph(
-			ExecutionContext executionContext,
+			Executor futureExecutor,
+			Executor ioExecutor,
 			JobID jobId,
 			String jobName,
 			Configuration jobConfig,
@@ -246,7 +254,8 @@ public class ExecutionGraph implements Serializable {
 			FiniteDuration timeout,
 			RestartStrategy restartStrategy) throws IOException {
 		this(
-			executionContext,
+			futureExecutor,
+			ioExecutor,
 			jobId,
 			jobName,
 			jobConfig,
@@ -261,7 +270,8 @@ public class ExecutionGraph implements Serializable {
 	}
 
 	public ExecutionGraph(
-			ExecutionContext executionContext,
+			Executor futureExecutor,
+			Executor ioExecutor,
 			JobID jobId,
 			String jobName,
 			Configuration jobConfig,
@@ -289,6 +299,9 @@ public class ExecutionGraph implements Serializable {
 		// serialize the job information to do the serialisation work only once
 		this.serializedJobInformation = new SerializedValue<>(jobInformation);
 
+		this.futureExecutor = Preconditions.checkNotNull(futureExecutor);
+		this.futureExecutionContext = ExecutionContext$.MODULE$.fromExecutor(futureExecutor);
+		this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
 
 		this.userClassLoader = userClassLoader;
 
@@ -309,8 +322,6 @@ public class ExecutionGraph implements Serializable {
 
 		metricGroup.gauge(RESTARTING_TIME_METRIC_NAME, new RestartTimeGauge());
 
-		this.executionContext = checkNotNull(executionContext);
-
 		// create a summary of all relevant data accessed in the web interface's JobConfigHandler
 		try {
 			ExecutionConfig executionConfig = serializedConfig.deserializeValue(userClassLoader);
@@ -393,20 +404,21 @@ public class ExecutionGraph implements Serializable {
 		if (interval != Long.MAX_VALUE) {
 			// create the coordinator that triggers and commits checkpoints and holds the state
 			checkpointCoordinator = new CheckpointCoordinator(
-				jobInformation.getJobId(),
-				interval,
-				checkpointTimeout,
-				minPauseBetweenCheckpoints,
-				maxConcurrentCheckpoints,
-				numberKeyGroups,
-				tasksToTrigger,
-				tasksToWaitFor,
-				tasksToCommitTo,
-				userClassLoader,
-				checkpointIDCounter,
-				checkpointStore,
-				recoveryMode,
-				checkpointStatsTracker);
+			jobInformation.getJobId(),
+			interval,
+			checkpointTimeout,
+			minPauseBetweenCheckpoints,
+			maxConcurrentCheckpoints,
+			numberKeyGroups,
+			tasksToTrigger,
+			tasksToWaitFor,
+			tasksToCommitTo,
+			userClassLoader,
+			checkpointIDCounter,
+			checkpointStore,
+			recoveryMode,
+			checkpointStatsTracker,
+			ioExecutor);
 
 			// the periodic checkpoint scheduler is activated and deactivated as a result of
 			// job status changes (running -> on, all other states -> off)
@@ -428,7 +440,8 @@ public class ExecutionGraph implements Serializable {
 			// checkpoint coordinator.
 			checkpointIDCounter,
 			savepointStore,
-			checkpointStatsTracker);
+			checkpointStatsTracker,
+			ioExecutor);
 
 		registerJobStatusListener(savepointCoordinator
 			.createActivatorDeactivator(actorSystem, leaderSessionID));
@@ -623,8 +636,12 @@ public class ExecutionGraph implements Serializable {
 	 *
 	 * @return ExecutionContext associated with this ExecutionGraph
 	 */
-	public ExecutionContext getExecutionContext() {
-		return executionContext;
+	public Executor getFutureExecutor() {
+		return futureExecutor;
+	}
+
+	public ExecutionContext getFutureExecutionContext() {
+		return futureExecutionContext;
 	}
 
 	/**
@@ -1020,7 +1037,6 @@ public class ExecutionGraph implements Serializable {
 		restartStrategy = null;
 		scheduler = null;
 		checkpointCoordinator = null;
-		executionContext = null;
 
 		for (ExecutionJobVertex vertex : verticesInCreationOrder) {
 			vertex.prepareForArchiving();

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index c101548..b48fa27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -129,7 +129,7 @@ public class ExecutionVertex implements Serializable {
 		this.priorExecutions = new CopyOnWriteArrayList<Execution>();
 
 		this.currentExecution = new Execution(
-			getExecutionGraph().getExecutionContext(),
+			getExecutionGraph().getFutureExecutionContext(),
 			this,
 			0,
 			createTimestamp,
@@ -430,7 +430,7 @@ public class ExecutionVertex implements Serializable {
 			if (state == FINISHED || state == CANCELED || state == FAILED) {
 				priorExecutions.add(execution);
 				currentExecution = new Execution(
-					getExecutionGraph().getExecutionContext(),
+					getExecutionGraph().getFutureExecutionContext(),
 					this,
 					execution.getAttemptNumber()+1,
 					System.currentTimeMillis(),

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
index 528eacb..c6551c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
@@ -71,7 +71,7 @@ public class FailureRateRestartStrategy implements RestartStrategy {
 			restartTimestampsDeque.remove();
 		}
 		restartTimestampsDeque.add(System.currentTimeMillis());
-		future(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayInterval.toMilliseconds()), executionGraph.getExecutionContext());
+		future(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayInterval.toMilliseconds()), executionGraph.getFutureExecutionContext());
 	}
 
 	private boolean isRestartTimestampsQueueFull() {

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
index 8053e95..69fd3db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -59,7 +59,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 	@Override
 	public void restart(final ExecutionGraph executionGraph) {
 		currentRestartAttempt++;
-		future(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayBetweenRestartAttempts), executionGraph.getExecutionContext());
+		future(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayBetweenRestartAttempts), executionGraph.getFutureExecutionContext());
 	}
 
 	/**
@@ -125,4 +125,4 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 			return new FixedDelayRestartStrategy(maxAttempts, delay);
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorUtils.java
new file mode 100644
index 0000000..d2012a0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class ExecutorUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ExecutorUtils.class);
+
+	/**
+	 * Gracefully shuts down the given {@link ExecutorService}s. The call waits up to the given
+	 * timeout, shared by all services, for them to terminate. Services that have not terminated in
+	 * time are shut down hard; on interruption the rest are too and the interrupt flag is restored.
+	 *
+	 * @param timeout to wait for the termination of all ExecutorServices
+	 * @param unit of the timeout
+	 * @param executorServices to shut down
+	 */
+	public static void gracefulShutdown(long timeout, TimeUnit unit, ExecutorService... executorServices) {
+		for (ExecutorService executorService : executorServices) {
+			executorService.shutdown();
+		}
+
+		boolean wasInterrupted = false;
+		final long endTime = unit.toMillis(timeout) + System.currentTimeMillis();
+		long timeLeft = unit.toMillis(timeout);
+		boolean hasTimeLeft = timeLeft > 0L;
+
+		for (ExecutorService executorService : executorServices) {
+			if (wasInterrupted || !hasTimeLeft) {
+				executorService.shutdownNow();
+			} else {
+				try {
+					if (!executorService.awaitTermination(timeLeft, TimeUnit.MILLISECONDS)) {
+						LOG.warn("ExecutorService did not terminate in time. Shutting it down now.");
+						executorService.shutdownNow();
+					}
+				} catch (InterruptedException e) {
+					LOG.warn("Interrupted while shutting down executor services. Shutting all " +
+						"remaining ExecutorServices down now.", e);
+					executorService.shutdownNow();
+					wasInterrupted = true;
+					Thread.currentThread().interrupt(); // restore the flag for the caller
+				}
+
+				timeLeft = endTime - System.currentTimeMillis();
+				hasTimeLeft = timeLeft > 0L;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index ad998de..4a4968f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -114,7 +114,8 @@ import scala.language.postfixOps
  */
 class JobManager(
     protected val flinkConfiguration: Configuration,
-    protected val executor: Executor,
+    protected val futureExecutor: Executor,
+    protected val ioExecutor: Executor,
     protected val instanceManager: InstanceManager,
     protected val scheduler: FlinkScheduler,
     protected val libraryCacheManager: BlobLibraryCacheManager,
@@ -137,7 +138,7 @@ class JobManager(
 
   /** The extra execution context, for futures, with a custom logging reporter */
   protected val executionContext: ExecutionContext = ExecutionContext.fromExecutor(
-    executor,
+    futureExecutor,
     (t: Throwable) => {
       if (!context.system.isTerminated) {
         log.error("Executor could not execute task", t)
@@ -1106,7 +1107,8 @@ class JobManager(
             graph
           case None =>
             val graph = new ExecutionGraph(
-              executionContext,
+              futureExecutor,
+              ioExecutor,
               jobGraph.getJobID,
               jobGraph.getName,
               jobGraph.getJobConfiguration,
@@ -2018,8 +2020,9 @@ object JobManager {
 
     val ioExecutor = Executors.newFixedThreadPool(
       numberProcessors,
-      new NamedThreadFactory("jobmanager-io-", "-thread-")
-    )
+      new NamedThreadFactory("jobmanager-io-", "-thread-"))
+
+    val timeout = AkkaUtils.getTimeout(configuration)
 
     val (jobManagerSystem, _, _, webMonitorOption, _) = try {
       startActorSystemAndJobManagerActors(
@@ -2035,7 +2038,8 @@ object JobManager {
       )
     } catch {
       case t: Throwable =>
-          futureExecutor.shutdownNow()
+        futureExecutor.shutdownNow()
+        ioExecutor.shutdownNow()
 
         throw t
     }
@@ -2053,8 +2057,11 @@ object JobManager {
         }
     }
 
-    futureExecutor.shutdownNow()
-    ioExecutor.shutdownNow()
+    ExecutorUtils.gracefulShutdown(
+      timeout.toMillis,
+      TimeUnit.MILLISECONDS,
+      futureExecutor,
+      ioExecutor)
   }
 
   /**
@@ -2670,6 +2677,7 @@ object JobManager {
       jobManagerClass,
       configuration,
       futureExecutor,
+      ioExecutor,
       instanceManager,
       scheduler,
       libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 57f0a83..4136f8f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -20,12 +20,13 @@ package org.apache.flink.runtime.minicluster
 
 import java.net.InetAddress
 import java.util.UUID
-import java.util.concurrent.Executors
+import java.util.concurrent.{Executors, TimeUnit}
 
 import akka.pattern.Patterns.gracefulStop
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
@@ -35,7 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.RecoveryMode
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService, StandaloneLeaderRetrievalService}
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
-import org.apache.flink.runtime.util.{NamedThreadFactory, ZooKeeperUtils}
+import org.apache.flink.runtime.util.{ExecutorUtils, NamedThreadFactory, ZooKeeperUtils}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.slf4j.LoggerFactory
 
@@ -378,8 +379,11 @@ abstract class FlinkMiniCluster(
     jobManagerLeaderRetrievalService.foreach(_.stop())
     isRunning = false
 
-    futureExecutor.shutdownNow()
-    ioExecutor.shutdownNow()
+    ExecutorUtils.gracefulShutdown(
+      timeout.toMillis,
+      TimeUnit.MILLISECONDS,
+      futureExecutor,
+      ioExecutor)
   }
 
   protected def shutdown(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index e9db9b0..2023036 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -39,7 +39,8 @@ import java.util.concurrent.Executor
   */
 class TestingJobManager(
     flinkConfiguration: Configuration,
-    executor: Executor,
+    futureExecutor: Executor,
+    ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -54,7 +55,8 @@ class TestingJobManager(
     metricRegistry : Option[MetricRegistry])
   extends JobManager(
     flinkConfiguration,
-    executor,
+    futureExecutor,
+    ioExecutor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index f05b5d2..35cce85 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.TestExecutors;
 import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -94,7 +95,8 @@ public class CheckpointCoordinatorTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -145,7 +147,8 @@ public class CheckpointCoordinatorTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -194,7 +197,8 @@ public class CheckpointCoordinatorTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -244,7 +248,8 @@ public class CheckpointCoordinatorTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -342,7 +347,8 @@ public class CheckpointCoordinatorTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -461,7 +467,8 @@ public class CheckpointCoordinatorTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -610,7 +617,8 @@ public class CheckpointCoordinatorTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -744,7 +752,8 @@ public class CheckpointCoordinatorTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(10, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -864,7 +873,8 @@ public class CheckpointCoordinatorTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			// trigger a checkpoint, partially acknowledged
 			assertTrue(coord.triggerCheckpoint(timestamp));
@@ -931,7 +941,8 @@ public class CheckpointCoordinatorTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			assertTrue(coord.triggerCheckpoint(timestamp));
 
@@ -992,7 +1003,8 @@ public class CheckpointCoordinatorTest {
 			cl,
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(2, cl),
-			RecoveryMode.STANDALONE);
+			RecoveryMode.STANDALONE,
+			TestExecutors.directExecutor());
 
 		assertTrue(coord.triggerCheckpoint(timestamp));
 
@@ -1121,7 +1133,8 @@ public class CheckpointCoordinatorTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			
 			coord.startCheckpointScheduler();
@@ -1214,7 +1227,8 @@ public class CheckpointCoordinatorTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2, cl),
 				RecoveryMode.STANDALONE,
-				new DisabledCheckpointStatsTracker());
+				new DisabledCheckpointStatsTracker(),
+				TestExecutors.directExecutor());
 
 			coord.startCheckpointScheduler();
 
@@ -1308,7 +1322,8 @@ public class CheckpointCoordinatorTest {
 				new ExecutionVertex[] { ackVertex },
 				new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
 				(), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE,
-				new DisabledCheckpointStatsTracker());
+				new DisabledCheckpointStatsTracker(),
+				TestExecutors.directExecutor());
 
 			coord.startCheckpointScheduler();
 
@@ -1379,7 +1394,8 @@ public class CheckpointCoordinatorTest {
 				new ExecutionVertex[] { ackVertex },
 				new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
 				(), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE,
-				new DisabledCheckpointStatsTracker());
+				new DisabledCheckpointStatsTracker(),
+				TestExecutors.directExecutor());
 
 			coord.startCheckpointScheduler();
 
@@ -1459,7 +1475,8 @@ public class CheckpointCoordinatorTest {
 				new ExecutionVertex[] { ackVertex },
 				new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE,
-				new DisabledCheckpointStatsTracker());
+				new DisabledCheckpointStatsTracker(),
+				TestExecutors.directExecutor());
 			
 			coord.startCheckpointScheduler();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 2b1b7e1..e535645 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.runtime.util.TestExecutors;
 import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -91,7 +92,8 @@ public class CheckpointStateRestoreTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			// create ourselves a checkpoint with state
 			final long timestamp = 34623786L;
@@ -168,7 +170,8 @@ public class CheckpointStateRestoreTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			// create ourselves a checkpoint with state
 			final long timestamp = 34623786L;
@@ -206,15 +209,16 @@ public class CheckpointStateRestoreTest {
 	public void testNoCheckpointAvailable() {
 		try {
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-				new JobID(),
+			new JobID(),
 				200000L,
 				200000L,
 				42,
 				new ExecutionVertex[] { mock(ExecutionVertex.class) },
 				new ExecutionVertex[] { mock(ExecutionVertex.class) },
 				new ExecutionVertex[0], cl,
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
+			new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE,
+			TestExecutors.directExecutor());
 
 			try {
 				coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index ddbc463..c23aaf8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -126,6 +126,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 			CompletedCheckpointStore store) throws Exception {
 		ExecutionGraph executionGraph = new ExecutionGraph(
 				TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 				new JobID(),
 				"test",
 				new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorRestoreTest.java
index cd03403..43d6530 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorRestoreTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.util.TestExecutors;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -72,17 +73,18 @@ public class SavepointCoordinatorRestoreTest {
 		SavepointStore store = new HeapSavepointStore();
 
 		SavepointCoordinator coord = new SavepointCoordinator(
-				new JobID(),
-				Integer.MAX_VALUE,
-				Integer.MAX_VALUE,
-				0,
-				new ExecutionVertex[] {},
-				new ExecutionVertex[] {},
-				new ExecutionVertex[] {},
-				getClass().getClassLoader(),
-				new StandaloneCheckpointIDCounter(),
-				store,
-				new DisabledCheckpointStatsTracker());
+			new JobID(),
+			Integer.MAX_VALUE,
+			Integer.MAX_VALUE,
+			0,
+			new ExecutionVertex[] {},
+			new ExecutionVertex[] {},
+			new ExecutionVertex[] {},
+			getClass().getClassLoader(),
+			new StandaloneCheckpointIDCounter(),
+			store,
+			new DisabledCheckpointStatsTracker(),
+			TestExecutors.directExecutor());
 
 		// --- (2) Checkpoint misses state for a jobVertex (should work) ---
 		Map<JobVertexID, TaskState> checkpointTaskStates = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
index dc2b23f..dac4903 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.util.TestExecutors;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -878,17 +879,18 @@ public class SavepointCoordinatorTest extends TestLogger {
 		ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
 
 		return new SavepointCoordinator(
-				jobId,
-				checkpointTimeout,
-				checkpointTimeout,
-				42,
-				triggerVertices,
-				ackVertices,
-				commitVertices,
-				classLoader,
-				checkpointIdCounter,
-				savepointStore,
-				new DisabledCheckpointStatsTracker());
+			jobId,
+			checkpointTimeout,
+			checkpointTimeout,
+			42,
+			triggerVertices,
+			ackVertices,
+			commitVertices,
+			classLoader,
+			checkpointIdCounter,
+			savepointStore,
+			new DisabledCheckpointStatsTracker(),
+			TestExecutors.directExecutor());
 	}
 
 	private static Map<JobVertexID, ExecutionJobVertex> createExecutionJobVertexMap(

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
index 8b2c21d..a9a9d4b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
@@ -50,7 +50,7 @@ public class AllVerticesIteratorTest {
 			v4.setParallelism(2);
 			
 			ExecutionGraph eg = Mockito.mock(ExecutionGraph.class);
-			Mockito.when(eg.getExecutionContext()).thenReturn(TestingUtils.directExecutionContext());
+			Mockito.when(eg.getFutureExecutionContext()).thenReturn(TestingUtils.directExecutionContext());
 					
 			ExecutionJobVertex ejv1 = new ExecutionJobVertex(eg, v1, 1,
 					AkkaUtils.getDefaultTimeout());

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index 6f6fcd0..bf3a17c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -112,7 +112,8 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -161,7 +162,8 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -235,7 +237,8 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -494,7 +497,8 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -558,7 +562,8 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v5, v4));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -626,7 +631,8 @@ public class ExecutionGraphConstructionTest {
 			List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
 			ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(), 
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
 				jobId, 
 				jobName, 
 				cfg,
@@ -672,7 +678,8 @@ public class ExecutionGraphConstructionTest {
 			List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
 			ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(), 
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
 				jobId, 
 				jobName,
 				cfg,
@@ -753,7 +760,8 @@ public class ExecutionGraphConstructionTest {
 			JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, v4, v5, v6, v7, v8);
 			
 			ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(), 
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
 				jobId, 
 				jobName, 
 				cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index cfbde6a..cc0ffba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -93,7 +93,8 @@ public class ExecutionGraphDeploymentTest {
 			v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
 
 			ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(), 
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
 				jobId, 
 				"some job", 
 				new Configuration(),
@@ -317,6 +318,7 @@ public class ExecutionGraphDeploymentTest {
 		// execution graph that executes actions synchronously
 		ExecutionGraph eg = new ExecutionGraph(
 			TestingUtils.directExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId,
 			"failing test job",
 			new Configuration(),
@@ -359,7 +361,8 @@ public class ExecutionGraphDeploymentTest {
 
 		// execution graph that executes actions synchronously
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.directExecutionContext(), 
+			TestingUtils.directExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			"some job", 
 			new Configuration(), 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 788f8b9..3e6aba5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -46,11 +46,11 @@ import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.mockito.Matchers;
-import scala.concurrent.ExecutionContext$;
 import scala.concurrent.Future$;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -61,7 +61,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -135,7 +134,8 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 		TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy();
 
 		ExecutionGraph executionGraph = new ExecutionGraph(
-			ExecutionContext$.MODULE$.fromExecutor(new ForkJoinPool()),
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobGraph.getJobID(),
 			jobGraph.getName(),
 			jobConfig,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 3041ad3..659a912 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -228,6 +228,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		// Blocking program
 		ExecutionGraph executionGraph = new ExecutionGraph(
 			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			new JobID(),
 			"TestJob",
 			new Configuration(),
@@ -539,6 +540,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		ExecutionGraph eg = new ExecutionGraph(
 			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			new JobID(),
 			"Test job",
 			new Configuration(),
@@ -670,13 +672,14 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws IOException {
 		return new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				new JobID(),
-				"Test job",
-				new Configuration(),
-				new SerializedValue<>(new ExecutionConfig()),
-				AkkaUtils.getDefaultTimeout(),
-				restartStrategy);
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
+			new JobID(),
+			"Test job",
+			new Configuration(),
+			new SerializedValue<>(new ExecutionConfig()),
+			AkkaUtils.getDefaultTimeout(),
+			restartStrategy);
 	}
 
 	private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index de4a026..fde967e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -133,6 +133,7 @@ public class ExecutionGraphSignalsTest {
 
 		eg = new ExecutionGraph(
 			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId,
 			jobName,
 			cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 903d5f9..89d8a9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.spy;
 
 import java.lang.reflect.Field;
 import java.net.InetAddress;
+import java.util.concurrent.Executor;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
@@ -167,12 +168,13 @@ public class ExecutionGraphTestUtils {
 
 	public static final String ERROR_MESSAGE = "test_failure_error_message";
 
-	public static ExecutionJobVertex getExecutionVertex(JobVertexID id, ExecutionContext executionContext) throws Exception {
+	public static ExecutionJobVertex getExecutionVertex(JobVertexID id, Executor executor) throws Exception {
 		JobVertex ajv = new JobVertex("TestVertex", id);
 		ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
 
 		ExecutionGraph graph = new ExecutionGraph(
-			executionContext, 
+			executor,
+			executor,
 			new JobID(), 
 			"test job", 
 			new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index f3743c1..955c9db 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -51,7 +51,8 @@ public class ExecutionStateProgressTest {
 			ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
 
 			ExecutionGraph graph = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(), 
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
 				jid, 
 				"test job", 
 				new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
index f03370c..11dad92 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
@@ -270,7 +270,8 @@ public class LocalInputSplitsTest {
 			JobGraph jobGraph = new JobGraph("test job", vertex);
 			
 			ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(), 
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
 				jobGraph.getJobID(),
 				jobGraph.getName(),  
 				jobGraph.getJobConfiguration(),
@@ -336,6 +337,7 @@ public class LocalInputSplitsTest {
 		
 		ExecutionGraph eg = new ExecutionGraph(
 			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobGraph.getJobID(),
 			jobGraph.getName(),  
 			jobGraph.getJobConfiguration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 7a28b4a..0e147e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -66,7 +66,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -111,7 +112,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -157,7 +159,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -204,7 +207,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName,
 			cfg,
@@ -249,7 +253,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -314,7 +319,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -370,7 +376,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index 2a690d9..504822b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -184,6 +184,7 @@ public class TerminalStateDeadlockTest {
 		TestExecGraph(JobID jobId) throws IOException {
 			super(
 				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
 				jobId,
 				"test graph",
 				EMPTY_CONFIG,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
index d145dd3..b160561 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
@@ -81,13 +81,14 @@ public class VertexLocationConstraintTest {
 			JobGraph jg = new JobGraph("test job", jobVertex);
 			
 			ExecutionGraph eg = new ExecutionGraph(
-					TestingUtils.defaultExecutionContext(),
-					jg.getJobID(),
-					jg.getName(),
-					jg.getJobConfiguration(),
-					new SerializedValue<>(new ExecutionConfig()),
-					timeout,
-					new NoRestartStrategy());
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
+				jg.getJobID(),
+				jg.getName(),
+				jg.getJobConfiguration(),
+				new SerializedValue<>(new ExecutionConfig()),
+				timeout,
+				new NoRestartStrategy());
 			eg.attachJobGraph(Collections.singletonList(jobVertex));
 			
 			ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID());
@@ -155,6 +156,7 @@ public class VertexLocationConstraintTest {
 			
 			ExecutionGraph eg = new ExecutionGraph(
 					TestingUtils.defaultExecutionContext(),
+					TestingUtils.defaultExecutionContext(),
 					jg.getJobID(),
 					jg.getName(),
 					jg.getJobConfiguration(),
@@ -232,6 +234,7 @@ public class VertexLocationConstraintTest {
 			
 			ExecutionGraph eg = new ExecutionGraph(
 					TestingUtils.defaultExecutionContext(),
+					TestingUtils.defaultExecutionContext(),
 					jg.getJobID(),
 					jg.getName(),
 					jg.getJobConfiguration(),
@@ -300,6 +303,7 @@ public class VertexLocationConstraintTest {
 			
 			ExecutionGraph eg = new ExecutionGraph(
 					TestingUtils.defaultExecutionContext(),
+					TestingUtils.defaultExecutionContext(),
 					jg.getJobID(),
 					jg.getName(),
 					jg.getJobConfiguration(),
@@ -370,6 +374,7 @@ public class VertexLocationConstraintTest {
 			
 			ExecutionGraph eg = new ExecutionGraph(
 					TestingUtils.defaultExecutionContext(),
+					TestingUtils.defaultExecutionContext(),
 					jg.getJobID(),
 					jg.getName(),
 					jg.getJobConfiguration(),
@@ -411,6 +416,7 @@ public class VertexLocationConstraintTest {
 			
 			ExecutionGraph eg = new ExecutionGraph(
 					TestingUtils.defaultExecutionContext(),
+					TestingUtils.defaultExecutionContext(),
 					jg.getJobID(),
 					jg.getName(),
 					jg.getJobConfiguration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index 0c95695..27708a2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -80,13 +80,14 @@ public class VertexSlotSharingTest {
 			List<JobVertex> vertices = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 			
 			ExecutionGraph eg = new ExecutionGraph(
-					TestingUtils.defaultExecutionContext(),
-					new JobID(),
-					"test job",
-					new Configuration(),
-					new SerializedValue<>(new ExecutionConfig()),
-					AkkaUtils.getDefaultTimeout(),
-					new NoRestartStrategy());
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
+				new JobID(),
+				"test job",
+				new Configuration(),
+				new SerializedValue<>(new ExecutionConfig()),
+				AkkaUtils.getDefaultTimeout(),
+				new NoRestartStrategy());
 			eg.attachJobGraph(vertices);
 			
 			// verify that the vertices are all in the same slot sharing group

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
index d19060a..ceb5da4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.executiongraph.restart;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.util.TestExecutors;
 import org.junit.Test;
 import org.mockito.Mockito;
 import scala.concurrent.ExecutionContext$;
@@ -39,8 +39,8 @@ public class FixedDelayRestartStrategyTest {
 			restartDelay);
 
 		ExecutionGraph executionGraph = mock(ExecutionGraph.class);
-		when(executionGraph.getExecutionContext())
-			.thenReturn(ExecutionContext$.MODULE$.fromExecutor(MoreExecutors.directExecutor()));
+		when(executionGraph.getFutureExecutionContext())
+			.thenReturn(ExecutionContext$.MODULE$.fromExecutor(TestExecutors.directExecutor()));
 
 		while(fixedDelayRestartStrategy.canRestart()) {
 			fixedDelayRestartStrategy.restart(executionGraph);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 2e3bace..b98f338 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -82,6 +82,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 
@@ -127,6 +128,8 @@ public class JobManagerHARecoveryTest {
 		flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, temporaryFolder.newFolder().toString());
 		flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
 
+		ExecutorService executor = null;
+
 		try {
 			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 
@@ -144,10 +147,13 @@ public class JobManagerHARecoveryTest {
 				MemoryArchivist.class,
 				10), "archive");
 
+			executor = new ForkJoinPool();
+
 			Props jobManagerProps = Props.create(
 				TestingJobManager.class,
 				flinkConfiguration,
-				new ForkJoinPool(),
+				executor,
+				executor,
 				instanceManager,
 				scheduler,
 				new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000),
@@ -274,6 +280,10 @@ public class JobManagerHARecoveryTest {
 			if (taskManager != null) {
 				taskManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
 			}
+
+			if (executor != null) {
+				executor.shutdownNow();
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index bdd019d..8f38b5b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -186,21 +186,22 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 		SavepointStore savepointStore = SavepointStoreFactory.createFromConfig(configuration);
 
 		return Props.create(
-				TestingJobManager.class,
-				configuration,
-				executor,
-				new InstanceManager(),
-				new Scheduler(TestingUtils.defaultExecutionContext()),
-				new BlobLibraryCacheManager(new BlobServer(configuration), 10L),
-				ActorRef.noSender(),
-				new NoRestartStrategy.NoRestartStrategyFactory(),
-				AkkaUtils.getDefaultTimeout(),
-				leaderElectionService,
-				submittedJobGraphStore,
-				checkpointRecoveryFactory,
-				savepointStore,
-				AkkaUtils.getDefaultTimeout(),
-				Option.apply(null)
+			TestingJobManager.class,
+			configuration,
+			executor,
+			executor,
+			new InstanceManager(),
+			new Scheduler(TestingUtils.defaultExecutionContext()),
+			new BlobLibraryCacheManager(new BlobServer(configuration), 10L),
+			ActorRef.noSender(),
+			new NoRestartStrategy.NoRestartStrategyFactory(),
+			AkkaUtils.getDefaultTimeout(),
+			leaderElectionService,
+			submittedJobGraphStore,
+			checkpointRecoveryFactory,
+			savepointStore,
+			AkkaUtils.getDefaultTimeout(),
+			Option.apply(null)
 		);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestExecutors.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestExecutors.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestExecutors.java
index 703593c..d2cf9e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestExecutors.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestExecutors.java
@@ -18,10 +18,17 @@
 
 package org.apache.flink.runtime.util;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 public class TestExecutors {
 
+	private static final Logger LOG = LoggerFactory.getLogger(TestExecutors.class);
+
 	public static Executor directExecutor() {
 		return DirectExecutor.INSTANCE;
 	}
@@ -37,4 +44,46 @@ public class TestExecutors {
 			command.run();
 		}
 	}
+
+	/**
+	 * Gracefully shuts down the given {@link ExecutorService}s. All of them are first asked to shut
+	 * down; the call then waits up to the given timeout, shared by all services, for termination.
+	 * Services that do not terminate in time, or that remain after an interrupt, are shut down hard.
+	 *
+	 * @param timeout to wait for the termination of all ExecutorServices
+	 * @param unit of the timeout
+	 * @param executorServices to shut down
+	 */
+	public static void gracefulShutdown(long timeout, TimeUnit unit, ExecutorService... executorServices) {
+		for (ExecutorService executorService: executorServices) {
+			executorService.shutdown();
+		}
+
+		boolean wasInterrupted = false;
+		final long endTime = unit.toMillis(timeout) + System.currentTimeMillis(); // one deadline for all services
+		long timeLeft = unit.toMillis(timeout);
+		boolean hasTimeLeft = timeLeft > 0L;
+
+		for (ExecutorService executorService: executorServices) {
+			if (wasInterrupted || !hasTimeLeft) {
+				executorService.shutdownNow();
+			} else {
+				try {
+					if (!executorService.awaitTermination(timeLeft, TimeUnit.MILLISECONDS)) {
+						LOG.warn("ExecutorService did not terminate in time. Shutting it down now.");
+						executorService.shutdownNow();
+					}
+				} catch (InterruptedException e) {
+					LOG.warn("Interrupted while shutting down executor services. Shutting all " +
+						"remaining ExecutorServices down now.", e);
+					executorService.shutdownNow();
+
+					wasInterrupted = true; // NOTE(review): the thread's interrupt flag is not restored here
+				}
+
+				timeLeft = endTime - System.currentTimeMillis(); // remaining budget for the next service
+				hasTimeLeft = timeLeft > 0L;
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index c30d244..7c77ce7 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -55,6 +55,7 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
 
         val eg = new ExecutionGraph(
           TestingUtils.defaultExecutionContext,
+          TestingUtils.defaultExecutionContext,
           new JobID(),
           "test job",
           new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index c7c141a..f16dc1c 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -119,7 +119,8 @@ class TestingCluster(
     val jobManagerProps = Props(
       new TestingJobManager(
         configuration,
-        futureExecutor,
+      futureExecutor,
+      ioExecutor,
         instanceManager,
         scheduler,
         libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index fadef28..f400685 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -43,7 +43,7 @@ import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegistere
 import org.apache.flink.runtime.taskmanager.TaskManager
 
 import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContext}
+import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor}
 import scala.language.postfixOps
 
 /**
@@ -112,7 +112,7 @@ object TestingUtils {
     * @param actionQueue
     */
   class QueuedActionExecutionContext private[testingUtils] (val actionQueue: ActionQueue)
-    extends ExecutionContext {
+    extends ExecutionContextExecutor {
 
     var automaticExecution = false
 


[2/4] flink git commit: [FLINK-5073] Use Executor to run ZooKeeper callbacks in ZooKeeperStateHandleStore

Posted by tr...@apache.org.
[FLINK-5073] Use Executor to run ZooKeeper callbacks in ZooKeeperStateHandleStore

Use a dedicated Executor to run ZooKeeper callbacks in ZooKeeperStateHandleStore instead
of running them in the ZooKeeper client's thread. The callbacks can block because they
discard state, which might entail deleting files from disk.

Add TestExecutors

Introduce a dedicated Executor for blocking I/O operations


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2e4c193
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2e4c193
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2e4c193

Branch: refs/heads/release-1.1
Commit: f2e4c193e1fb6b0cf26861bc01c2f3d6bcd4d8f6
Parents: 7fb71c5
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 15 22:45:04 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 22 09:47:57 2016 +0100

----------------------------------------------------------------------
 .../BackPressureStatsTrackerITCase.java         |  6 +-
 .../StackTraceSampleCoordinatorITCase.java      |  6 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |  1 +
 .../ZooKeeperCheckpointRecoveryFactory.java     | 12 +++-
 .../ZooKeeperCompletedCheckpointStore.java      |  7 ++-
 .../ZooKeeperSubmittedJobGraphStore.java        |  7 ++-
 .../flink/runtime/util/ZooKeeperUtils.java      | 22 +++++---
 .../runtime/zookeeper/StateStorageHelper.java   |  2 +-
 .../zookeeper/ZooKeeperStateHandleStore.java    | 13 ++++-
 .../flink/runtime/jobmanager/JobManager.scala   | 59 +++++++++++++-------
 .../runtime/minicluster/FlinkMiniCluster.scala  |  9 ++-
 .../minicluster/LocalFlinkMiniCluster.scala     |  3 +-
 ...ZooKeeperCompletedCheckpointStoreITCase.java |  3 +-
 .../runtime/jobmanager/JobManagerTest.java      |  4 ++
 .../flink/runtime/jobmanager/JobSubmitTest.java |  1 +
 .../ZooKeeperSubmittedJobGraphsStoreITCase.java | 15 ++---
 .../resourcemanager/ClusterShutdownITCase.java  | 14 ++++-
 .../resourcemanager/ResourceManagerITCase.java  | 14 ++++-
 ...askManagerComponentsStartupShutdownTest.java |  1 +
 .../TaskManagerProcessReapingTestBase.java      |  1 +
 .../TaskManagerRegistrationTest.java            |  8 ++-
 .../flink/runtime/util/TestExecutors.java       | 40 +++++++++++++
 .../ZooKeeperStateHandleStoreITCase.java        | 31 +++++-----
 .../jobmanager/JobManagerRegistrationTest.scala |  1 +
 .../runtime/testingUtils/TestingCluster.scala   |  5 +-
 .../runtime/testingUtils/TestingUtils.scala     | 35 ++++++++----
 .../test/util/ForkableFlinkMiniCluster.scala    |  3 +-
 ...ctTaskManagerProcessFailureRecoveryTest.java |  1 +
 .../recovery/ProcessFailureCancelingITCase.java |  1 +
 .../flink/yarn/YarnApplicationMasterRunner.java | 12 +++-
 30 files changed, 248 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 9fbbd90..ee6a411 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -120,7 +120,11 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 			}
 
 			try {
-				jobManger = TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), new Configuration());
+				jobManger = TestingUtils.createJobManager(
+					testActorSystem,
+					testActorSystem.dispatcher(),
+					testActorSystem.dispatcher(),
+					new Configuration());
 
 				Configuration config = new Configuration();
 				config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index 868dae1..cb13f02 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -90,7 +90,11 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
 			ActorGateway taskManager = null;
 
 			try {
-				jobManger = TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), new Configuration());
+				jobManger = TestingUtils.createJobManager(
+					testActorSystem,
+					testActorSystem.dispatcher(),
+					testActorSystem.dispatcher(),
+					new Configuration());
 
 				Configuration config = new Configuration();
 				config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index a6b958a..6c836c8 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -179,6 +179,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 					jmConfig,
 					jobManagerSystem[i],
 					jobManagerSystem[i].dispatcher(),
+					jobManagerSystem[i].dispatcher(),
 					JobManager.class,
 					MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index dcd6260..df713d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -24,6 +24,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 
+import java.util.concurrent.Executor;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -35,9 +37,15 @@ public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFac
 
 	private final Configuration config;
 
-	public ZooKeeperCheckpointRecoveryFactory(CuratorFramework client, Configuration config) {
+	private final Executor executor;
+
+	public ZooKeeperCheckpointRecoveryFactory(
+			CuratorFramework client,
+			Configuration config,
+			Executor executor) {
 		this.client = checkNotNull(client, "Curator client");
 		this.config = checkNotNull(config, "Configuration");
+		this.executor = checkNotNull(executor, "Executor");
 	}
 
 	@Override
@@ -55,7 +63,7 @@ public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFac
 			throws Exception {
 
 		return ZooKeeperUtils.createCompletedCheckpoints(client, config, jobId,
-				NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, userClassLoader);
+				NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, userClassLoader, executor);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 541629d..6570d00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -35,6 +35,7 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
 import java.util.List;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -94,6 +95,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	 *                                       start with a '/')
 	 * @param stateStorage                   State storage to be used to persist the completed
 	 *                                       checkpoint
+	 * @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks
 	 * @throws Exception
 	 */
 	public ZooKeeperCompletedCheckpointStore(
@@ -101,7 +103,8 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 			ClassLoader userClassLoader,
 			CuratorFramework client,
 			String checkpointsPath,
-			StateStorageHelper<CompletedCheckpoint> stateStorage) throws Exception {
+			StateStorageHelper<CompletedCheckpoint> stateStorage,
+			Executor executor) throws Exception {
 
 		checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
 		checkNotNull(stateStorage, "State storage");
@@ -119,7 +122,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 		// All operations will have the path as root
 		this.client = client.usingNamespace(client.getNamespace() + checkpointsPath);
 
-		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage);
+		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage, executor);
 
 		this.checkpointStateHandles = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 128db83..a1dd14b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -39,6 +39,7 @@ import java.util.ConcurrentModificationException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -93,12 +94,14 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	 * @param client ZooKeeper client
 	 * @param currentJobsPath ZooKeeper path for current job graphs
 	 * @param stateStorage State storage used to persist the submitted jobs
+	 * @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks
 	 * @throws Exception
 	 */
 	public ZooKeeperSubmittedJobGraphStore(
 			CuratorFramework client,
 			String currentJobsPath,
-			StateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
+			StateStorageHelper<SubmittedJobGraph> stateStorage,
+			Executor executor) throws Exception {
 
 		checkNotNull(currentJobsPath, "Current jobs path");
 		checkNotNull(stateStorage, "State storage");
@@ -114,7 +117,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		// All operations will have the path as root
 		CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);
 
-		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage);
+		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage, executor);
 
 		this.pathCache = new PathChildrenCache(facade, "/", false);
 		pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener());

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 3986fed..472a2fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -188,11 +189,13 @@ public class ZooKeeperUtils {
 	 *
 	 * @param client        The {@link CuratorFramework} ZooKeeper client to use
 	 * @param configuration {@link Configuration} object
+	 * @param executor to run ZooKeeper callbacks
 	 * @return {@link ZooKeeperSubmittedJobGraphStore} instance
 	 */
 	public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs(
 			CuratorFramework client,
-			Configuration configuration) throws Exception {
+			Configuration configuration,
+			Executor executor) throws Exception {
 
 		checkNotNull(configuration, "Configuration");
 
@@ -204,7 +207,7 @@ public class ZooKeeperUtils {
 				ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
 
 		return new ZooKeeperSubmittedJobGraphStore(
-				client, zooKeeperSubmittedJobsPath, stateStorage);
+				client, zooKeeperSubmittedJobsPath, stateStorage, executor);
 	}
 
 	/**
@@ -215,6 +218,7 @@ public class ZooKeeperUtils {
 	 * @param jobId                          ID of job to create the instance for
 	 * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain
 	 * @param userClassLoader                User code class loader
+	 * @param executor to run ZooKeeper callbacks
 	 * @return {@link ZooKeeperCompletedCheckpointStore} instance
 	 */
 	public static CompletedCheckpointStore createCompletedCheckpoints(
@@ -222,7 +226,8 @@ public class ZooKeeperUtils {
 			Configuration configuration,
 			JobID jobId,
 			int maxNumberOfCheckpointsToRetain,
-			ClassLoader userClassLoader) throws Exception {
+			ClassLoader userClassLoader,
+			Executor executor) throws Exception {
 
 		checkNotNull(configuration, "Configuration");
 
@@ -237,11 +242,12 @@ public class ZooKeeperUtils {
 		checkpointsPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
 
 		return new ZooKeeperCompletedCheckpointStore(
-				maxNumberOfCheckpointsToRetain,
-				userClassLoader,
-				client,
-				checkpointsPath,
-				stateStorage);
+			maxNumberOfCheckpointsToRetain,
+			userClassLoader,
+			client,
+			checkpointsPath,
+			stateStorage,
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java
index 36fb849..ce47462 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.state.StateHandle;
 import java.io.Serializable;
 
 /**
- * State storage helper which is used by {@link ZooKeeperStateHandleStore} to persiste state before
+ * State storage helper which is used by {@link ZooKeeperStateHandleStore} to persist state before
  * the state handle is written to ZooKeeper.
  *
  * @param <T>

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index dea3452..6576ff8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -30,10 +30,10 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -74,6 +74,8 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 
 	private final StateStorageHelper<T> storage;
 
+	private final Executor executor;
+
 	/**
 	 * Creates a {@link ZooKeeperStateHandleStore}.
 	 *
@@ -81,13 +83,18 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	 *                            expected that the client's namespace ensures that the root
 	 *                            path is exclusive for all state handles managed by this
 	 *                            instance, e.g. <code>client.usingNamespace("/stateHandles")</code>
+	 * @param storage to persist the actual state and whose returned state handle is then written
+	 *                to ZooKeeper
+	 * @param executor to run the ZooKeeper callbacks
 	 */
 	public ZooKeeperStateHandleStore(
 		CuratorFramework client,
-		StateStorageHelper storage) throws IOException {
+		StateStorageHelper storage,
+		Executor executor) {
 
 		this.client = checkNotNull(client, "Curator client");
 		this.storage = checkNotNull(storage, "State storage");
+		this.executor = checkNotNull(executor);
 	}
 
 	/**
@@ -350,7 +357,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 		checkNotNull(callback, "Background callback");
 
-		client.delete().deletingChildrenIfNeeded().inBackground(callback).forPath(pathInZooKeeper);
+		client.delete().deletingChildrenIfNeeded().inBackground(callback, executor).forPath(pathInZooKeeper);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index c6e18e9..ad998de 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2012,24 +2012,30 @@ object JobManager {
 
     val numberProcessors = Hardware.getNumberCPUCores()
 
-    val executor = Executors.newFixedThreadPool(
+    val futureExecutor = Executors.newFixedThreadPool(
       numberProcessors,
       new NamedThreadFactory("jobmanager-future-", "-thread-"))
 
+    val ioExecutor = Executors.newFixedThreadPool(
+      numberProcessors,
+      new NamedThreadFactory("jobmanager-io-", "-thread-")
+    )
+
     val (jobManagerSystem, _, _, webMonitorOption, _) = try {
       startActorSystemAndJobManagerActors(
         configuration,
         executionMode,
         listeningAddress,
         listeningPort,
-        executor,
+        futureExecutor,
+        ioExecutor,
         classOf[JobManager],
         classOf[MemoryArchivist],
         Option(classOf[StandaloneResourceManager])
       )
     } catch {
       case t: Throwable =>
-          executor.shutdownNow()
+          futureExecutor.shutdownNow()
 
         throw t
     }
@@ -2047,7 +2053,8 @@ object JobManager {
         }
     }
 
-    executor.shutdownNow()
+    futureExecutor.shutdownNow()
+    ioExecutor.shutdownNow()
   }
 
   /**
@@ -2155,7 +2162,8 @@ object JobManager {
     *                      additional TaskManager in the same process.
     * @param listeningAddress The hostname where the JobManager should listen for messages.
     * @param listeningPort The port where the JobManager should listen for messages
-    * @param executor to run the JobManager's futures
+    * @param futureExecutor to run the JobManager's futures
+    * @param ioExecutor to run blocking io operations
     * @param jobManagerClass The class of the JobManager to be started
     * @param archiveClass The class of the Archivist to be started
     * @param resourceManagerClass Optional class of resource manager if one should be started
@@ -2167,7 +2175,8 @@ object JobManager {
       executionMode: JobManagerMode,
       listeningAddress: String,
       listeningPort: Int,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist],
       resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]])
@@ -2236,7 +2245,8 @@ object JobManager {
       val (jobManager, archive) = startJobManagerActors(
         configuration,
         jobManagerSystem,
-        executor,
+        futureExecutor,
+        ioExecutor,
         jobManagerClass,
         archiveClass)
 
@@ -2440,14 +2450,16 @@ object JobManager {
    *              delayBetweenRetries, timeout)
    *
    * @param configuration The configuration from which to parse the config values.
-   * @param executor to run JobManager's futures
+   * @param futureExecutor to run JobManager's futures
+   * @param ioExecutor to run blocking io operations
    * @param leaderElectionServiceOption LeaderElectionService which shall be returned if the option
    *                                    is defined
    * @return The members for a default JobManager.
    */
   def createJobManagerComponents(
       configuration: Configuration,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       leaderElectionServiceOption: Option[LeaderElectionService]) :
     (InstanceManager,
     FlinkScheduler,
@@ -2479,11 +2491,11 @@ object JobManager {
     var instanceManager: InstanceManager = null
     var scheduler: FlinkScheduler = null
     var libraryCacheManager: BlobLibraryCacheManager = null
-    
+
     try {
       blobServer = new BlobServer(configuration)
       instanceManager = new InstanceManager()
-      scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(executor))
+      scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(futureExecutor))
       libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval)
 
       instanceManager.addInstanceListener(scheduler)
@@ -2528,8 +2540,8 @@ object JobManager {
           }
 
           (leaderElectionService,
-            ZooKeeperUtils.createSubmittedJobGraphs(client, configuration),
-            new ZooKeeperCheckpointRecoveryFactory(client, configuration))
+            ZooKeeperUtils.createSubmittedJobGraphs(client, configuration, ioExecutor),
+            new ZooKeeperCheckpointRecoveryFactory(client, configuration, ioExecutor))
       }
 
     val savepointStore = SavepointStoreFactory.createFromConfig(configuration)
@@ -2576,14 +2588,17 @@ object JobManager {
    *
    * @param configuration The configuration for the JobManager
    * @param actorSystem The actor system running the JobManager
+   * @param futureExecutor to run JobManager's futures
+   * @param ioExecutor to run blocking io operations
    * @param jobManagerClass The class of the JobManager to be started
    * @param archiveClass The class of the MemoryArchivist to be started
-    * @return A tuple of references (JobManager Ref, Archiver Ref)
+   * @return A tuple of references (JobManager Ref, Archiver Ref)
    */
   def startJobManagerActors(
       configuration: Configuration,
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist])
     : (ActorRef, ActorRef) = {
@@ -2591,7 +2606,8 @@ object JobManager {
     startJobManagerActors(
       configuration,
       actorSystem,
-      executor,
+      futureExecutor,
+      ioExecutor,
       Some(JOB_MANAGER_NAME),
       Some(ARCHIVE_NAME),
       jobManagerClass,
@@ -2604,7 +2620,8 @@ object JobManager {
    *
    * @param configuration The configuration for the JobManager
    * @param actorSystem The actor system running the JobManager
-   * @param executor to run JobManager's futures
+   * @param futureExecutor to run JobManager's futures
+   * @param ioExecutor to run blocking io operations
    * @param jobManagerActorName Optionally the name of the JobManager actor. If none is given,
    *                          the actor will have the name generated by the actor system.
    * @param archiveActorName Optionally the name of the archive actor. If none is given,
@@ -2616,7 +2633,8 @@ object JobManager {
   def startJobManagerActors(
       configuration: Configuration,
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       jobManagerActorName: Option[String],
       archiveActorName: Option[String],
       jobManagerClass: Class[_ <: JobManager],
@@ -2636,7 +2654,8 @@ object JobManager {
     jobRecoveryTimeout, 
     metricsRegistry) = createJobManagerComponents(
       configuration,
-      executor,
+      futureExecutor,
+      ioExecutor,
       None)
 
     val archiveProps = Props(archiveClass, archiveCount)
@@ -2650,7 +2669,7 @@ object JobManager {
     val jobManagerProps = Props(
       jobManagerClass,
       configuration,
-      executor,
+      futureExecutor,
       instanceManager,
       scheduler,
       libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 271535e..57f0a83 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -106,7 +106,11 @@ abstract class FlinkMiniCluster(
 
   private var isRunning = false
 
-  val executor = Executors.newFixedThreadPool(
+  val futureExecutor = Executors.newFixedThreadPool(
     Hardware.getNumberCPUCores(),
     new NamedThreadFactory("mini-cluster-future-", "-thread-"))
 
+  val ioExecutor = Executors.newFixedThreadPool(
+    Hardware.getNumberCPUCores(),
+    new NamedThreadFactory("mini-cluster-io-", "-thread-"))
+
@@ -374,7 +378,8 @@ abstract class FlinkMiniCluster(
     jobManagerLeaderRetrievalService.foreach(_.stop())
     isRunning = false
 
-    executor.shutdownNow
+    futureExecutor.shutdownNow()
+    ioExecutor.shutdownNow()
   }
 
   protected def shutdown(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 594997c..ba9639b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -82,7 +82,8 @@ class LocalFlinkMiniCluster(
     val (jobManager, _) = JobManager.startJobManagerActors(
       config,
       system,
-      executor,
+      futureExecutor,
+      ioExecutor,
       Some(jobManagerName),
       Some(archiveName),
       classOf[JobManager],

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 380ba2c..44ffbb6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.TestExecutors;
 import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.junit.AfterClass;
@@ -66,7 +67,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 			public StateHandle<CompletedCheckpoint> store(CompletedCheckpoint state) throws Exception {
 				return new LocalStateHandle<>(state);
 			}
-		});
+		}, TestExecutors.directExecutor());
 	}
 
 	// ---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 148e88f..b56bf29 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -391,6 +391,7 @@ public class JobManagerTest extends TestLogger {
 				config,
 				actorSystem,
 				actorSystem.dispatcher(),
+				actorSystem.dispatcher(),
 				Option.apply("jm"),
 				Option.apply("arch"),
 				TestingJobManager.class,
@@ -485,6 +486,7 @@ public class JobManagerTest extends TestLogger {
 				new Configuration(),
 				actorSystem,
 				actorSystem.dispatcher(),
+				actorSystem.dispatcher(),
 				Option.apply("jm"),
 				Option.apply("arch"),
 				TestingJobManager.class,

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 42ed25b..31ecb46 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -81,6 +81,7 @@ public class JobSubmitTest {
 				config,
 			jobManagerSystem,
 			jobManagerSystem.dispatcher(),
+			jobManagerSystem.dispatcher(),
 			JobManager.class,
 			MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index c71bd35..8eaecd0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorRef;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -27,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener;
 import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.TestExecutors;
 import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.util.TestLogger;
@@ -82,7 +82,8 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
 			ZooKeeper.createClient(),
 			"/testPutAndRemoveJobGraph",
-			localStateStorage);
+			localStateStorage,
+			TestExecutors.directExecutor());
 
 		try {
 			SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
@@ -134,7 +135,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 	@Test
 	public void testRecoverJobGraphs() throws Exception {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage);
+				ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage, TestExecutors.directExecutor());
 
 		try {
 			SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
@@ -184,10 +185,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 
 		try {
 			jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
+					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage, TestExecutors.directExecutor());
 
 			otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
+					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage, TestExecutors.directExecutor());
 
 
 			SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
@@ -243,10 +244,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 	@Test(expected = IllegalStateException.class)
 	public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
+				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, TestExecutors.directExecutor());
 
 		ZooKeeperSubmittedJobGraphStore otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
+				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, TestExecutors.directExecutor());
 
 		jobGraphs.start(null);
 		otherJobGraphs.start(null);

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
index 8530ce6..fa7841a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
@@ -72,7 +72,12 @@ public class ClusterShutdownITCase extends TestLogger {
 
 			// start job manager which doesn't shutdown the actor system
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, system.dispatcher(), config, "jobmanager1");
+				TestingUtils.createJobManager(
+					system,
+					system.dispatcher(),
+					system.dispatcher(),
+					config,
+					"jobmanager1");
 
 			// Tell the JobManager to inform us of shutdown actions
 			jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
@@ -114,7 +119,12 @@ public class ClusterShutdownITCase extends TestLogger {
 
 			// start job manager which doesn't shutdown the actor system
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, system.dispatcher(), config, "jobmanager2");
+				TestingUtils.createJobManager(
+					system,
+					system.dispatcher(),
+					system.dispatcher(),
+					config,
+					"jobmanager2");
 
 			// Tell the JobManager to inform us of shutdown actions
 			jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
index bfc6abe..3a8a200 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
@@ -72,7 +72,12 @@ public class ResourceManagerITCase extends TestLogger {
 		protected void run() {
 
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, system.dispatcher(), config, "ReconciliationTest");
+				TestingUtils.createJobManager(
+					system,
+					system.dispatcher(),
+					system.dispatcher(),
+					config,
+					"ReconciliationTest");
 			ActorGateway me =
 				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
 
@@ -125,7 +130,12 @@ public class ResourceManagerITCase extends TestLogger {
 		protected void run() {
 
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, system.dispatcher(), config, "RegTest");
+				TestingUtils.createJobManager(
+					system,
+					system.dispatcher(),
+					system.dispatcher(),
+					config,
+					"RegTest");
 			ActorGateway me =
 				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 46bc7a5..04f7fdb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -81,6 +81,7 @@ public class TaskManagerComponentsStartupShutdownTest {
 				config,
 				actorSystem,
 				actorSystem.dispatcher(),
+				actorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index 63c1b29..dead732 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -103,6 +103,7 @@ public abstract class TaskManagerProcessReapingTestBase {
 				new Configuration(),
 				jmActorSystem,
 				jmActorSystem.dispatcher(),
+				jmActorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 52d500d..88e549e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -112,7 +112,11 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
 			try {
 				// a simple JobManager
-				jobManager = createJobManager(actorSystem, actorSystem.dispatcher(), config);
+				jobManager = createJobManager(
+					actorSystem,
+					actorSystem.dispatcher(),
+					actorSystem.dispatcher(),
+					config);
 				startResourceManager(config, jobManager.actor());
 
 				// start two TaskManagers. it will automatically try to register
@@ -195,6 +199,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				jobManager = createJobManager(
 					actorSystem,
 					actorSystem.dispatcher(),
+					actorSystem.dispatcher(),
 					new Configuration());
 
 				startResourceManager(config, jobManager.actor());
@@ -700,6 +705,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 			configuration,
 			actorSystem,
 			actorSystem.dispatcher(),
+			actorSystem.dispatcher(),
 			NONE_STRING,
 			NONE_STRING,
 			JobManager.class,

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestExecutors.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestExecutors.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestExecutors.java
new file mode 100644
index 0000000..703593c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestExecutors.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.util.concurrent.Executor;
+
+public class TestExecutors {
+
+	public static Executor directExecutor() {
+		return DirectExecutor.INSTANCE;
+	}
+
+	private static final class DirectExecutor implements Executor {
+
+		public static final DirectExecutor INSTANCE = new DirectExecutor();
+
+		private DirectExecutor() {}
+
+		@Override
+		public void execute(Runnable command) {
+			command.run();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
index 7505bfc..5a2b337 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
@@ -23,6 +23,7 @@ import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.TestExecutors;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 import org.apache.zookeeper.CreateMode;
@@ -85,7 +86,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 	public void testAdd() throws Exception {
 		LongStateStorage longStateStorage = new LongStateStorage();
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
-				ZooKeeper.getClient(), longStateStorage);
+				ZooKeeper.getClient(), longStateStorage, TestExecutors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testAdd";
@@ -120,7 +121,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 	public void testAddWithCreateMode() throws Exception {
 		LongStateStorage longStateStorage = new LongStateStorage();
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
-				ZooKeeper.getClient(), longStateStorage);
+				ZooKeeper.getClient(), longStateStorage, TestExecutors.directExecutor());
 
 		// Config
 		Long state = 3457347234L;
@@ -182,7 +183,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, TestExecutors.directExecutor());
 
 		ZooKeeper.getClient().create().forPath("/testAddAlreadyExistingPath");
 
@@ -201,7 +202,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		when(client.create()).thenThrow(new RuntimeException("Expected test Exception."));
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				client, stateHandleProvider);
+				client, stateHandleProvider, TestExecutors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure";
@@ -231,7 +232,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, TestExecutors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testReplace";
@@ -270,7 +271,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		StateStorageHelper<Long> stateStorage = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateStorage);
+				ZooKeeper.getClient(), stateStorage, TestExecutors.directExecutor());
 
 		store.replace("/testReplaceNonExistingPath", 0, 1L);
 	}
@@ -287,7 +288,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		when(client.setData()).thenThrow(new RuntimeException("Expected test Exception."));
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				client, stateHandleProvider);
+				client, stateHandleProvider, TestExecutors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testReplaceDiscardStateHandleAfterFailure";
@@ -329,7 +330,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, TestExecutors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testGetAndExists";
@@ -354,7 +355,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, TestExecutors.directExecutor());
 
 		store.get("/testGetNonExistingPath");
 	}
@@ -368,7 +369,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, TestExecutors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testGetAll";
@@ -399,7 +400,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, TestExecutors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testGetAllSortedByName";
@@ -429,7 +430,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, TestExecutors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testRemove";
@@ -453,7 +454,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, TestExecutors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testRemoveWithCallback";
@@ -492,7 +493,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, TestExecutors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testDiscard";
@@ -514,7 +515,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateHandleProvider, TestExecutors.directExecutor());
 
 		// Config
 		final String pathInZooKeeper = "/testDiscardAll";

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index b35cdb4..7174bc8 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -169,6 +169,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
       new Configuration(),
       _system,
       _system.dispatcher,
+      _system.dispatcher,
       None,
       None,
       classOf[JobManager],

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index c3f846e..c7c141a 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -109,7 +109,8 @@ class TestingCluster(
     jobRecoveryTimeout,
     metricRegistry) = JobManager.createJobManagerComponents(
       config,
-      executor,
+      futureExecutor,
+      ioExecutor,
       createLeaderElectionService())
 
     val testArchiveProps = Props(new TestingMemoryArchivist(archiveCount))
@@ -118,7 +119,7 @@ class TestingCluster(
     val jobManagerProps = Props(
       new TestingJobManager(
         configuration,
-        executor,
+        futureExecutor,
         instanceManager,
         scheduler,
         libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 576993d..fadef28 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -303,18 +303,21 @@ object TestingUtils {
   /** Creates a testing JobManager using the default recovery mode (standalone)
     *
     * @param actorSystem The ActorSystem to use
-    * @param executor to run the JobManager's futures
+    * @param futureExecutor to run the JobManager's futures
+    * @param ioExecutor to run blocking io operations
     * @param configuration The Flink configuration
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       configuration: Configuration)
     : ActorGateway = {
     createJobManager(
       actorSystem,
-      executor,
+      futureExecutor,
+      ioExecutor,
       configuration,
       classOf[TestingJobManager],
       ""
@@ -325,20 +328,23 @@ object TestingUtils {
     * Additional prefix can be supplied for the Actor system names
     *
     * @param actorSystem The ActorSystem to use
-    * @param executor to run the JobManager's futures
+    * @param futureExecutor to run the JobManager's futures
+    * @param ioExecutor to run blocking io operations
     * @param configuration The Flink configuration
     * @param prefix The prefix for the actor names
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       configuration: Configuration,
       prefix: String)
     : ActorGateway = {
     createJobManager(
       actorSystem,
-      executor,
+      futureExecutor,
+      ioExecutor,
       configuration,
       classOf[TestingJobManager],
       prefix
@@ -349,19 +355,21 @@ object TestingUtils {
     * Creates a JobManager of the given class using the default recovery mode (standalone)
     *
     * @param actorSystem ActorSystem to use
-    * @param executor to run the JobManager's futures
+    * @param futureExecutor to run the JobManager's futures
+    * @param ioExecutor to run blocking io operations
     * @param configuration Configuration to use
     * @param jobManagerClass JobManager class to instantiate
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       configuration: Configuration,
       jobManagerClass: Class[_ <: JobManager])
     : ActorGateway = {
 
-    createJobManager(actorSystem, executor, configuration, jobManagerClass, "")
+    createJobManager(actorSystem, futureExecutor, ioExecutor, configuration, jobManagerClass, "")
   }
 
   /**
@@ -369,7 +377,8 @@ object TestingUtils {
     * Additional prefix for the Actor names can be added.
     *
     * @param actorSystem ActorSystem to use
-    * @param executor to run the JobManager's futures
+    * @param futureExecutor to run the JobManager's futures
+    * @param ioExecutor to run blocking io operations
     * @param configuration Configuration to use
     * @param jobManagerClass JobManager class to instantiate
     * @param prefix The prefix to use for the Actor names
@@ -378,7 +387,8 @@ object TestingUtils {
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       configuration: Configuration,
       jobManagerClass: Class[_ <: JobManager],
       prefix: String)
@@ -389,7 +399,8 @@ object TestingUtils {
       val (actor, _) = JobManager.startJobManagerActors(
         configuration,
         actorSystem,
-        executor,
+        futureExecutor,
+        ioExecutor,
         Some(prefix + JobManager.JOB_MANAGER_NAME),
         Some(prefix + JobManager.ARCHIVE_NAME),
         jobManagerClass,

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index f2a4c5c..65af576 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -103,7 +103,8 @@ class ForkableFlinkMiniCluster(
     val (jobManager, _) = JobManager.startJobManagerActors(
       config,
       actorSystem,
-      executor,
+      futureExecutor,
+      ioExecutor,
       Some(jobManagerName),
       Some(archiveName),
       classOf[TestingJobManager],

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index af86983..0ff2e78 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -130,6 +130,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 				jmConfig,
 				jmActorSystem,
 				jmActorSystem.dispatcher(),
+				jmActorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index f72ef34..8243e97 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -105,6 +105,7 @@ public class ProcessFailureCancelingITCase {
 				jmConfig,
 				jmActorSystem,
 				jmActorSystem.dispatcher(),
+				jmActorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2e4c193/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index eb00992..70894b0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -180,10 +180,14 @@ public class YarnApplicationMasterRunner {
 
 		int numberProcessors = Hardware.getNumberCPUCores();
 
-		final ExecutorService executor = Executors.newFixedThreadPool(
+		final ExecutorService futureExecutor = Executors.newFixedThreadPool(
 			numberProcessors,
 			new NamedThreadFactory("yarn-jobmanager-future-", "-thread-"));
 
+		final ExecutorService ioExecutor = Executors.newFixedThreadPool(
+			numberProcessors,
+			new NamedThreadFactory("yarn-jobmanager-io-", "-thread-"));
+
 		try {
 			// ------- (1) load and parse / validate all configurations -------
 
@@ -289,7 +293,8 @@ public class YarnApplicationMasterRunner {
 			ActorRef jobManager = JobManager.startJobManagerActors(
 				config,
 				actorSystem,
-				executor,
+				futureExecutor,
+				ioExecutor,
 				new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
 				scala.Option.<String>empty(),
 				getJobManagerClass(),
@@ -377,7 +382,8 @@ public class YarnApplicationMasterRunner {
 			}
 		}
 
-		executor.shutdownNow();
+		futureExecutor.shutdownNow();
+		ioExecutor.shutdownNow();
 
 		return 0;
 	}


[3/4] flink git commit: [FLINK-5085] Execute CheckpointCoordinator's state discard calls asynchronously

Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 8c7360a..03e3535 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -135,6 +135,7 @@ public class RescalePartitionerTest extends TestLogger {
 
 		ExecutionGraph eg = new ExecutionGraph(
 			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId,
 			jobName,
 			cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index 0ed0d83..f0622dd 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -41,8 +41,9 @@ import scala.concurrent.duration.FiniteDuration
   * instead of an anonymous class with the respective mixin to obtain a more readable logger name.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executor Execution context which is used to execute concurrent tasks in the
+  * @param futureExecutor Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+  * @param ioExecutor for blocking io operations
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
   * @param scheduler Scheduler to schedule Flink jobs
@@ -54,7 +55,8 @@ import scala.concurrent.duration.FiniteDuration
   */
 class TestingYarnJobManager(
     flinkConfiguration: Configuration,
-    executor: Executor,
+    futureExecutor: Executor,
+    ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -69,7 +71,8 @@ class TestingYarnJobManager(
     metricRegistry : Option[MetricRegistry])
   extends YarnJobManager(
     flinkConfiguration,
-    executor,
+    futureExecutor,
+    ioExecutor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 70894b0..24eb45c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.ExecutorUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.NamedThreadFactory;
 import org.apache.flink.runtime.util.SignalHandler;
@@ -56,6 +57,8 @@ import org.apache.hadoop.yarn.util.Records;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.Option;
+import scala.Some;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
@@ -188,33 +191,34 @@ public class YarnApplicationMasterRunner {
 			numberProcessors,
 			new NamedThreadFactory("yarn-jobmanager-io-", "-thread-"));
 
-		try {
-			// ------- (1) load and parse / validate all configurations -------
+		// ------- (1) load and parse / validate all configurations -------
+
+		// loading all config values here has the advantage that the program fails fast, if any
+		// configuration problem occurs
 
-			// loading all config values here has the advantage that the program fails fast, if any
-			// configuration problem occurs
+		final String currDir = ENV.get(Environment.PWD.key());
+		require(currDir != null, "Current working directory variable (%s) not set", Environment.PWD.key());
 
-			final String currDir = ENV.get(Environment.PWD.key());
-			require(currDir != null, "Current working directory variable (%s) not set", Environment.PWD.key());
+		// Note that we use the "appMasterHostname" given by YARN here, to make sure
+		// we use the hostnames given by YARN consistently throughout akka.
+		// for akka "localhost" and "localhost.localdomain" are different actors.
+		final String appMasterHostname = ENV.get(Environment.NM_HOST.key());
+		require(appMasterHostname != null,
+			"ApplicationMaster hostname variable %s not set", Environment.NM_HOST.key());
 
-			// Note that we use the "appMasterHostname" given by YARN here, to make sure
-			// we use the hostnames given by YARN consistently throughout akka.
-			// for akka "localhost" and "localhost.localdomain" are different actors.
-			final String appMasterHostname = ENV.get(Environment.NM_HOST.key());
-			require(appMasterHostname != null,
-				"ApplicationMaster hostname variable %s not set", Environment.NM_HOST.key());
+		LOG.info("YARN assigned hostname for application master: {}", appMasterHostname);
 
-			LOG.info("YARN assigned hostname for application master: {}", appMasterHostname);
+		// Flink configuration
+		final Map<String, String> dynamicProperties =
+			FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
+		LOG.debug("YARN dynamic properties: {}", dynamicProperties);
 
-			// Flink configuration
-			final Map<String, String> dynamicProperties =
-				FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
-			LOG.debug("YARN dynamic properties: {}", dynamicProperties);
+		final Configuration config = createConfiguration(currDir, dynamicProperties);
 
-			final Configuration config = createConfiguration(currDir, dynamicProperties);
+		// Hadoop/Yarn configuration (loads config data automatically from classpath files)
+		final YarnConfiguration yarnConfig = new YarnConfiguration();
 
-			// Hadoop/Yarn configuration (loads config data automatically from classpath files)
-			final YarnConfiguration yarnConfig = new YarnConfiguration();
+		try {
 
 			final int taskManagerContainerMemory;
 			final int numInitialTaskManagers;
@@ -295,8 +299,8 @@ public class YarnApplicationMasterRunner {
 				actorSystem,
 				futureExecutor,
 				ioExecutor,
-				new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
-				scala.Option.<String>empty(),
+				new Some<>(JobManager.JOB_MANAGER_NAME()),
+				Option.<String>empty(),
 				getJobManagerClass(),
 				getArchivistClass())._1();
 
@@ -329,7 +333,6 @@ public class YarnApplicationMasterRunner {
 
 			ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);
 
-
 			// 4: Process reapers
 			// The process reapers ensure that upon unexpected actor death, the process exits
 			// and does not stay lingering around unresponsive
@@ -364,6 +367,9 @@ public class YarnApplicationMasterRunner {
 				}
 			}
 
+			futureExecutor.shutdownNow();
+			ioExecutor.shutdownNow();
+
 			return INIT_ERROR_EXIT_CODE;
 		}
 
@@ -382,8 +388,11 @@ public class YarnApplicationMasterRunner {
 			}
 		}
 
-		futureExecutor.shutdownNow();
-		ioExecutor.shutdownNow();
+		ExecutorUtils.gracefulShutdown(
+			AkkaUtils.getTimeout(config).toMillis(),
+			TimeUnit.MILLISECONDS,
+			futureExecutor,
+			ioExecutor);
 
 		return 0;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index d7df66a..28615e6 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -46,8 +46,9 @@ import scala.language.postfixOps
   * to start/administer/stop the Yarn session.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executor Execution context which is used to execute concurrent tasks in the
+  * @param futureExecutor Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+  * @param ioExecutor for blocking io operations
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
   * @param scheduler Scheduler to schedule Flink jobs
@@ -59,7 +60,8 @@ import scala.language.postfixOps
   */
 class YarnJobManager(
     flinkConfiguration: FlinkConfiguration,
-    executor: Executor,
+    futureExecutor: Executor,
+    ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -74,7 +76,8 @@ class YarnJobManager(
     metricsRegistry: Option[MetricRegistry])
   extends JobManager(
     flinkConfiguration,
-    executor,
+    futureExecutor,
+    ioExecutor,
     instanceManager,
     scheduler,
     libraryCacheManager,