You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/08/31 12:31:42 UTC
[06/10] flink git commit: [FLINK-2291] [runtime] Add ZooKeeper
support to elect a leader from a set of JobManagers. The leader will then be
retrieved from ZooKeeper by the TaskManagers.
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 54c457e..3f8783a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -18,32 +18,29 @@
package org.apache.flink.runtime.minicluster
-import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
+import akka.actor.{ActorRef, ActorSystem}
import org.apache.flink.api.common.io.FileOutputFormat
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.client.JobClient
-import org.apache.flink.runtime.instance.AkkaActorGateway
import org.apache.flink.runtime.io.network.netty.NettyConfig
import org.apache.flink.runtime.jobmanager.JobManager
-import org.apache.flink.runtime.jobmanager.web.WebInfoServer
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.util.EnvironmentInformation
-import org.apache.flink.runtime.webmonitor.WebMonitor
import org.slf4j.LoggerFactory
/**
* Local Flink mini cluster which executes all [[TaskManager]]s and the [[JobManager]] in the same
- * JVM. It extends the [[FlinkMiniCluster]] by providing a [[JobClient]], having convenience
- * functions to setup Flink's configuration and implementations to create [[JobManager]] and
- * [[TaskManager]].
+ * JVM. It extends the [[FlinkMiniCluster]] by having convenience functions to setup Flink's
+ * configuration and implementations to create [[JobManager]] and [[TaskManager]].
*
* @param userConfiguration Configuration object with the user provided configuration values
* @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same
* [[ActorSystem]], otherwise false
+ * @param streamingMode Defines the execution mode of Flink's components (JobManager and
+ * TaskManager)
*/
class LocalFlinkMiniCluster(
userConfiguration: Configuration,
@@ -51,22 +48,12 @@ class LocalFlinkMiniCluster(
streamingMode: StreamingMode)
extends FlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) {
-
def this(userConfiguration: Configuration, singleActorSystem: Boolean)
= this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
def this(userConfiguration: Configuration) = this(userConfiguration, true)
// --------------------------------------------------------------------------
-
-
- val jobClientActorSystem = if (singleActorSystem) {
- jobManagerActorSystem
- } else {
- // create an actor system listening on a random port
- JobClient.startJobClientActorSystem(configuration)
- }
-
override def generateConfiguration(userConfiguration: Configuration): Configuration = {
val config = getDefaultConfig
@@ -78,14 +65,28 @@ class LocalFlinkMiniCluster(
config
}
- override def startJobManager(system: ActorSystem): (ActorRef, Option[WebMonitor]) = {
+ override def startJobManager(index: Int, system: ActorSystem): ActorRef = {
val config = configuration.clone()
-
- val (jobManager, archiver) = JobManager.startJobManagerActors(config, system, streamingMode)
- val webMonitorOption = startWebServer(config, jobManager, archiver)
+ val jobManagerName = getJobManagerName(index)
+ val archiveName = getArchiveName(index)
+
+ val jobManagerPort = config.getInteger(
+ ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+ ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+
+ if(jobManagerPort > 0) {
+ config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
+ }
- (jobManager, webMonitorOption)
+ val (jobManager, _) = JobManager.startJobManagerActors(
+ config,
+ system,
+ Some(jobManagerName),
+ Some(archiveName),
+ streamingMode)
+
+ jobManager
}
override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
@@ -113,51 +114,32 @@ class LocalFlinkMiniCluster(
} else {
TaskManager.TASK_MANAGER_NAME
}
-
- val jobManagerPath: Option[String] = if (singleActorSystem) {
- Some(jobManagerActor.path.toString)
- } else {
- None
- }
TaskManager.startTaskManagerComponentsAndActor(
config,
system,
hostname, // network interface to bind to
Some(taskManagerActorName), // actor name
- jobManagerPath, // job manager akka URL
+ Some(createLeaderRetrievalService), // job manager leader retrieval service
localExecution, // start network stack?
streamingMode,
classOf[TaskManager])
}
- def getJobClientActorSystem: ActorSystem = jobClientActorSystem
-
- def getJobManagerRPCPort: Int = {
- if (jobManagerActorSystem.isInstanceOf[ExtendedActorSystem]) {
- val extActor = jobManagerActorSystem.asInstanceOf[ExtendedActorSystem]
- extActor.provider.getDefaultAddress.port match {
- case p: Some[Int] => p.get
- case _ => -1
- }
- } else {
- -1
- }
- }
+ def getLeaderRPCPort: Int = {
+ val index = getLeaderIndex(timeout)
- override def shutdown(): Unit = {
- super.shutdown()
+ jobManagerActorSystems match {
+ case Some(jmActorSystems) =>
+ AkkaUtils.getAddress(jmActorSystems(index)).port match {
+ case Some(p) => p
+ case None => -1
+ }
- if (!singleActorSystem) {
- jobClientActorSystem.shutdown()
+ case None => throw new Exception("The JobManager of the LocalFlinkMiniCluster has not been " +
+ "started properly.")
}
- }
- override def awaitTermination(): Unit = {
- if (!singleActorSystem) {
- jobClientActorSystem.awaitTermination()
- }
- super.awaitTermination()
}
def initializeIOFormatClasses(configuration: Configuration): Unit = {
@@ -178,10 +160,10 @@ class LocalFlinkMiniCluster(
if (config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1) == -1) {
val bufferSizeNew: Int = config.getInteger(
- ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, -1)
+ ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, -1)
val bufferSizeOld: Int = config.getInteger(
- ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1)
+ ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1)
val bufferSize: Int =
if (bufferSizeNew != -1) {
bufferSizeNew
@@ -194,13 +176,15 @@ class LocalFlinkMiniCluster(
bufferSizeOld
}
- val bufferMem: Long = config.getLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) * bufferSize.toLong
+ val bufferMem: Long = config.getLong(
+ ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) * bufferSize.toLong
val numTaskManager = config.getInteger(
- ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
+ ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
- val memoryFraction = config.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+ val memoryFraction = config.getFloat(
+ ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
// full memory size
@@ -218,16 +202,12 @@ class LocalFlinkMiniCluster(
}
}
- def getConfiguration: Configuration = {
- this.userConfiguration
- }
-
def getDefaultConfig: Configuration = {
val config: Configuration = new Configuration()
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, hostname)
- config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
// Reduce number of threads for local execution
config.setInteger(NettyConfig.NUM_THREADS_CLIENT, 1)
@@ -235,8 +215,19 @@ class LocalFlinkMiniCluster(
config
}
-}
-object LocalFlinkMiniCluster {
-// val LOG = LoggerFactory.getLogger(classOf[LocalFlinkMiniCluster])
+ protected def getJobManagerName(index: Int): String = {
+ if(singleActorSystem) {
+ JobManager.JOB_MANAGER_NAME + "_" + (index + 1)
+ } else {
+ JobManager.JOB_MANAGER_NAME
+ }
+ }
+ protected def getArchiveName(index: Int): String = {
+ if(singleActorSystem) {
+ JobManager.ARCHIVE_NAME + "_" + (index + 1)
+ } else {
+ JobManager.ARCHIVE_NAME
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index cc8b8ba..529d3d1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -39,8 +39,10 @@ import grizzled.slf4j.Logger
import org.apache.flink.configuration._
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
+import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
+import org.apache.flink.runtime.messages.TaskMessages._
import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, TriggerCheckpoint, AbstractCheckpointMessage}
-import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessages, LogMessages, StreamingMode}
+import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages, StreamingMode}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.blob.{BlobService, BlobCache}
import org.apache.flink.runtime.broadcast.BroadcastVariableManager
@@ -55,17 +57,15 @@ import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.io.network.netty.NettyConfig
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
-import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.memorymanager.{MemoryManager, DefaultMemoryManager}
import org.apache.flink.runtime.messages.Messages._
import org.apache.flink.runtime.messages.RegistrationMessages._
import org.apache.flink.runtime.messages.TaskManagerMessages._
-import org.apache.flink.runtime.messages.TaskMessages._
import org.apache.flink.runtime.net.NetUtils
import org.apache.flink.runtime.process.ProcessReaper
import org.apache.flink.runtime.security.SecurityUtils
import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
-import org.apache.flink.runtime.util.{ZooKeeperUtil, MathUtils, EnvironmentInformation}
+import org.apache.flink.runtime.util.{LeaderRetrievalUtils, MathUtils, EnvironmentInformation}
import scala.concurrent._
import scala.concurrent.duration._
@@ -120,15 +120,15 @@ import scala.language.postfixOps
class TaskManager(
protected val config: TaskManagerConfiguration,
protected val connectionInfo: InstanceConnectionInfo,
- protected val jobManagerAkkaURL: String,
protected val memoryManager: MemoryManager,
protected val ioManager: IOManager,
protected val network: NetworkEnvironment,
- protected val numberOfSlots: Int)
+ protected val numberOfSlots: Int,
+ protected val leaderRetrievalService: LeaderRetrievalService)
extends FlinkActor
- with LeaderSessionMessages // Mixin order is important: second we want to filter leader messages
+ with LeaderSessionMessageFilter // Mixin order is important: We want to filter after logging
with LogMessages // Mixin order is important: first we want to support message logging
-{
+ with LeaderRetrievalListener {
override val log = Logger(getClass)
@@ -152,10 +152,12 @@ class TaskManager(
/** Metric serialization */
private val metricRegistryMapper: ObjectMapper = new ObjectMapper()
- .registerModule(new MetricsModule(TimeUnit.SECONDS,
- TimeUnit.MILLISECONDS,
- false,
- MetricFilter.ALL))
+ .registerModule(
+ new MetricsModule(
+ TimeUnit.SECONDS,
+ TimeUnit.MILLISECONDS,
+ false,
+ MetricFilter.ALL))
/** Actors which want to be notified once this task manager has been
registered at the job manager */
@@ -164,19 +166,18 @@ class TaskManager(
private var blobService: Option[BlobService] = None
private var libraryCacheManager: Option[LibraryCacheManager] = None
protected var currentJobManager: Option[ActorRef] = None
-
+ private var jobManagerAkkaURL: Option[String] = None
+
private var instanceID: InstanceID = null
private var heartbeatScheduler: Option[Cancellable] = None
- protected var leaderSessionID: Option[UUID] = None
+ var leaderSessionID: Option[UUID] = None
- private val currentRegistrationSessionID: UUID = UUID.randomUUID()
private val runtimeInfo = new TaskManagerRuntimeInfo(
connectionInfo.getHostname(),
new UnmodifiableConfiguration(config.configuration))
-
// --------------------------------------------------------------------------
// Actor messages and life cycle
// --------------------------------------------------------------------------
@@ -196,18 +197,13 @@ class TaskManager(
log.info(MemoryLogger.getMemoryUsageStatsAsString(ManagementFactory.getMemoryMXBean))
}
- // kick off the registration
- val deadline: Option[Deadline] = config.maxRegistrationDuration.map(_.fromNow)
-
- self ! decorateMessage(
- TriggerTaskManagerRegistration(
- currentRegistrationSessionID,
- jobManagerAkkaURL,
- TaskManager.INITIAL_REGISTRATION_TIMEOUT,
- deadline,
- 1)
- )
-
+ try {
+ leaderRetrievalService.start(this)
+ } catch {
+ case e: Exception =>
+ log.error("Could not start leader retrieval service.", e)
+ throw new RuntimeException("Could not start leader retrieval service.", e)
+ }
}
/**
@@ -229,6 +225,12 @@ class TaskManager(
}
try {
+ leaderRetrievalService.stop()
+ } catch {
+ case e: Exception => log.error("Leader retrieval service did not shut down properly.")
+ }
+
+ try {
ioManager.shutdown()
} catch {
case t: Exception => log.error("I/O manager did not shutdown properly.", t)
@@ -266,6 +268,10 @@ class TaskManager(
// messages for coordinating checkpoints
case message: AbstractCheckpointMessage => handleCheckpointingMessage(message)
+ case JobManagerLeaderAddress(address, leaderSessionID) => {
+ handleJobManagerLeaderAddress(address, leaderSessionID)
+ }
+
// registration messages for connecting and disconnecting from / to the JobManager
case message: RegistrationMessage => handleRegistrationMessage(message)
@@ -279,7 +285,7 @@ class TaskManager(
// registers the message sender to be notified once this TaskManager has completed
// its registration at the JobManager
- case NotifyWhenRegisteredAtJobManager =>
+ case NotifyWhenRegisteredAtAnyJobManager =>
if (isConnected) {
sender ! decorateMessage(RegisteredAtJobManager)
} else {
@@ -290,6 +296,7 @@ class TaskManager(
case Terminated(actor: ActorRef) =>
if (isConnected && actor == currentJobManager.orNull) {
handleJobManagerDisconnect(sender(), "JobManager is no longer reachable")
+ triggerTaskManagerRegistration()
}
else {
log.warn(s"Received unrecognized disconnect message " +
@@ -298,6 +305,7 @@ class TaskManager(
case Disconnect(msg) =>
handleJobManagerDisconnect(sender(), "JobManager requested disconnect: " + msg)
+ triggerTaskManagerRegistration()
case FatalError(message, cause) =>
killTaskManagerFatal(message, cause)
@@ -361,7 +369,7 @@ class TaskManager(
// was into a terminal state, or in case the JobManager cannot be informed of the
// state transition
- case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>
+ case updateMsg @ UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>
// we receive these from our tasks and forward them to the JobManager
currentJobManager foreach {
@@ -379,7 +387,7 @@ class TaskManager(
FailTask(
executionID,
new Exception("Task has been cancelled on the JobManager."))
- )
+ )
}
case Failure(t) =>
@@ -416,15 +424,15 @@ class TaskManager(
val task = runningTasks.get(executionID)
if (task != null) {
task.cancelExecution()
- sender ! decorateMessage(new TaskOperationResult(executionID, true))
+ sender ! decorateMessage(new TaskOperationResult(executionID, true))
} else {
log.debug(s"Cannot find task to cancel for execution ${executionID})")
- sender ! decorateMessage(
- new TaskOperationResult(
- executionID,
- false,
- "No task with that execution ID was found.")
- )
+ sender ! decorateMessage(
+ new TaskOperationResult(
+ executionID,
+ false,
+ "No task with that execution ID was found.")
+ )
}
case PartitionState(taskExecutionId, taskResultId, partitionId, state) =>
@@ -489,148 +497,148 @@ class TaskManager(
* @param message The registration message.
*/
private def handleRegistrationMessage(message: RegistrationMessage): Unit = {
- if(message.registrationSessionID.equals(currentRegistrationSessionID)) {
- message match {
- case TriggerTaskManagerRegistration(
- registrationSessionID,
- jobManagerURL,
- timeout,
- deadline,
- attempt) =>
+ message match {
+ case TriggerTaskManagerRegistration(
+ jobManagerURL,
+ timeout,
+ deadline,
+ attempt) =>
- if (isConnected) {
- // this may be the case, if we queue another attempt and
- // in the meantime, the registration is acknowledged
- log.debug(
- "TaskManager was triggered to register at JobManager, but is already registered")
- }
- else if (deadline.exists(_.isOverdue())) {
- // we failed to register in time. that means we should quit
- log.error("Failed to register at the JobManager withing the defined maximum " +
- "connect time. Shutting down ...")
+ if (isConnected) {
+ // this may be the case, if we queue another attempt and
+ // in the meantime, the registration is acknowledged
+ log.debug(
+ "TaskManager was triggered to register at JobManager, but is already registered")
+ }
+ else if (deadline.exists(_.isOverdue())) {
+ // we failed to register in time. that means we should quit
+ log.error("Failed to register at the JobManager withing the defined maximum " +
+ "connect time. Shutting down ...")
- // terminate ourselves (hasta la vista)
- self ! decorateMessage(PoisonPill)
+ // terminate ourselves (hasta la vista)
+ self ! decorateMessage(PoisonPill)
+ }
+ else {
+ if (!jobManagerAkkaURL.equals(Option(jobManagerURL))) {
+ throw new Exception("Invalid internal state: Trying to register at JobManager " +
+ s"${jobManagerURL} even though the current JobManagerAkkaURL is set to " +
+ s"${jobManagerAkkaURL.getOrElse("")}")
}
- else {
- log.info(s"Trying to register at JobManager ${jobManagerURL} " +
- s"(attempt ${attempt}, timeout: ${timeout})")
-
- val jobManager = context.actorSelection(jobManagerAkkaURL)
- jobManager ! decorateMessage(
- RegisterTaskManager(
- registrationSessionID,
- self,
- connectionInfo,
- resources,
- numberOfSlots)
- )
- // the next timeout computes via exponential backoff with cap
- val nextTimeout = (timeout * 2).min(TaskManager.MAX_REGISTRATION_TIMEOUT)
+ log.info(s"Trying to register at JobManager ${jobManagerURL} " +
+ s"(attempt ${attempt}, timeout: ${timeout})")
- // schedule (with our timeout s delay) a check triggers a new registration
- // attempt, if we are not registered by then
- context.system.scheduler.scheduleOnce(timeout) {
- if (!isConnected) {
- self ! decorateMessage(
- TriggerTaskManagerRegistration(
- registrationSessionID,
- jobManagerURL,
- nextTimeout,
- deadline,
- attempt + 1)
- )
- }
- }(context.dispatcher)
- }
+ val jobManager = context.actorSelection(jobManagerURL)
- // successful registration. associate with the JobManager
- // we disambiguate duplicate or erroneous messages, to simplify debugging
- case AcknowledgeRegistration(_, leaderSessionID, jobManager, id, blobPort) =>
- if (isConnected) {
- if (jobManager == currentJobManager.orNull) {
- log.debug("Ignoring duplicate registration acknowledgement.")
- } else {
- log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
- s"because the TaskManager is already registered at ${currentJobManager.orNull}")
- }
+ jobManager ! decorateMessage(
+ RegisterTaskManager(
+ connectionInfo,
+ resources,
+ numberOfSlots)
+ )
+
+ // the next timeout computes via exponential backoff with cap
+ val nextTimeout = (timeout * 2).min(TaskManager.MAX_REGISTRATION_TIMEOUT)
+
+ // schedule, after a delay of `timeout`, a message that triggers a new registration
+ // attempt; it is ignored on receipt if we are registered by then
+ context.system.scheduler.scheduleOnce(
+ timeout,
+ self,
+ decorateMessage(TriggerTaskManagerRegistration(
+ jobManagerURL,
+ nextTimeout,
+ deadline,
+ attempt + 1)
+ ))(context.dispatcher)
+ }
+
+ // successful registration. associate with the JobManager
+ // we disambiguate duplicate or erroneous messages, to simplify debugging
+ case AcknowledgeRegistration(id, blobPort) =>
+ val jobManager = sender()
+
+ if (isConnected) {
+ if (jobManager == currentJobManager.orNull) {
+ log.debug("Ignoring duplicate registration acknowledgement.")
+ } else {
+ log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
+ s"because the TaskManager is already registered at ${currentJobManager.orNull}")
}
- else {
- // not yet connected, so let's associate with that JobManager
- try {
- associateWithJobManager(jobManager, id, blobPort, leaderSessionID)
- } catch {
- case t: Throwable =>
- killTaskManagerFatal(
- "Unable to start TaskManager components after registering at JobManager", t)
- }
+ }
+ else {
+ // not yet connected, so let's associate with that JobManager
+ try {
+ associateWithJobManager(jobManager, id, blobPort)
+ } catch {
+ case t: Throwable =>
+ killTaskManagerFatal(
+ "Unable to start TaskManager components after registering at JobManager", t)
}
+ }
- // we are already registered at that specific JobManager - duplicate answer, rare cases
- case AlreadyRegistered(_, leaderSesssionID, jobManager, id, blobPort) =>
- if (isConnected) {
- if (jobManager == currentJobManager.orNull) {
- log.debug("Ignoring duplicate registration acknowledgement.")
- } else {
- log.warn(s"Received 'AlreadyRegistered' message from " +
- s"JobManager ${jobManager.path}, even through TaskManager is currently " +
- s"registered at ${currentJobManager.orNull}")
- }
- }
- else {
- // not connected, yet, to let's associate
- log.info("Received 'AlreadyRegistered' message before 'AcknowledgeRegistration'")
+ // we are already registered at that specific JobManager - duplicate answer, rare cases
+ case AlreadyRegistered(id, blobPort) =>
+ val jobManager = sender()
- try {
- associateWithJobManager(jobManager, id, blobPort, leaderSesssionID)
- } catch {
- case t: Throwable =>
- killTaskManagerFatal(
- "Unable to start TaskManager components after registering at JobManager", t)
- }
+ if (isConnected) {
+ if (jobManager == currentJobManager.orNull) {
+ log.debug("Ignoring duplicate registration acknowledgement.")
+ } else {
+ log.warn(s"Received 'AlreadyRegistered' message from " +
+ s"JobManager ${jobManager.path}, even through TaskManager is currently " +
+ s"registered at ${currentJobManager.orNull}")
}
+ }
+ else {
+ // not connected yet, so let's associate
+ log.info("Received 'AlreadyRegistered' message before 'AcknowledgeRegistration'")
+
+ try {
+ associateWithJobManager(jobManager, id, blobPort)
+ } catch {
+ case t: Throwable =>
+ killTaskManagerFatal(
+ "Unable to start TaskManager components after registering at JobManager", t)
+ }
+ }
- case RefuseRegistration(registrationSessionID, reason) =>
- if (currentJobManager.isEmpty) {
- log.error(s"The registration at JobManager ${jobManagerAkkaURL} was refused, " +
- s"because: ${reason}. Retrying later...")
-
- // try the registration again after some time
+ case RefuseRegistration(reason) =>
+ if (currentJobManager.isEmpty) {
+ log.error(s"The registration at JobManager ${jobManagerAkkaURL} was refused, " +
+ s"because: ${reason}. Retrying later...")
- val delay: FiniteDuration = TaskManager.DELAY_AFTER_REFUSED_REGISTRATION
- val deadline: Option[Deadline] = config.maxRegistrationDuration.map {
- timeout => timeout + delay fromNow
- }
+ if(jobManagerAkkaURL.isDefined) {
+ // try the registration again after some time
+ val delay: FiniteDuration = TaskManager.DELAY_AFTER_REFUSED_REGISTRATION
+ val deadline: Option[Deadline] = config.maxRegistrationDuration.map {
+ timeout => timeout + delay fromNow
+ }
- context.system.scheduler.scheduleOnce(delay) {
- self ! decorateMessage(
- TriggerTaskManagerRegistration(
- registrationSessionID,
- jobManagerAkkaURL,
- TaskManager.INITIAL_REGISTRATION_TIMEOUT,
- deadline,
- 1)
- )
- }(context.dispatcher)
+ context.system.scheduler.scheduleOnce(delay) {
+ self ! decorateMessage(
+ TriggerTaskManagerRegistration(
+ jobManagerAkkaURL.get,
+ TaskManager.INITIAL_REGISTRATION_TIMEOUT,
+ deadline,
+ 1)
+ )
+ }(context.dispatcher)
+ }
+ }
+ else {
+ // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
+ if (sender() == currentJobManager.orNull) {
+ log.warn(s"Received 'RefuseRegistration' from the JobManager (${sender().path})" +
+ s" even though this TaskManager is already registered there.")
}
else {
- // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
- if (sender() == currentJobManager.orNull) {
- log.warn(s"Received 'RefuseRegistration' from the JobManager (${sender().path})" +
- s" even though this TaskManager is already registered there.")
- }
- else {
- log.warn(s"Ignoring 'RefuseRegistration' from unrelated " +
- s"JobManager (${sender().path})")
- }
+ log.warn(s"Ignoring 'RefuseRegistration' from unrelated " +
+ s"JobManager (${sender().path})")
}
+ }
- case _ => unhandled(message)
- }
- } else {
- log.debug(s"Discarded registration message ${message}, because the registration session " +
- "ID was not correct.")
+ case _ => unhandled(message)
}
}
@@ -644,7 +652,7 @@ class TaskManager(
*
* @return True, if the TaskManager is currently connected to a JobManager, false otherwise.
*/
- private def isConnected : Boolean = currentJobManager.isDefined
+ protected def isConnected : Boolean = currentJobManager.isDefined
/**
* Associates the TaskManager with the given JobManager. After this
@@ -655,13 +663,11 @@ class TaskManager(
* @param id The instanceID under which the TaskManager is registered
* at the JobManager.
* @param blobPort The JobManager's port for the BLOB server.
- * @param newLeaderSessionID Leader session ID of the JobManager
*/
private def associateWithJobManager(
jobManager: ActorRef,
id: InstanceID,
- blobPort: Int,
- newLeaderSessionID: UUID)
+ blobPort: Int)
: Unit = {
if (jobManager == null) {
@@ -673,10 +679,6 @@ class TaskManager(
if (blobPort <= 0 || blobPort > 65535) {
throw new IllegalArgumentException("blob port is out of range: " + blobPort)
}
- if(newLeaderSessionID == null) {
- throw new NullPointerException("Leader session ID must not be null.")
- }
-
// sanity check that we are not currently registered with a different JobManager
if (isConnected) {
@@ -703,13 +705,12 @@ class TaskManager(
currentJobManager = Some(jobManager)
instanceID = id
- leaderSessionID = Some(newLeaderSessionID)
// start the network stack, now that we have the JobManager actor reference
try {
network.associateWithTaskManagerAndJobManager(
- new AkkaActorGateway(jobManager, leaderSessionID),
- new AkkaActorGateway(self, leaderSessionID)
+ new AkkaActorGateway(jobManager, leaderSessionID.orNull),
+ new AkkaActorGateway(self, leaderSessionID.orNull)
)
@@ -791,7 +792,7 @@ class TaskManager(
// de-register from the JobManager (faster detection of disconnect)
currentJobManager foreach {
- _ ! decorateMessage(Disconnect(s"TaskManager ${self.path} is shutting down."))
+ _ ! decorateMessage(Disconnect(s"TaskManager ${self.path} is disassociating"))
}
currentJobManager = None
@@ -812,13 +813,14 @@ class TaskManager(
network.disassociate()
}
- private def handleJobManagerDisconnect(jobManager: ActorRef, msg: String): Unit = {
+ protected def handleJobManagerDisconnect(jobManager: ActorRef, msg: String): Unit = {
if (isConnected && jobManager != null) {
// check if it comes from our JobManager
if (jobManager == currentJobManager.orNull) {
try {
- val message = "Disconnecting from JobManager: " + msg
+ val message = s"TaskManager ${self.path} disconnects from JobManager " +
+ s"${jobManager.path}: " + msg
log.info(message)
// cancel all our tasks with a proper error message
@@ -826,17 +828,6 @@ class TaskManager(
// reset our state to disassociated
disassociateFromJobManager()
-
- // begin attempts to reconnect
- val deadline: Option[Deadline] = config.maxRegistrationDuration.map(_.fromNow)
- self ! decorateMessage(
- TriggerTaskManagerRegistration(
- currentRegistrationSessionID,
- jobManagerAkkaURL,
- TaskManager.INITIAL_REGISTRATION_TIMEOUT,
- deadline,
- 1)
- )
}
catch {
// this is pretty bad, it leaves the TaskManager in a state where it cannot
@@ -883,8 +874,8 @@ class TaskManager(
// create the task. this does not grab any TaskManager resources or download
// and libraries - the operation does not block
- val jobManagerGateway = new AkkaActorGateway(jobManagerActor, leaderSessionID)
- val selfGateway = new AkkaActorGateway(self, leaderSessionID)
+ val jobManagerGateway = new AkkaActorGateway(jobManagerActor, leaderSessionID.orNull)
+ val selfGateway = new AkkaActorGateway(self, leaderSessionID.orNull)
val task = new Task(
tdd,
@@ -1108,6 +1099,58 @@ class TaskManager(
self ! decorateMessage(Kill)
}
+
+ override def notifyLeaderAddress(leaderAddress: String, leaderSessionID: UUID): Unit = {
+ self ! JobManagerLeaderAddress(leaderAddress, leaderSessionID)
+ }
+
+ /** Handles the notification about a new leader and its address. If the TaskManager is still
+ * connected to another JobManager, it first disconnects from it. If the new JobManager
+ * address is not null, then it starts the registration process.
+ *
+ * @param newJobManagerAkkaURL Akka URL of the newly elected leading JobManager; may be null
+ * @param leaderSessionID Leader session ID that is stored as the current leader session ID
+ */
+ private def handleJobManagerLeaderAddress(
+ newJobManagerAkkaURL: String,
+ leaderSessionID: UUID)
+ : Unit = {
+
+ currentJobManager match {
+ case Some(jm) =>
+ handleJobManagerDisconnect(jm, s"JobManager ${newJobManagerAkkaURL} was elected as leader.")
+ case None =>
+ }
+
+ this.jobManagerAkkaURL = Option(newJobManagerAkkaURL)
+ this.leaderSessionID = Option(leaderSessionID)
+
+ triggerTaskManagerRegistration()
+ }
+
+ /** Starts the TaskManager's registration process to connect to the JobManager.
+ *
+ */
+ def triggerTaskManagerRegistration(): Unit = {
+ if(jobManagerAkkaURL.isDefined) {
+ // begin attempts to reconnect
+ val deadline: Option[Deadline] = config.maxRegistrationDuration.map(_.fromNow)
+
+ self ! decorateMessage(
+ TriggerTaskManagerRegistration(
+ jobManagerAkkaURL.get,
+ TaskManager.INITIAL_REGISTRATION_TIMEOUT,
+ deadline,
+ 1)
+ )
+ }
+ }
+
+ override def handleError(exception: Exception): Unit = {
+ log.error("Error in leader retrieval service", exception)
+
+ self ! decorateMessage(PoisonPill)
+ }
}
/**
@@ -1278,12 +1321,7 @@ object TaskManager {
taskManagerClass: Class[_ <: TaskManager])
: Unit = {
- val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration)
-
- val (taskManagerHostname, actorSystemPort) = selectNetworkInterfaceAndPort(
- configuration,
- jobManagerHostname,
- jobManagerPort)
+ val (taskManagerHostname, actorSystemPort) = selectNetworkInterfaceAndPort(configuration)
runTaskManager(
taskManagerHostname,
@@ -1296,9 +1334,7 @@ object TaskManager {
@throws(classOf[IOException])
@throws(classOf[IllegalConfigurationException])
def selectNetworkInterfaceAndPort(
- configuration: Configuration,
- jobManagerHostname: String,
- jobManagerPort: Int)
+ configuration: Configuration)
: (String, Int) = {
var taskManagerHostname = configuration.getString(
@@ -1308,25 +1344,12 @@ object TaskManager {
LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname)
}
else {
- // try to find out the hostname of the interface from which the TaskManager
- // can connect to the JobManager. This involves a reverse name lookup
- LOG.info("Trying to select the network interface and address to use " +
- "by connecting to the configured JobManager.")
-
- LOG.info(s"TaskManager will try to connect for $MAX_STARTUP_CONNECT_TIME milliseconds " +
- s"before falling back to heuristics")
-
- val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort)
- val taskManagerAddress = try {
- // try to get the address for up to two minutes and start
- // logging only after ten seconds
- NetUtils.findConnectingAddress(jobManagerAddress,
- MAX_STARTUP_CONNECT_TIME, STARTUP_CONNECT_LOG_SUPPRESS)
- }
- catch {
- case t: Throwable => throw new IOException("TaskManager cannot find a network interface " +
- "that can communicate with the JobManager (" + jobManagerAddress + ")", t)
- }
+ val leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration)
+ val lookupTimeout = AkkaUtils.getLookupTimeout(configuration)
+
+ val taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(
+ leaderRetrievalService,
+ lookupTimeout)
taskManagerHostname = taskManagerAddress.getHostName()
LOG.info(s"TaskManager will use hostname/address '${taskManagerHostname}' " +
@@ -1439,7 +1462,8 @@ object TaskManager {
taskManagerSystem,
taskManagerHostname,
Some(TASK_MANAGER_NAME),
- None, false,
+ None,
+ false,
streamingMode,
taskManagerClass)
@@ -1489,9 +1513,9 @@ object TaskManager {
* @param taskManagerHostname The hostname/address that describes the TaskManager's data location.
* @param taskManagerActorName Optionally the name of the TaskManager actor. If none is given,
* the actor will use a random name.
- * @param jobManagerPath Optionally, the JobManager actor path may be provided. If none is
- * provided, the method will construct it automatically from the
- * JobManager hostname an port specified in the configuration.
+ * @param leaderRetrievalServiceOption Optionally, a leader retrieval service can be provided. If
+ * none is given, then a LeaderRetrievalService is
+ * constructed from the configuration.
* @param localTaskManagerCommunication If true, the TaskManager will not initiate the
* TCP network stack.
* @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only)
@@ -1516,20 +1540,13 @@ object TaskManager {
actorSystem: ActorSystem,
taskManagerHostname: String,
taskManagerActorName: Option[String],
- jobManagerPath: Option[String],
+ leaderRetrievalServiceOption: Option[LeaderRetrievalService],
localTaskManagerCommunication: Boolean,
streamingMode: StreamingMode,
taskManagerClass: Class[_ <: TaskManager])
: ActorRef = {
- // get and check the JobManager config
- val jobManagerAkkaUrl: String = jobManagerPath.getOrElse {
- val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration)
- val hostPort = new InetSocketAddress(jobManagerHostname, jobManagerPort)
- JobManager.getRemoteJobManagerAkkaURL(hostPort)
- }
-
- val (taskManagerConfig : TaskManagerConfiguration,
+ val (taskManagerConfig : TaskManagerConfiguration,
netConfig: NetworkEnvironmentConfiguration,
connectionInfo: InstanceConnectionInfo
) = parseTaskManagerConfiguration(
@@ -1596,9 +1613,9 @@ object TaskManager {
// start the I/O manager last, it will create some temp directories.
val ioManager: IOManager = new IOManagerAsync(taskManagerConfig.tmpDirPaths)
- if (ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) {
- // TODO @removeme @tillrohrmann Setup leader retrieval service
- LOG.info("HA mode.")
+ val leaderRetrievalService = leaderRetrievalServiceOption match {
+ case Some(lrs) => lrs
+ case None => LeaderRetrievalUtils.createLeaderRetrievalService(configuration)
}
// create the actor properties (which define the actor constructor parameters)
@@ -1606,11 +1623,11 @@ object TaskManager {
taskManagerClass,
taskManagerConfig,
connectionInfo,
- jobManagerAkkaUrl,
memoryManager,
ioManager,
network,
- taskManagerConfig.numberOfSlots)
+ taskManagerConfig.numberOfSlots,
+ leaderRetrievalService)
taskManagerActorName match {
case Some(actorName) => actorSystem.actorOf(tmProps, actorName)
@@ -1639,7 +1656,7 @@ object TaskManager {
timeout: FiniteDuration)
: ActorRef = {
try {
- val future = AkkaUtils.getReference(taskManagerUrl, system, timeout)
+ val future = AkkaUtils.getActorRefFuture(taskManagerUrl, system, timeout)
Await.result(future, timeout)
}
catch {
@@ -1756,8 +1773,13 @@ object TaskManager {
val nettyConfig = if (localTaskManagerCommunication) {
None
} else {
- Some(new NettyConfig(
- connectionInfo.address(), connectionInfo.dataPort(), pageSize, configuration))
+ Some(
+ new NettyConfig(
+ connectionInfo.address(),
+ connectionInfo.dataPort(),
+ pageSize,
+ configuration)
+ )
}
// Default spill I/O mode for intermediate results
@@ -1822,7 +1844,7 @@ object TaskManager {
* @param configuration The configuration to read the config values from.
* @return A 2-tuple (hostname, port).
*/
- private def getAndCheckJobManagerAddress(configuration: Configuration) : (String, Int) = {
+ def getAndCheckJobManagerAddress(configuration: Configuration) : (String, Int) = {
val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
index 324b014..3ffc68f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import scala.Option;
import java.util.UUID;
@@ -51,10 +50,14 @@ public class FlinkUntypedActorTest {
JavaTestKit.shutdownActorSystem(actorSystem);
}
+ /**
+ * Tests that LeaderSessionMessage messages with a wrong leader session ID are filtered
+ * out.
+ */
@Test
public void testLeaderSessionMessageFilteringOfFlinkUntypedActor() {
- final Option<UUID> leaderSessionID = Option.apply(UUID.randomUUID());
- final Option<UUID> oldSessionID = Option.apply(UUID.randomUUID());
+ final UUID leaderSessionID = UUID.randomUUID();
+ final UUID oldSessionID = UUID.randomUUID();
TestActorRef<PlainFlinkUntypedActor> actor = null;
@@ -76,9 +79,13 @@ public class FlinkUntypedActorTest {
}
}
+ /**
+ * Tests that an exception is thrown when the FlinkUntypedActor receives a message which
+ * extends {@link RequiresLeaderSessionID} and is not wrapped in a LeaderSessionMessage.
+ */
@Test
public void testThrowingExceptionWhenReceivingNonWrappedRequiresLeaderSessionIDMessage() {
- final Option<UUID> leaderSessionID = Option.apply(UUID.randomUUID());
+ final UUID leaderSessionID = UUID.randomUUID();
TestActorRef<PlainFlinkUntypedActor> actor = null;
@@ -95,7 +102,8 @@ public class FlinkUntypedActorTest {
"message was sent without being wrapped in LeaderSessionMessage.");
} catch (Exception e) {
assertEquals("Received a message PlainRequiresLeaderSessionID " +
- "without a leader session ID, even though it requires to have one.",
+ "without a leader session ID, even though the message requires a " +
+ "leader session ID.",
e.getMessage());
}
@@ -110,14 +118,14 @@ public class FlinkUntypedActorTest {
}
}
-
static class PlainFlinkUntypedActor extends FlinkUntypedActor {
- private Option<UUID> leaderSessionID;
+ private UUID leaderSessionID;
private int messageCounter;
- public PlainFlinkUntypedActor(Option<UUID> leaderSessionID) {
+ public PlainFlinkUntypedActor(UUID leaderSessionID) {
+
this.leaderSessionID = leaderSessionID;
this.messageCounter = 0;
}
@@ -128,7 +136,7 @@ public class FlinkUntypedActorTest {
}
@Override
- protected Option<UUID> getLeaderSessionID() {
+ protected UUID getLeaderSessionID() {
return leaderSessionID;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index b124304..f6e4ab8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Test;
import scala.concurrent.Await;
@@ -49,8 +51,9 @@ public class CoordinatorShutdownTest {
LocalFlinkMiniCluster cluster = null;
try {
Configuration noTaskManagerConfig = new Configuration();
- noTaskManagerConfig.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 0);
+ noTaskManagerConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 0);
cluster = new LocalFlinkMiniCluster(noTaskManagerConfig, true);
+ cluster.start();
// build a test graph with snapshotting enabled
JobVertex vertex = new JobVertex("Test Vertex");
@@ -60,17 +63,19 @@ public class CoordinatorShutdownTest {
JobGraph testGraph = new JobGraph("test job", vertex);
testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000));
- ActorGateway jobManager = cluster.getJobManagerGateway();
+ ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
FiniteDuration timeout = new FiniteDuration(60, TimeUnit.SECONDS);
- JobManagerMessages.SubmitJob submitMessage = new JobManagerMessages.SubmitJob(testGraph, false);
+ JobManagerMessages.SubmitJob submitMessage = new JobManagerMessages.SubmitJob(
+ testGraph,
+ ListeningBehaviour.EXECUTION_RESULT);
// submit is successful, but then the job dies because no TaskManager / slot is available
- Future<Object> submitFuture = jobManager.ask(submitMessage, timeout);
+ Future<Object> submitFuture = jmGateway.ask(submitMessage, timeout);
Await.result(submitFuture, timeout);
// get the execution graph and make sure the coordinator is properly shut down
- Future<Object> jobRequestFuture = jobManager.ask(
+ Future<Object> jobRequestFuture = jmGateway.ask(
new JobManagerMessages.RequestJob(testGraph.getJobID()),
timeout);
@@ -99,6 +104,7 @@ public class CoordinatorShutdownTest {
LocalFlinkMiniCluster cluster = null;
try {
cluster = new LocalFlinkMiniCluster(new Configuration(), true);
+ cluster.start();
// build a test graph with snapshotting enabled
JobVertex vertex = new JobVertex("Test Vertex");
@@ -108,17 +114,19 @@ public class CoordinatorShutdownTest {
JobGraph testGraph = new JobGraph("test job", vertex);
testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000));
- ActorGateway jobManager = cluster.getJobManagerGateway();
+ ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
FiniteDuration timeout = new FiniteDuration(60, TimeUnit.SECONDS);
- JobManagerMessages.SubmitJob submitMessage = new JobManagerMessages.SubmitJob(testGraph, false);
+ JobManagerMessages.SubmitJob submitMessage = new JobManagerMessages.SubmitJob(
+ testGraph,
+ ListeningBehaviour.EXECUTION_RESULT);
// submit is successful, but then the job dies because no TaskManager / slot is available
- Future<Object> submitFuture = jobManager.ask(submitMessage, timeout);
+ Future<Object> submitFuture = jmGateway.ask(submitMessage, timeout);
Await.result(submitFuture, timeout);
// get the execution graph and make sure the coordinator is properly shut down
- Future<Object> jobRequestFuture = jobManager.ask(
+ Future<Object> jobRequestFuture = jmGateway.ask(
new JobManagerMessages.RequestJob(testGraph.getJobID()),
timeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index e9b67af..2daa62e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -35,7 +35,8 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.messages.Messages;
-import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
+import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -574,9 +575,9 @@ public class ExecutionVertexCancelTest {
@Override
public Object handleMessage(Object message) throws Exception {
Object result;
- if(message instanceof TaskMessages.SubmitTask) {
+ if(message instanceof SubmitTask) {
result = Messages.getAcknowledge();
- } else if(message instanceof TaskMessages.CancelTask) {
+ } else if(message instanceof CancelTask) {
index++;
if(index >= results.length){
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java
index 2e62781..62bc96b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.instance;
import akka.actor.ActorRef;
import akka.dispatch.Futures;
-import scala.Option;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@ -114,7 +113,7 @@ abstract public class BaseTestingActorGateway implements ActorGateway {
}
@Override
- public Option<UUID> leaderSessionID() {
- return Option.empty();
+ public UUID leaderSessionID() {
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java
index 10762f2..3d27611 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.instance;
import akka.actor.ActorRef;
-import scala.Option;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@ -62,7 +61,7 @@ public class DummyActorGateway implements ActorGateway {
}
@Override
- public Option<UUID> leaderSessionID() {
- return Option.<UUID>empty();
+ public UUID leaderSessionID() {
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index 2c01f1f..a0166eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -42,8 +42,6 @@ import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import scala.Option;
-import scala.Some;
/**
* Tests for {@link org.apache.flink.runtime.instance.InstanceManager}.
@@ -52,7 +50,7 @@ public class InstanceManagerTest{
static ActorSystem system;
- static Option<UUID> leaderSessionID = new Some<UUID>(UUID.randomUUID());
+ static UUID leaderSessionID = UUID.randomUUID();
@BeforeClass
public static void setup(){
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
index 6978225..1515f83 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest.InfiniteSubpartitionView;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
-import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index e9f3a62..fdbffaa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -51,13 +51,15 @@ public class PartialConsumePipelinedResultTest {
@BeforeClass
public static void setUp() throws Exception {
final Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUMBER_OF_TMS);
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUMBER_OF_TMS);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, NUMBER_OF_NETWORK_BUFFERS);
flink = new TestingCluster(config, true);
+ flink.start();
+
jobClient = JobClient.startJobClientActorSystem(flink.configuration());
}
@@ -102,7 +104,7 @@ public class PartialConsumePipelinedResultTest {
JobClient.submitJobAndWait(
jobClient,
- flink.getJobManagerGateway(),
+ flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
jobGraph,
TestingUtils.TESTING_DURATION(),
false, this.getClass().getClassLoader());
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 938c2ad..cd56318 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -24,13 +24,10 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.ConnectionID;
-import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
index be73bf5..b2aaab0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
@@ -101,7 +101,7 @@ public class JobManagerProcessReapingTest {
Throwable lastError = null;
for (int i = 0; i < 40; i++) {
try {
- jobManagerRef = JobManager.getJobManagerRemoteReference(
+ jobManagerRef = JobManager.getJobManagerActorRef(
new InetSocketAddress("localhost", jobManagerPort),
localSystem, new FiniteDuration(25, TimeUnit.SECONDS));
break;
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 217f46e..db6df75 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -26,6 +26,7 @@ import com.typesafe.config.Config;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -49,16 +50,15 @@ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.Execution
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import scala.Option;
import scala.Some;
import scala.Tuple2;
import java.net.InetAddress;
-import java.util.UUID;
import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
@@ -121,13 +121,18 @@ public class JobManagerTest {
final JobGraph jobGraph = new JobGraph("Blocking test job", sender);
final JobID jid = jobGraph.getJobID();
- final ActorGateway jobManagerGateway = cluster.getJobManagerGateway();
+ final ActorGateway jobManagerGateway = cluster.getLeaderGateway(
+ TestingUtils.TESTING_DURATION());
// we can set the leader session ID to None because we don't use this gateway to send messages
- final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), Option.<UUID>empty());
+ final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
// Submit the job and wait for all vertices to be running
- jobManagerGateway.tell(new SubmitJob(jobGraph, false), testActorGateway);
+ jobManagerGateway.tell(
+ new SubmitJob(
+ jobGraph,
+ ListeningBehaviour.EXECUTION_RESULT),
+ testActorGateway);
expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
jobManagerGateway.tell(
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 560fc30..bba1460 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -20,16 +20,21 @@ package org.apache.flink.runtime.jobmanager;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -54,13 +59,18 @@ public class JobSubmitTest {
private static final FiniteDuration timeout = new FiniteDuration(5000, TimeUnit.MILLISECONDS);
private static ActorSystem jobManagerSystem;
- private static ActorGateway jobManager;
+ private static ActorGateway jmGateway;
@BeforeClass
public static void setupJobManager() {
Configuration config = new Configuration();
- scala.Option<Tuple2<String, Object>> listeningAddress = scala.Option.empty();
+ int port = NetUtils.getAvailablePort();
+
+ config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+ config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
+
+ scala.Option<Tuple2<String, Object>> listeningAddress = scala.Option.apply(new Tuple2<String, Object>("localhost", port));
jobManagerSystem = AkkaUtils.createActorSystem(config, listeningAddress);
ActorRef jobManagerActorRef = JobManager.startJobManagerActors(
config,
@@ -68,12 +78,16 @@ public class JobSubmitTest {
StreamingMode.BATCH_ONLY)._1();
try {
- jobManager = JobManager.getJobManagerGateway(jobManagerActorRef, timeout);
+ LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config);
+
+ jmGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
+ lrs,
+ jobManagerSystem,
+ timeout
+ );
} catch (Exception e) {
fail("Could not retrieve the JobManager gateway. " + e.getMessage());
}
-
-
}
@AfterClass
@@ -92,7 +106,7 @@ public class JobSubmitTest {
JobGraph jg = new JobGraph("test job", jobVertex);
// request the blob port from the job manager
- Future<Object> future = jobManager.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout);
+ Future<Object> future = jmGateway.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout);
int blobPort = (Integer) Await.result(future, timeout);
// upload two dummy bytes and add their keys to the job graph as dependencies
@@ -113,7 +127,11 @@ public class JobSubmitTest {
jg.addBlob(key2);
// submit the job
- Future<Object> submitFuture = jobManager.ask(new JobManagerMessages.SubmitJob(jg, false), timeout);
+ Future<Object> submitFuture = jmGateway.ask(
+ new JobManagerMessages.SubmitJob(
+ jg,
+ ListeningBehaviour.EXECUTION_RESULT),
+ timeout);
try {
Await.result(submitFuture, timeout);
}
@@ -152,7 +170,11 @@ public class JobSubmitTest {
JobGraph jg = new JobGraph("test job", jobVertex);
// submit the job
- Future<Object> submitFuture = jobManager.ask(new JobManagerMessages.SubmitJob(jg, false), timeout);
+ Future<Object> submitFuture = jmGateway.ask(
+ new JobManagerMessages.SubmitJob(
+ jg,
+ ListeningBehaviour.EXECUTION_RESULT),
+ timeout);
try {
Await.result(submitFuture, timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
new file mode 100644
index 0000000..e6067d0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import org.apache.curator.test.TestingCluster;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.instance.InstanceManager;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.LeaderElectionUtils;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+ /**
+ * Tests the leader election among {@link TestingJobManager} actors which are configured with
+ * the "zookeeper" recovery mode and which connect to a three-instance {@link TestingCluster}.
+ */
+ public class JobManagerLeaderElectionTest extends TestLogger {
+
+ private static ActorSystem actorSystem;
+ private static TestingCluster testingCluster;
+
+ // timeout of the individual ask operations
+ private static final Timeout timeout = new Timeout(TestingUtils.TESTING_DURATION());
+
+ // upper bound for blocking on the result of an ask operation
+ private static final FiniteDuration duration = new FiniteDuration(5, TimeUnit.MINUTES);
+
+ /**
+ * Creates the actor system and starts the ZooKeeper testing cluster shared by all tests.
+ */
+ @BeforeClass
+ public static void setup() throws Exception {
+ actorSystem = ActorSystem.create("TestingActorSystem");
+
+ testingCluster = new TestingCluster(3);
+ testingCluster.start();
+ }
+
+ /**
+ * Shuts down the shared actor system and stops the ZooKeeper testing cluster.
+ */
+ @AfterClass
+ public static void teardown() throws Exception {
+ try {
+ if (actorSystem != null) {
+ JavaTestKit.shutdownActorSystem(actorSystem);
+ }
+ } finally {
+ // stop the ZooKeeper cluster even if the actor system shutdown failed
+ if (testingCluster != null) {
+ testingCluster.stop();
+ }
+ }
+ }
+
+ /**
+ * Tests that a single JobManager is elected as the leader by ZooKeeper.
+ */
+ @Test
+ public void testLeaderElection() throws Exception {
+ final Configuration configuration = new Configuration();
+
+ configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+ configuration.setString(
+ ConfigConstants.ZOOKEEPER_QUORUM_KEY,
+ testingCluster.getConnectString());
+
+ ActorRef jm = null;
+
+ try {
+ Props jmProps = createJobManagerProps(configuration);
+
+ jm = actorSystem.actorOf(jmProps);
+
+ Future<Object> leaderFuture = Patterns.ask(
+ jm,
+ TestingJobManagerMessages.getNotifyWhenLeader(),
+ timeout);
+
+ // Await.result rethrows a failure of the ask (e.g. an ask timeout), whereas
+ // Await.ready would return normally and let the test pass without an elected leader
+ Await.result(leaderFuture, duration);
+ } finally {
+ TestingUtils.stopActor(jm);
+ }
+
+ }
+
+ /**
+ * Tests that a second JobManager is elected as the leader once the previous leader dies.
+ */
+ @Test
+ public void testLeaderReelection() throws Exception {
+ final Configuration configuration = new Configuration();
+
+ configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+ configuration.setString(
+ ConfigConstants.ZOOKEEPER_QUORUM_KEY,
+ testingCluster.getConnectString());
+
+ ActorRef jm = null;
+ ActorRef jm2 = null;
+
+ try {
+ Props jmProps = createJobManagerProps(configuration);
+
+ jm = actorSystem.actorOf(jmProps);
+
+ Future<Object> leaderFuture = Patterns.ask(
+ jm,
+ TestingJobManagerMessages.getNotifyWhenLeader(),
+ timeout);
+
+ // fail the test if the first JobManager does not become the leader
+ Await.result(leaderFuture, duration);
+
+ Props jmProps2 = createJobManagerProps(configuration);
+
+ jm2 = actorSystem.actorOf(jmProps2);
+
+ jm.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+ // now the second JobManager should be elected as the leader
+ Future<Object> leader2Future = Patterns.ask(
+ jm2,
+ TestingJobManagerMessages.getNotifyWhenLeader(),
+ timeout
+ );
+
+ // fail the test if the second JobManager does not take over the leadership
+ Await.result(leader2Future, duration);
+ } finally {
+ // The first JobManager is normally terminated by the PoisonPill already. Stopping it
+ // here covers failures before that point, so that it cannot stay behind as a leader
+ // candidate in the actor system which is shared with the other tests.
+ TestingUtils.stopActor(jm);
+ TestingUtils.stopActor(jm2);
+ }
+ }
+
+ /**
+ * Creates the {@link Props} of a {@link TestingJobManager} whose leader election service is
+ * created from the given configuration.
+ *
+ * @param configuration Configuration for the JobManager and its leader election service
+ * @return Props to instantiate the TestingJobManager
+ * @throws Exception if one of the JobManager's components could not be created
+ */
+ private Props createJobManagerProps(Configuration configuration) throws Exception {
+ LeaderElectionService leaderElectionService = LeaderElectionUtils.
+ createLeaderElectionService(configuration);
+
+ return Props.create(
+ TestingJobManager.class,
+ configuration,
+ TestingUtils.defaultExecutionContext(),
+ new InstanceManager(),
+ new Scheduler(TestingUtils.defaultExecutionContext()),
+ new BlobLibraryCacheManager(new BlobServer(configuration), 10L),
+ ActorRef.noSender(),
+ 1,
+ 1L,
+ AkkaUtils.getDefaultTimeout(),
+ StreamingMode.BATCH_ONLY,
+ leaderElectionService
+ );
+ }
+ }
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
new file mode 100644
index 0000000..8dd380e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+
+ /**
+ * Tests that running jobs are removed and that the TaskManagers re-register when the leading
+ * JobManager changes. Leadership grants and leader retrieval notifications are triggered
+ * manually via the {@link LeaderElectionRetrievalTestingCluster}.
+ */
+ public class LeaderChangeStateCleanupTest extends TestLogger {
+
+ // timeout used for all ask and await operations
+ private static final FiniteDuration timeout = TestingUtils.TESTING_DURATION();
+
+ private final int numJMs = 2;
+ private final int numTMs = 2;
+ private final int numSlotsPerTM = 2;
+
+ // the job occupies every slot of the cluster
+ private final int parallelism = numTMs * numSlotsPerTM;
+
+ private Configuration configuration;
+ private LeaderElectionRetrievalTestingCluster cluster = null;
+ private final JobGraph job = createBlockingJob(parallelism);
+
+ /**
+ * Starts a fresh testing cluster and waits until all of its actors are alive.
+ */
+ @Before
+ public void before() throws Exception {
+ // the tests switch the flag to false before resubmitting the job, so set it again
+ Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
+
+ configuration = new Configuration();
+
+ configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
+ configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
+ configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
+
+ cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false, StreamingMode.BATCH_ONLY);
+ cluster.start(false); // TaskManagers don't have to register at the JobManager
+
+ cluster.waitForActorsToBeAlive(); // we only wait until all actors are alive
+ }
+
+ /**
+ * Stops the testing cluster if it has been created.
+ */
+ @After
+ public void after() {
+ if (cluster != null) {
+ cluster.stop();
+ }
+ }
+
+ /**
+ * Tests that a job is properly canceled in the case of a leader change. In such an event all
+ * TaskManagers have to disconnect from the previous leader and connect to the newly elected
+ * leader.
+ */
+ @Test
+ public void testStateCleanupAfterNewLeaderElectionAndListenerNotification() throws Exception {
+ UUID leaderSessionID1 = UUID.randomUUID();
+ UUID leaderSessionID2 = UUID.randomUUID();
+
+ // first make JM(0) the leader
+ cluster.grantLeadership(0, leaderSessionID1);
+ // notify all listeners
+ cluster.notifyRetrievalListeners(0, leaderSessionID1);
+
+ cluster.waitForTaskManagersToBeRegistered();
+
+ // submit blocking job so that it is not finished when we cancel it
+ cluster.submitJobDetached(job);
+
+ ActorGateway jm = cluster.getLeaderGateway(timeout);
+
+ Future<Object> wait = jm.ask(new WaitForAllVerticesToBeRunningOrFinished(job.getJobID()), timeout);
+
+ // Await.result (unlike Await.ready) rethrows a failure of the ask
+ Await.result(wait, timeout);
+
+ Future<Object> jobRemoval = jm.ask(new NotifyWhenJobRemoved(job.getJobID()), timeout);
+
+ // make the JM(1) the new leader
+ cluster.grantLeadership(1, leaderSessionID2);
+ // notify all listeners about the event
+ cluster.notifyRetrievalListeners(1, leaderSessionID2);
+
+ // fail the test if the removal notification could not be obtained
+ Await.result(jobRemoval, timeout);
+
+ cluster.waitForTaskManagersToBeRegistered();
+
+ ActorGateway jm2 = cluster.getLeaderGateway(timeout);
+
+ Future<Object> futureNumberSlots = jm2.ask(JobManagerMessages.getRequestTotalNumberOfSlots(), timeout);
+
+ // check that all TMs have registered at the new leader
+ int numberSlots = (Integer)Await.result(futureNumberSlots, timeout);
+
+ assertEquals(parallelism, numberSlots);
+
+ // try to resubmit now the non-blocking job, it should complete successfully
+ Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(false);
+ cluster.submitJobAndWait(job, false, timeout);
+ }
+
+ /**
+ * Tests that a job is properly canceled in the case of a leader change. However, this time only the
+ * JMs are notified about the leader change and the TMs still believe the old leader to have
+ * leadership.
+ */
+ @Test
+ public void testStateCleanupAfterNewLeaderElection() throws Exception {
+ UUID leaderSessionID = UUID.randomUUID();
+ UUID newLeaderSessionID = UUID.randomUUID();
+
+ cluster.grantLeadership(0, leaderSessionID);
+ cluster.notifyRetrievalListeners(0, leaderSessionID);
+
+ cluster.waitForTaskManagersToBeRegistered();
+
+ // submit blocking job so that we can test job clean up
+ cluster.submitJobDetached(job);
+
+ ActorGateway jm = cluster.getLeaderGateway(timeout);
+
+ Future<Object> wait = jm.ask(new WaitForAllVerticesToBeRunningOrFinished(job.getJobID()), timeout);
+
+ // Await.result (unlike Await.ready) rethrows a failure of the ask
+ Await.result(wait, timeout);
+
+ Future<Object> jobRemoval = jm.ask(new NotifyWhenJobRemoved(job.getJobID()), timeout);
+
+ // only notify the JMs about the new leader JM(1)
+ cluster.grantLeadership(1, newLeaderSessionID);
+
+ // job should be removed anyway; a failed ask fails the test
+ Await.result(jobRemoval, timeout);
+ }
+
+ /**
+ * Tests that a job is properly canceled in the event of a leader change. However, this time
+ * only the TMs are notified about the changing leader. This should be enough to cancel the
+ * currently running job, though.
+ */
+ @Test
+ public void testStateCleanupAfterListenerNotification() throws Exception {
+ UUID leaderSessionID = UUID.randomUUID();
+ UUID newLeaderSessionID = UUID.randomUUID();
+
+ cluster.grantLeadership(0, leaderSessionID);
+ cluster.notifyRetrievalListeners(0, leaderSessionID);
+
+ cluster.waitForTaskManagersToBeRegistered();
+
+ // submit blocking job
+ cluster.submitJobDetached(job);
+
+ ActorGateway jm = cluster.getLeaderGateway(timeout);
+
+ Future<Object> wait = jm.ask(new WaitForAllVerticesToBeRunningOrFinished(job.getJobID()), timeout);
+
+ // Await.result (unlike Await.ready) rethrows a failure of the ask
+ Await.result(wait, timeout);
+
+ Future<Object> jobRemoval = jm.ask(new NotifyWhenJobRemoved(job.getJobID()), timeout);
+
+ // notify listeners (TMs) about the leader change
+ cluster.notifyRetrievalListeners(1, newLeaderSessionID);
+
+ // fail the test if the removal notification could not be obtained
+ Await.result(jobRemoval, timeout);
+ }
+
+ /**
+ * Tests that the same JobManager can be reelected as the leader. Even though, the same JM
+ * is elected as the next leader, all currently running jobs should be canceled properly and
+ * all TMs should disconnect from the leader and then reconnect to it.
+ */
+ @Test
+ public void testReelectionOfSameJobManager() throws Exception {
+ UUID leaderSessionID = UUID.randomUUID();
+ UUID newLeaderSessionID = UUID.randomUUID();
+
+ FiniteDuration shortTimeout = new FiniteDuration(20, TimeUnit.SECONDS);
+
+ cluster.grantLeadership(0, leaderSessionID);
+ cluster.notifyRetrievalListeners(0, leaderSessionID);
+
+ cluster.waitForTaskManagersToBeRegistered();
+
+ // submit blocking job
+ cluster.submitJobDetached(job);
+
+ ActorGateway jm = cluster.getLeaderGateway(timeout);
+
+ Future<Object> wait = jm.ask(new WaitForAllVerticesToBeRunningOrFinished(job.getJobID()), timeout);
+
+ // Await.result (unlike Await.ready) rethrows a failure of the ask
+ Await.result(wait, timeout);
+
+ Future<Object> jobRemoval = jm.ask(new NotifyWhenJobRemoved(job.getJobID()), timeout);
+
+ // make JM(0) again the leader --> this implies first a leadership revocation
+ cluster.grantLeadership(0, newLeaderSessionID);
+
+ // fail the test if the removal notification could not be obtained
+ Await.result(jobRemoval, timeout);
+
+ // The TMs should not be able to reconnect since they don't know the current leader
+ // session ID
+ try {
+ cluster.waitForTaskManagersToBeRegistered(shortTimeout);
+ fail("TaskManager should not be able to register at JobManager.");
+ } catch (TimeoutException e) {
+ // expected exception since the TMs have still the old leader session ID
+ }
+
+ // notify the TMs about the new (old) leader
+ cluster.notifyRetrievalListeners(0, newLeaderSessionID);
+
+ cluster.waitForTaskManagersToBeRegistered();
+
+ // try to resubmit now the non-blocking job, it should complete successfully
+ Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(false);
+ cluster.submitJobAndWait(job, false, timeout);
+
+ }
+
+ /**
+ * Creates a job consisting of a sender and a {@link Tasks.BlockingOnceReceiver} vertex which
+ * are connected pointwise and share one slot sharing group. As a side effect, the blocking
+ * flag of the receiver is set to true.
+ *
+ * @param parallelism Parallelism of the sender and of the receiver vertex
+ * @return JobGraph of the blocking test job
+ */
+ public JobGraph createBlockingJob(int parallelism) {
+ Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
+
+ JobVertex sender = new JobVertex("sender");
+ JobVertex receiver = new JobVertex("receiver");
+
+ sender.setInvokableClass(Tasks.Sender.class);
+ receiver.setInvokableClass(Tasks.BlockingOnceReceiver.class);
+
+ sender.setParallelism(parallelism);
+ receiver.setParallelism(parallelism);
+
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+
+ SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
+ sender.setSlotSharingGroup(slotSharingGroup);
+ receiver.setSlotSharingGroup(slotSharingGroup);
+
+ return new JobGraph("Blocking test job", sender, receiver);
+ }
+ }