You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/13 18:54:08 UTC
[5/7] flink git commit: [FLINK-2329] [runtime] Introduces
InstanceGateway as an abstraction to communicate with the TaskManager.
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index f987e07..3e90123 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -32,11 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.instance.SimpleSlot;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -45,21 +41,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
* Tests for the scheduler when scheduling tasks in slot sharing groups.
*/
public class SchedulerSlotSharingTest {
- private static ActorSystem system;
- @BeforeClass
- public static void setup(){
- system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
- TestingUtils.setCallingThreadDispatcher(system);
- }
-
- @AfterClass
- public static void teardown(){
- TestingUtils.setGlobalExecutionContext();
- JavaTestKit.shutdownActorSystem(system);
- }
-
-
@Test
public void scheduleSingleVertexType() {
try {
@@ -67,7 +49,7 @@ public class SchedulerSlotSharingTest {
SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1);
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
Instance i1 = getRandomInstance(2);
Instance i2 = getRandomInstance(2);
scheduler.newInstanceAvailable(i1);
@@ -154,7 +136,7 @@ public class SchedulerSlotSharingTest {
SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
scheduler.newInstanceAvailable(getRandomInstance(2));
scheduler.newInstanceAvailable(getRandomInstance(2));
@@ -274,7 +256,7 @@ public class SchedulerSlotSharingTest {
SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
scheduler.newInstanceAvailable(getRandomInstance(2));
scheduler.newInstanceAvailable(getRandomInstance(2));
@@ -339,7 +321,7 @@ public class SchedulerSlotSharingTest {
SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3);
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
scheduler.newInstanceAvailable(getRandomInstance(2));
scheduler.newInstanceAvailable(getRandomInstance(2));
@@ -450,7 +432,7 @@ public class SchedulerSlotSharingTest {
SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
scheduler.newInstanceAvailable(getRandomInstance(2));
// schedule 1 tasks from the first vertex group and 2 from the second
@@ -502,7 +484,7 @@ public class SchedulerSlotSharingTest {
SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
scheduler.newInstanceAvailable(getRandomInstance(3));
scheduler.newInstanceAvailable(getRandomInstance(2));
@@ -649,7 +631,7 @@ public class SchedulerSlotSharingTest {
Instance i1 = getRandomInstance(2);
Instance i2 = getRandomInstance(2);
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
scheduler.newInstanceAvailable(i1);
scheduler.newInstanceAvailable(i2);
@@ -699,7 +681,7 @@ public class SchedulerSlotSharingTest {
Instance i1 = getRandomInstance(2);
Instance i2 = getRandomInstance(2);
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
scheduler.newInstanceAvailable(i1);
scheduler.newInstanceAvailable(i2);
@@ -749,7 +731,7 @@ public class SchedulerSlotSharingTest {
Instance i1 = getRandomInstance(2);
Instance i2 = getRandomInstance(2);
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
scheduler.newInstanceAvailable(i1);
scheduler.newInstanceAvailable(i2);
@@ -795,7 +777,6 @@ public class SchedulerSlotSharingTest {
@Test
public void testSequentialAllocateAndRelease() {
- TestingUtils.setGlobalExecutionContext();
try {
final JobVertexID jid1 = new JobVertexID();
final JobVertexID jid2 = new JobVertexID();
@@ -804,7 +785,7 @@ public class SchedulerSlotSharingTest {
final SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4);
- final Scheduler scheduler = new Scheduler();
+ final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
scheduler.newInstanceAvailable(getRandomInstance(4));
// allocate something from group 1 and 2 interleaved with schedule for group 3
@@ -853,15 +834,13 @@ public class SchedulerSlotSharingTest {
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
- }finally{
- TestingUtils.setCallingThreadDispatcher(system);
}
}
@Test
public void testConcurrentAllocateAndRelease() {
final ExecutorService executor = Executors.newFixedThreadPool(20);
- TestingUtils.setGlobalExecutionContext();
+
try {
for (int run = 0; run < 50; run++) {
final JobVertexID jid1 = new JobVertexID();
@@ -871,7 +850,7 @@ public class SchedulerSlotSharingTest {
final SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4);
- final Scheduler scheduler = new Scheduler();
+ final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
scheduler.newInstanceAvailable(getRandomInstance(4));
final AtomicInteger enumerator1 = new AtomicInteger();
@@ -1030,10 +1009,6 @@ public class SchedulerSlotSharingTest {
e.printStackTrace();
fail(e.getMessage());
}
- finally {
- executor.shutdownNow();
- TestingUtils.setCallingThreadDispatcher(system);
- }
}
@Test
@@ -1046,7 +1021,7 @@ public class SchedulerSlotSharingTest {
SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4);
- Scheduler scheduler = new Scheduler();
+ Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
scheduler.newInstanceAvailable(getRandomInstance(4));
// schedule one task for the first and second vertex
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index 694b88b..2de0635 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -29,9 +29,9 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;
-import akka.actor.ActorRef;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.DummyInstanceGateway;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
@@ -66,7 +66,7 @@ public class SchedulerTestUtils {
final long GB = 1024L*1024*1024;
HardwareDescription resources = new HardwareDescription(4, 4*GB, 3*GB, 2*GB);
- return new Instance(ActorRef.noSender(), ci, new InstanceID(), resources, numSlots);
+ return new Instance(DummyInstanceGateway.INSTANCE, ci, new InstanceID(), resources, numSlots);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index a67cd00..442ddcf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Test;
import scala.Option;
import scala.Tuple2;
@@ -89,7 +90,10 @@ public class TaskManagerComponentsStartupShutdownTest {
final MemoryManager memManager = new DefaultMemoryManager(32 * BUFFER_SIZE, 1, BUFFER_SIZE, false);
final IOManager ioManager = new IOManagerAsync(TMP_DIR);
- final NetworkEnvironment network = new NetworkEnvironment(timeout, netConf);
+ final NetworkEnvironment network = new NetworkEnvironment(
+ TestingUtils.defaultExecutionContext(),
+ timeout,
+ netConf);
final int numberOfSlots = 1;
// create the task manager
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
index a1ca43c..b3090e6 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
@@ -18,36 +18,27 @@
package org.apache.flink.runtime.executiongraph
-import akka.actor.{Props, ActorSystem}
-import akka.testkit.TestKit
import org.apache.flink.api.common.JobID
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleInstanceGateway
import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex}
import org.apache.flink.runtime.jobmanager.Tasks
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
import org.apache.flink.runtime.testingUtils.TestingUtils
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+import org.scalatest.{Matchers, WordSpecLike}
-class ExecutionGraphRestartTest(_system: ActorSystem) extends TestKit(_system) with WordSpecLike
-with Matchers with BeforeAndAfterAll {
-
- def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
-
- override def afterAll(): Unit = {
- TestKit.shutdownActorSystem(system)
- }
+class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
val NUM_TASKS = 31
"The execution graph" must {
"be manually restartable" in {
try {
- val tm = system.actorOf(Props(classOf[ExecutionGraphTestUtils
- .SimpleAcknowledgingTaskManager], "TaskManager"))
- val instance = ExecutionGraphTestUtils.getInstance(tm)
+ val instance = ExecutionGraphTestUtils.getInstance(
+ new SimpleInstanceGateway(TestingUtils.directExecutionContext))
- val scheduler = new Scheduler
+ val scheduler = new Scheduler(TestingUtils.defaultExecutionContext)
scheduler.newInstanceAvailable(instance)
val sender = new JobVertex("Task")
@@ -56,7 +47,11 @@ with Matchers with BeforeAndAfterAll {
val jobGraph = new JobGraph("Pointwise job", sender)
- val eg = new ExecutionGraph(new JobID(), "test job", new Configuration(),
+ val eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext,
+ new JobID(),
+ "test job",
+ new Configuration(),
AkkaUtils.getDefaultTimeout)
eg.setNumberOfRetriesLeft(0)
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)
@@ -87,11 +82,10 @@ with Matchers with BeforeAndAfterAll {
"restart itself automatically" in {
try {
- val tm = system.actorOf(Props
- (classOf[ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager], "TaskManager"))
- val instance = ExecutionGraphTestUtils.getInstance(tm)
+ val instance = ExecutionGraphTestUtils.getInstance(
+ new SimpleInstanceGateway(TestingUtils.directExecutionContext))
- val scheduler = new Scheduler
+ val scheduler = new Scheduler(TestingUtils.defaultExecutionContext)
scheduler.newInstanceAvailable(instance)
val sender = new JobVertex("Task")
@@ -100,7 +94,11 @@ with Matchers with BeforeAndAfterAll {
val jobGraph = new JobGraph("Pointwise job", sender)
- val eg = new ExecutionGraph(new JobID(), "Test job", new Configuration(),
+ val eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext,
+ new JobID(),
+ "Test job",
+ new Configuration(),
AkkaUtils.getDefaultTimeout)
eg.setNumberOfRetriesLeft(1)
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index 13199bc..aaa0025 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -18,38 +18,27 @@
package org.apache.flink.runtime.executiongraph
-import akka.actor.{Props, ActorSystem}
-import akka.testkit.TestKit
import org.apache.flink.api.common.JobID
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils
-.SimpleAcknowledgingTaskManager
+import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleInstanceGateway
import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex}
import org.apache.flink.runtime.jobmanager.Tasks
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
import org.apache.flink.runtime.testingUtils.TestingUtils
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+import org.scalatest.{Matchers, WordSpecLike}
-class TaskManagerLossFailsTasksTest(_system: ActorSystem) extends TestKit(_system) with
-WordSpecLike with Matchers with BeforeAndAfterAll {
-
- def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
-
- override def afterAll(): Unit = {
- TestKit.shutdownActorSystem(system)
- }
+class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
"A task manager loss" must {
"fail the assigned tasks" in {
try {
- val tm1 = system.actorOf(Props(classOf[SimpleAcknowledgingTaskManager], "TaskManager1"))
- val tm2 = system.actorOf(Props(classOf[SimpleAcknowledgingTaskManager], "TaskManager2"))
-
- val instance1 = ExecutionGraphTestUtils.getInstance(tm1, 10)
- val instance2 = ExecutionGraphTestUtils.getInstance(tm2, 10)
+ val instance1 = ExecutionGraphTestUtils.getInstance(
+ new SimpleInstanceGateway(TestingUtils.defaultExecutionContext), 10)
+ val instance2 = ExecutionGraphTestUtils.getInstance(
+ new SimpleInstanceGateway(TestingUtils.defaultExecutionContext), 10)
- val scheduler = new Scheduler
+ val scheduler = new Scheduler(TestingUtils.defaultExecutionContext)
scheduler.newInstanceAvailable(instance1)
scheduler.newInstanceAvailable(instance2)
@@ -59,7 +48,11 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
val jobGraph = new JobGraph("Pointwise job", sender)
- val eg = new ExecutionGraph(new JobID(), "test job", new Configuration(),
+ val eg = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext,
+ new JobID(),
+ "test job",
+ new Configuration(),
AkkaUtils.getDefaultTimeout)
eg.setNumberOfRetriesLeft(0)
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index 766ea55..f7bf56a 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -177,12 +177,12 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
jm ! NotifyWhenJobStatus(jobGraph.getJobID, JobStatus.RESTARTING)
jm ! RequestWorkingTaskManager(jobGraph.getJobID)
- val WorkingTaskManager(tm) = expectMsgType[WorkingTaskManager]
+ val WorkingTaskManager(gatewayOption) = expectMsgType[WorkingTaskManager]
- tm match {
- case ActorRef.noSender => fail("There has to be at least one task manager on which" +
+ gatewayOption match {
+ case None => fail("There has to be at least one task manager on which" +
"the tasks are running.")
- case t => t ! PoisonPill
+ case Some(gateway) => gateway.tell(PoisonPill)
}
expectMsg(JobStatusIs(jobGraph.getJobID, JobStatus.RESTARTING))
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 11e93d6..219e5ae 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -60,16 +60,34 @@ class TestingCluster(userConfiguration: Configuration,
override def startJobManager(actorSystem: ActorSystem): ActorRef = {
- val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager,
- executionRetries, delayBetweenRetries,
- timeout, archiveCount) = JobManager.createJobManagerComponents(configuration)
+ val (executionContext,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ _,
+ accumulatorManager,
+ executionRetries,
+ delayBetweenRetries,
+ timeout,
+ archiveCount) = JobManager.createJobManagerComponents(configuration)
val testArchiveProps = Props(new MemoryArchivist(archiveCount) with TestingMemoryArchivist)
val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
- val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
- libraryCacheManager, archive, accumulatorManager, executionRetries,
- delayBetweenRetries, timeout, streamingMode) with TestingJobManager)
+ val jobManagerProps = Props(
+ new JobManager(
+ configuration,
+ executionContext,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ archive,
+ accumulatorManager,
+ executionRetries,
+ delayBetweenRetries,
+ timeout,
+ streamingMode)
+ with TestingJobManager)
actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 89e1d72..5747b7e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -106,11 +106,10 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala {
case NotifyWhenJobRemoved(jobID) =>
- val tms = instanceManager.getAllRegisteredInstances.map(_.getTaskManager)
+ val gateways = instanceManager.getAllRegisteredInstances.map(_.getInstanceGateway)
- val responses = tms.map{
- tm =>
- (tm ? NotifyWhenJobRemoved(jobID))(timeout).mapTo[Boolean]
+ val responses = gateways.map{
+ gateway => gateway.ask(NotifyWhenJobRemoved(jobID), timeout).mapTo[Boolean]
}
import context.dispatcher
@@ -135,17 +134,17 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala {
currentJobs.get(jobID) match {
case Some((eg, _)) =>
if(eg.getAllExecutionVertices.isEmpty){
- sender ! WorkingTaskManager(ActorRef.noSender)
+ sender ! WorkingTaskManager(None)
} else {
val resource = eg.getAllExecutionVertices.head.getCurrentAssignedResource
if(resource == null){
- sender ! WorkingTaskManager(ActorRef.noSender)
+ sender ! WorkingTaskManager(None)
} else {
- sender ! WorkingTaskManager(resource.getInstance().getTaskManager)
+ sender ! WorkingTaskManager(Some(resource.getInstance().getInstanceGateway))
}
}
- case None => sender ! WorkingTaskManager(ActorRef.noSender)
+ case None => sender ! WorkingTaskManager(None)
}
case NotifyWhenJobStatus(jobID, state) =>
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index f810749..241c6c0 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.testingUtils
import akka.actor.ActorRef
import org.apache.flink.api.common.JobID
import org.apache.flink.runtime.executiongraph.ExecutionGraph
+import org.apache.flink.runtime.instance.InstanceGateway
import org.apache.flink.runtime.jobgraph.JobStatus
object TestingJobManagerMessages {
@@ -43,7 +44,7 @@ object TestingJobManagerMessages {
case class NotifyWhenJobRemoved(jobID: JobID)
case class RequestWorkingTaskManager(jobID: JobID)
- case class WorkingTaskManager(taskManager: ActorRef)
+ case class WorkingTaskManager(gatewayOption: Option[InstanceGateway])
case class NotifyWhenJobStatus(jobID: JobID, state: JobStatus)
case class JobStatusIs(jobID: JobID, state: JobStatus)
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 3611633..914f37c 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -18,14 +18,12 @@
package org.apache.flink.runtime.testingUtils
-import akka.actor.{ActorRef, ActorSystem}
-import akka.testkit.CallingThreadDispatcher
+import com.google.common.util.concurrent.MoreExecutors
import com.typesafe.config.ConfigFactory
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ActionQueue
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
@@ -67,19 +65,33 @@ object TestingUtils {
new TestingCluster(config)
}
- def setGlobalExecutionContext(): Unit = {
- AkkaUtils.globalExecutionContext = ExecutionContext.global
+ /** Returns the global [[ExecutionContext]] which is a [[scala.concurrent.forkjoin.ForkJoinPool]]
+ * with a default parallelism equal to the number of available cores.
+ *
+ * @return ExecutionContext.global
+ */
+ def defaultExecutionContext = ExecutionContext.global
+
+ /** Returns an [[ExecutionContext]] which uses the current thread to execute the runnable.
+ *
+ * @return Direct [[ExecutionContext]] which executes runnables directly
+ */
+ def directExecutionContext = ExecutionContext.fromExecutor(MoreExecutors.directExecutor())
+
+ /** @return A new [[QueuedActionExecutionContext]] */
+ def queuedActionExecutionContext = {
+ new QueuedActionExecutionContext(new ActionQueue())
}
- def setCallingThreadDispatcher(system: ActorSystem): Unit = {
- AkkaUtils.globalExecutionContext = system.dispatchers.lookup(CallingThreadDispatcher.Id)
- }
-
- def setExecutionContext(context: ExecutionContext): Unit = {
- AkkaUtils.globalExecutionContext = context
- }
+ /** [[ExecutionContext]] which queues [[Runnable]] up in an [[ActionQueue]] instead of
+ * execution them. If the automatic execution mode is activated, then the [[Runnable]] are
+ * executed.
+ *
+ * @param actionQueue
+ */
+ class QueuedActionExecutionContext private[testingUtils] (val actionQueue: ActionQueue)
+ extends ExecutionContext {
- class QueuedActionExecutionContext(queue: ActionQueue) extends ExecutionContext {
var automaticExecution = false
def toggleAutomaticExecution() = {
@@ -90,7 +102,7 @@ object TestingUtils {
if(automaticExecution){
runnable.run()
}else {
- queue.queueAction(runnable)
+ actionQueue.queueAction(runnable)
}
}
@@ -98,4 +110,26 @@ object TestingUtils {
t.printStackTrace()
}
}
+
+ /** Queue which stores [[Runnable]] */
+ class ActionQueue {
+ private val runnables = scala.collection.mutable.Queue[Runnable]()
+
+ def triggerNextAction {
+ val r = runnables.dequeue
+ r.run()
+ }
+
+ def popNextAction: Runnable = {
+ runnables.dequeue()
+ }
+
+ def queueAction(r: Runnable) {
+ runnables.enqueue(r)
+ }
+
+ def isEmpty: Boolean = {
+ runnables.isEmpty
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index a0db4d6..e108970 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -30,8 +30,8 @@ import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.hadoop.fs.FileSystem;
import org.junit.Assert;
import org.slf4j.Logger;
@@ -148,7 +148,7 @@ public class TestBaseUtils {
}
Future<Iterable<Object>> bcVariableManagerFutureResponses = Futures.sequence(
- bcVariableManagerResponseFutures, AkkaUtils.globalExecutionContext());
+ bcVariableManagerResponseFutures, TestingUtils.defaultExecutionContext());
Iterable<Object> responses = Await.result(bcVariableManagerFutureResponses, timeout);
@@ -158,7 +158,7 @@ public class TestBaseUtils {
}
Future<Iterable<Object>> numActiveConnectionsFutureResponses = Futures.sequence(
- numActiveConnectionsResponseFutures, AkkaUtils.globalExecutionContext());
+ numActiveConnectionsResponseFutures, TestingUtils.defaultExecutionContext());
responses = Await.result(numActiveConnectionsFutureResponses, timeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 796ea09..0534178 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -80,16 +80,38 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration,
override def startJobManager(actorSystem: ActorSystem): ActorRef = {
- val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager,
- executionRetries, delayBetweenRetries,
- timeout, archiveCount) = JobManager.createJobManagerComponents(configuration)
+ val (executionContext,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ _,
+ accumulatorManager,
+ executionRetries,
+ delayBetweenRetries,
+ timeout,
+ archiveCount) = JobManager.createJobManagerComponents(configuration)
+
+ val testArchiveProps = Props(
+ new MemoryArchivist(
+ archiveCount)
+ with TestingMemoryArchivist)
- val testArchiveProps = Props(new MemoryArchivist(archiveCount) with TestingMemoryArchivist)
val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
- val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
- libraryCacheManager, archive, accumulatorManager, executionRetries,
- delayBetweenRetries, timeout, streamingMode) with TestingJobManager)
+ val jobManagerProps = Props(
+ new JobManager(
+ configuration,
+ executionContext,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ archive,
+ accumulatorManager,
+ executionRetries,
+ delayBetweenRetries,
+ timeout,
+ streamingMode)
+ with TestingJobManager)
val jobManager = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 1884fab..c20f621 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -110,9 +110,15 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
jm ! RequestWorkingTaskManager(jobID)
- val tm = expectMsgType[WorkingTaskManager].taskManager
- // kill one task manager
- tm ! PoisonPill
+ val gatewayOption = expectMsgType[WorkingTaskManager].gatewayOption
+
+ gatewayOption match {
+ case Some(gateway) =>
+ // kill one task manager
+ gateway.tell(PoisonPill)
+
+ case None => fail("Could not retrieve a working task manager.")
+ }
val failure = expectMsgType[Failure]
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 5dd197d..8cfeead 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -235,16 +235,34 @@ object ApplicationMaster {
// start all the components inside the job manager
LOG.debug("Starting JobManager components")
- val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
- executionRetries, delayBetweenRetries,
- timeout, _) = JobManager.createJobManagerComponents(configuration)
+ val (executionContext,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ archiveProps,
+ accumulatorManager,
+ executionRetries,
+ delayBetweenRetries,
+ timeout,
+ _) = JobManager.createJobManagerComponents(configuration)
// start the archiver
val archiver: ActorRef = jobManagerSystem.actorOf(archiveProps, JobManager.ARCHIVE_NAME)
- val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
- libraryCacheManager, archiver, accumulatorManager, executionRetries,
- delayBetweenRetries, timeout, streamingMode) with ApplicationMasterActor)
+ val jobManagerProps = Props(
+ new JobManager(
+ configuration,
+ executionContext,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ archiver,
+ accumulatorManager,
+ executionRetries,
+ delayBetweenRetries,
+ timeout,
+ streamingMode)
+ with ApplicationMasterActor)
LOG.debug("Starting JobManager actor")
val jobManager = JobManager.startActor(jobManagerProps, jobManagerSystem)
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
index 411808b..3fb5e30 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
@@ -103,7 +103,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
instanceManager.getAllRegisteredInstances.asScala foreach {
instance =>
- instance.getTaskManager ! StopYarnSession(status, diag)
+ instance.getInstanceGateway.tell(StopYarnSession(status, diag))
}
rmClientOption foreach {