You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/04/06 17:49:36 UTC
[3/7] flink git commit: [FLINK-1580] [taskmanager] Improve
TaskManager startup robustness
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index f558d48..af13b74 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -15,26 +15,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.taskmanager
import java.io.{File, IOException}
import java.net.{InetAddress, InetSocketAddress}
import java.util
import java.util.concurrent.{TimeUnit, FutureTask}
-import management.{GarbageCollectorMXBean, ManagementFactory, MemoryMXBean}
+import java.lang.management.{GarbageCollectorMXBean, ManagementFactory, MemoryMXBean}
import akka.actor._
import akka.pattern.ask
+import akka.util.Timeout
+
import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry}
import com.codahale.metrics.json.MetricsModule
import com.codahale.metrics.jvm.{MemoryUsageGaugeSet, GarbageCollectorMetricSet}
+
import com.fasterxml.jackson.databind.ObjectMapper
+
import org.apache.flink.api.common.cache.DistributedCache
-import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
+import org.apache.flink.configuration._
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.blob.BlobCache
+import org.apache.flink.runtime.blob.{BlobService, BlobCache}
import org.apache.flink.runtime.broadcast.BroadcastVariableManager
import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager}
@@ -43,25 +48,25 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
import org.apache.flink.runtime.filecache.FileCache
import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID}
import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync
+import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.io.network.netty.NettyConfig
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
import org.apache.flink.runtime.jobgraph.tasks.{OperatorStateCarrier,BarrierTransceiver}
-import org.apache.flink.runtime.jobmanager.{BarrierReq,JobManager}
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
-import org.apache.flink.runtime.messages.JobManagerMessages.UpdateTaskExecutionState
-import org.apache.flink.runtime.messages.Messages.{Disconnect, Acknowledge}
-import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered, RefuseRegistration, AcknowledgeRegistration, RegisterTaskManager}
+import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.memorymanager.{MemoryManager, DefaultMemoryManager}
+import org.apache.flink.runtime.messages.CheckpointingMessages.{CheckpointingMessage, BarrierReq}
+import org.apache.flink.runtime.messages.Messages._
+import org.apache.flink.runtime.messages.RegistrationMessages._
import org.apache.flink.runtime.messages.TaskManagerMessages._
-import org.apache.flink.runtime.messages.TaskManagerProfilerMessages.{UnregisterProfilingListener, UnmonitorTask, MonitorTask, RegisterProfilingListener}
+import org.apache.flink.runtime.messages.TaskMessages._
import org.apache.flink.runtime.net.NetUtils
import org.apache.flink.runtime.process.ProcessReaper
-import org.apache.flink.runtime.profiling.ProfilingUtils
import org.apache.flink.runtime.security.SecurityUtils
import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
import org.apache.flink.runtime.util.{MathUtils, EnvironmentInformation}
import org.apache.flink.util.ExceptionUtils
+
import org.slf4j.LoggerFactory
import scala.concurrent._
@@ -75,331 +80,696 @@ import scala.language.postfixOps
* The TaskManager is responsible for executing the individual tasks of a Flink job. It is
* implemented as an actor. The TaskManager has the following phases:
*
- * - Waiting to be registered with its JobManager. In that phase, it periodically sends
- * [[RegisterAtJobManager]] messages to itself, which trigger the sending of
- * a [[RegisterTaskManager]] message to the JobManager.
+ * - "Waiting to be registered": In that phase, it periodically
+ * sends a [[RegisterTaskManager]] message to the JobManager.
+ * Upon successful registration, the JobManager replies with an [[AcknowledgeRegistration]]
+ * message. This stops the registration messages and initializes all fields
+ * that require the JobManager's actor reference.
+ *
+ * - "Operational": Here the TaskManager accepts and processes task messages, like
+ * [[SubmitTask]], [[CancelTask]], [[FailTask]].
+ * If the TaskManager disconnects from the JobManager (because the JobManager is no longer
+ * reachable), the TaskManager gets back to the "waiting to be registered" state.
*
- * - Upon successful registration, the JobManager replies with an [[AcknowledgeRegistration]]
- * message. This stops the registration messages and initializes all fields
- * that require the JobManager's actor reference
*
- * - [[SubmitTask]] is sent from the JobManager and contains the next Task to be executed on this
- * TaskManager
+ * ========== Failure model of the TaskManager ==========
*
- * - [[CancelTask]] requests to cancel the corresponding task
+ * The TaskManager tries to compensate for task failures as far as possible by marking
+ * the task as failed and removing all its resources. This causes the JobManager to
+ * restart the task (on this same TaskManager or on a different TaskManager).
*
- * - [[FailTask]] requests to fail the corresponding task
+ * In certain cases, exceptions indicate that the TaskManager is unable to proceed.
+ * The most robust way to clean up is letting the OS/kernel do it, so we will trigger
+ * killing the process. In case of YARN (or resilient standalone mode), the process
+ * will be restarted, producing a clean state.
+ * To achieve this, we kill the TaskManager actor. The watch dog actor (process reaper)
+ * will recognize that and kill the TaskManager process.
*
- * - ...
+ * Fatal errors that require TaskManager JVM restart include:
+ *
+ * - Errors bringing up the Network Stack or Library Cache after the TaskManager
+ * has registered at the JobManager. The TaskManager cannot operate without them.
+ *
+ * - Exceptions while releasing the task resources from the network stack, intermediate
+ * results, or memory manager. Those situations indicate a critical leak in the
+ * resource management, which can only be reliably fixed through a JVM restart.
+ *
+ * - Exceptions releasing intermediate result resources. Critical resource leak,
+ * requires a clean JVM.
*/
-class TaskManager(val connectionInfo: InstanceConnectionInfo,
- val jobManagerAkkaURL: String,
- val taskManagerConfig: TaskManagerConfiguration,
- val networkConfig: NetworkEnvironmentConfiguration)
- extends Actor with ActorLogMessages with ActorLogging {
-
- import context._
- import taskManagerConfig.{timeout => tmTimeout, _}
-
-
-
- implicit val timeout = tmTimeout
-
- log.info("Starting task manager at {}.", self.path)
- log.info("Creating {} task slot(s).", numberOfSlots)
- log.info("TaskManager connection information: {}", connectionInfo)
-
- val HEARTBEAT_INTERVAL = 5000 millisecond
-
- var registrationDelay = 50 milliseconds
- var registrationDuration = 0 seconds
- var registrationAttempts: Int = 0
-
- val ioManager = new IOManagerAsync(tmpDirPaths)
- val memoryManager = new DefaultMemoryManager(memorySize, numberOfSlots, pageSize)
- val bcVarManager = new BroadcastVariableManager()
- val hardwareDescription = HardwareDescription.extractFromSystem(memoryManager.getMemorySize)
- val fileCache = new FileCache()
- val runningTasks = scala.collection.mutable.HashMap[ExecutionAttemptID, Task]()
- val metricRegistry = new MetricRegistry
- // register metrics
- metricRegistry.register("gc", new GarbageCollectorMetricSet)
- metricRegistry.register("memory", new MemoryUsageGaugeSet)
- metricRegistry.register("load", new Gauge[Double] {
- override def getValue: Double =
- ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()
- })
- // register metric serialization
- val metricRegistryMapper: ObjectMapper =
- new ObjectMapper().registerModule(new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS,
- false, MetricFilter.ALL))
-
- // Actors which want to be notified once this task manager has been registered at the job manager
- val waitForRegistration = scala.collection.mutable.Set[ActorRef]()
-
- val profiler = profilingInterval match {
- case Some(interval) =>
- log.info("Profiling of jobs is enabled.")
- Some(TaskManager.startProfiler(self.path.toSerializationFormat, interval, context.system))
- case None =>
- log.info("Profiling of jobs is disabled.")
- None
- }
+class TaskManager(protected val config: TaskManagerConfiguration,
+ protected val connectionInfo: InstanceConnectionInfo,
+ protected val jobManagerAkkaURL: String,
+ protected val memoryManager: MemoryManager,
+ protected val ioManager: IOManager,
+ protected val network: NetworkEnvironment,
+ protected val numberOfSlots: Int)
- if (log.isInfoEnabled) {
- log.info(TaskManager.getMemoryUsageStatsAsString(ManagementFactory.getMemoryMXBean))
- }
+extends Actor with ActorLogMessages with ActorLogging {
+
+ /** The log for all synchronous logging calls */
+ private val LOG = TaskManager.LOG
+
+ /** The timeout for all actor ask futures */
+ protected val askTimeout = new Timeout(config.timeout)
+
+ /** The TaskManager's physical execution resources */
+ protected val resources = HardwareDescription.extractFromSystem(memoryManager.getMemorySize)
+
+ /** Registry of all tasks currently executed by this TaskManager */
+ protected val runningTasks = scala.collection.mutable.HashMap[ExecutionAttemptID, Task]()
+
+ /** Handler for shared broadcast variables (shared between multiple Tasks) */
+ protected val bcVarManager = new BroadcastVariableManager()
+
+ /** Handler for distributed files cached by this TaskManager */
+ protected val fileCache = new FileCache()
+
+ /** Registry of metrics periodically transmitted to the JobManager */
+ private val metricRegistry = TaskManager.createMetricsRegistry()
+
+ /** Metric serialization */
+ private val metricRegistryMapper: ObjectMapper = new ObjectMapper()
+ .registerModule(new MetricsModule(TimeUnit.SECONDS,
+ TimeUnit.MILLISECONDS,
+ false,
+ MetricFilter.ALL))
+
+ /** Actors which want to be notified once this task manager has been
+ registered at the job manager */
+ private val waitForRegistration = scala.collection.mutable.Set[ActorRef]()
+
+ private var blobService: Option[BlobService] = None
+ private var libraryCacheManager: Option[LibraryCacheManager] = None
+ private var currentJobManager: Option[ActorRef] = None
+
+ private var instanceID: InstanceID = null
- var libraryCacheManager: Option[LibraryCacheManager] = None
- var networkEnvironment: Option[NetworkEnvironment] = None
- var currentJobManager: Option[ActorRef] = None
- var profilerListener: Option[ActorRef] = None
- var instanceID: InstanceID = null
- var heartbeatScheduler: Option[Cancellable] = None
+ private var heartbeatScheduler: Option[Cancellable] = None
+ // --------------------------------------------------------------------------
+ // Actor messages and life cycle
+ // --------------------------------------------------------------------------
+
+ /**
+ * Called prior to the actor receiving any messages.
+ * Logs some context info and triggers the initial attempt to register at the
+ * JobManager.
+ */
override def preStart(): Unit = {
- tryJobManagerRegistration()
+ LOG.info("Starting TaskManager actor at {}.", self.path.toSerializationFormat)
+ LOG.info("TaskManager data connection information: {}", connectionInfo)
+ LOG.info("TaskManager has {} task slot(s).", numberOfSlots)
+
+ // log the initial memory utilization
+ if (LOG.isInfoEnabled) {
+ LOG.info(TaskManager.getMemoryUsageStatsAsString(ManagementFactory.getMemoryMXBean))
+ }
+
+ // kick off the registration
+ val deadline: Option[Deadline] = config.maxRegistrationDuration.map(_.fromNow)
+
+ self.tell(TriggerTaskManagerRegistration(jobManagerAkkaURL,
+ TaskManager.INITIAL_REGISTRATION_TIMEOUT, deadline,1), ActorRef.noSender)
}
+ /**
+ * Called after the actor is stopped.
+ * Makes sure all currently running tasks are cancelled and all components
+ * (like network stack, library cache, memory manager, ...) are properly shut down.
+ */
override def postStop(): Unit = {
- log.info("Stopping task manager {}.", self.path)
+ LOG.info("Stopping TaskManager {}.", self.path.toSerializationFormat)
- currentJobManager foreach {
- _ ! Disconnect(s"TaskManager ${self.path} is shutting down.")
+ cancelAndClearEverything(new Exception("TaskManager is shutting down."))
+
+ if (isConnected) {
+ try {
+ disassociateFromJobManager()
+ } catch {
+ case t: Exception => LOG.error("Could not cleanly disassociate from JobManager", t)
+ }
}
- cancelAndClearEverything(new Exception("Task Manager is shutting down."))
+ try {
+ ioManager.shutdown()
+ } catch {
+ case t: Exception => LOG.error("I/O manager did not shutdown properly.", t)
+ }
- cleanupTaskManager()
+ try {
+ memoryManager.shutdown()
+ } catch {
+ case t: Exception => LOG.error("Memory manager did not shutdown properly.", t)
+ }
- ioManager.shutdown()
- memoryManager.shutdown()
+ try {
+ network.shutdown()
+ } catch {
+ case t: Exception => LOG.error("Network environment did not shutdown properly.", t)
+ }
try {
fileCache.shutdown()
} catch {
- case t: Throwable => log.error(t, "FileCache did not shutdown properly.")
+ case t: Exception => LOG.error("FileCache did not shutdown properly.", t)
}
- if(log.isDebugEnabled){
- log.debug("Task manager {} is completely stopped.", self.path)
- }
- }
-
- private def tryJobManagerRegistration(): Unit = {
- context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
+ LOG.info("Task manager {} is completely shut down.", self.path)
}
+ /**
+ * Central handling of actor messages. This method delegates to the more specialized
+ * methods for handling certain classes of messages.
+ */
override def receiveWithLogMessages: Receive = {
- case RegisterAtJobManager =>
- if(currentJobManager.isEmpty) {
- registrationDuration += registrationDelay
- // double delay for exponential backoff
- registrationDelay *= 2
- registrationAttempts += 1
-
- if (registrationDuration > maxRegistrationDuration) {
- log.error("TaskManager could not register at JobManager {} after {}.",
- jobManagerAkkaURL,
- maxRegistrationDuration)
- self ! PoisonPill
- } else {
- log.info("Try to register at master {}. {}. Attempt", jobManagerAkkaURL,
- registrationAttempts)
- val jobManager = context.actorSelection(jobManagerAkkaURL)
+ // task messages are most common and critical, we handle them first
+ case message: TaskMessage => handleTaskMessage(message)
- jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
+ // messages for coordinating checkpoints
+ case message: CheckpointingMessage => handleCheckpointingMessage(message)
- context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
- }
- }
+ // registration messages for connecting and disconnecting from / to the JobManager
+ case message: RegistrationMessage => handleRegistrationMessage(message)
- case AcknowledgeRegistration(id, blobPort, profilerListener) =>
- if(currentJobManager.isEmpty) {
- finishRegistration(sender, id, blobPort, profilerListener)
- } else {
- log.info("The TaskManager {} is already registered at the JobManager {}, but received " +
- "another AcknowledgeRegistration message.", self.path, sender.path)
- }
+ // ----- miscellaneous messages ----
+
+ // periodic heart beats that transport metrics
+ case SendHeartbeat => sendHeartbeatToJobManager()
- case AlreadyRegistered(id, blobPort, profilerListener) =>
- if(currentJobManager.isEmpty) {
- log.warning("The TaskManager {} seems to be already registered at the JobManager {} even" +
- "though it has not yet finished the registration process.", self.path, sender.path)
+ // sends the stack trace of this TaskManager to the sender
+ case SendStackTrace => sendStackTrace(sender())
- finishRegistration(sender, id, blobPort, profilerListener)
+ // registers the message sender to be notified once this TaskManager has completed
+ // its registration at the JobManager
+ case NotifyWhenRegisteredAtJobManager =>
+ if (isConnected) {
+ sender ! RegisteredAtJobManager
} else {
- // ignore AlreadyRegistered messages which arrived after AcknowledgeRegistration
- log.info("The TaskManager {} has already been registered at the JobManager {}.",
- self.path, sender.path)
+ waitForRegistration += sender
}
- case RefuseRegistration(reason) =>
- if(currentJobManager.isEmpty) {
- log.error("The registration of task manager {} was refused by the job manager {} " +
- "because {}.", self.path, jobManagerAkkaURL, reason)
-
- // Shut task manager down
- self ! PoisonPill
- } else {
- // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
- log.info("Received RefuseRegistration from the JobManager even though being already " +
- "registered")
+ // this message indicates that some actor watched by this TaskManager has died
+ case Terminated(actor: ActorRef) =>
+ if (isConnected && actor == currentJobManager.orNull) {
+ handleJobManagerDisconnect(sender(), "JobManager is no longer reachable")
+ }
+ else {
+ LOG.warn("Received unrecognized disconnect message from {}",
+ if (actor == null) null else actor.path)
}
- case SubmitTask(tdd) =>
- submitTask(tdd)
+ case Disconnect(msg) =>
+ handleJobManagerDisconnect(sender(), "JobManager requested disconnect: " + msg)
+ }
- case updateMsg:UpdateTask =>
- updateMsg match {
- case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
- updateTask(executionID, List((resultID, partitionInfo)))
- case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
- updateTask(executionID, partitionInfos)
- }
+ /**
+ * Handle unmatched messages with an exception.
+ */
+ override def unhandled(message: Any): Unit = {
+ val errorMessage = "Received unknown message " + message
+ val error = new RuntimeException(errorMessage)
+ LOG.error(errorMessage)
- case CancelTask(executionID) =>
- runningTasks.get(executionID) match {
- case Some(task) =>
- // execute cancel operation concurrently
- Future {
- task.cancelExecution()
- }.onFailure{
- case t: Throwable => log.error(t, "Could not cancel task {}.", task)
- }
+ // terminate all we are currently running (with a dedicated message)
+ // before the actor is stopped
+ cancelAndClearEverything(error)
- sender ! new TaskOperationResult(executionID, true)
- case None =>
- sender ! new TaskOperationResult(executionID, false,
- "No task with that execution ID was found.")
- }
+ // let the actor crash
+ throw error
+ }
- case UnregisterTask(executionID) =>
- unregisterTaskAndNotifyFinalState(executionID)
+ /**
+ * Handler for messages concerning the deployment and status updates of
+ * tasks.
+ *
+ * @param message The task message.
+ */
+ private def handleTaskMessage(message: TaskMessage): Unit = {
- case updateMsg:UpdateTaskExecutionState =>
- currentJobManager foreach {
- jobManager => {
- val futureResponse = (jobManager ? updateMsg)(timeout)
-
- val jobID = updateMsg.taskExecutionState.getJobID
- val executionID = updateMsg.taskExecutionState.getID
- val executionState = updateMsg.taskExecutionState.getExecutionState
-
- futureResponse.mapTo[Boolean].onComplete {
- case Success(result) =>
- if (!result) {
- self ! FailTask(executionID,
- new IllegalStateException("Task has been disposed on JobManager."))
- }
+ // at very first, check that we are actually currently associated with a JobManager
+ if (!isConnected) {
+ LOG.debug("Dropping message {} because the TaskManager is currently " +
+ "not connected to a JobManager", message)
+ }
+
+ // we order the messages by frequency, to make sure the code paths for matching
+ // are as short as possible
+ message match {
+
+ // tell the task about the availability of a new input partition
+ case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
+ updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
+
+ // tell the task about the availability of some new input partitions
+ case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
+ updateTaskInputPartitions(executionID, partitionInfos)
+
+ // discards intermediate result partitions of a task execution on this TaskManager
+ case FailIntermediateResultPartitions(executionID) =>
+ LOG.info("Discarding the results produced by task execution " + executionID)
+ if (network.isAssociated) {
+ try {
+ network.getPartitionManager.releasePartitionsProducedBy(executionID)
+ } catch {
+ case t: Throwable => killTaskManagerFatal(
+ "Fatal leak: Unable to release intermediate result partition data", t)
+ }
+ }
+
+ // notifies the TaskManager that the state of a task has changed.
+ // the TaskManager informs the JobManager and cleans up in case the transition
+ // was into a terminal state, or in case the JobManager cannot be informed of the
+ // state transition
+
+ case updateMsg @ UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>
+ currentJobManager foreach {
+ jobManager => {
+ val futureResponse = (jobManager ? updateMsg)(askTimeout)
+
+ val executionID = taskExecutionState.getID
+ val executionState = taskExecutionState.getExecutionState
+
+ futureResponse.mapTo[Boolean].onComplete {
+ // IMPORTANT: In the future callback, we cannot directly modify state
+ // but only send messages to the TaskManager to do those changes
+ case Success(result) =>
+ if (!result) {
+ self ! FailTask(executionID,
+ new Exception("Task has been cancelled on the JobManager."))
+ }
+
+ if (!result || executionState.isTerminal) {
+ self ! UnregisterTask(executionID)
+ }
+ case Failure(t) =>
+ self ! FailTask(executionID, new Exception(
+ "Failed to send ExecutionStateChange notification to JobManager"))
- if (!result || executionState == ExecutionState.FINISHED || executionState ==
- ExecutionState.CANCELED || executionState == ExecutionState.FAILED) {
self ! UnregisterTask(executionID)
- }
- case Failure(t) =>
- log.error(t, "Execution state change notification failed for task {}" +
- s"of job {}.", executionID, jobID)
- self ! UnregisterTask(executionID)
+ }(context.dispatcher)
}
}
- }
- case SendHeartbeat =>
- var report: Array[Byte] = null
- try {
- report = metricRegistryMapper.writeValueAsBytes(metricRegistry)
- } catch {
- case all: Throwable => log.warning("Error turning the report into JSON", all)
- }
+ // removes the task from the TaskManager and frees all its resources
+ case UnregisterTask(executionID) =>
+ unregisterTaskAndNotifyFinalState(executionID)
+
+ // starts a new task on the TaskManager
+ case SubmitTask(tdd) =>
+ submitTask(tdd)
+
+ // marks a task as failed for an external reason
+ // external reasons are reasons other than the task code itself throwing an exception
+ case FailTask(executionID, cause) =>
+ runningTasks.get(executionID) match {
+ case Some(task) =>
+
+ // execute failing operation concurrently
+ implicit val executor = context.dispatcher
+ Future {
+ task.failExternally(cause)
+ }.onFailure{
+ case t: Throwable => LOG.error(s"Could not fail task ${task} externally.", t)
+ }
+ case None =>
+ }
- currentJobManager foreach {
- _ ! Heartbeat(instanceID, report)
- }
+ // cancels a task
+ case CancelTask(executionID) =>
+ runningTasks.get(executionID) match {
+ case Some(task) =>
+ // execute cancel operation concurrently
+ implicit val executor = context.dispatcher
+ Future {
+ task.cancelExecution()
+ }.onFailure{
+ case t: Throwable => LOG.error("Could not cancel task " + task, t)
+ }
- case SendStackTrace =>
- val traces = Thread.getAllStackTraces.asScala
- val stackTraceStr = traces.map((trace: (Thread, Array[StackTraceElement])) => {
- val (thread, elements) = trace
- "Thread: " + thread.getName + '\n' + elements.mkString("\n")
- }).mkString("\n\n")
+ sender ! new TaskOperationResult(executionID, true)
- sender ! StackTrace(instanceID, stackTraceStr)
+ case None =>
+ sender ! new TaskOperationResult(executionID, false,
+ "No task with that execution ID was found.")
+ }
+ }
+ }
- case NotifyWhenRegisteredAtJobManager =>
- if (currentJobManager.isDefined) {
- sender ! RegisteredAtJobManager
- } else {
- waitForRegistration += sender
- }
+ /**
+ * Handler for messages related to checkpoints.
+ *
+ * @param message The checkpoint message.
+ */
+ private def handleCheckpointingMessage(message: CheckpointingMessage): Unit = {
- case FailTask(executionID, cause) =>
- runningTasks.get(executionID) match {
- case Some(task) =>
- // execute failing operation concurrently
- Future {
- task.failExternally(cause)
- }.onFailure{
- case t: Throwable => log.error(t, "Could not fail task {} externally.", task)
- }
- case None =>
- }
+ message match {
- case Terminated(jobManager) =>
- log.info("Job manager {} is no longer reachable. Cancelling all tasks and trying to " +
- "reregister.", jobManagerAkkaURL)
+ case BarrierReq(attemptID, checkpointID) =>
+ LOG.debug("[FT-TaskManager] Barrier {} request received for attempt {}",
+ checkpointID, attemptID)
- cancelAndClearEverything(new Throwable("Lost connection to JobManager"))
+ runningTasks.get(attemptID) match {
+ case Some(i) =>
+ if (i.getExecutionState == ExecutionState.RUNNING) {
+ i.getEnvironment.getInvokable match {
+ case barrierTransceiver: BarrierTransceiver =>
+ new Thread(new Runnable {
+ override def run(): Unit =
+ barrierTransceiver.broadcastBarrierFromSource(checkpointID)
+ }).start()
+
+ case _ => LOG.error(
+ "Taskmanager received a checkpoint request for non-checkpointing task {}",
+ attemptID)
+ }
+ }
- cleanupTaskManager()
+ case None =>
+ // may always happen in case of canceled/finished tasks
+ LOG.debug("Taskmanager received a checkpoint request for unknown task {}",
+ attemptID)
+ }
- tryJobManagerRegistration()
+ // unknown checkpoint message
+ case _ => unhandled(message)
+ }
+ }
- case Disconnect(msg) =>
- log.info("Job manager {} wants {} to disconnect. Reason {}.", jobManagerAkkaURL,
- self.path, msg)
+ /**
+ * Handler for messages concerning the registration of the TaskManager at
+ * the JobManager.
+ *
+ * Errors must not propagate out of the handler, but need to be handled internally.
+ *
+ * @param message The registration message.
+ */
+ private def handleRegistrationMessage(message: RegistrationMessage): Unit = {
- cancelAndClearEverything(new Throwable("Job manager wants me to disconnect."))
+ message match {
- cleanupTaskManager()
+ case TriggerTaskManagerRegistration(jobManagerURL, timeout, deadline, attempt) =>
+ if (isConnected) {
+ // this may be the case, if we queue another attempt and
+ // in the meantime, the registration is acknowledged
+ LOG.debug(
+ "TaskManager was triggered to register at JobManager, but is already registered")
+ }
+ else if (deadline.exists(_.isOverdue())) {
+ // we failed to register in time. that means we should quit
+ LOG.error("Failed to register at the JobManager withing the defined maximum " +
+ "connect time. Shutting down ...")
- tryJobManagerRegistration()
+ // terminate ourselves (hasta la vista)
+ self ! PoisonPill
+ }
+ else {
+ LOG.info(s"Trying to register at JobManager ${jobManagerURL} " +
+ s"(attempt ${attempt}, timeout: ${timeout})")
- case FailIntermediateResultPartitions(executionID) =>
- log.info("Fail intermediate result partitions associated with execution {}.", executionID)
- networkEnvironment foreach {
- _.getPartitionManager.releasePartitionsProducedBy(executionID)
- }
+ val jobManager = context.actorSelection(jobManagerAkkaURL)
+ jobManager ! RegisterTaskManager(self, connectionInfo, resources, numberOfSlots)
- case BarrierReq(attemptID, checkpointID) =>
- log.debug("[FT-TaskManager] Barrier {} request received for attempt {}",
- checkpointID, attemptID)
- runningTasks.get(attemptID) match {
- case Some(i) =>
- if (i.getExecutionState == ExecutionState.RUNNING) {
- i.getEnvironment.getInvokable match {
- case barrierTransceiver: BarrierTransceiver =>
- new Thread(new Runnable {
- override def run(): Unit =
- barrierTransceiver.broadcastBarrierFromSource(checkpointID);
- }).start()
- case _ => log.error("[FT-TaskManager] Received a barrier for the wrong vertex")
+ // the next timeout is computed via exponential backoff, with a cap
+ val nextTimeout = (timeout * 2).min(TaskManager.MAX_REGISTRATION_TIMEOUT)
+
+ // schedule (with the current timeout as the delay) a check that triggers a new registration
+ // attempt, if we are not registered by then
+ context.system.scheduler.scheduleOnce(timeout) {
+ if (!isConnected) {
+ self.tell(TriggerTaskManagerRegistration(jobManagerURL,
+ nextTimeout, deadline, attempt + 1), ActorRef.noSender)
}
+ }(context.dispatcher)
+ }
+
+ // successful registration. associate with the JobManager
+ // we disambiguate duplicate or erroneous messages, to simplify debugging
+ case AcknowledgeRegistration(jobManager, id, blobPort) =>
+ if (isConnected) {
+ if (jobManager == currentJobManager.orNull) {
+ LOG.debug("Ignoring duplicate registration acknowledgement.")
+ } else {
+ LOG.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
+ s"because the TaskManager is already registered at ${currentJobManager.orNull}")
+ }
+ }
+ else {
+ // not yet connected, so let's associate with that JobManager
+ try {
+ associateWithJobManager(jobManager, id, blobPort)
+ } catch {
+ case t: Throwable =>
+ killTaskManagerFatal(
+ "Unable to start TaskManager components after registering at JobManager", t)
+ }
+ }
+
+ // we are already registered at that specific JobManager - duplicate answer, rare cases
+ case AlreadyRegistered(jobManager, id, blobPort) =>
+ if (isConnected) {
+ if (jobManager == currentJobManager.orNull) {
+ LOG.debug("Ignoring duplicate registration acknowledgement.")
+ } else {
+ LOG.warn(s"Received 'AlreadyRegistered' message from JobManager ${jobManager.path}, " +
+ s"even through TaskManager is currently registered at ${currentJobManager.orNull}")
}
- case None => log.error("[FT-TaskManager] Received a barrier for an unknown vertex")
+ }
+ else {
+ // not yet connected, so let's associate
+ LOG.info("Received 'AlreadyRegistered' message before 'AcknowledgeRegistration'")
+
+ try {
+ associateWithJobManager(jobManager, id, blobPort)
+ } catch {
+ case t: Throwable =>
+ killTaskManagerFatal(
+ "Unable to start TaskManager components after registering at JobManager", t)
+ }
+ }
+
+ case RefuseRegistration(reason) =>
+ if (currentJobManager.isEmpty) {
+ LOG.error(s"The registration at JobManager ${jobManagerAkkaURL} was refused, " +
+ s"because: ${reason}. Retrying later...")
+
+ // try the registration again after some time
+
+ val delay: FiniteDuration = TaskManager.DELAY_AFTER_REFUSED_REGISTRATION
+ val deadline: Option[Deadline] = config.maxRegistrationDuration.map {
+ timeout => timeout + delay fromNow
+ }
+
+ context.system.scheduler.scheduleOnce(delay) {
+ self.tell(TriggerTaskManagerRegistration(jobManagerAkkaURL,
+ TaskManager.INITIAL_REGISTRATION_TIMEOUT, deadline, 1), ActorRef.noSender)
+ }(context.dispatcher)
+ }
+ else {
+ // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
+ if (sender() == currentJobManager.orNull) {
+ LOG.warn(s"Received 'RefuseRegistration' from the JobManager (${sender().path})" +
+ s" even though this TaskManager is already registered there.")
+ }
+ else {
+ LOG.warn(s"Ignoring 'RefuseRegistration' from unrelated JobManager (${sender().path})")
+ }
+ }
+
+ case _ => unhandled(message)
+ }
+ }
+
+
+ // --------------------------------------------------------------------------
+ // Task Manager / JobManager association and initialization
+ // --------------------------------------------------------------------------
+
+ /**
+ * Checks whether the TaskManager is currently connected to its JobManager.
+ *
+ * @return True, if the TaskManager is currently connected to a JobManager, false otherwise.
+ */
+ private def isConnected : Boolean = currentJobManager.isDefined
+
+ /**
+ * Associates the TaskManager with the given JobManager. After this
+ * method has completed, the TaskManager is ready to receive work
+ * from the given JobManager.
+ *
+ * @param jobManager The JobManager to associate with.
+ * @param id The instanceID under which the TaskManager is registered
+ * at the JobManager.
+ * @param blobPort The JobManager's port for the BLOB server.
+ */
+ private def associateWithJobManager(jobManager: ActorRef,
+ id: InstanceID,
+ blobPort: Int): Unit = {
+
+ // sanity check that we are not currently registered with a different JobManager
+ if (isConnected) {
+ if (currentJobManager.get == jobManager) {
+ LOG.warn("Received call to finish registration with JobManager " +
+ jobManager.path + " even though TaskManager is already registered.")
+ return
+ }
+ else {
+ throw new IllegalStateException("Attempt to register with JobManager " +
+ jobManager.path + " even though TaskManager is currently registered with JobManager " +
+ currentJobManager.get.path)
}
+ }
+
+ // not yet associated, so associate
+ LOG.info("Successful registration at JobManager ({}), " +
+ "starting network stack and library cache.", jobManager.path)
+
+ // sanity check that the JobManager dependent components are not set up currently
+ if (network.isAssociated || blobService.isDefined) {
+ throw new IllegalStateException("JobManager-specific components are already initialized.")
+ }
+
+ // start the network stack, now that we have the JobManager actor reference
+ try {
+ network.associateWithTaskManagerAndJobManager(jobManager, self)
+ }
+ catch {
+ case e: Exception =>
+ val message = "Could not start network environment."
+ LOG.error(message, e)
+ throw new RuntimeException(message, e)
+ }
+
+ // start a blob service, if a blob server is specified
+ if (blobPort > 0) {
+ val address = new InetSocketAddress(
+ currentJobManager.flatMap(_.path.address.host).getOrElse("localhost"),
+ blobPort)
+
+ LOG.info("Determined BLOB server address to be {}. Starting BLOB cache.", address)
+
+ try {
+ val blobcache = new BlobCache(address, config.configuration)
+ blobService = Option(blobcache)
+ libraryCacheManager = Some(new BlobLibraryCacheManager(blobcache, config.cleanupInterval))
+ }
+ catch {
+ case e: Exception =>
+ val message = "Could not create BLOB cache or library cache."
+ LOG.error(message, e)
+ throw new RuntimeException(message, e)
+ }
+ }
+ else {
+ libraryCacheManager = Some(new FallbackLibraryCacheManager)
+ }
+
+ currentJobManager = Some(jobManager)
+ instanceID = id
+
+ // watch job manager to detect when it dies
+ context.watch(jobManager)
+
+ // schedule regular heartbeat message for oneself
+ heartbeatScheduler = Some(context.system.scheduler.schedule(
+ TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat)
+ (context.dispatcher))
+
+ // notify all the actors that listen for a successful registration
+ for (listener <- waitForRegistration) {
+ listener ! RegisteredAtJobManager
+ }
+ waitForRegistration.clear()
}
/**
- * Handle unmatched messages with an exception.
+ * Disassociates the TaskManager from the JobManager. This
+ * removes all tasks currently running, discards all intermediate
+ * results and all cached libraries.
*/
- override def unhandled(message: Any): Unit = {
- // let the actor crash
- throw new RuntimeException("Received unknown message " + message)
+ private def disassociateFromJobManager(): Unit = {
+ if (!isConnected) {
+ LOG.warn("TaskManager received message to disassociate from JobManager, even though " +
+ "it is not currently associated with a JobManager")
+ return
+ }
+
+ LOG.info("Disassociating from JobManager")
+
+ // stop the periodic heartbeats
+ heartbeatScheduler foreach {
+ _.cancel()
+ }
+ heartbeatScheduler = None
+
+ // stop the monitoring of the JobManager
+ currentJobManager foreach {
+ jm => context.unwatch(jm)
+ }
+
+ // de-register from the JobManager (faster detection of disconnect)
+ currentJobManager foreach {
+ _ ! Disconnect(s"TaskManager ${self.path} is shutting down.")
+ }
+
+ currentJobManager = None
+ instanceID = null
+
+ // shut down BLOB and library cache
+ libraryCacheManager foreach {
+ manager => manager.shutdown()
+ }
+ libraryCacheManager = None
+
+ blobService foreach {
+ service => service.shutdown()
+ }
+ blobService = None
+
+ // disassociate the network environment
+ network.disassociate()
}
+ private def handleJobManagerDisconnect(jobManager: ActorRef, msg: String): Unit = {
+ if (isConnected && jobManager != null) {
+
+ // check if it comes from our JobManager
+ if (jobManager == currentJobManager.orNull) {
+ try {
+ val message = "Disconnecting from JobManager: " + msg
+ LOG.info(message)
+
+ // cancel all our tasks with a proper error message
+ cancelAndClearEverything(new Exception(message))
+
+ // reset our state to disassociated
+ disassociateFromJobManager()
+
+ // begin attempts to reconnect
+ val deadline: Option[Deadline] = config.maxRegistrationDuration.map(_.fromNow)
+ self ! TriggerTaskManagerRegistration(jobManagerAkkaURL,
+ TaskManager.INITIAL_REGISTRATION_TIMEOUT, deadline, 1)
+ }
+ catch {
+ // this is pretty bad, it leaves the TaskManager in a state where it cannot
+ // cleanly reconnect
+ case t: Throwable =>
+ killTaskManagerFatal("Failed to disassociate from the JobManager", t)
+ }
+ }
+ else {
+ LOG.warn("Received erroneous JobManager disconnect message for {}", jobManager.path)
+ }
+ }
+ }
+
+
+ // --------------------------------------------------------------------------
+ // Task Operations
+ // --------------------------------------------------------------------------
+
/**
* Receives a [[TaskDeploymentDescriptor]] describing the task to be executed. Sets up a
* [[RuntimeEnvironment]] for the task and starts its execution in a separate thread.
@@ -412,21 +782,31 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
val executionID = tdd.getExecutionId
val taskIndex = tdd.getIndexInSubtaskGroup
val numSubtasks = tdd.getNumberOfSubtasks
+ val slot = tdd.getTargetSlotNumber
var startRegisteringTask = 0L
var task: Task = null
+ // all operations are in a try / catch block to make sure we send a result upon any failure
try {
+ // check that we are already registered
+ if (!isConnected) {
+ throw new IllegalStateException("TaskManager is not associated with a JobManager")
+ }
+ if (slot < 0 || slot >= numberOfSlots) {
+ throw new Exception(s"Target slot ${slot} does not exist on TaskManager.")
+ }
+
val userCodeClassLoader = libraryCacheManager match {
case Some(manager) =>
- if (log.isDebugEnabled) {
+ if (LOG.isDebugEnabled) {
startRegisteringTask = System.currentTimeMillis()
}
// triggers the download of all missing jar files from the job manager
manager.registerTask(jobID, executionID, tdd.getRequiredJarFiles)
- if (log.isDebugEnabled) {
- log.debug("Register task {} at library cache manager took {}s", executionID,
+ if (LOG.isDebugEnabled) {
+ LOG.debug("Register task {} at library cache manager took {}s", executionID,
(System.currentTimeMillis() - startRegisteringTask) / 1000.0)
}
@@ -448,266 +828,135 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
}
val env = currentJobManager match {
- case Some(jobManager) =>
- val splitProvider = new TaskInputSplitProvider(jobManager, jobID, vertexID,
- executionID, userCodeClassLoader, timeout)
-
- new RuntimeEnvironment(jobManager, task, tdd, userCodeClassLoader,
- memoryManager, ioManager, splitProvider, bcVarManager, networkEnvironment.get)
-
- case None => throw new IllegalStateException("TaskManager has not yet been registered at " +
- "a JobManager.")
- }
-
- task.setEnvironment(env)
-
- //inject operator state
- if(tdd.getOperatorStates != null)
- {
- val vertex = task.getEnvironment.getInvokable match {
- case opStateCarrier: OperatorStateCarrier =>
- opStateCarrier.injectState(tdd.getOperatorStates)
- }
- }
-
- // register the task with the network stack and profiles
- networkEnvironment match {
- case Some(ne) =>
- log.info("Register task {} on {}.", task, connectionInfo)
- ne.registerTask(task)
- case None => throw new RuntimeException(
- "Network environment has not been properly instantiated.")
- }
-
- val jobConfig = tdd.getJobConfiguration
-
- if (jobConfig.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)) {
- profiler match {
- case Some(profilerActorRef) => profilerActorRef ! MonitorTask(task)
- case None => // no log message here - floods the log
- }
- }
-
- val cpTasks = new util.HashMap[String, FutureTask[Path]]()
-
- for (entry <- DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration).asScala) {
- val cp = fileCache.createTmpFile(entry.getKey, entry.getValue, jobID)
- cpTasks.put(entry.getKey, cp)
- }
- env.addCopyTasksForCacheFile(cpTasks)
-
- if (!task.startExecution()) {
- throw new RuntimeException("Cannot start task. Task was canceled or failed.")
- }
-
- sender ! TaskOperationResult(executionID, success = true)
- } catch {
- case t: Throwable =>
- val message = if (t.isInstanceOf[CancelTaskException]) {
- "Task was canceled"
- } else {
- log.error(t, "Could not instantiate task with execution ID {}.", executionID)
- ExceptionUtils.stringifyException(t)
- }
-
- try {
- if (task != null) {
- task.failExternally(t)
- removeAllTaskResources(task)
- }
-
- libraryCacheManager foreach { _.unregisterTask(jobID, executionID) }
- } catch {
- case t: Throwable => log.error(t, "Error during cleanup of task deployment.")
- }
-
- sender ! new TaskOperationResult(executionID, false, message)
- }
- }
-
- private def cleanupTaskManager(): Unit = {
- currentJobManager foreach {
- context.unwatch(_)
- }
-
- networkEnvironment foreach {
- ne =>
- try {
- ne.shutdown()
- } catch {
- case t: Throwable => log.error(t, "ChannelManager did not shutdown properly.")
- }
- }
-
- networkEnvironment = None
-
- libraryCacheManager foreach {
- manager =>
- try {
- manager.shutdown()
- } catch {
- case t: Throwable => log.error(t, "Could not shut down the library cache manager.")
- }
- }
-
- libraryCacheManager = None
-
- heartbeatScheduler foreach {
- _.cancel()
- }
-
- heartbeatScheduler = None
-
- profilerListener foreach {
- listener =>
- profiler foreach {
- _.tell(UnregisterProfilingListener, listener)
- }
- }
-
- profilerListener = None
- currentJobManager = None
- instanceID = null
- registrationAttempts = 0
- registrationDuration = 0 seconds
- }
-
- private def updateTask(
- executionId: ExecutionAttemptID,
- partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)]): Unit = {
-
- runningTasks.get(executionId) match {
- case Some(task) =>
- val errors = partitionInfos flatMap {
- case (resultID, partitionInfo) =>
- Option(task.getEnvironment.getInputGateById(resultID)) match {
- case Some(reader) =>
- Future {
- try {
- reader.updateInputChannel(partitionInfo)
- } catch {
- case t: Throwable =>
- log.error(t, "Could not update task {}. Trying to cancel task.",
- task.getTaskName)
-
- try {
- task.markFailed(t)
- } catch {
- case t: Throwable =>
- log.error(t, "Failed canceling task with execution ID {} after task" +
- "update failure.", executionId)
- }
- }
- }
- None
- case None => Some(s"No reader with ID $resultID for task $executionId was found.")
- }
- }
-
- if(errors.isEmpty) {
- sender ! Acknowledge
- } else {
- sender ! Failure(new IllegalStateException(errors.mkString("\n")))
- }
- case None =>
- log.info("Could not update task with ID {}, because it is no longer running.",
- executionId)
- sender ! Acknowledge
- }
- }
-
- private def finishRegistration(jobManager: ActorRef, id: InstanceID, blobPort: Int,
- profilerListener: Option[ActorRef]): Unit = {
- setupTaskManager(jobManager, id, blobPort, profilerListener)
+ case Some(jobManager) =>
+ val splitProvider = new TaskInputSplitProvider(jobManager, jobID, vertexID,
+ executionID, userCodeClassLoader, askTimeout)
- for (listener <- waitForRegistration) {
- listener ! RegisteredAtJobManager
- }
+ new RuntimeEnvironment(jobManager, task, tdd, userCodeClassLoader,
+ memoryManager, ioManager, splitProvider, bcVarManager, network)
- waitForRegistration.clear()
- }
+ case None => throw new IllegalStateException(
+ "TaskManager has not yet been registered at a JobManager.")
+ }
- private def setupTaskManager(jobManager: ActorRef, id: InstanceID, blobPort: Int,
- profilerListener: Option[ActorRef]): Unit = {
+ task.setEnvironment(env)
- currentJobManager = Some(jobManager)
- this.profilerListener = profilerListener
- instanceID = id
+ //inject operator state
+ if (tdd.getOperatorStates != null) {
+ task.getEnvironment.getInvokable match {
+ case opStateCarrier: OperatorStateCarrier =>
+ opStateCarrier.injectState(tdd.getOperatorStates)
+ }
+ }
+
+ // register the task with the network stack and profiles
+ LOG.info("Register task {}", task)
+ network.registerTask(task)
- // watch job manager to detect when it dies
- context.watch(jobManager)
+ val cpTasks = new util.HashMap[String, FutureTask[Path]]()
- setupNetworkEnvironment(jobManager)
- setupLibraryCacheManager(blobPort)
+ for (entry <- DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration).asScala) {
+ val cp = fileCache.createTmpFile(entry.getKey, entry.getValue, jobID)
+ cpTasks.put(entry.getKey, cp)
+ }
+ env.addCopyTasksForCacheFile(cpTasks)
- // schedule regular heartbeat message for oneself
- heartbeatScheduler = Some(context.system.scheduler.schedule(
- TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat))
+ if (!task.startExecution()) {
+ throw new RuntimeException("Cannot start task. Task was canceled or failed.")
+ }
- profilerListener foreach {
- listener =>
- profiler foreach {
- _.tell(RegisterProfilingListener, listener)
- }
+ sender ! new TaskOperationResult(executionID, true)
}
- }
+ catch {
+ case t: Throwable =>
+ val message = if (t.isInstanceOf[CancelTaskException]) {
+ "Task was canceled"
+ } else {
+ LOG.error("Could not instantiate task with execution ID " + executionID, t)
+ ExceptionUtils.stringifyException(t)
+ }
- private def setupNetworkEnvironment(jobManager: ActorRef): Unit = {
- //shutdown existing network environment
- networkEnvironment foreach {
- ne =>
try {
- ne.shutdown()
+ if (task != null) {
+ task.failExternally(t)
+ removeAllTaskResources(task)
+ }
+
+ libraryCacheManager foreach { _.unregisterTask(jobID, executionID) }
} catch {
- case t: Throwable => log.error(t, "Network environment did not shutdown properly.")
+ case t: Throwable => LOG.error("Error during cleanup of task deployment.", t)
}
- }
- try {
- val env: NetworkEnvironment = new NetworkEnvironment(timeout, networkConfig)
- env.associateWithTaskManagerAndJobManager(jobManager, self)
- networkEnvironment = Some(env)
- } catch {
- case ioe: IOException =>
- log.error(ioe, "Failed to instantiate network environment.")
- throw new RuntimeException("Failed to instantiate ChannelManager.", ioe)
+ sender ! new TaskOperationResult(executionID, false, message)
}
}
- private def setupLibraryCacheManager(blobPort: Int): Unit = {
- // shutdown existing library cache manager first
- libraryCacheManager foreach {
- manager => {
- try{
- manager.shutdown()
- } catch {
- case t: Throwable => log.error(t, "Could not properly shut down LibraryCacheManager.")
- }
- }
- }
+ /**
+ * Informs a task about additional locations of its input data partitions.
+ *
+ * @param executionId The execution attempt ID of the task.
+ * @param partitionInfos The descriptor of the intermediate result partitions.
+ */
+ private def updateTaskInputPartitions(
+ executionId: ExecutionAttemptID,
+ partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)]) : Unit = {
- // Check if a blob server is specified
- if (blobPort > 0) {
+ runningTasks.get(executionId) match {
+ case Some(task) =>
- val address = new InetSocketAddress(
- currentJobManager.flatMap(_.path.address.host).getOrElse("localhost"),
- blobPort)
+ val errors: Seq[String] = partitionInfos.flatMap { info =>
- log.info("Determined BLOB server address to be {}.", address)
+ val (resultID, partitionInfo) = info
+ val reader = task.getEnvironment.getInputGateById(resultID)
- libraryCacheManager = Some(new BlobLibraryCacheManager(
- new BlobCache(address, configuration), cleanupInterval))
- } else {
- libraryCacheManager = Some(new FallbackLibraryCacheManager)
+ if (reader != null) {
+ Future {
+ try {
+ reader.updateInputChannel(partitionInfo)
+ } catch {
+ case t: Throwable =>
+ LOG.error(s"Could not update input data location for task " +
+ s"${task.getTaskName}. Trying to fail task.", t)
+
+ try {
+ task.markFailed(t)
+ }
+ catch {
+ case t: Throwable =>
+ LOG.error("Failed canceling task with execution ID " + executionId +
+ " after task update failure.", t)
+ }
+ }
+ }(context.dispatcher)
+ None
+ }
+ else {
+ Some(s"No reader with ID $resultID for task $executionId was found.")
+ }
+ }
+
+ if (errors.isEmpty) {
+ sender ! Acknowledge
+ } else {
+ sender ! Failure(new Exception(errors.mkString("\n")))
+ }
+
+ case None =>
+ LOG.debug("Discard update for input partitions of task {} : task is no longer running.",
+ executionId)
+ sender ! Acknowledge
}
}
/**
- * Removes all tasks from this TaskManager.
+ * Marks all tasks currently registered as failed (with the given
+ * cause) and removes them.
+ *
+ * @param cause The exception given to the tasks as the failure reason.
*/
private def cancelAndClearEverything(cause: Throwable) {
if (runningTasks.size > 0) {
- log.info("Cancelling all computations and discarding all cached data.")
+ LOG.info("Cancelling all computations and discarding all cached data.")
for (t <- runningTasks.values) {
t.failExternally(cause)
@@ -719,24 +968,65 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
private def unregisterTaskAndNotifyFinalState(executionID: ExecutionAttemptID): Unit = {
runningTasks.remove(executionID) match {
case Some(task) =>
- log.info("Unregister task with execution ID {}.", executionID)
+
+ // mark the task as failed if it is not yet in a final state
+ if (!task.getExecutionState.isTerminal) {
+ try {
+ task.failExternally(new Exception("Task is being removed from TaskManager"))
+ } catch {
+ case e: Exception => LOG.error("Could not properly fail task", e)
+ }
+ }
+
+ LOG.info("Unregister task with execution ID {}.", executionID)
removeAllTaskResources(task)
libraryCacheManager foreach { _.unregisterTask(task.getJobID, executionID) }
- log.info("Updating FINAL execution state of {} ({}) to {}.", task.getTaskName,
- task.getExecutionId, task.getExecutionState)
+ LOG.info("Updating FINAL execution state of {} ({}) to {}.",
+ task.getTaskName, task.getExecutionId, task.getExecutionState)
self ! UpdateTaskExecutionState(new TaskExecutionState(
task.getJobID, task.getExecutionId, task.getExecutionState, task.getFailureCause))
case None =>
- if (log.isDebugEnabled) {
- log.debug("Cannot find task with ID {} to unregister.", executionID)
- }
+ LOG.debug("Cannot find task with ID {} to unregister.", executionID)
}
}
+ /**
+ * This method cleans up the resources of a task in the distributed cache,
+ * network stack and the memory manager.
+ *
+ * If the cleanup in the network stack or memory manager fails, this is considered
+ * a fatal problem (critical resource leak) and causes the TaskManager to quit.
+ * A TaskManager JVM restart is the best safe way to fix that error.
+ *
+ * @param task The Task whose resources should be cleared.
+ */
private def removeAllTaskResources(task: Task): Unit = {
+
+ // release the critical things first, and fail fatally if it does not work
+
+ // this releases all task resources, like buffer pools and intermediate result
+ // partitions being built. If this fails, the TaskManager is in serious trouble,
+ // as this is a massive resource leak. We kill the TaskManager in that case,
+ // to recover through a clean JVM start
+ try {
+ network.unregisterTask(task)
+ } catch {
+ case t: Throwable =>
+ killTaskManagerFatal("Failed to unregister task resources from network stack", t)
+ }
+
+ // safety net to release all the task's memory
+ try {
+ task.unregisterMemoryManager(memoryManager)
+ } catch {
+ case t: Throwable =>
+ killTaskManagerFatal("Failed to unregister task memory from memory manager", t)
+ }
+
+ // release temp files from the distributed cache
if (task.getEnvironment != null) {
try {
for (entry <- DistributedCache.readFileInfoFromConfig(
@@ -744,20 +1034,74 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
fileCache.deleteTmpFile(entry.getKey, entry.getValue, task.getJobID)
}
} catch {
- case t: Throwable => log.error(
- "Error cleaning up local files from the distributed cache.", t)
+ // this is pretty unpleasant, but not a reason to give up immediately
+ case e: Exception => LOG.error(
+ "Error cleaning up local temp files from the distributed cache.", e)
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------
+ // Miscellaneous actions
+ // --------------------------------------------------------------------------
+
+ /**
+ * Sends a heartbeat message to the JobManager (if connected) with the current
+ * metrics report.
+ */
+ private def sendHeartbeatToJobManager(): Unit = {
+ try {
+ LOG.debug("Sending heartbeat to JobManager")
+ val report: Array[Byte] = metricRegistryMapper.writeValueAsBytes(metricRegistry)
+ currentJobManager foreach {
+ jm => jm ! Heartbeat(instanceID, report)
}
}
+ catch {
+ case e: Exception => LOG.warn("Error sending the metric heartbeat to the JobManager", e)
+ }
+ }
- networkEnvironment foreach {
- _.unregisterTask(task)
+ /**
+ * Sends a message with the stack trace of all threads to the given recipient.
+ *
+ * @param recipient The target of the stack trace message
+ */
+ private def sendStackTrace(recipient: ActorRef): Unit = {
+ if (recipient == null) {
+ return
}
- profiler foreach {
- _ ! UnmonitorTask(task.getExecutionId)
+ try {
+ val traces = Thread.getAllStackTraces.asScala
+ val stackTraceStr = traces.map(
+ (trace: (Thread, Array[StackTraceElement])) => {
+ val (thread, elements) = trace
+ "Thread: " + thread.getName + '\n' + elements.mkString("\n")
+ })
+ .mkString("\n\n")
+
+ recipient ! StackTrace(instanceID, stackTraceStr)
+ }
+ catch {
+ case e: Exception => LOG.error("Failed to send stack trace to " + recipient.path, e)
}
+ }
- task.unregisterMemoryManager(memoryManager)
+ /**
+ * Prints a big error message in the log and kills the TaskManager actor.
+ *
+ * @param cause The exception that caused the fatal problem.
+ */
+ private def killTaskManagerFatal(message: String, cause: Throwable): Unit = {
+ LOG.error("\n" +
+ "==============================================================\n" +
+ "====================== FATAL =======================\n" +
+ "==============================================================\n" +
+ "\n" +
+ "A fatal error occurred, forcing the TaskManager to shut down: " + message, cause)
+
+ self ! Kill
}
}
@@ -767,18 +1111,32 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
*/
object TaskManager {
+ /** TaskManager logger for synchronous logging (not through the logging actor) */
val LOG = LoggerFactory.getLogger(classOf[TaskManager])
+ /** Return code for unsuccessful TaskManager startup */
val STARTUP_FAILURE_RETURN_CODE = 1
+
+ /** Return code for critical errors during the runtime */
val RUNTIME_FAILURE_RETURN_CODE = 2
val TASK_MANAGER_NAME = "taskmanager"
val PROFILER_NAME = "profiler"
- val REGISTRATION_DELAY = 0 seconds
- val REGISTRATION_INTERVAL = 10 seconds
- val MAX_REGISTRATION_ATTEMPTS = 10
- val HEARTBEAT_INTERVAL = 5000 millisecond
+ /** Maximum time (msecs) that the TaskManager will spend searching for a
+ * suitable network interface to use for communication */
+ val MAX_STARTUP_CONNECT_TIME = 120000L
+
+ /** Time (msecs) after which the TaskManager will start logging failed
+ * connection attempts */
+ val STARTUP_CONNECT_LOG_SUPPRESS = 10000L
+
+ val INITIAL_REGISTRATION_TIMEOUT: FiniteDuration = 500 milliseconds
+ val MAX_REGISTRATION_TIMEOUT: FiniteDuration = 30 seconds
+
+ val DELAY_AFTER_REFUSED_REGISTRATION: FiniteDuration = 10 seconds
+
+ val HEARTBEAT_INTERVAL: FiniteDuration = 5000 milliseconds
// --------------------------------------------------------------------------
@@ -807,19 +1165,19 @@ object TaskManager {
}
}
- // run the TaskManager (is requested in an authentication enabled context)
+ // run the TaskManager (if requested in an authentication enabled context)
try {
if (SecurityUtils.isSecurityEnabled) {
LOG.info("Security is enabled. Starting secure TaskManager.")
SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
override def run(): Unit = {
- runTaskManager(configuration, classOf[TaskManager])
+ selectNetworkInterfaceAndRunTaskManager(configuration, classOf[TaskManager])
}
})
}
else {
LOG.info("Security is not enabled. Starting non-authenticated TaskManager.")
- runTaskManager(configuration, classOf[TaskManager])
+ selectNetworkInterfaceAndRunTaskManager(configuration, classOf[TaskManager])
}
}
catch {
@@ -831,7 +1189,7 @@ object TaskManager {
}
/**
- * Parse the command line arguments of the [[TaskManager]] and loads the configuration.
+ * Parse the command line arguments of the TaskManager and loads the configuration.
*
* @param args Command line arguments
* @return The parsed configuration.
@@ -869,22 +1227,43 @@ object TaskManager {
// --------------------------------------------------------------------------
/**
- * Starts and runs the TaskManager. Brings up an actor system for the TaskManager and its
- * actors, starts the TaskManager's services (library cache, shuffle network stack, ...),
- * and starts the TaskManager itself.
+ * Starts and runs the TaskManager.
+ *
+ * This method first tries to select the network interface to use for the TaskManager
+ * communication. The network interface is used both for the actor communication
+ * (coordination) as well as for the data exchange between task managers. Unless
+ * the hostname/interface is explicitly configured in the configuration, this
+ * method will try out various interfaces and methods to connect to the JobManager
+ * and select the one where the connection attempt is successful.
+ *
+ * After selecting the network interface, this method brings up an actor system
+ * for the TaskManager and its actors, starts the TaskManager's services
+ * (library cache, shuffle network stack, ...), and starts the TaskManager itself.
* @param configuration The configuration for the TaskManager.
- * @param taskManagerClass The actor class to instantiate. Allows to use TaskManager subclasses
- * for example for YARN.
+ * @param taskManagerClass The actor class to instantiate.
+ * Allows to use TaskManager subclasses for example for YARN.
*/
@throws(classOf[Exception])
- def runTaskManager(configuration: Configuration,
- taskManagerClass: Class[_ <: TaskManager]) : Unit = {
+ def selectNetworkInterfaceAndRunTaskManager(configuration: Configuration,
+ taskManagerClass: Class[_ <: TaskManager]) : Unit = {
val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration)
+ val (taskManagerHostname, actorSystemPort) =
+ selectNetworkInterfaceAndPort(configuration, jobManagerHostname, jobManagerPort)
+
+ runTaskManager(taskManagerHostname, actorSystemPort, configuration, taskManagerClass)
+ }
+
+ @throws(classOf[IOException])
+ @throws(classOf[IllegalConfigurationException])
+ def selectNetworkInterfaceAndPort(configuration: Configuration,
+ jobManagerHostname: String,
+ jobManagerPort: Int) : (String, Int) = {
+
var taskManagerHostname = configuration.getString(
- ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null)
+ ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null)
if (taskManagerHostname != null) {
LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname)
@@ -893,31 +1272,38 @@ object TaskManager {
// try to find out the hostname of the interface from which the TaskManager
// can connect to the JobManager. This involves a reverse name lookup
LOG.info("Trying to select the network interface and address to use " +
- "by connecting to the configured JobManager")
+ "by connecting to the configured JobManager.")
+
+ LOG.info("TaskManager will try to connect for {} milliseconds before falling back to heuristics",
+ MAX_STARTUP_CONNECT_TIME)
val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort)
- taskManagerHostname = try {
+ val taskManagerAddress = try {
// try to get the address for up to two minutes and start
// logging only after ten seconds
- NetUtils.findConnectingAddress(jobManagerAddress, 120000, 10000).getHostName()
+ NetUtils.findConnectingAddress(jobManagerAddress,
+ MAX_STARTUP_CONNECT_TIME, STARTUP_CONNECT_LOG_SUPPRESS)
}
catch {
- case t: Throwable => throw new Exception("TaskManager cannot find a network interface " +
+ case t: Throwable => throw new IOException("TaskManager cannot find a network interface " +
"that can communicate with the JobManager (" + jobManagerAddress + ")", t)
}
- LOG.info("TaskManager will use hostname/address '{}' for communication.", taskManagerHostname)
+ taskManagerHostname = taskManagerAddress.getHostName()
+ LOG.info(s"TaskManager will use hostname/address '${taskManagerHostname}' " +
+ s"(${taskManagerAddress.getHostAddress()}) for communication.")
}
// if no task manager port has been configured, use 0 (system will pick any free port)
val actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0)
- if (actorSystemPort < 0) {
- throw new Exception("Invalid value for '" + ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
+ if (actorSystemPort < 0 || actorSystemPort > 65535) {
+ throw new IllegalConfigurationException("Invalid value for '" +
+ ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
"' (port for the TaskManager actor system) : " + actorSystemPort +
" - Leave config parameter empty or use 0 to let the system choose a port automatically.")
}
- runTaskManager(taskManagerHostname, actorSystemPort, configuration, taskManagerClass)
+ (taskManagerHostname, actorSystemPort)
}
/**
@@ -965,7 +1351,8 @@ object TaskManager {
LOG.info("Starting TaskManager")
// Bring up the TaskManager actor system first, bind it to the given address.
- LOG.info("Starting TaskManager actor system")
+
+ LOG.info("Starting TaskManager actor system at {}:{}", taskManagerHostname, actorSystemPort)
val taskManagerSystem = try {
val akkaConfig = AkkaUtils.getAkkaConfig(configuration,
@@ -993,8 +1380,12 @@ object TaskManager {
// and the TaskManager actor
try {
LOG.info("Starting TaskManager actor")
- val taskManager = startTaskManagerActor(configuration, taskManagerSystem, taskManagerHostname,
- TASK_MANAGER_NAME, false, false, taskManagerClass)
+ val taskManager = startTaskManagerComponentsAndActor(configuration,
+ taskManagerSystem,
+ taskManagerHostname,
+ Some(TASK_MANAGER_NAME),
+ None, false,
+ taskManagerClass)
// start a process reaper that watches the TaskManager. If the TaskManager actor dies,
// the process reaper will kill the JVM process (to ensure easy failure detection)
@@ -1007,7 +1398,8 @@ object TaskManager {
// memory usage information
if (LOG.isInfoEnabled && configuration.getBoolean(
ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,
- ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD)) {
+ ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD))
+ {
LOG.info("Starting periodic memory usage logger")
val interval = configuration.getLong(
@@ -1032,6 +1424,7 @@ object TaskManager {
}
}
logger.setDaemon(true)
+ logger.setPriority(Thread.MIN_PRIORITY)
logger.start()
}
@@ -1051,38 +1444,118 @@ object TaskManager {
}
}
- @throws(classOf[Exception])
- def startTaskManagerActor(configuration: Configuration,
- actorSystem: ActorSystem,
- taskManagerHostname: String,
- taskManagerActorName: String,
- localAkkaCommunication: Boolean,
- localTaskManagerCommunication: Boolean,
- taskManagerClass: Class[_ <: TaskManager]): ActorRef = {
-
- val (tmConfig, netConfig, connectionInfo, jmAkkaURL) = parseTaskManagerConfiguration(
- configuration, taskManagerHostname, localAkkaCommunication, localTaskManagerCommunication)
-
- val tmProps = Props(taskManagerClass, connectionInfo, jmAkkaURL, tmConfig, netConfig)
- actorSystem.actorOf(tmProps, taskManagerActorName)
- }
-
/**
- * Starts the profiler actor.
*
- * @param instanceActorPath The actor path of the taskManager that is profiled.
- * @param reportInterval The interval in which the profiler runs.
- * @param actorSystem The actor system for the profiler actor
- * @return The profiler actor ref.
+ * @param configuration The configuration for the TaskManager.
+ * @param actorSystem The actor system that should run the TaskManager actor.
+ * @param taskManagerHostname The hostname/address that describes the TaskManager's data location.
+ * @param taskManagerActorName Optionally the name of the TaskManager actor. If none is given,
+ * the actor will use a random name.
+ * @param jobManagerPath Optionally, the JobManager actor path may be provided. If none is
+ * provided, the method will construct it automatically from the
+ * JobManager hostname and port specified in the configuration.
+ * @param localTaskManagerCommunication If true, the TaskManager will not initiate the
+ * TCP network stack.
+ * @param taskManagerClass The class of the TaskManager actor. May be used to give
+ * subclasses that understand additional actor messages.
+ *
+ * @throws org.apache.flink.configuration.IllegalConfigurationException
+ * Thrown, if the given config contains illegal values.
+ *
+ * @throws java.io.IOException Thrown, if any of the I/O components (such as buffer pools,
+ * I/O manager, ...) cannot be properly started.
+ * @throws java.lang.Exception Thrown if some other error occurs while parsing the configuration
+ * or starting the TaskManager components.
+ *
+ * @return An ActorRef to the TaskManager actor.
*/
- private def startProfiler(instanceActorPath: String,
- reportInterval: Long,
- actorSystem: ActorSystem): ActorRef = {
+ @throws(classOf[IllegalConfigurationException])
+ @throws(classOf[IOException])
+ @throws(classOf[Exception])
+ def startTaskManagerComponentsAndActor(configuration: Configuration,
+ actorSystem: ActorSystem,
+ taskManagerHostname: String,
+ taskManagerActorName: Option[String],
+ jobManagerPath: Option[String],
+ localTaskManagerCommunication: Boolean,
+ taskManagerClass: Class[_ <: TaskManager]): ActorRef = {
+
+ // get and check the JobManager config
+ val jobManagerAkkaUrl: String = jobManagerPath.getOrElse {
+ val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration)
+ val hostPort = new InetSocketAddress(jobManagerHostname, jobManagerPort)
+ JobManager.getRemoteJobManagerAkkaURL(hostPort)
+ }
+
+ val (taskManagerConfig : TaskManagerConfiguration,
+ netConfig: NetworkEnvironmentConfiguration,
+ connectionInfo: InstanceConnectionInfo)
+
+ = parseTaskManagerConfiguration(configuration, taskManagerHostname,
+ localTaskManagerCommunication)
+
+ // pre-start checks
+ checkTempDirs(taskManagerConfig.tmpDirPaths)
+
+ // we start the network first, to make sure it can allocate its buffers first
+ val network = new NetworkEnvironment(taskManagerConfig.timeout, netConfig)
+
+ // computing the amount of memory to use depends on how much memory is available
+ // it strictly needs to happen AFTER the network stack has been initialized
+
+ // check if a value has been configured
+ val configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L)
+ checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
+ ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
+ "MemoryManager needs at least one MB of memory. " +
+ "If you leave this config parameter empty, the system will automatically " +
+ "pick a fraction of the available memory.")
- val profilerProps = Props(classOf[TaskManagerProfiler], instanceActorPath, reportInterval)
- actorSystem.actorOf(profilerProps, PROFILER_NAME)
+ val memorySize = if (configuredMemory > 0) {
+ LOG.info("Using {} MB for Flink managed memory.", configuredMemory)
+ configuredMemory << 20 // megabytes to bytes
+ }
+ else {
+ val fraction = configuration.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+ ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
+ checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction,
+ ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+ "MemoryManager fraction of the free memory must be between 0.0 and 1.0")
+
+ val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+ fraction).toLong
+
+ LOG.info("Using {} of the currently free heap space for Flink managed memory ({} MB).",
+ fraction, relativeMemSize >> 20)
+
+ relativeMemSize
+ }
+
+ // now start the memory manager
+ val memoryManager = new DefaultMemoryManager(memorySize,
+ taskManagerConfig.numberOfSlots,
+ netConfig.networkBufferSize)
+
+ // start the I/O manager last, it will create some temp directories.
+ val ioManager: IOManager = new IOManagerAsync(taskManagerConfig.tmpDirPaths)
+
+ // create the actor properties (which define the actor constructor parameters)
+ val tmProps = Props(taskManagerClass,
+ taskManagerConfig,
+ connectionInfo,
+ jobManagerAkkaUrl,
+ memoryManager,
+ ioManager,
+ network,
+ taskManagerConfig.numberOfSlots)
+
+ taskManagerActorName match {
+ case Some(actorName) => actorSystem.actorOf(tmProps, actorName)
+ case None => actorSystem.actorOf(tmProps)
+ }
}
+
// --------------------------------------------------------------------------
// Resolving the TaskManager actor
// --------------------------------------------------------------------------
@@ -1098,8 +1571,8 @@ object TaskManager {
*/
@throws(classOf[IOException])
def getTaskManagerRemoteReference(taskManagerUrl: String,
- system: ActorSystem,
- timeout: FiniteDuration): ActorRef = {
+ system: ActorSystem,
+ timeout: FiniteDuration): ActorRef = {
try {
val future = AkkaUtils.getReference(taskManagerUrl, system, timeout)
Await.result(future, timeout)
@@ -1116,7 +1589,7 @@ object TaskManager {
}
// --------------------------------------------------------------------------
- // Miscellaneous Utilities
+ // Parsing and checking the TaskManager Configuration
// --------------------------------------------------------------------------
/**
@@ -1125,19 +1598,18 @@ object TaskManager {
*
* @param configuration The configuration.
* @param taskManagerHostname The host name under which the TaskManager communicates.
- * @param localAkkaCommunication True, if the TaskManager runs in the same actor
- * system as its JobManager.
* @param localTaskManagerCommunication True, to skip initializing the network stack.
- * Use only when only one task manager is used.
+ * Use only in cases where only one task manager runs.
* @return A tuple (TaskManagerConfiguration, network configuration,
* InstanceConnectionInfo).
*/
- @throws(classOf[Exception])
+ @throws(classOf[IllegalArgumentException])
def parseTaskManagerConfiguration(configuration: Configuration,
taskManagerHostname: String,
- localAkkaCommunication: Boolean,
localTaskManagerCommunication: Boolean):
- (TaskManagerConfiguration, NetworkEnvironmentConfiguration, InstanceConnectionInfo, String) = {
+ (TaskManagerConfiguration,
+ NetworkEnvironmentConfiguration,
+ InstanceConnectionInfo) = {
// ------- read values from the config and check them ---------
// (a lot of them)
@@ -1156,17 +1628,6 @@ object TaskManager {
val taskManagerAddress = InetAddress.getByName(taskManagerHostname)
val connectionInfo = new InstanceConnectionInfo(taskManagerAddress, dataport)
- val jobManagerActorURL = if (localAkkaCommunication) {
- // JobManager and TaskManager are in the same ActorSystem -> Use local Akka URL
- JobManager.getLocalJobManagerAkkaURL
- }
- else {
- // both run in different actor system
- val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration)
- val hostPort = new InetSocketAddress(jobManagerHostname, jobManagerPort)
- JobManager.getRemoteJobManagerAkkaURL(hostPort)
- }
-
// ----> memory / network stack (shuffles/broadcasts), task slots, temp directories
// we need this because many configs have been written with a "-1" entry
@@ -1181,8 +1642,6 @@ object TaskManager {
ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS)
- val configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L)
-
checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
"Number of task slots must be at least one.")
@@ -1197,19 +1656,11 @@ object TaskManager {
ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
"Buffer size must be a power of 2.")
- checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
- ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
- "MemoryManager needs at least one MB of memory. " +
- "Leave this config parameter empty to let the system automatically " +
- "pick a fraction of the available memory.")
-
val tmpDirs = configuration.getString(
ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH)
.split(",|" + File.pathSeparator)
- checkTempDirs(tmpDirs)
-
val nettyConfig = if (localTaskManagerCommunication) {
None
} else {
@@ -1221,37 +1672,10 @@ object TaskManager {
val syncOrAsync = configuration.getString(ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE)
- val ioMode : IOMode = if (syncOrAsync == "async") {
- IOMode.ASYNC
- }
- else {
- IOMode.SYNC
- }
-
- val networkConfig = NetworkEnvironmentConfiguration(numNetworkBuffers, pageSize, ioMode,
- nettyConfig)
-
- val networkBufferMem = numNetworkBuffers * pageSize
-
- val memorySize = if (configuredMemory > 0) {
- LOG.info("Using {} MB for Flink managed memory.", configuredMemory)
- configuredMemory << 20 // megabytes to bytes
- }
- else {
- val fraction = configuration.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
- ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
- checkConfigParameter(fraction > 0.0f, fraction,
- ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
- "MemoryManager fraction of the free memory must be positive.")
-
- val relativeMemSize = ((EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() -
- networkBufferMem) * fraction).toLong
-
- LOG.info("Using {} of the currently free heap space for Flink managed memory ({} MB).",
- fraction, relativeMemSize >> 20)
+ val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else IOMode.SYNC
- relativeMemSize
- }
+ val networkConfig = NetworkEnvironmentConfiguration(numNetworkBuffers, pageSize,
+ ioMode, nettyConfig)
// ----> timeouts, library caching, profiling
@@ -1259,35 +1683,37 @@ object TaskManager {
AkkaUtils.getTimeout(configuration)
}
catch {
- case e: Exception => throw new Exception(
+ case e: Exception => throw new IllegalArgumentException(
s"Invalid format for '${ConfigConstants.AKKA_ASK_TIMEOUT}'. " +
s"Use formats like '50 s' or '1 min' to specify the timeout.")
}
LOG.info("Messages between TaskManager and JobManager have a max timeout of " + timeout)
- val profilingInterval =
- if (configuration.getBoolean(ProfilingUtils.ENABLE_PROFILING_KEY, false)) {
- Some(configuration.getLong(ProfilingUtils.TASKMANAGER_REPORTINTERVAL_KEY,
- ProfilingUtils.DEFAULT_TASKMANAGER_REPORTINTERVAL))
- } else {
- None
- }
-
val cleanupInterval = configuration.getLong(
ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
+ val finiteRegistratioDuration = try {
+ val maxRegistrationDuration = Duration(configuration.getString(
+ ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
+ ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION))
+ if (maxRegistrationDuration.isFinite()) {
+ Some(maxRegistrationDuration.asInstanceOf[FiniteDuration])
+ } else {
+ None
+ }
+ } catch {
+ case e: NumberFormatException => throw new IllegalArgumentException(
+ "Invalid format for parameter " + ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
+ e)
+ }
- val maxRegistrationDuration = Duration(configuration.getString(
- ConfigConstants.T
<TRUNCATED>