You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/07/23 17:41:12 UTC
[4/6] flink git commit: [FLINK-2332] [runtime] Adds leader session
IDs and registration session IDs to JobManager and TaskManager messages.
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index d78a594..f974946 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -20,13 +20,14 @@ package org.apache.flink.runtime.taskmanager
import java.io.{File, IOException}
import java.net.{InetAddress, InetSocketAddress}
+import java.util.UUID
import java.util.concurrent.TimeUnit
import java.lang.reflect.Method
import java.lang.management.{OperatingSystemMXBean, ManagementFactory}
-import akka.actor._
-import akka.pattern.ask
-import akka.util.Timeout
+import _root_.akka.actor._
+import _root_.akka.pattern.ask
+import _root_.akka.util.Timeout
import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry}
import com.codahale.metrics.json.MetricsModule
@@ -36,9 +37,10 @@ import com.fasterxml.jackson.databind.ObjectMapper
import grizzled.slf4j.Logger
import org.apache.flink.configuration.{Configuration, ConfigConstants, GlobalConfiguration, IllegalConfigurationException}
+
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, TriggerCheckpoint, AbstractCheckpointMessage}
-import org.apache.flink.runtime.accumulators.{AccumulatorSnapshot, AccumulatorRegistry}
-import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
+import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessages, LogMessages, StreamingMode}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.blob.{BlobService, BlobCache}
import org.apache.flink.runtime.broadcast.BroadcastVariableManager
@@ -46,7 +48,8 @@ import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, Ta
import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager}
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
import org.apache.flink.runtime.filecache.FileCache
-import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID}
+import org.apache.flink.runtime.instance.{AkkaActorGateway, HardwareDescription,
+InstanceConnectionInfo, InstanceID}
import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
import org.apache.flink.runtime.io.network.NetworkEnvironment
@@ -114,15 +117,20 @@ import scala.language.postfixOps
* - Exceptions releasing intermediate result resources. Critical resource leak,
* requires a clean JVM.
*/
-class TaskManager(protected val config: TaskManagerConfiguration,
- protected val connectionInfo: InstanceConnectionInfo,
- protected val jobManagerAkkaURL: String,
- protected val memoryManager: MemoryManager,
- protected val ioManager: IOManager,
- protected val network: NetworkEnvironment,
- protected val numberOfSlots: Int)
-
-extends Actor with ActorLogMessages with ActorSynchronousLogging {
+class TaskManager(
+ protected val config: TaskManagerConfiguration,
+ protected val connectionInfo: InstanceConnectionInfo,
+ protected val jobManagerAkkaURL: String,
+ protected val memoryManager: MemoryManager,
+ protected val ioManager: IOManager,
+ protected val network: NetworkEnvironment,
+ protected val numberOfSlots: Int)
+ extends FlinkActor
+ with LeaderSessionMessages // Mixin order is important: second we want to filter leader messages
+ with LogMessages // Mixin order is important: first we want to support message logging
+{
+
+ override val log = Logger(getClass)
/** The timeout for all actor ask futures */
protected val askTimeout = new Timeout(config.timeout)
@@ -161,6 +169,10 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
private var heartbeatScheduler: Option[Cancellable] = None
+ protected var leaderSessionID: Option[UUID] = None
+
+ private var currentRegistrationSessionID: UUID = UUID.randomUUID()
+
// --------------------------------------------------------------------------
// Actor messages and life cycle
// --------------------------------------------------------------------------
@@ -183,8 +195,15 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
// kick off the registration
val deadline: Option[Deadline] = config.maxRegistrationDuration.map(_.fromNow)
- self.tell(TriggerTaskManagerRegistration(jobManagerAkkaURL,
- TaskManager.INITIAL_REGISTRATION_TIMEOUT, deadline,1), ActorRef.noSender)
+ self ! decorateMessage(
+ TriggerTaskManagerRegistration(
+ currentRegistrationSessionID,
+ jobManagerAkkaURL,
+ TaskManager.INITIAL_REGISTRATION_TIMEOUT,
+ deadline,
+ 1)
+ )
+
}
/**
@@ -236,8 +255,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
* Central handling of actor messages. This method delegates to the more specialized
* methods for handling certain classes of messages.
*/
- override def receiveWithLogMessages: Receive = {
-
+ override def handleMessage: Receive = {
// task messages are most common and critical, we handle them first
case message: TaskMessage => handleTaskMessage(message)
@@ -259,7 +277,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
// its registration at the JobManager
case NotifyWhenRegisteredAtJobManager =>
if (isConnected) {
- sender ! RegisteredAtJobManager
+ sender ! decorateMessage(RegisteredAtJobManager)
} else {
waitForRegistration += sender
}
@@ -339,12 +357,12 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
// was into a terminal state, or in case the JobManager cannot be informed of the
// state transition
- case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>
+ case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>
// we receive these from our tasks and forward them to the JobManager
currentJobManager foreach {
jobManager => {
- val futureResponse = (jobManager ? updateMsg)(askTimeout)
+ val futureResponse = (jobManager ? decorateMessage(updateMsg))(askTimeout)
val executionID = taskExecutionState.getID
@@ -353,13 +371,20 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
// but only send messages to the TaskManager to do those changes
case Success(result) =>
if (!result) {
- self ! FailTask(executionID,
+ self ! decorateMessage(
+ FailTask(
+ executionID,
new Exception("Task has been cancelled on the JobManager."))
+ )
}
case Failure(t) =>
- self ! FailTask(executionID, new Exception(
- "Failed to send ExecutionStateChange notification to JobManager"))
+ self ! decorateMessage(
+ FailTask(
+ executionID,
+ new Exception(
+ "Failed to send ExecutionStateChange notification to JobManager"))
+ )
}(context.dispatcher)
}
}
@@ -387,11 +412,15 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
val task = runningTasks.get(executionID)
if (task != null) {
task.cancelExecution()
- sender ! new TaskOperationResult(executionID, true)
+ sender ! decorateMessage(new TaskOperationResult(executionID, true))
} else {
log.debug(s"Cannot find task to cancel for execution ${executionID})")
- sender ! new TaskOperationResult(executionID, false,
+ sender ! decorateMessage(
+ new TaskOperationResult(
+ executionID,
+ false,
"No task with that execution ID was found.")
+ )
}
case PartitionState(taskExecutionId, taskResultId, partitionId, state) =>
@@ -413,7 +442,6 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
private def handleCheckpointingMessage(actorMessage: AbstractCheckpointMessage): Unit = {
actorMessage match {
-
case message: TriggerCheckpoint =>
val taskExecutionId = message.getTaskExecutionId
val checkpointId = message.getCheckpointId
@@ -457,118 +485,148 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
* @param message The registration message.
*/
private def handleRegistrationMessage(message: RegistrationMessage): Unit = {
+ if(message.registrationSessionID.equals(currentRegistrationSessionID)) {
+ message match {
+ case TriggerTaskManagerRegistration(
+ registrationSessionID,
+ jobManagerURL,
+ timeout,
+ deadline,
+ attempt) =>
+
+ if (isConnected) {
+ // this may be the case, if we queue another attempt and
+ // in the meantime, the registration is acknowledged
+ log.debug(
+ "TaskManager was triggered to register at JobManager, but is already registered")
+ }
+ else if (deadline.exists(_.isOverdue())) {
+ // we failed to register in time. that means we should quit
+ log.error("Failed to register at the JobManager withing the defined maximum " +
+ "connect time. Shutting down ...")
- message match {
-
- case TriggerTaskManagerRegistration(jobManagerURL, timeout, deadline, attempt) =>
- if (isConnected) {
- // this may be the case, if we queue another attempt and
- // in the meantime, the registration is acknowledged
- log.debug(
- "TaskManager was triggered to register at JobManager, but is already registered")
- }
- else if (deadline.exists(_.isOverdue())) {
- // we failed to register in time. that means we should quit
- log.error("Failed to register at the JobManager withing the defined maximum " +
- "connect time. Shutting down ...")
+ // terminate ourselves (hasta la vista)
+ self ! decorateMessage(PoisonPill)
+ }
+ else {
+ log.info(s"Trying to register at JobManager ${jobManagerURL} " +
+ s"(attempt ${attempt}, timeout: ${timeout})")
+
+ val jobManager = context.actorSelection(jobManagerAkkaURL)
+ jobManager ! decorateMessage(
+ RegisterTaskManager(
+ registrationSessionID,
+ self,
+ connectionInfo,
+ resources,
+ numberOfSlots)
+ )
+
+ // the next timeout computes via exponential backoff with cap
+ val nextTimeout = (timeout * 2).min(TaskManager.MAX_REGISTRATION_TIMEOUT)
+
+ // schedule (with our timeout as delay) a check that triggers a new registration
+ // attempt, if we are not registered by then
+ context.system.scheduler.scheduleOnce(timeout) {
+ if (!isConnected) {
+ self ! decorateMessage(
+ TriggerTaskManagerRegistration(
+ registrationSessionID,
+ jobManagerURL,
+ nextTimeout,
+ deadline,
+ attempt + 1)
+ )
+ }
+ }(context.dispatcher)
+ }
- // terminate ourselves (hasta la vista)
- self ! PoisonPill
- }
- else {
- log.info(s"Trying to register at JobManager ${jobManagerURL} " +
- s"(attempt ${attempt}, timeout: ${timeout})")
-
- val jobManager = context.actorSelection(jobManagerAkkaURL)
- jobManager ! RegisterTaskManager(self, connectionInfo, resources, numberOfSlots)
-
- // the next timeout computes via exponential backoff with cap
- val nextTimeout = (timeout * 2).min(TaskManager.MAX_REGISTRATION_TIMEOUT)
-
- // schedule (with our timeout s delay) a check triggers a new registration
- // attempt, if we are not registered by then
- context.system.scheduler.scheduleOnce(timeout) {
- if (!isConnected) {
- self.tell(TriggerTaskManagerRegistration(jobManagerURL,
- nextTimeout, deadline, attempt + 1), ActorRef.noSender)
+ // successful registration. associate with the JobManager
+ // we disambiguate duplicate or erroneous messages, to simplify debugging
+ case AcknowledgeRegistration(_, leaderSessionID, jobManager, id, blobPort) =>
+ if (isConnected) {
+ if (jobManager == currentJobManager.orNull) {
+ log.debug("Ignoring duplicate registration acknowledgement.")
+ } else {
+ log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
+ s"because the TaskManager is already registered at ${currentJobManager.orNull}")
}
- }(context.dispatcher)
- }
-
- // successful registration. associate with the JobManager
- // we disambiguate duplicate or erroneous messages, to simplify debugging
- case AcknowledgeRegistration(jobManager, id, blobPort) =>
- if (isConnected) {
- if (jobManager == currentJobManager.orNull) {
- log.debug("Ignoring duplicate registration acknowledgement.")
- } else {
- log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
- s"because the TaskManager is already registered at ${currentJobManager.orNull}")
}
- }
- else {
- // not yet connected, so let's associate with that JobManager
- try {
- associateWithJobManager(jobManager, id, blobPort)
- } catch {
- case t: Throwable =>
- killTaskManagerFatal(
- "Unable to start TaskManager components after registering at JobManager", t)
+ else {
+ // not yet connected, so let's associate with that JobManager
+ try {
+ associateWithJobManager(jobManager, id, blobPort, leaderSessionID)
+ } catch {
+ case t: Throwable =>
+ killTaskManagerFatal(
+ "Unable to start TaskManager components after registering at JobManager", t)
+ }
}
- }
- // we are already registered at that specific JobManager - duplicate answer, rare cases
- case AlreadyRegistered(jobManager, id, blobPort) =>
- if (isConnected) {
- if (jobManager == currentJobManager.orNull) {
- log.debug("Ignoring duplicate registration acknowledgement.")
- } else {
- log.warn(s"Received 'AlreadyRegistered' message from JobManager ${jobManager.path}, " +
- s"even through TaskManager is currently registered at ${currentJobManager.orNull}")
+ // we are already registered at that specific JobManager - duplicate answer, rare cases
+ case AlreadyRegistered(_, leaderSesssionID, jobManager, id, blobPort) =>
+ if (isConnected) {
+ if (jobManager == currentJobManager.orNull) {
+ log.debug("Ignoring duplicate registration acknowledgement.")
+ } else {
+ log.warn(s"Received 'AlreadyRegistered' message from " +
+ s"JobManager ${jobManager.path}, even through TaskManager is currently " +
+ s"registered at ${currentJobManager.orNull}")
+ }
}
- }
- else {
- // not connected, yet, to let's associate
- log.info("Received 'AlreadyRegistered' message before 'AcknowledgeRegistration'")
-
- try {
- associateWithJobManager(jobManager, id, blobPort)
- } catch {
- case t: Throwable =>
- killTaskManagerFatal(
- "Unable to start TaskManager components after registering at JobManager", t)
+ else {
+ // not connected yet, so let's associate
+ log.info("Received 'AlreadyRegistered' message before 'AcknowledgeRegistration'")
+
+ try {
+ associateWithJobManager(jobManager, id, blobPort, leaderSesssionID)
+ } catch {
+ case t: Throwable =>
+ killTaskManagerFatal(
+ "Unable to start TaskManager components after registering at JobManager", t)
+ }
}
- }
- case RefuseRegistration(reason) =>
- if (currentJobManager.isEmpty) {
- log.error(s"The registration at JobManager ${jobManagerAkkaURL} was refused, " +
- s"because: ${reason}. Retrying later...")
+ case RefuseRegistration(registrationSessionID, reason) =>
+ if (currentJobManager.isEmpty) {
+ log.error(s"The registration at JobManager ${jobManagerAkkaURL} was refused, " +
+ s"because: ${reason}. Retrying later...")
- // try the registration again after some time
+ // try the registration again after some time
- val delay: FiniteDuration = TaskManager.DELAY_AFTER_REFUSED_REGISTRATION
- val deadline: Option[Deadline] = config.maxRegistrationDuration.map {
- timeout => timeout + delay fromNow
- }
+ val delay: FiniteDuration = TaskManager.DELAY_AFTER_REFUSED_REGISTRATION
+ val deadline: Option[Deadline] = config.maxRegistrationDuration.map {
+ timeout => timeout + delay fromNow
+ }
- context.system.scheduler.scheduleOnce(delay) {
- self.tell(TriggerTaskManagerRegistration(jobManagerAkkaURL,
- TaskManager.INITIAL_REGISTRATION_TIMEOUT, deadline, 1), ActorRef.noSender)
- }(context.dispatcher)
- }
- else {
- // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
- if (sender() == currentJobManager.orNull) {
- log.warn(s"Received 'RefuseRegistration' from the JobManager (${sender().path})" +
- s" even though this TaskManager is already registered there.")
+ context.system.scheduler.scheduleOnce(delay) {
+ self ! decorateMessage(
+ TriggerTaskManagerRegistration(
+ registrationSessionID,
+ jobManagerAkkaURL,
+ TaskManager.INITIAL_REGISTRATION_TIMEOUT,
+ deadline,
+ 1)
+ )
+ }(context.dispatcher)
}
else {
- log.warn(s"Ignoring 'RefuseRegistration' from unrelated JobManager (${sender().path})")
+ // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
+ if (sender() == currentJobManager.orNull) {
+ log.warn(s"Received 'RefuseRegistration' from the JobManager (${sender().path})" +
+ s" even though this TaskManager is already registered there.")
+ }
+ else {
+ log.warn(s"Ignoring 'RefuseRegistration' from unrelated " +
+ s"JobManager (${sender().path})")
+ }
}
- }
- case _ => unhandled(message)
+ case _ => unhandled(message)
+ }
+ } else {
+ log.debug(s"Discarded registration message ${message}, because the registration session " +
+ "ID was not correct.")
}
}
@@ -593,20 +651,28 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
* @param id The instanceID under which the TaskManager is registered
* at the JobManager.
* @param blobPort The JobManager's port for the BLOB server.
+ * @param newLeaderSessionID Leader session ID of the JobManager
*/
- private def associateWithJobManager(jobManager: ActorRef,
- id: InstanceID,
- blobPort: Int): Unit = {
+ private def associateWithJobManager(
+ jobManager: ActorRef,
+ id: InstanceID,
+ blobPort: Int,
+ newLeaderSessionID: UUID)
+ : Unit = {
if (jobManager == null) {
- throw new NullPointerException("jobManager may not be null")
+ throw new NullPointerException("jobManager must not be null.")
}
if (id == null) {
- throw new NullPointerException("instance ID may not be null")
+ throw new NullPointerException("instance ID must not be null.")
}
if (blobPort <= 0 || blobPort > 65535) {
throw new IllegalArgumentException("blob port is out of range: " + blobPort)
}
+ if(newLeaderSessionID == null) {
+ throw new NullPointerException("Leader session ID must not be null.")
+ }
+
// sanity check that we are not currently registered with a different JobManager
if (isConnected) {
@@ -631,9 +697,18 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
throw new IllegalStateException("JobManager-specific components are already initialized.")
}
+ currentJobManager = Some(jobManager)
+ instanceID = id
+ leaderSessionID = Some(newLeaderSessionID)
+
// start the network stack, now that we have the JobManager actor reference
try {
- network.associateWithTaskManagerAndJobManager(jobManager, self)
+ network.associateWithTaskManagerAndJobManager(
+ new AkkaActorGateway(jobManager, leaderSessionID),
+ new AkkaActorGateway(self, leaderSessionID)
+ )
+
+
}
catch {
case e: Exception =>
@@ -665,16 +740,18 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
libraryCacheManager = Some(new FallbackLibraryCacheManager)
}
- currentJobManager = Some(jobManager)
- instanceID = id
-
// watch job manager to detect when it dies
context.watch(jobManager)
// schedule regular heartbeat message for oneself
- heartbeatScheduler = Some(context.system.scheduler.schedule(
- TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat)
- (context.dispatcher))
+ heartbeatScheduler = Some(
+ context.system.scheduler.schedule(
+ TaskManager.HEARTBEAT_INTERVAL,
+ TaskManager.HEARTBEAT_INTERVAL,
+ self,
+ decorateMessage(SendHeartbeat)
+ )(context.dispatcher)
+ )
// notify all the actors that listen for a successful registration
for (listener <- waitForRegistration) {
@@ -710,7 +787,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
// de-register from the JobManager (faster detection of disconnect)
currentJobManager foreach {
- _ ! Disconnect(s"TaskManager ${self.path} is shutting down.")
+ _ ! decorateMessage(Disconnect(s"TaskManager ${self.path} is shutting down."))
}
currentJobManager = None
@@ -748,8 +825,14 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
// begin attempts to reconnect
val deadline: Option[Deadline] = config.maxRegistrationDuration.map(_.fromNow)
- self ! TriggerTaskManagerRegistration(jobManagerAkkaURL,
- TaskManager.INITIAL_REGISTRATION_TIMEOUT, deadline, 1)
+ self ! decorateMessage(
+ TriggerTaskManagerRegistration(
+ currentRegistrationSessionID,
+ jobManagerAkkaURL,
+ TaskManager.INITIAL_REGISTRATION_TIMEOUT,
+ deadline,
+ 1)
+ )
}
catch {
// this is pretty bad, it leaves the TaskManager in a state where it cannot
@@ -795,8 +878,21 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
// create the task. this does not grab any TaskManager resources or download
// and libraries - the operation does not block
- val task = new Task(tdd, memoryManager, ioManager, network, bcVarManager,
- self, jobManagerActor, config.timeout, libCache, fileCache)
+
+ val jobManagerGateway = new AkkaActorGateway(jobManagerActor, leaderSessionID)
+ val selfGateway = new AkkaActorGateway(self, leaderSessionID)
+
+ val task = new Task(
+ tdd,
+ memoryManager,
+ ioManager,
+ network,
+ bcVarManager,
+ selfGateway,
+ jobManagerGateway,
+ config.timeout,
+ libCache,
+ fileCache)
log.info(s"Received task ${task.getTaskNameWithSubtasks}")
@@ -812,12 +908,12 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
// all good, we kick off the task, which performs its own initialization
task.startTaskThread()
- sender ! Acknowledge
+ sender ! decorateMessage(Acknowledge)
}
catch {
case t: Throwable =>
log.error("SubmitTask failed", t)
- sender ! Failure(t)
+ sender ! decorateMessage(Failure(t))
}
}
@@ -828,8 +924,9 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
* @param partitionInfos The descriptor of the intermediate result partitions.
*/
private def updateTaskInputPartitions(
- executionId: ExecutionAttemptID,
- partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)]) : Unit = {
+ executionId: ExecutionAttemptID,
+ partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)])
+ : Unit = {
Option(runningTasks.get(executionId)) match {
case Some(task) =>
@@ -867,15 +964,15 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
}
if (errors.isEmpty) {
- sender ! Acknowledge
+ sender ! decorateMessage(Acknowledge)
} else {
- sender ! Failure(new Exception(errors.mkString("\n")))
+ sender ! decorateMessage(Failure(new Exception(errors.mkString("\n"))))
}
case None =>
log.debug(s"Discard update for input partitions of task $executionId : " +
s"task is no longer running.")
- sender ! Acknowledge
+ sender ! decorateMessage(Acknowledge)
}
}
@@ -919,9 +1016,16 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
registry.getSnapshot
}
- self ! UpdateTaskExecutionState(new TaskExecutionState(
- task.getJobID, task.getExecutionId, task.getExecutionState, task.getFailureCause,
- accumulators))
+ self ! decorateMessage(
+ UpdateTaskExecutionState(
+ new TaskExecutionState(
+ task.getJobID,
+ task.getExecutionId,
+ task.getExecutionState,
+ task.getFailureCause,
+ accumulators)
+ )
+ )
}
else {
log.error(s"Cannot find task with ID $executionID to unregister.")
@@ -952,7 +1056,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
}
currentJobManager foreach {
- jm => jm ! Heartbeat(instanceID, metricsReport, accumulatorEvents)
+ jm => jm ! decorateMessage(Heartbeat(instanceID, metricsReport, accumulatorEvents))
}
}
catch {
@@ -972,14 +1076,12 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
try {
val traces = Thread.getAllStackTraces.asScala
- val stackTraceStr = traces.map(
- (trace: (Thread, Array[StackTraceElement])) => {
- val (thread, elements) = trace
+ val stackTraceStr = traces.map {
+ case (thread: Thread, elements: Array[StackTraceElement]) =>
"Thread: " + thread.getName + '\n' + elements.mkString("\n")
- })
- .mkString("\n\n")
+ }.mkString("\n\n")
- recipient ! StackTrace(instanceID, stackTraceStr)
+ recipient ! decorateMessage(StackTrace(instanceID, stackTraceStr))
}
catch {
case e: Exception => log.error("Failed to send stack trace to " + recipient.path, e)
@@ -999,7 +1101,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
"\n" +
"A fatal error occurred, forcing the TaskManager to shut down: " + message, cause)
- self ! Kill
+ self ! decorateMessage(Kill)
}
}
@@ -1165,24 +1267,34 @@ object TaskManager {
* Allows to use TaskManager subclasses for example for YARN.
*/
@throws(classOf[Exception])
- def selectNetworkInterfaceAndRunTaskManager(configuration: Configuration,
- streamingMode: StreamingMode,
- taskManagerClass: Class[_ <: TaskManager]) : Unit = {
+ def selectNetworkInterfaceAndRunTaskManager(
+ configuration: Configuration,
+ streamingMode: StreamingMode,
+ taskManagerClass: Class[_ <: TaskManager])
+ : Unit = {
val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration)
- val (taskManagerHostname, actorSystemPort) =
- selectNetworkInterfaceAndPort(configuration, jobManagerHostname, jobManagerPort)
+ val (taskManagerHostname, actorSystemPort) = selectNetworkInterfaceAndPort(
+ configuration,
+ jobManagerHostname,
+ jobManagerPort)
- runTaskManager(taskManagerHostname, actorSystemPort, configuration,
- streamingMode, taskManagerClass)
+ runTaskManager(
+ taskManagerHostname,
+ actorSystemPort,
+ configuration,
+ streamingMode,
+ taskManagerClass)
}
@throws(classOf[IOException])
@throws(classOf[IllegalConfigurationException])
- def selectNetworkInterfaceAndPort(configuration: Configuration,
- jobManagerHostname: String,
- jobManagerPort: Int) : (String, Int) = {
+ def selectNetworkInterfaceAndPort(
+ configuration: Configuration,
+ jobManagerHostname: String,
+ jobManagerPort: Int)
+ : (String, Int) = {
var taskManagerHostname = configuration.getString(
ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null)
@@ -1243,13 +1355,19 @@ object TaskManager {
* @param configuration The configuration for the TaskManager.
*/
@throws(classOf[Exception])
- def runTaskManager(taskManagerHostname: String,
- actorSystemPort: Int,
- configuration: Configuration,
- streamingMode: StreamingMode) : Unit = {
+ def runTaskManager(
+ taskManagerHostname: String,
+ actorSystemPort: Int,
+ configuration: Configuration,
+ streamingMode: StreamingMode)
+ : Unit = {
- runTaskManager(taskManagerHostname, actorSystemPort, configuration,
- streamingMode, classOf[TaskManager])
+ runTaskManager(
+ taskManagerHostname,
+ actorSystemPort,
+ configuration,
+ streamingMode,
+ classOf[TaskManager])
}
/**
@@ -1269,11 +1387,13 @@ object TaskManager {
* subclasses for example for YARN.
*/
@throws(classOf[Exception])
- def runTaskManager(taskManagerHostname: String,
- actorSystemPort: Int,
- configuration: Configuration,
- streamingMode: StreamingMode,
- taskManagerClass: Class[_ <: TaskManager]) : Unit = {
+ def runTaskManager(
+ taskManagerHostname: String,
+ actorSystemPort: Int,
+ configuration: Configuration,
+ streamingMode: StreamingMode,
+ taskManagerClass: Class[_ <: TaskManager])
+ : Unit = {
LOG.info(s"Starting TaskManager in streaming mode $streamingMode")
@@ -1282,8 +1402,10 @@ object TaskManager {
LOG.info(s"Starting TaskManager actor system at $taskManagerHostname:$actorSystemPort")
val taskManagerSystem = try {
- val akkaConfig = AkkaUtils.getAkkaConfig(configuration,
- Some((taskManagerHostname, actorSystemPort)))
+ val akkaConfig = AkkaUtils.getAkkaConfig(
+ configuration,
+ Some((taskManagerHostname, actorSystemPort))
+ )
if (LOG.isDebugEnabled) {
LOG.debug("Using akka configuration\n " + akkaConfig)
}
@@ -1307,13 +1429,14 @@ object TaskManager {
// and the TaskManager actor
try {
LOG.info("Starting TaskManager actor")
- val taskManager = startTaskManagerComponentsAndActor(configuration,
- taskManagerSystem,
- taskManagerHostname,
- Some(TASK_MANAGER_NAME),
- None, false,
- streamingMode,
- taskManagerClass)
+ val taskManager = startTaskManagerComponentsAndActor(
+ configuration,
+ taskManagerSystem,
+ taskManagerHostname,
+ Some(TASK_MANAGER_NAME),
+ None, false,
+ streamingMode,
+ taskManagerClass)
// start a process reaper that watches the JobManager. If the TaskManager actor dies,
// the process reaper will kill the JVM process (to ensure easy failure detection)
@@ -1433,14 +1556,15 @@ object TaskManager {
configuredMemory << 20 // megabytes to bytes
}
else {
- val fraction = configuration.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
- ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
+ val fraction = configuration.getFloat(
+ ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+ ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction,
ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must be between 0.0 and 1.0")
val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
- fraction).toLong
+ fraction).toLong
LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
s"memory (${relativeMemSize >> 20} MB).")
@@ -1452,10 +1576,11 @@ object TaskManager {
// now start the memory manager
val memoryManager = try {
- new DefaultMemoryManager(memorySize,
- taskManagerConfig.numberOfSlots,
- netConfig.networkBufferSize,
- preAllocateMemory)
+ new DefaultMemoryManager(
+ memorySize,
+ taskManagerConfig.numberOfSlots,
+ netConfig.networkBufferSize,
+ preAllocateMemory)
}
catch {
case e: OutOfMemoryError => throw new Exception(
@@ -1472,7 +1597,8 @@ object TaskManager {
}
// create the actor properties (which define the actor constructor parameters)
- val tmProps = Props(taskManagerClass,
+ val tmProps = Props(
+ taskManagerClass,
taskManagerConfig,
connectionInfo,
jobManagerAkkaUrl,
@@ -1502,9 +1628,11 @@ object TaskManager {
* @return The ActorRef to the TaskManager
*/
@throws(classOf[IOException])
- def getTaskManagerRemoteReference(taskManagerUrl: String,
- system: ActorSystem,
- timeout: FiniteDuration): ActorRef = {
+ def getTaskManagerRemoteReference(
+ taskManagerUrl: String,
+ system: ActorSystem,
+ timeout: FiniteDuration)
+ : ActorRef = {
try {
val future = AkkaUtils.getReference(taskManagerUrl, system, timeout)
Await.result(future, timeout)
@@ -1536,10 +1664,11 @@ object TaskManager {
* InstanceConnectionInfo, JobManager actor Akka URL).
*/
@throws(classOf[IllegalArgumentException])
- def parseTaskManagerConfiguration(configuration: Configuration,
- taskManagerHostname: String,
- localTaskManagerCommunication: Boolean):
- (TaskManagerConfiguration,
+ def parseTaskManagerConfiguration(
+ configuration: Configuration,
+ taskManagerHostname: String,
+ localTaskManagerCommunication: Boolean)
+ : (TaskManagerConfiguration,
NetworkEnvironmentConfiguration,
InstanceConnectionInfo) = {
@@ -1579,10 +1708,10 @@ object TaskManager {
ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)
val pageSizeNew: Int = configuration.getInteger(
- ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, -1)
+ ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, -1)
val pageSizeOld: Int = configuration.getInteger(
- ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1)
+ ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1)
val pageSize: Int =
if (pageSizeNew != -1) {
@@ -1617,7 +1746,7 @@ object TaskManager {
val tmpDirs = configuration.getString(
ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH)
- .split(",|" + File.pathSeparator)
+ .split(",|" + File.pathSeparator)
val nettyConfig = if (localTaskManagerCommunication) {
None
@@ -1633,7 +1762,10 @@ object TaskManager {
val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else IOMode.SYNC
val networkConfig = NetworkEnvironmentConfiguration(
- numNetworkBuffers, pageSize, ioMode, nettyConfig)
+ numNetworkBuffers,
+ pageSize,
+ ioMode,
+ nettyConfig)
// ----> timeouts, library caching, profiling
@@ -1667,8 +1799,12 @@ object TaskManager {
e)
}
- val taskManagerConfig = TaskManagerConfiguration(tmpDirs, cleanupInterval, timeout,
- finiteRegistratioDuration, slots,
+ val taskManagerConfig = TaskManagerConfiguration(
+ tmpDirs,
+ cleanupInterval,
+ timeout,
+ finiteRegistratioDuration,
+ slots,
configuration)
(taskManagerConfig, networkConfig, connectionInfo)
@@ -1720,10 +1856,12 @@ object TaskManager {
* @throws IllegalConfigurationException Thrown if the condition is violated.
*/
@throws(classOf[IllegalConfigurationException])
- private def checkConfigParameter(condition: Boolean,
- parameter: Any,
- name: String,
- errorMessage: String = ""): Unit = {
+ private def checkConfigParameter(
+ condition: Boolean,
+ parameter: Any,
+ name: String,
+ errorMessage: String = "")
+ : Unit = {
if (!condition) {
throw new IllegalConfigurationException(
s"Invalid configuration value for '${name}' : ${parameter} - ${errorMessage}")
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
new file mode 100644
index 0000000..324b014
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.Option;
+
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class FlinkUntypedActorTest {
+    // Exercises FlinkUntypedActor with LeaderSessionMessage-wrapped and unwrapped messages.
+    private static ActorSystem actorSystem;
+
+    @BeforeClass
+    public static void setup() {
+        actorSystem = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
+    }
+
+    @AfterClass
+    public static void teardown() {
+        JavaTestKit.shutdownActorSystem(actorSystem);
+    }
+
+    @Test
+    public void testLeaderSessionMessageFilteringOfFlinkUntypedActor() {
+        final Option<UUID> leaderSessionID = Option.apply(UUID.randomUUID());
+        final Option<UUID> oldSessionID = Option.apply(UUID.randomUUID());
+
+        TestActorRef<PlainFlinkUntypedActor> actor = null;
+
+        try {
+            actor = TestActorRef.create(
+                actorSystem, Props.create(PlainFlinkUntypedActor.class, leaderSessionID));
+
+            final PlainFlinkUntypedActor underlyingActor = actor.underlyingActor();
+            // Send four messages; the one wrapped with oldSessionID is presumably the one filtered out.
+            actor.tell(new JobManagerMessages.LeaderSessionMessage(leaderSessionID, 1), ActorRef.noSender());
+            actor.tell(new JobManagerMessages.LeaderSessionMessage(oldSessionID, 2), ActorRef.noSender());
+            actor.tell(new JobManagerMessages.LeaderSessionMessage(leaderSessionID, 2), ActorRef.noSender());
+            actor.tell(1, ActorRef.noSender());
+            // Exactly three of the four messages must have been counted by handleMessage.
+            assertEquals(3, underlyingActor.getMessageCounter());
+
+        } finally {
+            stopActor(actor);
+        }
+    }
+
+    @Test
+    public void testThrowingExceptionWhenReceivingNonWrappedRequiresLeaderSessionIDMessage() {
+        final Option<UUID> leaderSessionID = Option.apply(UUID.randomUUID());
+
+        TestActorRef<PlainFlinkUntypedActor> actor = null;
+
+        try {
+            final Props props = Props.create(PlainFlinkUntypedActor.class, leaderSessionID);
+            actor = TestActorRef.create(actorSystem, props);
+
+            actor.receive(new JobManagerMessages.LeaderSessionMessage(leaderSessionID, 1));
+            // An unwrapped RequiresLeaderSessionID message must make receive() throw.
+            try {
+                actor.receive(new PlainRequiresLeaderSessionID());
+
+                fail("Expected an exception to be thrown, because a RequiresLeaderSessionID " +
+                    "message was sent without being wrapped in LeaderSessionMessage.");
+            } catch (Exception e) {
+                assertEquals("Received a message PlainRequiresLeaderSessionID " +
+                    "without a leader session ID, even though it requires to have one.",
+                    e.getMessage());
+            }
+
+        } finally {
+            stopActor(actor);
+        }
+    }
+    // Sends Kill to the given actor if it was created; a null reference is ignored.
+    private static void stopActor(ActorRef actor) {
+        if (actor != null) {
+            actor.tell(Kill.getInstance(), ActorRef.noSender());
+        }
+    }
+
+    // Test actor that counts handleMessage calls and reports a fixed leader session ID.
+    static class PlainFlinkUntypedActor extends FlinkUntypedActor {
+
+        private Option<UUID> leaderSessionID;
+
+        private int messageCounter;
+
+        public PlainFlinkUntypedActor(Option<UUID> leaderSessionID) {
+            this.leaderSessionID = leaderSessionID;
+            this.messageCounter = 0;
+        }
+
+        @Override
+        protected void handleMessage(Object message) throws Exception {
+            messageCounter++;
+        }
+
+        @Override
+        protected Option<UUID> getLeaderSessionID() {
+            return leaderSessionID;
+        }
+
+        public int getMessageCounter() {
+            return messageCounter;
+        }
+    }
+    // Message requiring a leader session ID; its toString() appears in the expected error text.
+    static class PlainRequiresLeaderSessionID implements RequiresLeaderSessionID {
+        @Override
+        public String toString() {
+            return "PlainRequiresLeaderSessionID";
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 932e366..b124304 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -18,12 +18,10 @@
package org.apache.flink.runtime.checkpoint;
-import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -62,18 +60,19 @@ public class CoordinatorShutdownTest {
JobGraph testGraph = new JobGraph("test job", vertex);
testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000));
- ActorRef jobManager = cluster.getJobManager();
+ ActorGateway jobManager = cluster.getJobManagerGateway();
FiniteDuration timeout = new FiniteDuration(60, TimeUnit.SECONDS);
JobManagerMessages.SubmitJob submitMessage = new JobManagerMessages.SubmitJob(testGraph, false);
// submit is successful, but then the job dies because no TaskManager / slot is available
- Future<Object> submitFuture = Patterns.ask(jobManager, submitMessage, timeout.toMillis());
+ Future<Object> submitFuture = jobManager.ask(submitMessage, timeout);
Await.result(submitFuture, timeout);
// get the execution graph and make sure the coordinator is properly shut down
- Future<Object> jobRequestFuture = Patterns.ask(jobManager,
- new JobManagerMessages.RequestJob(testGraph.getJobID()), timeout.toMillis());
+ Future<Object> jobRequestFuture = jobManager.ask(
+ new JobManagerMessages.RequestJob(testGraph.getJobID()),
+ timeout);
ExecutionGraph graph = ((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph();
@@ -109,18 +108,19 @@ public class CoordinatorShutdownTest {
JobGraph testGraph = new JobGraph("test job", vertex);
testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000));
- ActorRef jobManager = cluster.getJobManager();
+ ActorGateway jobManager = cluster.getJobManagerGateway();
FiniteDuration timeout = new FiniteDuration(60, TimeUnit.SECONDS);
JobManagerMessages.SubmitJob submitMessage = new JobManagerMessages.SubmitJob(testGraph, false);
// submit is successful, but then the job dies because no TaskManager / slot is available
- Future<Object> submitFuture = Patterns.ask(jobManager, submitMessage, timeout.toMillis());
+ Future<Object> submitFuture = jobManager.ask(submitMessage, timeout);
Await.result(submitFuture, timeout);
// get the execution graph and make sure the coordinator is properly shut down
- Future<Object> jobRequestFuture = Patterns.ask(jobManager,
- new JobManagerMessages.RequestJob(testGraph.getJobID()), timeout.toMillis());
+ Future<Object> jobRequestFuture = jobManager.ask(
+ new JobManagerMessages.RequestJob(testGraph.getJobID()),
+ timeout);
ExecutionGraph graph = ((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph();
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index cff7146..e3fc852 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -92,7 +92,7 @@ public class ExecutionGraphDeploymentTest {
ExecutionJobVertex ejv = eg.getAllVertices().get(jid2);
ExecutionVertex vertex = ejv.getTaskVertices()[3];
- ExecutionGraphTestUtils.SimpleInstanceGateway instanceGateway = new ExecutionGraphTestUtils.SimpleInstanceGateway(TestingUtils.directExecutionContext());
+ ExecutionGraphTestUtils.SimpleActorGateway instanceGateway = new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext());
final Instance instance = getInstance(instanceGateway);
@@ -295,7 +295,7 @@ public class ExecutionGraphDeploymentTest {
for (int i = 0; i < dop1 + dop2; i++) {
scheduler.newInstanceAvailable(
ExecutionGraphTestUtils.getInstance(
- new ExecutionGraphTestUtils.SimpleInstanceGateway(
+ new ExecutionGraphTestUtils.SimpleActorGateway(
TestingUtils.directExecutionContext())));
}
assertEquals(dop1 + dop2, scheduler.getNumberOfAvailableSlots());
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 8a63060..64d4c44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -24,18 +24,17 @@ import static org.mockito.Mockito.spy;
import java.lang.reflect.Field;
import java.net.InetAddress;
-import java.util.LinkedList;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.BaseTestingInstanceGateway;
+import org.apache.flink.runtime.instance.BaseTestingActorGateway;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.instance.InstanceGateway;
+import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -101,11 +100,11 @@ public class ExecutionGraphTestUtils {
// utility mocking methods
// --------------------------------------------------------------------------------------------
- public static Instance getInstance(final InstanceGateway gateway) throws Exception {
+ public static Instance getInstance(final ActorGateway gateway) throws Exception {
return getInstance(gateway, 1);
}
- public static Instance getInstance(final InstanceGateway gateway, final int numberOfSlots) throws Exception {
+ public static Instance getInstance(final ActorGateway gateway, final int numberOfSlots) throws Exception {
HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
InetAddress address = InetAddress.getByName("127.0.0.1");
InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
@@ -113,10 +112,10 @@ public class ExecutionGraphTestUtils {
return new Instance(gateway, connection, new InstanceID(), hardwareDescription, numberOfSlots);
}
- public static class SimpleInstanceGateway extends BaseTestingInstanceGateway {
+ public static class SimpleActorGateway extends BaseTestingActorGateway {
public TaskDeploymentDescriptor lastTDD;
- public SimpleInstanceGateway(ExecutionContext executionContext){
+ public SimpleActorGateway(ExecutionContext executionContext){
super(executionContext);
}
@@ -140,8 +139,8 @@ public class ExecutionGraphTestUtils {
}
}
- public static class SimpleFailingInstanceGateway extends BaseTestingInstanceGateway {
- public SimpleFailingInstanceGateway(ExecutionContext executionContext) {
+ public static class SimpleFailingActorGateway extends BaseTestingActorGateway {
+ public SimpleFailingActorGateway(ExecutionContext executionContext) {
super(executionContext);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index f47e92c..89b82f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -63,7 +63,7 @@ public class ExecutionStateProgressTest {
// mock resources and mock taskmanager
for (ExecutionVertex ee : ejv.getTaskVertices()) {
SimpleSlot slot = getInstance(
- new SimpleInstanceGateway(
+ new SimpleActorGateway(
TestingUtils.defaultExecutionContext())
).allocateSimpleSlot(jid);
ee.deployToSlot(slot);
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 9db330b..e9b67af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -26,9 +26,9 @@ import java.io.IOException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.BaseTestingInstanceGateway;
-import org.apache.flink.runtime.instance.DummyInstanceGateway;
-import org.apache.flink.runtime.instance.InstanceGateway;
+import org.apache.flink.runtime.instance.BaseTestingActorGateway;
+import org.apache.flink.runtime.instance.DummyActorGateway;
+import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.api.common.JobID;
@@ -125,12 +125,12 @@ public class ExecutionVertexCancelTest {
setVertexState(vertex, ExecutionState.SCHEDULED);
assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
- InstanceGateway instanceGateway = new CancelSequenceInstanceGateway(
+ ActorGateway actorGateway = new CancelSequenceActorGateway(
executionContext,
new TaskOperationResult(execId, true),
new TaskOperationResult(execId, false));
- Instance instance = getInstance(instanceGateway);
+ Instance instance = getInstance(actorGateway);
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
vertex.deployToSlot(slot);
@@ -195,13 +195,13 @@ public class ExecutionVertexCancelTest {
// task manager cancel sequence mock actor
// first return NOT SUCCESS (task not found, cancel call overtook deploy call), then success (cancel call after deploy call)
- InstanceGateway instanceGateway = new CancelSequenceInstanceGateway(
+ ActorGateway actorGateway = new CancelSequenceActorGateway(
executionContext,
new TaskOperationResult(execId, false),
new TaskOperationResult(execId, true)
);
- Instance instance = getInstance(instanceGateway);
+ Instance instance = getInstance(actorGateway);
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
vertex.deployToSlot(slot);
@@ -258,11 +258,11 @@ public class ExecutionVertexCancelTest {
AkkaUtils.getDefaultTimeout());
final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
- InstanceGateway instanceGateway = new CancelSequenceInstanceGateway(
+ ActorGateway actorGateway = new CancelSequenceActorGateway(
TestingUtils.directExecutionContext(),
new TaskOperationResult(execId, true));
- Instance instance = getInstance(instanceGateway);
+ Instance instance = getInstance(actorGateway);
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
setVertexState(vertex, ExecutionState.RUNNING);
@@ -299,12 +299,12 @@ public class ExecutionVertexCancelTest {
AkkaUtils.getDefaultTimeout());
final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
- final InstanceGateway instanceGateway = new CancelSequenceInstanceGateway(
+ final ActorGateway actorGateway = new CancelSequenceActorGateway(
TestingUtils.directExecutionContext(),
new TaskOperationResult(execId, true)
);
- Instance instance = getInstance(instanceGateway);
+ Instance instance = getInstance(actorGateway);
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
setVertexState(vertex, ExecutionState.RUNNING);
@@ -350,12 +350,12 @@ public class ExecutionVertexCancelTest {
final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
- final InstanceGateway instanceGateway = new CancelSequenceInstanceGateway(
+ final ActorGateway actorGateway = new CancelSequenceActorGateway(
TestingUtils.directExecutionContext(),
new TaskOperationResult(execId, false)
);
- Instance instance = getInstance(instanceGateway);
+ Instance instance = getInstance(actorGateway);
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
setVertexState(vertex, ExecutionState.RUNNING);
@@ -386,7 +386,7 @@ public class ExecutionVertexCancelTest {
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
- final InstanceGateway gateway = new CancelSequenceInstanceGateway(TestingUtils.directExecutionContext());
+ final ActorGateway gateway = new CancelSequenceActorGateway(TestingUtils.directExecutionContext());
Instance instance = getInstance(gateway);
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
@@ -423,7 +423,7 @@ public class ExecutionVertexCancelTest {
AkkaUtils.getDefaultTimeout());
final ExecutionAttemptID execID = vertex.getCurrentExecutionAttempt().getAttemptId();
- final InstanceGateway gateway = new CancelSequenceInstanceGateway(
+ final ActorGateway gateway = new CancelSequenceActorGateway(
TestingUtils.defaultExecutionContext(),
new TaskOperationResult(execID, true));
@@ -482,7 +482,7 @@ public class ExecutionVertexCancelTest {
// deploying after canceling from CREATED needs to raise an exception, because
// the scheduler (or any caller) needs to know that the slot should be released
try {
- Instance instance = getInstance(DummyInstanceGateway.INSTANCE);
+ Instance instance = getInstance(DummyActorGateway.INSTANCE);
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
vertex.deployToSlot(slot);
@@ -525,7 +525,7 @@ public class ExecutionVertexCancelTest {
AkkaUtils.getDefaultTimeout());
setVertexState(vertex, ExecutionState.CANCELING);
- Instance instance = getInstance(DummyInstanceGateway.INSTANCE);
+ Instance instance = getInstance(DummyActorGateway.INSTANCE);
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
vertex.deployToSlot(slot);
@@ -541,7 +541,7 @@ public class ExecutionVertexCancelTest {
ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
- Instance instance = getInstance(DummyInstanceGateway.INSTANCE);
+ Instance instance = getInstance(DummyActorGateway.INSTANCE);
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
setVertexResource(vertex, slot);
@@ -562,11 +562,11 @@ public class ExecutionVertexCancelTest {
}
}
- public static class CancelSequenceInstanceGateway extends BaseTestingInstanceGateway {
+ public static class CancelSequenceActorGateway extends BaseTestingActorGateway {
private final TaskOperationResult[] results;
private int index = -1;
- public CancelSequenceInstanceGateway(ExecutionContext executionContext, TaskOperationResult ... result) {
+ public CancelSequenceActorGateway(ExecutionContext executionContext, TaskOperationResult... result) {
super(executionContext);
this.results = result;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 431c3a9..81ec6c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -43,7 +43,7 @@ public class ExecutionVertexDeploymentTest {
// mock taskmanager to simply accept the call
Instance instance = getInstance(
- new SimpleInstanceGateway(TestingUtils.directExecutionContext()));
+ new SimpleActorGateway(TestingUtils.directExecutionContext()));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
@@ -81,7 +81,7 @@ public class ExecutionVertexDeploymentTest {
final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
final Instance instance = getInstance(
- new SimpleInstanceGateway(TestingUtils.directExecutionContext()));
+ new SimpleActorGateway(TestingUtils.directExecutionContext()));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
@@ -124,7 +124,7 @@ public class ExecutionVertexDeploymentTest {
AkkaUtils.getDefaultTimeout());
final Instance instance = getInstance(
- new SimpleInstanceGateway(TestingUtils.defaultExecutionContext()));
+ new SimpleActorGateway(TestingUtils.defaultExecutionContext()));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -171,7 +171,7 @@ public class ExecutionVertexDeploymentTest {
AkkaUtils.getDefaultTimeout());
final Instance instance = getInstance(
- new SimpleFailingInstanceGateway(TestingUtils.directExecutionContext()));
+ new SimpleFailingActorGateway(TestingUtils.directExecutionContext()));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -201,7 +201,7 @@ public class ExecutionVertexDeploymentTest {
AkkaUtils.getDefaultTimeout());
final Instance instance = getInstance(
- new SimpleFailingInstanceGateway(TestingUtils.directExecutionContext()));
+ new SimpleFailingActorGateway(TestingUtils.directExecutionContext()));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -244,7 +244,7 @@ public class ExecutionVertexDeploymentTest {
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
- final Instance instance = getInstance(new SimpleInstanceGateway(TestingUtils.directExecutionContext()));
+ final Instance instance = getInstance(new SimpleActorGateway(TestingUtils.directExecutionContext()));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -286,7 +286,7 @@ public class ExecutionVertexDeploymentTest {
final ExecutionAttemptID eid = vertex.getCurrentExecutionAttempt().getAttemptId();
final Instance instance = getInstance(
- new ExecutionVertexCancelTest.CancelSequenceInstanceGateway(
+ new ExecutionVertexCancelTest.CancelSequenceActorGateway(
context,
new TaskOperationResult(eid, false),
new TaskOperationResult(eid, true)));
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 8ea7017..5e9ee33 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -25,7 +25,7 @@ import static org.mockito.Mockito.*;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.DummyInstanceGateway;
+import org.apache.flink.runtime.instance.DummyActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -47,7 +47,7 @@ public class ExecutionVertexSchedulingTest {
AkkaUtils.getDefaultTimeout());
// a slot than cannot be deployed to
- final Instance instance = getInstance(DummyInstanceGateway.INSTANCE);
+ final Instance instance = getInstance(DummyActorGateway.INSTANCE);
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
slot.releaseSlot();
@@ -77,7 +77,7 @@ public class ExecutionVertexSchedulingTest {
AkkaUtils.getDefaultTimeout());
// a slot than cannot be deployed to
- final Instance instance = getInstance(DummyInstanceGateway.INSTANCE);
+ final Instance instance = getInstance(DummyActorGateway.INSTANCE);
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
slot.releaseSlot();
@@ -113,7 +113,7 @@ public class ExecutionVertexSchedulingTest {
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
- final Instance instance = getInstance(new ExecutionGraphTestUtils.SimpleInstanceGateway(TestingUtils.defaultExecutionContext()));
+ final Instance instance = getInstance(new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.defaultExecutionContext()));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
Scheduler scheduler = mock(Scheduler.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
index b4a7e63..2530a53 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
@@ -377,7 +377,7 @@ public class LocalInputSplitsTest {
when(connection.getFQDNHostname()).thenReturn(hostname);
return new Instance(
- new ExecutionGraphTestUtils.SimpleInstanceGateway(
+ new ExecutionGraphTestUtils.SimpleActorGateway(
TestingUtils.defaultExecutionContext()),
connection,
new InstanceID(),
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index b779d79..f42543f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.DummyInstanceGateway;
+import org.apache.flink.runtime.instance.DummyActorGateway;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
@@ -78,7 +78,7 @@ public class TerminalStateDeadlockTest {
InstanceConnectionInfo ci = new InstanceConnectionInfo(address, 12345);
HardwareDescription resources = new HardwareDescription(4, 4000000, 3000000, 2000000);
- Instance instance = new Instance(DummyInstanceGateway.INSTANCE, ci, new InstanceID(), resources, 4);
+ Instance instance = new Instance(DummyActorGateway.INSTANCE, ci, new InstanceID(), resources, 4);
this.resource = instance.allocateSimpleSlot(new JobID());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
index 3305254..8604b63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
@@ -26,7 +26,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
-import org.apache.flink.runtime.instance.DummyInstanceGateway;
+import org.apache.flink.runtime.instance.DummyActorGateway;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
@@ -403,7 +403,7 @@ public class VertexLocationConstraintTest {
ExecutionVertex ev = eg.getAllVertices().get(vertex.getID()).getTaskVertices()[0];
- Instance instance = ExecutionGraphTestUtils.getInstance(DummyInstanceGateway.INSTANCE);
+ Instance instance = ExecutionGraphTestUtils.getInstance(DummyActorGateway.INSTANCE);
ev.setLocationConstraintHosts(Collections.singletonList(instance));
assertNotNull(ev.getPreferredLocations());
@@ -435,7 +435,7 @@ public class VertexLocationConstraintTest {
when(connection.getFQDNHostname()).thenReturn(hostname);
return new Instance(
- new ExecutionGraphTestUtils.SimpleInstanceGateway(
+ new ExecutionGraphTestUtils.SimpleActorGateway(
TestingUtils.defaultExecutionContext()),
connection,
new InstanceID(),
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java
new file mode 100644
index 0000000..2e62781
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorRef;
+import akka.dispatch.Futures;
+import scala.Option;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract base class for testing {@link ActorGateway} instances. The implementing subclass
+ * only has to provide an implementation for handleMessage which contains the logic to treat
+ * different messages.
+ */
+abstract public class BaseTestingActorGateway implements ActorGateway {
+ /**
+ * {@link ExecutionContext} which is used to execute the futures.
+ */
+ private final ExecutionContext executionContext;
+
+ public BaseTestingActorGateway(ExecutionContext executionContext) {
+ this.executionContext = executionContext;
+ }
+
+ @Override
+ public Future<Object> ask(Object message, FiniteDuration timeout) {
+ try {
+ final Object result = handleMessage(message);
+
+ return Futures.future(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ return result;
+ }
+ }, executionContext);
+
+ } catch (final Exception e) {
+ // if an exception occurred in the handleMessage method then return it as part of the future
+ return Futures.future(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ throw e;
+ }
+ }, executionContext);
+ }
+ }
+
+ /**
+ * Handles the messages supported by this ActorGateway.
+ *
+ * @param message Message to handle
+ * @return Result of handling the message
+ * @throws Exception if handling the message fails
+ */
+ abstract public Object handleMessage(Object message) throws Exception;
+
+ @Override
+ public void tell(Object message) {
+ try {
+ handleMessage(message);
+ } catch (Exception e) {
+ // discard exception because it happens on the "remote" instance
+ }
+ }
+
+ @Override
+ public void tell(Object message, ActorGateway sender) {
+ try{
+ handleMessage(message);
+ } catch (Exception e) {
+ // discard exception because it happens on the "remote" instance
+ }
+ }
+
+ @Override
+ public void forward(Object message, ActorGateway sender) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) {
+ return ask(message, timeout);
+ }
+
+ @Override
+ public String path() {
+ return "BaseTestingInstanceGateway";
+ }
+
+ @Override
+ public ActorRef actor() {
+ return ActorRef.noSender();
+ }
+
+ @Override
+ public Option<UUID> leaderSessionID() {
+ return Option.empty();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java
deleted file mode 100644
index e9f8259..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import akka.actor.ActorPath;
-import akka.actor.ActorRef;
-import akka.dispatch.Futures;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.concurrent.Callable;
-
-/**
- * Abstract base class for testing {@link InstanceGateway} instances. The implementing subclass
- * only has to provide an implementation for handleMessage which contains the logic to treat
- * different messages.
- */
-abstract public class BaseTestingInstanceGateway implements InstanceGateway {
- /**
- * {@link ExecutionContext} which is used to execute the futures.
- */
- private final ExecutionContext executionContext;
-
- public BaseTestingInstanceGateway(ExecutionContext executionContext) {
- this.executionContext = executionContext;
- }
-
- @Override
- public Future<Object> ask(Object message, FiniteDuration timeout) {
- try {
- final Object result = handleMessage(message);
-
- return Futures.future(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- return result;
- }
- }, executionContext);
-
- } catch (final Exception e) {
- // if an exception occurred in the handleMessage method then return it as part of the future
- return Futures.future(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- throw e;
- }
- }, executionContext);
- }
- }
-
- /**
- * Handles the supported messages by this InstanceGateway
- *
- * @param message Message to handle
- * @return Result
- * @throws Exception
- */
- abstract public Object handleMessage(Object message) throws Exception;
-
- @Override
- public void tell(Object message) {}
-
- @Override
- public void forward(Object message, ActorRef sender) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) {
- return ask(message, timeout);
- }
-
- @Override
- public String path() {
- throw new UnsupportedOperationException();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java
new file mode 100644
index 0000000..10762f2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorRef;
+import scala.Option;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+
+/**
+ * Dummy {@link ActorGateway} implementation used for testing.
+ */
+public class DummyActorGateway implements ActorGateway {
+ public static final DummyActorGateway INSTANCE = new DummyActorGateway();
+
+ @Override
+ public Future<Object> ask(Object message, FiniteDuration timeout) {
+ return null;
+ }
+
+ @Override
+ public void tell(Object message) {}
+
+ @Override
+ public void tell(Object message, ActorGateway sender) {}
+
+ @Override
+ public void forward(Object message, ActorGateway sender) {}
+
+ @Override
+ public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) {
+ return null;
+ }
+
+ @Override
+ public String path() {
+ return "DummyInstanceGateway";
+ }
+
+ @Override
+ public ActorRef actor() {
+ return ActorRef.noSender();
+ }
+
+ @Override
+ public Option<UUID> leaderSessionID() {
+ return Option.<UUID>empty();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java
deleted file mode 100644
index 5941201..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import akka.actor.ActorPath;
-import akka.actor.ActorRef;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Dummy {@link InstanceGateway} implementation used for testing.
- */
-public class DummyInstanceGateway implements InstanceGateway {
- public static final DummyInstanceGateway INSTANCE = new DummyInstanceGateway();
-
- @Override
- public Future<Object> ask(Object message, FiniteDuration timeout) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void tell(Object message) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void forward(Object message, ActorRef sender) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String path() {
- return "DummyInstanceGateway";
- }
-}