You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/09 16:12:04 UTC

[1/2] flink git commit: [FLINK-5748] [jobmanager] Make the 'future executor' a ScheduledExecutorService

Repository: flink
Updated Branches:
  refs/heads/master 95765b6d8 -> 665c7e399


http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 12dab93..20260c7 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -18,8 +18,9 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import java.util.UUID
-import java.util.concurrent.Executor
+import java.util
+import java.util.{Collections, UUID}
+import java.util.concurrent._
 
 import akka.actor.{ActorRef, ActorSystem, Kill, Props}
 import akka.pattern.ask
@@ -42,8 +43,9 @@ import org.apache.flink.runtime.testutils.TestingResourceManager
 import org.apache.flink.runtime.util.LeaderRetrievalUtils
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 
+import scala.concurrent.duration.TimeUnit
 import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContext}
+import scala.concurrent.{ExecutionContextExecutor, Await, ExecutionContext}
 import scala.language.postfixOps
 
 /**
@@ -51,8 +53,10 @@ import scala.language.postfixOps
  */
 object TestingUtils {
 
-  val testConfig = ConfigFactory.parseString(getDefaultTestingActorSystemConfigString)
+  private var sharedExecutorInstance: ScheduledExecutorService = _
 
+  val testConfig = ConfigFactory.parseString(getDefaultTestingActorSystemConfigString)
+  
   val TESTING_DURATION = 2 minute
 
   val DEFAULT_AKKA_ASK_TIMEOUT = "200 s"
@@ -87,12 +91,25 @@ object TestingUtils {
     cluster
   }
 
-  /** Returns the global [[ExecutionContext]] which is a [[scala.concurrent.forkjoin.ForkJoinPool]]
-    * with a default parallelism equal to the number of available cores.
-    *
-    * @return ExecutionContext.global
+  /** 
+    * Gets the shared global testing execution context 
     */
-  def defaultExecutionContext = ExecutionContext.global
+  def defaultExecutionContext: ExecutionContextExecutor = {
+    ExecutionContext.fromExecutor(defaultExecutor)
+  }
+
+  /**
+   * Gets the shared global testing scheduled executor
+   */
+  def defaultExecutor: ScheduledExecutorService = {
+    synchronized {
+      if (sharedExecutorInstance == null || sharedExecutorInstance.isShutdown) {
+        sharedExecutorInstance = Executors.newSingleThreadScheduledExecutor()
+      }
+
+      sharedExecutorInstance
+    }
+  }
 
   /** Returns an [[ExecutionContext]] which uses the current thread to execute the runnable.
     *
@@ -108,11 +125,9 @@ object TestingUtils {
   /** [[ExecutionContext]] which queues [[Runnable]] up in an [[ActionQueue]] instead of
     * execution them. If the automatic execution mode is activated, then the [[Runnable]] are
     * executed.
-    *
-    * @param actionQueue
     */
   class QueuedActionExecutionContext private[testingUtils] (val actionQueue: ActionQueue)
-    extends ExecutionContext with Executor {
+    extends AbstractExecutorService with ExecutionContext with ScheduledExecutorService {
 
     var automaticExecution = false
 
@@ -131,18 +146,53 @@ object TestingUtils {
     override def reportFailure(t: Throwable): Unit = {
       t.printStackTrace()
     }
+
+    override def scheduleAtFixedRate(
+        command: Runnable,
+        initialDelay: Long,
+        period: Long,
+        unit: TimeUnit): ScheduledFuture[_] = {
+      throw new UnsupportedOperationException()
+    }
+
+    override def schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture[_] = {
+      throw new UnsupportedOperationException()
+    }
+
+    override def schedule[V](callable: Callable[V], delay: Long, unit: TimeUnit)
+        : ScheduledFuture[V] = {
+      throw new UnsupportedOperationException()
+    }
+
+    override def scheduleWithFixedDelay(
+        command: Runnable,
+        initialDelay: Long,
+        delay: Long,
+        unit: TimeUnit): ScheduledFuture[_] = {
+      throw new UnsupportedOperationException()
+    }
+
+    override def shutdown(): Unit = ()
+
+    override def isTerminated: Boolean = false
+
+    override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = false
+
+    override def shutdownNow(): util.List[Runnable] = Collections.emptyList()
+
+    override def isShutdown: Boolean = false
   }
 
   /** Queue which stores [[Runnable]] */
   class ActionQueue {
     private val runnables = scala.collection.mutable.Queue[Runnable]()
 
-    def triggerNextAction {
+    def triggerNextAction() {
       val r = runnables.dequeue
       r.run()
     }
 
-    def popNextAction: Runnable = {
+    def popNextAction(): Runnable = {
       runnables.dequeue()
     }
 
@@ -309,7 +359,7 @@ object TestingUtils {
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      futureExecutor: Executor,
+      futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
       configuration: Configuration)
     : ActorGateway = {
@@ -335,7 +385,7 @@ object TestingUtils {
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      futureExecutor: Executor,
+      futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
       configuration: Configuration,
       prefix: String)
@@ -362,7 +412,7 @@ object TestingUtils {
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      futureExecutor: Executor,
+      futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
       configuration: Configuration,
       jobManagerClass: Class[_ <: JobManager])
@@ -385,7 +435,7 @@ object TestingUtils {
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      futureExecutor: Executor,
+      futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
       configuration: Configuration,
       jobManagerClass: Class[_ <: JobManager],

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index e77cbb3..3c81112 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -138,8 +138,8 @@ public class RescalePartitionerTest extends TestLogger {
 		assertEquals(2, sinkVertex.getParallelism());
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
 			jobId,
 			jobName,
 			cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 942b212..515570d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
@@ -131,8 +132,8 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 			ActorRef jmActor = JobManager.startJobManagerActors(
 				jmConfig,
 				jmActorSystem,
-				jmActorSystem.dispatcher(),
-				jmActorSystem.dispatcher(),
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 8243e97..dde0b9e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.util.NetUtils;
 
@@ -104,8 +105,8 @@ public class ProcessFailureCancelingITCase {
 			ActorRef jmActor = JobManager.startJobManagerActors(
 				jmConfig,
 				jmActorSystem,
-				jmActorSystem.dispatcher(),
-				jmActorSystem.dispatcher(),
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index 5244124..b539961 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.yarn
 
-import java.util.concurrent.Executor
+import java.util.concurrent.{Executor, ScheduledExecutorService}
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
@@ -54,7 +54,7 @@ import scala.concurrent.duration.FiniteDuration
   */
 class TestingYarnJobManager(
     flinkConfiguration: Configuration,
-    futureExecutor: Executor,
+    futureExecutor: ScheduledExecutorService,
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 2193174..29f1827 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -75,6 +75,7 @@ import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
@@ -227,7 +228,7 @@ public class YarnApplicationMasterRunner {
 
 		int numberProcessors = Hardware.getNumberCPUCores();
 
-		final ExecutorService futureExecutor = Executors.newFixedThreadPool(
+		final ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool(
 			numberProcessors,
 			new NamedThreadFactory("yarn-jobmanager-future-", "-thread-"));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index db4eea8..efb4801 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.yarn
 
-import java.util.concurrent.{Executor, TimeUnit}
+import java.util.concurrent.{Executor, ScheduledExecutorService, TimeUnit}
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.{ConfigConstants, Configuration => FlinkConfiguration}
@@ -54,7 +54,7 @@ import scala.language.postfixOps
   */
 class YarnJobManager(
     flinkConfiguration: FlinkConfiguration,
-    futureExecutor: Executor,
+    futureExecutor: ScheduledExecutorService,
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,


[2/2] flink git commit: [FLINK-5748] [jobmanager] Make the 'future executor' a ScheduledExecutorService

Posted by se...@apache.org.
[FLINK-5748] [jobmanager] Make the 'future executor' a ScheduledExecutorService

This closes #3289


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/665c7e39
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/665c7e39
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/665c7e39

Branch: refs/heads/master
Commit: 665c7e399928188b22a7963cc05654589d47941c
Parents: 95765b6
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 8 20:51:46 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 9 14:36:35 2017 +0100

----------------------------------------------------------------------
 .../MesosApplicationMasterRunner.java           |   5 +-
 .../clusterframework/MesosJobManager.scala      |   4 +-
 .../BackPressureStatsTrackerITCase.java         |   4 +-
 .../StackTraceSampleCoordinatorITCase.java      |   4 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |   5 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  11 +-
 .../executiongraph/ExecutionGraphBuilder.java   |   6 +-
 .../runtime/jobmaster/JobManagerServices.java   |  12 ++-
 .../flink/runtime/jobmaster/JobMaster.java      |   3 +-
 .../ContaineredJobManager.scala                 |   4 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  14 +--
 .../runtime/minicluster/FlinkMiniCluster.scala  |   4 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |   4 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java |   4 +-
 .../clusterframework/ClusterShutdownITCase.java |  13 +--
 .../clusterframework/ResourceManagerITCase.java |   8 +-
 .../ArchivedExecutionGraphTest.java             |   4 +-
 .../ExecutionGraphConstructionTest.java         |  32 +++---
 .../ExecutionGraphDeploymentTest.java           |  13 +--
 .../ExecutionGraphMetricsTest.java              |   4 +-
 .../ExecutionGraphRestartTest.java              |  12 +--
 .../ExecutionGraphSignalsTest.java              |   4 +-
 .../executiongraph/ExecutionGraphTestUtils.java |   9 +-
 .../ExecutionStateProgressTest.java             |   4 +-
 .../ExecutionVertexCancelTest.java              |  18 ++--
 .../ExecutionVertexDeploymentTest.java          |   5 +-
 .../ExecutionVertexLocalityTest.java            |   6 +-
 .../executiongraph/LegacyJobVertexIdTest.java   |   3 +-
 .../executiongraph/PointwisePatternTest.java    |  28 ++---
 .../TerminalStateDeadlockTest.java              |   4 +-
 .../executiongraph/VertexSlotSharingTest.java   |   4 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |  24 ++---
 .../runtime/jobmanager/JobManagerTest.java      |  20 ++--
 .../flink/runtime/jobmanager/JobSubmitTest.java |   5 +-
 .../JobManagerLeaderElectionTest.java           |  17 ++-
 .../runtime/metrics/TaskManagerMetricsTest.java |   5 +-
 ...askManagerComponentsStartupShutdownTest.java |   6 +-
 .../TaskManagerProcessReapingTestBase.java      |   5 +-
 .../TaskManagerRegistrationTest.java            |  12 +--
 .../DirectScheduledExecutorService.java         | 107 +++++++++++++++++++
 .../TaskManagerLossFailsTasksTest.scala         |  14 ++-
 .../jobmanager/JobManagerRegistrationTest.scala |   8 +-
 .../runtime/testingUtils/TestingCluster.scala   |   4 +-
 .../testingUtils/TestingJobManager.scala        |   4 +-
 .../runtime/testingUtils/TestingUtils.scala     |  86 +++++++++++----
 .../partitioner/RescalePartitionerTest.java     |   4 +-
 ...ctTaskManagerProcessFailureRecoveryTest.java |   5 +-
 .../recovery/ProcessFailureCancelingITCase.java |   5 +-
 .../flink/yarn/TestingYarnJobManager.scala      |   4 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |   3 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  |   4 +-
 51 files changed, 385 insertions(+), 212 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index de76d8e..5033692 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -72,6 +72,7 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkState;
@@ -195,7 +196,7 @@ public class MesosApplicationMasterRunner {
 		ActorSystem actorSystem = null;
 		WebMonitor webMonitor = null;
 		MesosArtifactServer artifactServer = null;
-		ExecutorService futureExecutor = null;
+		ScheduledExecutorService futureExecutor = null;
 		ExecutorService ioExecutor = null;
 		MesosServices mesosServices = null;
 
@@ -213,7 +214,7 @@ public class MesosApplicationMasterRunner {
 			// JM configuration
 			int numberProcessors = Hardware.getNumberCPUCores();
 
-			futureExecutor = Executors.newFixedThreadPool(
+			futureExecutor = Executors.newScheduledThreadPool(
 				numberProcessors,
 				new NamedThreadFactory("mesos-jobmanager-future-", "-thread-"));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
index 38886f8..3e7c55f 100644
--- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
+++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.mesos.runtime.clusterframework
 
-import java.util.concurrent.Executor
+import java.util.concurrent.{Executor, ScheduledExecutorService}
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
@@ -51,7 +51,7 @@ import scala.concurrent.duration._
   */
 class MesosJobManager(
     flinkConfiguration: FlinkConfiguration,
-    futureExecutor: Executor,
+    futureExecutor: ScheduledExecutorService,
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 357301a..30a86a2 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -122,8 +122,8 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 			try {
 				jobManger = TestingUtils.createJobManager(
 					testActorSystem,
-					testActorSystem.dispatcher(),
-					testActorSystem.dispatcher(),
+					TestingUtils.defaultExecutor(),
+					TestingUtils.defaultExecutor(),
 					new Configuration());
 
 				final Configuration config = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index c9fa547..a44e212 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -94,8 +94,8 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
 			try {
 				jobManger = TestingUtils.createJobManager(
 					testActorSystem,
-					testActorSystem.dispatcher(),
-					testActorSystem.dispatcher(),
+					TestingUtils.defaultExecutor(),
+					TestingUtils.defaultExecutor(),
 					new Configuration());
 
 				final Configuration config = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index d8bd6af..8af1f46 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.webmonitor.files.MimeTypes;
@@ -161,8 +162,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 				jobManager[i] = JobManager.startJobManagerActors(
 					jmConfig,
 					jobManagerSystem[i],
-					jobManagerSystem[i].dispatcher(),
-					jobManagerSystem[i].dispatcher(),
+					TestingUtils.defaultExecutor(),
+					TestingUtils.defaultExecutor(),
 					JobManager.class,
 					MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index da4a66e..f25120c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -78,6 +78,7 @@ import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -201,7 +202,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	private CheckpointStatsTracker checkpointStatsTracker;
 
 	/** The executor which is used to execute futures. */
-	private final Executor futureExecutor;
+	private final ScheduledExecutorService futureExecutor;
 
 	/** The executor which is used to execute blocking io operations */
 	private final Executor ioExecutor;
@@ -220,7 +221,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * This constructor is for tests only, because it does not include class loading information.
 	 */
 	ExecutionGraph(
-			Executor futureExecutor,
+			ScheduledExecutorService futureExecutor,
 			Executor ioExecutor,
 			JobID jobId,
 			String jobName,
@@ -237,15 +238,15 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			serializedConfig,
 			timeout,
 			restartStrategy,
-			new ArrayList<BlobKey>(),
-			new ArrayList<URL>(),
+			Collections.<BlobKey>emptyList(),
+			Collections.<URL>emptyList(),
 			ExecutionGraph.class.getClassLoader(),
 			new UnregisteredMetricsGroup()
 		);
 	}
 
 	public ExecutionGraph(
-			Executor futureExecutor,
+			ScheduledExecutorService futureExecutor,
 			Executor ioExecutor,
 			JobID jobId,
 			String jobName,

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 386f202..c558e43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -44,6 +44,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -51,16 +52,17 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * Utility class to encapsulate the logic of building an {@link ExecutionGraph} from a {@link JobGraph}.
  */
 public class ExecutionGraphBuilder {
+
 	/**
 	 * Builds the ExecutionGraph from the JobGraph.
 	 * If a prior execution graph exists, the JobGraph will be attached. If no prior execution
-	 * graph exists, then the JobGraph will become attach to a new emoty execution graph.
+	 * graph exists, then the JobGraph will become attach to a new empty execution graph.
 	 */
 	public static ExecutionGraph buildGraph(
 			@Nullable ExecutionGraph prior,
 			JobGraph jobGraph,
 			Configuration jobManagerConfig,
-			Executor futureExecutor,
+			ScheduledExecutorService futureExecutor,
 			Executor ioExecutor,
 			ClassLoader classLoader,
 			CheckpointRecoveryFactory recoveryFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
index fff75d5..95500e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -27,12 +27,14 @@ import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.util.ExceptionUtils;
 
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -41,7 +43,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class JobManagerServices {
 
-	public final ExecutorService executorService;
+	public final ScheduledExecutorService executorService;
 
 	public final BlobLibraryCacheManager libraryCacheManager;
 
@@ -50,7 +52,7 @@ public class JobManagerServices {
 	public final Time rpcAskTimeout;
 
 	public JobManagerServices(
-			ExecutorService executorService,
+			ScheduledExecutorService executorService,
 			BlobLibraryCacheManager libraryCacheManager,
 			RestartStrategyFactory restartStrategyFactory,
 			Time rpcAskTimeout) {
@@ -119,7 +121,7 @@ public class JobManagerServices {
 		}
 
 		return new JobManagerServices(
-			new ForkJoinPool(),
+			Executors.newScheduledThreadPool(Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE),
 			libraryCacheManager,
 			RestartStrategyFactory.createRestartStrategyFactory(config),
 			Time.of(timeout.length(), timeout.unit()));

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index d6bcf2c..a318657 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -103,6 +103,7 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -182,7 +183,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			Configuration configuration,
 			RpcService rpcService,
 			HighAvailabilityServices highAvailabilityService,
-			ExecutorService executorService,
+			ScheduledExecutorService executorService,
 			BlobLibraryCacheManager libraryCacheManager,
 			RestartStrategyFactory restartStrategyFactory,
 			Time rpcAskTimeout,

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
index cbe80f1..cd7b363 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.clusterframework
 
-import java.util.concurrent.Executor
+import java.util.concurrent.{ScheduledExecutorService, Executor}
 
 import akka.actor.ActorRef
 import org.apache.flink.api.common.JobID
@@ -59,7 +59,7 @@ import scala.language.postfixOps
   */
 abstract class ContaineredJobManager(
     flinkConfiguration: Configuration,
-    futureExecutor: Executor,
+    futureExecutor: ScheduledExecutorService,
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 81e9fc4..d575f68 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -117,7 +117,7 @@ import scala.language.postfixOps
  */
 class JobManager(
     protected val flinkConfiguration: Configuration,
-    protected val futureExecutor: Executor,
+    protected val futureExecutor: ScheduledExecutorService,
     protected val ioExecutor: Executor,
     protected val instanceManager: InstanceManager,
     protected val scheduler: FlinkScheduler,
@@ -2021,7 +2021,7 @@ object JobManager {
 
     val numberProcessors = Hardware.getNumberCPUCores()
 
-    val futureExecutor = Executors.newFixedThreadPool(
+    val futureExecutor = Executors.newScheduledThreadPool(
       numberProcessors,
       new NamedThreadFactory("jobmanager-future-", "-thread-"))
 
@@ -2189,7 +2189,7 @@ object JobManager {
       executionMode: JobManagerMode,
       externalHostname: String,
       port: Int,
-      futureExecutor: Executor,
+      futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist],
@@ -2466,7 +2466,7 @@ object JobManager {
    */
   def createJobManagerComponents(
       configuration: Configuration,
-      futureExecutor: Executor,
+      futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
       leaderElectionServiceOption: Option[LeaderElectionService]) :
     (InstanceManager,
@@ -2601,7 +2601,7 @@ object JobManager {
   def startJobManagerActors(
       configuration: Configuration,
       actorSystem: ActorSystem,
-      futureExecutor: Executor,
+      futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist])
@@ -2637,7 +2637,7 @@ object JobManager {
   def startJobManagerActors(
       configuration: Configuration,
       actorSystem: ActorSystem,
-      futureExecutor: Executor,
+      futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
       jobManagerActorName: Option[String],
       archiveActorName: Option[String],
@@ -2707,7 +2707,7 @@ object JobManager {
   def getJobManagerProps(
     jobManagerClass: Class[_ <: JobManager],
     configuration: Configuration,
-    futureExecutor: Executor,
+    futureExecutor: ScheduledExecutorService,
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 88d7b3a..64cc97d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -107,13 +107,13 @@ abstract class FlinkMiniCluster(
 
   private var isRunning = false
 
-  val futureExecutor = Executors.newFixedThreadPool(
+  val futureExecutor = Executors.newScheduledThreadPool(
     Hardware.getNumberCPUCores(),
     new NamedThreadFactory("mini-cluster-future-", "-thread"))
 
   val ioExecutor = Executors.newFixedThreadPool(
     Hardware.getNumberCPUCores(),
-    new NamedThreadFactory("mini-cluster-future-", "-thread"))
+    new NamedThreadFactory("mini-cluster-io-", "-thread"))
 
   def configuration: Configuration = {
     if (originalConfiguration.getInteger(

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 4ec655e..adace0b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.minicluster
 
 import java.net.InetAddress
-import java.util.concurrent.Executor
+import java.util.concurrent.{Executor, ScheduledExecutorService}
 
 import akka.actor.{ActorRef, ActorSystem, Props}
 import org.apache.flink.api.common.JobID
@@ -254,7 +254,7 @@ class LocalFlinkMiniCluster(
   def getJobManagerProps(
       jobManagerClass: Class[_ <: JobManager],
       configuration: Configuration,
-      futureExecutor: Executor,
+      futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
       instanceManager: InstanceManager,
       scheduler: Scheduler,

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index bc95de7..47e6826 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -93,8 +93,8 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 			CheckpointIDCounter counter,
 			CompletedCheckpointStore store) throws Exception {
 		ExecutionGraph executionGraph = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
 			new JobID(),
 			"test",
 			new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
index 04839af..41018dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.clusterframework;
 
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
+
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -31,11 +31,12 @@ import org.apache.flink.runtime.testingUtils.TestingMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.TestingResourceManager;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import scala.Option;
 
+import scala.Option;
 
 /**
  * Runs tests to ensure that a cluster is shutdown properly.
@@ -74,8 +75,8 @@ public class ClusterShutdownITCase extends TestLogger {
 			ActorGateway jobManager =
 				TestingUtils.createJobManager(
 					system,
-					system.dispatcher(),
-					system.dispatcher(),
+					TestingUtils.defaultExecutor(),
+					TestingUtils.defaultExecutor(),
 					config,
 					"jobmanager1");
 
@@ -121,8 +122,8 @@ public class ClusterShutdownITCase extends TestLogger {
 			ActorGateway jobManager =
 				TestingUtils.createJobManager(
 					system,
-					system.dispatcher(),
-					system.dispatcher(),
+					TestingUtils.defaultExecutor(),
+					TestingUtils.defaultExecutor(),
 					config,
 					"jobmanager2");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
index d4ff03e..6191195 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
@@ -77,8 +77,8 @@ public class ResourceManagerITCase extends TestLogger {
 			ActorGateway jobManager =
 				TestingUtils.createJobManager(
 					system,
-					system.dispatcher(),
-					system.dispatcher(),
+					TestingUtils.defaultExecutor(),
+					TestingUtils.defaultExecutor(),
 					config,
 					"ReconciliationTest");
 			ActorGateway me =
@@ -136,8 +136,8 @@ public class ResourceManagerITCase extends TestLogger {
 			ActorGateway jobManager =
 				TestingUtils.createJobManager(
 					system,
-					system.dispatcher(),
-					system.dispatcher(),
+					TestingUtils.defaultExecutor(),
+					TestingUtils.defaultExecutor(),
 					config,
 					"RegTest");
 			ActorGateway me =

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 9b1064d..46ce3f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -96,8 +96,8 @@ public class ArchivedExecutionGraphTest {
 		config.setGlobalJobParameters(new TestJobParameters());
 
 		runtimeGraph = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
 			new JobID(),
 			"test job",
 			new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index bf3a17c..aed1095 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -112,8 +112,8 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -162,8 +162,8 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -237,8 +237,8 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -497,8 +497,8 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -562,8 +562,8 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v5, v4));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -631,8 +631,8 @@ public class ExecutionGraphConstructionTest {
 			List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
 			ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
 				jobId, 
 				jobName, 
 				cfg,
@@ -678,8 +678,8 @@ public class ExecutionGraphConstructionTest {
 			List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
 			ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
 				jobId, 
 				jobName,
 				cfg,
@@ -760,8 +760,8 @@ public class ExecutionGraphConstructionTest {
 			JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, v4, v5, v6, v7, v8);
 			
 			ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
 				jobId, 
 				jobName, 
 				cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index ef4f74c..6b05987 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -53,6 +53,7 @@ import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
@@ -89,8 +90,8 @@ public class ExecutionGraphDeploymentTest {
 			v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
 
 			ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
 				jobId, 
 				"some job", 
 				new Configuration(),
@@ -313,8 +314,8 @@ public class ExecutionGraphDeploymentTest {
 
 		// execution graph that executes actions synchronously
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.directExecutionContext(),
-			TestingUtils.defaultExecutionContext(),
+			new DirectScheduledExecutorService(),
+			TestingUtils.defaultExecutor(),
 			jobId,
 			"failing test job",
 			new Configuration(),
@@ -358,8 +359,8 @@ public class ExecutionGraphDeploymentTest {
 
 		// execution graph that executes actions synchronously
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.directExecutionContext(),
-			TestingUtils.defaultExecutionContext(),
+			new DirectScheduledExecutorService(),
+			TestingUtils.defaultExecutor(),
 			jobId, 
 			"some job", 
 			new Configuration(), 

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 41e47d2..6b28984 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -66,8 +66,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -82,7 +82,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 	 */
 	@Test
 	public void testExecutionGraphRestartTimeMetric() throws JobException, IOException, InterruptedException {
-		final ExecutorService executor = Executors.newCachedThreadPool();
+		final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
 		try {
 			// setup execution graph with mocked scheduling logic
 			int parallelism = 1;

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 11f12a5..e4f49bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -232,8 +232,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		// Blocking program
 		ExecutionGraph executionGraph = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
 			new JobID(),
 			"TestJob",
 			new Configuration(),
@@ -548,8 +548,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		ControllableRestartStrategy controllableRestartStrategy = new ControllableRestartStrategy(timeout);
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
 			new JobID(),
 			"Test job",
 			new Configuration(),
@@ -682,8 +682,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws IOException {
 		return new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
 			new JobID(),
 			"Test job",
 			new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index fde967e..b7850fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -132,8 +132,8 @@ public class ExecutionGraphSignalsTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
 		eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
 			jobId,
 			jobName,
 			cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 71ae3b6..ef94917 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -24,7 +24,7 @@ import static org.mockito.Mockito.spy;
 
 import java.lang.reflect.Field;
 import java.net.InetAddress;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
@@ -168,7 +168,10 @@ public class ExecutionGraphTestUtils {
 
 	public static final String ERROR_MESSAGE = "test_failure_error_message";
 
-	public static ExecutionJobVertex getExecutionVertex(JobVertexID id, Executor executor) throws Exception {
+	public static ExecutionJobVertex getExecutionVertex(
+			JobVertexID id, ScheduledExecutorService executor) 
+		throws Exception {
+
 		JobVertex ajv = new JobVertex("TestVertex", id);
 		ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
 
@@ -200,6 +203,6 @@ public class ExecutionGraphTestUtils {
 	}
 	
 	public static ExecutionJobVertex getExecutionVertex(JobVertexID id) throws Exception {
-		return getExecutionVertex(id, TestingUtils.defaultExecutionContext());
+		return getExecutionVertex(id, TestingUtils.defaultExecutor());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 19e2d6d..7427a1d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -52,8 +52,8 @@ public class ExecutionStateProgressTest {
 			ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
 
 			ExecutionGraph graph = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
 				jid, 
 				"test job", 
 				new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index b10b796..7b6c6ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -39,8 +39,10 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 
 import org.junit.Test;
+
 import scala.concurrent.ExecutionContext;
 
 @SuppressWarnings("serial")
@@ -121,7 +123,6 @@ public class ExecutionVertexCancelTest {
 
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
-			final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
 
 			setVertexState(vertex, ExecutionState.SCHEDULED);
 			assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
@@ -187,7 +188,6 @@ public class ExecutionVertexCancelTest {
 
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
-			final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
 
 			setVertexState(vertex, ExecutionState.SCHEDULED);
 			assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
@@ -249,11 +249,10 @@ public class ExecutionVertexCancelTest {
 	public void testCancelFromRunning() {
 		try {
 			final JobVertexID jid = new JobVertexID();
-			final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
+			final ExecutionJobVertex ejv = getExecutionVertex(jid, new DirectScheduledExecutorService());
 
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
-			final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
 
 			ActorGateway actorGateway = new CancelSequenceActorGateway(
 					TestingUtils.directExecutionContext(),
@@ -268,7 +267,7 @@ public class ExecutionVertexCancelTest {
 			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
 
 			vertex.cancel();
-			vertex.getCurrentExecutionAttempt().cancelingComplete(); // responce by task manager once actially canceled
+			vertex.getCurrentExecutionAttempt().cancelingComplete(); // response by task manager once actually canceled
 
 			assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 
@@ -290,11 +289,10 @@ public class ExecutionVertexCancelTest {
 		try {
 
 			final JobVertexID jid = new JobVertexID();
-			final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
+			final ExecutionJobVertex ejv = getExecutionVertex(jid, new DirectScheduledExecutorService());
 
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
-			final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
 
 			final ActorGateway actorGateway = new CancelSequenceActorGateway(
 					TestingUtils.directExecutionContext(),
@@ -339,12 +337,10 @@ public class ExecutionVertexCancelTest {
 		// this may happen when the task finished or failed while the call was in progress
 		try {
 			final JobVertexID jid = new JobVertexID();
-			final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
+			final ExecutionJobVertex ejv = getExecutionVertex(jid, new DirectScheduledExecutorService());
 
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
-			final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
-
 
 			final ActorGateway actorGateway = new CancelSequenceActorGateway(
 					TestingUtils.directExecutionContext(),
@@ -376,7 +372,7 @@ public class ExecutionVertexCancelTest {
 	public void testCancelCallFails() {
 		try {
 			final JobVertexID jid = new JobVertexID();
-			final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
+			final ExecutionJobVertex ejv = getExecutionVertex(jid, new DirectScheduledExecutorService());
 
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index c04ebc6..26cb3f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.junit.Test;
 
 import java.util.Collection;
@@ -96,7 +97,7 @@ public class ExecutionVertexDeploymentTest {
 		try {
 			final JobVertexID jid = new JobVertexID();
 
-			final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
+			final ExecutionJobVertex ejv = getExecutionVertex(jid, new DirectScheduledExecutorService());
 
 			final Instance instance = getInstance(
 				new ActorTaskManagerGateway(
@@ -180,7 +181,7 @@ public class ExecutionVertexDeploymentTest {
 	public void testDeployFailedSynchronous() {
 		try {
 			final JobVertexID jid = new JobVertexID();
-			final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
+			final ExecutionJobVertex ejv = getExecutionVertex(jid, new DirectScheduledExecutorService());
 
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 				AkkaUtils.getDefaultTimeout());

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index 36b7575..47863ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -39,6 +38,7 @@ import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.TestLogger;
 
@@ -207,8 +207,8 @@ public class ExecutionVertexLocalityTest extends TestLogger {
 				null,
 				testJob,
 				new Configuration(),
-				Executors.directExecutor(),
-				Executors.directExecutor(),
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
 				getClass().getClassLoader(),
 				new StandaloneCheckpointRecoveryFactory(),
 				Time.of(10, TimeUnit.SECONDS),

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
index 44dc0a4..32867bf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
@@ -33,6 +33,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 
 import static org.mockito.Mockito.mock;
 
@@ -48,7 +49,7 @@ public class LegacyJobVertexIdTest {
 		jobVertex.setInvokableClass(AbstractInvokable.class);
 
 		ExecutionGraph executionGraph = new ExecutionGraph(
-				mock(Executor.class),
+				mock(ScheduledExecutorService.class),
 				mock(Executor.class),
 				new JobID(),
 				"test",

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 0e147e3..3a7e759 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -66,8 +66,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutor(), 
+			TestingUtils.defaultExecutor(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -112,8 +112,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -159,8 +159,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -207,8 +207,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
 			jobId, 
 			jobName,
 			cfg,
@@ -253,8 +253,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -319,8 +319,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -376,8 +376,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
 			jobId, 
 			jobName, 
 			cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index 5b1a03e..afdafe3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -180,8 +180,8 @@ public class TerminalStateDeadlockTest {
 
 		TestExecGraph(JobID jobId) throws IOException {
 			super(
-				TestingUtils.defaultExecutionContext(),
-				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
 				jobId,
 				"test graph",
 				EMPTY_CONFIG,

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index 27708a2..4e1bfae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -80,8 +80,8 @@ public class VertexSlotSharingTest {
 			List<JobVertex> vertices = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 			
 			ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
 				new JobID(),
 				"test job",
 				new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 398505f..8985a34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -45,7 +45,6 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
@@ -80,11 +79,13 @@ import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
 import org.apache.flink.util.InstantiationUtil;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+
 import scala.Int;
 import scala.Option;
 import scala.PartialFunction;
@@ -105,8 +106,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -156,8 +156,6 @@ public class JobManagerHARecoveryTest {
 		flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
 		flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
 
-		ExecutorService executor = null;
-
 		try {
 			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 
@@ -175,13 +173,11 @@ public class JobManagerHARecoveryTest {
 					MemoryArchivist.class,
 					10), "archive");
 
-			executor = new ForkJoinPool();
-
 			Props jobManagerProps = Props.create(
 				TestingJobManager.class,
 				flinkConfiguration,
-				executor,
-				executor,
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
 				instanceManager,
 				scheduler,
 				new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000),
@@ -309,10 +305,6 @@ public class JobManagerHARecoveryTest {
 			if (taskManager != null) {
 				taskManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
 			}
-
-			if (executor != null) {
-				executor.shutdownNow();
-			}
 		}
 	}
 
@@ -353,8 +345,8 @@ public class JobManagerHARecoveryTest {
 			Props jobManagerProps = Props.create(
 				TestingFailingHAJobManager.class,
 				flinkConfiguration,
-				Executors.directExecutor(),
-				Executors.directExecutor(),
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
 				mock(InstanceManager.class),
 				mock(Scheduler.class),
 				new BlobLibraryCacheManager(mock(BlobService.class), 1 << 20),
@@ -390,7 +382,7 @@ public class JobManagerHARecoveryTest {
 
 		public TestingFailingHAJobManager(
 			Configuration flinkConfiguration,
-			Executor futureExecutor,
+			ScheduledExecutorService futureExecutor,
 			Executor ioExecutor,
 			InstanceManager instanceManager,
 			Scheduler scheduler,

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index b627273..c5f6d99 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -577,8 +577,8 @@ public class JobManagerTest {
 				JobManager.startJobManagerActors(
 					config,
 					system,
-					system.dispatcher(),
-					system.dispatcher(),
+					TestingUtils.defaultExecutor(),
+					TestingUtils.defaultExecutor(),
 					TestingJobManager.class,
 					MemoryArchivist.class)._1(),
 				leaderSessionId);
@@ -787,8 +787,8 @@ public class JobManagerTest {
 			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
 				config,
 				actorSystem,
-				actorSystem.dispatcher(),
-				actorSystem.dispatcher(),
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
 				Option.apply("jm"),
 				Option.apply("arch"),
 				TestingJobManager.class,
@@ -912,8 +912,8 @@ public class JobManagerTest {
 			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
 				config,
 				actorSystem,
-				actorSystem.dispatcher(),
-				actorSystem.dispatcher(),
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
 				Option.apply("jm"),
 				Option.apply("arch"),
 				TestingJobManager.class,
@@ -1017,8 +1017,8 @@ public class JobManagerTest {
 			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
 				config,
 				actorSystem,
-				actorSystem.dispatcher(),
-				actorSystem.dispatcher(),
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
 				Option.apply("jm"),
 				Option.apply("arch"),
 				TestingJobManager.class,
@@ -1116,8 +1116,8 @@ public class JobManagerTest {
 			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
 				new Configuration(),
 				actorSystem,
-				actorSystem.dispatcher(),
-				actorSystem.dispatcher(),
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
 				Option.apply("jm"),
 				Option.apply("arch"),
 				TestingJobManager.class,

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index c5b6697..feb3d4d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.NetUtils;
@@ -84,8 +85,8 @@ public class JobSubmitTest {
 		JobManager.startJobManagerActors(
 			jmConfig,
 			jobManagerSystem,
-			jobManagerSystem.dispatcher(),
-			jobManagerSystem.dispatcher(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
 			JobManager.class,
 			MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index f051281..d6257ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -25,6 +25,7 @@ import akka.actor.Props;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.configuration.Configuration;
@@ -45,18 +46,18 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
-import scala.concurrent.forkjoin.ForkJoinPool;
 
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 public class JobManagerLeaderElectionTest extends TestLogger {
@@ -66,8 +67,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 
 	private static ActorSystem actorSystem;
 	private static TestingServer testingServer;
-	private static ExecutorService executor;
-	
+
 	private static Timeout timeout = new Timeout(TestingUtils.TESTING_DURATION());
 	private static FiniteDuration duration = new FiniteDuration(5, TimeUnit.MINUTES);
 
@@ -75,7 +75,6 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 	public static void setup() throws Exception {
 		actorSystem = ActorSystem.create("TestingActorSystem");
 		testingServer = new TestingServer();
-		executor = new ForkJoinPool();
 	}
 
 	@AfterClass
@@ -87,10 +86,6 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 		if (testingServer != null) {
 			testingServer.stop();
 		}
-		
-		if (executor != null) {
-			executor.shutdownNow();
-		}
 	}
 
 	/**
@@ -185,8 +180,8 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 		return Props.create(
 			TestingJobManager.class,
 			configuration,
-			executor,
-			executor,
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
 			new InstanceManager(),
 			new Scheduler(TestingUtils.defaultExecutionContext()),
 			new BlobLibraryCacheManager(new BlobServer(configuration), 10L),

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index cb37905..aed2b6f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -60,8 +61,8 @@ public class TaskManagerMetricsTest {
 			final ActorRef jobManager = JobManager.startJobManagerActors(
 				new Configuration(),
 				actorSystem,
-				actorSystem.dispatcher(),
-				actorSystem.dispatcher(),
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 873ec8e..a6e1a2b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -40,7 +40,6 @@ import org.apache.flink.runtime.io.network.LocalConnectionManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
@@ -52,6 +51,7 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 
 import org.junit.Test;
@@ -86,8 +86,8 @@ public class TaskManagerComponentsStartupShutdownTest {
 			final ActorRef jobManager = JobManager.startJobManagerActors(
 				config,
 				actorSystem,
-				actorSystem.dispatcher(),
-				actorSystem.dispatcher(),
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index 385d1ac..2528e24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.util.NetUtils;
 import org.junit.Test;
@@ -100,8 +101,8 @@ public abstract class TaskManagerProcessReapingTestBase {
 			ActorRef jmActor = JobManager.startJobManagerActors(
 				new Configuration(),
 				jmActorSystem,
-				jmActorSystem.dispatcher(),
-				jmActorSystem.dispatcher(),
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
 				JobManager.class,
 				MemoryArchivist.class)._1;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 5753349..e234cba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -108,8 +108,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				// a simple JobManager
 				jobManager = createJobManager(
 					actorSystem,
-					actorSystem.dispatcher(),
-					actorSystem.dispatcher(),
+					TestingUtils.defaultExecutor(),
+					TestingUtils.defaultExecutor(),
 					config);
 				startResourceManager(config, jobManager.actor());
 
@@ -192,8 +192,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				// now start the JobManager, with the regular akka URL
 				jobManager = createJobManager(
 					actorSystem,
-					actorSystem.dispatcher(),
-					actorSystem.dispatcher(),
+					TestingUtils.defaultExecutor(),
+					TestingUtils.defaultExecutor(),
 					new Configuration());
 
 				startResourceManager(config, jobManager.actor());
@@ -635,8 +635,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
 		return JobManager.startJobManagerActors(
 			configuration,
 			actorSystem,
-			actorSystem.dispatcher(),
-			actorSystem.dispatcher(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
 			NONE_STRING,
 			NONE_STRING,
 			JobManager.class,

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DirectScheduledExecutorService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DirectScheduledExecutorService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DirectScheduledExecutorService.java
new file mode 100644
index 0000000..06d632b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DirectScheduledExecutorService.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.runtime.concurrent.Executors;
+
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link ScheduledExecutorService} variant of {@link Executors#directExecutor()}: {@code execute}
+ * runs the command synchronously in the calling thread (and is rejected once the service is shut down),
+ * while every {@code schedule*} and lifecycle call is forwarded to an internal single-thread scheduler.
+ */
+public class DirectScheduledExecutorService extends AbstractExecutorService implements ScheduledExecutorService {
+	/** Backs every schedule* and lifecycle call; NOTE(review): only stopped via shutdown()/shutdownNow(). */
+	private final ScheduledExecutorService scheduledService = 
+			java.util.concurrent.Executors.newSingleThreadScheduledExecutor();
+
+	// ------------------------------------------------------------------------
+	//  Direct Executor 
+	// ------------------------------------------------------------------------
+	/** Runs the command in the calling thread; throws RejectedExecutionException after shutdown. */
+	@Override
+	public void execute(Runnable command) {
+		if (!isShutdown()) {
+			command.run();
+		} else {
+			throw new RejectedExecutionException();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Scheduled Executor 
+	// ------------------------------------------------------------------------
+
+	// All four scheduling methods hand their arguments unchanged to the backing scheduler.
+	@Override
+	public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+		return scheduledService.schedule(command, delay, unit);
+	}
+
+	@Override
+	public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+		return scheduledService.schedule(callable, delay, unit);
+	}
+
+	@Override
+	public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+		return scheduledService.scheduleAtFixedRate(command, initialDelay, period, unit);
+	}
+
+	@Override
+	public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+		return scheduledService.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Shutdown 
+	// ------------------------------------------------------------------------
+	// Lifecycle state lives in the backing scheduler; execute() consults it through isShutdown().
+	@Override
+	public void shutdown() {
+		scheduledService.shutdown();
+	}
+
+	@Override
+	public List<Runnable> shutdownNow() {
+		return scheduledService.shutdownNow();
+	}
+
+	@Override
+	public boolean isShutdown() {
+		return scheduledService.isShutdown();
+	}
+
+	@Override
+	public boolean isTerminated() {
+		return scheduledService.isTerminated();
+	}
+
+	@Override
+	public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+		return scheduledService.awaitTermination(timeout, unit);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index 6f833f1..258e44e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -18,19 +18,22 @@
 
 package org.apache.flink.runtime.executiongraph
 
+import java.util.concurrent.Executors
+
 import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertex}
-import org.apache.flink.runtime.jobmanager.Tasks
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway
 import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.apache.flink.runtime.testtasks.NoOpInvokable
 import org.apache.flink.util.SerializedValue
+
 import org.junit.runner.RunWith
+
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{Matchers, WordSpecLike}
 
@@ -39,6 +42,8 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
 
   "A task manager loss" must {
     "fail the assigned tasks" in {
+      val executor = Executors.newScheduledThreadPool(1)
+
       try {
         val instance1 = ExecutionGraphTestUtils.getInstance(
           new ActorTaskManagerGateway(new SimpleActorGateway(TestingUtils.defaultExecutionContext)),
@@ -58,8 +63,8 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
         val jobGraph = new JobGraph("Pointwise job", sender)
 
         val eg = new ExecutionGraph(
-          TestingUtils.defaultExecutionContext,
-          TestingUtils.defaultExecutionContext,
+          executor,
+          executor,
           new JobID(),
           "test job",
           new Configuration(),
@@ -81,6 +86,9 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
           t.printStackTrace()
           fail(t.getMessage)
       }
+      finally {
+        executor.shutdownNow()
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index cf00206..dfcbf77 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmanager
 
 import java.net.InetAddress
+import java.util.concurrent.{Executors, ScheduledExecutorService}
 
 import akka.actor._
 import akka.testkit.{ImplicitSender, TestKit}
@@ -51,7 +52,10 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
 
   def this() = this(AkkaUtils.createLocalActorSystem(new Configuration()))
 
+  val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(2)
+  
   override def afterAll(): Unit = {
+    executor.shutdownNow()
     TestKit.shutdownActorSystem(system)
   }
 
@@ -172,8 +176,8 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
     val (jm: ActorRef, _) = JobManager.startJobManagerActors(
       new Configuration(),
       _system,
-      _system.dispatcher,
-      _system.dispatcher,
+      executor,
+      executor,
       None,
       None,
       classOf[JobManager],

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index d6215eb..bd4a8fc 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.testingUtils
 
 import java.io.IOException
-import java.util.concurrent.{Executor, TimeUnit, TimeoutException}
+import java.util.concurrent.{Executor, ScheduledExecutorService, TimeUnit, TimeoutException}
 
 import akka.actor.{ActorRef, ActorSystem, Props}
 import akka.pattern.Patterns._
@@ -85,7 +85,7 @@ class TestingCluster(
   override def getJobManagerProps(
     jobManagerClass: Class[_ <: JobManager],
     configuration: Configuration,
-    futureExecutor: Executor,
+    futureExecutor: ScheduledExecutorService,
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 39c7a53..f50a832 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import java.util.concurrent.{Executor, ExecutorService}
+import java.util.concurrent.{Executor, ScheduledExecutorService}
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
@@ -39,7 +39,7 @@ import scala.language.postfixOps
   */
 class TestingJobManager(
     flinkConfiguration: Configuration,
-    futureExecutor: Executor,
+    futureExecutor: ScheduledExecutorService,
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,