You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/08/31 12:31:43 UTC
[07/10] flink git commit: [FLINK-2291] [runtime] Add ZooKeeper
support to elect a leader from a set of JobManagers. The leader will then be
retrieved from ZooKeeper by the TaskManagers.
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/RemoteAddressExtension.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/RemoteAddressExtension.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/RemoteAddressExtension.scala
new file mode 100644
index 0000000..79f63de
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/RemoteAddressExtension.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka
+
+import akka.actor.{Address, ExtensionKey, Extension, ExtendedActorSystem}
+
+/** [[akka.actor.ActorSystem]] [[Extension]] used to obtain the [[Address]] on which the
+  * given ActorSystem is listening.
+  *
+  * @param system Actor system whose provider's default address is returned by `address`
+  */
+class RemoteAddressExtensionImplementation(system: ExtendedActorSystem) extends Extension {
+  def address: Address = system.provider.getDefaultAddress
+}
+
+object RemoteAddressExtension extends ExtensionKey[RemoteAddressExtensionImplementation]
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index e9a31fb..26bf91b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -24,7 +24,8 @@ import java.net.InetSocketAddress
import java.util.{UUID, Collections}
import akka.actor.Status.Failure
-import akka.actor._
+import akka.actor.{Props, Terminated, PoisonPill, ActorRef, ActorSystem}
+import akka.pattern.ask
import grizzled.slf4j.Logger
@@ -36,24 +37,30 @@ import org.apache.flink.runtime.blob.BlobServer
import org.apache.flink.runtime.client._
import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex}
import org.apache.flink.runtime.jobmanager.web.WebInfoServer
+import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService}
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState}
-import org.apache.flink.runtime.messages.accumulators._
-import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint}
+
+import org.apache.flink.runtime.messages.accumulators.{AccumulatorResultsErroneous,
+AccumulatorResultsFound, RequestAccumulatorResults, AccumulatorMessage,
+AccumulatorResultStringsFound, RequestAccumulatorResultsStringified}
+import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage,
+AcknowledgeCheckpoint}
import org.apache.flink.runtime.messages.webmonitor._
import org.apache.flink.runtime.process.ProcessReaper
import org.apache.flink.runtime.security.SecurityUtils
import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.util.{SerializedThrowable, ZooKeeperUtil, EnvironmentInformation}
+import org.apache.flink.runtime.util._
import org.apache.flink.runtime.webmonitor.WebMonitor
-import org.apache.flink.runtime.{FlinkActor, StreamingMode, LeaderSessionMessages}
+import org.apache.flink.runtime.{FlinkActor, StreamingMode, LeaderSessionMessageFilter}
import org.apache.flink.runtime.LogMessages
-import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.akka.{ListeningBehaviour, AkkaUtils}
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway, InstanceManager}
+import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
import org.apache.flink.runtime.jobgraph.{JobVertexID, JobGraph, JobStatus}
import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
import org.apache.flink.runtime.messages.JobManagerMessages._
@@ -61,7 +68,6 @@ import org.apache.flink.runtime.messages.RegistrationMessages._
import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, Heartbeat}
import org.apache.flink.util.{SerializedValue, ExceptionUtils, InstantiationUtil}
-import _root_.akka.pattern.ask
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ForkJoinPool
@@ -105,38 +111,58 @@ class JobManager(
protected val defaultExecutionRetries: Int,
protected val delayBetweenRetries: Long,
protected val timeout: FiniteDuration,
- protected val mode: StreamingMode)
- extends FlinkActor
- with LeaderSessionMessages // order of the mixin is important, we want filtering after logging
- with LogMessages // order of the mixin is important, we want first logging
- {
+ protected val mode: StreamingMode,
+ protected val leaderElectionService: LeaderElectionService)
+ extends FlinkActor
+ with LeaderSessionMessageFilter // mixin oder is important, we want filtering after logging
+ with LogMessages // mixin order is important, we want first logging
+ with LeaderContender {
override val log = Logger(getClass)
/** List of current jobs running jobs */
protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
- override val leaderSessionID = Some(UUID.randomUUID())
+ var leaderSessionID: Option[UUID] = None
/**
* Run when the job manager is started. Simply logs an informational message.
+ * The method also starts the leader election service.
*/
override def preStart(): Unit = {
- log.info(s"Starting JobManager at ${self.path.toSerializationFormat}.")
+ log.info(s"Starting JobManager at ${getAddress}.")
+
+ try {
+ leaderElectionService.start(this)
+ } catch {
+ case e: Exception =>
+ log.error("Could not start the JobManager because the leader election service did not " +
+ "start.", e)
+ throw new RuntimeException("Could not start the leader election service.", e)
+ }
}
override def postStop(): Unit = {
- log.info(s"Stopping JobManager ${self.path.toSerializationFormat}.")
+ log.info(s"Stopping JobManager ${getAddress}.")
+
+ cancelAndClearEverything(new Exception("The JobManager is shutting down."))
// disconnect the registered task managers
instanceManager.getAllRegisteredInstances.asScala.foreach {
- _.getActorGateway().tell(Disconnect("JobManager is shutting down"))
+ _.getActorGateway().tell(
+ Disconnect("JobManager is shutting down"),
+ new AkkaActorGateway(self, leaderSessionID.orNull))
}
- archive ! decorateMessage(PoisonPill)
+    try {
+      // revoke leadership and stop leader election service
+      leaderElectionService.stop()
+    } catch {
+      case e: Exception =>
+        // keep shutting down, but do not lose the reason why stopping failed
+        log.error("Could not properly shutdown the leader election service.", e)
+    }
- for((e,_) <- currentJobs.values) {
- e.fail(new Exception("The JobManager is shutting down."))
+ if (archive != ActorRef.noSender) {
+ archive ! decorateMessage(PoisonPill)
}
instanceManager.shutdown()
@@ -158,13 +184,40 @@ class JobManager(
*/
override def handleMessage: Receive = {
+ case GrantLeadership(newLeaderSessionID) =>
+ log.info(s"JobManager ${getAddress} was granted leadership with leader session ID " +
+ s"${newLeaderSessionID}.")
+
+ leaderSessionID = newLeaderSessionID
+
+ // confirming the leader session ID might be blocking, thus do it in a future
+ future{
+ leaderElectionService.confirmLeaderSessionID(newLeaderSessionID.orNull)
+ }(context.dispatcher)
+
+ case RevokeLeadership =>
+ log.info(s"JobManager ${self.path.toSerializationFormat} was revoked leadership.")
+
+ cancelAndClearEverything(new Exception("JobManager is no longer the leader."))
+
+ // disconnect the registered task managers
+ instanceManager.getAllRegisteredInstances.asScala.foreach {
+ _.getActorGateway().tell(
+ Disconnect("JobManager is no longer the leader"),
+ new AkkaActorGateway(self, leaderSessionID.orNull))
+ }
+
+ instanceManager.unregisterAllTaskManagers()
+
+ leaderSessionID = None
+
case RegisterTaskManager(
- registrationSessionID,
- taskManager,
connectionInfo,
hardwareInformation,
numberOfSlots) =>
+ val taskManager = sender()
+
if (instanceManager.isRegistered(taskManager)) {
val instanceID = instanceManager.getRegisteredInstance(taskManager).getId
@@ -172,9 +225,6 @@ class JobManager(
// TaskManager actor, but the ask future!
sender() ! decorateMessage(
AlreadyRegistered(
- registrationSessionID,
- leaderSessionID.get,
- self,
instanceID,
libraryCacheManager.getBlobServerPort)
)
@@ -186,15 +236,12 @@ class JobManager(
connectionInfo,
hardwareInformation,
numberOfSlots,
- leaderSessionID)
+ leaderSessionID.orNull)
// IMPORTANT: Send the response to the "sender", which is not the
// TaskManager actor, but the ask future!
sender() ! decorateMessage(
AcknowledgeRegistration(
- registrationSessionID,
- leaderSessionID.get,
- self,
instanceID,
libraryCacheManager.getBlobServerPort)
)
@@ -212,7 +259,6 @@ class JobManager(
// TaskManager actor, but the ask future!
sender() ! decorateMessage(
RefuseRegistration(
- registrationSessionID,
ExceptionUtils.stringifyException(e))
)
}
@@ -224,8 +270,8 @@ class JobManager(
case RequestTotalNumberOfSlots =>
sender ! decorateMessage(instanceManager.getTotalNumberOfSlots)
- case SubmitJob(jobGraph, listen) =>
- submitJob(jobGraph, listenToEvents = listen)
+ case SubmitJob(jobGraph, listeningBehaviour) =>
+ submitJob(jobGraph, listeningBehaviour)
case CancelJob(jobID) =>
log.info(s"Trying to cancel job with ID $jobID.")
@@ -334,22 +380,23 @@ class JobManager(
jobInfo.end = timeStamp
// is the client waiting for the job result?
- newJobStatus match {
- case JobStatus.FINISHED =>
- val accumulatorResults: java.util.Map[String, SerializedValue[AnyRef]] = try {
+ if(jobInfo.client != ActorRef.noSender) {
+ newJobStatus match {
+ case JobStatus.FINISHED =>
+ val accumulatorResults: java.util.Map[String, SerializedValue[AnyRef]] = try {
executionGraph.getAccumulatorsSerialized()
- } catch {
- case e: Exception =>
- log.error(s"Cannot fetch serialized accumulators for job $jobID", e)
- Collections.emptyMap()
- }
+ } catch {
+ case e: Exception =>
+ log.error(s"Cannot fetch serialized accumulators for job $jobID", e)
+ Collections.emptyMap()
+ }
val result = new SerializedJobExecutionResult(
jobID,
jobInfo.duration,
accumulatorResults)
- jobInfo.client ! decorateMessage(JobResultSuccess(result))
+ jobInfo.client ! decorateMessage(JobResultSuccess(result))
- case JobStatus.CANCELED =>
+ case JobStatus.CANCELED =>
// the error may be packed as a serialized throwable
val unpackedError = SerializedThrowable.get(
error, executionGraph.getUserClassLoader())
@@ -358,7 +405,7 @@ class JobManager(
new SerializedThrowable(
new JobCancellationException(jobID, "Job was cancelled.", unpackedError))))
- case JobStatus.FAILED =>
+ case JobStatus.FAILED =>
val unpackedError = SerializedThrowable.get(
error, executionGraph.getUserClassLoader())
@@ -366,11 +413,12 @@ class JobManager(
new SerializedThrowable(
new JobExecutionException(jobID, "Job execution failed.", unpackedError))))
- case x =>
+ case x =>
val exception = new JobExecutionException(jobID, s"$x is not a terminal state.")
jobInfo.client ! decorateMessage(JobResultFailure(
new SerializedThrowable(exception)))
- throw exception
+ throw exception
+ }
}
removeJob(jobID)
@@ -463,6 +511,9 @@ class JobManager(
case RequestBlobManagerPort =>
sender ! decorateMessage(libraryCacheManager.getBlobServerPort)
+ case RequestArchive =>
+ sender ! decorateMessage(ResponseArchive(archive))
+
case RequestRegisteredTaskManagers =>
import scala.collection.JavaConverters._
sender ! decorateMessage(
@@ -486,13 +537,13 @@ class JobManager(
case RequestStackTrace(instanceID) =>
val gateway = instanceManager.getRegisteredInstanceById(instanceID).getActorGateway
- gateway.forward(SendStackTrace, new AkkaActorGateway(sender(), leaderSessionID))
+ gateway.forward(SendStackTrace, new AkkaActorGateway(sender, leaderSessionID.orNull))
case Terminated(taskManager) =>
if (instanceManager.isRegistered(taskManager)) {
log.info(s"Task manager ${taskManager.path} terminated.")
- instanceManager.unregisterTaskManager(taskManager)
+ instanceManager.unregisterTaskManager(taskManager, true)
context.unwatch(taskManager)
}
@@ -505,12 +556,12 @@ class JobManager(
if (instanceManager.isRegistered(taskManager)) {
log.info(s"Task manager ${taskManager.path} wants to disconnect, because $msg.")
- instanceManager.unregisterTaskManager(taskManager)
+ instanceManager.unregisterTaskManager(taskManager, false)
context.unwatch(taskManager)
}
case RequestLeaderSessionID =>
- sender() ! ResponseLeaderSessionID(leaderSessionID)
+ sender() ! ResponseLeaderSessionID(leaderSessionID.orNull)
}
/**
@@ -519,10 +570,9 @@ class JobManager(
* graph and the execution vertices are queued for scheduling.
*
* @param jobGraph representing the Flink job
- * @param listenToEvents true if the sender wants to listen to job status and execution state
- * change notifications. false if not.
+ * @param listeningBehaviour specifies the listening behaviour of the sender.
*/
- private def submitJob(jobGraph: JobGraph, listenToEvents: Boolean): Unit = {
+ private def submitJob(jobGraph: JobGraph, listeningBehaviour: ListeningBehaviour): Unit = {
if (jobGraph == null) {
sender() ! decorateMessage(JobResultFailure(
new SerializedThrowable(
@@ -560,6 +610,14 @@ class JobManager(
throw new JobSubmissionException(jobId, "The given job is empty")
}
+ val client = if(listeningBehaviour == ListeningBehaviour.DETACHED) {
+ // The client does not want to receive the SerializedJobExecutionResult
+ ActorRef.noSender
+ } else {
+ // Send the job execution result back to the sender
+ sender
+ }
+
// see if there already exists an ExecutionGraph for the corresponding job ID
executionGraph = currentJobs.getOrElseUpdate(
jobGraph.getJobID,
@@ -571,7 +629,7 @@ class JobManager(
timeout,
jobGraph.getUserJarBlobKeys(),
userCodeLoader),
- JobInfo(sender(), System.currentTimeMillis())
+ JobInfo(client, System.currentTimeMillis())
)
)._1
@@ -603,52 +661,53 @@ class JobManager(
s"The vertex ${vertex.getID} (${vertex.getName}) has no invokable class.")
}
- if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
- vertex.setParallelism(numSlots)
- }
+ if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
+ vertex.setParallelism(numSlots)
+ }
- try {
- vertex.initializeOnMaster(userCodeLoader)
- }
- catch {
+ try {
+ vertex.initializeOnMaster(userCodeLoader)
+ }
+ catch {
case t: Throwable =>
throw new JobExecutionException(jobId,
"Cannot initialize task '" + vertex.getName() + "': " + t.getMessage, t)
- }
- }
+ }
+ }
- // topologically sort the job vertices and attach the graph to the existing one
- val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources()
- if (log.isDebugEnabled) {
- log.debug(s"Adding ${sortedTopology.size()} vertices from " +
- s"job graph ${jobId} (${jobName}).")
- }
- executionGraph.attachJobGraph(sortedTopology)
+ // topologically sort the job vertices and attach the graph to the existing one
+ val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources()
+ if (log.isDebugEnabled) {
+ log.debug(s"Adding ${sortedTopology.size()} vertices from " +
+ s"job graph ${jobId} (${jobName}).")
+ }
+ executionGraph.attachJobGraph(sortedTopology)
- if (log.isDebugEnabled) {
- log.debug(s"Successfully created execution graph from job graph ${jobId} (${jobName}).")
- }
+ if (log.isDebugEnabled) {
+ log.debug("Successfully created execution graph from job " +
+ s"graph ${jobId} (${jobName}).")
+ }
- // configure the state checkpointing
- val snapshotSettings = jobGraph.getSnapshotSettings
- if (snapshotSettings != null) {
+ // configure the state checkpointing
+ val snapshotSettings = jobGraph.getSnapshotSettings
+ if (snapshotSettings != null) {
- val idToVertex: JobVertexID => ExecutionJobVertex = id => {
- val vertex = executionGraph.getJobVertex(id)
- if (vertex == null) {
- throw new JobSubmissionException(jobId,
- "The snapshot checkpointing settings refer to non-existent vertex " + id)
- }
- vertex
- }
+ val idToVertex: JobVertexID => ExecutionJobVertex = id => {
+ val vertex = executionGraph.getJobVertex(id)
+ if (vertex == null) {
+ throw new JobSubmissionException(jobId,
+ "The snapshot checkpointing settings refer to non-existent vertex " + id)
+ }
+ vertex
+ }
- val triggerVertices: java.util.List[ExecutionJobVertex] =
+ val triggerVertices: java.util.List[ExecutionJobVertex] =
snapshotSettings.getVerticesToTrigger().asScala.map(idToVertex).asJava
- val ackVertices: java.util.List[ExecutionJobVertex] =
+ val ackVertices: java.util.List[ExecutionJobVertex] =
snapshotSettings.getVerticesToAcknowledge().asScala.map(idToVertex).asJava
- val confirmVertices: java.util.List[ExecutionJobVertex] =
+ val confirmVertices: java.util.List[ExecutionJobVertex] =
snapshotSettings.getVerticesToConfirm().asScala.map(idToVertex).asJava
executionGraph.enableSnapshotCheckpointing(
@@ -658,15 +717,16 @@ class JobManager(
ackVertices,
confirmVertices,
context.system,
- leaderSessionID)
+ leaderSessionID.orNull)
}
// get notified about job status changes
- executionGraph.registerJobStatusListener(new AkkaActorGateway(self, leaderSessionID))
+ executionGraph.registerJobStatusListener(
+ new AkkaActorGateway(self, leaderSessionID.orNull))
- if (listenToEvents) {
+ if (listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {
// the sender wants to be notified about state changes
- val gateway = new AkkaActorGateway(sender(), leaderSessionID)
+ val gateway = new AkkaActorGateway(sender(), leaderSessionID.orNull)
executionGraph.registerExecutionListener(gateway)
executionGraph.registerJobStatusListener(gateway)
@@ -929,6 +989,53 @@ class JobManager(
}
}
+  /** Fails all currently running jobs and empties the list of currently running jobs. For
+    * every job whose client waits for a result, a [[Failure]] wrapping a
+    * [[JobExecutionException]] is sent to that client.
+    *
+    * @param cause Cause for the cancelling. It is passed to each job's execution graph and
+    *              attached to the exception sent to waiting clients.
+    */
+  private def cancelAndClearEverything(cause: Throwable): Unit = {
+    for ((jobID, (eg, jobInfo)) <- currentJobs) {
+      eg.fail(cause)
+
+      // detached submissions are stored with ActorRef.noSender and get no notification
+      if (jobInfo.client != ActorRef.noSender) {
+        jobInfo.client ! decorateMessage(
+          Failure(
+            new JobExecutionException(
+              jobID,
+              "All jobs are cancelled and cleared.",
+              cause)
+          ))
+      }
+    }
+
+    currentJobs.clear()
+  }
+
+  /** Callback from the [[LeaderElectionService]]. It hands the granted session ID to the
+    * actor as a [[GrantLeadership]] message; a null ID is turned into `None`.
+    */
+  override def grantLeadership(newLeaderSessionID: UUID): Unit = {
+    val grantMessage = GrantLeadership(Option(newLeaderSessionID))
+    self ! decorateMessage(grantMessage)
+  }
+
+  /** Callback from the [[LeaderElectionService]] when leadership is lost. It only enqueues a
+    * [[RevokeLeadership]] message. The `RevokeLeadership` handler cancels the jobs, sends
+    * Disconnect to the TaskManagers using the current `leaderSessionID`, and only then resets
+    * `leaderSessionID` to `None`.
+    *
+    * `leaderSessionID` is deliberately not modified here. This callback presumably runs on a
+    * leader election service thread (TODO confirm), so writing the actor's `var` would race
+    * with message processing. It would also make the handler's Disconnect messages carry a
+    * null session ID.
+    */
+  override def revokeLeadership(): Unit = {
+    self ! decorateMessage(RevokeLeadership)
+  }
+
+  /** Akka URL of this JobManager actor in its actor system, built via `AkkaUtils.getAkkaURL`. */
+  override def getAddress: String = AkkaUtils.getAkkaURL(context.system, self)
+
+  /** Handles an error occurring in the leader election service: the error is logged and this
+    * JobManager is terminated.
+    *
+    * @param exception The error reported by the [[LeaderElectionService]]
+    */
+  override def handleError(exception: Exception): Unit = {
+    log.error("Received an error from the LeaderElectionService.", exception)
+
+    // stop this JobManager by sending itself a PoisonPill
+    val stopSignal = decorateMessage(PoisonPill)
+    self ! stopSignal
+  }
+
/**
* Updates the accumulators reported from a task manager via the Heartbeat message.
* @param accumulators list of accumulator snapshots
@@ -1001,12 +1108,22 @@ object JobManager {
System.exit(STARTUP_FAILURE_RETURN_CODE)
}
- // address and will not be reachable from anyone remote
- if (listeningPort <= 0 || listeningPort >= 65536) {
- val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
- "' is invalid, it must be great than 0 and less than 65536."
- LOG.error(message)
- System.exit(STARTUP_FAILURE_RETURN_CODE)
+ if (ZooKeeperUtils.isZooKeeperHighAvailabilityEnabled(configuration)) {
+ // address and will not be reachable from anyone remote
+ if (listeningPort != 0) {
+ val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
+ "' is invalid, it must be equal to 0."
+ LOG.error(message)
+ System.exit(STARTUP_FAILURE_RETURN_CODE)
+ }
+ } else {
+ // address and will not be reachable from anyone remote
+ if (listeningPort <= 0 || listeningPort >= 65536) {
+ val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
+ "' is invalid, it must be greater than 0 and less than 65536."
+ LOG.error(message)
+ System.exit(STARTUP_FAILURE_RETURN_CODE)
+ }
}
// run the job manager
@@ -1089,7 +1206,7 @@ object JobManager {
try {
// bring up the job manager actor
LOG.info("Starting JobManager actor")
- val (jobManager, archiver) = startJobManagerActors(
+ val (jobManager, archive) = startJobManagerActors(
configuration,
jobManagerSystem,
streamingMode)
@@ -1114,7 +1231,7 @@ object JobManager {
jobManagerSystem,
listeningAddress,
Some(TaskManager.TASK_MANAGER_NAME),
- Some(jobManager.path.toString),
+ None,
true,
streamingMode,
classOf[TaskManager])
@@ -1130,9 +1247,15 @@ object JobManager {
}
if(configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
- val lookupTimeout = AkkaUtils.getLookupTimeout(configuration)
- val jobManagerGateway = JobManager.getJobManagerGateway(jobManager, lookupTimeout)
- val archiverGateway = new AkkaActorGateway(archiver, jobManagerGateway.leaderSessionID())
+
+ // TODO: Add support for HA. Webserver has to work in dedicated mode. All transferred
+ // information has to be made serializable
+ val address = AkkaUtils.getAddress(jobManagerSystem)
+
+ configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.host.get)
+ configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.port.get)
+
+ val leaderRetrievalService = StandaloneUtils.createLeaderRetrievalService(configuration)
// start the job manager web frontend
val webServer = if (
@@ -1143,14 +1266,16 @@ object JobManager {
LOG.info("Starting NEW JobManger web frontend")
// start the new web frontend. we need to load this dynamically
// because it is not in the same project/dependencies
- startWebRuntimeMonitor(configuration, jobManagerGateway, archiverGateway)
+ startWebRuntimeMonitor(configuration, leaderRetrievalService, jobManagerSystem)
}
else {
LOG.info("Starting JobManger web frontend")
- new WebInfoServer(configuration, jobManagerGateway, archiverGateway)
+ new WebInfoServer(configuration, leaderRetrievalService, jobManagerSystem)
}
- webServer.start()
+ if(webServer != null) {
+ webServer.start()
+ }
}
}
catch {
@@ -1208,6 +1333,13 @@ object JobManager {
} text {
"Network address for communication with the job manager"
}
+
+ opt[Int]("webui-port").optional().action { (arg, conf) =>
+ conf.setWebUIPort(arg)
+ conf
+ } text {
+ "Port for the UI web server"
+ }
}
val config = parser.parse(args, new JobManagerCliOptions()).getOrElse {
@@ -1232,42 +1364,38 @@ object JobManager {
configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + "/..")
}
+ if (config.getWebUIPort() >= 0) {
+ configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, config.getWebUIPort())
+ }
+
+ if (config.getHost() != null) {
+ configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, config.getHost())
+ }
+
+ val host = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
+
// high availability mode
- val (hostname: String, port: Int ) =
- if (ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) {
- // TODO @removeme @tillrohrmann This is the place where the host and random port for JM is
- // chosen. For the FlinkMiniCluster you have to choose it on your own.
+ val port: Int =
+ if (ZooKeeperUtils.isZooKeeperHighAvailabilityEnabled(configuration)) {
LOG.info("Starting JobManager in High-Availability Mode")
-
- if (config.getHost() == null) {
- throw new Exception("Missing parameter '--host'. Parameter is required when " +
- "running in high-availability mode")
- }
-
- // Let web server listen on random port
- configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0)
-
- (config.getHost(), 0)
+
+ configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
+ 0
}
else {
LOG.info("Staring JobManager without high-availability")
-
- if (config.getHost() != null) {
- throw new Exception("Found an explicit address for JobManager communication " +
- "via the CLI option '--host'.\n" +
- "This parameter must only be set if the JobManager is started in high-availability " +
- "mode and connects to a ZooKeeper quorum.\n" +
- "Please configure ZooKeeper or don't set the '--host' option, so that the JobManager " +
- "uses the address configured under 'conf/flink-conf.yaml'.")
- }
- val host = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
- val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+ configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
- (host, port)
}
- (configuration, config.getJobManagerMode(), config.getStreamingMode(), hostname, port)
+ val executionMode = config.getJobManagerMode
+ val streamingMode = config.getStreamingMode
+
+ LOG.info(s"Starting JobManager on $host:$port with execution mode $executionMode and " +
+ s"streaming mode $streamingMode")
+
+ (configuration, executionMode, streamingMode, host, port)
}
/**
@@ -1278,9 +1406,17 @@ object JobManager {
* @param configuration The configuration from which to parse the config values.
* @return The members for a default JobManager.
*/
- def createJobManagerComponents(configuration: Configuration)
- : (ExecutionContext, InstanceManager, FlinkScheduler, BlobLibraryCacheManager,
- Props, Int, Long, FiniteDuration, Int) = {
+ def createJobManagerComponents(configuration: Configuration) :
+ (ExecutionContext,
+ InstanceManager,
+ FlinkScheduler,
+ BlobLibraryCacheManager,
+ Props,
+ Int, // execution retries
+ Long, // delay between retries
+ FiniteDuration, // timeout
+ Int, // number of archived jobs
+ LeaderElectionService) = {
val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
@@ -1346,6 +1482,8 @@ object JobManager {
}
}
+ val leaderElectionService = LeaderElectionUtils.createLeaderElectionService(configuration)
+
(executionContext,
instanceManager,
scheduler,
@@ -1353,8 +1491,9 @@ object JobManager {
archiveProps,
executionRetries,
delayBetweenRetries,
- timeout,
- archiveCount)
+ timeout,
+ archiveCount,
+ leaderElectionService)
}
/**
@@ -1386,7 +1525,7 @@ object JobManager {
* @param actorSystem The actor system running the JobManager
* @param jobMangerActorName Optionally the name of the JobManager actor. If none is given,
* the actor will have the name generated by the actor system.
- * @param archiverActorName Optionally the name of the archive actor. If none is given,
+ * @param archiveActorName Optionally the name of the archive actor. If none is given,
* the actor will have the name generated by the actor system.
* @param streamingMode The mode to run the system in (streaming vs. batch-only)
*
@@ -1396,7 +1535,7 @@ object JobManager {
configuration: Configuration,
actorSystem: ActorSystem,
jobMangerActorName: Option[String],
- archiverActorName: Option[String],
+ archiveActorName: Option[String],
streamingMode: StreamingMode)
: (ActorRef, ActorRef) = {
@@ -1408,10 +1547,11 @@ object JobManager {
executionRetries,
delayBetweenRetries,
timeout,
- _) = createJobManagerComponents(configuration)
+ _,
+ leaderElectionService) = createJobManagerComponents(configuration)
// start the archiver with the given name, or without (avoid name conflicts)
- val archiver: ActorRef = archiverActorName match {
+ val archive: ActorRef = archiveActorName match {
case Some(actorName) => actorSystem.actorOf(archiveProps, actorName)
case None => actorSystem.actorOf(archiveProps)
}
@@ -1423,18 +1563,19 @@ object JobManager {
instanceManager,
scheduler,
libraryCacheManager,
- archiver,
+ archive,
executionRetries,
delayBetweenRetries,
timeout,
- streamingMode)
+ streamingMode,
+ leaderElectionService)
val jobManager: ActorRef = jobMangerActorName match {
case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)
case None => actorSystem.actorOf(jobManagerProps)
}
- (jobManager, archiver)
+ (jobManager, archive)
}
def startActor(props: Props, actorSystem: ActorSystem): ActorRef = {
@@ -1452,9 +1593,13 @@ object JobManager {
* @param address The address of the JobManager's actor system.
* @return The akka URL of the JobManager actor.
*/
- def getRemoteJobManagerAkkaURL(address: InetSocketAddress): String = {
+ def getRemoteJobManagerAkkaURL(
+ address: InetSocketAddress,
+ name: Option[String] = None)
+ : String = {
val hostPort = address.getAddress().getHostAddress() + ":" + address.getPort()
- s"akka.tcp://flink@$hostPort/user/$JOB_MANAGER_NAME"
+
+ getJobManagerAkkaURLHelper(s"akka.tcp://flink@$hostPort", name)
}
/**
@@ -1463,17 +1608,24 @@ object JobManager {
*
* @return The local akka URL of the JobManager actor.
*/
- def getLocalJobManagerAkkaURL: String = {
- "akka://flink/user/" + JOB_MANAGER_NAME
+ def getLocalJobManagerAkkaURL(name: Option[String] = None): String = {
+ getJobManagerAkkaURLHelper("akka://flink", name)
+ }
+
+ def getJobManagerAkkaURL(system: ActorSystem, name: Option[String] = None): String = {
+ getJobManagerAkkaURLHelper(AkkaUtils.getAddress(system).toString, name)
}
- def getJobManagerRemoteReferenceFuture(
+  /** Appends the `/user/<name>` actor path to the given actor system address, where the name
+    * defaults to `JOB_MANAGER_NAME` when none is given.
+    */
+  private def getJobManagerAkkaURLHelper(address: String, name: Option[String]): String = {
+    val actorName = name.getOrElse(JOB_MANAGER_NAME)
+    s"$address/user/$actorName"
+  }
+
+ def getJobManagerActorRefFuture(
address: InetSocketAddress,
system: ActorSystem,
timeout: FiniteDuration)
: Future[ActorRef] = {
-
- AkkaUtils.getReference(getRemoteJobManagerAkkaURL(address), system, timeout)
+ AkkaUtils.getActorRefFuture(getRemoteJobManagerAkkaURL(address), system, timeout)
}
/**
@@ -1486,25 +1638,12 @@ object JobManager {
* @return The ActorRef to the JobManager
*/
@throws(classOf[IOException])
- def getJobManagerRemoteReference(
+ def getJobManagerActorRef(
jobManagerUrl: String,
system: ActorSystem,
timeout: FiniteDuration)
: ActorRef = {
-
- try {
- val future = AkkaUtils.getReference(jobManagerUrl, system, timeout)
- Await.result(future, timeout)
- }
- catch {
- case e @ (_ : ActorNotFound | _ : TimeoutException) =>
- throw new IOException(
- s"JobManager at $jobManagerUrl not reachable. " +
- s"Please make sure that the JobManager is running and its port is reachable.", e)
-
- case e: IOException =>
- throw new IOException("Could not connect to JobManager at " + jobManagerUrl, e)
- }
+ AkkaUtils.getActorRef(jobManagerUrl, system, timeout)
}
/**
@@ -1517,14 +1656,14 @@ object JobManager {
* @return The ActorRef to the JobManager
*/
@throws(classOf[IOException])
- def getJobManagerRemoteReference(
+ def getJobManagerActorRef(
address: InetSocketAddress,
system: ActorSystem,
timeout: FiniteDuration)
: ActorRef = {
val jmAddress = getRemoteJobManagerAkkaURL(address)
- getJobManagerRemoteReference(jmAddress, system, timeout)
+ getJobManagerActorRef(jmAddress, system, timeout)
}
/**
@@ -1537,37 +1676,16 @@ object JobManager {
* @return The ActorRef to the JobManager
*/
@throws(classOf[IOException])
- def getJobManagerRemoteReference(
+ def getJobManagerActorRef(
address: InetSocketAddress,
system: ActorSystem,
config: Configuration)
: ActorRef = {
val timeout = AkkaUtils.getLookupTimeout(config)
- getJobManagerRemoteReference(address, system, timeout)
+ getJobManagerActorRef(address, system, timeout)
}
- /** Returns the [[ActorGateway]] for the provided JobManager. The function automatically
- * retrieves the current leader session ID from the JobManager and instantiates the
- * [[AkkaActorGateway]] with it.
- *
- * @param jobManager ActorRef to the [[JobManager]]
- * @param timeout Timeout for the blocking leader session ID retrieval
- * @throws java.lang.Exception
- * @return Gateway to the specified JobManager
- */
- @throws(classOf[Exception])
- def getJobManagerGateway(
- jobManager: ActorRef,
- timeout: FiniteDuration
- ): ActorGateway = {
- val futureLeaderSessionID = (jobManager ? RequestLeaderSessionID)(timeout)
- .mapTo[ResponseLeaderSessionID]
-
- val leaderSessionID = Await.result(futureLeaderSessionID, timeout).leaderSessionID
-
- new AkkaActorGateway(jobManager, leaderSessionID)
- }
// --------------------------------------------------------------------------
// Utilities
@@ -1581,29 +1699,28 @@ object JobManager {
* this method does not throw any exceptions, but only logs them.
*
* @param config The configuration for the runtime monitor.
- * @param jobManager The JobManager actor gateway.
- * @param archiver The execution graph archive actor.
+ * @param leaderRetrievalService Leader retrieval service to get the leading JobManager
*/
def startWebRuntimeMonitor(
config: Configuration,
- jobManager: ActorGateway,
- archiver: ActorGateway)
+ leaderRetrievalService: LeaderRetrievalService,
+ actorSystem: ActorSystem)
: WebMonitor = {
// try to load and instantiate the class
try {
val classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor"
val clazz: Class[_ <: WebMonitor] = Class.forName(classname)
- .asSubclass(classOf[WebMonitor])
+ .asSubclass(classOf[WebMonitor])
val ctor: Constructor[_ <: WebMonitor] = clazz.getConstructor(classOf[Configuration],
- classOf[ActorGateway],
- classOf[ActorGateway])
- ctor.newInstance(config, jobManager, archiver)
+ classOf[LeaderRetrievalService],
+ classOf[ActorSystem])
+ ctor.newInstance(config, leaderRetrievalService, actorSystem)
}
catch {
case e: ClassNotFoundException =>
LOG.error("Could not load web runtime monitor. " +
- "Probably reason: flink-runtime-web is not in the classpath")
+ "Probably reason: flink-runtime-web is not in the classpath")
LOG.debug("Caught exception", e)
null
case e: InvocationTargetException =>
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index e2891de..702e34b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -71,6 +71,10 @@ class MemoryArchivist(private val max_entries: Int)
var canceledCnt: Int = 0
var failedCnt: Int = 0
+ /** Logs the path of this archivist actor once it has been started. */
+ override def preStart(): Unit =
+ log.info(s"Started memory archivist ${self.path}")
+
override def handleMessage: Receive = {
/* Receive Execution Graph to archive */
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
index 83bafaa..2369d3c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.messages
import java.text.SimpleDateFormat
-import java.util.Date
+import java.util.{UUID, Date}
import org.apache.flink.api.common.JobID
import org.apache.flink.runtime.execution.ExecutionState
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 1c250af..d7bbb8d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.messages
import java.util.UUID
+import akka.actor.ActorRef
import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.akka.ListeningBehaviour
import org.apache.flink.runtime.client.{SerializedJobExecutionResult, JobStatusMessage}
import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
import org.apache.flink.runtime.instance.{InstanceID, Instance}
@@ -35,17 +37,32 @@ import scala.collection.JavaConverters._
*/
object JobManagerMessages {
- case class LeaderSessionMessage(leaderSessionID: Option[UUID], message: Any)
+ /** Wrapper class for leader session messages. Leader session messages implement the
+ * [[RequiresLeaderSessionID]] interface and have to be wrapped in a [[LeaderSessionMessage]],
+ * which also contains the current leader session ID.
+ *
+ * @param leaderSessionID Current leader session ID or null, if no leader session ID was set
+ * @param message [[RequiresLeaderSessionID]] message to be wrapped in a [[LeaderSessionMessage]]
+ */
+ case class LeaderSessionMessage(leaderSessionID: UUID, message: Any)
/**
- * Submits a job to the job manager. If [[registerForEvents]] is true,
- * then the sender will be registered as listener for the state change messages.
+ * Submits a job to the job manager. Depending on the [[listeningBehaviour]],
+ * the sender registers for different messages. If [[ListeningBehaviour.DETACHED]], then
+ * it will only be informed whether the submission was successful or not. If
+ * [[ListeningBehaviour.EXECUTION_RESULT]], then it will additionally receive the execution
+ * result. If [[ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES]], then it will additionally
+ * receive the job status change notifications.
+ *
* The submission result will be sent back to the sender as a success message.
*
* @param jobGraph The job to be submitted to the JobManager
- * @param registerForEvents if true, then register for state change events
+ * @param listeningBehaviour Specifies to what the sender wants to listen (detached, execution
+ * result, execution result and state changes)
*/
- case class SubmitJob(jobGraph: JobGraph, registerForEvents: Boolean)
+ case class SubmitJob(
+ jobGraph: JobGraph,
+ listeningBehaviour: ListeningBehaviour)
extends RequiresLeaderSessionID
/**
@@ -161,7 +178,7 @@ object JobManagerMessages {
*
* @param leaderSessionID
*/
- case class ResponseLeaderSessionID(leaderSessionID: Option[UUID])
+ case class ResponseLeaderSessionID(leaderSessionID: UUID)
/**
* Denotes a successful job submission.
@@ -299,6 +316,22 @@ object JobManagerMessages {
case object JobManagerStatusAlive extends JobManagerStatus
+ /** Grants leadership to the receiver. The message contains the new leader session id.
+ *
+ * @param leaderSessionID New leader session ID for the receiver, wrapped in an Option
+ */
+ case class GrantLeadership(leaderSessionID: Option[UUID])
+
+ /** Revokes leadership of the receiver.
+ */
+ case object RevokeLeadership
+
+ /** Requests the ActorRef of the archiver (see [[ResponseArchive]]) */
+ case object RequestArchive
+
+ /** Response containing the ActorRef of the archiver */
+ case class ResponseArchive(actor: ActorRef)
+
// --------------------------------------------------------------------------
// Utility methods to allow simpler case object access from Java
// --------------------------------------------------------------------------
@@ -338,4 +371,8 @@ object JobManagerMessages {
def getRequestLeaderSessionID: AnyRef = {
RequestLeaderSessionID
}
+
+ /** Accessor for the [[RequestArchive]] case object, to simplify access from Java. */
+ def getRequestArchive: AnyRef = RequestArchive
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
index b435ebc..941d63f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
@@ -18,9 +18,6 @@
package org.apache.flink.runtime.messages
-import java.util.UUID
-
-import akka.actor.ActorRef
import org.apache.flink.runtime.instance.{InstanceConnectionInfo, InstanceID, HardwareDescription}
import scala.concurrent.duration.{Deadline, FiniteDuration}
@@ -34,21 +31,18 @@ object RegistrationMessages {
/**
* Marker trait for registration messages.
*/
- trait RegistrationMessage {
- def registrationSessionID: UUID
- }
+ // Registration messages carry no session ID field of their own; by extending
+ // RequiresLeaderSessionID they are handled as leader session messages instead.
+ trait RegistrationMessage extends RequiresLeaderSessionID
/**
* Triggers the TaskManager to attempt a registration at the JobManager.
*
- * @param jobManagerAkkaURL The actor URL of the JobManager.
+ * @param jobManagerURL Akka URL to the JobManager
* @param timeout The timeout for the message. The next retry will double this timeout.
* @param deadline Optional deadline until when the registration must be completed.
* @param attempt The attempt number, for logging.
*/
case class TriggerTaskManagerRegistration(
- registrationSessionID: UUID,
- jobManagerAkkaURL: String,
+ jobManagerURL: String,
timeout: FiniteDuration,
deadline: Option[Deadline],
attempt: Int)
@@ -58,14 +52,11 @@ object RegistrationMessages {
* Registers a task manager at the job manager. A successful registration is acknowledged by
* [[AcknowledgeRegistration]].
*
- * @param taskManager The TaskManager actor.
* @param connectionInfo The TaskManagers connection information.
* @param resources The TaskManagers resources.
* @param numberOfSlots The number of processing slots offered by the TaskManager.
*/
case class RegisterTaskManager(
- registrationSessionID: UUID,
- taskManager: ActorRef,
connectionInfo: InstanceConnectionInfo,
resources: HardwareDescription,
numberOfSlots: Int)
@@ -80,9 +71,6 @@ object RegistrationMessages {
* @param blobPort The server port where the JobManager's BLOB service runs.
*/
case class AcknowledgeRegistration(
- registrationSessionID: UUID,
- leaderSessionID: UUID,
- jobManager: ActorRef,
instanceID: InstanceID,
blobPort: Int)
extends RegistrationMessage
@@ -94,9 +82,6 @@ object RegistrationMessages {
* @param blobPort The server port where the JobManager's BLOB service runs.
*/
case class AlreadyRegistered(
- registrationSessionID: UUID,
- leaderSessionID: UUID,
- jobManager: ActorRef,
instanceID: InstanceID,
blobPort: Int)
extends RegistrationMessage
@@ -107,6 +92,6 @@ object RegistrationMessages {
*
* @param reason Reason why the task manager registration was refused
*/
- case class RefuseRegistration(registrationSessionID: UUID, reason: String)
+ case class RefuseRegistration(reason: String)
extends RegistrationMessage
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
new file mode 100644
index 0000000..a80ca99
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages
+
+import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, IntermediateResultPartitionID}
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState
+import org.apache.flink.runtime.taskmanager.TaskExecutionState
+
+/**
+ * A set of messages that control the deployment and the state of Tasks executed
+ * on the TaskManager.
+ */
+object TaskMessages {
+
+ /**
+ * Marker trait for task messages.
+ */
+ trait TaskMessage
+
+ // --------------------------------------------------------------------------
+ // Starting and stopping Tasks
+ // --------------------------------------------------------------------------
+
+ /**
+ * Submits a task to the task manager. The result of this message is a
+ * [[TaskOperationResult]] message.
+ *
+ * @param tasks Descriptor which contains the information to start the task.
+ */
+ case class SubmitTask(tasks: TaskDeploymentDescriptor)
+ extends TaskMessage with RequiresLeaderSessionID
+
+ /**
+ * Cancels the task associated with [[attemptID]]. The result is sent back to the sender as a
+ * [[TaskOperationResult]] message.
+ *
+ * @param attemptID The task's execution attempt ID.
+ */
+ case class CancelTask(attemptID: ExecutionAttemptID)
+ extends TaskMessage with RequiresLeaderSessionID
+
+ /**
+ * Triggers a fail of specified task from the outside (as opposed to the task throwing
+ * an exception itself) with the given exception as the cause.
+ *
+ * @param executionID The task's execution attempt ID.
+ * @param cause The reason for the external failure.
+ */
+ case class FailTask(executionID: ExecutionAttemptID, cause: Throwable)
+ extends TaskMessage
+
+ /**
+ * Notifies the TaskManager that the task has reached its final state,
+ * either FINISHED, CANCELED, or FAILED.
+ *
+ * @param executionID The task's execution attempt ID.
+ */
+ case class TaskInFinalState(executionID: ExecutionAttemptID)
+ extends TaskMessage
+
+
+ // --------------------------------------------------------------------------
+ // Updates to Intermediate Results
+ // --------------------------------------------------------------------------
+
+ /**
+ * Answer to a [[RequestPartitionState]] with the state of the respective partition.
+ */
+ case class PartitionState(
+ taskExecutionId: ExecutionAttemptID,
+ taskResultId: IntermediateDataSetID,
+ partitionId: IntermediateResultPartitionID,
+ state: ExecutionState)
+ extends TaskMessage with RequiresLeaderSessionID
+
+ /**
+ * Base class for messages that update the information about location of input partitions
+ */
+ abstract sealed class UpdatePartitionInfo extends TaskMessage with RequiresLeaderSessionID {
+ def executionID: ExecutionAttemptID
+ }
+
+ /**
+ * Updates the location information of a single input of the task.
+ *
+ * @param executionID The task's execution attempt ID.
+ * @param resultId The input reader to update.
+ * @param partitionInfo The partition info update.
+ */
+ case class UpdateTaskSinglePartitionInfo(
+ executionID: ExecutionAttemptID,
+ resultId: IntermediateDataSetID,
+ partitionInfo: InputChannelDeploymentDescriptor)
+ extends UpdatePartitionInfo
+
+ /**
+ * Updates the location information of several inputs of the task at once.
+ *
+ * @param executionID The task's execution attempt ID.
+ * @param partitionInfos List of input gates with channel descriptors to update.
+ */
+ case class UpdateTaskMultiplePartitionInfos(
+ executionID: ExecutionAttemptID,
+ partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)])
+ extends UpdatePartitionInfo
+
+ /**
+ * Fails (and releases) all intermediate result partitions identified by
+ * [[executionID]] from the task manager.
+ *
+ * @param executionID The task's execution attempt ID.
+ */
+ case class FailIntermediateResultPartitions(executionID: ExecutionAttemptID)
+ extends TaskMessage with RequiresLeaderSessionID
+
+
+ // --------------------------------------------------------------------------
+ // Report Messages
+ // --------------------------------------------------------------------------
+
+ /**
+ * Denotes a state change of a task at the JobManager. The update success is acknowledged by a
+ * boolean value which is sent back to the sender.
+ *
+ * @param taskExecutionState The changed task state
+ */
+ case class UpdateTaskExecutionState(taskExecutionState: TaskExecutionState)
+ extends TaskMessage with RequiresLeaderSessionID
+
+ /**
+ * Response message to updates in the task state. Sent, for example, as a response to
+ *
+ * - [[SubmitTask]]
+ * - [[CancelTask]]
+ *
+ * @param executionID identifying the respective task
+ * @param success indicating whether the operation has been successful
+ * @param description Optional description for unsuccessful results. The two-argument
+ * constructor sets it to the empty string.
+ */
+ case class TaskOperationResult(
+ executionID: ExecutionAttemptID,
+ success: Boolean,
+ description: String)
+ extends TaskMessage {
+ def this(executionID: ExecutionAttemptID, success: Boolean) = this(executionID, success, "")
+ }
+
+
+ // --------------------------------------------------------------------------
+ // Utility Functions
+ // --------------------------------------------------------------------------
+
+ /**
+ * Creates an [[UpdateTaskMultiplePartitionInfos]] message from Java lists, pairing the
+ * i-th result ID with the i-th partition info.
+ *
+ * @param executionID The task's execution attempt ID.
+ * @param resultIDs IDs of the intermediate data sets whose inputs are updated.
+ * @param partitionInfos Channel descriptors, one per entry of resultIDs.
+ * @throws IllegalArgumentException if the two lists differ in size
+ * @return Message holding an immutable copy of the paired entries
+ */
+ def createUpdateTaskMultiplePartitionInfos(
+ executionID: ExecutionAttemptID,
+ resultIDs: java.util.List[IntermediateDataSetID],
+ partitionInfos: java.util.List[InputChannelDeploymentDescriptor])
+ : UpdateTaskMultiplePartitionInfos = {
+
+ require(resultIDs.size() == partitionInfos.size(),
+ "ResultIDs must have the same length as partitionInfos.")
+
+ import scala.collection.JavaConverters.asScalaBufferConverter
+
+ // zip on the converted buffers yields a mutable Buffer; copy it into an immutable List
+ // so that the actor message does not carry mutable state.
+ new UpdateTaskMultiplePartitionInfos(executionID,
+ resultIDs.asScala.zip(partitionInfos.asScala).toList)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index 6cb571c..30c82fe 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -18,6 +18,9 @@
package org.apache.flink.runtime.messages
+import java.util.UUID
+
+import akka.actor.ActorRef
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.instance.InstanceID
@@ -92,17 +95,25 @@ object TaskManagerMessages {
/**
* Requests a notification from the task manager as soon as the task manager has been
- * registered at the job manager. Once the task manager is registered at the job manager a
+ * registered at any job manager. Once the task manager is registered at any job manager a
* [[RegisteredAtJobManager]] message will be sent to the sender.
*/
- case object NotifyWhenRegisteredAtJobManager
+ case object NotifyWhenRegisteredAtAnyJobManager
/**
- * Acknowledges that the task manager has been successfully registered at the job manager. This
- * message is a response to [[NotifyWhenRegisteredAtJobManager]].
+ * Acknowledges that the task manager has been successfully registered at any job manager. This
+ * message is a response to [[NotifyWhenRegisteredAtAnyJobManager]].
*/
case object RegisteredAtJobManager
+ /** Tells the address of the new leading [[org.apache.flink.runtime.jobmanager.JobManager]]
+ * and the new leader session ID.
+ *
+ * @param jobManagerAddress Address of the new leading JobManager
+ * @param leaderSessionID New leader session ID
+ */
+ case class JobManagerLeaderAddress(jobManagerAddress: String, leaderSessionID: UUID)
+
// --------------------------------------------------------------------------
// Utility getters for case objects to simplify access from Java
@@ -113,7 +124,7 @@ object TaskManagerMessages {
* @return The NotifyWhenRegisteredAtJobManager case object instance.
*/
def getNotifyWhenRegisteredAtJobManagerMessage:
- NotifyWhenRegisteredAtJobManager.type = NotifyWhenRegisteredAtJobManager
+ NotifyWhenRegisteredAtAnyJobManager.type = NotifyWhenRegisteredAtAnyJobManager
/**
* Accessor for the case object instance, to simplify Java interoperability.
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
deleted file mode 100644
index a80ca99..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
-import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, IntermediateResultPartitionID}
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState
-import org.apache.flink.runtime.taskmanager.TaskExecutionState
-
-/**
- * A set of messages that control the deployment and the state of Tasks executed
- * on the TaskManager.
- */
-object TaskMessages {
-
- /**
- * Marker trait for task messages.
- */
- trait TaskMessage
-
- // --------------------------------------------------------------------------
- // Starting and stopping Tasks
- // --------------------------------------------------------------------------
-
- /**
- * Submits a task to the task manager. The result is to this message is a
- * [[TaskOperationResult]] message.
- *
- * @param tasks Descriptor which contains the information to start the task.
- */
- case class SubmitTask(tasks: TaskDeploymentDescriptor)
- extends TaskMessage with RequiresLeaderSessionID
-
- /**
- * Cancels the task associated with [[attemptID]]. The result is sent back to the sender as a
- * [[TaskOperationResult]] message.
- *
- * @param attemptID The task's execution attempt ID.
- */
- case class CancelTask(attemptID: ExecutionAttemptID)
- extends TaskMessage with RequiresLeaderSessionID
-
- /**
- * Triggers a fail of specified task from the outside (as opposed to the task throwing
- * an exception itself) with the given exception as the cause.
- *
- * @param executionID The task's execution attempt ID.
- * @param cause The reason for the external failure.
- */
- case class FailTask(executionID: ExecutionAttemptID, cause: Throwable)
- extends TaskMessage
-
- /**
- * Notifies the TaskManager that the task has reached its final state,
- * either FINISHED, CANCELED, or FAILED.
- *
- * @param executionID The task's execution attempt ID.
- */
- case class TaskInFinalState(executionID: ExecutionAttemptID)
- extends TaskMessage
-
-
- // --------------------------------------------------------------------------
- // Updates to Intermediate Results
- // --------------------------------------------------------------------------
-
- /**
- * Answer to a [[RequestPartitionState]] with the state of the respective partition.
- */
- case class PartitionState(
- taskExecutionId: ExecutionAttemptID,
- taskResultId: IntermediateDataSetID,
- partitionId: IntermediateResultPartitionID,
- state: ExecutionState)
- extends TaskMessage with RequiresLeaderSessionID
-
- /**
- * Base class for messages that update the information about location of input partitions
- */
- abstract sealed class UpdatePartitionInfo extends TaskMessage with RequiresLeaderSessionID {
- def executionID: ExecutionAttemptID
- }
-
- /**
- *
- * @param executionID The task's execution attempt ID.
- * @param resultId The input reader to update.
- * @param partitionInfo The partition info update.
- */
- case class UpdateTaskSinglePartitionInfo(
- executionID: ExecutionAttemptID,
- resultId: IntermediateDataSetID,
- partitionInfo: InputChannelDeploymentDescriptor)
- extends UpdatePartitionInfo
-
- /**
- *
- * @param executionID The task's execution attempt ID.
- * @param partitionInfos List of input gates with channel descriptors to update.
- */
- case class UpdateTaskMultiplePartitionInfos(
- executionID: ExecutionAttemptID,
- partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)])
- extends UpdatePartitionInfo
-
- /**
- * Fails (and releases) all intermediate result partitions identified by
- * [[executionID]] from the task manager.
- *
- * @param executionID The task's execution attempt ID.
- */
- case class FailIntermediateResultPartitions(executionID: ExecutionAttemptID)
- extends TaskMessage with RequiresLeaderSessionID
-
-
- // --------------------------------------------------------------------------
- // Report Messages
- // --------------------------------------------------------------------------
-
- /**
- * Denotes a state change of a task at the JobManager. The update success is acknowledged by a
- * boolean value which is sent back to the sender.
- *
- * @param taskExecutionState The changed task state
- */
- case class UpdateTaskExecutionState(taskExecutionState: TaskExecutionState)
- extends TaskMessage with RequiresLeaderSessionID
-
- /**
- * Response message to updates in the task state. Send for example as a response to
- *
- * - [[SubmitTask]]
- * - [[CancelTask]]
- *
- * @param executionID identifying the respective task
- * @param success indicating whether the operation has been successful
- * @param description Optional description for unsuccessful results.
- */
- case class TaskOperationResult(
- executionID: ExecutionAttemptID,
- success: Boolean,
- description: String)
- extends TaskMessage {
- def this(executionID: ExecutionAttemptID, success: Boolean) = this(executionID, success, "")
- }
-
-
- // --------------------------------------------------------------------------
- // Utility Functions
- // --------------------------------------------------------------------------
-
- def createUpdateTaskMultiplePartitionInfos(
- executionID: ExecutionAttemptID,
- resultIDs: java.util.List[IntermediateDataSetID],
- partitionInfos: java.util.List[InputChannelDeploymentDescriptor])
- : UpdateTaskMultiplePartitionInfos = {
-
- require(resultIDs.size() == partitionInfos.size(),
- "ResultIDs must have the same length as partitionInfos.")
-
- import scala.collection.JavaConverters.asScalaBufferConverter
-
- new UpdateTaskMultiplePartitionInfos(executionID,
- resultIDs.asScala.zip(partitionInfos.asScala))
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index c4c35f8..bbd011a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.minicluster
import java.net.InetAddress
+import java.util.UUID
import akka.pattern.Patterns.gracefulStop
import akka.pattern.ask
@@ -26,22 +27,25 @@ import akka.actor.{ActorRef, ActorSystem}
import com.typesafe.config.Config
-import org.apache.flink.api.common.{JobExecutionResult, JobSubmissionResult}
+import org.apache.flink.api.common.{JobID, JobExecutionResult, JobSubmissionResult}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.client.{JobExecutionException, JobClient}
import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway}
import org.apache.flink.runtime.jobgraph.JobGraph
-import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.jobmanager.web.WebInfoServer
-import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
+import org.apache.flink.runtime.jobmanager.{JobManager, RecoveryMode}
+import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalService, LeaderRetrievalListener,
+StandaloneLeaderRetrievalService}
+import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtAnyJobManager
+import org.apache.flink.runtime.util.{StandaloneUtils, ZooKeeperUtils}
import org.apache.flink.runtime.webmonitor.WebMonitor
import org.slf4j.LoggerFactory
import scala.concurrent.duration.FiniteDuration
-import scala.concurrent.{ExecutionContext, Future, Await}
+import scala.concurrent._
/**
* Abstract base class for Flink's mini cluster. The mini cluster starts a
@@ -50,15 +54,16 @@ import scala.concurrent.{ExecutionContext, Future, Await}
* actors can all be run in the same [[ActorSystem]] or each one in its own.
*
* @param userConfiguration Configuration object with the user provided configuration values
- * @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same
- * [[ActorSystem]], otherwise false
+ * @param useSingleActorSystem true if all actors (JobManager and TaskManager) shall be run in the
+ * same [[ActorSystem]], otherwise false
* @param streamingMode True, if the system should be started in streaming mode, false if
* in pure batch mode.
*/
abstract class FlinkMiniCluster(
val userConfiguration: Configuration,
- val singleActorSystem: Boolean,
- val streamingMode: StreamingMode) {
+ val useSingleActorSystem: Boolean,
+ val streamingMode: StreamingMode)
+ extends LeaderRetrievalListener {
def this(userConfiguration: Configuration, singleActorSystem: Boolean)
= this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
@@ -73,55 +78,110 @@ abstract class FlinkMiniCluster(
// not getLocalHost(), which may be 127.0.1.1
val hostname = InetAddress.getByName("localhost").getHostAddress()
- val timeout = AkkaUtils.getTimeout(userConfiguration)
-
val configuration = generateConfiguration(userConfiguration)
- var jobManagerActorSystem = startJobManagerActorSystem()
- var (jobManagerActor, webMonitor) = startJobManager(jobManagerActorSystem)
+ /** Promise for the [[ActorGateway]] of the current leader. It is completed or failed by
+ * notifyLeaderAddress/handleError and replaced by a fresh promise while holding `futureLock`. */
+ var leaderGateway: Promise[ActorGateway] = Promise()
- val numTaskManagers = configuration.getInteger(
- ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
+ /** Promise for the index of the current leader among the started JobManagers. It is always
+ * completed and replaced together with `leaderGateway`, while holding `futureLock`. */
+ var leaderIndex: Promise[Int] = Promise()
- var (taskManagerActorSystems, taskManagerActors) =
- (for(i <- 0 until numTaskManagers) yield {
- val actorSystem = if(singleActorSystem) {
- jobManagerActorSystem
- } else {
- startTaskManagerActorSystem(i)
- }
+ /** Lock guarding the replacement and completion of `leaderGateway` and `leaderIndex` */
+ val futureLock = new Object()
+
+ implicit val executionContext = ExecutionContext.global
- (actorSystem, startTaskManager(i, actorSystem))
- }).unzip
+ /** Default timeout (read from the user configuration) for asks, graceful stops and waits */
+ implicit val timeout = AkkaUtils.getTimeout(userConfiguration)
+
+ /** Recovery mode: STANDALONE starts a single JobManager and uses standalone leader
+ * retrieval, any other mode retrieves the leader via ZooKeeper */
+ val recoveryMode = RecoveryMode.valueOf(configuration.getString(
+ ConfigConstants.RECOVERY_MODE,
+ ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase)
+
+ val numJobManagers = getNumberOfJobManagers
+
+ val numTaskManagers = configuration.getInteger(
+ ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
+ ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER)
- waitForTaskManagersToBeRegistered()
+ // The following fields are populated by start(); they are None until the cluster is started.
+ var jobManagerActorSystems: Option[Seq[ActorSystem]] = None
+ var jobManagerActors: Option[Seq[ActorRef]] = None
+ var webMonitor: Option[WebMonitor] = None
+ var taskManagerActorSystems: Option[Seq[ActorSystem]] = None
+ var taskManagerActors: Option[Seq[ActorRef]] = None
+ protected var leaderRetrievalService: Option[LeaderRetrievalService] = None
// --------------------------------------------------------------------------
- // Construction
+ // Abstract Methods
// --------------------------------------------------------------------------
+ /** Builds the configuration used by the cluster from the user-provided configuration. */
def generateConfiguration(userConfiguration: Configuration): Configuration
- def startJobManager(system: ActorSystem): (ActorRef, Option[WebMonitor])
+ /** Starts the JobManager with the given index inside `system` and returns its actor. */
+ def startJobManager(index: Int, system: ActorSystem): ActorRef
+ /** Starts the TaskManager with the given index inside `system` and returns its actor. */
def startTaskManager(index: Int, system: ActorSystem): ActorRef
- def getJobManagerAkkaConfig: Config = {
- if (singleActorSystem) {
+ // --------------------------------------------------------------------------
+ // Getters/Setters
+ // --------------------------------------------------------------------------
+
+  /** Number of JobManagers to start: exactly one in STANDALONE recovery mode, otherwise the
+    * configured LOCAL_NUMBER_JOB_MANAGER value (falling back to its default constant).
+    */
+  def getNumberOfJobManagers: Int = recoveryMode match {
+    case RecoveryMode.STANDALONE => 1
+    case _ =>
+      configuration.getInteger(
+        ConfigConstants.LOCAL_NUMBER_JOB_MANAGER,
+        ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER)
+  }
+
+  /** The started JobManager actors as a Java list; empty before `start()` was called. */
+  def getJobManagersAsJava = {
+    import collection.JavaConverters._
+    val actors = jobManagerActors getOrElse Seq.empty[ActorRef]
+    actors.asJava
+  }
+
+  /** The started TaskManager actors; empty before `start()` was called. */
+  def getTaskManagers = {
+    taskManagerActors.fold(Seq.empty[ActorRef])(identity)
+  }
+
+  /** The started TaskManager actors as a Java list; empty before `start()` was called. */
+  def getTaskManagersAsJava = {
+    import collection.JavaConverters._
+    taskManagerActors.fold(Seq.empty[ActorRef])(identity).asJava
+  }
+
+  /** Future for the [[ActorGateway]] of the current leader.
+    *
+    * `leaderGateway` is a plain (non-volatile) var that notifyLeaderAddress, handleError and
+    * clearLeader replace while holding `futureLock`. Reading it under the same lock guarantees
+    * that the caller observes the most recently installed promise; an unsynchronized read has
+    * no such visibility guarantee.
+    */
+  def getLeaderGatewayFuture: Future[ActorGateway] = futureLock.synchronized {
+    leaderGateway.future
+  }
+
+  /** Blocks for at most `timeout` until the [[ActorGateway]] of the current leader is known.
+    * If the leader promise was failed, that failure is rethrown.
+    *
+    * @throws java.util.concurrent.TimeoutException if no leader is known within `timeout`
+    */
+  def getLeaderGateway(timeout: FiniteDuration): ActorGateway =
+    Await.result(getLeaderGatewayFuture, timeout)
+
+  /** Future for the index of the current leader among the started JobManagers.
+    *
+    * `leaderIndex` is a plain (non-volatile) var that notifyLeaderAddress, handleError and
+    * clearLeader replace while holding `futureLock`. Reading it under the same lock guarantees
+    * that the caller observes the most recently installed promise.
+    */
+  def getLeaderIndexFuture: Future[Int] = futureLock.synchronized {
+    leaderIndex.future
+  }
+
+  /** Blocks for at most `timeout` until the index of the current leader is known.
+    * If the leader promise was failed, that failure is rethrown.
+    *
+    * @throws java.util.concurrent.TimeoutException if no leader is known within `timeout`
+    */
+  def getLeaderIndex(timeout: FiniteDuration): Int =
+    Await.result(getLeaderIndexFuture, timeout)
+
+ /** Akka config for the actor system of the JobManager with the given index. With a single
+ * actor system no host/port is passed. Otherwise the configured JobManager IPC port is
+ * offset by `index`, presumably so that several JobManagers do not collide on one port. */
+ def getJobManagerAkkaConfig(index: Int): Config = {
+ if (useSingleActorSystem) {
AkkaUtils.getAkkaConfig(configuration, None)
}
else {
val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
- AkkaUtils.getAkkaConfig(configuration, Some((hostname, port)))
- }
- }
+ // a port of 0 is not offset (presumably 0 means "pick a free port" - TODO confirm)
+ val resolvedPort = if(port != 0) port + index else port
- def startJobManagerActorSystem(): ActorSystem = {
- val config = getJobManagerAkkaConfig
- AkkaUtils.createActorSystem(config)
+ AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort)))
+ }
}
def getTaskManagerAkkaConfig(index: Int): Config = {
@@ -133,77 +193,95 @@ abstract class FlinkMiniCluster(
AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort)))
}
+ // --------------------------------------------------------------------------
+ // Start/Stop Methods
+ // --------------------------------------------------------------------------
+
+  /** Creates the actor system for the JobManager with the given index, configured by
+    * `getJobManagerAkkaConfig(index)`.
+    */
+  def startJobManagerActorSystem(index: Int): ActorSystem =
+    AkkaUtils.createActorSystem(getJobManagerAkkaConfig(index))
+
+ /** Creates the actor system for the TaskManager with the given index, configured by
+ * getTaskManagerAkkaConfig(index). */
def startTaskManagerActorSystem(index: Int): ActorSystem = {
val config = getTaskManagerAkkaConfig(index)
AkkaUtils.createActorSystem(config)
}
- def getJobManagerGateway(): ActorGateway = {
- // create ActorGateway from the JobManager's ActorRef
- JobManager.getJobManagerGateway(jobManagerActor, timeout)
+  /** Returns the actor system a job client should run in.
+    *
+    * In single-actor-system mode this is the actor system of the first JobManager (which must
+    * already have been started). Otherwise a dedicated actor system is started via
+    * [[JobClient]]; the caller is responsible for shutting it down again.
+    *
+    * @throws JobExecutionException if single-actor-system mode is used before `start()`
+    */
+  def startJobClientActorSystem(jobID: JobID): ActorSystem = {
+    if (!useSingleActorSystem) {
+      JobClient.startJobClientActorSystem(configuration)
+    } else {
+      jobManagerActorSystems
+        .map(systems => systems(0))
+        .getOrElse(throw new JobExecutionException(
+          jobID,
+          "The FlinkMiniCluster has not been started yet."))
+    }
}
- def getTaskManagers = {
- taskManagerActors
+  /** Starts the mini cluster and blocks until all TaskManagers have registered. */
+  def start(): Unit = {
+    start(waitForTaskManagerRegistration = true)
}
- def getTaskManagersAsJava = {
- import collection.JavaConverters._
- taskManagerActors.asJava
- }
+ /** Starts, in this order: the JobManagers, the leader retrieval service, the TaskManagers
+ * and (if enabled in the configuration) the web server.
+ *
+ * @param waitForTaskManagerRegistration if true, blocks until all TaskManagers have
+ * registered at a JobManager (see waitForTaskManagersToBeRegistered())
+ */
+ def start(waitForTaskManagerRegistration: Boolean): Unit = {
+ LOG.info("Starting FlinkMiniCluster.")
- def stop(): Unit = {
- LOG.info("Stopping FlinkMiniCluster.")
- shutdown()
- awaitTermination()
- }
+ // lazy: only created when useSingleActorSystem is set; it is then shared by all JobManagers
+ // (and, via jmActorSystems(0), by all TaskManagers)
+ lazy val singleActorSystem = startJobManagerActorSystem(0)
- def shutdown(): Unit = {
- webMonitor foreach {
- _.stop()
- }
+ val (jmActorSystems, jmActors) =
+ (for(i <- 0 until numJobManagers) yield {
+ val actorSystem = if(useSingleActorSystem) {
+ singleActorSystem
+ } else {
+ startJobManagerActorSystem(i)
+ }
+ (actorSystem, startJobManager(i, actorSystem))
+ }).unzip
- val futures = taskManagerActors map {
- gracefulStop(_, timeout)
- }
+ jobManagerActorSystems = Some(jmActorSystems)
+ jobManagerActors = Some(jmActors)
- val future = gracefulStop(jobManagerActor, timeout)
+ // this cluster is the listener of the retrieval service: notifyLeaderAddress/handleError
+ // complete the leaderGateway/leaderIndex promises
+ val lrs = createLeaderRetrievalService();
- implicit val executionContext = ExecutionContext.global
+ leaderRetrievalService = Some(lrs)
+ lrs.start(this)
- Await.ready(Future.sequence(future +: futures), timeout)
+ val (tmActorSystems, tmActors) =
+ (for(i <- 0 until numTaskManagers) yield {
+ val actorSystem = if(useSingleActorSystem) {
+ jmActorSystems(0)
+ } else {
+ startTaskManagerActorSystem(i)
+ }
- if(!singleActorSystem){
- taskManagerActorSystems foreach {
- _.shutdown()
- }
- }
+ (actorSystem, startTaskManager(i, actorSystem))
+ }).unzip
- jobManagerActorSystem.shutdown()
- }
+ taskManagerActorSystems = Some(tmActorSystems)
+ taskManagerActors = Some(tmActors)
- def awaitTermination(): Unit = {
- jobManagerActorSystem.awaitTermination()
+ // the web server is always attached to the first JobManager
+ val jobManagerAkkaURL = AkkaUtils.getAkkaURL(jmActorSystems(0), jmActors(0))
- taskManagerActorSystems foreach {
- _.awaitTermination()
+ webMonitor = startWebServer(configuration, jmActorSystems(0), jobManagerAkkaURL)
+
+ if(waitForTaskManagerRegistration) {
+ waitForTaskManagersToBeRegistered()
}
}
def startWebServer(
config: Configuration,
- jobManager: ActorRef,
- archiver: ActorRef)
+ actorSystem: ActorSystem,
+ jobManagerAkkaURL: String)
: Option[WebMonitor] = {
if(
- config.getBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false) &&
- config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
-
- val lookupTimeout = AkkaUtils.getLookupTimeout(config)
+ config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false) &&
+ config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
- val jobManagerGateway = JobManager.getJobManagerGateway(jobManager, lookupTimeout)
- val archiverGateway = new AkkaActorGateway(archiver, jobManagerGateway.leaderSessionID())
+ // TODO: Add support for HA: Make web server work independently from the JM
+ val leaderRetrievalService = new StandaloneLeaderRetrievalService(jobManagerAkkaURL)
// start the job manager web frontend
val webServer = if (
@@ -214,11 +292,10 @@ abstract class FlinkMiniCluster(
LOG.info("Starting NEW JobManger web frontend")
// start the new web frontend. we need to load this dynamically
// because it is not in the same project/dependencies
- JobManager.startWebRuntimeMonitor(config, jobManagerGateway, archiverGateway)
- }
- else {
+ JobManager.startWebRuntimeMonitor(config, leaderRetrievalService, actorSystem)
+ } else {
LOG.info("Starting JobManger web frontend")
- new WebInfoServer(config, jobManagerGateway, archiverGateway)
+ new WebInfoServer(config, leaderRetrievalService, actorSystem)
}
webServer.start()
@@ -229,18 +306,89 @@ abstract class FlinkMiniCluster(
}
}
- def waitForTaskManagersToBeRegistered(): Unit = {
+  /** Stops the mini cluster: shuts down all actors and actor systems, waits until the actor
+    * systems have terminated and finally stops the leader retrieval service, if one was started.
+    */
+  def stop(): Unit = {
+    LOG.info("Stopping FlinkMiniCluster.")
+    shutdown()
+    awaitTermination()
+    for (service <- leaderRetrievalService) {
+      service.stop()
+    }
+  }
+
+ /** Stops the web monitor, gracefully stops all JobManager and TaskManager actors (waiting at
+ * most `timeout` for them) and then calls shutdown() on their actor systems. stop() calls
+ * awaitTermination() afterwards to wait for the actor systems. */
+ protected def shutdown(): Unit = {
+ webMonitor foreach {
+ _.stop()
+ }
+
+ val tmFutures = taskManagerActors map {
+ _.map(gracefulStop(_, timeout))
+ } getOrElse(Seq())
+
+
+ val jmFutures = jobManagerActors map {
+ _.map(gracefulStop(_, timeout))
+ } getOrElse(Seq())
+
+ // NOTE(review): this local implicit shadows the class-level executionContext, which is also
+ // ExecutionContext.global, so it appears redundant.
implicit val executionContext = ExecutionContext.global
- val futures = taskManagerActors map {
- taskManager => (taskManager ? NotifyWhenRegisteredAtJobManager)(timeout)
+ Await.ready(Future.sequence(jmFutures ++ tmFutures), timeout)
+
+ // in single-actor-system mode the TaskManagers live in the JobManager actor system,
+ // which is shut down below
+ if (!useSingleActorSystem) {
+ taskManagerActorSystems foreach {
+ _ foreach(_.shutdown())
+ }
}
+ jobManagerActorSystems foreach {
+ _ foreach(_.shutdown())
+ }
+ }
+
+  /** Blocks until all started JobManager and TaskManager actor systems have terminated,
+    * JobManager systems first. Systems that were never started are skipped.
+    */
+  def awaitTermination(): Unit = {
+    val systems = jobManagerActorSystems.getOrElse(Seq.empty[ActorSystem]) ++
+      taskManagerActorSystems.getOrElse(Seq.empty[ActorSystem])
+
+    systems foreach { system => system.awaitTermination() }
+  }
+
+ // --------------------------------------------------------------------------
+ // Utility Methods
+ // --------------------------------------------------------------------------
+
+  /** Blocks, using the cluster's default `timeout`, until every TaskManager has registered at
+    * a JobManager.
+    *
+    * @throws java.util.concurrent.TimeoutException if the registrations do not finish in time
+    * @throws java.lang.InterruptedException if the waiting thread is interrupted
+    */
+  @throws(classOf[TimeoutException])
+  @throws(classOf[InterruptedException])
+  def waitForTaskManagersToBeRegistered(): Unit =
+    waitForTaskManagersToBeRegistered(timeout)
+
+ /** Waits until all task managers have registered at a job manager, or until the timeout is
+ * reached.
+ *
+ * @param timeout used both as the ask timeout of every registration request and as the
+ * maximum time to wait for all of them
+ * @throws java.util.concurrent.TimeoutException if the registrations do not finish in time
+ * @throws java.lang.InterruptedException if the waiting thread is interrupted
+ */
+ @throws(classOf[TimeoutException])
+ @throws(classOf[InterruptedException])
+ def waitForTaskManagersToBeRegistered(timeout: FiniteDuration): Unit = {
+ val futures = taskManagerActors map {
+ _ map(taskManager => (taskManager ? NotifyWhenRegisteredAtAnyJobManager)(timeout))
+ } getOrElse(Seq())
+
+ // NOTE(review): Await.ready does not rethrow a failed future. If an individual ask fails
+ // (e.g. its own timeout fires first), this returns normally although not every TaskManager
+ // has registered; Await.result would surface that failure.
Await.ready(Future.sequence(futures), timeout)
}
+ /** Submits the job graph and waits for its [[JobExecutionResult]], delegating to the
+ * overload that takes a timeout with the cluster's default `timeout`.
+ *
+ * @param printUpdates forwarded to JobClient.submitJobAndWait
+ */
@throws(classOf[JobExecutionException])
- def submitJobAndWait(jobGraph: JobGraph, printUpdates: Boolean): JobExecutionResult = {
+ def submitJobAndWait(
+ jobGraph: JobGraph,
+ printUpdates: Boolean)
+ : JobExecutionResult = {
submitJobAndWait(jobGraph, printUpdates, timeout)
}
@@ -251,25 +399,126 @@ abstract class FlinkMiniCluster(
timeout: FiniteDuration)
: JobExecutionResult = {
- val clientActorSystem = if (singleActorSystem) jobManagerActorSystem
- else JobClient.startJobClientActorSystem(configuration)
-
- JobClient.submitJobAndWait(
- clientActorSystem,
- getJobManagerGateway(),
- jobGraph,
- timeout,
+ val clientActorSystem = startJobClientActorSystem(jobGraph.getJobID)
+
+ try {
+ val jobManagerGateway = try {
+ getLeaderGateway(timeout)
+ } catch {
+ case e: Exception => throw new JobExecutionException(
+ jobGraph.getJobID,
+ "Could not retrieve leading job manager gateway.",
+ e)
+ }
+
+ JobClient.submitJobAndWait(
+ clientActorSystem,
+ jobManagerGateway,
+ jobGraph,
+ timeout,
printUpdates,
this.getClass().getClassLoader())
+ } finally {
+ if(!useSingleActorSystem) {
+ // we have to shutdown the just created actor system
+ shutdownJobClientActorSystem(clientActorSystem)
+ }
+ }
}
+ /** Submits the job graph to the current leader without waiting for the job's result.
+ *
+ * @throws JobExecutionException if the leading JobManager's gateway cannot be retrieved
+ * within `timeout`
+ */
@throws(classOf[JobExecutionException])
def submitJobDetached(jobGraph: JobGraph) : JobSubmissionResult = {
- JobClient.submitJobDetached(
- getJobManagerGateway(),
- jobGraph,
- timeout,
- getClass().getClassLoader())
+
+    // Catch only Exceptions (as submitJobAndWait does): fatal Throwables such as
+    // OutOfMemoryError must propagate instead of being wrapped into a JobExecutionException.
+    val jobManagerGateway = try {
+      getLeaderGateway(timeout)
+    } catch {
+      case e: Exception =>
+        throw new JobExecutionException(
+          jobGraph.getJobID,
+          "Could not retrieve JobManager ActorRef.",
+          e)
+    }
+
+    JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, getClass().getClassLoader())
+
new JobSubmissionResult(jobGraph.getJobID)
}
+
+  /** Shuts down an actor system obtained from `startJobClientActorSystem`, unless it is the
+    * JobManager actor system shared in single-actor-system mode.
+    */
+  def shutdownJobClientActorSystem(actorSystem: ActorSystem): Unit = {
+    if (useSingleActorSystem) {
+      // shared with the JobManager: must stay alive
+    } else {
+      actorSystem.shutdown()
+    }
+  }
+
+  /** Creates the service used to find the leading JobManager.
+    *
+    * In STANDALONE recovery mode the leader is always the first started JobManager; in any
+    * other mode the leader is retrieved via ZooKeeper.
+    *
+    * @throws IllegalStateException if the JobManagers have not been started yet
+    */
+  protected def createLeaderRetrievalService(): LeaderRetrievalService = {
+    (jobManagerActorSystems, jobManagerActors) match {
+      case (Some(jmActorSystems), Some(jmActors)) =>
+        if (recoveryMode == RecoveryMode.STANDALONE) {
+          new StandaloneLeaderRetrievalService(
+            AkkaUtils.getAkkaURL(jmActorSystems(0), jmActors(0)))
+        } else {
+          ZooKeeperUtils.createLeaderRetrievalService(configuration)
+        }
+
+      case _ =>
+        // calling this before the JobManagers exist is a lifecycle error, not a runtime failure
+        throw new IllegalStateException("The FlinkMiniCluster has not been started properly.")
+    }
+  }
+
+  /** Forgets the current leader by installing fresh, uncompleted promises for the leader
+    * gateway and leader index, while holding `futureLock`.
+    */
+  protected def clearLeader(): Unit = {
+    futureLock.synchronized {
+      leaderGateway = Promise[ActorGateway]()
+      leaderIndex = Promise[Int]()
+    }
+  }
+
+  /** Called by the leader retrieval service when the leader address is known.
+    *
+    * Resolves `address` against the Akka URLs of the JobManagers started by this cluster. On a
+    * match, `leaderGateway` is completed with the matching actor (together with
+    * `leaderSessionID`) and `leaderIndex` with its index; otherwise both promises are failed.
+    * Null or empty addresses are ignored.
+    *
+    * @param address Akka URL of the leading JobManager
+    * @param leaderSessionID session ID attached to the published [[ActorGateway]]
+    */
+  override def notifyLeaderAddress(address: String, leaderSessionID: UUID): Unit = {
+    // only accept leader addresses which are not null and non-empty
+    if (address != null && address.nonEmpty) {
+      val selectedLeader: Option[(ActorRef, Int)] =
+        (jobManagerActorSystems, jobManagerActors) match {
+          case (Some(systems), Some(actors)) =>
+            systems.zip(actors).zipWithIndex.collectFirst {
+              case ((system, actor), index) if AkkaUtils.getAkkaURL(system, actor) == address =>
+                (actor, index)
+            }
+          case _ => None
+        }
+
+      futureLock.synchronized {
+        // A previous notification (or handleError) may already have completed the promises;
+        // install fresh ones so the new leader can be published. Every reassignment of these
+        // promises (here, in clearLeader and in handleError) happens while holding futureLock.
+        if (leaderGateway.isCompleted) {
+          leaderGateway = Promise()
+          leaderIndex = Promise()
+        }
+
+        selectedLeader match {
+          case Some((leader, index)) =>
+            leaderGateway.success(new AkkaActorGateway(leader, leaderSessionID))
+            leaderIndex.success(index)
+          case None =>
+            leaderGateway.failure(
+              new Exception(s"Could not find job manager with address ${address}."))
+            leaderIndex.failure(
+              new Exception(s"Could not find job manager index with address ${address}.")
+            )
+        }
+      }
+    }
+  }
+
+  /** Called by the leader retrieval service on an error. Pending leader promises are failed
+    * with `exception`; already completed ones are replaced by promises failed with it, so that
+    * subsequent waiters observe the error.
+    */
+  override def handleError(exception: Exception): Unit = {
+    futureLock.synchronized {
+      if (!leaderGateway.isCompleted) {
+        leaderGateway.failure(exception)
+        leaderIndex.failure(exception)
+      } else {
+        leaderGateway = Promise.failed(exception)
+        leaderIndex = Promise.failed(exception)
+      }
+    }
+  }
}