Posted to commits@flink.apache.org by se...@apache.org on 2015/07/13 18:54:08 UTC

[5/7] flink git commit: [FLINK-2329] [runtime] Introduces InstanceGateway as an abstraction to communicate with the TaskManager.
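
The InstanceGateway interface itself is introduced in another part of this patch; judging from the call sites in the hunks below (instance.getInstanceGateway, gateway.tell(...), gateway.ask(msg, timeout).mapTo[...], DummyInstanceGateway.INSTANCE), it is roughly a message-passing facade of the following shape. This Scala sketch is inferred from those call sites only and is not the actual interface:

    import scala.concurrent.Future
    import scala.concurrent.duration.FiniteDuration

    // Inferred sketch of the gateway abstraction; the real InstanceGateway is a
    // type in org.apache.flink.runtime.instance and may differ in detail.
    trait InstanceGatewaySketch {
      // fire-and-forget message to the TaskManager behind this gateway
      def tell(message: Any): Unit
      // request-response message, answered asynchronously within the timeout
      def ask(message: Any, timeout: FiniteDuration): Future[Any]
    }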

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index f987e07..3e90123 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -32,11 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.flink.runtime.instance.SimpleSlot;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -45,21 +41,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
  * Tests for the scheduler when scheduling tasks in slot sharing groups.
  */
 public class SchedulerSlotSharingTest {
-	private static ActorSystem system;
 
-	@BeforeClass
-	public static void setup(){
-		system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
-		TestingUtils.setCallingThreadDispatcher(system);
-	}
-
-	@AfterClass
-	public static void teardown(){
-		TestingUtils.setGlobalExecutionContext();
-		JavaTestKit.shutdownActorSystem(system);
-	}
-
-	
 	@Test
 	public void scheduleSingleVertexType() {
 		try {
@@ -67,7 +49,7 @@ public class SchedulerSlotSharingTest {
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1);
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 			Instance i1 = getRandomInstance(2);
 			Instance i2 = getRandomInstance(2);
 			scheduler.newInstanceAvailable(i1);
@@ -154,7 +136,7 @@ public class SchedulerSlotSharingTest {
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
@@ -274,7 +256,7 @@ public class SchedulerSlotSharingTest {
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
@@ -339,7 +321,7 @@ public class SchedulerSlotSharingTest {
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3);
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
@@ -450,7 +432,7 @@ public class SchedulerSlotSharingTest {
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule 1 tasks from the first vertex group and 2 from the second
@@ -502,7 +484,7 @@ public class SchedulerSlotSharingTest {
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 			scheduler.newInstanceAvailable(getRandomInstance(3));
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
@@ -649,7 +631,7 @@ public class SchedulerSlotSharingTest {
 			Instance i1 = getRandomInstance(2);
 			Instance i2 = getRandomInstance(2);
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 			scheduler.newInstanceAvailable(i1);
 			scheduler.newInstanceAvailable(i2);
 			
@@ -699,7 +681,7 @@ public class SchedulerSlotSharingTest {
 			Instance i1 = getRandomInstance(2);
 			Instance i2 = getRandomInstance(2);
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 			scheduler.newInstanceAvailable(i1);
 			scheduler.newInstanceAvailable(i2);
 			
@@ -749,7 +731,7 @@ public class SchedulerSlotSharingTest {
 			Instance i1 = getRandomInstance(2);
 			Instance i2 = getRandomInstance(2);
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 			scheduler.newInstanceAvailable(i1);
 			scheduler.newInstanceAvailable(i2);
 			
@@ -795,7 +777,6 @@ public class SchedulerSlotSharingTest {
 	
 	@Test
 	public void testSequentialAllocateAndRelease() {
-		TestingUtils.setGlobalExecutionContext();
 		try {
 			final JobVertexID jid1 = new JobVertexID();
 			final JobVertexID jid2 = new JobVertexID();
@@ -804,7 +785,7 @@ public class SchedulerSlotSharingTest {
 			
 			final SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4);
 			
-			final Scheduler scheduler = new Scheduler();
+			final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 			scheduler.newInstanceAvailable(getRandomInstance(4));
 			
 			// allocate something from group 1 and 2 interleaved with schedule for group 3
@@ -853,15 +834,13 @@ public class SchedulerSlotSharingTest {
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
-		}finally{
-			TestingUtils.setCallingThreadDispatcher(system);
 		}
 	}
 	
 	@Test
 	public void testConcurrentAllocateAndRelease() {
 		final ExecutorService executor = Executors.newFixedThreadPool(20);
-		TestingUtils.setGlobalExecutionContext();
+
 		try {
 			for (int run = 0; run < 50; run++) {
 				final JobVertexID jid1 = new JobVertexID();
@@ -871,7 +850,7 @@ public class SchedulerSlotSharingTest {
 				
 				final SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4);
 				
-				final Scheduler scheduler = new Scheduler();
+				final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 				scheduler.newInstanceAvailable(getRandomInstance(4));
 				
 				final AtomicInteger enumerator1 = new AtomicInteger();
@@ -1030,10 +1009,6 @@ public class SchedulerSlotSharingTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			executor.shutdownNow();
-			TestingUtils.setCallingThreadDispatcher(system);
-		}
 	}
 	
 	@Test
@@ -1046,7 +1021,7 @@ public class SchedulerSlotSharingTest {
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4);
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 			scheduler.newInstanceAvailable(getRandomInstance(4));
 			
 			// schedule one task for the first and second vertex
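
The recurring change in this test is that Scheduler now takes an ExecutionContext instead of relying on a global Akka execution context. The single-threaded tests pass TestingUtils.directExecutionContext(), which runs callbacks synchronously on the calling thread and keeps the assertions deterministic, while testSequentialAllocateAndRelease and testConcurrentAllocateAndRelease use the default fork-join context. A minimal sketch of such a direct context, mirroring the helper added to TestingUtils further down in this patch:

    import com.google.common.util.concurrent.MoreExecutors
    import scala.concurrent.ExecutionContext

    // Executes every submitted Runnable immediately on the submitting thread.
    val directContext: ExecutionContext =
      ExecutionContext.fromExecutor(MoreExecutors.directExecutor())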

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index 694b88b..2de0635 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -29,9 +29,9 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import akka.actor.ActorRef;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.DummyInstanceGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
@@ -66,7 +66,7 @@ public class SchedulerTestUtils {
 		final long GB = 1024L*1024*1024;
 		HardwareDescription resources = new HardwareDescription(4, 4*GB, 3*GB, 2*GB);
 		
-		return new Instance(ActorRef.noSender(), ci, new InstanceID(), resources, numSlots);
+		return new Instance(DummyInstanceGateway.INSTANCE, ci, new InstanceID(), resources, numSlots);
 	}
 	
 	

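DummyInstanceGateway.INSTANCE replaces ActorRef.noSender() as the placeholder endpoint for test instances. The class is added elsewhere in this patch; going by its use as a shared no-op singleton here, it plausibly looks like the hypothetical sketch below (building on the gateway sketch above, not the actual implementation):

    import scala.concurrent.{Future, Promise}
    import scala.concurrent.duration.FiniteDuration

    // Hypothetical no-op gateway for tests: tell drops the message and ask
    // never completes. The real DummyInstanceGateway may behave differently.
    object DummyInstanceGatewaySketch extends InstanceGatewaySketch {
      override def tell(message: Any): Unit = ()
      override def ask(message: Any, timeout: FiniteDuration): Future[Any] =
        Promise[Any]().future
    }
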
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index a67cd00..442ddcf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 
 import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 import scala.Option;
 import scala.Tuple2;
@@ -89,7 +90,10 @@ public class TaskManagerComponentsStartupShutdownTest {
 
 			final MemoryManager memManager = new DefaultMemoryManager(32 * BUFFER_SIZE, 1, BUFFER_SIZE, false);
 			final IOManager ioManager = new IOManagerAsync(TMP_DIR);
-			final NetworkEnvironment network = new NetworkEnvironment(timeout, netConf);
+			final NetworkEnvironment network = new NetworkEnvironment(
+				TestingUtils.defaultExecutionContext(),
+				timeout,
+				netConf);
 			final int numberOfSlots = 1;
 
 			// create the task manager

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
index a1ca43c..b3090e6 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
@@ -18,36 +18,27 @@
 
 package org.apache.flink.runtime.executiongraph
 
-import akka.actor.{Props, ActorSystem}
-import akka.testkit.TestKit
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleInstanceGateway
 import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.testingUtils.TestingUtils
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+import org.scalatest.{Matchers, WordSpecLike}
 
-class ExecutionGraphRestartTest(_system: ActorSystem) extends TestKit(_system) with WordSpecLike
-with Matchers with BeforeAndAfterAll {
-
-  def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
-
-  override def afterAll(): Unit = {
-    TestKit.shutdownActorSystem(system)
-  }
+class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
 
   val NUM_TASKS = 31
 
   "The execution graph" must {
     "be manually restartable" in {
       try {
-        val tm = system.actorOf(Props(classOf[ExecutionGraphTestUtils
-        .SimpleAcknowledgingTaskManager], "TaskManager"))
-        val instance = ExecutionGraphTestUtils.getInstance(tm)
+        val instance = ExecutionGraphTestUtils.getInstance(
+          new SimpleInstanceGateway(TestingUtils.directExecutionContext))
 
-        val scheduler = new Scheduler
+        val scheduler = new Scheduler(TestingUtils.defaultExecutionContext)
         scheduler.newInstanceAvailable(instance)
 
         val sender = new JobVertex("Task")
@@ -56,7 +47,11 @@ with Matchers with BeforeAndAfterAll {
 
         val jobGraph = new JobGraph("Pointwise job", sender)
 
-        val eg = new ExecutionGraph(new JobID(), "test job", new Configuration(),
+        val eg = new ExecutionGraph(
+          TestingUtils.defaultExecutionContext,
+          new JobID(),
+          "test job",
+          new Configuration(),
           AkkaUtils.getDefaultTimeout)
         eg.setNumberOfRetriesLeft(0)
         eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)
@@ -87,11 +82,10 @@ with Matchers with BeforeAndAfterAll {
 
     "restart itself automatically" in {
       try {
-        val tm = system.actorOf(Props
-          (classOf[ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager], "TaskManager"))
-        val instance = ExecutionGraphTestUtils.getInstance(tm)
+        val instance = ExecutionGraphTestUtils.getInstance(
+          new SimpleInstanceGateway(TestingUtils.directExecutionContext))
 
-        val scheduler = new Scheduler
+        val scheduler = new Scheduler(TestingUtils.defaultExecutionContext)
         scheduler.newInstanceAvailable(instance)
 
         val sender = new JobVertex("Task")
@@ -100,7 +94,11 @@ with Matchers with BeforeAndAfterAll {
 
         val jobGraph = new JobGraph("Pointwise job", sender)
 
-        val eg = new ExecutionGraph(new JobID(), "Test job", new Configuration(),
+        val eg = new ExecutionGraph(
+          TestingUtils.defaultExecutionContext,
+          new JobID(),
+          "Test job",
+          new Configuration(),
           AkkaUtils.getDefaultTimeout)
         eg.setNumberOfRetriesLeft(1)
         eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index 13199bc..aaa0025 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -18,38 +18,27 @@
 
 package org.apache.flink.runtime.executiongraph
 
-import akka.actor.{Props, ActorSystem}
-import akka.testkit.TestKit
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils
-.SimpleAcknowledgingTaskManager
+import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleInstanceGateway
 import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.testingUtils.TestingUtils
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+import org.scalatest.{Matchers, WordSpecLike}
 
-class TaskManagerLossFailsTasksTest(_system: ActorSystem) extends TestKit(_system) with
-WordSpecLike with Matchers with BeforeAndAfterAll {
-
-  def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
-
-  override def afterAll(): Unit = {
-    TestKit.shutdownActorSystem(system)
-  }
+class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
 
   "A task manager loss" must {
     "fail the assigned tasks" in {
       try {
-        val tm1 = system.actorOf(Props(classOf[SimpleAcknowledgingTaskManager], "TaskManager1"))
-        val tm2 = system.actorOf(Props(classOf[SimpleAcknowledgingTaskManager], "TaskManager2"))
-
-        val instance1 = ExecutionGraphTestUtils.getInstance(tm1, 10)
-        val instance2 = ExecutionGraphTestUtils.getInstance(tm2, 10)
+        val instance1 = ExecutionGraphTestUtils.getInstance(
+          new SimpleInstanceGateway(TestingUtils.defaultExecutionContext), 10)
+        val instance2 = ExecutionGraphTestUtils.getInstance(
+          new SimpleInstanceGateway(TestingUtils.defaultExecutionContext), 10)
 
-        val scheduler = new Scheduler
+        val scheduler = new Scheduler(TestingUtils.defaultExecutionContext)
         scheduler.newInstanceAvailable(instance1)
         scheduler.newInstanceAvailable(instance2)
 
@@ -59,7 +48,11 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
         val jobGraph = new JobGraph("Pointwise job", sender)
 
-        val eg = new ExecutionGraph(new JobID(), "test job", new Configuration(),
+        val eg = new ExecutionGraph(
+          TestingUtils.defaultExecutionContext,
+          new JobID(),
+          "test job",
+          new Configuration(),
           AkkaUtils.getDefaultTimeout)
         eg.setNumberOfRetriesLeft(0)
         eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index 766ea55..f7bf56a 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -177,12 +177,12 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
           jm ! NotifyWhenJobStatus(jobGraph.getJobID, JobStatus.RESTARTING)
           jm ! RequestWorkingTaskManager(jobGraph.getJobID)
 
-          val WorkingTaskManager(tm) = expectMsgType[WorkingTaskManager]
+          val WorkingTaskManager(gatewayOption) = expectMsgType[WorkingTaskManager]
 
-          tm match {
-            case ActorRef.noSender => fail("There has to be at least one task manager on which" +
+          gatewayOption match {
+            case None => fail("There has to be at least one task manager on which" +
               "the tasks are running.")
-            case t => t ! PoisonPill
+            case Some(gateway) => gateway.tell(PoisonPill)
           }
 
           expectMsg(JobStatusIs(jobGraph.getJobID, JobStatus.RESTARTING))

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 11e93d6..219e5ae 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -60,16 +60,34 @@ class TestingCluster(userConfiguration: Configuration,
 
   override def startJobManager(actorSystem: ActorSystem): ActorRef = {
 
-    val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager,
-    executionRetries, delayBetweenRetries,
-    timeout, archiveCount) = JobManager.createJobManagerComponents(configuration)
+    val (executionContext,
+      instanceManager,
+      scheduler,
+      libraryCacheManager,
+      _,
+      accumulatorManager,
+      executionRetries,
+      delayBetweenRetries,
+      timeout,
+      archiveCount) = JobManager.createJobManagerComponents(configuration)
     
     val testArchiveProps = Props(new MemoryArchivist(archiveCount) with TestingMemoryArchivist)
     val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
     
-    val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
-      libraryCacheManager, archive, accumulatorManager, executionRetries,
-      delayBetweenRetries, timeout, streamingMode) with TestingJobManager)
+    val jobManagerProps = Props(
+      new JobManager(
+        configuration,
+        executionContext,
+        instanceManager,
+        scheduler,
+        libraryCacheManager,
+        archive,
+        accumulatorManager,
+        executionRetries,
+        delayBetweenRetries,
+        timeout,
+        streamingMode)
+      with TestingJobManager)
 
     actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 89e1d72..5747b7e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -106,11 +106,10 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala {
 
 
     case NotifyWhenJobRemoved(jobID) =>
-      val tms = instanceManager.getAllRegisteredInstances.map(_.getTaskManager)
+      val gateways = instanceManager.getAllRegisteredInstances.map(_.getInstanceGateway)
 
-      val responses = tms.map{
-        tm =>
-          (tm ? NotifyWhenJobRemoved(jobID))(timeout).mapTo[Boolean]
+      val responses = gateways.map{
+        gateway => gateway.ask(NotifyWhenJobRemoved(jobID), timeout).mapTo[Boolean]
       }
 
       import context.dispatcher
@@ -135,17 +134,17 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala {
       currentJobs.get(jobID) match {
         case Some((eg, _)) =>
           if(eg.getAllExecutionVertices.isEmpty){
-            sender ! WorkingTaskManager(ActorRef.noSender)
+            sender ! WorkingTaskManager(None)
           } else {
             val resource = eg.getAllExecutionVertices.head.getCurrentAssignedResource
 
             if(resource == null){
-              sender ! WorkingTaskManager(ActorRef.noSender)
+              sender ! WorkingTaskManager(None)
             } else {
-              sender ! WorkingTaskManager(resource.getInstance().getTaskManager)
+              sender ! WorkingTaskManager(Some(resource.getInstance().getInstanceGateway))
             }
           }
-        case None => sender ! WorkingTaskManager(ActorRef.noSender)
+        case None => sender ! WorkingTaskManager(None)
       }
 
     case NotifyWhenJobStatus(jobID, state) =>

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index f810749..241c6c0 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.testingUtils
 import akka.actor.ActorRef
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.executiongraph.ExecutionGraph
+import org.apache.flink.runtime.instance.InstanceGateway
 import org.apache.flink.runtime.jobgraph.JobStatus
 
 object TestingJobManagerMessages {
@@ -43,7 +44,7 @@ object TestingJobManagerMessages {
   case class NotifyWhenJobRemoved(jobID: JobID)
 
   case class RequestWorkingTaskManager(jobID: JobID)
-  case class WorkingTaskManager(taskManager: ActorRef)
+  case class WorkingTaskManager(gatewayOption: Option[InstanceGateway])
 
   case class NotifyWhenJobStatus(jobID: JobID, state: JobStatus)
   case class JobStatusIs(jobID: JobID, state: JobStatus)

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 3611633..914f37c 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -18,14 +18,12 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import akka.actor.{ActorRef, ActorSystem}
-import akka.testkit.CallingThreadDispatcher
+import com.google.common.util.concurrent.MoreExecutors
 
 import com.typesafe.config.ConfigFactory
 
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ActionQueue
 
 import scala.concurrent.duration._
 import scala.concurrent.ExecutionContext
@@ -67,19 +65,33 @@ object TestingUtils {
     new TestingCluster(config)
   }
 
-  def setGlobalExecutionContext(): Unit = {
-    AkkaUtils.globalExecutionContext = ExecutionContext.global
+  /** Returns the global [[ExecutionContext]] which is a [[scala.concurrent.forkjoin.ForkJoinPool]]
+    * with a default parallelism equal to the number of available cores.
+    *
+    * @return ExecutionContext.global
+    */
+  def defaultExecutionContext = ExecutionContext.global
+
+  /** Returns an [[ExecutionContext]] which uses the current thread to execute the runnable.
+    *
+    * @return Direct [[ExecutionContext]] which executes runnables directly
+    */
+  def directExecutionContext = ExecutionContext.fromExecutor(MoreExecutors.directExecutor())
+
+  /** @return A new [[QueuedActionExecutionContext]] */
+  def queuedActionExecutionContext = {
+    new QueuedActionExecutionContext(new ActionQueue())
   }
 
-  def setCallingThreadDispatcher(system: ActorSystem): Unit = {
-    AkkaUtils.globalExecutionContext = system.dispatchers.lookup(CallingThreadDispatcher.Id)
-  }
-
-  def setExecutionContext(context: ExecutionContext): Unit = {
-    AkkaUtils.globalExecutionContext = context
-  }
+  /** [[ExecutionContext]] which queues [[Runnable]]s up in an [[ActionQueue]] instead of
+    * executing them. If the automatic execution mode is activated, then the [[Runnable]]s are
+    * executed directly.
+    *
+    * @param actionQueue queue in which the [[Runnable]]s are stored
+    */
+  class QueuedActionExecutionContext private[testingUtils] (val actionQueue: ActionQueue)
+    extends ExecutionContext {
 
-  class QueuedActionExecutionContext(queue: ActionQueue) extends ExecutionContext {
     var automaticExecution = false
 
     def toggleAutomaticExecution() = {
@@ -90,7 +102,7 @@ object TestingUtils {
       if(automaticExecution){
         runnable.run()
       }else {
-        queue.queueAction(runnable)
+        actionQueue.queueAction(runnable)
       }
     }
 
@@ -98,4 +110,26 @@ object TestingUtils {
       t.printStackTrace()
     }
   }
+
+  /** Queue which stores [[Runnable]] */
+  class ActionQueue {
+    private val runnables = scala.collection.mutable.Queue[Runnable]()
+
+    def triggerNextAction {
+      val r = runnables.dequeue
+      r.run()
+    }
+
+    def popNextAction: Runnable = {
+      runnables.dequeue()
+    }
+
+    def queueAction(r: Runnable) {
+      runnables.enqueue(r)
+    }
+
+    def isEmpty: Boolean = {
+      runnables.isEmpty
+    }
+  }
 }
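
The new QueuedActionExecutionContext gives tests explicit control over when asynchronous callbacks run: while automatic execution is off, every submitted Runnable is parked in the ActionQueue and the test fires it on demand. A usage sketch against the API added above (assuming the calling test goes through the queuedActionExecutionContext factory, since the constructor is package-private):

    import org.apache.flink.runtime.testingUtils.TestingUtils

    // Context whose actions are queued instead of being executed right away.
    val context = TestingUtils.queuedActionExecutionContext
    val queue = context.actionQueue

    context.execute(new Runnable {
      override def run(): Unit = println("deferred work")
    })

    // Nothing has run yet; the test decides when the callback fires.
    assert(!queue.isEmpty)
    queue.triggerNextAction   // runs the queued Runnable now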

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index a0db4d6..e108970 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -30,8 +30,8 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.Assert;
 import org.slf4j.Logger;
@@ -148,7 +148,7 @@ public class TestBaseUtils {
 				}
 
 				Future<Iterable<Object>> bcVariableManagerFutureResponses = Futures.sequence(
-						bcVariableManagerResponseFutures, AkkaUtils.globalExecutionContext());
+						bcVariableManagerResponseFutures, TestingUtils.defaultExecutionContext());
 
 				Iterable<Object> responses = Await.result(bcVariableManagerFutureResponses, timeout);
 
@@ -158,7 +158,7 @@ public class TestBaseUtils {
 				}
 
 				Future<Iterable<Object>> numActiveConnectionsFutureResponses = Futures.sequence(
-						numActiveConnectionsResponseFutures, AkkaUtils.globalExecutionContext());
+						numActiveConnectionsResponseFutures, TestingUtils.defaultExecutionContext());
 
 				responses = Await.result(numActiveConnectionsFutureResponses, timeout);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 796ea09..0534178 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -80,16 +80,38 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration,
 
   override def startJobManager(actorSystem: ActorSystem): ActorRef = {
 
-    val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager,
-    executionRetries, delayBetweenRetries,
-    timeout, archiveCount) = JobManager.createJobManagerComponents(configuration)
+    val (executionContext,
+      instanceManager,
+      scheduler,
+      libraryCacheManager,
+      _,
+      accumulatorManager,
+      executionRetries,
+      delayBetweenRetries,
+      timeout,
+      archiveCount) = JobManager.createJobManagerComponents(configuration)
+
+    val testArchiveProps = Props(
+      new MemoryArchivist(
+        archiveCount)
+      with TestingMemoryArchivist)
 
-    val testArchiveProps = Props(new MemoryArchivist(archiveCount) with TestingMemoryArchivist)
     val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
     
-    val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
-      libraryCacheManager, archive, accumulatorManager, executionRetries,
-      delayBetweenRetries, timeout, streamingMode) with TestingJobManager)
+    val jobManagerProps = Props(
+      new JobManager(
+        configuration,
+        executionContext,
+        instanceManager,
+        scheduler,
+        libraryCacheManager,
+        archive,
+        accumulatorManager,
+        executionRetries,
+        delayBetweenRetries,
+        timeout,
+        streamingMode)
+      with TestingJobManager)
 
     val jobManager = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 1884fab..c20f621 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -110,9 +110,15 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
 
           jm ! RequestWorkingTaskManager(jobID)
 
-          val tm = expectMsgType[WorkingTaskManager].taskManager
-          // kill one task manager
-          tm ! PoisonPill
+          val gatewayOption = expectMsgType[WorkingTaskManager].gatewayOption
+
+          gatewayOption match {
+            case Some(gateway) =>
+              // kill one task manager
+              gateway.tell(PoisonPill)
+
+            case None => fail("Could not retrieve a working task manager.")
+          }
 
           val failure = expectMsgType[Failure]
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 5dd197d..8cfeead 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -235,16 +235,34 @@ object ApplicationMaster {
 
     // start all the components inside the job manager
     LOG.debug("Starting JobManager components")
-    val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
-                   executionRetries, delayBetweenRetries,
-                   timeout, _) = JobManager.createJobManagerComponents(configuration)
+    val (executionContext,
+      instanceManager,
+      scheduler,
+      libraryCacheManager,
+      archiveProps,
+      accumulatorManager,
+      executionRetries,
+      delayBetweenRetries,
+      timeout,
+      _) = JobManager.createJobManagerComponents(configuration)
 
     // start the archiver
     val archiver: ActorRef = jobManagerSystem.actorOf(archiveProps, JobManager.ARCHIVE_NAME)
 
-    val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
-      libraryCacheManager, archiver, accumulatorManager, executionRetries,
-      delayBetweenRetries, timeout, streamingMode) with ApplicationMasterActor)
+    val jobManagerProps = Props(
+      new JobManager(
+        configuration,
+        executionContext,
+        instanceManager,
+        scheduler,
+        libraryCacheManager,
+        archiver,
+        accumulatorManager,
+        executionRetries,
+        delayBetweenRetries,
+        timeout,
+        streamingMode)
+      with ApplicationMasterActor)
 
     LOG.debug("Starting JobManager actor")
     val jobManager = JobManager.startActor(jobManagerProps, jobManagerSystem)

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
index 411808b..3fb5e30 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
@@ -103,7 +103,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
 
       instanceManager.getAllRegisteredInstances.asScala foreach {
         instance =>
-          instance.getTaskManager ! StopYarnSession(status, diag)
+          instance.getInstanceGateway.tell(StopYarnSession(status, diag))
       }
 
       rmClientOption foreach {