You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/13 18:54:09 UTC
[6/7] flink git commit: [FLINK-2329] [runtime] Introduces
InstanceGateway as an abstraction to communicate with the TaskManager.
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index eb0aab4..9db330b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -24,18 +24,11 @@ import static org.mockito.Mockito.mock;
import java.io.IOException;
-import akka.actor.Actor;
-import akka.testkit.TestActorRef;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.Status;
-import akka.actor.UntypedActor;
-import akka.japi.Creator;
-import akka.testkit.JavaTestKit;
-
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.BaseTestingInstanceGateway;
+import org.apache.flink.runtime.instance.DummyInstanceGateway;
+import org.apache.flink.runtime.instance.InstanceGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.api.common.JobID;
@@ -46,26 +39,12 @@ import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
+import scala.concurrent.ExecutionContext;
@SuppressWarnings("serial")
public class ExecutionVertexCancelTest {
- private static ActorSystem system;
-
- @BeforeClass
- public static void setup(){
- system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
- }
-
- @AfterClass
- public static void teardown(){
- JavaTestKit.shutdownActorSystem(system);
- system = null;
- }
-
// --------------------------------------------------------------------------------------------
// Canceling in different states
// --------------------------------------------------------------------------------------------
@@ -127,388 +106,350 @@ public class ExecutionVertexCancelTest {
@Test
public void testCancelConcurrentlyToDeploying_CallsNotOvertaking() {
- new JavaTestKit(system){{
- try {
- final JobVertexID jid = new JobVertexID();
- final ActionQueue actions = new ActionQueue();
+ try {
+ final JobVertexID jid = new JobVertexID();
- TestingUtils.setExecutionContext(new TestingUtils.QueuedActionExecutionContext(
- actions));
+ final TestingUtils.QueuedActionExecutionContext executionContext = TestingUtils.queuedActionExecutionContext();
+ final TestingUtils.ActionQueue actions = executionContext.actionQueue();
- final ExecutionJobVertex ejv = getExecutionVertex(jid);
- final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
- final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
+ final ExecutionJobVertex ejv = getExecutionVertex(
+ jid,
+ executionContext
+ );
- setVertexState(vertex, ExecutionState.SCHEDULED);
- assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
+ final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+ AkkaUtils.getDefaultTimeout());
+ final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
- ActorRef taskManager = TestActorRef.create(system, Props.create(new
- CancelSequenceTaskManagerCreator(new TaskOperationResult(execId, true),
- new TaskOperationResult(execId, false))));
+ setVertexState(vertex, ExecutionState.SCHEDULED);
+ assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
- Instance instance = getInstance(taskManager);
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ InstanceGateway instanceGateway = new CancelSequenceInstanceGateway(
+ executionContext,
+ new TaskOperationResult(execId, true),
+ new TaskOperationResult(execId, false));
- vertex.deployToSlot(slot);
+ Instance instance = getInstance(instanceGateway);
+ SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ vertex.deployToSlot(slot);
- assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
+ assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
- vertex.cancel();
+ vertex.cancel();
- assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+ assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
- // first action happens (deploy)
- actions.triggerNextAction();
- assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+ // first action happens (deploy)
+ actions.triggerNextAction();
+ assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
- // the deploy call found itself in canceling after it returned and needs to send a cancel call
- // the call did not yet execute, so it is still in canceling
- assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+ // the deploy call found itself in canceling after it returned and needs to send a cancel call
+ // the call did not yet execute, so it is still in canceling
+ assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
- // second action happens (cancel call from cancel function)
- actions.triggerNextAction();
+ // second action happens (cancel call from cancel function)
+ actions.triggerNextAction();
- // TaskManager reports back (canceling done)
- vertex.getCurrentExecutionAttempt().cancelingComplete();
+ // TaskManager reports back (canceling done)
+ vertex.getCurrentExecutionAttempt().cancelingComplete();
- // should properly set state to cancelled
- assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+ // should properly set state to cancelled
+ assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
- // trigger the correction canceling call
- actions.triggerNextAction();
- assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+ // trigger the correction canceling call
+ actions.triggerNextAction();
+ assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
- assertTrue(slot.isReleased());
+ assertTrue(slot.isReleased());
- assertNull(vertex.getFailureCause());
+ assertNull(vertex.getFailureCause());
- assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
- assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
- assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
- }
- catch(Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- TestingUtils.setGlobalExecutionContext();
- }
- }};
+ assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+ assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+ assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
@Test
public void testCancelConcurrentlyToDeploying_CallsOvertaking() {
- new JavaTestKit(system){
- {
- try {
- final JobVertexID jid = new JobVertexID();
- final ActionQueue actions = new ActionQueue();
+ try {
+ final JobVertexID jid = new JobVertexID();
- TestingUtils.setExecutionContext(new TestingUtils
- .QueuedActionExecutionContext(actions));
+ final TestingUtils.QueuedActionExecutionContext executionContext = TestingUtils.queuedActionExecutionContext();
+ final TestingUtils.ActionQueue actions = executionContext.actionQueue();
- final ExecutionJobVertex ejv = getExecutionVertex(jid);
+ final ExecutionJobVertex ejv = getExecutionVertex(jid, executionContext);
- final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
- final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
+ final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+ AkkaUtils.getDefaultTimeout());
+ final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
- setVertexState(vertex, ExecutionState.SCHEDULED);
- assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
+ setVertexState(vertex, ExecutionState.SCHEDULED);
+ assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
- // task manager cancel sequence mock actor
- // first return NOT SUCCESS (task not found, cancel call overtook deploy call), then success (cancel call after deploy call)
- TestActorRef<? extends Actor> taskManager = TestActorRef.create(system, Props.create(new
- CancelSequenceTaskManagerCreator(new
- TaskOperationResult(execId, false), new TaskOperationResult(execId, true))));
+ // task manager cancel sequence mock actor
+ // first return NOT SUCCESS (task not found, cancel call overtook deploy call), then success (cancel call after deploy call)
+ InstanceGateway instanceGateway = new CancelSequenceInstanceGateway(
+ executionContext,
+ new TaskOperationResult(execId, false),
+ new TaskOperationResult(execId, true)
+ );
- Instance instance = getInstance(taskManager);
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ Instance instance = getInstance(instanceGateway);
+ SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
- vertex.deployToSlot(slot);
+ vertex.deployToSlot(slot);
- assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
+ assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
- vertex.cancel();
+ vertex.cancel();
- assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+ assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
- // first action happens (deploy)
- Runnable deployAction = actions.popNextAction();
- Runnable cancelAction = actions.popNextAction();
+ // first action happens (deploy)
+ Runnable deployAction = actions.popNextAction();
+ Runnable cancelAction = actions.popNextAction();
- // cancel call first
- cancelAction.run();
- // process onComplete callback
- actions.triggerNextAction();
+ // cancel call first
+ cancelAction.run();
+ // process onComplete callback
+ actions.triggerNextAction();
- // did not find the task, not properly cancelled, stay in canceling
- assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+ // did not find the task, not properly cancelled, stay in canceling
+ assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
- // deploy action next
- deployAction.run();
+ // deploy action next
+ deployAction.run();
- // the deploy call found itself in canceling after it returned and needs to send a cancel call
- // the call did not yet execute, so it is still in canceling
- assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+ // the deploy call found itself in canceling after it returned and needs to send a cancel call
+ // the call did not yet execute, so it is still in canceling
+ assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
- vertex.getCurrentExecutionAttempt().cancelingComplete();
+ vertex.getCurrentExecutionAttempt().cancelingComplete();
- assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+ assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
- assertTrue(slot.isReleased());
+ assertTrue(slot.isReleased());
- assertNull(vertex.getFailureCause());
+ assertNull(vertex.getFailureCause());
- assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
- assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
- assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }finally{
- TestingUtils.setGlobalExecutionContext();
- }
- }
- };
+ assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+ assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+ assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
@Test
public void testCancelFromRunning() {
- new JavaTestKit(system) {
- {
- try {
- TestingUtils.setCallingThreadDispatcher(system);
- final JobVertexID jid = new JobVertexID();
- final ExecutionJobVertex ejv = getExecutionVertex(jid);
+ try {
+ final JobVertexID jid = new JobVertexID();
+ final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
- final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
- final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
+ final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+ AkkaUtils.getDefaultTimeout());
+ final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
- final TestActorRef<? extends Actor> taskManager = TestActorRef.create(system,
- Props.create(new CancelSequenceTaskManagerCreator(new
- TaskOperationResult(execId, true))));
+ InstanceGateway instanceGateway = new CancelSequenceInstanceGateway(
+ TestingUtils.directExecutionContext(),
+ new TaskOperationResult(execId, true));
- Instance instance = getInstance(taskManager);
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ Instance instance = getInstance(instanceGateway);
+ SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
- setVertexState(vertex, ExecutionState.RUNNING);
- setVertexResource(vertex, slot);
+ setVertexState(vertex, ExecutionState.RUNNING);
+ setVertexResource(vertex, slot);
- assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+ assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
- vertex.cancel();
- vertex.getCurrentExecutionAttempt().cancelingComplete(); // responce by task manager once actially canceled
+ vertex.cancel();
+ vertex.getCurrentExecutionAttempt().cancelingComplete(); // responce by task manager once actially canceled
- assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+ assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
- assertTrue(slot.isReleased());
+ assertTrue(slot.isReleased());
- assertNull(vertex.getFailureCause());
+ assertNull(vertex.getFailureCause());
- assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
- assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
- assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }finally{
- TestingUtils.setGlobalExecutionContext();
- }
- }
- };
+ assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+ assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+ assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
@Test
public void testRepeatedCancelFromRunning() {
- new JavaTestKit(system) {
- {
- try {
- TestingUtils.setCallingThreadDispatcher(system);
+ try {
- final JobVertexID jid = new JobVertexID();
- final ExecutionJobVertex ejv = getExecutionVertex(jid);
+ final JobVertexID jid = new JobVertexID();
+ final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
- final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
- final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
+ final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+ AkkaUtils.getDefaultTimeout());
+ final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
- final ActorRef taskManager = TestActorRef.create(system, Props.create(new
- CancelSequenceTaskManagerCreator(new
- TaskOperationResult(execId, true))));
+ final InstanceGateway instanceGateway = new CancelSequenceInstanceGateway(
+ TestingUtils.directExecutionContext(),
+ new TaskOperationResult(execId, true)
+ );
- Instance instance = getInstance(taskManager);
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ Instance instance = getInstance(instanceGateway);
+ SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
- setVertexState(vertex, ExecutionState.RUNNING);
- setVertexResource(vertex, slot);
+ setVertexState(vertex, ExecutionState.RUNNING);
+ setVertexResource(vertex, slot);
- assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+ assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
- vertex.cancel();
+ vertex.cancel();
- assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+ assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
- vertex.cancel();
+ vertex.cancel();
- assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+ assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
- // callback by TaskManager after canceling completes
- vertex.getCurrentExecutionAttempt().cancelingComplete();
+ // callback by TaskManager after canceling completes
+ vertex.getCurrentExecutionAttempt().cancelingComplete();
- assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+ assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
- assertTrue(slot.isReleased());
+ assertTrue(slot.isReleased());
- assertNull(vertex.getFailureCause());
+ assertNull(vertex.getFailureCause());
- assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
- assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
- assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }finally{
- TestingUtils.setGlobalExecutionContext();
- }
- }
- };
+ assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+ assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+ assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
@Test
public void testCancelFromRunningDidNotFindTask() {
// this may happen when the task finished or failed while the call was in progress
- new JavaTestKit(system) {
- {
- try {
- TestingUtils.setCallingThreadDispatcher(system);
- final JobVertexID jid = new JobVertexID();
- final ExecutionJobVertex ejv = getExecutionVertex(jid);
+ try {
+ final JobVertexID jid = new JobVertexID();
+ final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
- final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
- final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
+ final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+ AkkaUtils.getDefaultTimeout());
+ final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
- final ActorRef taskManager = TestActorRef.create(system,Props.create(new
- CancelSequenceTaskManagerCreator(new
- TaskOperationResult(execId, false))));
- Instance instance = getInstance(taskManager);
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ final InstanceGateway instanceGateway = new CancelSequenceInstanceGateway(
+ TestingUtils.directExecutionContext(),
+ new TaskOperationResult(execId, false)
+ );
- setVertexState(vertex, ExecutionState.RUNNING);
- setVertexResource(vertex, slot);
+ Instance instance = getInstance(instanceGateway);
+ SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
- assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+ setVertexState(vertex, ExecutionState.RUNNING);
+ setVertexResource(vertex, slot);
- vertex.cancel();
+ assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
- assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+ vertex.cancel();
- assertNull(vertex.getFailureCause());
+ assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
- assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
- assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }finally{
- TestingUtils.setGlobalExecutionContext();
- }
- }
- };
+ assertNull(vertex.getFailureCause());
+
+ assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+ assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
@Test
public void testCancelCallFails() {
- new JavaTestKit(system) {
- {
- try {
- TestingUtils.setCallingThreadDispatcher(system);
- final JobVertexID jid = new JobVertexID();
- final ExecutionJobVertex ejv = getExecutionVertex(jid);
+ try {
+ final JobVertexID jid = new JobVertexID();
+ final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
- final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
+ final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+ AkkaUtils.getDefaultTimeout());
- final ActorRef taskManager = TestActorRef.create(system, Props.create(new
- CancelSequenceTaskManagerCreator()));
+ final InstanceGateway gateway = new CancelSequenceInstanceGateway(TestingUtils.directExecutionContext());
- Instance instance = getInstance(taskManager);
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ Instance instance = getInstance(gateway);
+ SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
- setVertexState(vertex, ExecutionState.RUNNING);
- setVertexResource(vertex, slot);
+ setVertexState(vertex, ExecutionState.RUNNING);
+ setVertexResource(vertex, slot);
- assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+ assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
- vertex.cancel();
+ vertex.cancel();
- assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
+ assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
- assertTrue(slot.isReleased());
+ assertTrue(slot.isReleased());
- assertNotNull(vertex.getFailureCause());
+ assertNotNull(vertex.getFailureCause());
- assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
- assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
- assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }finally{
- TestingUtils.setGlobalExecutionContext();
- }
- }
- };
+ assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+ assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+ assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
@Test
public void testSendCancelAndReceiveFail() {
- new JavaTestKit(system) {
- {
- try {
- final JobVertexID jid = new JobVertexID();
- final ExecutionJobVertex ejv = getExecutionVertex(jid);
+ try {
+ final JobVertexID jid = new JobVertexID();
+ final ExecutionJobVertex ejv = getExecutionVertex(jid);
- final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
- final ExecutionAttemptID execID = vertex.getCurrentExecutionAttempt().getAttemptId();
+ final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+ AkkaUtils.getDefaultTimeout());
+ final ExecutionAttemptID execID = vertex.getCurrentExecutionAttempt().getAttemptId();
- final ActorRef taskManager = system.actorOf(
- Props.create(new CancelSequenceTaskManagerCreator(
- new TaskOperationResult(execID, true)
- )));
+ final InstanceGateway gateway = new CancelSequenceInstanceGateway(
+ TestingUtils.defaultExecutionContext(),
+ new TaskOperationResult(execID, true));
- Instance instance = getInstance(taskManager);
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ Instance instance = getInstance(gateway);
+ SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
- setVertexState(vertex, ExecutionState.RUNNING);
- setVertexResource(vertex, slot);
+ setVertexState(vertex, ExecutionState.RUNNING);
+ setVertexResource(vertex, slot);
- assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+ assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
- vertex.cancel();
+ vertex.cancel();
- assertTrue(vertex.getExecutionState() == ExecutionState.CANCELING || vertex.getExecutionState() == ExecutionState.FAILED);
+ assertTrue(vertex.getExecutionState() == ExecutionState.CANCELING || vertex.getExecutionState() == ExecutionState.FAILED);
- vertex.getCurrentExecutionAttempt().markFailed(new Throwable("test"));
+ vertex.getCurrentExecutionAttempt().markFailed(new Throwable("test"));
- assertTrue(vertex.getExecutionState() == ExecutionState.CANCELED || vertex.getExecutionState() == ExecutionState.FAILED);
+ assertTrue(vertex.getExecutionState() == ExecutionState.CANCELED || vertex.getExecutionState() == ExecutionState.FAILED);
- assertTrue(slot.isReleased());
+ assertTrue(slot.isReleased());
- assertEquals(0, vertex.getExecutionGraph().getRegisteredExecutions().size());
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
- };
+ assertEquals(0, vertex.getExecutionGraph().getRegisteredExecutions().size());
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
// --------------------------------------------------------------------------------------------
@@ -541,7 +482,7 @@ public class ExecutionVertexCancelTest {
// deploying after canceling from CREATED needs to raise an exception, because
// the scheduler (or any caller) needs to know that the slot should be released
try {
- Instance instance = getInstance(ActorRef.noSender());
+ Instance instance = getInstance(DummyInstanceGateway.INSTANCE);
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
vertex.deployToSlot(slot);
@@ -584,7 +525,7 @@ public class ExecutionVertexCancelTest {
AkkaUtils.getDefaultTimeout());
setVertexState(vertex, ExecutionState.CANCELING);
- Instance instance = getInstance(ActorRef.noSender());
+ Instance instance = getInstance(DummyInstanceGateway.INSTANCE);
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
vertex.deployToSlot(slot);
@@ -600,7 +541,7 @@ public class ExecutionVertexCancelTest {
ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
- Instance instance = getInstance(ActorRef.noSender());
+ Instance instance = getInstance(DummyInstanceGateway.INSTANCE);
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
setVertexResource(vertex, slot);
@@ -621,39 +562,33 @@ public class ExecutionVertexCancelTest {
}
}
- public static class CancelSequenceTaskManagerCreator implements Creator<CancelSequenceTaskManager> {
- private final TaskOperationResult[] results;
- public CancelSequenceTaskManagerCreator(TaskOperationResult ... results){
- this.results = results;
- }
-
- @Override
- public CancelSequenceTaskManager create() throws Exception {
- return new CancelSequenceTaskManager(results);
- }
- }
-
- public static class CancelSequenceTaskManager extends UntypedActor{
+ public static class CancelSequenceInstanceGateway extends BaseTestingInstanceGateway {
private final TaskOperationResult[] results;
- private int index;
+ private int index = -1;
- public CancelSequenceTaskManager(TaskOperationResult[] results){
- this.results = results;
- index = -1;
+ public CancelSequenceInstanceGateway(ExecutionContext executionContext, TaskOperationResult ... result) {
+ super(executionContext);
+ this.results = result;
}
@Override
- public void onReceive(Object message) throws Exception {
- if(message instanceof TaskMessages.SubmitTask){
- getSender().tell(Messages.getAcknowledge(), getSelf());
- }else if(message instanceof TaskMessages.CancelTask){
+ public Object handleMessage(Object message) throws Exception {
+ Object result;
+ if(message instanceof TaskMessages.SubmitTask) {
+ result = Messages.getAcknowledge();
+ } else if(message instanceof TaskMessages.CancelTask) {
index++;
+
if(index >= results.length){
- getSender().tell(new Status.Failure(new IOException("RPC call failed.")), getSelf());
- }else {
- getSender().tell(results[index], getSelf());
+ throw new IOException("RPC call failed.");
+ } else {
+ result = results[index];
}
+ } else {
+ result = null;
}
+
+ return result;
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index eadf328..431c3a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -22,13 +22,6 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
import static org.junit.Assert.*;
-import akka.actor.Actor;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
@@ -37,37 +30,20 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
public class ExecutionVertexDeploymentTest {
- private static ActorSystem system;
-
- @BeforeClass
- public static void setup(){
- system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
- }
-
- @AfterClass
- public static void teardown(){
- JavaTestKit.shutdownActorSystem(system);
- }
-
@Test
public void testDeployCall() {
try {
final JobVertexID jid = new JobVertexID();
- TestingUtils.setCallingThreadDispatcher(system);
- ActorRef tm = TestActorRef.create(system, Props.create(SimpleAcknowledgingTaskManager
- .class));
-
final ExecutionJobVertex ejv = getExecutionVertex(jid);
// mock taskmanager to simply accept the call
- Instance instance = getInstance(tm);
+ Instance instance = getInstance(
+ new SimpleInstanceGateway(TestingUtils.directExecutionContext()));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
@@ -95,24 +71,17 @@ public class ExecutionVertexDeploymentTest {
e.printStackTrace();
fail(e.getMessage());
}
- finally {
- TestingUtils.setGlobalExecutionContext();
- }
}
@Test
public void testDeployWithSynchronousAnswer() {
try {
- TestingUtils.setCallingThreadDispatcher(system);
-
final JobVertexID jid = new JobVertexID();
- final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
- Props.create(SimpleAcknowledgingTaskManager.class));
-
- final ExecutionJobVertex ejv = getExecutionVertex(jid);
+ final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
- final Instance instance = getInstance(simpleTaskManager);
+ final Instance instance = getInstance(
+ new SimpleInstanceGateway(TestingUtils.directExecutionContext()));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
@@ -143,9 +112,6 @@ public class ExecutionVertexDeploymentTest {
e.printStackTrace();
fail(e.getMessage());
}
- finally {
- TestingUtils.setGlobalExecutionContext();
- }
}
@Test
@@ -157,10 +123,8 @@ public class ExecutionVertexDeploymentTest {
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
- final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
- Props.create(SimpleAcknowledgingTaskManager.class));
-
- final Instance instance = getInstance(simpleTaskManager);
+ final Instance instance = getInstance(
+ new SimpleInstanceGateway(TestingUtils.defaultExecutionContext()));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -200,18 +164,14 @@ public class ExecutionVertexDeploymentTest {
@Test
public void testDeployFailedSynchronous() {
try {
- TestingUtils.setCallingThreadDispatcher(system);
-
final JobVertexID jid = new JobVertexID();
- final ExecutionJobVertex ejv = getExecutionVertex(jid);
+ final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
- final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
- Props.create(SimpleFailingTaskManager.class));
-
- final Instance instance = getInstance(simpleTaskManager);
+ final Instance instance = getInstance(
+ new SimpleFailingInstanceGateway(TestingUtils.directExecutionContext()));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -230,9 +190,6 @@ public class ExecutionVertexDeploymentTest {
e.printStackTrace();
fail(e.getMessage());
}
- finally {
- TestingUtils.setGlobalExecutionContext();
- }
}
@Test
@@ -243,10 +200,8 @@ public class ExecutionVertexDeploymentTest {
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
- final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
- Props.create(SimpleFailingTaskManager.class));
-
- final Instance instance = getInstance(simpleTaskManager);
+ final Instance instance = getInstance(
+ new SimpleFailingInstanceGateway(TestingUtils.directExecutionContext()));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -280,20 +235,16 @@ public class ExecutionVertexDeploymentTest {
public void testFailExternallyDuringDeploy() {
try {
final JobVertexID jid = new JobVertexID();
- final ExecutionJobVertex ejv = getExecutionVertex(jid);
- final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
+ final TestingUtils.QueuedActionExecutionContext ec = TestingUtils.queuedActionExecutionContext();
+ final TestingUtils.ActionQueue queue = ec.actionQueue();
- final ActionQueue queue = new ActionQueue();
- final TestingUtils.QueuedActionExecutionContext ec = new TestingUtils
- .QueuedActionExecutionContext(queue);
+ final ExecutionJobVertex ejv = getExecutionVertex(jid, ec);
- TestingUtils.setExecutionContext(ec);
+ final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+ AkkaUtils.getDefaultTimeout());
- final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
- Props.create(SimpleAcknowledgingTaskManager.class));
- final Instance instance = getInstance(simpleTaskManager);
+ final Instance instance = getInstance(new SimpleInstanceGateway(TestingUtils.directExecutionContext()));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -307,6 +258,7 @@ public class ExecutionVertexDeploymentTest {
assertEquals(testError, vertex.getFailureCause());
queue.triggerNextAction();
+ queue.triggerNextAction();
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
@@ -316,34 +268,29 @@ public class ExecutionVertexDeploymentTest {
e.printStackTrace();
fail(e.getMessage());
}
- finally {
- TestingUtils.setGlobalExecutionContext();
- }
}
@Test
public void testFailCallOvertakesDeploymentAnswer() {
try {
- ActionQueue queue = new ActionQueue();
- TestingUtils.QueuedActionExecutionContext context = new TestingUtils
- .QueuedActionExecutionContext(queue);
-
- TestingUtils.setExecutionContext(context);
+ final TestingUtils.QueuedActionExecutionContext context = TestingUtils.queuedActionExecutionContext();
+ final TestingUtils.ActionQueue queue = context.actionQueue();
final JobVertexID jid = new JobVertexID();
- final ExecutionJobVertex ejv = getExecutionVertex(jid);
+ final ExecutionJobVertex ejv = getExecutionVertex(jid, context);
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
final ExecutionAttemptID eid = vertex.getCurrentExecutionAttempt().getAttemptId();
- final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system, Props.create(new
- ExecutionVertexCancelTest.CancelSequenceTaskManagerCreator(new
- TaskOperationResult(eid, false), new TaskOperationResult(eid, true))));
+ final Instance instance = getInstance(
+ new ExecutionVertexCancelTest.CancelSequenceInstanceGateway(
+ context,
+ new TaskOperationResult(eid, false),
+ new TaskOperationResult(eid, true)));
- final Instance instance = getInstance(simpleTaskManager);
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -361,11 +308,14 @@ public class ExecutionVertexDeploymentTest {
Runnable cancel1 = queue.popNextAction();
cancel1.run();
- // execute onComplete callback
+ // execute onComplete callback of cancel
queue.triggerNextAction();
deploy.run();
+ // execute onComplete callback of deploy
+ queue.triggerNextAction();
+
assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
assertEquals(testError, vertex.getFailureCause());
@@ -380,8 +330,5 @@ public class ExecutionVertexDeploymentTest {
e.printStackTrace();
fail(e.getMessage());
}
- finally {
- TestingUtils.setGlobalExecutionContext();
- }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 1e9c30b..8ea7017 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -23,14 +23,9 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ge
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.DummyInstanceGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -38,28 +33,12 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
public class ExecutionVertexSchedulingTest {
- private static ActorSystem system;
-
- @BeforeClass
- public static void setup(){
- system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
- }
-
- @AfterClass
- public static void teardown(){
- JavaTestKit.shutdownActorSystem(system);
- system = null;
- }
-
@Test
public void testSlotReleasedWhenScheduledImmediately() {
try {
@@ -68,7 +47,7 @@ public class ExecutionVertexSchedulingTest {
AkkaUtils.getDefaultTimeout());
// a slot than cannot be deployed to
- final Instance instance = getInstance(ActorRef.noSender());
+ final Instance instance = getInstance(DummyInstanceGateway.INSTANCE);
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
slot.releaseSlot();
@@ -98,7 +77,7 @@ public class ExecutionVertexSchedulingTest {
AkkaUtils.getDefaultTimeout());
// a slot than cannot be deployed to
- final Instance instance = getInstance(ActorRef.noSender());
+ final Instance instance = getInstance(DummyInstanceGateway.INSTANCE);
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
slot.releaseSlot();
@@ -134,11 +113,7 @@ public class ExecutionVertexSchedulingTest {
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
- TestingUtils.setCallingThreadDispatcher(system);
- ActorRef tm = TestActorRef.create(system, Props.create(ExecutionGraphTestUtils
- .SimpleAcknowledgingTaskManager.class));
-
- final Instance instance = getInstance(tm);
+ final Instance instance = getInstance(new ExecutionGraphTestUtils.SimpleInstanceGateway(TestingUtils.defaultExecutionContext()));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
Scheduler scheduler = mock(Scheduler.class);
@@ -153,8 +128,6 @@ public class ExecutionVertexSchedulingTest {
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
- }finally{
- TestingUtils.setGlobalExecutionContext();
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
index 0d2ffeb..b4a7e63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
@@ -39,39 +39,14 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
-import akka.actor.Actor;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
public class LocalInputSplitsTest {
private static final FiniteDuration TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS);
- private static ActorSystem system;
-
- private static TestActorRef<? extends Actor> taskManager;
-
-
- @BeforeClass
- public static void setup() {
- system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
- taskManager = TestActorRef.create(system,
- Props.create(ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager.class));
- }
-
- @AfterClass
- public static void teardown() {
- JavaTestKit.shutdownActorSystem(system);
- system = null;
- }
-
// --------------------------------------------------------------------------------------------
@Test
@@ -290,14 +265,18 @@ public class LocalInputSplitsTest {
JobGraph jobGraph = new JobGraph("test job", vertex);
- ExecutionGraph eg = new ExecutionGraph(jobGraph.getJobID(),
- jobGraph.getName(), jobGraph.getJobConfiguration(), TIMEOUT);
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jobGraph.getJobID(),
+ jobGraph.getName(),
+ jobGraph.getJobConfiguration(),
+ TIMEOUT);
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
eg.setQueuedSchedulingAllowed(false);
// create a scheduler with 6 instances where always two are on the same host
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
Instance i1 = getInstance(new byte[] {10,0,1,1}, 12345, "host1", 1);
Instance i2 = getInstance(new byte[] {10,0,1,1}, 12346, "host1", 1);
Instance i3 = getInstance(new byte[] {10,0,1,2}, 12345, "host2", 1);
@@ -349,8 +328,12 @@ public class LocalInputSplitsTest {
JobGraph jobGraph = new JobGraph("test job", vertex);
- ExecutionGraph eg = new ExecutionGraph(jobGraph.getJobID(),
- jobGraph.getName(), jobGraph.getJobConfiguration(), TIMEOUT);
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jobGraph.getJobID(),
+ jobGraph.getName(),
+ jobGraph.getJobConfiguration(),
+ TIMEOUT);
eg.setQueuedSchedulingAllowed(false);
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
@@ -370,7 +353,7 @@ public class LocalInputSplitsTest {
}
private static Scheduler getScheduler(int numInstances, int numSlotsPerInstance) throws Exception {
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
for (int i = 0; i < numInstances; i++) {
byte[] ipAddress = new byte[] { 10, 0, 1, (byte) (1 + i) };
@@ -393,7 +376,13 @@ public class LocalInputSplitsTest {
when(connection.getHostname()).thenReturn(hostname);
when(connection.getFQDNHostname()).thenReturn(hostname);
- return new Instance(taskManager, connection, new InstanceID(), hardwareDescription, slots);
+ return new Instance(
+ new ExecutionGraphTestUtils.SimpleInstanceGateway(
+ TestingUtils.defaultExecutionContext()),
+ connection,
+ new InstanceID(),
+ hardwareDescription,
+ slots);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 4677bf8..3cedb63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Test;
import java.util.ArrayList;
@@ -56,7 +57,12 @@ public class PointwisePatternTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
- ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jobId,
+ jobName,
+ cfg,
+ AkkaUtils.getDefaultTimeout());
try {
eg.attachJobGraph(ordered);
}
@@ -91,7 +97,12 @@ public class PointwisePatternTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
- ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jobId,
+ jobName,
+ cfg,
+ AkkaUtils.getDefaultTimeout());
try {
eg.attachJobGraph(ordered);
}
@@ -127,7 +138,12 @@ public class PointwisePatternTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
- ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jobId,
+ jobName,
+ cfg,
+ AkkaUtils.getDefaultTimeout());
try {
eg.attachJobGraph(ordered);
}
@@ -164,7 +180,12 @@ public class PointwisePatternTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
- ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jobId,
+ jobName,
+ cfg,
+ AkkaUtils.getDefaultTimeout());
try {
eg.attachJobGraph(ordered);
}
@@ -199,7 +220,12 @@ public class PointwisePatternTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
- ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jobId,
+ jobName,
+ cfg,
+ AkkaUtils.getDefaultTimeout());
try {
eg.attachJobGraph(ordered);
}
@@ -254,7 +280,12 @@ public class PointwisePatternTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
- ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jobId,
+ jobName,
+ cfg,
+ AkkaUtils.getDefaultTimeout());
try {
eg.attachJobGraph(ordered);
}
@@ -300,7 +331,12 @@ public class PointwisePatternTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
- ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jobId,
+ jobName,
+ cfg,
+ AkkaUtils.getDefaultTimeout());
try {
eg.attachJobGraph(ordered);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index 376ff14..b779d79 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -18,11 +18,10 @@
package org.apache.flink.runtime.executiongraph;
-import akka.actor.ActorRef;
-
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.DummyInstanceGateway;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
@@ -34,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
@@ -78,7 +78,7 @@ public class TerminalStateDeadlockTest {
InstanceConnectionInfo ci = new InstanceConnectionInfo(address, 12345);
HardwareDescription resources = new HardwareDescription(4, 4000000, 3000000, 2000000);
- Instance instance = new Instance(ActorRef.noSender(), ci, new InstanceID(), resources, 4);
+ Instance instance = new Instance(DummyInstanceGateway.INSTANCE, ci, new InstanceID(), resources, 4);
this.resource = instance.allocateSimpleSlot(new JobID());
}
@@ -116,7 +116,7 @@ public class TerminalStateDeadlockTest {
vertices = Arrays.asList(v1, v2);
}
- final Scheduler scheduler = new Scheduler();
+ final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
final Executor executor = Executors.newFixedThreadPool(4);
@@ -181,7 +181,7 @@ public class TerminalStateDeadlockTest {
private volatile boolean done;
TestExecGraph(JobID jobId) {
- super(jobId, "test graph", EMPTY_CONFIG, TIMEOUT);
+ super(TestingUtils.defaultExecutionContext(), jobId, "test graph", EMPTY_CONFIG, TIMEOUT);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
index 756b9a4..3305254 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.instance.DummyInstanceGateway;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
@@ -39,42 +40,14 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
-import akka.actor.Actor;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
public class VertexLocationConstraintTest {
private static final FiniteDuration timeout = new FiniteDuration(100, TimeUnit.SECONDS);
- private static ActorSystem system;
-
- private static TestActorRef<? extends Actor> taskManager;
-
-
- @BeforeClass
- public static void setup() {
- system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
-
- taskManager = TestActorRef.create(system,
- Props.create(ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager.class));
- }
-
- @AfterClass
- public static void teardown() {
- JavaTestKit.shutdownActorSystem(system);
- system = null;
- }
-
-
@Test
public void testScheduleWithConstraint1() {
try {
@@ -91,7 +64,7 @@ public class VertexLocationConstraintTest {
Instance instance2 = getInstance(address2, 6789, hostname2);
Instance instance3 = getInstance(address3, 6789, hostname3);
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
scheduler.newInstanceAvailable(instance1);
scheduler.newInstanceAvailable(instance2);
scheduler.newInstanceAvailable(instance3);
@@ -102,7 +75,12 @@ public class VertexLocationConstraintTest {
jobVertex.setParallelism(2);
JobGraph jg = new JobGraph("test job", jobVertex);
- ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout);
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jg.getJobID(),
+ jg.getName(),
+ jg.getJobConfiguration(),
+ timeout);
eg.attachJobGraph(Collections.singletonList(jobVertex));
ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID());
@@ -157,7 +135,7 @@ public class VertexLocationConstraintTest {
Instance instance2 = getInstance(address2, 6789, hostname2);
Instance instance3 = getInstance(address3, 6789, hostname3);
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
scheduler.newInstanceAvailable(instance1);
scheduler.newInstanceAvailable(instance2);
scheduler.newInstanceAvailable(instance3);
@@ -168,7 +146,12 @@ public class VertexLocationConstraintTest {
jobVertex.setParallelism(2);
JobGraph jg = new JobGraph("test job", jobVertex);
- ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout);
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jg.getJobID(),
+ jg.getName(),
+ jg.getJobConfiguration(),
+ timeout);
eg.attachJobGraph(Collections.singletonList(jobVertex));
ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID());
@@ -219,7 +202,7 @@ public class VertexLocationConstraintTest {
Instance instance2 = getInstance(address2, 6789, hostname2);
Instance instance3 = getInstance(address3, 6789, hostname3);
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
scheduler.newInstanceAvailable(instance1);
scheduler.newInstanceAvailable(instance2);
scheduler.newInstanceAvailable(instance3);
@@ -238,7 +221,12 @@ public class VertexLocationConstraintTest {
JobGraph jg = new JobGraph("test job", jobVertex1, jobVertex2);
- ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout);
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jg.getJobID(),
+ jg.getName(),
+ jg.getJobConfiguration(),
+ timeout);
eg.attachJobGraph(Arrays.asList(jobVertex1, jobVertex2));
ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex1.getID());
@@ -290,7 +278,7 @@ public class VertexLocationConstraintTest {
Instance instance1 = getInstance(address1, 6789, hostname1);
Instance instance2 = getInstance(address2, 6789, hostname2);
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
scheduler.newInstanceAvailable(instance1);
// prepare the execution graph
@@ -299,7 +287,12 @@ public class VertexLocationConstraintTest {
jobVertex.setParallelism(1);
JobGraph jg = new JobGraph("test job", jobVertex);
- ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout);
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jg.getJobID(),
+ jg.getName(),
+ jg.getJobConfiguration(),
+ timeout);
eg.attachJobGraph(Collections.singletonList(jobVertex));
ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID());
@@ -343,7 +336,7 @@ public class VertexLocationConstraintTest {
Instance instance1 = getInstance(address1, 6789, hostname1);
Instance instance2 = getInstance(address2, 6789, hostname2);
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
scheduler.newInstanceAvailable(instance1);
// prepare the execution graph
@@ -362,7 +355,12 @@ public class VertexLocationConstraintTest {
jobVertex1.setSlotSharingGroup(sharingGroup);
jobVertex2.setSlotSharingGroup(sharingGroup);
- ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout);
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jg.getJobID(),
+ jg.getName(),
+ jg.getJobConfiguration(),
+ timeout);
eg.attachJobGraph(Arrays.asList(jobVertex1, jobVertex2));
ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex1.getID());
@@ -395,12 +393,17 @@ public class VertexLocationConstraintTest {
JobVertex vertex = new JobVertex("test vertex", new JobVertexID());
JobGraph jg = new JobGraph("test job", vertex);
- ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout);
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ jg.getJobID(),
+ jg.getName(),
+ jg.getJobConfiguration(),
+ timeout);
eg.attachJobGraph(Collections.singletonList(vertex));
ExecutionVertex ev = eg.getAllVertices().get(vertex.getID()).getTaskVertices()[0];
- Instance instance = ExecutionGraphTestUtils.getInstance(ActorRef.noSender());
+ Instance instance = ExecutionGraphTestUtils.getInstance(DummyInstanceGateway.INSTANCE);
ev.setLocationConstraintHosts(Collections.singletonList(instance));
assertNotNull(ev.getPreferredLocations());
@@ -431,6 +434,12 @@ public class VertexLocationConstraintTest {
when(connection.getHostname()).thenReturn(hostname);
when(connection.getFQDNHostname()).thenReturn(hostname);
- return new Instance(taskManager, connection, new InstanceID(), hardwareDescription, 1);
+ return new Instance(
+ new ExecutionGraphTestUtils.SimpleInstanceGateway(
+ TestingUtils.defaultExecutionContext()),
+ connection,
+ new InstanceID(),
+ hardwareDescription,
+ 1);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index a1d6d03..d9e422c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Test;
public class VertexSlotSharingTest {
@@ -68,7 +69,11 @@ public class VertexSlotSharingTest {
List<JobVertex> vertices = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
- ExecutionGraph eg = new ExecutionGraph(new JobID(), "test job", new Configuration(),
+ ExecutionGraph eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ new JobID(),
+ "test job",
+ new Configuration(),
AkkaUtils.getDefaultTimeout());
eg.attachJobGraph(vertices);
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java
new file mode 100644
index 0000000..e9f8259
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import akka.dispatch.Futures;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract base class for testing {@link InstanceGateway} instances. The implementing subclass
+ * only has to provide an implementation for handleMessage which contains the logic to treat
+ * different messages.
+ */
+abstract public class BaseTestingInstanceGateway implements InstanceGateway {
+ /**
+ * {@link ExecutionContext} which is used to execute the futures.
+ */
+ private final ExecutionContext executionContext;
+
+ public BaseTestingInstanceGateway(ExecutionContext executionContext) {
+ this.executionContext = executionContext;
+ }
+
+ @Override
+ public Future<Object> ask(Object message, FiniteDuration timeout) {
+ try {
+ final Object result = handleMessage(message);
+
+ return Futures.future(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ return result;
+ }
+ }, executionContext);
+
+ } catch (final Exception e) {
+ // if an exception occurred in the handleMessage method then return it as part of the future
+ return Futures.future(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ throw e;
+ }
+ }, executionContext);
+ }
+ }
+
+ /**
+ * Handles the supported messages by this InstanceGateway
+ *
+ * @param message Message to handle
+ * @return Result
+ * @throws Exception
+ */
+ abstract public Object handleMessage(Object message) throws Exception;
+
+ @Override
+ public void tell(Object message) {}
+
+ @Override
+ public void forward(Object message, ActorRef sender) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) {
+ return ask(message, timeout);
+ }
+
+ @Override
+ public String path() {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java
new file mode 100644
index 0000000..5941201
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Dummy {@link InstanceGateway} implementation used for testing.
+ */
+public class DummyInstanceGateway implements InstanceGateway {
+ public static final DummyInstanceGateway INSTANCE = new DummyInstanceGateway();
+
+ @Override
+ public Future<Object> ask(Object message, FiniteDuration timeout) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void tell(Object message) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void forward(Object message, ActorRef sender) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String path() {
+ return "DummyInstanceGateway";
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index 595ac7e..c075a17 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.*;
import java.lang.reflect.Method;
import java.net.InetAddress;
-import akka.actor.ActorRef;
import org.apache.flink.api.common.JobID;
import org.junit.Test;
@@ -39,7 +38,7 @@ public class InstanceTest {
InetAddress address = InetAddress.getByName("127.0.0.1");
InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
- Instance instance = new Instance(ActorRef.noSender(), connection, new InstanceID(), hardwareDescription, 4);
+ Instance instance = new Instance(DummyInstanceGateway.INSTANCE, connection, new InstanceID(), hardwareDescription, 4);
assertEquals(4, instance.getTotalNumberOfSlots());
assertEquals(4, instance.getNumberOfAvailableSlots());
@@ -100,7 +99,7 @@ public class InstanceTest {
InetAddress address = InetAddress.getByName("127.0.0.1");
InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
- Instance instance = new Instance(ActorRef.noSender(), connection, new InstanceID(), hardwareDescription, 3);
+ Instance instance = new Instance(DummyInstanceGateway.INSTANCE, connection, new InstanceID(), hardwareDescription, 3);
assertEquals(3, instance.getNumberOfAvailableSlots());
@@ -130,7 +129,7 @@ public class InstanceTest {
InetAddress address = InetAddress.getByName("127.0.0.1");
InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
- Instance instance = new Instance(ActorRef.noSender(), connection, new InstanceID(), hardwareDescription, 3);
+ Instance instance = new Instance(DummyInstanceGateway.INSTANCE, connection, new InstanceID(), hardwareDescription, 3);
assertEquals(3, instance.getNumberOfAvailableSlots());
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
index e075ab6..29ec7b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
@@ -23,8 +23,6 @@ import static org.junit.Assert.*;
import java.net.InetAddress;
-import akka.actor.ActorRef;
-
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.api.common.JobID;
@@ -148,7 +146,7 @@ public class SimpleSlotTest {
InetAddress address = InetAddress.getByName("127.0.0.1");
InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
- Instance instance = new Instance(ActorRef.noSender(), connection, new InstanceID(), hardwareDescription, 1);
+ Instance instance = new Instance(DummyInstanceGateway.INSTANCE, connection, new InstanceID(), hardwareDescription, 1);
return instance.allocateSimpleSlot(new JobID());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 7739dea..6affdcc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Test;
import org.mockito.Mockito;
import scala.Some;
@@ -58,7 +59,10 @@ public class NetworkEnvironmentTest {
NUM_BUFFERS, BUFFER_SIZE, IOManager.IOMode.SYNC, new Some<NettyConfig>(nettyConf),
new Tuple2<Integer, Integer>(0, 0));
- NetworkEnvironment env = new NetworkEnvironment(new FiniteDuration(30, TimeUnit.SECONDS), config);
+ NetworkEnvironment env = new NetworkEnvironment(
+ TestingUtils.defaultExecutionContext(),
+ new FiniteDuration(30, TimeUnit.SECONDS),
+ config);
assertFalse(env.isShutdown());
assertFalse(env.isAssociated());
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index a15e477..676b2a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -27,41 +27,22 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
public class ScheduleWithCoLocationHintTest {
- private static ActorSystem system;
-
- @BeforeClass
- public static void setup(){
- system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
- TestingUtils.setCallingThreadDispatcher(system);
- }
-
- @AfterClass
- public static void teardown(){
- TestingUtils.setGlobalExecutionContext();
- JavaTestKit.shutdownActorSystem(system);
- }
-
@Test
public void scheduleAllSharedAndCoLocated() {
try {
JobVertexID jid1 = new JobVertexID();
JobVertexID jid2 = new JobVertexID();
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
scheduler.newInstanceAvailable(getRandomInstance(2));
scheduler.newInstanceAvailable(getRandomInstance(2));
@@ -187,7 +168,7 @@ public class ScheduleWithCoLocationHintTest {
JobVertexID jid3 = new JobVertexID();
JobVertexID jid4 = new JobVertexID();
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
Instance i1 = getRandomInstance(1);
Instance i2 = getRandomInstance(1);
@@ -231,7 +212,7 @@ public class ScheduleWithCoLocationHintTest {
JobVertexID jid2 = new JobVertexID();
JobVertexID jid3 = new JobVertexID();
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
Instance i1 = getRandomInstance(1);
Instance i2 = getRandomInstance(1);
@@ -276,7 +257,7 @@ public class ScheduleWithCoLocationHintTest {
JobVertexID jid3 = new JobVertexID();
JobVertexID jid4 = new JobVertexID();
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
scheduler.newInstanceAvailable(getRandomInstance(1));
scheduler.newInstanceAvailable(getRandomInstance(1));
scheduler.newInstanceAvailable(getRandomInstance(1));
@@ -338,7 +319,7 @@ public class ScheduleWithCoLocationHintTest {
JobVertexID jid2 = new JobVertexID();
JobVertexID jid3 = new JobVertexID();
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
Instance i1 = getRandomInstance(1);
Instance i2 = getRandomInstance(1);
@@ -407,7 +388,7 @@ public class ScheduleWithCoLocationHintTest {
JobVertexID jid1 = new JobVertexID();
JobVertexID jid2 = new JobVertexID();
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
Instance i1 = getRandomInstance(1);
Instance i2 = getRandomInstance(1);
@@ -461,7 +442,7 @@ public class ScheduleWithCoLocationHintTest {
JobVertexID jid2 = new JobVertexID();
JobVertexID jidx = new JobVertexID();
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
Instance i1 = getRandomInstance(1);
Instance i2 = getRandomInstance(1);
@@ -519,7 +500,7 @@ public class ScheduleWithCoLocationHintTest {
JobVertexID jid1 = new JobVertexID();
JobVertexID jid2 = new JobVertexID();
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
Instance i1 = getRandomInstance(1);
Instance i2 = getRandomInstance(1);
@@ -581,7 +562,7 @@ public class ScheduleWithCoLocationHintTest {
JobVertexID jid1 = new JobVertexID();
JobVertexID jid2 = new JobVertexID();
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
Instance i1 = getRandomInstance(1);
Instance i2 = getRandomInstance(1);
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index d19299b..2ee53d9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -25,11 +25,7 @@ import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.g
import static org.junit.Assert.*;
import org.apache.flink.runtime.instance.SimpleSlot;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
import java.util.ArrayList;
@@ -47,24 +43,11 @@ import org.apache.flink.runtime.instance.Instance;
* Tests for the {@link Scheduler} when scheduling individual tasks.
*/
public class SchedulerIsolatedTasksTest {
- private static ActorSystem system;
- @BeforeClass
- public static void setup(){
- system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
- TestingUtils.setCallingThreadDispatcher(system);
- }
-
- @AfterClass
- public static void teardown(){
- TestingUtils.setGlobalExecutionContext();
- JavaTestKit.shutdownActorSystem(system);
- }
-
@Test
public void testAddAndRemoveInstance() {
try {
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
Instance i1 = getRandomInstance(2);
Instance i2 = getRandomInstance(2);
@@ -128,7 +111,7 @@ public class SchedulerIsolatedTasksTest {
@Test
public void testScheduleImmediately() {
try {
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
assertEquals(0, scheduler.getNumberOfAvailableSlots());
scheduler.newInstanceAvailable(getRandomInstance(2));
@@ -197,12 +180,10 @@ public class SchedulerIsolatedTasksTest {
final int NUM_SLOTS_PER_INSTANCE = 3;
final int NUM_TASKS_TO_SCHEDULE = 2000;
- TestingUtils.setGlobalExecutionContext();
-
try {
// note: since this test asynchronously releases slots, the executor needs release workers.
// doing the release call synchronous can lead to a deadlock
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
for (int i = 0; i < NUM_INSTANCES; i++) {
scheduler.newInstanceAvailable(getRandomInstance((int) (Math.random() * NUM_SLOTS_PER_INSTANCE) + 1));
@@ -287,15 +268,13 @@ public class SchedulerIsolatedTasksTest {
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
- } finally {
- TestingUtils.setCallingThreadDispatcher(system);
}
}
@Test
public void testScheduleWithDyingInstances() {
try {
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
Instance i1 = getRandomInstance(2);
Instance i2 = getRandomInstance(2);
@@ -355,7 +334,7 @@ public class SchedulerIsolatedTasksTest {
@Test
public void testSchedulingLocation() {
try {
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
Instance i1 = getRandomInstance(2);
Instance i2 = getRandomInstance(2);