You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/08 15:17:48 UTC
[3/4] flink git commit: [FLINK-4458] Replace ForkableFlinkMiniCluster
by LocalFlinkMiniCluster
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
deleted file mode 100644
index a6963fe..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.{ActorRef, Terminated}
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.FlinkActor
-import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.messages.JobManagerMessages.{RequestLeaderSessionID, ResponseLeaderSessionID}
-import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
-import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered}
-import org.apache.flink.runtime.messages.TaskMessages.{SubmitTask, TaskInFinalState, UpdateTaskExecutionState}
-import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
-import org.apache.flink.runtime.testingUtils.TestingMessages._
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-/** This mixin can be used to decorate a TaskManager with messages for testing purposes. */
-trait TestingTaskManagerLike extends FlinkActor {
- that: TaskManager =>
-
- import scala.collection.JavaConverters._
-
- val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
- val waitForJobManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
- val waitForRegisteredAtResourceManager =
- scala.collection.mutable.HashMap[ActorRef, Set[ActorRef]]()
- val waitForRunning = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
- val unregisteredTasks = scala.collection.mutable.HashSet[ExecutionAttemptID]()
-
- /** Map of registered task submit listeners */
- val registeredSubmitTaskListeners = scala.collection.mutable.HashMap[JobID, ActorRef]()
-
- val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]()
-
- var disconnectDisabled = false
-
- /**
- * Handler for testing related messages
- */
- abstract override def handleMessage: Receive = {
- handleTestingMessage orElse super.handleMessage
- }
-
- def handleTestingMessage: Receive = {
- case Alive => sender() ! Acknowledge
-
- case NotifyWhenTaskIsRunning(executionID) =>
- Option(runningTasks.get(executionID)) match {
- case Some(task) if task.getExecutionState == ExecutionState.RUNNING =>
- sender ! decorateMessage(true)
-
- case _ =>
- val listeners = waitForRunning.getOrElse(executionID, Set())
- waitForRunning += (executionID -> (listeners + sender))
- }
-
- case RequestRunningTasks =>
- sender ! decorateMessage(ResponseRunningTasks(runningTasks.asScala.toMap))
-
- case NotifyWhenTaskRemoved(executionID) =>
- Option(runningTasks.get(executionID)) match {
- case Some(_) =>
- val set = waitForRemoval.getOrElse(executionID, Set())
- waitForRemoval += (executionID -> (set + sender))
- case None =>
- if(unregisteredTasks.contains(executionID)) {
- sender ! decorateMessage(true)
- } else {
- val set = waitForRemoval.getOrElse(executionID, Set())
- waitForRemoval += (executionID -> (set + sender))
- }
- }
-
- case TaskInFinalState(executionID) =>
- super.handleMessage(TaskInFinalState(executionID))
- waitForRemoval.remove(executionID) match {
- case Some(actors) => for(actor <- actors) actor ! decorateMessage(true)
- case None =>
- }
-
- unregisteredTasks += executionID
-
- case RequestBroadcastVariablesWithReferences =>
- sender ! decorateMessage(
- ResponseBroadcastVariablesWithReferences(
- bcVarManager.getNumberOfVariablesWithReferences)
- )
-
- case RequestNumActiveConnections =>
- val numActive = if (!network.isShutdown) {
- network.getConnectionManager.getNumberOfActiveConnections
- } else {
- 0
- }
- sender ! decorateMessage(ResponseNumActiveConnections(numActive))
-
- case NotifyWhenJobRemoved(jobID) =>
- if(runningTasks.values.asScala.exists(_.getJobID == jobID)){
- context.system.scheduler.scheduleOnce(
- 200 milliseconds,
- self,
- decorateMessage(CheckIfJobRemoved(jobID)))(
- context.dispatcher,
- sender()
- )
- }else{
- sender ! decorateMessage(true)
- }
-
- case CheckIfJobRemoved(jobID) =>
- if(runningTasks.values.asScala.forall(_.getJobID != jobID)){
- sender ! decorateMessage(true)
- } else {
- context.system.scheduler.scheduleOnce(
- 200 milliseconds,
- self,
- decorateMessage(CheckIfJobRemoved(jobID)))(
- context.dispatcher,
- sender()
- )
- }
-
- case NotifyWhenJobManagerTerminated(jobManager) =>
- val waiting = waitForJobManagerToBeTerminated.getOrElse(jobManager.path.name, Set())
- waitForJobManagerToBeTerminated += jobManager.path.name -> (waiting + sender)
-
- case RegisterSubmitTaskListener(jobId) =>
- registeredSubmitTaskListeners.put(jobId, sender())
-
- case msg@SubmitTask(tdd) =>
- registeredSubmitTaskListeners.get(tdd.getJobID) match {
- case Some(listenerRef) =>
- listenerRef ! ResponseSubmitTaskListener(tdd)
- case None =>
- // Nothing to do
- }
-
- super.handleMessage(msg)
-
- /**
- * Message from task manager that accumulator values changed and need to be reported immediately
- * instead of lazily through the
- * [[org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat]] message. We forward this
- * message to the job manager that it knows it should report to the listeners.
- */
- case msg: AccumulatorsChanged =>
- currentJobManager match {
- case Some(jobManager) =>
- jobManager.forward(msg)
- sendHeartbeatToJobManager()
- sender ! true
- case None =>
- }
-
- case msg@Terminated(jobManager) =>
- super.handleMessage(msg)
-
- waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
- _ foreach {
- _ ! decorateMessage(JobManagerTerminated(jobManager))
- }
- }
-
- case msg:Disconnect =>
- if (!disconnectDisabled) {
- super.handleMessage(msg)
-
- val jobManager = sender()
-
- waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
- _ foreach {
- _ ! decorateMessage(JobManagerTerminated(jobManager))
- }
- }
- }
-
- case DisableDisconnect =>
- disconnectDisabled = true
-
- case NotifyOfComponentShutdown =>
- waitForShutdown += sender()
-
- case msg @ UpdateTaskExecutionState(taskExecutionState) =>
- super.handleMessage(msg)
-
- if(taskExecutionState.getExecutionState == ExecutionState.RUNNING) {
- waitForRunning.get(taskExecutionState.getID) foreach {
- _ foreach (_ ! decorateMessage(true))
- }
- }
-
- case RequestLeaderSessionID =>
- sender() ! ResponseLeaderSessionID(leaderSessionID.orNull)
-
- case NotifyWhenRegisteredAtJobManager(jobManager: ActorRef) =>
- if(isConnected && jobManager == currentJobManager.get) {
- sender() ! true
- } else {
- val list = waitForRegisteredAtResourceManager.getOrElse(
- jobManager,
- Set[ActorRef]())
-
- waitForRegisteredAtResourceManager += jobManager -> (list + sender())
- }
-
- case msg @ (_: AcknowledgeRegistration | _: AlreadyRegistered) =>
- super.handleMessage(msg)
-
- val jm = sender()
-
- waitForRegisteredAtResourceManager.remove(jm).foreach {
- listeners => listeners.foreach{
- listener =>
- listener ! true
- }
- }
- }
-
- /**
- * No killing of the VM for testing.
- */
- override protected def shutdown(): Unit = {
- log.info("Shutting down TestingJobManager.")
- waitForShutdown.foreach(_ ! ComponentShutdown(self))
- waitForShutdown.clear()
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
deleted file mode 100644
index 974e4e8..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.ActorRef
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.taskmanager.Task
-
-/**
- * Additional messages that the [[TestingTaskManager]] understands.
- */
-object TestingTaskManagerMessages {
-
- case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID)
-
- case class NotifyWhenTaskIsRunning(executionID: ExecutionAttemptID)
-
- case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){
- import collection.JavaConverters._
- def asJava: java.util.Map[ExecutionAttemptID, Task] = tasks.asJava
- }
-
- case class ResponseBroadcastVariablesWithReferences(number: Int)
-
- case object RequestNumActiveConnections
- case class ResponseNumActiveConnections(number: Int)
-
- case object RequestRunningTasks
-
- case object RequestBroadcastVariablesWithReferences
-
- case class NotifyWhenJobManagerTerminated(jobManager: ActorRef)
-
- case class JobManagerTerminated(jobManager: ActorRef)
-
- case class NotifyWhenRegisteredAtJobManager(resourceManager: ActorRef)
-
- /**
- * Message to give a hint to the task manager that accumulator values were updated in the task.
- * This message is forwarded to the job manager which knows that it needs to notify listeners
- * of accumulator updates.
- */
- case class AccumulatorsChanged(jobID: JobID)
-
- /**
- * Registers a listener for all [[org.apache.flink.runtime.messages.TaskMessages.SubmitTask]]
- * messages of the given job.
- *
- * If a task is submitted with the given job ID the task deployment
- * descriptor is forwarded to the listener.
- *
- * @param jobId The job ID to listen for.
- */
- case class RegisterSubmitTaskListener(jobId: JobID)
-
- /**
- * A response to a listened job ID containing the submitted task deployment descriptor.
- *
- * @param tdd The submitted task deployment descriptor.
- */
- case class ResponseSubmitTaskListener(tdd: TaskDeploymentDescriptor)
-
- // --------------------------------------------------------------------------
- // Utility methods to allow simpler case object access from Java
- // --------------------------------------------------------------------------
-
- def getRequestRunningTasksMessage: AnyRef = {
- RequestRunningTasks
- }
-
- def getRequestBroadcastVariablesWithReferencesMessage: AnyRef = {
- RequestBroadcastVariablesWithReferences
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
index e596166..c143fe2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.leaderelection;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import scala.Option;
@@ -86,7 +85,7 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
@Override
public int getNumberOfJobManagers() {
- return this.configuration().getInteger(
+ return this.originalConfiguration().getInteger(
ConfigConstants.LOCAL_NUMBER_JOB_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
new file mode 100644
index 0000000..495cacd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import akka.actor.ActorRef;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.testingUtils.TestingMessages;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * A testing resource manager which may alter the default standalone resource master's behavior.
+ */
+public class TestingResourceManager extends StandaloneResourceManager {
+
+ /** Set of Actors which want to be informed of a connection to the job manager */
+ private Set<ActorRef> waitForResourceManagerConnected = new HashSet<>();
+
+ /** Set of Actors which want to be informed of a shutdown */
+ private Set<ActorRef> waitForShutdown = new HashSet<>();
+
+ /** Flag to signal a connection to the JobManager */
+ private boolean isConnected = false;
+
+ public TestingResourceManager(Configuration flinkConfig, LeaderRetrievalService leaderRetriever) {
+ super(flinkConfig, leaderRetriever);
+ }
+
+ /**
+ * Overwrite messages here if desired
+ */
+ @Override
+ protected void handleMessage(Object message) {
+
+ if (message instanceof GetRegisteredResources) {
+ sender().tell(new GetRegisteredResourcesReply(getStartedTaskManagers()), self());
+ } else if (message instanceof FailResource) {
+ ResourceID resourceID = ((FailResource) message).resourceID;
+ notifyWorkerFailed(resourceID, "Failed for test case.");
+
+ } else if (message instanceof NotifyWhenResourceManagerConnected) {
+ if (isConnected) {
+ sender().tell(
+ Messages.getAcknowledge(),
+ self());
+ } else {
+ waitForResourceManagerConnected.add(sender());
+ }
+ } else if (message instanceof RegisterResourceManagerSuccessful) {
+ super.handleMessage(message);
+
+ isConnected = true;
+
+ for (ActorRef ref : waitForResourceManagerConnected) {
+ ref.tell(
+ Messages.getAcknowledge(),
+ self());
+ }
+ waitForResourceManagerConnected.clear();
+
+ } else if (message instanceof TestingMessages.NotifyOfComponentShutdown$) {
+ waitForShutdown.add(sender());
+ } else if (message instanceof TestingMessages.Alive$) {
+ sender().tell(Messages.getAcknowledge(), self());
+ } else {
+ super.handleMessage(message);
+ }
+ }
+
+ /**
+ * Testing messages
+ */
+ public static class GetRegisteredResources {}
+
+ public static class GetRegisteredResourcesReply {
+
+ public Collection<ResourceID> resources;
+
+ public GetRegisteredResourcesReply(Collection<ResourceID> resources) {
+ this.resources = resources;
+ }
+
+ }
+
+ /**
+ * Fails the single resource identified by the given resource ID
+ */
+ public static class FailResource {
+
+ public ResourceID resourceID;
+
+ public FailResource(ResourceID resourceID) {
+ this.resourceID = resourceID;
+ }
+ }
+
+ /**
+ * The sender of this message will be informed of a connection to the Job Manager
+ */
+ public static class NotifyWhenResourceManagerConnected {}
+
+ /**
+ * Inform registered listeners about a shutdown of the application.
+ */
+ @Override
+ protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+ for (ActorRef listener : waitForShutdown) {
+ listener.tell(new TestingMessages.ComponentShutdown(self()), self());
+ }
+ waitForShutdown.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index b4ba40b..c01a321 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -18,22 +18,32 @@
package org.apache.flink.runtime.testingUtils
-import java.util.concurrent.TimeoutException
+import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
import akka.pattern.ask
-import akka.actor.{ActorRef, Props, ActorSystem}
+import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.pattern.Patterns._
import akka.testkit.CallingThreadDispatcher
import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
import org.apache.flink.runtime.clusterframework.FlinkResourceManager
-import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
-import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
+import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, SubmittedJobGraphStore}
import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster
+import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import org.apache.flink.runtime.testutils.TestingResourceManager
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
+import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Await, Future}
/**
@@ -48,7 +58,7 @@ class TestingCluster(
userConfiguration: Configuration,
singleActorSystem: Boolean,
synchronousDispatcher: Boolean)
- extends FlinkMiniCluster(
+ extends LocalFlinkMiniCluster(
userConfiguration,
singleActorSystem) {
@@ -59,133 +69,54 @@ class TestingCluster(
// --------------------------------------------------------------------------
- override def generateConfiguration(userConfig: Configuration): Configuration = {
- val cfg = new Configuration()
- cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
- cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
- cfg.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, 0)
- cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10)
- cfg.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1)
-
- setDefaultCiConfig(cfg)
-
- cfg.addAll(userConfig)
- cfg
- }
-
- override def startJobManager(index: Int, actorSystem: ActorSystem): ActorRef = {
- val config = configuration.clone()
-
- val jobManagerName = if(singleActorSystem) {
- JobManager.JOB_MANAGER_NAME + "_" + (index + 1)
- } else {
- JobManager.JOB_MANAGER_NAME
- }
-
- val archiveName = if(singleActorSystem) {
- JobManager.ARCHIVE_NAME + "_" + (index + 1)
- } else {
- JobManager.ARCHIVE_NAME
- }
-
- val jobManagerPort = config.getInteger(
- ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
-
- if(jobManagerPort > 0) {
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
- }
-
- val (executionContext,
- instanceManager,
- scheduler,
- libraryCacheManager,
- restartStrategyFactory,
- timeout,
- archiveCount,
- leaderElectionService,
- submittedJobsGraphs,
- checkpointRecoveryFactory,
- savepointStore,
- jobRecoveryTimeout,
- metricRegistry) = JobManager.createJobManagerComponents(
- config,
- createLeaderElectionService())
-
- val testArchiveProps = Props(new TestingMemoryArchivist(archiveCount))
- val archive = actorSystem.actorOf(testArchiveProps, archiveName)
-
- val jobManagerProps = Props(
- new TestingJobManager(
- configuration,
- executionContext,
- instanceManager,
- scheduler,
- libraryCacheManager,
- archive,
- restartStrategyFactory,
- timeout,
- leaderElectionService,
- submittedJobsGraphs,
- checkpointRecoveryFactory,
- savepointStore,
- jobRecoveryTimeout,
- metricRegistry))
-
- val dispatcherJobManagerProps = if (synchronousDispatcher) {
- // disable asynchronous futures (e.g. accumulator update in Heartbeat)
- jobManagerProps.withDispatcher(CallingThreadDispatcher.Id)
- } else {
- jobManagerProps
- }
-
- actorSystem.actorOf(dispatcherJobManagerProps, jobManagerName)
- }
-
- override def startResourceManager(index: Int, system: ActorSystem): ActorRef = {
- val config = configuration.clone()
-
- val resourceManagerName = if(singleActorSystem) {
- FlinkResourceManager.RESOURCE_MANAGER_NAME + "_" + (index + 1)
+ override val jobManagerClass: Class[_ <: JobManager] = classOf[TestingJobManager]
+
+ override val resourceManagerClass: Class[_ <: FlinkResourceManager[_ <: ResourceIDRetrievable]] =
+ classOf[TestingResourceManager]
+
+ override val taskManagerClass: Class[_ <: TaskManager] = classOf[TestingTaskManager]
+
+ override val memoryArchivistClass: Class[_ <: MemoryArchivist] = classOf[TestingMemoryArchivist]
+
+ override def getJobManagerProps(
+ jobManagerClass: Class[_ <: JobManager],
+ configuration: Configuration,
+ executorService: ExecutorService,
+ instanceManager: InstanceManager,
+ scheduler: Scheduler,
+ libraryCacheManager: BlobLibraryCacheManager,
+ archive: ActorRef,
+ restartStrategyFactory: RestartStrategyFactory,
+ timeout: FiniteDuration,
+ leaderElectionService: LeaderElectionService,
+ submittedJobGraphStore: SubmittedJobGraphStore,
+ checkpointRecoveryFactory: CheckpointRecoveryFactory,
+ savepointStore: SavepointStore,
+ jobRecoveryTimeout: FiniteDuration,
+ metricsRegistry: Option[MetricRegistry]): Props = {
+
+ val props = super.getJobManagerProps(
+ jobManagerClass,
+ configuration,
+ executorService,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ archive,
+ restartStrategyFactory,
+ timeout,
+ leaderElectionService,
+ submittedJobGraphStore,
+ checkpointRecoveryFactory,
+ savepointStore,
+ jobRecoveryTimeout,
+ metricsRegistry)
+
+ if (synchronousDispatcher) {
+ props.withDispatcher(CallingThreadDispatcher.Id)
} else {
- FlinkResourceManager.RESOURCE_MANAGER_NAME
+ props
}
-
- val resourceManagerPort = config.getInteger(
- ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
-
- if(resourceManagerPort > 0) {
- config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index)
- }
-
- val testResourceManagerProps = Props(
- new TestingResourceManager(
- config,
- createLeaderRetrievalService()
- ))
-
- system.actorOf(testResourceManagerProps, resourceManagerName)
- }
-
- override def startTaskManager(index: Int, system: ActorSystem) = {
-
- val tmActorName = TaskManager.TASK_MANAGER_NAME + "_" + (index + 1)
-
- TaskManager.startTaskManagerComponentsAndActor(
- configuration,
- ResourceID.generate(),
- system,
- hostname,
- Some(tmActorName),
- Some(createLeaderRetrievalService()),
- numTaskManagers == 1,
- classOf[TestingTaskManager])
- }
-
-
- def createLeaderElectionService(): Option[LeaderElectionService] = {
- None
}
@throws(classOf[TimeoutException])
@@ -228,4 +159,131 @@ class TestingCluster(
Await.ready(combinedFuture, timeout)
}
+
+ def waitForTaskManagersToBeRegisteredAtJobManager(jobManager: ActorRef): Unit = {
+ val futures = taskManagerActors.map {
+ _.map {
+ tm => (tm ? NotifyWhenRegisteredAtJobManager(jobManager))(timeout)
+ }
+ }.getOrElse(Seq())
+
+ try {
+ Await.ready(Future.sequence(futures), timeout)
+ } catch {
+ case t: TimeoutException =>
+ throw new Exception("Timeout while waiting for TaskManagers to register at " +
+ s"${jobManager.path}")
+ }
+
+ }
+
+ def restartLeadingJobManager(): Unit = {
+ this.synchronized {
+ (jobManagerActorSystems, jobManagerActors) match {
+ case (Some(jmActorSystems), Some(jmActors)) =>
+ val leader = getLeaderGateway(AkkaUtils.getTimeout(originalConfiguration))
+ val index = getLeaderIndex(AkkaUtils.getTimeout(originalConfiguration))
+
+ // restart the leading job manager with the same port
+ val port = getLeaderRPCPort
+ val oldPort = originalConfiguration.getInteger(
+ ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+ 0)
+
+ // we have to set the old port in the configuration file because this is used for startup
+ originalConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port)
+
+ clearLeader()
+
+ val stopped = gracefulStop(leader.actor(), TestingCluster.MAX_RESTART_DURATION)
+ Await.result(stopped, TestingCluster.MAX_RESTART_DURATION)
+
+ if(!singleActorSystem) {
+ jmActorSystems(index).shutdown()
+ jmActorSystems(index).awaitTermination()
+ }
+
+ val newJobManagerActorSystem = if(!singleActorSystem) {
+ startJobManagerActorSystem(index)
+ } else {
+ jmActorSystems.head
+ }
+
+ // reset the original configuration
+ originalConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, oldPort)
+
+ val newJobManagerActor = startJobManager(index, newJobManagerActorSystem)
+
+ jobManagerActors = Some(jmActors.patch(index, Seq(newJobManagerActor), 1))
+ jobManagerActorSystems = Some(jmActorSystems.patch(
+ index,
+ Seq(newJobManagerActorSystem),
+ 1))
+
+ val lrs = createLeaderRetrievalService()
+
+ jobManagerLeaderRetrievalService = Some(lrs)
+ lrs.start(this)
+
+ case _ => throw new Exception("The JobManager of the TestingCluster have not " +
+ "been started properly.")
+ }
+ }
+ }
+
+ def restartTaskManager(index: Int): Unit = {
+ (taskManagerActorSystems, taskManagerActors) match {
+ case (Some(tmActorSystems), Some(tmActors)) =>
+ val stopped = gracefulStop(tmActors(index), TestingCluster.MAX_RESTART_DURATION)
+ Await.result(stopped, TestingCluster.MAX_RESTART_DURATION)
+
+ if(!singleActorSystem) {
+ tmActorSystems(index).shutdown()
+ tmActorSystems(index).awaitTermination()
+ }
+
+ val taskManagerActorSystem = if(!singleActorSystem) {
+ startTaskManagerActorSystem(index)
+ } else {
+ tmActorSystems.head
+ }
+
+ val taskManagerActor = startTaskManager(index, taskManagerActorSystem)
+
+ taskManagerActors = Some(tmActors.patch(index, Seq(taskManagerActor), 1))
+ taskManagerActorSystems = Some(tmActorSystems.patch(index, Seq(taskManagerActorSystem), 1))
+
+ case _ => throw new Exception("The TaskManager of the TestingCluster have not " +
+ "been started properly.")
+ }
+ }
+
+ def addTaskManager(): Unit = {
+ if (useSingleActorSystem) {
+ (jobManagerActorSystems, taskManagerActors) match {
+ case (Some(jmSystems), Some(tmActors)) =>
+ val index = numTaskManagers
+ taskManagerActors = Some(tmActors :+ startTaskManager(index, jmSystems(0)))
+ numTaskManagers += 1
+ case _ => throw new IllegalStateException("Cluster has not been started properly.")
+ }
+ } else {
+ (taskManagerActorSystems, taskManagerActors) match {
+ case (Some(tmSystems), Some(tmActors)) =>
+ val index = numTaskManagers
+ val newTmSystem = startTaskManagerActorSystem(index)
+ val newTmActor = startTaskManager(index, newTmSystem)
+
+ taskManagerActorSystems = Some(tmSystems :+ newTmSystem)
+ taskManagerActors = Some(tmActors :+ newTmActor)
+
+ numTaskManagers += 1
+ case _ => throw new IllegalStateException("Cluster has not been started properly.")
+ }
+ }
+ }
+}
+
+object TestingCluster {
+ val MAX_RESTART_DURATION = new FiniteDuration(2, TimeUnit.MINUTES)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
new file mode 100644
index 0000000..62349db
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import java.util.concurrent.ExecutorService
+
+import akka.actor.ActorRef
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
+import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.metrics.MetricRegistry
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+/** JobManager implementation extended by testing messages
+ *
+ */
+class TestingJobManager(
+ flinkConfiguration: Configuration,
+ executorService: ExecutorService,
+ instanceManager: InstanceManager,
+ scheduler: Scheduler,
+ libraryCacheManager: BlobLibraryCacheManager,
+ archive: ActorRef,
+ restartStrategyFactory: RestartStrategyFactory,
+ timeout: FiniteDuration,
+ leaderElectionService: LeaderElectionService,
+ submittedJobGraphs : SubmittedJobGraphStore,
+ checkpointRecoveryFactory : CheckpointRecoveryFactory,
+ savepointStore : SavepointStore,
+ jobRecoveryTimeout : FiniteDuration,
+ metricRegistry : Option[MetricRegistry])
+ extends JobManager(
+ flinkConfiguration,
+ executorService,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ archive,
+ restartStrategyFactory,
+ timeout,
+ leaderElectionService,
+ submittedJobGraphs,
+ checkpointRecoveryFactory,
+ savepointStore,
+ jobRecoveryTimeout,
+ metricRegistry)
+ with TestingJobManagerLike {}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
new file mode 100644
index 0000000..5ba2790
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.{ActorRef, Cancellable, Terminated}
+import akka.pattern.{ask, pipe}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
+import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient, RequestClassloadingProps}
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
+import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
+import org.apache.flink.runtime.testingUtils.TestingMessages._
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+/** This mixin can be used to decorate a JobManager with messages for testing purpose. */
+trait TestingJobManagerLike extends FlinkActor {
+ that: JobManager =>
+
+ import context._
+
+ import scala.collection.JavaConverters._
+
+ val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
+ val waitForTaskManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
+
+ val waitForAllVerticesToBeRunningOrFinished =
+ scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
+
+ var periodicCheck: Option[Cancellable] = None
+
+ val waitForJobStatus = scala.collection.mutable.HashMap[JobID,
+ collection.mutable.HashMap[JobStatus, Set[ActorRef]]]()
+
+ val waitForAccumulatorUpdate = scala.collection.mutable.HashMap[JobID, (Boolean, Set[ActorRef])]()
+
+ val waitForLeader = scala.collection.mutable.HashSet[ActorRef]()
+
+ val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder(
+ new Ordering[(Int, ActorRef)] {
+ override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1
+ })
+
+ val waitForClient = scala.collection.mutable.HashSet[ActorRef]()
+
+ val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]()
+
+ var disconnectDisabled = false
+
+ var postStopEnabled = true
+
+ abstract override def postStop(): Unit = {
+ if (postStopEnabled) {
+ super.postStop()
+ } else {
+ // only stop leader election service to revoke the leadership of this JM so that a new JM
+ // can be elected leader
+ leaderElectionService.stop()
+ }
+ }
+
+ abstract override def handleMessage: Receive = {
+ handleTestingMessage orElse super.handleMessage
+ }
+
+ def handleTestingMessage: Receive = {
+ case Alive => sender() ! Acknowledge
+
+ case RequestExecutionGraph(jobID) =>
+ currentJobs.get(jobID) match {
+ case Some((executionGraph, jobInfo)) => sender() ! decorateMessage(
+ ExecutionGraphFound(
+ jobID,
+ executionGraph)
+ )
+
+ case None => archive.tell(decorateMessage(RequestExecutionGraph(jobID)), sender())
+ }
+
+ case WaitForAllVerticesToBeRunning(jobID) =>
+ if(checkIfAllVerticesRunning(jobID)){
+ sender() ! decorateMessage(AllVerticesRunning(jobID))
+ }else{
+ val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]())
+ waitForAllVerticesToBeRunning += jobID -> (waiting + sender())
+
+ if(periodicCheck.isEmpty){
+ periodicCheck =
+ Some(
+ context.system.scheduler.schedule(
+ 0 seconds,
+ 200 millis,
+ self,
+ decorateMessage(NotifyListeners)
+ )
+ )
+ }
+ }
+ case WaitForAllVerticesToBeRunningOrFinished(jobID) =>
+ if(checkIfAllVerticesRunningOrFinished(jobID)){
+ sender() ! decorateMessage(AllVerticesRunning(jobID))
+ }else{
+ val waiting = waitForAllVerticesToBeRunningOrFinished.getOrElse(jobID, Set[ActorRef]())
+ waitForAllVerticesToBeRunningOrFinished += jobID -> (waiting + sender())
+
+ if(periodicCheck.isEmpty){
+ periodicCheck =
+ Some(
+ context.system.scheduler.schedule(
+ 0 seconds,
+ 200 millis,
+ self,
+ decorateMessage(NotifyListeners)
+ )
+ )
+ }
+ }
+
+ case NotifyListeners =>
+ for(jobID <- currentJobs.keySet){
+ notifyListeners(jobID)
+ }
+
+ if(waitForAllVerticesToBeRunning.isEmpty && waitForAllVerticesToBeRunningOrFinished.isEmpty) {
+ periodicCheck foreach { _.cancel() }
+ periodicCheck = None
+ }
+
+
+ case NotifyWhenJobRemoved(jobID) =>
+ val gateways = instanceManager.getAllRegisteredInstances.asScala.map(_.getActorGateway)
+
+ val responses = gateways.map{
+ gateway => gateway.ask(NotifyWhenJobRemoved(jobID), timeout).mapTo[Boolean]
+ }
+
+ val jobRemovedOnJobManager = (self ? CheckIfJobRemoved(jobID))(timeout).mapTo[Boolean]
+
+ val allFutures = responses ++ Seq(jobRemovedOnJobManager)
+
+ import context.dispatcher
+ Future.fold(allFutures)(true)(_ & _) map(decorateMessage(_)) pipeTo sender()
+
+ case CheckIfJobRemoved(jobID) =>
+ if(currentJobs.contains(jobID)) {
+ context.system.scheduler.scheduleOnce(
+ 200 milliseconds,
+ self,
+ decorateMessage(CheckIfJobRemoved(jobID))
+ )(context.dispatcher, sender())
+ } else {
+ sender() ! decorateMessage(true)
+ }
+
+ case NotifyWhenTaskManagerTerminated(taskManager) =>
+ val waiting = waitForTaskManagerToBeTerminated.getOrElse(taskManager.path.name, Set())
+ waitForTaskManagerToBeTerminated += taskManager.path.name -> (waiting + sender)
+
+ case msg@Terminated(taskManager) =>
+ super.handleMessage(msg)
+
+ waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
+ _ foreach {
+ listener =>
+ listener ! decorateMessage(TaskManagerTerminated(taskManager))
+ }
+ }
+
+ // see shutdown method for reply
+ case NotifyOfComponentShutdown =>
+ waitForShutdown += sender()
+
+ case NotifyWhenAccumulatorChange(jobID) =>
+
+ val (updated, registered) = waitForAccumulatorUpdate.
+ getOrElse(jobID, (false, Set[ActorRef]()))
+ waitForAccumulatorUpdate += jobID -> (updated, registered + sender)
+ sender ! true
+
+ /**
+ * Notification from the task manager that changed accumulators are transferred on the next
+ * Heartbeat. We need to keep this state to notify the listeners on the next Heartbeat report.
+ */
+ case AccumulatorsChanged(jobID: JobID) =>
+ waitForAccumulatorUpdate.get(jobID) match {
+ case Some((updated, registered)) =>
+ waitForAccumulatorUpdate.put(jobID, (true, registered))
+ case None =>
+ }
+
+ /**
+ * Disabled async processing of accumulator values and send accumulators to the listeners if
+ * we previously received an [[AccumulatorsChanged]] message.
+ */
+ case msg : Heartbeat =>
+ super.handleMessage(msg)
+
+ waitForAccumulatorUpdate foreach {
+ case (jobID, (updated, actors)) if updated =>
+ currentJobs.get(jobID) match {
+ case Some((graph, jobInfo)) =>
+ val flinkAccumulators = graph.getFlinkAccumulators
+ val userAccumulators = graph.aggregateUserAccumulators
+ actors foreach {
+ actor => actor ! UpdatedAccumulators(jobID, flinkAccumulators, userAccumulators)
+ }
+ case None =>
+ }
+ waitForAccumulatorUpdate.put(jobID, (false, actors))
+ case _ =>
+ }
+
+ case RequestWorkingTaskManager(jobID) =>
+ currentJobs.get(jobID) match {
+ case Some((eg, _)) =>
+ if(eg.getAllExecutionVertices.asScala.isEmpty){
+ sender ! decorateMessage(WorkingTaskManager(None))
+ } else {
+ val resource = eg.getAllExecutionVertices.asScala.head.getCurrentAssignedResource
+
+ if(resource == null){
+ sender ! decorateMessage(WorkingTaskManager(None))
+ } else {
+ sender ! decorateMessage(
+ WorkingTaskManager(
+ Some(resource.getTaskManagerActorGateway())
+ )
+ )
+ }
+ }
+ case None => sender ! decorateMessage(WorkingTaskManager(None))
+ }
+
+ case NotifyWhenJobStatus(jobID, state) =>
+ val jobStatusListener = waitForJobStatus.getOrElseUpdate(jobID,
+ scala.collection.mutable.HashMap[JobStatus, Set[ActorRef]]())
+
+ val listener = jobStatusListener.getOrElse(state, Set[ActorRef]())
+
+ jobStatusListener += state -> (listener + sender)
+
+ case msg@JobStatusChanged(jobID, newJobStatus, _, _) =>
+ super.handleMessage(msg)
+
+ val cleanup = waitForJobStatus.get(jobID) match {
+ case Some(stateListener) =>
+ stateListener.remove(newJobStatus) match {
+ case Some(listeners) =>
+ listeners foreach {
+ _ ! decorateMessage(JobStatusIs(jobID, newJobStatus))
+ }
+ case _ =>
+ }
+ stateListener.isEmpty
+
+ case _ => false
+ }
+
+ if (cleanup) {
+ waitForJobStatus.remove(jobID)
+ }
+
+ case DisableDisconnect =>
+ disconnectDisabled = true
+
+ case DisablePostStop =>
+ postStopEnabled = false
+
+ case RequestSavepoint(savepointPath) =>
+ try {
+ val savepoint = savepointStore.loadSavepoint(savepointPath)
+ sender ! ResponseSavepoint(savepoint)
+ }
+ catch {
+ case e: Exception =>
+ sender ! ResponseSavepoint(null)
+ }
+
+ case msg: Disconnect =>
+ if (!disconnectDisabled) {
+ super.handleMessage(msg)
+
+ val taskManager = sender()
+
+ waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
+ _ foreach {
+ listener =>
+ listener ! decorateMessage(TaskManagerTerminated(taskManager))
+ }
+ }
+ }
+
+ case NotifyWhenLeader =>
+ if (leaderElectionService.hasLeadership) {
+ sender() ! true
+ } else {
+ waitForLeader += sender()
+ }
+
+ case msg: GrantLeadership =>
+ super.handleMessage(msg)
+
+ waitForLeader.foreach(_ ! true)
+
+ waitForLeader.clear()
+
+ case NotifyWhenClientConnects =>
+ waitForClient += sender()
+ sender() ! true
+
+ case msg: RegisterJobClient =>
+ super.handleMessage(msg)
+ waitForClient.foreach(_ ! ClientConnected)
+ case msg: RequestClassloadingProps =>
+ super.handleMessage(msg)
+ waitForClient.foreach(_ ! ClassLoadingPropsDelivered)
+
+ case NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager) =>
+ if (that.instanceManager.getNumberOfRegisteredTaskManagers >= numRegisteredTaskManager) {
+ // there are already at least numRegisteredTaskManager registered --> send Acknowledge
+ sender() ! Acknowledge
+ } else {
+ // wait until we see at least numRegisteredTaskManager being registered at the JobManager
+ waitForNumRegisteredTaskManagers += ((numRegisteredTaskManager, sender()))
+ }
+
+ // a TaskManager may be registered when this message is handled
+ case msg @ (_: RegisterTaskManager) =>
+ super.handleMessage(msg)
+
+ // dequeue all senders which wait for instanceManager.getNumberOfRegisteredTaskManagers or
+ // fewer registered TaskManagers
+ while (waitForNumRegisteredTaskManagers.nonEmpty &&
+ waitForNumRegisteredTaskManagers.head._1 <=
+ instanceManager.getNumberOfRegisteredTaskManagers) {
+ val receiver = waitForNumRegisteredTaskManagers.dequeue()._2
+ receiver ! Acknowledge
+ }
+ }
+
+ def checkIfAllVerticesRunning(jobID: JobID): Boolean = {
+ currentJobs.get(jobID) match {
+ case Some((eg, _)) =>
+ eg.getAllExecutionVertices.asScala.forall( _.getExecutionState == ExecutionState.RUNNING)
+ case None => false
+ }
+ }
+
+ def checkIfAllVerticesRunningOrFinished(jobID: JobID): Boolean = {
+ currentJobs.get(jobID) match {
+ case Some((eg, _)) =>
+ eg.getAllExecutionVertices.asScala.forall {
+ case vertex =>
+ (vertex.getExecutionState == ExecutionState.RUNNING
+ || vertex.getExecutionState == ExecutionState.FINISHED)
+ }
+ case None => false
+ }
+ }
+
+ def notifyListeners(jobID: JobID): Unit = {
+ if(checkIfAllVerticesRunning(jobID)) {
+ waitForAllVerticesToBeRunning.remove(jobID) match {
+ case Some(listeners) =>
+ for (listener <- listeners) {
+ listener ! decorateMessage(AllVerticesRunning(jobID))
+ }
+ case _ =>
+ }
+ }
+
+ if(checkIfAllVerticesRunningOrFinished(jobID)) {
+ waitForAllVerticesToBeRunningOrFinished.remove(jobID) match {
+ case Some(listeners) =>
+ for (listener <- listeners) {
+ listener ! decorateMessage(AllVerticesRunning(jobID))
+ }
+ case _ =>
+ }
+ }
+ }
+
+ /**
+ * No killing of the VM for testing.
+ */
+ override protected def shutdown(): Unit = {
+ log.info("Shutting down TestingJobManager.")
+ waitForShutdown.foreach(_ ! ComponentShutdown(self))
+ waitForShutdown.clear()
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
new file mode 100644
index 0000000..d07c48f
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import java.util.Map
+
+import akka.actor.ActorRef
+import org.apache.flink.api.common.JobID
+import org.apache.flink.api.common.accumulators.Accumulator
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
+import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
+import org.apache.flink.runtime.instance.ActorGateway
+import org.apache.flink.runtime.jobgraph.JobStatus
+
+object TestingJobManagerMessages {
+
+ case class RequestExecutionGraph(jobID: JobID)
+
+ sealed trait ResponseExecutionGraph {
+ def jobID: JobID
+ }
+
+ case class ExecutionGraphFound(jobID: JobID, executionGraph: ExecutionGraph) extends
+ ResponseExecutionGraph
+
+ case class ExecutionGraphNotFound(jobID: JobID) extends ResponseExecutionGraph
+
+ case class WaitForAllVerticesToBeRunning(jobID: JobID)
+ case class WaitForAllVerticesToBeRunningOrFinished(jobID: JobID)
+ case class AllVerticesRunning(jobID: JobID)
+
+ case class NotifyWhenJobRemoved(jobID: JobID)
+
+ case class RequestWorkingTaskManager(jobID: JobID)
+ case class WorkingTaskManager(gatewayOption: Option[ActorGateway])
+
+ case class NotifyWhenJobStatus(jobID: JobID, state: JobStatus)
+ case class JobStatusIs(jobID: JobID, state: JobStatus)
+
+ case object NotifyListeners
+
+ case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
+ case class TaskManagerTerminated(taskManager: ActorRef)
+
+ /**
+ * Registers a listener to receive a message when accumulators changed.
+ * The change must be explicitly triggered by the TestingTaskManager which can receive an
+ * [[org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged]]
+ * message by a task that changed the accumulators. This message is then
+ * forwarded to the JobManager which will send the accumulators in the [[UpdatedAccumulators]]
+ * message when the next Heartbeat occurs.
+ */
+ case class NotifyWhenAccumulatorChange(jobID: JobID)
+
+ /**
+ * Reports updated accumulators back to the listener.
+ */
+ case class UpdatedAccumulators(jobID: JobID,
+ flinkAccumulators: Map[ExecutionAttemptID, Map[AccumulatorRegistry.Metric, Accumulator[_,_]]],
+ userAccumulators: Map[String, Accumulator[_,_]])
+
+ /** Notifies the sender when the [[TestingJobManager]] has been elected as the leader
+ *
+ */
+ case object NotifyWhenLeader
+
+ /**
+ * Notifies the sender when the [[TestingJobManager]] receives new clients for jobs
+ */
+ case object NotifyWhenClientConnects
+ /**
+ * Notifies that a client has connected
+ */
+ case object ClientConnected
+ /**
+ * Notifies when the client has requested class loading information
+ */
+ case object ClassLoadingPropsDelivered
+
+ /**
+ * Registers to be notified by an [[org.apache.flink.runtime.messages.Messages.Acknowledge]]
+ * message when at least numRegisteredTaskManager have registered at the JobManager.
+ *
+ * @param numRegisteredTaskManager minimum number of registered TMs before the sender is notified
+ */
+ case class NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager: Int)
+
+ /** Disables the post stop method of the [[TestingJobManager]].
+ *
+ * Only the leaderElectionService is stopped in the postStop method call to revoke the leadership
+ */
+ case object DisablePostStop
+
+ /**
+ * Requests a savepoint from the job manager.
+ *
+ * @param savepointPath The path of the savepoint to request.
+ */
+ case class RequestSavepoint(savepointPath: String)
+
+ /**
+ * Response to a savepoint request.
+ *
+ * @param savepoint The requested savepoint or null if none available.
+ */
+ case class ResponseSavepoint(savepoint: Savepoint)
+
+ def getNotifyWhenLeader(): AnyRef = NotifyWhenLeader
+ def getNotifyWhenClientConnects(): AnyRef = NotifyWhenClientConnects
+ def getDisablePostStop(): AnyRef = DisablePostStop
+
+ def getClientConnected(): AnyRef = ClientConnected
+ def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
new file mode 100644
index 0000000..48a1ddd
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import org.apache.flink.runtime.jobmanager.MemoryArchivist
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphFound, ExecutionGraphNotFound, RequestExecutionGraph}
+
+/** Memory archivist extended by testing messages
+ *
+ * @param maxEntries maximum number of archived jobs
+ */
+class TestingMemoryArchivist(maxEntries: Int) extends MemoryArchivist(maxEntries) {
+
+ override def handleMessage: Receive = {
+ handleTestingMessage orElse super.handleMessage
+ }
+
+ def handleTestingMessage: Receive = {
+ case RequestExecutionGraph(jobID) =>
+ val executionGraph = graphs.get(jobID)
+
+ executionGraph match {
+ case Some(graph) => sender ! decorateMessage(ExecutionGraphFound(jobID, graph))
+ case None => sender ! decorateMessage(ExecutionGraphNotFound(jobID))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
new file mode 100644
index 0000000..91d169a
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.ActorRef
+import org.apache.flink.api.common.JobID
+
+object TestingMessages {
+
+ case class CheckIfJobRemoved(jobID: JobID)
+
+ case object DisableDisconnect
+
+ case object Alive
+
+ def getAlive: AnyRef = Alive
+
+ def getDisableDisconnect: AnyRef = DisableDisconnect
+
+ case object NotifyOfComponentShutdown
+ case class ComponentShutdown(ref: ActorRef)
+
+ def getNotifyOfComponentShutdown(): AnyRef = NotifyOfComponentShutdown
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
new file mode 100644
index 0000000..9b5a147
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.io.disk.iomanager.IOManager
+import org.apache.flink.runtime.io.network.NetworkEnvironment
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
+import org.apache.flink.runtime.memory.MemoryManager
+import org.apache.flink.runtime.taskmanager.{TaskManagerLocation, TaskManager, TaskManagerConfiguration}
+
+import scala.language.postfixOps
+
+/** Subclass of the [[TaskManager]] to support testing messages
+ */
+class TestingTaskManager(
+ config: TaskManagerConfiguration,
+ resourceID: ResourceID,
+ connectionInfo: TaskManagerLocation,
+ memoryManager: MemoryManager,
+ ioManager: IOManager,
+ network: NetworkEnvironment,
+ numberOfSlots: Int,
+ leaderRetrievalService: LeaderRetrievalService)
+ extends TaskManager(
+ config,
+ resourceID,
+ connectionInfo,
+ memoryManager,
+ ioManager,
+ network,
+ numberOfSlots,
+ leaderRetrievalService)
+ with TestingTaskManagerLike {
+
+ def this(
+ config: TaskManagerConfiguration,
+ connectionInfo: TaskManagerLocation,
+ memoryManager: MemoryManager,
+ ioManager: IOManager,
+ network: NetworkEnvironment,
+ numberOfSlots: Int,
+ leaderRetrievalService: LeaderRetrievalService) {
+ this(
+ config,
+ ResourceID.generate(),
+ connectionInfo,
+ memoryManager,
+ ioManager,
+ network,
+ numberOfSlots,
+ leaderRetrievalService)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
new file mode 100644
index 0000000..2498dbe
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.{ActorRef, Terminated}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.messages.JobManagerMessages.{RequestLeaderSessionID, ResponseLeaderSessionID}
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered}
+import org.apache.flink.runtime.messages.TaskMessages.{SubmitTask, TaskInFinalState, UpdateTaskExecutionState}
+import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
+import org.apache.flink.runtime.testingUtils.TestingMessages._
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+/** This mixin can be used to decorate a TaskManager with messages for testing purposes. */
+trait TestingTaskManagerLike extends FlinkActor {
+ that: TaskManager =>
+
+ import scala.collection.JavaConverters._
+
+ val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
+ val waitForJobManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
+ val waitForRegisteredAtResourceManager = // despite the name, keyed by the job manager ActorRef
+ scala.collection.mutable.HashMap[ActorRef, Set[ActorRef]]()
+ val waitForRunning = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
+ val unregisteredTasks = scala.collection.mutable.HashSet[ExecutionAttemptID]()
+ // waitFor* maps: awaited key -> actors to notify; unregisteredTasks: IDs already in final state.
+ /** Registered SubmitTask listeners, one per job ID; re-registering replaces the listener. */
+ val registeredSubmitTaskListeners = scala.collection.mutable.HashMap[JobID, ActorRef]()
+
+ val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]() // notified in shutdown()
+
+ var disconnectDisabled = false // set by DisableDisconnect; Disconnect is then ignored
+
+ /**
+ * Handles testing related messages first; anything else falls through to super.handleMessage.
+ */
+ abstract override def handleMessage: Receive = {
+ handleTestingMessage orElse super.handleMessage
+ }
+
+ def handleTestingMessage: Receive = { // testing messages plus hooks around some regular ones
+ case Alive => sender() ! Acknowledge
+
+ case NotifyWhenTaskIsRunning(executionID) =>
+ Option(runningTasks.get(executionID)) match {
+ case Some(task) if task.getExecutionState == ExecutionState.RUNNING =>
+ sender ! decorateMessage(true)
+
+ case _ => // not running yet (or not known): remember the sender for later notification
+ val listeners = waitForRunning.getOrElse(executionID, Set())
+ waitForRunning += (executionID -> (listeners + sender))
+ }
+
+ case RequestRunningTasks =>
+ sender ! decorateMessage(ResponseRunningTasks(runningTasks.asScala.toMap))
+
+ case NotifyWhenTaskRemoved(executionID) =>
+ Option(runningTasks.get(executionID)) match {
+ case Some(_) =>
+ val set = waitForRemoval.getOrElse(executionID, Set())
+ waitForRemoval += (executionID -> (set + sender))
+ case None => // not in runningTasks: answer now only if its final state was already seen
+ if(unregisteredTasks.contains(executionID)) {
+ sender ! decorateMessage(true)
+ } else {
+ val set = waitForRemoval.getOrElse(executionID, Set())
+ waitForRemoval += (executionID -> (set + sender))
+ }
+ }
+
+ case TaskInFinalState(executionID) =>
+ super.handleMessage(TaskInFinalState(executionID))
+ waitForRemoval.remove(executionID) match {
+ case Some(actors) => for(actor <- actors) actor ! decorateMessage(true)
+ case None =>
+ }
+
+ unregisteredTasks += executionID
+
+ case NotifyWhenJobRemoved(jobID) =>
+ if(runningTasks.values.asScala.exists(_.getJobID == jobID)){
+ context.system.scheduler.scheduleOnce( // re-check later on behalf of the original sender
+ 200 milliseconds,
+ self,
+ decorateMessage(CheckIfJobRemoved(jobID)))(
+ context.dispatcher,
+ sender()
+ )
+ }else{
+ sender ! decorateMessage(true)
+ }
+
+ case CheckIfJobRemoved(jobID) =>
+ if(runningTasks.values.asScala.forall(_.getJobID != jobID)){
+ sender ! decorateMessage(true)
+ } else {
+ context.system.scheduler.scheduleOnce( // still running: keep polling every 200 ms
+ 200 milliseconds,
+ self,
+ decorateMessage(CheckIfJobRemoved(jobID)))(
+ context.dispatcher,
+ sender()
+ )
+ }
+
+ case NotifyWhenJobManagerTerminated(jobManager) =>
+ val waiting = waitForJobManagerToBeTerminated.getOrElse(jobManager.path.name, Set())
+ waitForJobManagerToBeTerminated += jobManager.path.name -> (waiting + sender)
+
+ case RegisterSubmitTaskListener(jobId) =>
+ registeredSubmitTaskListeners.put(jobId, sender())
+
+ case msg@SubmitTask(tdd) =>
+ registeredSubmitTaskListeners.get(tdd.getJobID) match {
+ case Some(listenerRef) =>
+ listenerRef ! ResponseSubmitTaskListener(tdd)
+ case None =>
+ // Nothing to do
+ }
+
+ super.handleMessage(msg)
+
+ /**
+ * Message from task manager that accumulator values changed and need to be reported immediately
+ * instead of lazily through the
+ * [[org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat]] message. We forward this
+ * message to the job manager so that it knows it should report to the listeners.
+ */
+ case msg: AccumulatorsChanged =>
+ currentJobManager match {
+ case Some(jobManager) =>
+ jobManager.forward(msg)
+ sendHeartbeatToJobManager()
+ sender ! true // NOTE(review): not wrapped in decorateMessage like other replies - confirm
+ case None => // no current job manager: the message is dropped and the sender gets no reply
+ }
+
+ case msg@Terminated(jobManager) =>
+ super.handleMessage(msg)
+
+ waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
+ _ foreach {
+ _ ! decorateMessage(JobManagerTerminated(jobManager))
+ }
+ }
+
+ case msg:Disconnect =>
+ if (!disconnectDisabled) { // after DisableDisconnect the message is swallowed entirely
+ super.handleMessage(msg)
+
+ val jobManager = sender()
+
+ waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
+ _ foreach {
+ _ ! decorateMessage(JobManagerTerminated(jobManager))
+ }
+ }
+ }
+
+ case DisableDisconnect =>
+ disconnectDisabled = true
+
+ case NotifyOfComponentShutdown =>
+ waitForShutdown += sender()
+
+ case msg @ UpdateTaskExecutionState(taskExecutionState) =>
+ super.handleMessage(msg)
+
+ if(taskExecutionState.getExecutionState == ExecutionState.RUNNING) {
+ waitForRunning.get(taskExecutionState.getID) foreach { // entry stays in the map afterwards
+ _ foreach (_ ! decorateMessage(true))
+ }
+ }
+
+ case RequestLeaderSessionID =>
+ sender() ! ResponseLeaderSessionID(leaderSessionID.orNull)
+
+ case NotifyWhenRegisteredAtJobManager(jobManager: ActorRef) =>
+ if(isConnected && currentJobManager.exists(_ == jobManager)) {
+ sender() ! true
+ } else {
+ val list = waitForRegisteredAtResourceManager.getOrElse(
+ jobManager,
+ Set[ActorRef]())
+
+ waitForRegisteredAtResourceManager += jobManager -> (list + sender())
+ }
+
+ case msg @ (_: AcknowledgeRegistration | _: AlreadyRegistered) =>
+ super.handleMessage(msg)
+
+ val jm = sender() // the sender of the acknowledgement is taken to be the job manager
+
+ waitForRegisteredAtResourceManager.remove(jm).foreach {
+ listeners => listeners.foreach{
+ listener =>
+ listener ! true
+ }
+ }
+ }
+
+ /**
+ * No killing of the VM for testing: logs, then notifies and forgets the shutdown listeners.
+ */
+ override protected def shutdown(): Unit = {
+ log.info("Shutting down TestingTaskManager.")
+ waitForShutdown.foreach(_ ! ComponentShutdown(self))
+ waitForShutdown.clear()
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
new file mode 100644
index 0000000..32c3c55
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.ActorRef
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.taskmanager.Task
+
+/**
+ * Additional messages that the [[TestingTaskManager]] understands.
+ */
+object TestingTaskManagerMessages {
+
+ case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID) // answered once removed
+
+ case class NotifyWhenTaskIsRunning(executionID: ExecutionAttemptID) // answered once RUNNING
+
+ case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){ // running-tasks reply
+ import collection.JavaConverters._
+ def asJava: java.util.Map[ExecutionAttemptID, Task] = tasks.asJava // same map for Java code
+ }
+
+ case object RequestRunningTasks // request that is answered with ResponseRunningTasks
+
+ case class NotifyWhenJobManagerTerminated(jobManager: ActorRef) // asks for the reply below
+
+ case class JobManagerTerminated(jobManager: ActorRef) // sent to the registered waiters
+
+ case class NotifyWhenRegisteredAtJobManager(resourceManager: ActorRef) // arg: the job manager
+
+ /**
+ * Message to give a hint to the task manager that accumulator values were updated in the task.
+ * This message is forwarded to the job manager which knows that it needs to notify listeners
+ * of accumulator updates.
+ */
+ case class AccumulatorsChanged(jobID: JobID)
+
+ /**
+ * Registers a listener for all [[org.apache.flink.runtime.messages.TaskMessages.SubmitTask]]
+ * messages of the given job.
+ *
+ * If a task is submitted with the given job ID the task deployment
+ * descriptor is forwarded to the listener.
+ *
+ * @param jobId The job ID to listen for.
+ */
+ case class RegisterSubmitTaskListener(jobId: JobID)
+
+ /**
+ * A response to a listened job ID containing the submitted task deployment descriptor.
+ *
+ * @param tdd The submitted task deployment descriptor.
+ */
+ case class ResponseSubmitTaskListener(tdd: TaskDeploymentDescriptor)
+
+ // --------------------------------------------------------------------------
+ // Utility methods to allow simpler case object access from Java
+ // --------------------------------------------------------------------------
+
+ def getRequestRunningTasksMessage: AnyRef = { // returns the RequestRunningTasks case object
+ RequestRunningTasks
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index ee1b264..00410cc 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -22,9 +22,10 @@ import java.io._
import java.util.concurrent.TimeUnit
import org.apache.flink.configuration.GlobalConfiguration
-import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils}
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.util.TestLogger
-import org.junit.{AfterClass, BeforeClass, Test, Assert}
+import org.junit.{AfterClass, Assert, BeforeClass, Test}
import scala.concurrent.duration.FiniteDuration
import scala.tools.nsc.Settings
@@ -334,7 +335,7 @@ class ScalaShellITCase extends TestLogger {
}
object ScalaShellITCase {
- var cluster: Option[ForkableFlinkMiniCluster] = None
+ var cluster: Option[LocalFlinkMiniCluster] = None
val parallelism = 4
@BeforeClass
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 8bb440c..f94ff68 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -40,6 +40,7 @@ import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -49,7 +50,6 @@ import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.TestEnvironment;
import org.junit.After;
import org.junit.AfterClass;
@@ -134,7 +134,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
}
}
- private static ForkableFlinkMiniCluster flinkCluster;
+ private static LocalFlinkMiniCluster flinkCluster;
// ------------------------------------------------------------------------
// Cluster Setup (Cassandra & Flink)
@@ -205,7 +205,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
- flinkCluster = new ForkableFlinkMiniCluster(config);
+ flinkCluster = new LocalFlinkMiniCluster(config);
flinkCluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 9e3c33b..c4949ff 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -30,7 +31,6 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.InstantiationUtil;
import org.junit.AfterClass;
@@ -58,7 +58,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
private static KafkaTestEnvironment kafkaServer;
private static Properties standardProps;
- private static ForkableFlinkMiniCluster flink;
+ private static LocalFlinkMiniCluster flink;
@BeforeClass
public static void prepare() throws IOException, ClassNotFoundException {
@@ -88,7 +88,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
- flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+ flink = new LocalFlinkMiniCluster(flinkConfig, false);
flink.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index eddb57c..771db17 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -22,8 +22,8 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
@@ -65,7 +65,7 @@ public abstract class KafkaTestBase extends TestLogger {
protected static Properties standardProps;
- protected static ForkableFlinkMiniCluster flink;
+ protected static LocalFlinkMiniCluster flink;
protected static int flinkPort;
@@ -105,7 +105,7 @@ public abstract class KafkaTestBase extends TestLogger {
flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
- flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+ flink = new LocalFlinkMiniCluster(flinkConfig, false);
flink.start();
flinkPort = flink.getLeaderRPCPort();
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
index 3705943..2e452c1 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
@@ -21,11 +21,11 @@ import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisEventsGeneratorProducerThread;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,7 +80,7 @@ public class ManualExactlyOnceTest {
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
- ForkableFlinkMiniCluster flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+ LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false);
flink.start();
final int flinkPort = flink.getLeaderRPCPort();