Posted to commits@flink.apache.org by se...@apache.org on 2015/04/29 11:50:31 UTC

[2/3] flink git commit: [FLINK-1923] [runtime] Replaces asynchronous logging with synchronous logging using the grizzled-slf4j wrapper for Scala.

[FLINK-1923] [runtime] Replaces asynchronous logging with synchronous logging using the grizzled-slf4j wrapper for Scala.

This closes #628


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5a2ca819
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5a2ca819
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5a2ca819

Branch: refs/heads/master
Commit: 5a2ca81912193552e74cd6a33637b7254e5a7174
Parents: ccd574a
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Apr 24 16:33:34 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 29 10:40:39 2015 +0200

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/LICENSE           |   1 +
 flink-runtime/pom.xml                           |   5 +
 .../apache/flink/runtime/taskmanager/Task.java  |  11 --
 .../apache/flink/runtime/ActorLogMessages.scala |   5 +-
 .../flink/runtime/ActorSynchronousLogging.scala |  31 ++++
 .../flink/runtime/jobmanager/JobManager.scala   |  91 +++++-----
 .../runtime/jobmanager/JobManagerProfiler.scala |  11 +-
 .../runtime/jobmanager/MemoryArchivist.scala    |  10 +-
 .../StreamCheckpointCoordinator.scala           |  14 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 167 +++++++++----------
 .../taskmanager/TaskManagerProfiler.scala       |   8 +-
 .../apache/flink/yarn/ApplicationClient.scala   |  16 +-
 .../apache/flink/yarn/ApplicationMaster.scala   |   9 +-
 .../flink/yarn/ApplicationMasterActor.scala     |  79 +++++----
 pom.xml                                         |   6 +
 15 files changed, 243 insertions(+), 221 deletions(-)
----------------------------------------------------------------------
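
The hunks below repeatedly rewrite calls against Akka's LoggingAdapter, which takes the Throwable as its first argument and formats with {} placeholders, into grizzled-slf4j calls, which take the message first, the Throwable last, and use Scala string interpolation. Because grizzled-slf4j evaluates its message parameter by-name, the patch can also drop the explicit isDebugEnabled guards. A minimal sketch of the new call style, assuming only grizzled-slf4j (and an SLF4J binding) on the classpath; the object and values here are illustrative, not part of the patch:

    import grizzled.slf4j.Logger

    object LoggingSketch {
      // same construction as Logger(classOf[JobManager]) further down
      private val log = Logger(getClass)

      def main(args: Array[String]): Unit = {
        val jobID = "job-42" // illustrative value
        // the message is passed by-name, so the interpolation only runs
        // when DEBUG is actually enabled -- no guard needed
        log.debug(s"Received heartbeat message from $jobID.")
        try {
          throw new IllegalStateException("boom")
        } catch {
          // Throwable comes last, the reverse of akka.event.LoggingAdapter
          case e: Exception =>
            log.error(s"Cannot update accumulators for job $jobID", e)
        }
      }
    }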


http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-dist/src/main/flink-bin/LICENSE
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/LICENSE b/flink-dist/src/main/flink-bin/LICENSE
index d66922c..1236381 100644
--- a/flink-dist/src/main/flink-bin/LICENSE
+++ b/flink-dist/src/main/flink-bin/LICENSE
@@ -308,6 +308,7 @@ BSD-style licenses:
  - Scala Compiler Reflect (http://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
  - Scala Quasiquotes (http://scalamacros.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
  - ASM (http://asm.ow2.org/) - Copyright (c) 2000-2011 INRIA, France Telecom
+ - Grizzled SLF4J (http://software.clapper.org/grizzled-slf4j/) - Copyright (c) 2010 Brian M. Clapper
 
 
 (Below is the 3-clause BSD license)

http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index f490ed9..56ed0d2 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -148,6 +148,11 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.clapper</groupId>
+			<artifactId>grizzled-slf4j_${scala.binary.version}</artifactId>
+		</dependency>
+
+		<dependency>
 			<groupId>org.scalatest</groupId>
 			<artifactId>scalatest_${scala.binary.version}</artifactId>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f6eb907..e6eee5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.UnregisterTask;
 import org.apache.flink.runtime.profiling.TaskManagerProfiler;
 import org.apache.flink.util.ExceptionUtils;
@@ -357,16 +356,6 @@ public class Task {
 		taskManager.tell(new UnregisterTask(executionId), ActorRef.noSender());
 	}
 
-	protected void notifyExecutionStateChange(ExecutionState executionState,
-											Throwable optionalError) {
-		LOG.info("Update execution state of {} ({}) to {}.", this.getTaskName(),
-				this.getExecutionId(), executionState);
-		taskManager.tell(new TaskMessages.UpdateTaskExecutionState(
-				new TaskExecutionState(jobId, executionId, executionState, optionalError)),
-				ActorRef.noSender());
-
-	}
-
 	// -----------------------------------------------------------------------------------------------------------------
 	//                                        Task Profiling
 	// -----------------------------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
index acd4346..c74c339 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
@@ -19,13 +19,12 @@
 package org.apache.flink.runtime
 
 import _root_.akka.actor.Actor
-import _root_.akka.event.LoggingAdapter
 
 /**
  * Mixin to add debug message logging
  */
 trait ActorLogMessages {
-  that: Actor =>
+  that: Actor with ActorSynchronousLogging =>
 
   override def receive: Receive = new Actor.Receive {
     private val _receiveWithLogMessages = receiveWithLogMessages
@@ -50,6 +49,4 @@ trait ActorLogMessages {
   }
 
   def receiveWithLogMessages: Receive
-
-  protected def log: LoggingAdapter
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorSynchronousLogging.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorSynchronousLogging.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorSynchronousLogging.scala
new file mode 100644
index 0000000..4d3a988
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorSynchronousLogging.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime
+
+import _root_.akka.actor.Actor
+import grizzled.slf4j.Logger
+
+/** Adds a logger to an [[akka.actor.Actor]] implementation
+  *
+  */
+trait ActorSynchronousLogging {
+  self: Actor =>
+
+  lazy val log = Logger(getClass)
+}
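
The trait just added is the heart of the change: a self-type on Actor plus a lazily created grizzled-slf4j Logger, so every mixed-in actor logs synchronously in the calling thread instead of going through Akka's logging bus. A minimal sketch of the mixin pattern, with the trait body mirroring ActorSynchronousLogging above and a hypothetical actor and actor system around it:

    import akka.actor.{Actor, ActorSystem, Props}
    import grizzled.slf4j.Logger

    trait SyncLogging { self: Actor =>
      // created on first use, named after the concrete actor class
      lazy val log = Logger(getClass)
    }

    class GreeterActor extends Actor with SyncLogging {
      override def receive: Receive = {
        case name: String => log.info(s"Hello, $name.") // runs in this thread
      }
    }

    object GreeterDemo extends App {
      val system = ActorSystem("greeter-demo")
      system.actorOf(Props[GreeterActor], "greeter") ! "Flink"
    }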

http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 6870d95..83f9e35 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress
 import java.util.Collections
 
 import akka.actor.Status.{Success, Failure}
+import grizzled.slf4j.Logger
 import org.apache.flink.api.common.{JobID, ExecutionConfig}
 import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration}
 import org.apache.flink.core.io.InputSplitAssigner
@@ -42,7 +43,7 @@ import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util.{SerializedValue, EnvironmentInformation}
-import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.InstanceManager
@@ -101,10 +102,7 @@ class JobManager(val flinkConfiguration: Configuration,
                  val defaultExecutionRetries: Int,
                  val delayBetweenRetries: Long,
                  val timeout: FiniteDuration)
-  extends Actor with ActorLogMessages with ActorLogging {
-
-  /** Reference to the log, for debugging */
-  val LOG = JobManager.LOG
+  extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
   /** List of current jobs running jobs */
   val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
@@ -114,7 +112,7 @@ class JobManager(val flinkConfiguration: Configuration,
    * Run when the job manager is started. Simply logs an informational message.
    */
   override def preStart(): Unit = {
-    LOG.info(s"Starting JobManager at ${self.path.toSerializationFormat}.")
+    log.info(s"Starting JobManager at ${self.path.toSerializationFormat}.")
   }
 
   override def postStop(): Unit = {
@@ -138,12 +136,11 @@ class JobManager(val flinkConfiguration: Configuration,
     try {
       libraryCacheManager.shutdown()
     } catch {
-      case e: IOException => log.error(e, "Could not properly shutdown the library cache manager.")
+      case e: IOException => log.error("Could not properly shutdown the library cache manager.", e)
     }
 
-    if (log.isDebugEnabled) {
-      log.debug("Job manager {} is completely stopped.", self.path)
-    }
+    log.debug(s"Job manager ${self.path} is completely stopped.")
+
   }
 
   /**
@@ -179,7 +176,7 @@ class JobManager(val flinkConfiguration: Configuration,
           // registerTaskManager throws an IllegalStateException if it is already shut down
           // let the actor crash and restart itself in this case
           case e: Exception =>
-            log.error(e, "Failed to register TaskManager at instance manager")
+            log.error("Failed to register TaskManager at instance manager", e)
 
             // IMPORTANT: Send the response to the "sender", which is not the
             //            TaskManager actor, but the ask future!
@@ -197,7 +194,7 @@ class JobManager(val flinkConfiguration: Configuration,
       submitJob(jobGraph, listenToEvents = listen)
 
     case CancelJob(jobID) =>
-      log.info("Trying to cancel job with ID {}.", jobID)
+      log.info(s"Trying to cancel job with ID $jobID.")
 
       currentJobs.get(jobID) match {
         case Some((executionGraph, _)) =>
@@ -208,7 +205,7 @@ class JobManager(val flinkConfiguration: Configuration,
 
           sender ! CancellationSuccess(jobID)
         case None =>
-          log.info("No job found with ID {}.", jobID)
+          log.info(s"No job found with ID $jobID.")
           sender ! CancellationFailure(jobID, new IllegalArgumentException("No job found with " +
             s"ID $jobID."))
       }
@@ -227,8 +224,9 @@ class JobManager(val flinkConfiguration: Configuration,
             }(context.dispatcher)
 
             sender ! true
-          case None => log.error("Cannot find execution graph for ID {} to change state to {}.",
-            taskExecutionState.getJobID, taskExecutionState.getExecutionState)
+          case None => log.error("Cannot find execution graph for ID " +
+            s"${taskExecutionState.getJobID} to change state to " +
+            s"${taskExecutionState.getExecutionState}.")
             sender ! false
         }
       }
@@ -239,7 +237,7 @@ class JobManager(val flinkConfiguration: Configuration,
           val execution = executionGraph.getRegisteredExecutions.get(executionAttempt)
 
           if (execution == null) {
-            log.error("Can not find Execution for attempt {}.", executionAttempt)
+            log.error(s"Can not find Execution for attempt $executionAttempt.")
             null
           } else {
             val slot = execution.getAssignedResource
@@ -256,32 +254,30 @@ class JobManager(val flinkConfiguration: Configuration,
                 case splitAssigner: InputSplitAssigner =>
                   val nextInputSplit = splitAssigner.getNextInputSplit(host, taskId)
 
-                  if (log.isDebugEnabled) {
-                    log.debug("Send next input split {}.", nextInputSplit)
-                  }
+                  log.debug(s"Send next input split $nextInputSplit.")
 
                   try {
                     InstantiationUtil.serializeObject(nextInputSplit)
                   } catch {
                     case ex: Exception =>
-                      log.error(ex, "Could not serialize the next input split of class {}.",
-                        nextInputSplit.getClass)
+                      log.error(s"Could not serialize the next input split of " +
+                        s"class ${nextInputSplit.getClass}.", ex)
                       vertex.fail(new RuntimeException("Could not serialize the next input split " +
                         "of class " + nextInputSplit.getClass + ".", ex))
                       null
                   }
 
                 case _ =>
-                  log.error("No InputSplitAssigner for vertex ID {}.", vertexID)
+                  log.error(s"No InputSplitAssigner for vertex ID $vertexID.")
                   null
               }
               case _ =>
-                log.error("Cannot find execution vertex for vertex ID {}.", vertexID)
+                log.error(s"Cannot find execution vertex for vertex ID $vertexID.")
                 null
           }
         }
         case None =>
-          log.error("Cannot find execution graph for job ID {}.", jobID)
+          log.error(s"Cannot find execution graph for job ID $jobID.")
           null
       }
 
@@ -290,9 +286,8 @@ class JobManager(val flinkConfiguration: Configuration,
     case JobStatusChanged(jobID, newJobStatus, timeStamp, error) =>
       currentJobs.get(jobID) match {
         case Some((executionGraph, jobInfo)) => executionGraph.getJobName
-          log.info("Status of job {} ({}) changed to {} {}.",
-            jobID, executionGraph.getJobName, newJobStatus,
-            if (error == null) "" else error.getMessage)
+          log.info(s"Status of job $jobID (${executionGraph.getJobName}) changed to $newJobStatus" +
+            s" ${if (error == null) "" else error.getMessage}.")
 
           if (newJobStatus.isTerminalState) {
             jobInfo.end = timeStamp
@@ -304,7 +299,7 @@ class JobManager(val flinkConfiguration: Configuration,
                   accumulatorManager.getJobAccumulatorResultsSerialized(jobID)
                 } catch {
                   case e: Exception =>
-                    log.error(e, "Cannot fetch serialized accumulators for job {}", jobID)
+                    log.error(s"Cannot fetch serialized accumulators for job $jobID", e)
                     Collections.emptyMap()
                 }
                 val result = new SerializedJobExecutionResult(jobID, jobInfo.duration,
@@ -352,8 +347,8 @@ class JobManager(val flinkConfiguration: Configuration,
           sender ! Acknowledge
           executionGraph.scheduleOrUpdateConsumers(partitionId)
         case None =>
-          log.error("Cannot find execution graph for job ID {} to schedule or update consumers",
-            jobId)
+          log.error(s"Cannot find execution graph for job ID $jobId to schedule or update " +
+            s"consumers.")
           sender ! Failure(new IllegalStateException("Cannot find execution graph for job ID " +
             s"$jobId to schedule or update consumers."))
       }
@@ -383,7 +378,7 @@ class JobManager(val flinkConfiguration: Configuration,
         sender ! RunningJobsStatus(jobs)
       }
       catch {
-        case t: Throwable => LOG.error("Exception while responding to RequestRunningJobsStatus", t)
+        case t: Throwable => log.error("Exception while responding to RequestRunningJobsStatus", t)
       }
 
     case RequestJob(jobID) =>
@@ -403,10 +398,10 @@ class JobManager(val flinkConfiguration: Configuration,
 
     case Heartbeat(instanceID, metricsReport) =>
       try {
-        log.debug("Received hearbeat message from {}", instanceID)
+        log.debug(s"Received hearbeat message from $instanceID.")
         instanceManager.reportHeartBeat(instanceID, metricsReport)
       } catch {
-        case t: Throwable => log.error(t, "Could not report heart beat from {}.", sender().path)
+        case t: Throwable => log.error(s"Could not report heart beat from ${sender().path}.", t)
       }
 
     case message: AccumulatorMessage => handleAccumulatorMessage(message)
@@ -417,7 +412,7 @@ class JobManager(val flinkConfiguration: Configuration,
 
     case Terminated(taskManager) =>
       if (instanceManager.isRegistered(taskManager)) {
-        log.info("Task manager {} terminated.", taskManager.path)
+        log.info(s"Task manager ${taskManager.path} terminated.")
 
         instanceManager.unregisterTaskManager(taskManager)
         context.unwatch(taskManager)
@@ -430,7 +425,7 @@ class JobManager(val flinkConfiguration: Configuration,
       val taskManager = sender()
 
       if (instanceManager.isRegistered(taskManager)) {
-        log.info("Task manager {} wants to disconnect, because {}.", taskManager.path, msg)
+        log.info(s"Task manager ${taskManager.path} wants to disconnect, because $msg.")
 
         instanceManager.unregisterTaskManager(taskManager)
         context.unwatch(taskManager)
@@ -590,7 +585,7 @@ class JobManager(val flinkConfiguration: Configuration,
         }
         catch {
           case tt: Throwable => {
-            log.error(tt, "Error while marking ExecutionGraph as failed.")
+            log.error("Error while marking ExecutionGraph as failed.", tt)
           }
         }
       }
@@ -618,7 +613,7 @@ class JobManager(val flinkConfiguration: Configuration,
           libraryCacheManager.getClassLoader(jobId)
         } catch {
           case e: Exception =>
-            log.error(e, "Dropping accumulators. No class loader available for job " + jobId)
+            log.error("Dropping accumulators. No class loader available for job " + jobId, e)
             return
         }
 
@@ -628,7 +623,7 @@ class JobManager(val flinkConfiguration: Configuration,
             accumulatorManager.processIncomingAccumulators(jobId, accumulators)
           }
           catch {
-            case e: Exception => log.error(e, "Cannot update accumulators for job " + jobId)
+            case e: Exception => log.error("Cannot update accumulators for job " + jobId, e)
           }
         } else {
           log.error("Dropping accumulators. No class loader available for job " + jobId)
@@ -643,7 +638,7 @@ class JobManager(val flinkConfiguration: Configuration,
         }
         catch {
           case e: Exception =>
-            log.error(e, "Cannot serialize accumulator result")
+            log.error("Cannot serialize accumulator result", e)
             sender() ! AccumulatorResultsErroneous(jobID, e)
         }
 
@@ -656,7 +651,7 @@ class JobManager(val flinkConfiguration: Configuration,
         }
         catch {
           case e: Exception =>
-            log.error(e, "Cannot fetch accumulator result")
+            log.error("Cannot fetch accumulator result", e)
             sender() ! AccumulatorResultsErroneous(jobId, e)
         }
 
@@ -676,8 +671,8 @@ class JobManager(val flinkConfiguration: Configuration,
 
           archive ! ArchiveExecutionGraph(jobID, eg)
         } catch {
-          case t: Throwable => log.error(t, "Could not prepare the execution graph {} for " +
-            "archiving.", eg)
+          case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " +
+            "archiving.", t)
         }
 
       case None =>
@@ -687,7 +682,7 @@ class JobManager(val flinkConfiguration: Configuration,
       libraryCacheManager.unregisterJob(jobID)
     } catch {
       case t: Throwable =>
-        log.error(t, "Could not properly unregister job {} form the library cache.", jobID)
+        log.error(s"Could not properly unregister job $jobID form the library cache.", t)
     }
   }
 }
@@ -699,7 +694,7 @@ class JobManager(val flinkConfiguration: Configuration,
  */
 object JobManager {
 
-  val LOG = LoggerFactory.getLogger(classOf[JobManager])
+  val LOG = Logger(classOf[JobManager])
 
   val STARTUP_FAILURE_RETURN_CODE = 1
   val RUNTIME_FAILURE_RETURN_CODE = 2
@@ -717,7 +712,7 @@ object JobManager {
    */
   def main(args: Array[String]): Unit = {
     // startup checks and logging
-    EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager", args)
+    EnvironmentInformation.logEnvironmentInfo(LOG.logger, "JobManager", args)
     EnvironmentInformation.checkJavaVersion()
 
     // parsing the command line arguments
@@ -798,7 +793,7 @@ object JobManager {
     LOG.info("Starting JobManager")
 
     // Bring up the job manager actor system first, bind it to the given address.
-    LOG.info("Starting JobManager actor system at {}:{}", listeningAddress, listeningPort)
+    LOG.info(s"Starting JobManager actor system at $listeningAddress:$listeningPort.")
 
     val jobManagerSystem = try {
       val akkaConfig = AkkaUtils.getAkkaConfig(configuration,
@@ -831,7 +826,7 @@ object JobManager {
       // the process reaper will kill the JVM process (to ensure easy failure detection)
       LOG.debug("Starting JobManager process reaper")
       jobManagerSystem.actorOf(
-        Props(classOf[ProcessReaper], jobManager, LOG, RUNTIME_FAILURE_RETURN_CODE),
+        Props(classOf[ProcessReaper], jobManager, LOG.logger, RUNTIME_FAILURE_RETURN_CODE),
         "JobManager_Process_Reaper")
 
       // bring up a local task manager, if needed
@@ -847,7 +842,7 @@ object JobManager {
 
         LOG.debug("Starting TaskManager process reaper")
         jobManagerSystem.actorOf(
-          Props(classOf[ProcessReaper], taskManagerActor, LOG, RUNTIME_FAILURE_RETURN_CODE),
+          Props(classOf[ProcessReaper], taskManagerActor, LOG.logger, RUNTIME_FAILURE_RETURN_CODE),
           "TaskManager_Process_Reaper")
       }
 

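Note the LOG.logger arguments above: grizzled-slf4j's Logger is only a wrapper, and its logger member hands back the underlying org.slf4j.Logger for Java APIs that expect one, such as EnvironmentInformation.logEnvironmentInfo and the ProcessReaper constructor. A short interop sketch, where logStartup is a hypothetical stand-in for such a Java-style API:

    import grizzled.slf4j.Logger
    import org.slf4j.{Logger => JLogger}

    object InteropSketch {
      private val LOG = Logger(getClass)

      // stand-in for a Java API that wants a plain org.slf4j.Logger
      def logStartup(log: JLogger, component: String): Unit =
        log.info("Starting {}", component)

      def main(args: Array[String]): Unit = {
        logStartup(LOG.logger, "JobManager") // unwrap the SLF4J logger
      }
    }
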
http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala
index eb8f913..dd3a1b7 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala
@@ -18,18 +18,21 @@
 
 package org.apache.flink.runtime.jobmanager
 
-import akka.actor.{ActorLogging, Actor}
-import org.apache.flink.runtime.ActorLogMessages
+import akka.actor.Actor
+import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.messages.JobManagerProfilerMessages.ReportProfilingData
 import org.apache.flink.runtime.profiling.impl.types.{InternalInstanceProfilingData, InternalExecutionVertexThreadProfilingData}
-import org.apache.flink.runtime.profiling.types.ThreadProfilingEvent
 
 import scala.collection.convert.WrapAsScala
 
 /**
  * Basic skeleton for the JobManager profiler. Currently, it simply logs the received messages.
  */
-class JobManagerProfiler extends Actor with ActorLogMessages with ActorLogging with WrapAsScala {
+class JobManagerProfiler
+  extends Actor
+  with ActorLogMessages
+  with ActorSynchronousLogging
+  with WrapAsScala {
   override def receiveWithLogMessages: Receive = {
     case ReportProfilingData(profilingContainer) =>
       profilingContainer.getIterator foreach {

http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index cb7bfec..9e71ebb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.jobmanager
 
-import akka.actor.{ActorLogging, Actor}
+import akka.actor.Actor
 import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.executiongraph.ExecutionGraph
 import org.apache.flink.runtime.messages.ArchiveMessages._
 import org.apache.flink.runtime.messages.JobManagerMessages._
@@ -48,8 +48,10 @@ import scala.ref.SoftReference
  *
  * @param max_entries Maximum number of stored Flink jobs
  */
-class MemoryArchivist(private val max_entries: Int) extends Actor with ActorLogMessages with
-ActorLogging {
+class MemoryArchivist(private val max_entries: Int)
+  extends Actor
+  with ActorLogMessages
+  with ActorSynchronousLogging {
   /*
    * Map of execution graphs belonging to recently started jobs with the time stamp of the last
    * received job event. The insert order is preserved through a LinkedHashMap.

http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
index 48266e2..8bb1274 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager
 import java.lang.{Long => JLong}
 
 import akka.actor._
-import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionVertex}
 import org.apache.flink.runtime.jobgraph.JobStatus._
@@ -69,7 +69,7 @@ class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
                                   val interval: FiniteDuration,
                                   var curId: JLong,
                                   var ackId: JLong)
-extends Actor with ActorLogMessages with ActorLogging {
+extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
   implicit private val executor = context.dispatcher
 
@@ -78,13 +78,13 @@ extends Actor with ActorLogMessages with ActorLogging {
     case InitBarrierScheduler =>
       context.system.scheduler.schedule(interval,interval,self,BarrierTimeout)
       context.system.scheduler.schedule(2 * interval,2 * interval,self,CompactAndUpdate)
-      log.info("Started Stream State Monitor for job {}{}",
-        executionGraph.getJobID,executionGraph.getJobName)
+      log.info("Started Stream State Monitor for job " +
+        s"${executionGraph.getJobID}${executionGraph.getJobName}")
       
     case BarrierTimeout =>
       executionGraph.getState match {
         case FAILED | CANCELED | FINISHED =>
-          log.info("Stopping monitor for terminated job {}", executionGraph.getJobID)
+          log.info(s"Stopping monitor for terminated job ${executionGraph.getJobID}.")
           self ! PoisonPill
         case RUNNING =>
           curId += 1
@@ -94,8 +94,8 @@ extends Actor with ActorLogMessages with ActorLogging {
           => vertex.getCurrentAssignedResource.getInstance.getTaskManager
                     ! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId))
         case _ =>
-          log.debug("Omitting sending barrier since graph is in {} state for job {}",
-            executionGraph.getState, executionGraph.getJobID)
+          log.debug("Omitting sending barrier since graph is in " +
+            s"${executionGraph.getState} state for job ${executionGraph.getJobID}.")
       }
       
     case StateBarrierAck(jobID, jobVertexID, instanceID, checkpointID, opState) =>

http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index d6b91ec..511de6b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -33,11 +33,12 @@ import com.codahale.metrics.json.MetricsModule
 import com.codahale.metrics.jvm.{MemoryUsageGaugeSet, GarbageCollectorMetricSet}
 
 import com.fasterxml.jackson.databind.ObjectMapper
+import grizzled.slf4j.Logger
 
 import org.apache.flink.api.common.cache.DistributedCache
 import org.apache.flink.configuration._
 import org.apache.flink.core.fs.Path
-import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.blob.{BlobService, BlobCache}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
@@ -67,8 +68,6 @@ import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.util.{MathUtils, EnvironmentInformation}
 import org.apache.flink.util.ExceptionUtils
 
-import org.slf4j.LoggerFactory
-
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.util.{Failure, Success}
@@ -125,10 +124,7 @@ class TaskManager(protected val config: TaskManagerConfiguration,
                   protected val network: NetworkEnvironment,
                   protected val numberOfSlots: Int)
 
-extends Actor with ActorLogMessages with ActorLogging {
-
-  /** The log for all synchronous logging calls */
-  private val LOG = TaskManager.LOG
+extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
   /** The timeout for all actor ask futures */
   protected val askTimeout = new Timeout(config.timeout)
@@ -178,13 +174,13 @@ extends Actor with ActorLogMessages with ActorLogging {
    * JobManager.
    */
   override def preStart(): Unit = {
-    LOG.info("Starting TaskManager actor at {}.", self.path.toSerializationFormat)
-    LOG.info("TaskManager data connection information: {}", connectionInfo)
-    LOG.info("TaskManager has {} task slot(s).", numberOfSlots)
+    log.info(s"Starting TaskManager actor at ${self.path.toSerializationFormat}.")
+    log.info(s"TaskManager data connection information: $connectionInfo")
+    log.info(s"TaskManager has $numberOfSlots task slot(s).")
 
     // log the initial memory utilization
-    if (LOG.isInfoEnabled) {
-      LOG.info(TaskManager.getMemoryUsageStatsAsString(ManagementFactory.getMemoryMXBean))
+    if (log.isInfoEnabled) {
+      log.info(TaskManager.getMemoryUsageStatsAsString(ManagementFactory.getMemoryMXBean))
     }
 
     // kick off the registration
@@ -200,7 +196,7 @@ extends Actor with ActorLogMessages with ActorLogging {
    * (like network stack, library cache, memory manager, ...) are properly shut down.
    */
   override def postStop(): Unit = {
-    LOG.info("Stopping TaskManager {}.", self.path.toSerializationFormat)
+    log.info(s"Stopping TaskManager ${self.path.toSerializationFormat}.")
 
     cancelAndClearEverything(new Exception("TaskManager is shutting down."))
 
@@ -208,35 +204,35 @@ extends Actor with ActorLogMessages with ActorLogging {
       try {
         disassociateFromJobManager()
       } catch {
-        case t: Exception => LOG.error("Could not cleanly disassociate from JobManager", t)
+        case t: Exception => log.error("Could not cleanly disassociate from JobManager", t)
       }
     }
 
     try {
       ioManager.shutdown()
     } catch {
-      case t: Exception => LOG.error("I/O manager did not shutdown properly.", t)
+      case t: Exception => log.error("I/O manager did not shutdown properly.", t)
     }
 
     try {
       memoryManager.shutdown()
     } catch {
-      case t: Exception => LOG.error("Memory manager did not shutdown properly.", t)
+      case t: Exception => log.error("Memory manager did not shutdown properly.", t)
     }
 
     try {
       network.shutdown()
     } catch {
-      case t: Exception => LOG.error("Network environment did not shutdown properly.", t)
+      case t: Exception => log.error("Network environment did not shutdown properly.", t)
     }
 
     try {
       fileCache.shutdown()
     } catch {
-      case t: Exception => LOG.error("FileCache did not shutdown properly.", t)
+      case t: Exception => log.error("FileCache did not shutdown properly.", t)
     }
 
-    LOG.info("Task manager {} is completely shut down.", self.path)
+    log.info(s"Task manager ${self.path} is completely shut down.")
   }
 
   /**
@@ -277,8 +273,8 @@ extends Actor with ActorLogMessages with ActorLogging {
         handleJobManagerDisconnect(sender(), "JobManager is no longer reachable")
       }
       else {
-        LOG.warn("Received unrecognized disconnect message from {}",
-          if (actor == null) null else actor.path)
+        log.warn(s"Received unrecognized disconnect message " +
+          s"from ${if (actor == null) null else actor.path}.")
       }
 
     case Disconnect(msg) =>
@@ -291,7 +287,7 @@ extends Actor with ActorLogMessages with ActorLogging {
   override def unhandled(message: Any): Unit = {
     val errorMessage = "Received unknown message " + message
     val error = new RuntimeException(errorMessage)
-    LOG.error(errorMessage)
+    log.error(errorMessage)
 
     // terminate all we are currently running (with a dedicated message)
     // before the actor is stopped
@@ -311,8 +307,8 @@ extends Actor with ActorLogMessages with ActorLogging {
 
     // at very first, check that we are actually currently associated with a JobManager
     if (!isConnected) {
-      LOG.debug("Dropping message {} because the TaskManager is currently " +
-        "not connected to a JobManager", message)
+      log.debug(s"Dropping message $message because the TaskManager is currently " +
+        "not connected to a JobManager.")
     }
 
     // we order the messages by frequency, to make sure the code paths for matching
@@ -329,7 +325,7 @@ extends Actor with ActorLogMessages with ActorLogging {
 
       // discards intermediate result partitions of a task execution on this TaskManager
       case FailIntermediateResultPartitions(executionID) =>
-        LOG.info("Discarding the results produced by task execution " + executionID)
+        log.info("Discarding the results produced by task execution " + executionID)
         if (network.isAssociated) {
           try {
             network.getPartitionManager.releasePartitionsProducedBy(executionID)
@@ -392,7 +388,7 @@ extends Actor with ActorLogMessages with ActorLogging {
             Future {
               task.failExternally(cause)
             }.onFailure{
-              case t: Throwable => LOG.error(s"Could not fail task ${task} externally.", t)
+              case t: Throwable => log.error(s"Could not fail task ${task} externally.", t)
             }
           case None =>
         }
@@ -406,7 +402,7 @@ extends Actor with ActorLogMessages with ActorLogging {
             Future {
               task.cancelExecution()
             }.onFailure{
-              case t: Throwable => LOG.error("Could not cancel task " + task, t)
+              case t: Throwable => log.error("Could not cancel task " + task, t)
             }
 
             sender ! new TaskOperationResult(executionID, true)
@@ -426,10 +422,9 @@ extends Actor with ActorLogMessages with ActorLogging {
   private def handleCheckpointingMessage(message: CheckpointingMessage): Unit = {
 
     message match {
-
       case BarrierReq(attemptID, checkpointID) =>
-        LOG.debug("[FT-TaskManager] Barrier {} request received for attempt {}",
-          checkpointID, attemptID)
+        log.debug(s"[FT-TaskManager] Barrier $checkpointID request received " +
+          s"for attempt $attemptID.")
 
         runningTasks.get(attemptID) match {
           case Some(i) =>
@@ -441,16 +436,14 @@ extends Actor with ActorLogMessages with ActorLogging {
                       barrierTransceiver.broadcastBarrierFromSource(checkpointID)
                   }).start()
 
-                case _ => LOG.error(
-                  "Taskmanager received a checkpoint request for non-checkpointing task {}",
-                  attemptID)
+                case _ => log.error("Taskmanager received a checkpoint request for " +
+                  s"non-checkpointing task $attemptID.")
               }
             }
 
           case None =>
             // may always happen in case of canceled/finished tasks
-            LOG.debug("Taskmanager received a checkpoint request for unknown task {}",
-                      attemptID)
+            log.debug(s"Taskmanager received a checkpoint request for unknown task $attemptID.")
         }
 
       // unknown checkpoint message
@@ -474,19 +467,19 @@ extends Actor with ActorLogMessages with ActorLogging {
         if (isConnected) {
           // this may be the case, if we queue another attempt and
           // in the meantime, the registration is acknowledged
-          LOG.debug(
+          log.debug(
             "TaskManager was triggered to register at JobManager, but is already registered")
         }
         else if (deadline.exists(_.isOverdue())) {
           // we failed to register in time. that means we should quit
-          LOG.error("Failed to register at the JobManager withing the defined maximum " +
+          log.error("Failed to register at the JobManager withing the defined maximum " +
             "connect time. Shutting down ...")
 
           // terminate ourselves (hasta la vista)
           self ! PoisonPill
         }
         else {
-          LOG.info(s"Trying to register at JobManager ${jobManagerURL} " +
+          log.info(s"Trying to register at JobManager ${jobManagerURL} " +
             s"(attempt ${attempt}, timeout: ${timeout})")
 
           val jobManager = context.actorSelection(jobManagerAkkaURL)
@@ -510,9 +503,9 @@ extends Actor with ActorLogMessages with ActorLogging {
       case AcknowledgeRegistration(jobManager, id, blobPort) =>
         if (isConnected) {
           if (jobManager == currentJobManager.orNull) {
-            LOG.debug("Ignoring duplicate registration acknowledgement.")
+            log.debug("Ignoring duplicate registration acknowledgement.")
           } else {
-            LOG.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
+            log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
               s"because the TaskManager is already registered at ${currentJobManager.orNull}")
           }
         }
@@ -531,15 +524,15 @@ extends Actor with ActorLogMessages with ActorLogging {
       case AlreadyRegistered(jobManager, id, blobPort) =>
         if (isConnected) {
           if (jobManager == currentJobManager.orNull) {
-            LOG.debug("Ignoring duplicate registration acknowledgement.")
+            log.debug("Ignoring duplicate registration acknowledgement.")
           } else {
-            LOG.warn(s"Received 'AlreadyRegistered' message from JobManager ${jobManager.path}, " +
+            log.warn(s"Received 'AlreadyRegistered' message from JobManager ${jobManager.path}, " +
               s"even through TaskManager is currently registered at ${currentJobManager.orNull}")
           }
         }
         else {
           // not connected, yet, to let's associate
-          LOG.info("Received 'AlreadyRegistered' message before 'AcknowledgeRegistration'")
+          log.info("Received 'AlreadyRegistered' message before 'AcknowledgeRegistration'")
 
           try {
             associateWithJobManager(jobManager, id, blobPort)
@@ -552,7 +545,7 @@ extends Actor with ActorLogMessages with ActorLogging {
 
       case RefuseRegistration(reason) =>
         if (currentJobManager.isEmpty) {
-          LOG.error(s"The registration at JobManager ${jobManagerAkkaURL} was refused, " +
+          log.error(s"The registration at JobManager ${jobManagerAkkaURL} was refused, " +
                     s"because: ${reason}. Retrying later...")
 
           // try the registration again after some time
@@ -570,11 +563,11 @@ extends Actor with ActorLogMessages with ActorLogging {
         else {
           // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
           if (sender() == currentJobManager.orNull) {
-            LOG.warn(s"Received 'RefuseRegistration' from the JobManager (${sender().path})" +
+            log.warn(s"Received 'RefuseRegistration' from the JobManager (${sender().path})" +
                      s" even though this TaskManager is already registered there.")
           }
           else {
-            LOG.warn(s"Ignoring 'RefuseRegistration' from unrelated JobManager (${sender().path})")
+            log.warn(s"Ignoring 'RefuseRegistration' from unrelated JobManager (${sender().path})")
           }
         }
 
@@ -621,7 +614,7 @@ extends Actor with ActorLogMessages with ActorLogging {
     // sanity check that we are not currently registered with a different JobManager
     if (isConnected) {
       if (currentJobManager.get == jobManager) {
-        LOG.warn("Received call to finish registration with JobManager " +
+        log.warn("Received call to finish registration with JobManager " +
           jobManager.path + " even though TaskManager is already registered.")
         return
       }
@@ -633,8 +626,8 @@ extends Actor with ActorLogMessages with ActorLogging {
     }
 
     // not yet associated, so associate
-    LOG.info("Successful registration at JobManager ({}), " +
-      "starting network stack and library cache.", jobManager.path)
+    log.info(s"Successful registration at JobManager (${jobManager.path}), " +
+      "starting network stack and library cache.")
 
     // sanity check that the JobManager dependent components are not set up currently
     if (network.isAssociated || blobService.isDefined) {
@@ -648,7 +641,7 @@ extends Actor with ActorLogMessages with ActorLogging {
     catch {
       case e: Exception =>
         val message = "Could not start network environment."
-        LOG.error(message, e)
+        log.error(message, e)
         throw new RuntimeException(message, e)
     }
 
@@ -657,7 +650,7 @@ extends Actor with ActorLogMessages with ActorLogging {
       val jmHost = jobManager.path.address.host.getOrElse("localhost")
       val address = new InetSocketAddress(jmHost, blobPort)
 
-      LOG.info("Determined BLOB server address to be {}. Starting BLOB cache.", address)
+      log.info(s"Determined BLOB server address to be $address. Starting BLOB cache.")
 
       try {
         val blobcache = new BlobCache(address, config.configuration)
@@ -667,7 +660,7 @@ extends Actor with ActorLogMessages with ActorLogging {
       catch {
         case e: Exception =>
           val message = "Could not create BLOB cache or library cache."
-          LOG.error(message, e)
+          log.error(message, e)
           throw new RuntimeException(message, e)
       }
     }
@@ -700,12 +693,12 @@ extends Actor with ActorLogMessages with ActorLogging {
    */
   private def disassociateFromJobManager(): Unit = {
     if (!isConnected) {
-      LOG.warn("TaskManager received message to disassociate from JobManager, even though " +
+      log.warn("TaskManager received message to disassociate from JobManager, even though " +
         "it is not currently associated with a JobManager")
       return
     }
 
-    LOG.info("Disassociating from JobManager")
+    log.info("Disassociating from JobManager")
 
     // stop the periodic heartbeats
     heartbeatScheduler foreach {
@@ -748,7 +741,7 @@ extends Actor with ActorLogMessages with ActorLogging {
       if (jobManager == currentJobManager.orNull) {
         try {
           val message = "Disconnecting from JobManager: " + msg
-          LOG.info(message)
+          log.info(message)
 
           // cancel all our tasks with a proper error message
           cancelAndClearEverything(new Exception(message))
@@ -769,7 +762,7 @@ extends Actor with ActorLogMessages with ActorLogging {
         }
       }
       else {
-        LOG.warn("Received erroneous JobManager disconnect message for {}", jobManager.path)
+        log.warn(s"Received erroneous JobManager disconnect message for ${jobManager.path}.")
       }
     }
   }
@@ -807,17 +800,15 @@ extends Actor with ActorLogMessages with ActorLogging {
 
       val userCodeClassLoader = libraryCacheManager match {
         case Some(manager) =>
-          if (LOG.isDebugEnabled) {
+          if (log.isDebugEnabled) {
             startRegisteringTask = System.currentTimeMillis()
           }
 
           // triggers the download of all missing jar files from the job manager
           manager.registerTask(jobID, executionID, tdd.getRequiredJarFiles)
 
-          if (LOG.isDebugEnabled) {
-            LOG.debug("Register task {} at library cache manager took {}s", executionID,
-              (System.currentTimeMillis() - startRegisteringTask) / 1000.0)
-          }
+          log.debug(s"Register task $executionID at library cache manager " +
+            s"took ${(System.currentTimeMillis() - startRegisteringTask) / 1000.0}s")
 
           manager.getClassLoader(jobID)
         case None => throw new IllegalStateException("There is no valid library cache manager.")
@@ -859,7 +850,7 @@ extends Actor with ActorLogMessages with ActorLogging {
       }
       
       // register the task with the network stack and profiles
-      LOG.info("Register task {}", task)
+      log.info(s"Register task $task.")
       network.registerTask(task)
 
       val cpTasks = new util.HashMap[String, FutureTask[Path]]()
@@ -881,7 +872,7 @@ extends Actor with ActorLogMessages with ActorLogging {
         val message = if (t.isInstanceOf[CancelTaskException]) {
           "Task was canceled"
         } else {
-          LOG.error("Could not instantiate task with execution ID " + executionID, t)
+          log.error("Could not instantiate task with execution ID " + executionID, t)
           ExceptionUtils.stringifyException(t)
         }
 
@@ -893,7 +884,7 @@ extends Actor with ActorLogMessages with ActorLogging {
 
           libraryCacheManager foreach { _.unregisterTask(jobID, executionID) }
         } catch {
-          case t: Throwable => LOG.error("Error during cleanup of task deployment.", t)
+          case t: Throwable => log.error("Error during cleanup of task deployment.", t)
         }
 
         sender ! new TaskOperationResult(executionID, false, message)
@@ -924,7 +915,7 @@ extends Actor with ActorLogMessages with ActorLogging {
                 reader.updateInputChannel(partitionInfo)
               } catch {
                 case t: Throwable =>
-                  LOG.error(s"Could not update input data location for task " +
+                  log.error(s"Could not update input data location for task " +
                     s"${task.getTaskName}. Trying to fail  task.", t)
 
                   try {
@@ -932,7 +923,7 @@ extends Actor with ActorLogMessages with ActorLogging {
                   }
                   catch {
                     case t: Throwable =>
-                      LOG.error("Failed canceling task with execution ID " + executionId +
+                      log.error("Failed canceling task with execution ID " + executionId +
                         " after task update failure.", t)
                   }
               }
@@ -951,8 +942,8 @@ extends Actor with ActorLogMessages with ActorLogging {
         }
 
       case None =>
-        LOG.debug("Discard update for input partitions of task {} : task is no longer running.",
-          executionId)
+        log.debug(s"Discard update for input partitions of task $executionId : " +
+          s"task is no longer running.")
         sender ! Acknowledge
     }
   }
@@ -965,7 +956,7 @@ extends Actor with ActorLogMessages with ActorLogging {
    */
   private def cancelAndClearEverything(cause: Throwable) {
     if (runningTasks.size > 0) {
-      LOG.info("Cancelling all computations and discarding all cached data.")
+      log.info("Cancelling all computations and discarding all cached data.")
 
       for (t <- runningTasks.values) {
         t.failExternally(cause)
@@ -983,22 +974,22 @@ extends Actor with ActorLogMessages with ActorLogging {
           try {
             task.failExternally(new Exception("Task is being removed from TaskManager"))
           } catch {
-            case e: Exception => LOG.error("Could not properly fail task", e)
+            case e: Exception => log.error("Could not properly fail task", e)
           }
         }
 
-        LOG.info("Unregister task with execution ID {}.", executionID)
+        log.info(s"Unregister task with execution ID $executionID.")
         removeAllTaskResources(task)
         libraryCacheManager foreach { _.unregisterTask(task.getJobID, executionID) }
 
-        LOG.info("Updating FINAL execution state of {} ({}) to {}.",
-          task.getTaskName, task.getExecutionId, task.getExecutionState)
+        log.info(s"Updating FINAL execution state of ${task.getTaskName} " +
+          s"(${task.getExecutionId}) to ${task.getExecutionState}.")
 
         self ! UpdateTaskExecutionState(new TaskExecutionState(
           task.getJobID, task.getExecutionId, task.getExecutionState, task.getFailureCause))
 
       case None =>
-        LOG.debug("Cannot find task with ID {} to unregister.", executionID)
+        log.debug(s"Cannot find task with ID $executionID to unregister.")
     }
   }
 
@@ -1044,7 +1035,7 @@ extends Actor with ActorLogMessages with ActorLogging {
         }
       } catch {
         // this is pretty unpleasant, but not a reason to give up immediately
-        case e: Exception => LOG.error(
+        case e: Exception => log.error(
           "Error cleaning up local temp files from the distributed cache.", e)
       }
     }
@@ -1060,14 +1051,14 @@ extends Actor with ActorLogMessages with ActorLogging {
    */
   private def sendHeartbeatToJobManager(): Unit = {
     try {
-      LOG.debug("Sending heartbeat to JobManager")
+      log.debug("Sending heartbeat to JobManager")
       val report: Array[Byte] = metricRegistryMapper.writeValueAsBytes(metricRegistry)
       currentJobManager foreach {
         jm => jm ! Heartbeat(instanceID, report)
       }
     }
     catch {
-      case e: Exception => LOG.warn("Error sending the metric heartbeat to the JobManager", e)
+      case e: Exception => log.warn("Error sending the metric heartbeat to the JobManager", e)
     }
   }
 
@@ -1093,7 +1084,7 @@ extends Actor with ActorLogMessages with ActorLogging {
       recipient ! StackTrace(instanceID, stackTraceStr)
     }
     catch {
-      case e: Exception => LOG.error("Failed to send stack trace to " + recipient.path, e)
+      case e: Exception => log.error("Failed to send stack trace to " + recipient.path, e)
     }
   }
 
@@ -1103,7 +1094,7 @@ extends Actor with ActorLogMessages with ActorLogging {
    * @param cause The exception that caused the fatal problem.
    */
   private def killTaskManagerFatal(message: String, cause: Throwable): Unit = {
-    LOG.error("\n" +
+    log.error("\n" +
       "==============================================================\n" +
       "======================      FATAL      =======================\n" +
       "==============================================================\n" +
@@ -1121,7 +1112,7 @@ extends Actor with ActorLogMessages with ActorLogging {
 object TaskManager {
 
   /** TaskManager logger for synchronous logging (not through the logging actor) */
-  val LOG = LoggerFactory.getLogger(classOf[TaskManager])
+  val LOG = Logger(classOf[TaskManager])
 
   /** Return code for unsuccessful TaskManager startup */
   val STARTUP_FAILURE_RETURN_CODE = 1
@@ -1159,7 +1150,7 @@ object TaskManager {
    */
   def main(args: Array[String]): Unit = {
     // startup checks and logging
-    EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args)
+    EnvironmentInformation.logEnvironmentInfo(LOG.logger, "TaskManager", args)
     EnvironmentInformation.checkJavaVersion()
 
     // try to parse the command line arguments
@@ -1283,8 +1274,8 @@ object TaskManager {
       LOG.info("Trying to select the network interface and address to use " +
         "by connecting to the configured JobManager.")
 
-      LOG.info("TaskManager will try to connect for {} seconds before falling back to heuristics",
-        MAX_STARTUP_CONNECT_TIME)
+      LOG.info(s"TaskManager will try to connect for $MAX_STARTUP_CONNECT_TIME seconds before " +
+        "falling back to heuristics")
 
       val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort)
       val taskManagerAddress = try {
@@ -1361,7 +1352,7 @@ object TaskManager {
 
     // Bring up the TaskManager actor system first, bind it to the given address.
 
-    LOG.info("Starting TaskManager actor system at {}:{}", taskManagerHostname, actorSystemPort)
+    LOG.info(s"Starting TaskManager actor system at $taskManagerHostname:$actorSystemPort")
 
     val taskManagerSystem = try {
       val akkaConfig = AkkaUtils.getAkkaConfig(configuration,
@@ -1400,7 +1391,7 @@ object TaskManager {
       // the process reaper will kill the JVM process (to ensure easy failure detection)
       LOG.debug("Starting TaskManager process reaper")
       taskManagerSystem.actorOf(
-        Props(classOf[ProcessReaper], taskManager, LOG, RUNTIME_FAILURE_RETURN_CODE),
+        Props(classOf[ProcessReaper], taskManager, LOG.logger, RUNTIME_FAILURE_RETURN_CODE),
         "TaskManager_Process_Reaper")
 
       // if desired, start the logging daemon that periodically logs the
@@ -1521,7 +1512,7 @@ object TaskManager {
         "pick a fraction of the available memory.")
 
     val memorySize = if (configuredMemory > 0) {
-      LOG.info("Using {} MB for Flink managed memory.", configuredMemory)
+      LOG.info(s"Using $configuredMemory MB for Flink managed memory.")
       configuredMemory << 20 // megabytes to bytes
     }
     else {
@@ -1534,8 +1525,8 @@ object TaskManager {
       val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
                                                                                    fraction).toLong
 
-      LOG.info("Using {} of the currently free heap space for Flink managed memory ({} MB).",
-        fraction, relativeMemSize >> 20)
+      LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
+        s"memory (${relativeMemSize >> 20} MB).")
 
       relativeMemSize
     }
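
The TaskManager hunks above swap SLF4J's "{}" placeholders for Scala string
interpolation on a grizzled-slf4j Logger, and pass LOG.logger wherever a Java
API expects a plain org.slf4j.Logger. A minimal sketch of that pattern, using
only the API calls visible in this diff (MyComponent is an illustrative name,
not part of the commit):

    import grizzled.slf4j.Logger

    class MyComponent

    object MyComponent {
      // Class-based logger, mirroring TaskManager.LOG above
      val LOG = Logger(classOf[MyComponent])

      def start(host: String, port: Int): Unit = {
        // s-interpolation replaces plain SLF4J's "{}" placeholders
        LOG.info(s"Starting actor system at $host:$port")
      }

      // Java-facing utilities (e.g. EnvironmentInformation above) take
      // the wrapped org.slf4j.Logger via the logger accessor
      val underlying: org.slf4j.Logger = LOG.logger
    }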

http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
index 51e99f9..f0079f8 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
@@ -22,7 +22,7 @@ import java.lang.management.ManagementFactory
 import java.util.concurrent.TimeUnit
 
 import akka.actor.{Cancellable, ActorRef, Actor, ActorLogging}
-import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.execution.{RuntimeEnvironment, ExecutionState}
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.jobgraph.JobVertexID
@@ -44,7 +44,7 @@ import scala.concurrent.duration.FiniteDuration
  * @param reportInterval Interval of profiling action
  */
 class TaskManagerProfiler(val instancePath: String, val reportInterval: Int)
-   extends Actor with ActorLogMessages with ActorLogging {
+   extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
   import context.dispatcher
 
@@ -101,7 +101,7 @@ class TaskManagerProfiler(val instancePath: String, val reportInterval: Int)
             Some(instanceProfiler.generateProfilingData(timestamp))
           } catch {
             case e: ProfilingException =>
-              log.error(e, "Error while retrieving instance profiling data.")
+              log.error("Error while retrieving instance profiling data.", e)
               None
           }
 
@@ -133,7 +133,7 @@ class TaskManagerProfiler(val instancePath: String, val reportInterval: Int)
             case _ =>
           }
         case None =>
-          log.warning(s"Could not find environment for execution id $executionID.")
+          log.warn(s"Could not find environment for execution id $executionID.")
       }
   }
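
Two grizzled-slf4j differences drive the edits in this file: the Throwable
argument moves from first position (Akka's log.error(e, msg)) to last
(log.error(msg, e)), and Akka's log.warning becomes log.warn. A plausible
minimal sketch of the ActorSynchronousLogging mixin these actors now use,
assuming it does nothing more than expose a synchronous logger named after the
concrete class (the actual trait ships in this commit as
ActorSynchronousLogging.scala):

    import grizzled.slf4j.Logger

    trait ActorSynchronousLogging {
      // Synchronous replacement for the asynchronous log
      // of akka.actor.ActorLogging
      protected lazy val log = Logger(getClass)
    }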
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index b400bb9..d6760ec 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -23,7 +23,7 @@ import java.net.InetSocketAddress
 import akka.actor._
 import akka.pattern.ask
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus
@@ -34,8 +34,10 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.{Failure, Success}
 
-class ApplicationClient(flinkConfig: Configuration) extends Actor
-  with ActorLogMessages with ActorLogging {
+class ApplicationClient(flinkConfig: Configuration)
+  extends Actor
+  with ActorLogMessages
+  with ActorSynchronousLogging {
   import context._
 
   val INITIAL_POLLING_DELAY = 0 seconds
@@ -78,8 +80,8 @@ class ApplicationClient(flinkConfig: Configuration) extends Actor
       jobManagerFuture.onComplete {
         case Success(jm) => self ! JobManagerActorRef(jm)
         case Failure(t) =>
-          log.error(t, "Registration at JobManager/ApplicationMaster failed. Shutting " +
-            "ApplicationClient down.")
+          log.error("Registration at JobManager/ApplicationMaster failed. Shutting " +
+            "ApplicationClient down.", t)
 
           // we could not connect to the job manager --> poison ourselves
           self ! PoisonPill
@@ -93,7 +95,7 @@ class ApplicationClient(flinkConfig: Configuration) extends Actor
       // sender as the Application Client (this class).
       (jm ? RegisterClient(self))(timeout).onFailure{
         case t: Throwable =>
-          log.error(t, "Could not register at the job manager.")
+          log.error("Could not register at the job manager.", t)
           self ! PoisonPill
       }
 
@@ -144,7 +146,7 @@ class ApplicationClient(flinkConfig: Configuration) extends Actor
     // -----------------  handle messages from the cluster -------------------
     // receive remote messages
     case msg: YarnMessage =>
-      log.debug("Received new YarnMessage {}. Now {} messages in queue", msg, messagesQueue.size)
+      log.debug(s"Received new YarnMessage $msg. Now ${messagesQueue.size} messages in queue")
       messagesQueue.enqueue(msg)
 
     // locally forward messages
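
Rewriting log.debug("... {} ...", args) into an interpolated string does not
give up lazy evaluation: grizzled-slf4j takes the message by name and only
evaluates it when the level is enabled. Conceptually (a sketch of the idea,
not the library's literal source):

    import org.slf4j.{Logger => JLogger}

    object LazyLoggingSketch {
      class LazyLogger(val logger: JLogger) {
        // message is by-name: the interpolated string is only
        // built if DEBUG is actually enabled
        def debug(message: => Any): Unit =
          if (logger.isDebugEnabled) logger.debug(message.toString)
      }
    }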

http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 0fdef47..06e16dc 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -21,6 +21,7 @@ import java.io.{PrintWriter, FileWriter, BufferedWriter}
 import java.security.PrivilegedAction
 
 import akka.actor._
+import grizzled.slf4j.Logger
 import org.apache.flink.client.CliFrontend
 import org.apache.flink.configuration.{GlobalConfiguration, Configuration, ConfigConstants}
 import org.apache.flink.runtime.akka.AkkaUtils
@@ -38,7 +39,7 @@ import scala.io.Source
 object ApplicationMaster {
   import scala.collection.JavaConversions._
 
-  val LOG = LoggerFactory.getLogger(this.getClass)
+  val LOG = Logger(getClass)
 
   val CONF_FILE = "flink-conf.yaml"
   val MODIFIED_CONF_FILE = "flink-conf-modified.yaml"
@@ -50,9 +51,9 @@ object ApplicationMaster {
     LOG.info(s"YARN daemon runs as ${UserGroupInformation.getCurrentUser.getShortUserName} " +
       s"setting user to execute Flink ApplicationMaster/JobManager to ${yarnClientUsername}")
 
-    EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster/JobManager", args)
+    EnvironmentInformation.logEnvironmentInfo(LOG.logger, "YARN ApplicationMaster/JobManager", args)
     EnvironmentInformation.checkJavaVersion()
-    org.apache.flink.runtime.util.SignalHandler.register(LOG)
+    org.apache.flink.runtime.util.SignalHandler.register(LOG.logger)
 
     val ugi = UserGroupInformation.createRemoteUser(yarnClientUsername)
 
@@ -206,7 +207,7 @@ object ApplicationMaster {
     (Configuration, ActorSystem, ActorRef, ActorRef) = {
 
     LOG.info("Starting JobManager for YARN")
-    LOG.info("Loading config from: {}", currDir)
+    LOG.info(s"Loading config from: $currDir.")
 
     GlobalConfiguration.loadConfiguration(currDir)
     val configuration = GlobalConfiguration.getConfiguration()

http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
index f384130..26d1f69 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
@@ -98,7 +98,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
 
   def receiveYarnMessages: Receive = {
     case StopYarnSession(status, diag) =>
-      log.info("Stopping YARN JobManager with status {} and diagnostic {}.", status, diag)
+      log.info(s"Stopping YARN JobManager with status $status and diagnostic $diag.")
 
       instanceManager.getAllRegisteredInstances.asScala foreach {
         instance =>
@@ -108,11 +108,11 @@ trait ApplicationMasterActor extends ActorLogMessages {
       rmClientOption foreach {
         rmClient =>
           Try(rmClient.unregisterApplicationMaster(status, diag, "")).recover{
-            case t: Throwable => log.error(t, "Could not unregister the application master.")
+            case t: Throwable => log.error("Could not unregister the application master.", t)
           }
 
           Try(rmClient.close()).recover{
-            case t:Throwable => log.error(t, "Could not close the AMRMClient.")
+            case t:Throwable => log.error("Could not close the AMRMClient.", t)
           }
       }
 
@@ -121,7 +121,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
       nmClientOption foreach {
         nmClient =>
         Try(nmClient.close()).recover{
-          case t: Throwable => log.error(t, "Could not close the NMClient.")
+          case t: Throwable => log.error("Could not close the NMClient.", t)
         }
       }
 
@@ -133,7 +133,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
       context.system.shutdown()
 
     case RegisterClient(client) =>
-      log.info("Register {} as client.", client.path)
+      log.info(s"Register ${client.path} as client.")
       messageListener = Some(client)
       sender ! Acknowledge
 
@@ -142,7 +142,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
 
     case msg: StopAMAfterJob =>
       val jobId = msg.jobId
-      log.info("ApplicatonMaster will shut down YARN session when job {} has finished", jobId)
+      log.info(s"ApplicatonMaster will shut down YARN session when job $jobId has finished.")
       stopWhenJobFinished = jobId
       sender() ! Acknowledge
 
@@ -155,26 +155,26 @@ trait ApplicationMasterActor extends ActorLogMessages {
       startYarnSession(conf, actorSystemPort, webServerPort)
 
     case jnf: JobNotFound =>
-      LOG.warn("Job with ID {} not found in JobManager", jnf.jobID)
+      log.warn(s"Job with ID ${jnf.jobID} not found in JobManager")
       if(stopWhenJobFinished == null) {
-        LOG.warn("The ApplicationMaster didn't expect to receive this message")
+        log.warn("The ApplicationMaster didn't expect to receive this message")
       }
 
     case jobStatus: CurrentJobStatus =>
       if(stopWhenJobFinished == null) {
-        LOG.warn("Received job status {} which wasn't requested", jobStatus)
+        log.warn(s"Received job status $jobStatus which wasn't requested.")
       } else {
         if(stopWhenJobFinished != jobStatus.jobID) {
-          LOG.warn(s"Received job status for job ${jobStatus.jobID} but expected status for " +
+          log.warn(s"Received job status for job ${jobStatus.jobID} but expected status for " +
             s"job $stopWhenJobFinished")
         } else {
           if(jobStatus.status.isTerminalState) {
-            LOG.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " +
+            log.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " +
               s"Shutting down YARN session")
             self ! StopYarnSession(FinalApplicationStatus.SUCCEEDED,
               s"The monitored job with ID ${jobStatus.jobID} has finished.")
           } else {
-            LOG.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}")
+            log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}")
           }
         }
       }
@@ -229,14 +229,14 @@ trait ApplicationMasterActor extends ActorLogMessages {
             })
           }
           // return containers if the RM wants them and we haven't allocated them yet.
-          val preemtionMessage = response.getPreemptionMessage
-          if(preemtionMessage != null) {
-            log.info("Received preemtion message from YARN {}", preemtionMessage)
-            val contract = preemtionMessage.getContract
+          val preemptionMessage = response.getPreemptionMessage
+          if(preemptionMessage != null) {
+            log.info(s"Received preemtion message from YARN $preemptionMessage.")
+            val contract = preemptionMessage.getContract
             if(contract != null) {
               tryToReturnContainers(contract.getContainers.asScala)
             }
-            val strictContract = preemtionMessage.getStrictContract
+            val strictContract = preemptionMessage.getStrictContract
             if(strictContract != null) {
               tryToReturnContainers(strictContract.getContainers.asScala)
             }
@@ -247,13 +247,13 @@ trait ApplicationMasterActor extends ActorLogMessages {
           // check if we want to start some of our allocated containers.
           if(runningContainers < numTaskManager) {
             var missingContainers = numTaskManager - runningContainers
-            log.info("The user requested {} containers, {} running. {} containers missing",
-              numTaskManager, runningContainers, missingContainers)
+            log.info(s"The user requested $numTaskManager containers, $runningContainers " +
+              s"running. $missingContainers containers missing")
 
             // not enough containers running
             if(allocatedContainersList.size > 0) {
-              log.info("{} containers already allocated by YARN. Starting...",
-                allocatedContainersList.size)
+              log.info(s"${allocatedContainersList.size} containers already allocated by YARN. " +
+                "Starting...")
               // we have some containers allocated to us --> start them
               allocatedContainersList = allocatedContainersList.dropWhile(container => {
                 if (missingContainers <= 0) {
@@ -279,7 +279,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
                             }
                           } catch {
                             case e: YarnException =>
-                              log.error(e, "Exception while starting YARN container")
+                              log.error("Exception while starting YARN container", e)
                           }
                         }
                         case None =>
@@ -305,24 +305,24 @@ trait ApplicationMasterActor extends ActorLogMessages {
               log.info(s"There are $missingContainers containers missing." +
                 s" $numPendingRequests are already requested. " +
                 s"Requesting $toAllocateFromYarn additional container(s) from YARN. " +
-                s"Reallocation of failed containers is enabled=$reallocate ('{}')",
-                ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS)
+                s"Reallocation of failed containers is enabled=$reallocate " +
+                s"('${ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS}')")
               // there are still containers missing. Request them from YARN
               if(reallocate) {
                 for(i <- 1 to toAllocateFromYarn) {
                   val containerRequest = getContainerRequest(memoryPerTaskManager)
                   rmClient.addContainerRequest(containerRequest)
                   numPendingRequests += 1
-                  log.info("Requested additional container from YARN. Pending requests {}",
-                    numPendingRequests)
+                  log.info("Requested additional container from YARN. Pending requests " +
+                    s"$numPendingRequests.")
                 }
               }
             }
           }
 
           if(runningContainers >= numTaskManager && allocatedContainersList.size > 0) {
-            log.info("Flink has {} allocated containers which are not needed right now. " +
-              "Returning them", allocatedContainersList.size)
+            log.info(s"Flink has ${allocatedContainersList.size} allocated containers which " +
+              s"are not needed right now. Returning them")
             for(container <- allocatedContainersList) {
               rmClient.releaseAssignedContainer(container.getId)
             }
@@ -353,9 +353,9 @@ trait ApplicationMasterActor extends ActorLogMessages {
           self ! StopYarnSession(FinalApplicationStatus.FAILED, "Fatal error in AM: AMRMClient " +
             "was not set")
       }
-      log.debug("Processed Heartbeat with RMClient. Running containers {}," +
-        "failed containers {}, allocated containers {}", runningContainers, failedContainers,
-        allocatedContainersList.size)
+      log.debug(s"Processed Heartbeat with RMClient. Running containers $runningContainers, " +
+        s"failed containers $failedContainers, " +
+        s"allocated containers ${allocatedContainersList.size}.")
   }
 
   private def runningContainerIds(): mutable.MutableList[ContainerId] = {
@@ -381,17 +381,16 @@ trait ApplicationMasterActor extends ActorLogMessages {
           YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS), MILLISECONDS)
 
       if(YARN_HEARTBEAT_DELAY.gteq(yarnExpiryInterval)) {
-        log.warning("The heartbeat interval of the Flink Application master ({}) is greater than " +
-          "YARN's expiry interval ({}). The application is likely to be killed by YARN.",
-        YARN_HEARTBEAT_DELAY, yarnExpiryInterval)
+        log.warn(s"The heartbeat interval of the Flink Application master " +
+          s"($YARN_HEARTBEAT_DELAY) is greater than YARN's expiry interval " +
+          s"($yarnExpiryInterval). The application is likely to be killed by YARN.")
       }
 
       numTaskManager = env.get(FlinkYarnClient.ENV_TM_COUNT).toInt
       maxFailedContainers = flinkConfiguration.
         getInteger(ConfigConstants.YARN_MAX_FAILED_CONTAINERS, numTaskManager)
-      log.info("Requesting {} TaskManagers. Tolerating {} failed TaskManagers",
-        numTaskManager, maxFailedContainers)
-
+      log.info(s"Requesting $numTaskManager TaskManagers. Tolerating $maxFailedContainers failed " +
+        "TaskManagers")
 
       val remoteFlinkJarPath = env.get(FlinkYarnClient.FLINK_JAR_PATH)
       val fs = FileSystem.get(conf)
@@ -468,7 +467,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
       context.system.scheduler.scheduleOnce(FAST_YARN_HEARTBEAT_DELAY, self, HeartbeatWithYarn)
     } recover {
       case t: Throwable =>
-        log.error(t, "Could not start yarn session.")
+        log.error("Could not start yarn session.", t)
         self ! StopYarnSession(FinalApplicationStatus.FAILED,
           s"ApplicationMaster failed while starting. Exception Message: ${t.getMessage}")
     }
@@ -479,7 +478,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
       allocatedContainersList = allocatedContainersList.dropWhile( container => {
         val result = requestedBackContainers.getId.equals(container.getId)
         if(result) {
-          log.info("Returning container {} back to ResourceManager.", container)
+          log.info(s"Returning container $container back to ResourceManager.")
         }
         result
       })
@@ -548,7 +547,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
       ctx.setTokens(securityTokens)
     } catch {
       case t: Throwable =>
-        log.error(t, "Getting current user info failed when trying to launch the container")
+        log.error("Getting current user info failed when trying to launch the container", t)
     }
 
     ctx
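
The shutdown path above wraps each cleanup step in Try(...).recover, so a
failing unregisterApplicationMaster or close is logged without aborting the
remaining teardown. The idiom in isolation (closeQuietly and its parameter
names are illustrative, not part of the commit):

    import grizzled.slf4j.Logger
    import scala.util.Try

    object ShutdownSketch {
      private val log = Logger(getClass)

      // Attempt the close, log any failure, never rethrow
      def closeQuietly(resource: AutoCloseable, name: String): Unit =
        Try(resource.close()).recover {
          case t: Throwable => log.error(s"Could not close the $name.", t)
        }
    }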

http://git-wip-us.apache.org/repos/asf/flink/blob/5a2ca819/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 524e45f..1278386 100644
--- a/pom.xml
+++ b/pom.xml
@@ -275,6 +275,12 @@ under the License.
 			</dependency>
 
 			<dependency>
+				<groupId>org.clapper</groupId>
+				<artifactId>grizzled-slf4j_${scala.binary.version}</artifactId>
+				<version>1.0.2</version>
+			</dependency>
+
+			<dependency>
 				<groupId>com.typesafe.akka</groupId>
 				<artifactId>akka-actor_${scala.binary.version}</artifactId>
 				<version>${akka.version}</version>