You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/04/29 11:50:31 UTC
[2/3] flink git commit: [FLINK-1923] [runtime] Replaces asynchronous
logging with synchronous logging using grizzled-slf4j wrapper for Scala.
[FLINK-1923] [runtime] Replaces asynchronous logging with synchronous logging using grizzled-slf4j wrapper for Scala.
This closes #628
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5a2ca819
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5a2ca819
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5a2ca819
Branch: refs/heads/master
Commit: 5a2ca81912193552e74cd6a33637b7254e5a7174
Parents: ccd574a
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Apr 24 16:33:34 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 29 10:40:39 2015 +0200
----------------------------------------------------------------------
flink-dist/src/main/flink-bin/LICENSE | 1 +
flink-runtime/pom.xml | 5 +
.../apache/flink/runtime/taskmanager/Task.java | 11 --
.../apache/flink/runtime/ActorLogMessages.scala | 5 +-
.../flink/runtime/ActorSynchronousLogging.scala | 31 ++++
.../flink/runtime/jobmanager/JobManager.scala | 91 +++++-----
.../runtime/jobmanager/JobManagerProfiler.scala | 11 +-
.../runtime/jobmanager/MemoryArchivist.scala | 10 +-
.../StreamCheckpointCoordinator.scala | 14 +-
.../flink/runtime/taskmanager/TaskManager.scala | 167 +++++++++----------
.../taskmanager/TaskManagerProfiler.scala | 8 +-
.../apache/flink/yarn/ApplicationClient.scala | 16 +-
.../apache/flink/yarn/ApplicationMaster.scala | 9 +-
.../flink/yarn/ApplicationMasterActor.scala | 79 +++++----
pom.xml | 6 +
15 files changed, 243 insertions(+), 221 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-dist/src/main/flink-bin/LICENSE
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/LICENSE b/flink-dist/src/main/flink-bin/LICENSE
index d66922c..1236381 100644
--- a/flink-dist/src/main/flink-bin/LICENSE
+++ b/flink-dist/src/main/flink-bin/LICENSE
@@ -308,6 +308,7 @@ BSD-style licenses:
- Scala Compiler Reflect (http://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
- Scala Quasiquotes (http://scalamacros.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
- ASM (http://asm.ow2.org/) - Copyright (c) 2000-2011 INRIA, France Telecom
+ - Grizzled SLF4J (http://software.clapper.org/grizzled-slf4j/) - Copyright (c) 2010 Brian M. Clapper
(Below is the 3-clause BSD license)
http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index f490ed9..56ed0d2 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -148,6 +148,11 @@ under the License.
</dependency>
<dependency>
+ <groupId>org.clapper</groupId>
+ <artifactId>grizzled-slf4j_${scala.binary.version}</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f6eb907..e6eee5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.messages.TaskMessages.UnregisterTask;
import org.apache.flink.runtime.profiling.TaskManagerProfiler;
import org.apache.flink.util.ExceptionUtils;
@@ -357,16 +356,6 @@ public class Task {
taskManager.tell(new UnregisterTask(executionId), ActorRef.noSender());
}
- protected void notifyExecutionStateChange(ExecutionState executionState,
- Throwable optionalError) {
- LOG.info("Update execution state of {} ({}) to {}.", this.getTaskName(),
- this.getExecutionId(), executionState);
- taskManager.tell(new TaskMessages.UpdateTaskExecutionState(
- new TaskExecutionState(jobId, executionId, executionState, optionalError)),
- ActorRef.noSender());
-
- }
-
// -----------------------------------------------------------------------------------------------------------------
// Task Profiling
// -----------------------------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
index acd4346..c74c339 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
@@ -19,13 +19,12 @@
package org.apache.flink.runtime
import _root_.akka.actor.Actor
-import _root_.akka.event.LoggingAdapter
/**
* Mixin to add debug message logging
*/
trait ActorLogMessages {
- that: Actor =>
+ that: Actor with ActorSynchronousLogging =>
override def receive: Receive = new Actor.Receive {
private val _receiveWithLogMessages = receiveWithLogMessages
@@ -50,6 +49,4 @@ trait ActorLogMessages {
}
def receiveWithLogMessages: Receive
-
- protected def log: LoggingAdapter
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorSynchronousLogging.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorSynchronousLogging.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorSynchronousLogging.scala
new file mode 100644
index 0000000..4d3a988
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorSynchronousLogging.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime
+
+import _root_.akka.actor.Actor
+import grizzled.slf4j.Logger
+
+/** Adds a logger to an [[akka.actor.Actor]] implementation
+ *
+ */
+trait ActorSynchronousLogging {
+ self: Actor =>
+
+ lazy val log = Logger(getClass)
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 6870d95..83f9e35 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress
import java.util.Collections
import akka.actor.Status.{Success, Failure}
+import grizzled.slf4j.Logger
import org.apache.flink.api.common.{JobID, ExecutionConfig}
import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration}
import org.apache.flink.core.io.InputSplitAssigner
@@ -42,7 +43,7 @@ import org.apache.flink.runtime.security.SecurityUtils
import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.util.{SerializedValue, EnvironmentInformation}
-import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.instance.InstanceManager
@@ -101,10 +102,7 @@ class JobManager(val flinkConfiguration: Configuration,
val defaultExecutionRetries: Int,
val delayBetweenRetries: Long,
val timeout: FiniteDuration)
- extends Actor with ActorLogMessages with ActorLogging {
-
- /** Reference to the log, for debugging */
- val LOG = JobManager.LOG
+ extends Actor with ActorLogMessages with ActorSynchronousLogging {
/** List of current jobs running jobs */
val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
@@ -114,7 +112,7 @@ class JobManager(val flinkConfiguration: Configuration,
* Run when the job manager is started. Simply logs an informational message.
*/
override def preStart(): Unit = {
- LOG.info(s"Starting JobManager at ${self.path.toSerializationFormat}.")
+ log.info(s"Starting JobManager at ${self.path.toSerializationFormat}.")
}
override def postStop(): Unit = {
@@ -138,12 +136,11 @@ class JobManager(val flinkConfiguration: Configuration,
try {
libraryCacheManager.shutdown()
} catch {
- case e: IOException => log.error(e, "Could not properly shutdown the library cache manager.")
+ case e: IOException => log.error("Could not properly shutdown the library cache manager.", e)
}
- if (log.isDebugEnabled) {
- log.debug("Job manager {} is completely stopped.", self.path)
- }
+ log.debug(s"Job manager ${self.path} is completely stopped.")
+
}
/**
@@ -179,7 +176,7 @@ class JobManager(val flinkConfiguration: Configuration,
// registerTaskManager throws an IllegalStateException if it is already shut down
// let the actor crash and restart itself in this case
case e: Exception =>
- log.error(e, "Failed to register TaskManager at instance manager")
+ log.error("Failed to register TaskManager at instance manager", e)
// IMPORTANT: Send the response to the "sender", which is not the
// TaskManager actor, but the ask future!
@@ -197,7 +194,7 @@ class JobManager(val flinkConfiguration: Configuration,
submitJob(jobGraph, listenToEvents = listen)
case CancelJob(jobID) =>
- log.info("Trying to cancel job with ID {}.", jobID)
+ log.info(s"Trying to cancel job with ID $jobID.")
currentJobs.get(jobID) match {
case Some((executionGraph, _)) =>
@@ -208,7 +205,7 @@ class JobManager(val flinkConfiguration: Configuration,
sender ! CancellationSuccess(jobID)
case None =>
- log.info("No job found with ID {}.", jobID)
+ log.info(s"No job found with ID $jobID.")
sender ! CancellationFailure(jobID, new IllegalArgumentException("No job found with " +
s"ID $jobID."))
}
@@ -227,8 +224,9 @@ class JobManager(val flinkConfiguration: Configuration,
}(context.dispatcher)
sender ! true
- case None => log.error("Cannot find execution graph for ID {} to change state to {}.",
- taskExecutionState.getJobID, taskExecutionState.getExecutionState)
+ case None => log.error("Cannot find execution graph for ID " +
+ s"${taskExecutionState.getJobID} to change state to " +
+ s"${taskExecutionState.getExecutionState}.")
sender ! false
}
}
@@ -239,7 +237,7 @@ class JobManager(val flinkConfiguration: Configuration,
val execution = executionGraph.getRegisteredExecutions.get(executionAttempt)
if (execution == null) {
- log.error("Can not find Execution for attempt {}.", executionAttempt)
+ log.error(s"Can not find Execution for attempt $executionAttempt.")
null
} else {
val slot = execution.getAssignedResource
@@ -256,32 +254,30 @@ class JobManager(val flinkConfiguration: Configuration,
case splitAssigner: InputSplitAssigner =>
val nextInputSplit = splitAssigner.getNextInputSplit(host, taskId)
- if (log.isDebugEnabled) {
- log.debug("Send next input split {}.", nextInputSplit)
- }
+ log.debug(s"Send next input split $nextInputSplit.")
try {
InstantiationUtil.serializeObject(nextInputSplit)
} catch {
case ex: Exception =>
- log.error(ex, "Could not serialize the next input split of class {}.",
- nextInputSplit.getClass)
+ log.error(s"Could not serialize the next input split of " +
+ s"class ${nextInputSplit.getClass}.", ex)
vertex.fail(new RuntimeException("Could not serialize the next input split " +
"of class " + nextInputSplit.getClass + ".", ex))
null
}
case _ =>
- log.error("No InputSplitAssigner for vertex ID {}.", vertexID)
+ log.error(s"No InputSplitAssigner for vertex ID $vertexID.")
null
}
case _ =>
- log.error("Cannot find execution vertex for vertex ID {}.", vertexID)
+ log.error(s"Cannot find execution vertex for vertex ID $vertexID.")
null
}
}
case None =>
- log.error("Cannot find execution graph for job ID {}.", jobID)
+ log.error(s"Cannot find execution graph for job ID $jobID.")
null
}
@@ -290,9 +286,8 @@ class JobManager(val flinkConfiguration: Configuration,
case JobStatusChanged(jobID, newJobStatus, timeStamp, error) =>
currentJobs.get(jobID) match {
case Some((executionGraph, jobInfo)) => executionGraph.getJobName
- log.info("Status of job {} ({}) changed to {} {}.",
- jobID, executionGraph.getJobName, newJobStatus,
- if (error == null) "" else error.getMessage)
+ log.info(s"Status of job $jobID (${executionGraph.getJobName}) changed to $newJobStatus" +
+ s" ${if (error == null) "" else error.getMessage}.")
if (newJobStatus.isTerminalState) {
jobInfo.end = timeStamp
@@ -304,7 +299,7 @@ class JobManager(val flinkConfiguration: Configuration,
accumulatorManager.getJobAccumulatorResultsSerialized(jobID)
} catch {
case e: Exception =>
- log.error(e, "Cannot fetch serialized accumulators for job {}", jobID)
+ log.error(s"Cannot fetch serialized accumulators for job $jobID", e)
Collections.emptyMap()
}
val result = new SerializedJobExecutionResult(jobID, jobInfo.duration,
@@ -352,8 +347,8 @@ class JobManager(val flinkConfiguration: Configuration,
sender ! Acknowledge
executionGraph.scheduleOrUpdateConsumers(partitionId)
case None =>
- log.error("Cannot find execution graph for job ID {} to schedule or update consumers",
- jobId)
+ log.error(s"Cannot find execution graph for job ID $jobId to schedule or update " +
+ s"consumers.")
sender ! Failure(new IllegalStateException("Cannot find execution graph for job ID " +
s"$jobId to schedule or update consumers."))
}
@@ -383,7 +378,7 @@ class JobManager(val flinkConfiguration: Configuration,
sender ! RunningJobsStatus(jobs)
}
catch {
- case t: Throwable => LOG.error("Exception while responding to RequestRunningJobsStatus", t)
+ case t: Throwable => log.error("Exception while responding to RequestRunningJobsStatus", t)
}
case RequestJob(jobID) =>
@@ -403,10 +398,10 @@ class JobManager(val flinkConfiguration: Configuration,
case Heartbeat(instanceID, metricsReport) =>
try {
- log.debug("Received hearbeat message from {}", instanceID)
+ log.debug(s"Received hearbeat message from $instanceID.")
instanceManager.reportHeartBeat(instanceID, metricsReport)
} catch {
- case t: Throwable => log.error(t, "Could not report heart beat from {}.", sender().path)
+ case t: Throwable => log.error(s"Could not report heart beat from ${sender().path}.", t)
}
case message: AccumulatorMessage => handleAccumulatorMessage(message)
@@ -417,7 +412,7 @@ class JobManager(val flinkConfiguration: Configuration,
case Terminated(taskManager) =>
if (instanceManager.isRegistered(taskManager)) {
- log.info("Task manager {} terminated.", taskManager.path)
+ log.info(s"Task manager ${taskManager.path} terminated.")
instanceManager.unregisterTaskManager(taskManager)
context.unwatch(taskManager)
@@ -430,7 +425,7 @@ class JobManager(val flinkConfiguration: Configuration,
val taskManager = sender()
if (instanceManager.isRegistered(taskManager)) {
- log.info("Task manager {} wants to disconnect, because {}.", taskManager.path, msg)
+ log.info(s"Task manager ${taskManager.path} wants to disconnect, because $msg.")
instanceManager.unregisterTaskManager(taskManager)
context.unwatch(taskManager)
@@ -590,7 +585,7 @@ class JobManager(val flinkConfiguration: Configuration,
}
catch {
case tt: Throwable => {
- log.error(tt, "Error while marking ExecutionGraph as failed.")
+ log.error("Error while marking ExecutionGraph as failed.", tt)
}
}
}
@@ -618,7 +613,7 @@ class JobManager(val flinkConfiguration: Configuration,
libraryCacheManager.getClassLoader(jobId)
} catch {
case e: Exception =>
- log.error(e, "Dropping accumulators. No class loader available for job " + jobId)
+ log.error("Dropping accumulators. No class loader available for job " + jobId, e)
return
}
@@ -628,7 +623,7 @@ class JobManager(val flinkConfiguration: Configuration,
accumulatorManager.processIncomingAccumulators(jobId, accumulators)
}
catch {
- case e: Exception => log.error(e, "Cannot update accumulators for job " + jobId)
+ case e: Exception => log.error("Cannot update accumulators for job " + jobId, e)
}
} else {
log.error("Dropping accumulators. No class loader available for job " + jobId)
@@ -643,7 +638,7 @@ class JobManager(val flinkConfiguration: Configuration,
}
catch {
case e: Exception =>
- log.error(e, "Cannot serialize accumulator result")
+ log.error("Cannot serialize accumulator result", e)
sender() ! AccumulatorResultsErroneous(jobID, e)
}
@@ -656,7 +651,7 @@ class JobManager(val flinkConfiguration: Configuration,
}
catch {
case e: Exception =>
- log.error(e, "Cannot fetch accumulator result")
+ log.error("Cannot fetch accumulator result", e)
sender() ! AccumulatorResultsErroneous(jobId, e)
}
@@ -676,8 +671,8 @@ class JobManager(val flinkConfiguration: Configuration,
archive ! ArchiveExecutionGraph(jobID, eg)
} catch {
- case t: Throwable => log.error(t, "Could not prepare the execution graph {} for " +
- "archiving.", eg)
+ case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " +
+ "archiving.", t)
}
case None =>
@@ -687,7 +682,7 @@ class JobManager(val flinkConfiguration: Configuration,
libraryCacheManager.unregisterJob(jobID)
} catch {
case t: Throwable =>
- log.error(t, "Could not properly unregister job {} form the library cache.", jobID)
+ log.error(s"Could not properly unregister job $jobID form the library cache.", t)
}
}
}
@@ -699,7 +694,7 @@ class JobManager(val flinkConfiguration: Configuration,
*/
object JobManager {
- val LOG = LoggerFactory.getLogger(classOf[JobManager])
+ val LOG = Logger(classOf[JobManager])
val STARTUP_FAILURE_RETURN_CODE = 1
val RUNTIME_FAILURE_RETURN_CODE = 2
@@ -717,7 +712,7 @@ object JobManager {
*/
def main(args: Array[String]): Unit = {
// startup checks and logging
- EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager", args)
+ EnvironmentInformation.logEnvironmentInfo(LOG.logger, "JobManager", args)
EnvironmentInformation.checkJavaVersion()
// parsing the command line arguments
@@ -798,7 +793,7 @@ object JobManager {
LOG.info("Starting JobManager")
// Bring up the job manager actor system first, bind it to the given address.
- LOG.info("Starting JobManager actor system at {}:{}", listeningAddress, listeningPort)
+ LOG.info(s"Starting JobManager actor system at $listeningAddress:$listeningPort.")
val jobManagerSystem = try {
val akkaConfig = AkkaUtils.getAkkaConfig(configuration,
@@ -831,7 +826,7 @@ object JobManager {
// the process reaper will kill the JVM process (to ensure easy failure detection)
LOG.debug("Starting JobManager process reaper")
jobManagerSystem.actorOf(
- Props(classOf[ProcessReaper], jobManager, LOG, RUNTIME_FAILURE_RETURN_CODE),
+ Props(classOf[ProcessReaper], jobManager, LOG.logger, RUNTIME_FAILURE_RETURN_CODE),
"JobManager_Process_Reaper")
// bring up a local task manager, if needed
@@ -847,7 +842,7 @@ object JobManager {
LOG.debug("Starting TaskManager process reaper")
jobManagerSystem.actorOf(
- Props(classOf[ProcessReaper], taskManagerActor, LOG, RUNTIME_FAILURE_RETURN_CODE),
+ Props(classOf[ProcessReaper], taskManagerActor, LOG.logger, RUNTIME_FAILURE_RETURN_CODE),
"TaskManager_Process_Reaper")
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala
index eb8f913..dd3a1b7 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala
@@ -18,18 +18,21 @@
package org.apache.flink.runtime.jobmanager
-import akka.actor.{ActorLogging, Actor}
-import org.apache.flink.runtime.ActorLogMessages
+import akka.actor.Actor
+import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.messages.JobManagerProfilerMessages.ReportProfilingData
import org.apache.flink.runtime.profiling.impl.types.{InternalInstanceProfilingData, InternalExecutionVertexThreadProfilingData}
-import org.apache.flink.runtime.profiling.types.ThreadProfilingEvent
import scala.collection.convert.WrapAsScala
/**
* Basic skeleton for the JobManager profiler. Currently, it simply logs the received messages.
*/
-class JobManagerProfiler extends Actor with ActorLogMessages with ActorLogging with WrapAsScala {
+class JobManagerProfiler
+ extends Actor
+ with ActorLogMessages
+ with ActorSynchronousLogging
+ with WrapAsScala {
override def receiveWithLogMessages: Receive = {
case ReportProfilingData(profilingContainer) =>
profilingContainer.getIterator foreach {
http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index cb7bfec..9e71ebb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -18,9 +18,9 @@
package org.apache.flink.runtime.jobmanager
-import akka.actor.{ActorLogging, Actor}
+import akka.actor.Actor
import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.executiongraph.ExecutionGraph
import org.apache.flink.runtime.messages.ArchiveMessages._
import org.apache.flink.runtime.messages.JobManagerMessages._
@@ -48,8 +48,10 @@ import scala.ref.SoftReference
*
* @param max_entries Maximum number of stored Flink jobs
*/
-class MemoryArchivist(private val max_entries: Int) extends Actor with ActorLogMessages with
-ActorLogging {
+class MemoryArchivist(private val max_entries: Int)
+ extends Actor
+ with ActorLogMessages
+ with ActorSynchronousLogging {
/*
* Map of execution graphs belonging to recently started jobs with the time stamp of the last
* received job event. The insert order is preserved through a LinkedHashMap.
http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
index 48266e2..8bb1274 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager
import java.lang.{Long => JLong}
import akka.actor._
-import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.execution.ExecutionState
import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionVertex}
import org.apache.flink.runtime.jobgraph.JobStatus._
@@ -69,7 +69,7 @@ class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
val interval: FiniteDuration,
var curId: JLong,
var ackId: JLong)
-extends Actor with ActorLogMessages with ActorLogging {
+extends Actor with ActorLogMessages with ActorSynchronousLogging {
implicit private val executor = context.dispatcher
@@ -78,13 +78,13 @@ extends Actor with ActorLogMessages with ActorLogging {
case InitBarrierScheduler =>
context.system.scheduler.schedule(interval,interval,self,BarrierTimeout)
context.system.scheduler.schedule(2 * interval,2 * interval,self,CompactAndUpdate)
- log.info("Started Stream State Monitor for job {}{}",
- executionGraph.getJobID,executionGraph.getJobName)
+ log.info("Started Stream State Monitor for job " +
+ s"${executionGraph.getJobID}${executionGraph.getJobName}")
case BarrierTimeout =>
executionGraph.getState match {
case FAILED | CANCELED | FINISHED =>
- log.info("Stopping monitor for terminated job {}", executionGraph.getJobID)
+ log.info(s"Stopping monitor for terminated job ${executionGraph.getJobID}.")
self ! PoisonPill
case RUNNING =>
curId += 1
@@ -94,8 +94,8 @@ extends Actor with ActorLogMessages with ActorLogging {
=> vertex.getCurrentAssignedResource.getInstance.getTaskManager
! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId))
case _ =>
- log.debug("Omitting sending barrier since graph is in {} state for job {}",
- executionGraph.getState, executionGraph.getJobID)
+ log.debug("Omitting sending barrier since graph is in " +
+ s"${executionGraph.getState} state for job ${executionGraph.getJobID}.")
}
case StateBarrierAck(jobID, jobVertexID, instanceID, checkpointID, opState) =>
http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index d6b91ec..511de6b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -33,11 +33,12 @@ import com.codahale.metrics.json.MetricsModule
import com.codahale.metrics.jvm.{MemoryUsageGaugeSet, GarbageCollectorMetricSet}
import com.fasterxml.jackson.databind.ObjectMapper
+import grizzled.slf4j.Logger
import org.apache.flink.api.common.cache.DistributedCache
import org.apache.flink.configuration._
import org.apache.flink.core.fs.Path
-import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.blob.{BlobService, BlobCache}
import org.apache.flink.runtime.broadcast.BroadcastVariableManager
@@ -67,8 +68,6 @@ import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
import org.apache.flink.runtime.util.{MathUtils, EnvironmentInformation}
import org.apache.flink.util.ExceptionUtils
-import org.slf4j.LoggerFactory
-
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{Failure, Success}
@@ -125,10 +124,7 @@ class TaskManager(protected val config: TaskManagerConfiguration,
protected val network: NetworkEnvironment,
protected val numberOfSlots: Int)
-extends Actor with ActorLogMessages with ActorLogging {
-
- /** The log for all synchronous logging calls */
- private val LOG = TaskManager.LOG
+extends Actor with ActorLogMessages with ActorSynchronousLogging {
/** The timeout for all actor ask futures */
protected val askTimeout = new Timeout(config.timeout)
@@ -178,13 +174,13 @@ extends Actor with ActorLogMessages with ActorLogging {
* JobManager.
*/
override def preStart(): Unit = {
- LOG.info("Starting TaskManager actor at {}.", self.path.toSerializationFormat)
- LOG.info("TaskManager data connection information: {}", connectionInfo)
- LOG.info("TaskManager has {} task slot(s).", numberOfSlots)
+ log.info(s"Starting TaskManager actor at ${self.path.toSerializationFormat}.")
+ log.info(s"TaskManager data connection information: $connectionInfo")
+ log.info(s"TaskManager has $numberOfSlots task slot(s).")
// log the initial memory utilization
- if (LOG.isInfoEnabled) {
- LOG.info(TaskManager.getMemoryUsageStatsAsString(ManagementFactory.getMemoryMXBean))
+ if (log.isInfoEnabled) {
+ log.info(TaskManager.getMemoryUsageStatsAsString(ManagementFactory.getMemoryMXBean))
}
// kick off the registration
@@ -200,7 +196,7 @@ extends Actor with ActorLogMessages with ActorLogging {
* (like network stack, library cache, memory manager, ...) are properly shut down.
*/
override def postStop(): Unit = {
- LOG.info("Stopping TaskManager {}.", self.path.toSerializationFormat)
+ log.info(s"Stopping TaskManager ${self.path.toSerializationFormat}.")
cancelAndClearEverything(new Exception("TaskManager is shutting down."))
@@ -208,35 +204,35 @@ extends Actor with ActorLogMessages with ActorLogging {
try {
disassociateFromJobManager()
} catch {
- case t: Exception => LOG.error("Could not cleanly disassociate from JobManager", t)
+ case t: Exception => log.error("Could not cleanly disassociate from JobManager", t)
}
}
try {
ioManager.shutdown()
} catch {
- case t: Exception => LOG.error("I/O manager did not shutdown properly.", t)
+ case t: Exception => log.error("I/O manager did not shutdown properly.", t)
}
try {
memoryManager.shutdown()
} catch {
- case t: Exception => LOG.error("Memory manager did not shutdown properly.", t)
+ case t: Exception => log.error("Memory manager did not shutdown properly.", t)
}
try {
network.shutdown()
} catch {
- case t: Exception => LOG.error("Network environment did not shutdown properly.", t)
+ case t: Exception => log.error("Network environment did not shutdown properly.", t)
}
try {
fileCache.shutdown()
} catch {
- case t: Exception => LOG.error("FileCache did not shutdown properly.", t)
+ case t: Exception => log.error("FileCache did not shutdown properly.", t)
}
- LOG.info("Task manager {} is completely shut down.", self.path)
+ log.info(s"Task manager ${self.path} is completely shut down.")
}
/**
@@ -277,8 +273,8 @@ extends Actor with ActorLogMessages with ActorLogging {
handleJobManagerDisconnect(sender(), "JobManager is no longer reachable")
}
else {
- LOG.warn("Received unrecognized disconnect message from {}",
- if (actor == null) null else actor.path)
+ log.warn(s"Received unrecognized disconnect message " +
+ s"from ${if (actor == null) null else actor.path}.")
}
case Disconnect(msg) =>
@@ -291,7 +287,7 @@ extends Actor with ActorLogMessages with ActorLogging {
override def unhandled(message: Any): Unit = {
val errorMessage = "Received unknown message " + message
val error = new RuntimeException(errorMessage)
- LOG.error(errorMessage)
+ log.error(errorMessage)
// terminate all we are currently running (with a dedicated message)
// before the actor is stopped
@@ -311,8 +307,8 @@ extends Actor with ActorLogMessages with ActorLogging {
// at very first, check that we are actually currently associated with a JobManager
if (!isConnected) {
- LOG.debug("Dropping message {} because the TaskManager is currently " +
- "not connected to a JobManager", message)
+ log.debug(s"Dropping message $message because the TaskManager is currently " +
+ "not connected to a JobManager.")
}
// we order the messages by frequency, to make sure the code paths for matching
@@ -329,7 +325,7 @@ extends Actor with ActorLogMessages with ActorLogging {
// discards intermediate result partitions of a task execution on this TaskManager
case FailIntermediateResultPartitions(executionID) =>
- LOG.info("Discarding the results produced by task execution " + executionID)
+ log.info("Discarding the results produced by task execution " + executionID)
if (network.isAssociated) {
try {
network.getPartitionManager.releasePartitionsProducedBy(executionID)
@@ -392,7 +388,7 @@ extends Actor with ActorLogMessages with ActorLogging {
Future {
task.failExternally(cause)
}.onFailure{
- case t: Throwable => LOG.error(s"Could not fail task ${task} externally.", t)
+ case t: Throwable => log.error(s"Could not fail task ${task} externally.", t)
}
case None =>
}
@@ -406,7 +402,7 @@ extends Actor with ActorLogMessages with ActorLogging {
Future {
task.cancelExecution()
}.onFailure{
- case t: Throwable => LOG.error("Could not cancel task " + task, t)
+ case t: Throwable => log.error("Could not cancel task " + task, t)
}
sender ! new TaskOperationResult(executionID, true)
@@ -426,10 +422,9 @@ extends Actor with ActorLogMessages with ActorLogging {
private def handleCheckpointingMessage(message: CheckpointingMessage): Unit = {
message match {
-
case BarrierReq(attemptID, checkpointID) =>
- LOG.debug("[FT-TaskManager] Barrier {} request received for attempt {}",
- checkpointID, attemptID)
+ log.debug(s"[FT-TaskManager] Barrier $checkpointID request received " +
+ s"for attempt $attemptID.")
runningTasks.get(attemptID) match {
case Some(i) =>
@@ -441,16 +436,14 @@ extends Actor with ActorLogMessages with ActorLogging {
barrierTransceiver.broadcastBarrierFromSource(checkpointID)
}).start()
- case _ => LOG.error(
- "Taskmanager received a checkpoint request for non-checkpointing task {}",
- attemptID)
+ case _ => log.error("Taskmanager received a checkpoint request for " +
+ s"non-checkpointing task $attemptID.")
}
}
case None =>
// may always happen in case of canceled/finished tasks
- LOG.debug("Taskmanager received a checkpoint request for unknown task {}",
- attemptID)
+ log.debug(s"Taskmanager received a checkpoint request for unknown task $attemptID.")
}
// unknown checkpoint message
@@ -474,19 +467,19 @@ extends Actor with ActorLogMessages with ActorLogging {
if (isConnected) {
// this may be the case, if we queue another attempt and
// in the meantime, the registration is acknowledged
- LOG.debug(
+ log.debug(
"TaskManager was triggered to register at JobManager, but is already registered")
}
else if (deadline.exists(_.isOverdue())) {
// we failed to register in time. that means we should quit
- LOG.error("Failed to register at the JobManager withing the defined maximum " +
+ log.error("Failed to register at the JobManager withing the defined maximum " +
"connect time. Shutting down ...")
// terminate ourselves (hasta la vista)
self ! PoisonPill
}
else {
- LOG.info(s"Trying to register at JobManager ${jobManagerURL} " +
+ log.info(s"Trying to register at JobManager ${jobManagerURL} " +
s"(attempt ${attempt}, timeout: ${timeout})")
val jobManager = context.actorSelection(jobManagerAkkaURL)
@@ -510,9 +503,9 @@ extends Actor with ActorLogMessages with ActorLogging {
case AcknowledgeRegistration(jobManager, id, blobPort) =>
if (isConnected) {
if (jobManager == currentJobManager.orNull) {
- LOG.debug("Ignoring duplicate registration acknowledgement.")
+ log.debug("Ignoring duplicate registration acknowledgement.")
} else {
- LOG.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
+ log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
s"because the TaskManager is already registered at ${currentJobManager.orNull}")
}
}
@@ -531,15 +524,15 @@ extends Actor with ActorLogMessages with ActorLogging {
case AlreadyRegistered(jobManager, id, blobPort) =>
if (isConnected) {
if (jobManager == currentJobManager.orNull) {
- LOG.debug("Ignoring duplicate registration acknowledgement.")
+ log.debug("Ignoring duplicate registration acknowledgement.")
} else {
- LOG.warn(s"Received 'AlreadyRegistered' message from JobManager ${jobManager.path}, " +
+ log.warn(s"Received 'AlreadyRegistered' message from JobManager ${jobManager.path}, " +
s"even through TaskManager is currently registered at ${currentJobManager.orNull}")
}
}
else {
// not connected, yet, to let's associate
- LOG.info("Received 'AlreadyRegistered' message before 'AcknowledgeRegistration'")
+ log.info("Received 'AlreadyRegistered' message before 'AcknowledgeRegistration'")
try {
associateWithJobManager(jobManager, id, blobPort)
@@ -552,7 +545,7 @@ extends Actor with ActorLogMessages with ActorLogging {
case RefuseRegistration(reason) =>
if (currentJobManager.isEmpty) {
- LOG.error(s"The registration at JobManager ${jobManagerAkkaURL} was refused, " +
+ log.error(s"The registration at JobManager ${jobManagerAkkaURL} was refused, " +
s"because: ${reason}. Retrying later...")
// try the registration again after some time
@@ -570,11 +563,11 @@ extends Actor with ActorLogMessages with ActorLogging {
else {
// ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
if (sender() == currentJobManager.orNull) {
- LOG.warn(s"Received 'RefuseRegistration' from the JobManager (${sender().path})" +
+ log.warn(s"Received 'RefuseRegistration' from the JobManager (${sender().path})" +
s" even though this TaskManager is already registered there.")
}
else {
- LOG.warn(s"Ignoring 'RefuseRegistration' from unrelated JobManager (${sender().path})")
+ log.warn(s"Ignoring 'RefuseRegistration' from unrelated JobManager (${sender().path})")
}
}
@@ -621,7 +614,7 @@ extends Actor with ActorLogMessages with ActorLogging {
// sanity check that we are not currently registered with a different JobManager
if (isConnected) {
if (currentJobManager.get == jobManager) {
- LOG.warn("Received call to finish registration with JobManager " +
+ log.warn("Received call to finish registration with JobManager " +
jobManager.path + " even though TaskManager is already registered.")
return
}
@@ -633,8 +626,8 @@ extends Actor with ActorLogMessages with ActorLogging {
}
// not yet associated, so associate
- LOG.info("Successful registration at JobManager ({}), " +
- "starting network stack and library cache.", jobManager.path)
+ log.info(s"Successful registration at JobManager (${jobManager.path}), " +
+ "starting network stack and library cache.")
// sanity check that the JobManager dependent components are not set up currently
if (network.isAssociated || blobService.isDefined) {
@@ -648,7 +641,7 @@ extends Actor with ActorLogMessages with ActorLogging {
catch {
case e: Exception =>
val message = "Could not start network environment."
- LOG.error(message, e)
+ log.error(message, e)
throw new RuntimeException(message, e)
}
@@ -657,7 +650,7 @@ extends Actor with ActorLogMessages with ActorLogging {
val jmHost = jobManager.path.address.host.getOrElse("localhost")
val address = new InetSocketAddress(jmHost, blobPort)
- LOG.info("Determined BLOB server address to be {}. Starting BLOB cache.", address)
+ log.info(s"Determined BLOB server address to be $address. Starting BLOB cache.")
try {
val blobcache = new BlobCache(address, config.configuration)
@@ -667,7 +660,7 @@ extends Actor with ActorLogMessages with ActorLogging {
catch {
case e: Exception =>
val message = "Could not create BLOB cache or library cache."
- LOG.error(message, e)
+ log.error(message, e)
throw new RuntimeException(message, e)
}
}
@@ -700,12 +693,12 @@ extends Actor with ActorLogMessages with ActorLogging {
*/
private def disassociateFromJobManager(): Unit = {
if (!isConnected) {
- LOG.warn("TaskManager received message to disassociate from JobManager, even though " +
+ log.warn("TaskManager received message to disassociate from JobManager, even though " +
"it is not currently associated with a JobManager")
return
}
- LOG.info("Disassociating from JobManager")
+ log.info("Disassociating from JobManager")
// stop the periodic heartbeats
heartbeatScheduler foreach {
@@ -748,7 +741,7 @@ extends Actor with ActorLogMessages with ActorLogging {
if (jobManager == currentJobManager.orNull) {
try {
val message = "Disconnecting from JobManager: " + msg
- LOG.info(message)
+ log.info(message)
// cancel all our tasks with a proper error message
cancelAndClearEverything(new Exception(message))
@@ -769,7 +762,7 @@ extends Actor with ActorLogMessages with ActorLogging {
}
}
else {
- LOG.warn("Received erroneous JobManager disconnect message for {}", jobManager.path)
+ log.warn(s"Received erroneous JobManager disconnect message for ${jobManager.path}.")
}
}
}
@@ -807,17 +800,15 @@ extends Actor with ActorLogMessages with ActorLogging {
val userCodeClassLoader = libraryCacheManager match {
case Some(manager) =>
- if (LOG.isDebugEnabled) {
+ if (log.isDebugEnabled) {
startRegisteringTask = System.currentTimeMillis()
}
// triggers the download of all missing jar files from the job manager
manager.registerTask(jobID, executionID, tdd.getRequiredJarFiles)
- if (LOG.isDebugEnabled) {
- LOG.debug("Register task {} at library cache manager took {}s", executionID,
- (System.currentTimeMillis() - startRegisteringTask) / 1000.0)
- }
+ log.debug(s"Register task $executionID at library cache manager " +
+ s"took ${(System.currentTimeMillis() - startRegisteringTask) / 1000.0}s")
manager.getClassLoader(jobID)
case None => throw new IllegalStateException("There is no valid library cache manager.")
@@ -859,7 +850,7 @@ extends Actor with ActorLogMessages with ActorLogging {
}
// register the task with the network stack and profiles
- LOG.info("Register task {}", task)
+ log.info(s"Register task $task.")
network.registerTask(task)
val cpTasks = new util.HashMap[String, FutureTask[Path]]()
@@ -881,7 +872,7 @@ extends Actor with ActorLogMessages with ActorLogging {
val message = if (t.isInstanceOf[CancelTaskException]) {
"Task was canceled"
} else {
- LOG.error("Could not instantiate task with execution ID " + executionID, t)
+ log.error("Could not instantiate task with execution ID " + executionID, t)
ExceptionUtils.stringifyException(t)
}
@@ -893,7 +884,7 @@ extends Actor with ActorLogMessages with ActorLogging {
libraryCacheManager foreach { _.unregisterTask(jobID, executionID) }
} catch {
- case t: Throwable => LOG.error("Error during cleanup of task deployment.", t)
+ case t: Throwable => log.error("Error during cleanup of task deployment.", t)
}
sender ! new TaskOperationResult(executionID, false, message)
@@ -924,7 +915,7 @@ extends Actor with ActorLogMessages with ActorLogging {
reader.updateInputChannel(partitionInfo)
} catch {
case t: Throwable =>
- LOG.error(s"Could not update input data location for task " +
+ log.error(s"Could not update input data location for task " +
s"${task.getTaskName}. Trying to fail task.", t)
try {
@@ -932,7 +923,7 @@ extends Actor with ActorLogMessages with ActorLogging {
}
catch {
case t: Throwable =>
- LOG.error("Failed canceling task with execution ID " + executionId +
+ log.error("Failed canceling task with execution ID " + executionId +
" after task update failure.", t)
}
}
@@ -951,8 +942,8 @@ extends Actor with ActorLogMessages with ActorLogging {
}
case None =>
- LOG.debug("Discard update for input partitions of task {} : task is no longer running.",
- executionId)
+ log.debug(s"Discard update for input partitions of task $executionId : " +
+ s"task is no longer running.")
sender ! Acknowledge
}
}
@@ -965,7 +956,7 @@ extends Actor with ActorLogMessages with ActorLogging {
*/
private def cancelAndClearEverything(cause: Throwable) {
if (runningTasks.size > 0) {
- LOG.info("Cancelling all computations and discarding all cached data.")
+ log.info("Cancelling all computations and discarding all cached data.")
for (t <- runningTasks.values) {
t.failExternally(cause)
@@ -983,22 +974,22 @@ extends Actor with ActorLogMessages with ActorLogging {
try {
task.failExternally(new Exception("Task is being removed from TaskManager"))
} catch {
- case e: Exception => LOG.error("Could not properly fail task", e)
+ case e: Exception => log.error("Could not properly fail task", e)
}
}
- LOG.info("Unregister task with execution ID {}.", executionID)
+ log.info(s"Unregister task with execution ID $executionID.")
removeAllTaskResources(task)
libraryCacheManager foreach { _.unregisterTask(task.getJobID, executionID) }
- LOG.info("Updating FINAL execution state of {} ({}) to {}.",
- task.getTaskName, task.getExecutionId, task.getExecutionState)
+ log.info(s"Updating FINAL execution state of ${task.getTaskName} " +
+ s"(${task.getExecutionId}) to ${task.getExecutionState}.")
self ! UpdateTaskExecutionState(new TaskExecutionState(
task.getJobID, task.getExecutionId, task.getExecutionState, task.getFailureCause))
case None =>
- LOG.debug("Cannot find task with ID {} to unregister.", executionID)
+ log.debug(s"Cannot find task with ID $executionID to unregister.")
}
}
@@ -1044,7 +1035,7 @@ extends Actor with ActorLogMessages with ActorLogging {
}
} catch {
// this is pretty unpleasant, but not a reason to give up immediately
- case e: Exception => LOG.error(
+ case e: Exception => log.error(
"Error cleaning up local temp files from the distributed cache.", e)
}
}
@@ -1060,14 +1051,14 @@ extends Actor with ActorLogMessages with ActorLogging {
*/
private def sendHeartbeatToJobManager(): Unit = {
try {
- LOG.debug("Sending heartbeat to JobManager")
+ log.debug("Sending heartbeat to JobManager")
val report: Array[Byte] = metricRegistryMapper.writeValueAsBytes(metricRegistry)
currentJobManager foreach {
jm => jm ! Heartbeat(instanceID, report)
}
}
catch {
- case e: Exception => LOG.warn("Error sending the metric heartbeat to the JobManager", e)
+ case e: Exception => log.warn("Error sending the metric heartbeat to the JobManager", e)
}
}
@@ -1093,7 +1084,7 @@ extends Actor with ActorLogMessages with ActorLogging {
recipient ! StackTrace(instanceID, stackTraceStr)
}
catch {
- case e: Exception => LOG.error("Failed to send stack trace to " + recipient.path, e)
+ case e: Exception => log.error("Failed to send stack trace to " + recipient.path, e)
}
}
@@ -1103,7 +1094,7 @@ extends Actor with ActorLogMessages with ActorLogging {
* @param cause The exception that caused the fatal problem.
*/
private def killTaskManagerFatal(message: String, cause: Throwable): Unit = {
- LOG.error("\n" +
+ log.error("\n" +
"==============================================================\n" +
"====================== FATAL =======================\n" +
"==============================================================\n" +
@@ -1121,7 +1112,7 @@ extends Actor with ActorLogMessages with ActorLogging {
object TaskManager {
/** TaskManager logger for synchronous logging (not through the logging actor) */
- val LOG = LoggerFactory.getLogger(classOf[TaskManager])
+ val LOG = Logger(classOf[TaskManager])
/** Return code for unsuccessful TaskManager startup */
val STARTUP_FAILURE_RETURN_CODE = 1
@@ -1159,7 +1150,7 @@ object TaskManager {
*/
def main(args: Array[String]): Unit = {
// startup checks and logging
- EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args)
+ EnvironmentInformation.logEnvironmentInfo(LOG.logger, "TaskManager", args)
EnvironmentInformation.checkJavaVersion()
// try to parse the command line arguments
@@ -1283,8 +1274,8 @@ object TaskManager {
LOG.info("Trying to select the network interface and address to use " +
"by connecting to the configured JobManager.")
- LOG.info("TaskManager will try to connect for {} seconds before falling back to heuristics",
- MAX_STARTUP_CONNECT_TIME)
+ LOG.info(s"TaskManager will try to connect for $MAX_STARTUP_CONNECT_TIME seconds before " +
+ "falling back to heuristics")
val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort)
val taskManagerAddress = try {
@@ -1361,7 +1352,7 @@ object TaskManager {
// Bring up the TaskManager actor system first, bind it to the given address.
- LOG.info("Starting TaskManager actor system at {}:{}", taskManagerHostname, actorSystemPort)
+ LOG.info(s"Starting TaskManager actor system at $taskManagerHostname:$actorSystemPort")
val taskManagerSystem = try {
val akkaConfig = AkkaUtils.getAkkaConfig(configuration,
@@ -1400,7 +1391,7 @@ object TaskManager {
// the process reaper will kill the JVM process (to ensure easy failure detection)
LOG.debug("Starting TaskManager process reaper")
taskManagerSystem.actorOf(
- Props(classOf[ProcessReaper], taskManager, LOG, RUNTIME_FAILURE_RETURN_CODE),
+ Props(classOf[ProcessReaper], taskManager, LOG.logger, RUNTIME_FAILURE_RETURN_CODE),
"TaskManager_Process_Reaper")
// if desired, start the logging daemon that periodically logs the
@@ -1521,7 +1512,7 @@ object TaskManager {
"pick a fraction of the available memory.")
val memorySize = if (configuredMemory > 0) {
- LOG.info("Using {} MB for Flink managed memory.", configuredMemory)
+ LOG.info(s"Using $configuredMemory MB for Flink managed memory.")
configuredMemory << 20 // megabytes to bytes
}
else {
@@ -1534,8 +1525,8 @@ object TaskManager {
val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
fraction).toLong
- LOG.info("Using {} of the currently free heap space for Flink managed memory ({} MB).",
- fraction, relativeMemSize >> 20)
+ LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
+ s"memory (${relativeMemSize >> 20} MB).")
relativeMemSize
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
index 51e99f9..f0079f8 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
@@ -22,7 +22,7 @@ import java.lang.management.ManagementFactory
import java.util.concurrent.TimeUnit
import akka.actor.{Cancellable, ActorRef, Actor, ActorLogging}
-import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.execution.{RuntimeEnvironment, ExecutionState}
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
import org.apache.flink.runtime.jobgraph.JobVertexID
@@ -44,7 +44,7 @@ import scala.concurrent.duration.FiniteDuration
* @param reportInterval Interval of profiling action
*/
class TaskManagerProfiler(val instancePath: String, val reportInterval: Int)
- extends Actor with ActorLogMessages with ActorLogging {
+ extends Actor with ActorLogMessages with ActorSynchronousLogging {
import context.dispatcher
@@ -101,7 +101,7 @@ class TaskManagerProfiler(val instancePath: String, val reportInterval: Int)
Some(instanceProfiler.generateProfilingData(timestamp))
} catch {
case e: ProfilingException =>
- log.error(e, "Error while retrieving instance profiling data.")
+ log.error("Error while retrieving instance profiling data.", e)
None
}
@@ -133,7 +133,7 @@ class TaskManagerProfiler(val instancePath: String, val reportInterval: Int)
case _ =>
}
case None =>
- log.warning(s"Could not find environment for execution id $executionID.")
+ log.warn(s"Could not find environment for execution id $executionID.")
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index b400bb9..d6760ec 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -23,7 +23,7 @@ import java.net.InetSocketAddress
import akka.actor._
import akka.pattern.ask
import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus
@@ -34,8 +34,10 @@ import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.{Failure, Success}
-class ApplicationClient(flinkConfig: Configuration) extends Actor
- with ActorLogMessages with ActorLogging {
+class ApplicationClient(flinkConfig: Configuration)
+ extends Actor
+ with ActorLogMessages
+ with ActorSynchronousLogging {
import context._
val INITIAL_POLLING_DELAY = 0 seconds
@@ -78,8 +80,8 @@ class ApplicationClient(flinkConfig: Configuration) extends Actor
jobManagerFuture.onComplete {
case Success(jm) => self ! JobManagerActorRef(jm)
case Failure(t) =>
- log.error(t, "Registration at JobManager/ApplicationMaster failed. Shutting " +
- "ApplicationClient down.")
+ log.error("Registration at JobManager/ApplicationMaster failed. Shutting " +
+ "ApplicationClient down.", t)
// we could not connect to the job manager --> poison ourselves
self ! PoisonPill
@@ -93,7 +95,7 @@ class ApplicationClient(flinkConfig: Configuration) extends Actor
// sender as the Application Client (this class).
(jm ? RegisterClient(self))(timeout).onFailure{
case t: Throwable =>
- log.error(t, "Could not register at the job manager.")
+ log.error("Could not register at the job manager.", t)
self ! PoisonPill
}
@@ -144,7 +146,7 @@ class ApplicationClient(flinkConfig: Configuration) extends Actor
// ----------------- handle messages from the cluster -------------------
// receive remote messages
case msg: YarnMessage =>
- log.debug("Received new YarnMessage {}. Now {} messages in queue", msg, messagesQueue.size)
+ log.debug(s"Received new YarnMessage $msg. Now ${messagesQueue.size} messages in queue")
messagesQueue.enqueue(msg)
// locally forward messages
http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 0fdef47..06e16dc 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -21,6 +21,7 @@ import java.io.{PrintWriter, FileWriter, BufferedWriter}
import java.security.PrivilegedAction
import akka.actor._
+import grizzled.slf4j.Logger
import org.apache.flink.client.CliFrontend
import org.apache.flink.configuration.{GlobalConfiguration, Configuration, ConfigConstants}
import org.apache.flink.runtime.akka.AkkaUtils
@@ -38,7 +39,7 @@ import scala.io.Source
object ApplicationMaster {
import scala.collection.JavaConversions._
- val LOG = LoggerFactory.getLogger(this.getClass)
+ val LOG = Logger(getClass)
val CONF_FILE = "flink-conf.yaml"
val MODIFIED_CONF_FILE = "flink-conf-modified.yaml"
@@ -50,9 +51,9 @@ object ApplicationMaster {
LOG.info(s"YARN daemon runs as ${UserGroupInformation.getCurrentUser.getShortUserName} " +
s"setting user to execute Flink ApplicationMaster/JobManager to ${yarnClientUsername}")
- EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster/JobManager", args)
+ EnvironmentInformation.logEnvironmentInfo(LOG.logger, "YARN ApplicationMaster/JobManager", args)
EnvironmentInformation.checkJavaVersion()
- org.apache.flink.runtime.util.SignalHandler.register(LOG)
+ org.apache.flink.runtime.util.SignalHandler.register(LOG.logger)
val ugi = UserGroupInformation.createRemoteUser(yarnClientUsername)
@@ -206,7 +207,7 @@ object ApplicationMaster {
(Configuration, ActorSystem, ActorRef, ActorRef) = {
LOG.info("Starting JobManager for YARN")
- LOG.info("Loading config from: {}", currDir)
+ LOG.info(s"Loading config from: $currDir.")
GlobalConfiguration.loadConfiguration(currDir)
val configuration = GlobalConfiguration.getConfiguration()
http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
index f384130..26d1f69 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
@@ -98,7 +98,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
def receiveYarnMessages: Receive = {
case StopYarnSession(status, diag) =>
- log.info("Stopping YARN JobManager with status {} and diagnostic {}.", status, diag)
+ log.info(s"Stopping YARN JobManager with status $status and diagnostic $diag.")
instanceManager.getAllRegisteredInstances.asScala foreach {
instance =>
@@ -108,11 +108,11 @@ trait ApplicationMasterActor extends ActorLogMessages {
rmClientOption foreach {
rmClient =>
Try(rmClient.unregisterApplicationMaster(status, diag, "")).recover{
- case t: Throwable => log.error(t, "Could not unregister the application master.")
+ case t: Throwable => log.error("Could not unregister the application master.", t)
}
Try(rmClient.close()).recover{
- case t:Throwable => log.error(t, "Could not close the AMRMClient.")
+ case t:Throwable => log.error("Could not close the AMRMClient.", t)
}
}
@@ -121,7 +121,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
nmClientOption foreach {
nmClient =>
Try(nmClient.close()).recover{
- case t: Throwable => log.error(t, "Could not close the NMClient.")
+ case t: Throwable => log.error("Could not close the NMClient.", t)
}
}
@@ -133,7 +133,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
context.system.shutdown()
case RegisterClient(client) =>
- log.info("Register {} as client.", client.path)
+ log.info(s"Register ${client.path} as client.")
messageListener = Some(client)
sender ! Acknowledge
@@ -142,7 +142,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
case msg: StopAMAfterJob =>
val jobId = msg.jobId
- log.info("ApplicatonMaster will shut down YARN session when job {} has finished", jobId)
+ log.info(s"ApplicatonMaster will shut down YARN session when job $jobId has finished.")
stopWhenJobFinished = jobId
sender() ! Acknowledge
@@ -155,26 +155,26 @@ trait ApplicationMasterActor extends ActorLogMessages {
startYarnSession(conf, actorSystemPort, webServerPort)
case jnf: JobNotFound =>
- LOG.warn("Job with ID {} not found in JobManager", jnf.jobID)
+ log.warn(s"Job with ID ${jnf.jobID} not found in JobManager")
if(stopWhenJobFinished == null) {
- LOG.warn("The ApplicationMaster didn't expect to receive this message")
+ log.warn("The ApplicationMaster didn't expect to receive this message")
}
case jobStatus: CurrentJobStatus =>
if(stopWhenJobFinished == null) {
- LOG.warn("Received job status {} which wasn't requested", jobStatus)
+ log.warn(s"Received job status $jobStatus which wasn't requested.")
} else {
if(stopWhenJobFinished != jobStatus.jobID) {
- LOG.warn(s"Received job status for job ${jobStatus.jobID} but expected status for " +
+ log.warn(s"Received job status for job ${jobStatus.jobID} but expected status for " +
s"job $stopWhenJobFinished")
} else {
if(jobStatus.status.isTerminalState) {
- LOG.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " +
+ log.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " +
s"Shutting down YARN session")
self ! StopYarnSession(FinalApplicationStatus.SUCCEEDED,
s"The monitored job with ID ${jobStatus.jobID} has finished.")
} else {
- LOG.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}")
+ log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}")
}
}
}
@@ -229,14 +229,14 @@ trait ApplicationMasterActor extends ActorLogMessages {
})
}
// return containers if the RM wants them and we haven't allocated them yet.
- val preemtionMessage = response.getPreemptionMessage
- if(preemtionMessage != null) {
- log.info("Received preemtion message from YARN {}", preemtionMessage)
- val contract = preemtionMessage.getContract
+ val preemptionMessage = response.getPreemptionMessage
+ if(preemptionMessage != null) {
+ log.info(s"Received preemtion message from YARN $preemptionMessage.")
+ val contract = preemptionMessage.getContract
if(contract != null) {
tryToReturnContainers(contract.getContainers.asScala)
}
- val strictContract = preemtionMessage.getStrictContract
+ val strictContract = preemptionMessage.getStrictContract
if(strictContract != null) {
tryToReturnContainers(strictContract.getContainers.asScala)
}
@@ -247,13 +247,13 @@ trait ApplicationMasterActor extends ActorLogMessages {
// check if we want to start some of our allocated containers.
if(runningContainers < numTaskManager) {
var missingContainers = numTaskManager - runningContainers
- log.info("The user requested {} containers, {} running. {} containers missing",
- numTaskManager, runningContainers, missingContainers)
+ log.info(s"The user requested $numTaskManager containers, $runningContainers " +
+ s"running. $missingContainers containers missing")
// not enough containers running
if(allocatedContainersList.size > 0) {
- log.info("{} containers already allocated by YARN. Starting...",
- allocatedContainersList.size)
+ log.info(s"${allocatedContainersList.size} containers already allocated by YARN. " +
+ "Starting...")
// we have some containers allocated to us --> start them
allocatedContainersList = allocatedContainersList.dropWhile(container => {
if (missingContainers <= 0) {
@@ -279,7 +279,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
}
} catch {
case e: YarnException =>
- log.error(e, "Exception while starting YARN container")
+ log.error("Exception while starting YARN container", e)
}
}
case None =>
@@ -305,24 +305,24 @@ trait ApplicationMasterActor extends ActorLogMessages {
log.info(s"There are $missingContainers containers missing." +
s" $numPendingRequests are already requested. " +
s"Requesting $toAllocateFromYarn additional container(s) from YARN. " +
- s"Reallocation of failed containers is enabled=$reallocate ('{}')",
- ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS)
+ s"Reallocation of failed containers is enabled=$reallocate " +
+ s"('${ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS}')")
// there are still containers missing. Request them from YARN
if(reallocate) {
for(i <- 1 to toAllocateFromYarn) {
val containerRequest = getContainerRequest(memoryPerTaskManager)
rmClient.addContainerRequest(containerRequest)
numPendingRequests += 1
- log.info("Requested additional container from YARN. Pending requests {}",
- numPendingRequests)
+ log.info("Requested additional container from YARN. Pending requests " +
+ s"$numPendingRequests.")
}
}
}
}
if(runningContainers >= numTaskManager && allocatedContainersList.size > 0) {
- log.info("Flink has {} allocated containers which are not needed right now. " +
- "Returning them", allocatedContainersList.size)
+ log.info(s"Flink has ${allocatedContainersList.size} allocated containers which " +
+ s"are not needed right now. Returning them")
for(container <- allocatedContainersList) {
rmClient.releaseAssignedContainer(container.getId)
}
@@ -353,9 +353,9 @@ trait ApplicationMasterActor extends ActorLogMessages {
self ! StopYarnSession(FinalApplicationStatus.FAILED, "Fatal error in AM: AMRMClient " +
"was not set")
}
- log.debug("Processed Heartbeat with RMClient. Running containers {}," +
- "failed containers {}, allocated containers {}", runningContainers, failedContainers,
- allocatedContainersList.size)
+ log.debug(s"Processed Heartbeat with RMClient. Running containers $runningContainers, " +
+ s"failed containers $failedContainers, " +
+ s"allocated containers ${allocatedContainersList.size}.")
}
private def runningContainerIds(): mutable.MutableList[ContainerId] = {
@@ -381,17 +381,16 @@ trait ApplicationMasterActor extends ActorLogMessages {
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS), MILLISECONDS)
if(YARN_HEARTBEAT_DELAY.gteq(yarnExpiryInterval)) {
- log.warning("The heartbeat interval of the Flink Application master ({}) is greater than " +
- "YARN's expiry interval ({}). The application is likely to be killed by YARN.",
- YARN_HEARTBEAT_DELAY, yarnExpiryInterval)
+ log.warn(s"The heartbeat interval of the Flink Application master " +
+ s"($YARN_HEARTBEAT_DELAY) is greater than YARN's expiry interval " +
+ s"($yarnExpiryInterval). The application is likely to be killed by YARN.")
}
numTaskManager = env.get(FlinkYarnClient.ENV_TM_COUNT).toInt
maxFailedContainers = flinkConfiguration.
getInteger(ConfigConstants.YARN_MAX_FAILED_CONTAINERS, numTaskManager)
- log.info("Requesting {} TaskManagers. Tolerating {} failed TaskManagers",
- numTaskManager, maxFailedContainers)
-
+ log.info(s"Requesting $numTaskManager TaskManagers. Tolerating $maxFailedContainers failed " +
+ "TaskManagers")
val remoteFlinkJarPath = env.get(FlinkYarnClient.FLINK_JAR_PATH)
val fs = FileSystem.get(conf)
@@ -468,7 +467,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
context.system.scheduler.scheduleOnce(FAST_YARN_HEARTBEAT_DELAY, self, HeartbeatWithYarn)
} recover {
case t: Throwable =>
- log.error(t, "Could not start yarn session.")
+ log.error("Could not start yarn session.", t)
self ! StopYarnSession(FinalApplicationStatus.FAILED,
s"ApplicationMaster failed while starting. Exception Message: ${t.getMessage}")
}
@@ -479,7 +478,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
allocatedContainersList = allocatedContainersList.dropWhile( container => {
val result = requestedBackContainers.getId.equals(container.getId)
if(result) {
- log.info("Returning container {} back to ResourceManager.", container)
+ log.info(s"Returning container $container back to ResourceManager.")
}
result
})
@@ -548,7 +547,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
ctx.setTokens(securityTokens)
} catch {
case t: Throwable =>
- log.error(t, "Getting current user info failed when trying to launch the container")
+ log.error("Getting current user info failed when trying to launch the container", t)
}
ctx
http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 524e45f..1278386 100644
--- a/pom.xml
+++ b/pom.xml
@@ -275,6 +275,12 @@ under the License.
</dependency>
<dependency>
+ <groupId>org.clapper</groupId>
+ <artifactId>grizzled-slf4j_${scala.binary.version}</artifactId>
+ <version>1.0.2</version>
+ </dependency>
+
+ <dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_${scala.binary.version}</artifactId>
<version>${akka.version}</version>