You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/13 18:54:09 UTC

[6/7] flink git commit: [FLINK-2329] [runtime] Introduces InstanceGateway as an abstraction to communicate with the TaskManager.

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index eb0aab4..9db330b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -24,18 +24,11 @@ import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
 
-import akka.actor.Actor;
-import akka.testkit.TestActorRef;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.Status;
-import akka.actor.UntypedActor;
-import akka.japi.Creator;
-import akka.testkit.JavaTestKit;
-
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.BaseTestingInstanceGateway;
+import org.apache.flink.runtime.instance.DummyInstanceGateway;
+import org.apache.flink.runtime.instance.InstanceGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.api.common.JobID;
@@ -46,26 +39,12 @@ import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
+import scala.concurrent.ExecutionContext;
 
 @SuppressWarnings("serial")
 public class ExecutionVertexCancelTest {
 
-	private static ActorSystem system;
-
-	@BeforeClass
-	public static void setup(){
-		system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
-	}
-
-	@AfterClass
-	public static void teardown(){
-		JavaTestKit.shutdownActorSystem(system);
-		system = null;
-	}
-
 	// --------------------------------------------------------------------------------------------
 	//  Canceling in different states
 	// --------------------------------------------------------------------------------------------
@@ -127,388 +106,350 @@ public class ExecutionVertexCancelTest {
 
 	@Test
 	public void testCancelConcurrentlyToDeploying_CallsNotOvertaking() {
-		new JavaTestKit(system){{
-			try {
-				final JobVertexID jid = new JobVertexID();
-				final ActionQueue actions = new ActionQueue();
+		try {
+			final JobVertexID jid = new JobVertexID();
 
-				TestingUtils.setExecutionContext(new TestingUtils.QueuedActionExecutionContext(
-						actions));
+			final TestingUtils.QueuedActionExecutionContext executionContext = TestingUtils.queuedActionExecutionContext();
+			final TestingUtils.ActionQueue actions = executionContext.actionQueue();
 
-				final ExecutionJobVertex ejv = getExecutionVertex(jid);
 
-				final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
-						AkkaUtils.getDefaultTimeout());
-				final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
+			final ExecutionJobVertex ejv = getExecutionVertex(
+				jid,
+				executionContext
+			);
 
-				setVertexState(vertex, ExecutionState.SCHEDULED);
-				assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+					AkkaUtils.getDefaultTimeout());
+			final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
 
-				ActorRef taskManager = TestActorRef.create(system, Props.create(new
-						CancelSequenceTaskManagerCreator(new TaskOperationResult(execId, true),
-						new TaskOperationResult(execId, false))));
+			setVertexState(vertex, ExecutionState.SCHEDULED);
+			assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
 
-				Instance instance = getInstance(taskManager);
-				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+			InstanceGateway instanceGateway = new CancelSequenceInstanceGateway(
+					executionContext,
+					new TaskOperationResult(execId, true),
+					new TaskOperationResult(execId, false));
 
-				vertex.deployToSlot(slot);
+			Instance instance = getInstance(instanceGateway);
+			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
+			vertex.deployToSlot(slot);
 
-				assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
+			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
 
-				vertex.cancel();
+			vertex.cancel();
 
-				assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+			assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
 
-				// first action happens (deploy)
-				actions.triggerNextAction();
-				assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+			// first action happens (deploy)
+			actions.triggerNextAction();
+			assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
 
-				// the deploy call found itself in canceling after it returned and needs to send a cancel call
-				// the call did not yet execute, so it is still in canceling
-				assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+			// the deploy call found itself in canceling after it returned and needs to send a cancel call
+			// the call did not yet execute, so it is still in canceling
+			assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
 
-				// second action happens (cancel call from cancel function)
-				actions.triggerNextAction();
+			// second action happens (cancel call from cancel function)
+			actions.triggerNextAction();
 
-				// TaskManager reports back (canceling done)
-				vertex.getCurrentExecutionAttempt().cancelingComplete();
+			// TaskManager reports back (canceling done)
+			vertex.getCurrentExecutionAttempt().cancelingComplete();
 
-				// should properly set state to cancelled
-				assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+			// should properly set state to cancelled
+			assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 
-				// trigger the correction canceling call
-				actions.triggerNextAction();
-				assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+			// trigger the correction canceling call
+			actions.triggerNextAction();
+			assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 
-				assertTrue(slot.isReleased());
+			assertTrue(slot.isReleased());
 
-				assertNull(vertex.getFailureCause());
+			assertNull(vertex.getFailureCause());
 
-				assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-				assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
-				assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
-			}
-			catch(Exception e) {
-				e.printStackTrace();
-				fail(e.getMessage());
-			}
-			finally {
-				TestingUtils.setGlobalExecutionContext();
-			}
-		}};
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
 
 	@Test
 	public void testCancelConcurrentlyToDeploying_CallsOvertaking() {
-		new JavaTestKit(system){
-			{
-				try {
-					final JobVertexID jid = new JobVertexID();
-					final ActionQueue actions = new ActionQueue();
+		try {
+			final JobVertexID jid = new JobVertexID();
 
-					TestingUtils.setExecutionContext(new TestingUtils
-							.QueuedActionExecutionContext(actions));
+			final TestingUtils.QueuedActionExecutionContext executionContext = TestingUtils.queuedActionExecutionContext();
+			final TestingUtils.ActionQueue actions = executionContext.actionQueue();
 
-					final ExecutionJobVertex ejv = getExecutionVertex(jid);
+			final ExecutionJobVertex ejv = getExecutionVertex(jid, executionContext);
 
-					final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
-							AkkaUtils.getDefaultTimeout());
-					final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+					AkkaUtils.getDefaultTimeout());
+			final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
 
-					setVertexState(vertex, ExecutionState.SCHEDULED);
-					assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
+			setVertexState(vertex, ExecutionState.SCHEDULED);
+			assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
 
-					// task manager cancel sequence mock actor
-					// first return NOT SUCCESS (task not found, cancel call overtook deploy call), then success (cancel call after deploy call)
-					TestActorRef<? extends Actor> taskManager = TestActorRef.create(system, Props.create(new
-							CancelSequenceTaskManagerCreator(new
-							TaskOperationResult(execId, false), new TaskOperationResult(execId, true))));
+			// task manager cancel sequence mock actor
+			// first return NOT SUCCESS (task not found, cancel call overtook deploy call), then success (cancel call after deploy call)
+			InstanceGateway instanceGateway = new CancelSequenceInstanceGateway(
+					executionContext,
+					new	TaskOperationResult(execId, false),
+					new TaskOperationResult(execId, true)
+			);
 
-					Instance instance = getInstance(taskManager);
-					SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+			Instance instance = getInstance(instanceGateway);
+			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
-					vertex.deployToSlot(slot);
+			vertex.deployToSlot(slot);
 
-					assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
+			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
 
-					vertex.cancel();
+			vertex.cancel();
 
-					assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+			assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
 
-					// first action happens (deploy)
-					Runnable deployAction = actions.popNextAction();
-					Runnable cancelAction = actions.popNextAction();
+			// first action happens (deploy)
+			Runnable deployAction = actions.popNextAction();
+			Runnable cancelAction = actions.popNextAction();
 
-					// cancel call first
-					cancelAction.run();
-					// process onComplete callback
-					actions.triggerNextAction();
+			// cancel call first
+			cancelAction.run();
+			// process onComplete callback
+			actions.triggerNextAction();
 
-					// did not find the task, not properly cancelled, stay in canceling
-					assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+			// did not find the task, not properly cancelled, stay in canceling
+			assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
 
-					// deploy action next
-					deployAction.run();
+			// deploy action next
+			deployAction.run();
 
-					// the deploy call found itself in canceling after it returned and needs to send a cancel call
-					// the call did not yet execute, so it is still in canceling
-					assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+			// the deploy call found itself in canceling after it returned and needs to send a cancel call
+			// the call did not yet execute, so it is still in canceling
+			assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
 
-					vertex.getCurrentExecutionAttempt().cancelingComplete();
+			vertex.getCurrentExecutionAttempt().cancelingComplete();
 
-					assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+			assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 
-					assertTrue(slot.isReleased());
+			assertTrue(slot.isReleased());
 
-					assertNull(vertex.getFailureCause());
+			assertNull(vertex.getFailureCause());
 
-					assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-					assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
-					assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
-				} catch (Exception e) {
-					e.printStackTrace();
-					fail(e.getMessage());
-				}finally{
-					TestingUtils.setGlobalExecutionContext();
-				}
-			}
-		};
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
 
 	@Test
 	public void testCancelFromRunning() {
-		new JavaTestKit(system) {
-			{
-				try {
-					TestingUtils.setCallingThreadDispatcher(system);
-					final JobVertexID jid = new JobVertexID();
-					final ExecutionJobVertex ejv = getExecutionVertex(jid);
+		try {
+			final JobVertexID jid = new JobVertexID();
+			final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
 
-					final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
-							AkkaUtils.getDefaultTimeout());
-					final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+					AkkaUtils.getDefaultTimeout());
+			final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
 
-					final TestActorRef<? extends Actor> taskManager = TestActorRef.create(system,
-							Props.create(new CancelSequenceTaskManagerCreator(new
-									TaskOperationResult(execId, true))));
+			InstanceGateway instanceGateway = new CancelSequenceInstanceGateway(
+					TestingUtils.directExecutionContext(),
+					new TaskOperationResult(execId, true));
 
-					Instance instance = getInstance(taskManager);
-					SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+			Instance instance = getInstance(instanceGateway);
+			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
-					setVertexState(vertex, ExecutionState.RUNNING);
-					setVertexResource(vertex, slot);
+			setVertexState(vertex, ExecutionState.RUNNING);
+			setVertexResource(vertex, slot);
 
-					assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
 
-					vertex.cancel();
-					vertex.getCurrentExecutionAttempt().cancelingComplete(); // responce by task manager once actially canceled
+			vertex.cancel();
+			vertex.getCurrentExecutionAttempt().cancelingComplete(); // responce by task manager once actially canceled
 
-					assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+			assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 
-					assertTrue(slot.isReleased());
+			assertTrue(slot.isReleased());
 
-					assertNull(vertex.getFailureCause());
+			assertNull(vertex.getFailureCause());
 
-					assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-					assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
-					assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
-				} catch (Exception e) {
-					e.printStackTrace();
-					fail(e.getMessage());
-				}finally{
-					TestingUtils.setGlobalExecutionContext();
-				}
-			}
-		};
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
 
 	@Test
 	public void testRepeatedCancelFromRunning() {
-		new JavaTestKit(system) {
-			{
-				try {
-					TestingUtils.setCallingThreadDispatcher(system);
+		try {
 
-					final JobVertexID jid = new JobVertexID();
-					final ExecutionJobVertex ejv = getExecutionVertex(jid);
+			final JobVertexID jid = new JobVertexID();
+			final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
 
-					final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
-							AkkaUtils.getDefaultTimeout());
-					final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+					AkkaUtils.getDefaultTimeout());
+			final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
 
-					final ActorRef taskManager = TestActorRef.create(system, Props.create(new
-							CancelSequenceTaskManagerCreator(new
-							TaskOperationResult(execId, true))));
+			final InstanceGateway instanceGateway = new CancelSequenceInstanceGateway(
+					TestingUtils.directExecutionContext(),
+					new TaskOperationResult(execId, true)
+			);
 
-					Instance instance = getInstance(taskManager);
-					SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+			Instance instance = getInstance(instanceGateway);
+			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
-					setVertexState(vertex, ExecutionState.RUNNING);
-					setVertexResource(vertex, slot);
+			setVertexState(vertex, ExecutionState.RUNNING);
+			setVertexResource(vertex, slot);
 
-					assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
 
-					vertex.cancel();
+			vertex.cancel();
 
-					assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+			assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
 
-					vertex.cancel();
+			vertex.cancel();
 
-					assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+			assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
 
-					// callback by TaskManager after canceling completes
-					vertex.getCurrentExecutionAttempt().cancelingComplete();
+			// callback by TaskManager after canceling completes
+			vertex.getCurrentExecutionAttempt().cancelingComplete();
 
-					assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+			assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 
-					assertTrue(slot.isReleased());
+			assertTrue(slot.isReleased());
 
-					assertNull(vertex.getFailureCause());
+			assertNull(vertex.getFailureCause());
 
-					assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-					assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
-					assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
-				} catch (Exception e) {
-					e.printStackTrace();
-					fail(e.getMessage());
-				}finally{
-					TestingUtils.setGlobalExecutionContext();
-				}
-			}
-		};
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
 
 	@Test
 	public void testCancelFromRunningDidNotFindTask() {
 		// this may happen when the task finished or failed while the call was in progress
-		new JavaTestKit(system) {
-			{
-				try {
-					TestingUtils.setCallingThreadDispatcher(system);
-					final JobVertexID jid = new JobVertexID();
-					final ExecutionJobVertex ejv = getExecutionVertex(jid);
+		try {
+			final JobVertexID jid = new JobVertexID();
+			final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
 
-					final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
-							AkkaUtils.getDefaultTimeout());
-					final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+					AkkaUtils.getDefaultTimeout());
+			final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
 
-					final ActorRef taskManager = TestActorRef.create(system,Props.create(new
-							CancelSequenceTaskManagerCreator(new
-							TaskOperationResult(execId, false))));
 
-					Instance instance = getInstance(taskManager);
-					SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+			final InstanceGateway instanceGateway = new CancelSequenceInstanceGateway(
+					TestingUtils.directExecutionContext(),
+					new TaskOperationResult(execId, false)
+			);
 
-					setVertexState(vertex, ExecutionState.RUNNING);
-					setVertexResource(vertex, slot);
+			Instance instance = getInstance(instanceGateway);
+			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
-					assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+			setVertexState(vertex, ExecutionState.RUNNING);
+			setVertexResource(vertex, slot);
 
-					vertex.cancel();
+			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
 
-					assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+			vertex.cancel();
 
-					assertNull(vertex.getFailureCause());
+			assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
 
-					assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-					assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
-				} catch (Exception e) {
-					e.printStackTrace();
-					fail(e.getMessage());
-				}finally{
-					TestingUtils.setGlobalExecutionContext();
-				}
-			}
-		};
+			assertNull(vertex.getFailureCause());
+
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
 
 	@Test
 	public void testCancelCallFails() {
-		new JavaTestKit(system) {
-			{
-				try {
-					TestingUtils.setCallingThreadDispatcher(system);
-					final JobVertexID jid = new JobVertexID();
-					final ExecutionJobVertex ejv = getExecutionVertex(jid);
+		try {
+			final JobVertexID jid = new JobVertexID();
+			final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
 
-					final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
-							AkkaUtils.getDefaultTimeout());
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+					AkkaUtils.getDefaultTimeout());
 
-					final ActorRef taskManager = TestActorRef.create(system, Props.create(new
-							CancelSequenceTaskManagerCreator()));
+			final InstanceGateway gateway = new CancelSequenceInstanceGateway(TestingUtils.directExecutionContext());
 
-					Instance instance = getInstance(taskManager);
-					SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+			Instance instance = getInstance(gateway);
+			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
-					setVertexState(vertex, ExecutionState.RUNNING);
-					setVertexResource(vertex, slot);
+			setVertexState(vertex, ExecutionState.RUNNING);
+			setVertexResource(vertex, slot);
 
-					assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
 
-					vertex.cancel();
+			vertex.cancel();
 
-					assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
+			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
 
-					assertTrue(slot.isReleased());
+			assertTrue(slot.isReleased());
 
-					assertNotNull(vertex.getFailureCause());
+			assertNotNull(vertex.getFailureCause());
 
-					assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-					assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
-					assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
-				} catch (Exception e) {
-					e.printStackTrace();
-					fail(e.getMessage());
-				}finally{
-					TestingUtils.setGlobalExecutionContext();
-				}
-			}
-		};
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
 
 	@Test
 	public void testSendCancelAndReceiveFail() {
-		new JavaTestKit(system) {
-			{
-				try {
-					final JobVertexID jid = new JobVertexID();
-					final ExecutionJobVertex ejv = getExecutionVertex(jid);
+		try {
+			final JobVertexID jid = new JobVertexID();
+			final ExecutionJobVertex ejv = getExecutionVertex(jid);
 
-					final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
-							AkkaUtils.getDefaultTimeout());
-					final ExecutionAttemptID execID = vertex.getCurrentExecutionAttempt().getAttemptId();
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+					AkkaUtils.getDefaultTimeout());
+			final ExecutionAttemptID execID = vertex.getCurrentExecutionAttempt().getAttemptId();
 
-					final ActorRef taskManager = system.actorOf(
-							Props.create(new CancelSequenceTaskManagerCreator(
-									new TaskOperationResult(execID, true)
-							)));
+			final InstanceGateway gateway = new CancelSequenceInstanceGateway(
+					TestingUtils.defaultExecutionContext(),
+					new TaskOperationResult(execID, true));
 
-					Instance instance = getInstance(taskManager);
-					SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+			Instance instance = getInstance(gateway);
+			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
-					setVertexState(vertex, ExecutionState.RUNNING);
-					setVertexResource(vertex, slot);
+			setVertexState(vertex, ExecutionState.RUNNING);
+			setVertexResource(vertex, slot);
 
-					assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
 
-					vertex.cancel();
+			vertex.cancel();
 
-					assertTrue(vertex.getExecutionState() == ExecutionState.CANCELING || vertex.getExecutionState() == ExecutionState.FAILED);
+			assertTrue(vertex.getExecutionState() == ExecutionState.CANCELING || vertex.getExecutionState() == ExecutionState.FAILED);
 
-					vertex.getCurrentExecutionAttempt().markFailed(new Throwable("test"));
+			vertex.getCurrentExecutionAttempt().markFailed(new Throwable("test"));
 
-					assertTrue(vertex.getExecutionState() == ExecutionState.CANCELED || vertex.getExecutionState() == ExecutionState.FAILED);
+			assertTrue(vertex.getExecutionState() == ExecutionState.CANCELED || vertex.getExecutionState() == ExecutionState.FAILED);
 
-					assertTrue(slot.isReleased());
+			assertTrue(slot.isReleased());
 
-					assertEquals(0, vertex.getExecutionGraph().getRegisteredExecutions().size());
-				} catch (Exception e) {
-					e.printStackTrace();
-					fail(e.getMessage());
-				}
-			}
-		};
+			assertEquals(0, vertex.getExecutionGraph().getRegisteredExecutions().size());
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -541,7 +482,7 @@ public class ExecutionVertexCancelTest {
 			// deploying after canceling from CREATED needs to raise an exception, because
 			// the scheduler (or any caller) needs to know that the slot should be released
 			try {
-				Instance instance = getInstance(ActorRef.noSender());
+				Instance instance = getInstance(DummyInstanceGateway.INSTANCE);
 				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 				vertex.deployToSlot(slot);
@@ -584,7 +525,7 @@ public class ExecutionVertexCancelTest {
 						AkkaUtils.getDefaultTimeout());
 				setVertexState(vertex, ExecutionState.CANCELING);
 
-				Instance instance = getInstance(ActorRef.noSender());
+				Instance instance = getInstance(DummyInstanceGateway.INSTANCE);
 				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 				vertex.deployToSlot(slot);
@@ -600,7 +541,7 @@ public class ExecutionVertexCancelTest {
 				ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 						AkkaUtils.getDefaultTimeout());
 
-				Instance instance = getInstance(ActorRef.noSender());
+				Instance instance = getInstance(DummyInstanceGateway.INSTANCE);
 				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 				setVertexResource(vertex, slot);
@@ -621,39 +562,33 @@ public class ExecutionVertexCancelTest {
 		}
 	}
 
-	public static class CancelSequenceTaskManagerCreator implements Creator<CancelSequenceTaskManager> {
-		private final TaskOperationResult[] results;
-		public CancelSequenceTaskManagerCreator(TaskOperationResult ... results){
-			this.results = results;
-		}
-
-		@Override
-		public CancelSequenceTaskManager create() throws Exception {
-			return new CancelSequenceTaskManager(results);
-		}
-	}
-
-	public static class CancelSequenceTaskManager extends UntypedActor{
+	public static class CancelSequenceInstanceGateway extends BaseTestingInstanceGateway {
 		private final TaskOperationResult[] results;
-		private int index;
+		private int index = -1;
 
-		public CancelSequenceTaskManager(TaskOperationResult[] results){
-			this.results = results;
-			index = -1;
+		public CancelSequenceInstanceGateway(ExecutionContext executionContext, TaskOperationResult ... result) {
+			super(executionContext);
+			this.results = result;
 		}
 
 		@Override
-		public void onReceive(Object message) throws Exception {
-			if(message instanceof TaskMessages.SubmitTask){
-				getSender().tell(Messages.getAcknowledge(), getSelf());
-			}else if(message instanceof TaskMessages.CancelTask){
+		public Object handleMessage(Object message) throws Exception {
+			Object result;
+			if(message instanceof TaskMessages.SubmitTask) {
+				result = Messages.getAcknowledge();
+			} else if(message instanceof TaskMessages.CancelTask) {
 				index++;
+
 				if(index >= results.length){
-					getSender().tell(new Status.Failure(new IOException("RPC call failed.")), getSelf());
-				}else {
-					getSender().tell(results[index], getSelf());
+					throw new IOException("RPC call failed.");
+				} else {
+					result = results[index];
 				}
+			} else {
+				result = null;
 			}
+
+			return result;
 		}
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index eadf328..431c3a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -22,13 +22,6 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
 
 import static org.junit.Assert.*;
 
-import akka.actor.Actor;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
@@ -37,37 +30,20 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class ExecutionVertexDeploymentTest {
 
-	private static ActorSystem system;
-
-	@BeforeClass
-	public static void setup(){
-		system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
-	}
-
-	@AfterClass
-	public static void teardown(){
-		JavaTestKit.shutdownActorSystem(system);
-	}
-
 	@Test
 	public void testDeployCall() {
 		try {
 			final JobVertexID jid = new JobVertexID();
 
-			TestingUtils.setCallingThreadDispatcher(system);
-			ActorRef tm = TestActorRef.create(system, Props.create(SimpleAcknowledgingTaskManager
-					.class));
-
 			final ExecutionJobVertex ejv = getExecutionVertex(jid);
 
 			// mock taskmanager to simply accept the call
-			Instance instance = getInstance(tm);
+			Instance instance = getInstance(
+					new SimpleInstanceGateway(TestingUtils.directExecutionContext()));
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
@@ -95,24 +71,17 @@ public class ExecutionVertexDeploymentTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			TestingUtils.setGlobalExecutionContext();
-		}
 	}
 
 	@Test
 	public void testDeployWithSynchronousAnswer() {
 		try {
-			TestingUtils.setCallingThreadDispatcher(system);
-
 			final JobVertexID jid = new JobVertexID();
 
-			final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
-					Props.create(SimpleAcknowledgingTaskManager.class));
-
-			final ExecutionJobVertex ejv = getExecutionVertex(jid);
+			final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
 
-			final Instance instance = getInstance(simpleTaskManager);
+			final Instance instance = getInstance(
+					new SimpleInstanceGateway(TestingUtils.directExecutionContext()));
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
@@ -143,9 +112,6 @@ public class ExecutionVertexDeploymentTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			TestingUtils.setGlobalExecutionContext();
-		}
 	}
 
 	@Test
@@ -157,10 +123,8 @@ public class ExecutionVertexDeploymentTest {
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
 
-			final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
-					Props.create(SimpleAcknowledgingTaskManager.class));
-
-			final Instance instance = getInstance(simpleTaskManager);
+			final Instance instance = getInstance(
+					new SimpleInstanceGateway(TestingUtils.defaultExecutionContext()));
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -200,18 +164,14 @@ public class ExecutionVertexDeploymentTest {
 	@Test
 	public void testDeployFailedSynchronous() {
 		try {
-			TestingUtils.setCallingThreadDispatcher(system);
-
 			final JobVertexID jid = new JobVertexID();
-			final ExecutionJobVertex ejv = getExecutionVertex(jid);
+			final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
 
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
 
-			final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
-					Props.create(SimpleFailingTaskManager.class));
-
-			final Instance instance = getInstance(simpleTaskManager);
+			final Instance instance = getInstance(
+					new SimpleFailingInstanceGateway(TestingUtils.directExecutionContext()));
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -230,9 +190,6 @@ public class ExecutionVertexDeploymentTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			TestingUtils.setGlobalExecutionContext();
-		}
 	}
 
 	@Test
@@ -243,10 +200,8 @@ public class ExecutionVertexDeploymentTest {
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
 
-			final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
-					Props.create(SimpleFailingTaskManager.class));
-
-			final Instance instance = getInstance(simpleTaskManager);
+			final Instance instance = getInstance(
+					new SimpleFailingInstanceGateway(TestingUtils.directExecutionContext()));
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -280,20 +235,16 @@ public class ExecutionVertexDeploymentTest {
 	public void testFailExternallyDuringDeploy() {
 		try {
 			final JobVertexID jid = new JobVertexID();
-			final ExecutionJobVertex ejv = getExecutionVertex(jid);
 
-			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
-					AkkaUtils.getDefaultTimeout());
+			final TestingUtils.QueuedActionExecutionContext ec = TestingUtils.queuedActionExecutionContext();
+			final TestingUtils.ActionQueue queue = ec.actionQueue();
 
-			final ActionQueue queue = new ActionQueue();
-			final TestingUtils.QueuedActionExecutionContext ec = new TestingUtils
-					.QueuedActionExecutionContext(queue);
+			final ExecutionJobVertex ejv = getExecutionVertex(jid, ec);
 
-			TestingUtils.setExecutionContext(ec);
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+					AkkaUtils.getDefaultTimeout());
 
-			final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
-					Props.create(SimpleAcknowledgingTaskManager.class));
-			final Instance instance = getInstance(simpleTaskManager);
+			final Instance instance = getInstance(new SimpleInstanceGateway(TestingUtils.directExecutionContext()));
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -307,6 +258,7 @@ public class ExecutionVertexDeploymentTest {
 			assertEquals(testError, vertex.getFailureCause());
 
 			queue.triggerNextAction();
+			queue.triggerNextAction();
 
 			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
 			assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
@@ -316,34 +268,29 @@ public class ExecutionVertexDeploymentTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			TestingUtils.setGlobalExecutionContext();
-		}
 	}
 
 	@Test
 	public void testFailCallOvertakesDeploymentAnswer() {
 
 		try {
-			ActionQueue queue = new ActionQueue();
-			TestingUtils.QueuedActionExecutionContext context = new TestingUtils
-					.QueuedActionExecutionContext(queue);
-
-			TestingUtils.setExecutionContext(context);
+			final TestingUtils.QueuedActionExecutionContext context = TestingUtils.queuedActionExecutionContext();
+			final TestingUtils.ActionQueue queue = context.actionQueue();
 
 			final JobVertexID jid = new JobVertexID();
 
-			final ExecutionJobVertex ejv = getExecutionVertex(jid);
+			final ExecutionJobVertex ejv = getExecutionVertex(jid, context);
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
 
 			final ExecutionAttemptID eid = vertex.getCurrentExecutionAttempt().getAttemptId();
 
-			final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system, Props.create(new
-					ExecutionVertexCancelTest.CancelSequenceTaskManagerCreator(new
-					TaskOperationResult(eid, false), new TaskOperationResult(eid, true))));
+			final Instance instance = getInstance(
+					new ExecutionVertexCancelTest.CancelSequenceInstanceGateway(
+							context,
+							new TaskOperationResult(eid, false),
+							new TaskOperationResult(eid, true)));
 
-			final Instance instance = getInstance(simpleTaskManager);
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -361,11 +308,14 @@ public class ExecutionVertexDeploymentTest {
 			Runnable cancel1 = queue.popNextAction();
 
 			cancel1.run();
-			// execute onComplete callback
+			// execute onComplete callback of cancel
 			queue.triggerNextAction();
 
 			deploy.run();
 
+			// execute onComplete callback of deploy
+			queue.triggerNextAction();
+
 			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
 
 			assertEquals(testError, vertex.getFailureCause());
@@ -380,8 +330,5 @@ public class ExecutionVertexDeploymentTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			TestingUtils.setGlobalExecutionContext();
-		}
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 1e9c30b..8ea7017 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -23,14 +23,9 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ge
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.DummyInstanceGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -38,28 +33,12 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.mockito.Matchers;
 
 public class ExecutionVertexSchedulingTest {
 
-	private static ActorSystem system;
-
-	@BeforeClass
-	public static void setup(){
-		system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
-	}
-
-	@AfterClass
-	public static void teardown(){
-		JavaTestKit.shutdownActorSystem(system);
-		system = null;
-	}
-
 	@Test
 	public void testSlotReleasedWhenScheduledImmediately() {
 		try {
@@ -68,7 +47,7 @@ public class ExecutionVertexSchedulingTest {
 					AkkaUtils.getDefaultTimeout());
 
 			// a slot than cannot be deployed to
-			final Instance instance = getInstance(ActorRef.noSender());
+			final Instance instance = getInstance(DummyInstanceGateway.INSTANCE);
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 			
 			slot.releaseSlot();
@@ -98,7 +77,7 @@ public class ExecutionVertexSchedulingTest {
 					AkkaUtils.getDefaultTimeout());
 
 			// a slot than cannot be deployed to
-			final Instance instance = getInstance(ActorRef.noSender());
+			final Instance instance = getInstance(DummyInstanceGateway.INSTANCE);
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			slot.releaseSlot();
@@ -134,11 +113,7 @@ public class ExecutionVertexSchedulingTest {
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
 
-			TestingUtils.setCallingThreadDispatcher(system);
-			ActorRef tm = TestActorRef.create(system, Props.create(ExecutionGraphTestUtils
-					.SimpleAcknowledgingTaskManager.class));
-
-			final Instance instance = getInstance(tm);
+			final Instance instance = getInstance(new ExecutionGraphTestUtils.SimpleInstanceGateway(TestingUtils.defaultExecutionContext()));
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			Scheduler scheduler = mock(Scheduler.class);
@@ -153,8 +128,6 @@ public class ExecutionVertexSchedulingTest {
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
-		}finally{
-			TestingUtils.setGlobalExecutionContext();
 		}
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
index 0d2ffeb..b4a7e63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
@@ -39,39 +39,14 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import scala.concurrent.duration.FiniteDuration;
-import akka.actor.Actor;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
 
 public class LocalInputSplitsTest {
 	
 	private static final FiniteDuration TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS);
 	
-	private static ActorSystem system;
-	
-	private static TestActorRef<? extends Actor> taskManager;
-	
-	
-	@BeforeClass
-	public static void setup() {
-		system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
-		taskManager = TestActorRef.create(system,
-				Props.create(ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager.class));
-	}
-
-	@AfterClass
-	public static void teardown() {
-		JavaTestKit.shutdownActorSystem(system);
-		system = null;
-	}
-	
 	// --------------------------------------------------------------------------------------------
 	
 	@Test
@@ -290,14 +265,18 @@ public class LocalInputSplitsTest {
 			
 			JobGraph jobGraph = new JobGraph("test job", vertex);
 			
-			ExecutionGraph eg = new ExecutionGraph(jobGraph.getJobID(),
-					jobGraph.getName(), jobGraph.getJobConfiguration(), TIMEOUT);
+			ExecutionGraph eg = new ExecutionGraph(
+					TestingUtils.defaultExecutionContext(),
+					jobGraph.getJobID(),
+					jobGraph.getName(),
+					jobGraph.getJobConfiguration(),
+					TIMEOUT);
 			
 			eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 			eg.setQueuedSchedulingAllowed(false);
 			
 			// create a scheduler with 6 instances where always two are on the same host
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 			Instance i1 = getInstance(new byte[] {10,0,1,1}, 12345, "host1", 1);
 			Instance i2 = getInstance(new byte[] {10,0,1,1}, 12346, "host1", 1);
 			Instance i3 = getInstance(new byte[] {10,0,1,2}, 12345, "host2", 1);
@@ -349,8 +328,12 @@ public class LocalInputSplitsTest {
 		
 		JobGraph jobGraph = new JobGraph("test job", vertex);
 		
-		ExecutionGraph eg = new ExecutionGraph(jobGraph.getJobID(),
-				jobGraph.getName(), jobGraph.getJobConfiguration(), TIMEOUT);
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				jobGraph.getJobID(),
+				jobGraph.getName(),
+				jobGraph.getJobConfiguration(),
+				TIMEOUT);
 		eg.setQueuedSchedulingAllowed(false);
 		
 		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
@@ -370,7 +353,7 @@ public class LocalInputSplitsTest {
 	}
 	
 	private static Scheduler getScheduler(int numInstances, int numSlotsPerInstance) throws Exception {
-		Scheduler scheduler = new Scheduler();
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 		
 		for (int i = 0; i < numInstances; i++) {
 			byte[] ipAddress = new byte[] { 10, 0, 1, (byte) (1 + i) };
@@ -393,7 +376,13 @@ public class LocalInputSplitsTest {
 		when(connection.getHostname()).thenReturn(hostname);
 		when(connection.getFQDNHostname()).thenReturn(hostname);
 		
-		return new Instance(taskManager, connection, new InstanceID(), hardwareDescription, slots);
+		return new Instance(
+				new ExecutionGraphTestUtils.SimpleInstanceGateway(
+						TestingUtils.defaultExecutionContext()),
+				connection,
+				new InstanceID(),
+				hardwareDescription,
+				slots);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 4677bf8..3cedb63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -56,7 +57,12 @@ public class PointwisePatternTest {
 	
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
-		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				jobId,
+				jobName,
+				cfg,
+				AkkaUtils.getDefaultTimeout());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -91,7 +97,12 @@ public class PointwisePatternTest {
 	
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
-		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				jobId,
+				jobName,
+				cfg,
+				AkkaUtils.getDefaultTimeout());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -127,7 +138,12 @@ public class PointwisePatternTest {
 	
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
-		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				jobId,
+				jobName,
+				cfg,
+				AkkaUtils.getDefaultTimeout());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -164,7 +180,12 @@ public class PointwisePatternTest {
 	
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
-		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				jobId,
+				jobName,
+				cfg,
+				AkkaUtils.getDefaultTimeout());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -199,7 +220,12 @@ public class PointwisePatternTest {
 	
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
-		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				jobId,
+				jobName,
+				cfg,
+				AkkaUtils.getDefaultTimeout());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -254,7 +280,12 @@ public class PointwisePatternTest {
 	
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
-		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				jobId,
+				jobName,
+				cfg,
+				AkkaUtils.getDefaultTimeout());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -300,7 +331,12 @@ public class PointwisePatternTest {
 	
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
-		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				jobId,
+				jobName,
+				cfg,
+				AkkaUtils.getDefaultTimeout());
 		try {
 			eg.attachJobGraph(ordered);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index 376ff14..b779d79 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import akka.actor.ActorRef;
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.DummyInstanceGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
@@ -34,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 
 import scala.concurrent.duration.FiniteDuration;
@@ -78,7 +78,7 @@ public class TerminalStateDeadlockTest {
 			InstanceConnectionInfo ci = new InstanceConnectionInfo(address, 12345);
 				
 			HardwareDescription resources = new HardwareDescription(4, 4000000, 3000000, 2000000);
-			Instance instance = new Instance(ActorRef.noSender(), ci, new InstanceID(), resources, 4);
+			Instance instance = new Instance(DummyInstanceGateway.INSTANCE, ci, new InstanceID(), resources, 4);
 
 			this.resource = instance.allocateSimpleSlot(new JobID());
 		}
@@ -116,7 +116,7 @@ public class TerminalStateDeadlockTest {
 				vertices = Arrays.asList(v1, v2);
 			}
 			
-			final Scheduler scheduler = new Scheduler();
+			final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 			
 			final Executor executor = Executors.newFixedThreadPool(4);
 			
@@ -181,7 +181,7 @@ public class TerminalStateDeadlockTest {
 		private volatile boolean done;
 
 		TestExecGraph(JobID jobId) {
-			super(jobId, "test graph", EMPTY_CONFIG, TIMEOUT);
+			super(TestingUtils.defaultExecutionContext(), jobId, "test graph", EMPTY_CONFIG, TIMEOUT);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
index 756b9a4..3305254 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.flink.runtime.instance.DummyInstanceGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
@@ -39,42 +40,14 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import scala.concurrent.duration.FiniteDuration;
-import akka.actor.Actor;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
 
 public class VertexLocationConstraintTest {
 
 	private static final FiniteDuration timeout = new FiniteDuration(100, TimeUnit.SECONDS);
 	
-	private static ActorSystem system;
-	
-	private static TestActorRef<? extends Actor> taskManager;
-	
-	
-	@BeforeClass
-	public static void setup() {
-		system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
-		
-		taskManager = TestActorRef.create(system,
-				Props.create(ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager.class));
-	}
-
-	@AfterClass
-	public static void teardown() {
-		JavaTestKit.shutdownActorSystem(system);
-		system = null;
-	}
-	
-	
 	@Test
 	public void testScheduleWithConstraint1() {
 		try {
@@ -91,7 +64,7 @@ public class VertexLocationConstraintTest {
 			Instance instance2 = getInstance(address2, 6789, hostname2);
 			Instance instance3 = getInstance(address3, 6789, hostname3);
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 			scheduler.newInstanceAvailable(instance1);
 			scheduler.newInstanceAvailable(instance2);
 			scheduler.newInstanceAvailable(instance3);
@@ -102,7 +75,12 @@ public class VertexLocationConstraintTest {
 			jobVertex.setParallelism(2);
 			JobGraph jg = new JobGraph("test job", jobVertex);
 			
-			ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout);
+			ExecutionGraph eg = new ExecutionGraph(
+					TestingUtils.defaultExecutionContext(),
+					jg.getJobID(),
+					jg.getName(),
+					jg.getJobConfiguration(),
+					timeout);
 			eg.attachJobGraph(Collections.singletonList(jobVertex));
 			
 			ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID());
@@ -157,7 +135,7 @@ public class VertexLocationConstraintTest {
 			Instance instance2 = getInstance(address2, 6789, hostname2);
 			Instance instance3 = getInstance(address3, 6789, hostname3);
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 			scheduler.newInstanceAvailable(instance1);
 			scheduler.newInstanceAvailable(instance2);
 			scheduler.newInstanceAvailable(instance3);
@@ -168,7 +146,12 @@ public class VertexLocationConstraintTest {
 			jobVertex.setParallelism(2);
 			JobGraph jg = new JobGraph("test job", jobVertex);
 			
-			ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout);
+			ExecutionGraph eg = new ExecutionGraph(
+					TestingUtils.defaultExecutionContext(),
+					jg.getJobID(),
+					jg.getName(),
+					jg.getJobConfiguration(),
+					timeout);
 			eg.attachJobGraph(Collections.singletonList(jobVertex));
 			
 			ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID());
@@ -219,7 +202,7 @@ public class VertexLocationConstraintTest {
 			Instance instance2 = getInstance(address2, 6789, hostname2);
 			Instance instance3 = getInstance(address3, 6789, hostname3);
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 			scheduler.newInstanceAvailable(instance1);
 			scheduler.newInstanceAvailable(instance2);
 			scheduler.newInstanceAvailable(instance3);
@@ -238,7 +221,12 @@ public class VertexLocationConstraintTest {
 			
 			JobGraph jg = new JobGraph("test job", jobVertex1, jobVertex2);
 			
-			ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout);
+			ExecutionGraph eg = new ExecutionGraph(
+					TestingUtils.defaultExecutionContext(),
+					jg.getJobID(),
+					jg.getName(),
+					jg.getJobConfiguration(),
+					timeout);
 			eg.attachJobGraph(Arrays.asList(jobVertex1, jobVertex2));
 			
 			ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex1.getID());
@@ -290,7 +278,7 @@ public class VertexLocationConstraintTest {
 			Instance instance1 = getInstance(address1, 6789, hostname1);
 			Instance instance2 = getInstance(address2, 6789, hostname2);
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 			scheduler.newInstanceAvailable(instance1);
 			
 			// prepare the execution graph
@@ -299,7 +287,12 @@ public class VertexLocationConstraintTest {
 			jobVertex.setParallelism(1);
 			JobGraph jg = new JobGraph("test job", jobVertex);
 			
-			ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout);
+			ExecutionGraph eg = new ExecutionGraph(
+					TestingUtils.defaultExecutionContext(),
+					jg.getJobID(),
+					jg.getName(),
+					jg.getJobConfiguration(),
+					timeout);
 			eg.attachJobGraph(Collections.singletonList(jobVertex));
 			
 			ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID());
@@ -343,7 +336,7 @@ public class VertexLocationConstraintTest {
 			Instance instance1 = getInstance(address1, 6789, hostname1);
 			Instance instance2 = getInstance(address2, 6789, hostname2);
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 			scheduler.newInstanceAvailable(instance1);
 			
 			// prepare the execution graph
@@ -362,7 +355,12 @@ public class VertexLocationConstraintTest {
 			jobVertex1.setSlotSharingGroup(sharingGroup);
 			jobVertex2.setSlotSharingGroup(sharingGroup);
 			
-			ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout);
+			ExecutionGraph eg = new ExecutionGraph(
+					TestingUtils.defaultExecutionContext(),
+					jg.getJobID(),
+					jg.getName(),
+					jg.getJobConfiguration(),
+					timeout);
 			eg.attachJobGraph(Arrays.asList(jobVertex1, jobVertex2));
 			
 			ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex1.getID());
@@ -395,12 +393,17 @@ public class VertexLocationConstraintTest {
 			JobVertex vertex = new JobVertex("test vertex", new JobVertexID());
 			JobGraph jg = new JobGraph("test job", vertex);
 			
-			ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout);
+			ExecutionGraph eg = new ExecutionGraph(
+					TestingUtils.defaultExecutionContext(),
+					jg.getJobID(),
+					jg.getName(),
+					jg.getJobConfiguration(),
+					timeout);
 			eg.attachJobGraph(Collections.singletonList(vertex));
 			
 			ExecutionVertex ev = eg.getAllVertices().get(vertex.getID()).getTaskVertices()[0];
 			
-			Instance instance = ExecutionGraphTestUtils.getInstance(ActorRef.noSender());
+			Instance instance = ExecutionGraphTestUtils.getInstance(DummyInstanceGateway.INSTANCE);
 			ev.setLocationConstraintHosts(Collections.singletonList(instance));
 			
 			assertNotNull(ev.getPreferredLocations());
@@ -431,6 +434,12 @@ public class VertexLocationConstraintTest {
 		when(connection.getHostname()).thenReturn(hostname);
 		when(connection.getFQDNHostname()).thenReturn(hostname);
 		
-		return new Instance(taskManager, connection, new InstanceID(), hardwareDescription, 1);
+		return new Instance(
+				new ExecutionGraphTestUtils.SimpleInstanceGateway(
+						TestingUtils.defaultExecutionContext()),
+				connection,
+				new InstanceID(),
+				hardwareDescription,
+				1);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index a1d6d03..d9e422c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 
 public class VertexSlotSharingTest {
@@ -68,7 +69,11 @@ public class VertexSlotSharingTest {
 			
 			List<JobVertex> vertices = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 			
-			ExecutionGraph eg = new ExecutionGraph(new JobID(), "test job", new Configuration(),
+			ExecutionGraph eg = new ExecutionGraph(
+					TestingUtils.defaultExecutionContext(),
+					new JobID(),
+					"test job",
+					new Configuration(),
 					AkkaUtils.getDefaultTimeout());
 			eg.attachJobGraph(vertices);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java
new file mode 100644
index 0000000..e9f8259
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import akka.dispatch.Futures;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract base class for testing {@link InstanceGateway} instances. The implementing subclass
+ * only has to provide an implementation for handleMessage which contains the logic to treat
+ * different messages.
+ */
+abstract public class BaseTestingInstanceGateway implements InstanceGateway {
+	/**
+	 * {@link ExecutionContext} which is used to execute the futures.
+	 */
+	private final ExecutionContext executionContext;
+
+	public BaseTestingInstanceGateway(ExecutionContext executionContext) {
+		this.executionContext = executionContext;
+	}
+
+	@Override
+	public Future<Object> ask(Object message, FiniteDuration timeout) {
+		try {
+			final Object result = handleMessage(message);
+
+			return Futures.future(new Callable<Object>() {
+				@Override
+				public Object call() throws Exception {
+					return result;
+				}
+			}, executionContext);
+
+		} catch (final Exception e) {
+			// if an exception occurred in the handleMessage method then return it as part of the future
+			return Futures.future(new Callable<Object>() {
+				@Override
+				public Object call() throws Exception {
+					throw e;
+				}
+			}, executionContext);
+		}
+	}
+
+	/**
+	 * Handles the supported messages by this InstanceGateway
+	 *
+	 * @param message Message to handle
+	 * @return Result
+	 * @throws Exception
+	 */
+	abstract public Object handleMessage(Object message) throws Exception;
+
+	@Override
+	public void tell(Object message) {}
+
+	@Override
+	public void forward(Object message, ActorRef sender) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) {
+		return ask(message, timeout);
+	}
+
+	@Override
+	public String path() {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java
new file mode 100644
index 0000000..5941201
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Dummy {@link InstanceGateway} implementation used for testing.
+ */
+public class DummyInstanceGateway implements InstanceGateway {
+	public static final DummyInstanceGateway INSTANCE = new DummyInstanceGateway();
+
+	@Override
+	public Future<Object> ask(Object message, FiniteDuration timeout) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void tell(Object message) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void forward(Object message, ActorRef sender) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public String path() {
+		return "DummyInstanceGateway";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index 595ac7e..c075a17 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.*;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
 
-import akka.actor.ActorRef;
 import org.apache.flink.api.common.JobID;
 import org.junit.Test;
 
@@ -39,7 +38,7 @@ public class InstanceTest {
 			InetAddress address = InetAddress.getByName("127.0.0.1");
 			InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
 
-			Instance instance = new Instance(ActorRef.noSender(), connection, new InstanceID(), hardwareDescription, 4);
+			Instance instance = new Instance(DummyInstanceGateway.INSTANCE, connection, new InstanceID(), hardwareDescription, 4);
 
 			assertEquals(4, instance.getTotalNumberOfSlots());
 			assertEquals(4, instance.getNumberOfAvailableSlots());
@@ -100,7 +99,7 @@ public class InstanceTest {
 			InetAddress address = InetAddress.getByName("127.0.0.1");
 			InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
 
-			Instance instance = new Instance(ActorRef.noSender(), connection, new InstanceID(), hardwareDescription, 3);
+			Instance instance = new Instance(DummyInstanceGateway.INSTANCE, connection, new InstanceID(), hardwareDescription, 3);
 
 			assertEquals(3, instance.getNumberOfAvailableSlots());
 
@@ -130,7 +129,7 @@ public class InstanceTest {
 			InetAddress address = InetAddress.getByName("127.0.0.1");
 			InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
 
-			Instance instance = new Instance(ActorRef.noSender(), connection, new InstanceID(), hardwareDescription, 3);
+			Instance instance = new Instance(DummyInstanceGateway.INSTANCE, connection, new InstanceID(), hardwareDescription, 3);
 
 			assertEquals(3, instance.getNumberOfAvailableSlots());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
index e075ab6..29ec7b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
@@ -23,8 +23,6 @@ import static org.junit.Assert.*;
 
 import java.net.InetAddress;
 
-import akka.actor.ActorRef;
-
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.api.common.JobID;
 
@@ -148,7 +146,7 @@ public class SimpleSlotTest {
 		InetAddress address = InetAddress.getByName("127.0.0.1");
 		InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
 
-		Instance instance = new Instance(ActorRef.noSender(), connection, new InstanceID(), hardwareDescription, 1);
+		Instance instance = new Instance(DummyInstanceGateway.INSTANCE, connection, new InstanceID(), hardwareDescription, 1);
 		return instance.allocateSimpleSlot(new JobID());
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 7739dea..6affdcc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 import org.mockito.Mockito;
 import scala.Some;
@@ -58,7 +59,10 @@ public class NetworkEnvironmentTest {
 					NUM_BUFFERS, BUFFER_SIZE, IOManager.IOMode.SYNC, new Some<NettyConfig>(nettyConf),
 					new Tuple2<Integer, Integer>(0, 0));
 
-			NetworkEnvironment env = new NetworkEnvironment(new FiniteDuration(30, TimeUnit.SECONDS), config);
+			NetworkEnvironment env = new NetworkEnvironment(
+				TestingUtils.defaultExecutionContext(),
+				new FiniteDuration(30, TimeUnit.SECONDS),
+				config);
 
 			assertFalse(env.isShutdown());
 			assertFalse(env.isAssociated());

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index a15e477..676b2a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -27,41 +27,22 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class ScheduleWithCoLocationHintTest {
 
-	private static ActorSystem system;
-
-	@BeforeClass
-	public static void setup(){
-		system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
-		TestingUtils.setCallingThreadDispatcher(system);
-	}
-
-	@AfterClass
-	public static void teardown(){
-		TestingUtils.setGlobalExecutionContext();
-		JavaTestKit.shutdownActorSystem(system);
-	}
-
 	@Test
 	public void scheduleAllSharedAndCoLocated() {
 		try {
 			JobVertexID jid1 = new JobVertexID();
 			JobVertexID jid2 = new JobVertexID();
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 			
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			scheduler.newInstanceAvailable(getRandomInstance(2));
@@ -187,7 +168,7 @@ public class ScheduleWithCoLocationHintTest {
 			JobVertexID jid3 = new JobVertexID();
 			JobVertexID jid4 = new JobVertexID();
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 			
 			Instance i1 = getRandomInstance(1);
 			Instance i2 = getRandomInstance(1);
@@ -231,7 +212,7 @@ public class ScheduleWithCoLocationHintTest {
 			JobVertexID jid2 = new JobVertexID();
 			JobVertexID jid3 = new JobVertexID();
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 			
 			Instance i1 = getRandomInstance(1);
 			Instance i2 = getRandomInstance(1);
@@ -276,7 +257,7 @@ public class ScheduleWithCoLocationHintTest {
 			JobVertexID jid3 = new JobVertexID();
 			JobVertexID jid4 = new JobVertexID();
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 			scheduler.newInstanceAvailable(getRandomInstance(1));
 			scheduler.newInstanceAvailable(getRandomInstance(1));
 			scheduler.newInstanceAvailable(getRandomInstance(1));
@@ -338,7 +319,7 @@ public class ScheduleWithCoLocationHintTest {
 			JobVertexID jid2 = new JobVertexID();
 			JobVertexID jid3 = new JobVertexID();
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 			
 			Instance i1 = getRandomInstance(1);
 			Instance i2 = getRandomInstance(1);
@@ -407,7 +388,7 @@ public class ScheduleWithCoLocationHintTest {
 			JobVertexID jid1 = new JobVertexID();
 			JobVertexID jid2 = new JobVertexID();
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 			
 			Instance i1 = getRandomInstance(1);
 			Instance i2 = getRandomInstance(1);
@@ -461,7 +442,7 @@ public class ScheduleWithCoLocationHintTest {
 			JobVertexID jid2 = new JobVertexID();
 			JobVertexID jidx = new JobVertexID();
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 			
 			Instance i1 = getRandomInstance(1);
 			Instance i2 = getRandomInstance(1);
@@ -519,7 +500,7 @@ public class ScheduleWithCoLocationHintTest {
 			JobVertexID jid1 = new JobVertexID();
 			JobVertexID jid2 = new JobVertexID();
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 			
 			Instance i1 = getRandomInstance(1);
 			Instance i2 = getRandomInstance(1);
@@ -581,7 +562,7 @@ public class ScheduleWithCoLocationHintTest {
 			JobVertexID jid1 = new JobVertexID();
 			JobVertexID jid2 = new JobVertexID();
 			
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 			
 			Instance i1 = getRandomInstance(1);
 			Instance i2 = getRandomInstance(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index d19299b..2ee53d9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -25,11 +25,7 @@ import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.g
 import static org.junit.Assert.*;
 
 import org.apache.flink.runtime.instance.SimpleSlot;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -47,24 +43,11 @@ import org.apache.flink.runtime.instance.Instance;
  * Tests for the {@link Scheduler} when scheduling individual tasks.
  */
 public class SchedulerIsolatedTasksTest {
-	private static ActorSystem system;
 
-	@BeforeClass
-	public static void setup(){
-		system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
-		TestingUtils.setCallingThreadDispatcher(system);
-	}
-
-	@AfterClass
-	public static void teardown(){
-		TestingUtils.setGlobalExecutionContext();
-		JavaTestKit.shutdownActorSystem(system);
-	}
-	
 	@Test
 	public void testAddAndRemoveInstance() {
 		try {
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 			
 			Instance i1 = getRandomInstance(2);
 			Instance i2 = getRandomInstance(2);
@@ -128,7 +111,7 @@ public class SchedulerIsolatedTasksTest {
 	@Test
 	public void testScheduleImmediately() {
 		try {
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 			assertEquals(0, scheduler.getNumberOfAvailableSlots());
 			
 			scheduler.newInstanceAvailable(getRandomInstance(2));
@@ -197,12 +180,10 @@ public class SchedulerIsolatedTasksTest {
 		final int NUM_SLOTS_PER_INSTANCE = 3;
 		final int NUM_TASKS_TO_SCHEDULE = 2000;
 
-		TestingUtils.setGlobalExecutionContext();
-
 		try {
 			// note: since this test asynchronously releases slots, the executor needs release workers.
 			// doing the release call synchronous can lead to a deadlock
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 
 			for (int i = 0; i < NUM_INSTANCES; i++) {
 				scheduler.newInstanceAvailable(getRandomInstance((int) (Math.random() * NUM_SLOTS_PER_INSTANCE) + 1));
@@ -287,15 +268,13 @@ public class SchedulerIsolatedTasksTest {
 		} catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
-		} finally {
-			TestingUtils.setCallingThreadDispatcher(system);
 		}
 	}
 	
 	@Test
 	public void testScheduleWithDyingInstances() {
 		try {
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 			
 			Instance i1 = getRandomInstance(2);
 			Instance i2 = getRandomInstance(2);
@@ -355,7 +334,7 @@ public class SchedulerIsolatedTasksTest {
 	@Test
 	public void testSchedulingLocation() {
 		try {
-			Scheduler scheduler = new Scheduler();
+			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 			
 			Instance i1 = getRandomInstance(2);
 			Instance i2 = getRandomInstance(2);