Posted to commits@kafka.apache.org by ij...@apache.org on 2017/11/22 15:11:34 UTC

kafka git commit: KAFKA-1044; Eliminate direct and non-optional log4j references from `core`

Repository: kafka
Updated Branches:
  refs/heads/trunk 80038e6d2 -> ed8b0315a


KAFKA-1044; Eliminate direct and non-optional log4j references from `core`

Use slf4j (via scala-logging) instead. Also:

- Log4jController is only initialised if log4j is in the classpath
- Use FATAL marker to support log4j's FATAL level (as the log4j-slf4j bridge does)
- Removed `Logging.swallow` in favour of CoreUtils.swallow, which logs to the
correct logger
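
For illustration, a minimal sketch of the new calling convention (the caller
class here is hypothetical; `CoreUtils.swallow`, `Logging` and the slf4j
`Level` import are taken from this change):

    import org.slf4j.event.Level
    import kafka.utils.{CoreUtils, Logging}

    class ShutdownExample extends Logging {
      def closeQuietly(resource: java.io.Closeable): Unit = {
        // Any exception is logged through this class's own logger
        // (not CoreUtils') at WARN, the default level.
        CoreUtils.swallow(resource.close(), this)
        // An explicit level replaces the old swallowTrace/swallowError helpers.
        CoreUtils.swallow(resource.close(), this, Level.TRACE)
      }
    }

Note that `fatal(...)` in the new `Logging` trait logs at ERROR with a "FATAL"
marker, mirroring how the log4j-slf4j bridge represents log4j's FATAL level.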

Author: Viktor Somogyi <vi...@cloudera.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3477 from viktorsomogyi/KAFKA-1044


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ed8b0315
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ed8b0315
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ed8b0315

Branch: refs/heads/trunk
Commit: ed8b0315a6c3705b2a163ce3ab4723234779264f
Parents: 80038e6
Author: Viktor Somogyi <vi...@cloudera.com>
Authored: Wed Nov 22 12:44:15 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed Nov 22 15:06:56 2017 +0000

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 core/src/main/scala/kafka/Kafka.scala           |   2 +-
 .../src/main/scala/kafka/admin/AclCommand.scala |   4 +-
 .../ZkNodeChangeNotificationListener.scala      |   2 +-
 .../kafka/consumer/ConsumerFetcherManager.scala |   2 +-
 .../controller/ControllerChannelManager.scala   |   3 +-
 .../kafka/controller/StateChangeLogger.scala    |   6 +-
 .../main/scala/kafka/log/AbstractIndex.scala    |   4 +-
 core/src/main/scala/kafka/log/LogManager.scala  |   6 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |  22 +--
 .../scala/kafka/network/BlockingChannel.scala   |   8 +-
 .../scala/kafka/network/RequestChannel.scala    |   9 +-
 .../main/scala/kafka/network/SocketServer.scala |  11 +-
 .../scala/kafka/producer/SyncProducer.scala     |   2 +-
 .../producer/async/DefaultEventHandler.scala    |   9 +-
 .../security/auth/SimpleAclAuthorizer.scala     |   4 +-
 .../main/scala/kafka/server/AdminManager.scala  |   4 +-
 .../scala/kafka/server/ClientQuotaManager.scala |  10 +-
 .../main/scala/kafka/server/ConfigHandler.scala |   4 +-
 .../kafka/server/DynamicConfigManager.scala     |   4 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |   6 +-
 .../main/scala/kafka/server/KafkaServer.scala   |  42 +++---
 .../kafka/server/ReplicaFetcherThread.scala     |   6 +-
 .../scala/kafka/tools/ConsoleConsumer.scala     |   8 +-
 .../scala/kafka/tools/ConsumerPerformance.scala |   6 +-
 .../main/scala/kafka/tools/MirrorMaker.scala    |   8 +-
 .../scala/kafka/tools/ProducerPerformance.scala |   3 +-
 .../kafka/tools/SimpleConsumerPerformance.scala |   6 +-
 core/src/main/scala/kafka/utils/CoreUtils.scala |  21 ++-
 .../scala/kafka/utils/Log4jController.scala     |  11 +-
 core/src/main/scala/kafka/utils/Logging.scala   | 136 +++++++------------
 .../scala/kafka/security/minikdc/MiniKdc.scala  |   6 +-
 .../test/scala/kafka/utils/LoggingTest.scala    |  37 +++++
 .../unit/kafka/server/LogDirFailureTest.scala   |   4 +-
 .../unit/kafka/server/ServerShutdownTest.scala  |   2 +-
 .../scala/unit/kafka/utils/CoreUtilsTest.scala  |   7 +-
 .../scala/unit/kafka/zk/EmbeddedZookeeper.scala |   9 +-
 .../unit/kafka/zk/ZooKeeperTestHarness.scala    |   4 +-
 gradle/dependencies.gradle                      |   2 +
 39 files changed, 218 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 03426cd..1223150 100644
--- a/build.gradle
+++ b/build.gradle
@@ -550,6 +550,7 @@ project(':core') {
     compile libs.metrics
     compile libs.scala
     compile libs.slf4jlog4j
+    compile libs.scalaLogging
     compile libs.zkclient
     compile libs.zookeeper
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/Kafka.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index 9651038..25a7216 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -94,7 +94,7 @@ object Kafka extends Logging {
     }
     catch {
       case e: Throwable =>
-        fatal(e)
+        fatal("Exiting Kafka due to fatal exception", e)
         Exit.exit(1)
     }
     Exit.exit(0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/admin/AclCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 4522135..2732f6c 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConverters._
 
-object AclCommand {
+object AclCommand extends Logging {
 
   val Newline = scala.util.Properties.lineSeparator
   val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] (
@@ -77,7 +77,7 @@ object AclCommand {
       authZ.configure(authorizerProperties.asJava)
       f(authZ)
     }
-    finally CoreUtils.swallow(authZ.close())
+    finally CoreUtils.swallow(authZ.close(), this)
   }
 
   private def addAcl(opts: AclCommandOptions) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index f0d4b1b..4cae80c 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -87,7 +87,7 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
             val (data, _) = zkClient.getDataAndStat(changeZnode)
             data match {
               case Some(d) => notificationHandler.processNotification(d)
-              case None => logger.warn(s"read null data from $changeZnode when processing notification $notification")
+              case None => warn(s"read null data from $changeZnode when processing notification $notification")
             }
             lastExecutedChange = changeId
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 7cccfe1..0a6b82e 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -69,7 +69,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
                                                             config.clientId,
                                                             config.socketTimeoutMs,
                                                             correlationId.getAndIncrement).topicsMetadata
-        if(logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString()))
+        if(isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString()))
         topicsMetadata.foreach { tmd =>
           val topic = tmd.topic
           tmd.partitionsMetadata.foreach { pmd =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 7314679..d5bd4e6 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{Node, TopicPartition}
+import org.slf4j.event.Level
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
@@ -210,7 +211,7 @@ class RequestSendThread(val controllerId: Int,
 
   override def doWork(): Unit = {
 
-    def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(100))
+    def backoff(): Unit = CoreUtils.swallow(Thread.sleep(100), this, Level.TRACE)
 
     val QueueItem(apiKey, requestBuilder, callback) = queue.take()
     var clientResponse: ClientResponse = null

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/controller/StateChangeLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/StateChangeLogger.scala b/core/src/main/scala/kafka/controller/StateChangeLogger.scala
index 21c70c3..a1d1bb2 100644
--- a/core/src/main/scala/kafka/controller/StateChangeLogger.scala
+++ b/core/src/main/scala/kafka/controller/StateChangeLogger.scala
@@ -17,11 +17,11 @@
 
 package kafka.controller
 
+import com.typesafe.scalalogging.Logger
 import kafka.utils.Logging
-import org.apache.log4j
 
 object StateChangeLogger {
-  private val Logger = log4j.Logger.getLogger("state.change.logger")
+  private val logger = Logger("state.change.logger")
 }
 
 /**
@@ -34,7 +34,7 @@ class StateChangeLogger(brokerId: Int, inControllerContext: Boolean, controllerE
   if (controllerEpoch.isDefined && !inControllerContext)
     throw new IllegalArgumentException("Controller epoch should only be defined if inControllerContext is true")
 
-  override lazy val logger = StateChangeLogger.Logger
+  override lazy val logger = StateChangeLogger.logger
 
   locally {
     val prefix = if (inControllerContext) "Controller" else "Broker"

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/log/AbstractIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index 899c107..9696d8d 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -75,7 +75,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
         idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
       idx
     } finally {
-      CoreUtils.swallow(raf.close())
+      CoreUtils.swallow(raf.close(), this)
     }
   }
 
@@ -130,7 +130,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
           mmap.position(position)
           true
         } finally {
-          CoreUtils.swallow(raf.close())
+          CoreUtils.swallow(raf.close(), this)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index f1e2fc2..9a61be3 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -206,7 +206,7 @@ class LogManager(logDirs: Seq[File],
 
       info(s"Logs for partitions ${offlineCurrentTopicPartitions.mkString(",")} are offline and " +
            s"logs for future partitions ${offlineFutureTopicPartitions.mkString(",")} are offline due to failure on log directory $dir")
-      dirLocks.filter(_.file.getParent == dir).foreach(dir => CoreUtils.swallow(dir.destroy()))
+      dirLocks.filter(_.file.getParent == dir).foreach(dir => CoreUtils.swallow(dir.destroy(), this))
     }
   }
 
@@ -412,7 +412,7 @@ class LogManager(logDirs: Seq[File],
 
     // stop the cleaner first
     if (cleaner != null) {
-      CoreUtils.swallow(cleaner.shutdown())
+      CoreUtils.swallow(cleaner.shutdown(), this)
     }
 
     // close logs in each dir
@@ -448,7 +448,7 @@ class LogManager(logDirs: Seq[File],
 
         // mark that the shutdown was clean by creating marker file
         debug("Writing clean shutdown marker at " + dir)
-        CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath))
+        CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath), this)
       }
     } catch {
       case e: ExecutionException =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 845f08f..6db2a50 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -287,12 +287,12 @@ class LogSegment(val log: FileRecords,
       }
     } catch {
       case e: CorruptRecordException =>
-        logger.warn("Found invalid messages in log segment %s at byte offset %d: %s."
+        warn("Found invalid messages in log segment %s at byte offset %d: %s."
           .format(log.file.getAbsolutePath, validBytes, e.getMessage))
     }
     val truncated = log.sizeInBytes - validBytes
     if (truncated > 0)
-      logger.debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")
+      debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")
 
     log.truncateTo(validBytes)
     index.trimToValidSize()
@@ -467,21 +467,21 @@ class LogSegment(val log: FileRecords,
    * Close this log segment
    */
   def close() {
-    CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true))
-    CoreUtils.swallow(index.close())
-    CoreUtils.swallow(timeIndex.close())
-    CoreUtils.swallow(log.close())
-    CoreUtils.swallow(txnIndex.close())
+    CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true), this)
+    CoreUtils.swallow(index.close(), this)
+    CoreUtils.swallow(timeIndex.close(), this)
+    CoreUtils.swallow(log.close(), this)
+    CoreUtils.swallow(txnIndex.close(), this)
   }
 
   /**
     * Close file handlers used by the log segment but don't write to disk. This is used when the disk may have failed
     */
   def closeHandlers() {
-    CoreUtils.swallow(index.closeHandler())
-    CoreUtils.swallow(timeIndex.closeHandler())
-    CoreUtils.swallow(log.closeHandlers())
-    CoreUtils.swallow(txnIndex.close())
+    CoreUtils.swallow(index.closeHandler(), this)
+    CoreUtils.swallow(timeIndex.closeHandler(), this)
+    CoreUtils.swallow(log.closeHandlers(), this)
+    CoreUtils.swallow(txnIndex.close(), this)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/network/BlockingChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index 69fd054..3493ad3 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -21,7 +21,7 @@ import java.net.InetSocketAddress
 import java.nio.channels._
 
 import kafka.api.RequestOrResponse
-import kafka.utils.{Logging, nonthreadsafe}
+import kafka.utils.{CoreUtils, Logging, nonthreadsafe}
 import org.apache.kafka.common.network.NetworkReceive
 
 
@@ -91,15 +91,15 @@ class BlockingChannel( val host: String,
   
   def disconnect() = lock synchronized {
     if(channel != null) {
-      swallow(channel.close())
-      swallow(channel.socket.close())
+      CoreUtils.swallow(channel.close(), this)
+      CoreUtils.swallow(channel.socket.close(), this)
       channel = null
       writeChannel = null
     }
     // closing the main socket channel *should* close the read channel
     // but let's do it to be sure.
     if(readChannel != null) {
-      swallow(readChannel.close())
+      CoreUtils.swallow(readChannel.close(), this)
       readChannel = null
     }
     connected = false

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index a50af45..7cc8619 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -21,6 +21,7 @@ import java.net.InetAddress
 import java.nio.ByteBuffer
 import java.util.concurrent._
 
+import com.typesafe.scalalogging.Logger
 import com.yammer.metrics.core.{Gauge, Meter}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.network.RequestChannel.{BaseRequest, SendAction, ShutdownRequest, NoOpAction, CloseConnectionAction}
@@ -31,15 +32,14 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Sanitizer, Time}
-import org.apache.log4j.Logger
 
 import scala.collection.mutable
 import scala.reflect.ClassTag
 
 object RequestChannel extends Logging {
-  private val requestLogger = Logger.getLogger("kafka.request.logger")
+  private val requestLogger = Logger("kafka.request.logger")
 
-  def isRequestLoggingEnabled: Boolean = requestLogger.isDebugEnabled
+  def isRequestLoggingEnabled: Boolean = requestLogger.underlying.isDebugEnabled
 
   sealed trait BaseRequest
   case object ShutdownRequest extends BaseRequest
@@ -176,7 +176,7 @@ object RequestChannel extends Logging {
       recordNetworkThreadTimeCallback.foreach(record => record(networkThreadTimeNanos))
 
       if (isRequestLoggingEnabled) {
-        val detailsEnabled = requestLogger.isTraceEnabled
+        val detailsEnabled = requestLogger.underlying.isTraceEnabled
         val responseString = response.responseAsString.getOrElse(
           throw new IllegalStateException("responseAsString should always be defined if request logging is enabled"))
 
@@ -325,6 +325,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
   }
 
   def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest)
+
 }
 
 object RequestMetrics {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 4366fea..200bfe2 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaCh
 import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time}
+import org.slf4j.event.Level
 
 import scala.collection._
 import JavaConverters._
@@ -252,8 +253,8 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
     if (channel != null) {
       debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
       connectionQuotas.dec(channel.socket.getInetAddress)
-      swallowError(channel.socket().close())
-      swallowError(channel.close())
+      CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
+      CoreUtils.swallow(channel.close(), this, Level.ERROR)
     }
   }
 }
@@ -319,8 +320,8 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
       }
     } finally {
       debug("Closing server socket and selector.")
-      swallowError(serverChannel.close())
-      swallowError(nioSelector.close())
+      CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
+      CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
       shutdownComplete()
     }
   }
@@ -481,7 +482,7 @@ private[kafka] class Processor(val id: Int,
       }
     } finally {
       debug("Closing selector - processor " + id)
-      swallowError(closeAll())
+      CoreUtils.swallow(closeAll(), this, Level.ERROR)
       shutdownComplete()
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 04527c8..b132293 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -56,7 +56,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
      * Also, when verification is turned on, care should be taken to see that the logs don't fill up with unnecessary
      * data. So, leaving the rest of the logging at TRACE, while errors should be logged at ERROR level
      */
-    if (logger.isDebugEnabled) {
+    if (isDebugEnabled) {
       val buffer = new RequestOrResponseSend("", request).buffer
       trace("verifying sendbuffer of size " + buffer.limit())
       val requestTypeId = buffer.getShort()

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 3e4eaa3..8c7465f 100755
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic._
 
 import kafka.api.{ProducerRequest, TopicMetadata}
 import org.apache.kafka.common.utils.{Time, Utils}
+import org.slf4j.event.Level
 
 @deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
 class DefaultEventHandler[K,V](config: ProducerConfig,
@@ -72,7 +73,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
       if (topicMetadataRefreshInterval >= 0 &&
           Time.SYSTEM.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
-        CoreUtils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
+        CoreUtils.swallow(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement), this, Level.ERROR)
         sendPartitionPerTopicCache.clear()
         topicMetadataToRefresh.clear
         lastTopicMetadataRefreshTime = Time.SYSTEM.milliseconds
@@ -83,7 +84,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         // back off and update the topic metadata cache before attempting another send operation
         Thread.sleep(config.retryBackoffMs)
         // get topics of the outstanding produce requests and refresh metadata for those
-        CoreUtils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
+        CoreUtils.swallow(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement), this, Level.ERROR)
         sendPartitionPerTopicCache.clear()
         remainingRetries -= 1
         producerStats.resendRate.mark()
@@ -105,7 +106,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       case Some(partitionedData) =>
         val failedProduceRequests = new ArrayBuffer[KeyedMessage[K, Message]]
         for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
-          if (logger.isTraceEnabled) {
+          if (isTraceEnabled) {
             messagesPerBrokerMap.foreach(partitionAndEvent =>
               trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
           }
@@ -277,7 +278,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         if(response != null) {
           if (response.status.size != producerRequest.data.size)
             throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest))
-          if (logger.isTraceEnabled) {
+          if (isTraceEnabled) {
             val successfullySentData = response.status.filter(_._2.error == Errors.NONE)
             successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
               trace("Successfully sent message: %s".format(if(message.message.isNull) null else message.message.toString()))))

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index aa25653..e1befc7 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -19,6 +19,7 @@ package kafka.security.auth
 import java.util
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
+import com.typesafe.scalalogging.Logger
 import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
 import kafka.network.RequestChannel.Session
 import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
@@ -29,7 +30,6 @@ import kafka.zk.{AclChangeNotificationSequenceZNode, AclChangeNotificationZNode,
 import kafka.zookeeper.ZooKeeperClient
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.SecurityUtils
-import org.apache.log4j.Logger
 
 import scala.collection.JavaConverters._
 import scala.util.Random
@@ -51,7 +51,7 @@ object SimpleAclAuthorizer {
 }
 
 class SimpleAclAuthorizer extends Authorizer with Logging {
-  private val authorizerLogger = Logger.getLogger("kafka.authorizer.logger")
+  private val authorizerLogger = Logger("kafka.authorizer.logger")
   private var superUsers = Set.empty[KafkaPrincipal]
   private var shouldAllowEveryoneIfNoAclIsFound = false
   private var zkClient: KafkaZkClient = null

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index aefdefd..935fade 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -387,7 +387,7 @@ class AdminManager(val config: KafkaConfig,
 
   def shutdown() {
     topicPurgatory.shutdown()
-    CoreUtils.swallow(createTopicPolicy.foreach(_.close()))
-    CoreUtils.swallow(alterConfigPolicy.foreach(_.close()))
+    CoreUtils.swallow(createTopicPolicy.foreach(_.close()), this)
+    CoreUtils.swallow(alterConfigPolicy.foreach(_.close()), this)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index a6efa92..8ec27a3 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -194,7 +194,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
         // If delayed, add the element to the delayQueue
         delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
         delayQueueSensor.record()
-        logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
+        debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
     }
     throttleTimeMs
   }
@@ -434,7 +434,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
       }
       quota match {
         case Some(newQuota) =>
-          logger.info(s"Changing ${quotaType} quota for ${userInfo}${clientIdInfo} to $newQuota.bound}")
+          info(s"Changing ${quotaType} quota for ${userInfo}${clientIdInfo} to $newQuota.bound}")
           overriddenQuota.put(quotaId, newQuota)
           (sanitizedUser, clientId) match {
             case (Some(_), Some(_)) => quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
@@ -443,7 +443,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
             case (None, None) =>
           }
         case None =>
-          logger.info(s"Removing ${quotaType} quota for ${userInfo}${clientIdInfo}")
+          info(s"Removing ${quotaType} quota for ${userInfo}${clientIdInfo}")
           overriddenQuota.remove(quotaId)
       }
 
@@ -463,7 +463,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
           if (metric != null) {
             val metricConfigEntity = quotaEntity(sanitizedUser.getOrElse(""), clientId.getOrElse(""), sanitizedClientId.getOrElse(""))
             val newQuota = metricConfigEntity.quota
-            logger.info(s"Sensor for ${userInfo}${clientIdInfo} already exists. Changing quota to ${newQuota.bound()} in MetricConfig")
+            info(s"Sensor for ${userInfo}${clientIdInfo} already exists. Changing quota to ${newQuota.bound()} in MetricConfig")
             metric.config(getQuotaMetricConfig(newQuota))
           }
       } else {
@@ -474,7 +474,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
               val metricConfigEntity = quotaEntity(userTag, clientIdTag, Sanitizer.sanitize(clientIdTag))
               if (metricConfigEntity.quota != metric.config.quota) {
                 val newQuota = metricConfigEntity.quota
-                logger.info(s"Sensor for quota-id ${metricConfigEntity.quotaId} already exists. Setting quota to ${newQuota.bound} in MetricConfig")
+                info(s"Sensor for quota-id ${metricConfigEntity.quotaId} already exists. Setting quota to ${newQuota.bound} in MetricConfig")
                 metric.config(getQuotaMetricConfig(newQuota))
               }
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 0edd638..390222d 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -74,10 +74,10 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
       if (topicConfig.containsKey(prop) && topicConfig.getProperty(prop).length > 0) {
         val partitions = parseThrottledPartitions(topicConfig, kafkaConfig.brokerId, prop)
         quotaManager.markThrottled(topic, partitions)
-        logger.debug(s"Setting $prop on broker ${kafkaConfig.brokerId} for topic: $topic and partitions $partitions")
+        debug(s"Setting $prop on broker ${kafkaConfig.brokerId} for topic: $topic and partitions $partitions")
       } else {
         quotaManager.removeThrottle(topic)
-        logger.debug(s"Removing $prop from broker ${kafkaConfig.brokerId} for topic $topic")
+        debug(s"Removing $prop from broker ${kafkaConfig.brokerId} for topic $topic")
       }
     }
     updateThrottledList(LogConfig.LeaderReplicationThrottledReplicasProp, quotas.leader)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index 0f1abfc..6392723 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -121,7 +121,7 @@ class DynamicConfigManager(private val oldZkUtils: ZkUtils,
       }
 
       val entityConfig = AdminUtils.fetchEntityConfig(oldZkUtils, entityType, entity)
-      logger.info(s"Processing override for entityType: $entityType, entity: $entity with config: $entityConfig")
+      info(s"Processing override for entityType: $entityType, entity: $entity with config: $entityConfig")
       configHandlers(entityType).processConfigChanges(entity, entityConfig)
 
     }
@@ -145,7 +145,7 @@ class DynamicConfigManager(private val oldZkUtils: ZkUtils,
       val loggableConfig = entityConfig.asScala.map {
         case (k, v) => (k, if (ScramMechanism.isScram(k)) Password.HIDDEN else v)
       }
-      logger.info(s"Processing override for entityPath: $entityPath with config: $loggableConfig")
+      info(s"Processing override for entityPath: $entityPath with config: $loggableConfig")
       configHandlers(rootEntityType).processConfigChanges(fullSanitizedEntityName, entityConfig)
 
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ced3f0b..de56986 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -205,7 +205,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava))
     }
 
-    CoreUtils.swallow(replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads())
+    CoreUtils.swallow(replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads(), this)
   }
 
   def handleUpdateMetadataRequest(request: RequestChannel.Request) {
@@ -1823,12 +1823,12 @@ class KafkaApis(val requestChannel: RequestChannel,
                   throw new InvalidRequestException("Invalid empty resource name")
                 auth.addAcls(immutable.Set(acl), resource)
 
-                logger.debug(s"Added acl $acl to $resource")
+                debug(s"Added acl $acl to $resource")
 
                 new AclCreationResponse(ApiError.NONE)
               } catch {
                 case throwable: Throwable =>
-                  logger.debug(s"Failed to add acl $acl to $resource", throwable)
+                  debug(s"Failed to add acl $acl to $resource", throwable)
                   new AclCreationResponse(ApiError.fromThrowable(throwable))
               }
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index a13f5af..1812eb0 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -517,64 +517,64 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
       // last in the `if` block. If the order is reversed, we could shutdown twice or leave `isShuttingDown` set to
       // `true` at the end of this method.
       if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
-        CoreUtils.swallow(controlledShutdown())
+        CoreUtils.swallow(controlledShutdown(), this)
         brokerState.newState(BrokerShuttingDown)
 
         if (kafkaHealthcheck != null)
-          CoreUtils.swallow(kafkaHealthcheck.shutdown())
+          CoreUtils.swallow(kafkaHealthcheck.shutdown(), this)
 
         if (dynamicConfigManager != null)
-          CoreUtils.swallow(dynamicConfigManager.shutdown())
+          CoreUtils.swallow(dynamicConfigManager.shutdown(), this)
 
         // Stop socket server to stop accepting any more connections and requests.
         // Socket server will be shutdown towards the end of the sequence.
         if (socketServer != null)
-          CoreUtils.swallow(socketServer.stopProcessingRequests())
+          CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
         if (requestHandlerPool != null)
-          CoreUtils.swallow(requestHandlerPool.shutdown())
+          CoreUtils.swallow(requestHandlerPool.shutdown(), this)
 
-        CoreUtils.swallow(kafkaScheduler.shutdown())
+        CoreUtils.swallow(kafkaScheduler.shutdown(), this)
 
         if (apis != null)
-          CoreUtils.swallow(apis.close())
-        CoreUtils.swallow(authorizer.foreach(_.close()))
+          CoreUtils.swallow(apis.close(), this)
+        CoreUtils.swallow(authorizer.foreach(_.close()), this)
         if (adminManager != null)
-          CoreUtils.swallow(adminManager.shutdown())
+          CoreUtils.swallow(adminManager.shutdown(), this)
 
         if (transactionCoordinator != null)
-          CoreUtils.swallow(transactionCoordinator.shutdown())
+          CoreUtils.swallow(transactionCoordinator.shutdown(), this)
         if (groupCoordinator != null)
-          CoreUtils.swallow(groupCoordinator.shutdown())
+          CoreUtils.swallow(groupCoordinator.shutdown(), this)
 
         if (replicaManager != null)
-          CoreUtils.swallow(replicaManager.shutdown())
+          CoreUtils.swallow(replicaManager.shutdown(), this)
         if (logManager != null)
-          CoreUtils.swallow(logManager.shutdown())
+          CoreUtils.swallow(logManager.shutdown(), this)
 
         if (kafkaController != null)
-          CoreUtils.swallow(kafkaController.shutdown())
+          CoreUtils.swallow(kafkaController.shutdown(), this)
         if (zkUtils != null)
-          CoreUtils.swallow(zkUtils.close())
+          CoreUtils.swallow(zkUtils.close(), this)
         if (zkClient != null)
-          CoreUtils.swallow(zkClient.close())
+          CoreUtils.swallow(zkClient.close(), this)
 
         if (quotaManagers != null)
-          CoreUtils.swallow(quotaManagers.shutdown())
+          CoreUtils.swallow(quotaManagers.shutdown(), this)
         // Even though socket server is stopped much earlier, controller can generate
         // response for controlled shutdown request. Shutdown server at the end to
         // avoid any failures (e.g. when metrics are recorded)
         if (socketServer != null)
-          CoreUtils.swallow(socketServer.shutdown())
+          CoreUtils.swallow(socketServer.shutdown(), this)
         if (metrics != null)
-          CoreUtils.swallow(metrics.close())
+          CoreUtils.swallow(metrics.close(), this)
         if (brokerTopicStats != null)
-          CoreUtils.swallow(brokerTopicStats.close())
+          CoreUtils.swallow(brokerTopicStats.close(), this)
 
         brokerState.newState(NotRunning)
 
         startupComplete.set(false)
         isShuttingDown.set(false)
-        CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString, metrics))
+        CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString, metrics), this)
         shutdownLatch.countDown()
         info("shut down completed")
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 3bc68da..4413165 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -99,14 +99,14 @@ class ReplicaFetcherThread(name: String,
       throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
         topicPartition, fetchOffset, replica.logEndOffset.messageOffset))
 
-    if (logger.isTraceEnabled)
+    if (isTraceEnabled)
       trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
         .format(replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
 
     // Append the leader's messages to the log
     partition.appendRecordsToFollower(records)
 
-    if (logger.isTraceEnabled)
+    if (isTraceEnabled)
       trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
         .format(replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition))
     val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
@@ -116,7 +116,7 @@ class ReplicaFetcherThread(name: String,
     // these values will be computed upon making the leader
     replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
     replica.maybeIncrementLogStartOffset(leaderLogStartOffset)
-    if (logger.isTraceEnabled)
+    if (isTraceEnabled)
       trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark")
     if (quota.isThrottled(topicPartition))
       quota.record(records.sizeInBytes)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 02690f1..7d2d371 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets
 import java.util.concurrent.CountDownLatch
 import java.util.{Locale, Properties, Random}
 
+import com.typesafe.scalalogging.LazyLogging
 import joptsimple._
 import kafka.api.OffsetRequest
 import kafka.common.{MessageFormatter, StreamEndException}
@@ -35,7 +36,6 @@ import org.apache.kafka.common.errors.{AuthenticationException, WakeupException}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.serialization.Deserializer
 import org.apache.kafka.common.utils.Utils
-import org.apache.log4j.Logger
 
 import scala.collection.JavaConverters._
 
@@ -568,17 +568,15 @@ class DefaultMessageFormatter extends MessageFormatter {
   }
 }
 
-class LoggingMessageFormatter extends MessageFormatter   {
+class LoggingMessageFormatter extends MessageFormatter with LazyLogging {
   private val defaultWriter: DefaultMessageFormatter = new DefaultMessageFormatter
-  val logger = Logger.getLogger(getClass().getName)
 
   override def init(props: Properties): Unit = defaultWriter.init(props)
 
   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
     import consumerRecord._
     defaultWriter.writeTo(consumerRecord, output)
-    if (logger.isInfoEnabled)
-      logger.info({if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) s"$timestampType:$timestamp, " else ""} +
+    logger.info({if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) s"$timestampType:$timestamp, " else ""} +
                   s"key:${if (key == null) "null" else new String(key, StandardCharsets.UTF_8)}, " +
                   s"value:${if (value == null) "null" else new String(value, StandardCharsets.UTF_8)}")
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index bdec41f..a3e60e6 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -23,7 +23,6 @@ import scala.collection.JavaConverters._
 import java.util.concurrent.atomic.AtomicLong
 import java.nio.channels.ClosedByInterruptException
 
-import org.apache.log4j.Logger
 import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.utils.Utils
@@ -38,13 +37,14 @@ import kafka.consumer.ConsumerTimeoutException
 import java.text.SimpleDateFormat
 import java.util.concurrent.atomic.AtomicBoolean
 
+import com.typesafe.scalalogging.LazyLogging
+
 import scala.collection.mutable
 
 /**
  * Performance test for the full zookeeper consumer
  */
-object ConsumerPerformance {
-  private val logger = Logger.getLogger(getClass())
+object ConsumerPerformance extends LazyLogging {
 
   def main(args: Array[String]): Unit = {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index b5b1540..618fd2a 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -444,18 +444,18 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
           exitingOnSendFailure = true
           fatal("Mirror maker thread failure due to ", t)
       } finally {
-        CoreUtils.swallow {
+        CoreUtils.swallow ({
           info("Flushing producer.")
           producer.flush()
 
           // note that this commit is skipped if flush() fails which ensures that we don't lose messages
           info("Committing consumer offsets.")
           commitOffsets(mirrorMakerConsumer)
-        }
+        }, this)
 
         info("Shutting down consumer connectors.")
-        CoreUtils.swallow(mirrorMakerConsumer.stop())
-        CoreUtils.swallow(mirrorMakerConsumer.cleanup())
+        CoreUtils.swallow(mirrorMakerConsumer.stop(), this)
+        CoreUtils.swallow(mirrorMakerConsumer.cleanup(), this)
         shutdownLatch.countDown()
         info("Mirror maker thread stopped")
         // if it exits accidentally, stop the entire mirror maker

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/tools/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index 77f560b..365c9af 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -31,7 +31,7 @@ import java.math.BigInteger
 import java.nio.charset.StandardCharsets
 
 import org.apache.kafka.common.utils.Utils
-import org.apache.log4j.Logger
+import org.slf4j.LoggerFactory
 
 /**
  * Load test for the producer
@@ -40,7 +40,6 @@ import org.apache.log4j.Logger
 object ProducerPerformance extends Logging {
 
   def main(args: Array[String]) {
-    val logger = Logger.getLogger(getClass)
     val config = new ProducerPerfConfig(args)
     if (!config.isFixedSize)
       logger.info("WARN: Throughput will be slower due to changing message size per request")

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
index 1d090b3..888d462 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
@@ -20,10 +20,10 @@ package kafka.tools
 import java.net.URI
 import java.text.SimpleDateFormat
 
+import com.typesafe.scalalogging.LazyLogging
 import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
 import kafka.consumer.SimpleConsumer
 import kafka.utils._
-import org.apache.log4j.Logger
 import kafka.common.TopicAndPartition
 import org.apache.kafka.common.utils.Time
 
@@ -32,9 +32,7 @@ import org.apache.kafka.common.utils.Time
  * Performance test for the simple consumer
  */
 @deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
-object SimpleConsumerPerformance {
-
-  private val logger = Logger.getLogger(getClass())
+object SimpleConsumerPerformance extends LazyLogging {
 
   def main(args: Array[String]) {
     logger.warn("WARNING: SimpleConsumerPerformance is deprecated and will be dropped in a future release following 0.11.0.0.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 7a853d5..efd4d1e 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -31,6 +31,7 @@ import kafka.cluster.EndPoint
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{Base64, KafkaThread, Utils}
+import org.slf4j.event.Level
 
 /**
  * General helper functions!
@@ -73,15 +74,23 @@ object CoreUtils extends Logging {
     new KafkaThread(name, runnable(fun), daemon)
 
   /**
-   * Do the given action and log any exceptions thrown without rethrowing them
-   * @param log The log method to use for logging. E.g. logger.warn
-   * @param action The action to execute
-   */
-  def swallow(log: (Object, Throwable) => Unit, action: => Unit) {
+    * Do the given action and log any exceptions thrown without rethrowing them.
+    *
+    * @param action The action to execute.
+    * @param logging The logging instance to use for logging the thrown exception.
+    * @param logLevel The log level to use for logging.
+    */
+  def swallow(action: => Unit, logging: Logging, logLevel: Level = Level.WARN) {
     try {
       action
     } catch {
-      case e: Throwable => log(e.getMessage(), e)
+      case e: Throwable => logLevel match {
+        case Level.ERROR => logging.error(e.getMessage, e)
+        case Level.WARN => logging.warn(e.getMessage, e)
+        case Level.INFO => logging.info(e.getMessage, e)
+        case Level.DEBUG => logging.debug(e.getMessage, e)
+        case Level.TRACE => logging.trace(e.getMessage, e)
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/utils/Log4jController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Log4jController.scala b/core/src/main/scala/kafka/utils/Log4jController.scala
index 026fbae..95d0733 100755
--- a/core/src/main/scala/kafka/utils/Log4jController.scala
+++ b/core/src/main/scala/kafka/utils/Log4jController.scala
@@ -17,19 +17,10 @@
 
 package kafka.utils
 
-
-import org.apache.log4j.{Logger, Level, LogManager}
 import java.util
 import java.util.Locale
 
-
-object Log4jController {
-
-  private val controller = new Log4jController
-
-  CoreUtils.registerMBean(controller, "kafka:type=kafka.Log4jController")
-
-}
+import org.apache.log4j.{Level, LogManager, Logger}
 
 
 /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/utils/Logging.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Logging.scala b/core/src/main/scala/kafka/utils/Logging.scala
index c2585ad..e409bba 100755
--- a/core/src/main/scala/kafka/utils/Logging.scala
+++ b/core/src/main/scala/kafka/utils/Logging.scala
@@ -17,106 +17,64 @@
 
 package kafka.utils
 
-import org.apache.log4j.Logger
+import com.typesafe.scalalogging.{LazyLogging, Logger}
+import org.slf4j.{Marker, MarkerFactory}
 
-trait Logging {
-  val loggerName = this.getClass.getName
-  lazy val logger = Logger.getLogger(loggerName)
 
-  protected var logIdent: String = null
+object Log4jControllerRegistration {
+  private val logger = Logger(this.getClass.getName)
 
-  // Force initialization to register Log4jControllerMBean
-  private val log4jController = Log4jController
+  try {
+    val log4jController = Class.forName("kafka.utils.Log4jController").asInstanceOf[Class[Object]]
+    val instance = log4jController.getDeclaredConstructor().newInstance()
+    CoreUtils.registerMBean(instance, "kafka:type=kafka.Log4jController")
+    logger.info("Registered kafka:type=kafka.Log4jController MBean")
+  } catch {
+    case _: Exception => logger.info("Couldn't register kafka:type=kafka.Log4jController MBean")
+  }
+}
+
+private object Logging {
+  private val FatalMarker: Marker = MarkerFactory.getMarker("FATAL")
+}
+
+trait Logging extends LazyLogging {
+  def loggerName: String = logger.underlying.getName
 
-  protected def msgWithLogIdent(msg: String) =
+  protected var logIdent: String = _
+
+  Log4jControllerRegistration
+
+  protected def msgWithLogIdent(msg: String): String =
     if (logIdent == null) msg else logIdent + msg
 
-  def trace(msg: => String): Unit = {
-    if (logger.isTraceEnabled())
-      logger.trace(msgWithLogIdent(msg))
-  }
-  def trace(e: => Throwable): Any = {
-    if (logger.isTraceEnabled())
-      logger.trace(logIdent,e)
-  }
-  def trace(msg: => String, e: => Throwable) = {
-    if (logger.isTraceEnabled())
-      logger.trace(msgWithLogIdent(msg),e)
-  }
-  def swallowTrace(action: => Unit) {
-    CoreUtils.swallow(logger.trace, action)
-  }
+  def trace(msg: => String): Unit = logger.trace(msgWithLogIdent(msg))
 
-  def isDebugEnabled: Boolean = logger.isDebugEnabled
+  def trace(msg: => String, e: => Throwable): Unit = logger.trace(msgWithLogIdent(msg),e)
 
-  def isTraceEnabled: Boolean = logger.isTraceEnabled
+  def isDebugEnabled: Boolean = logger.underlying.isDebugEnabled
 
-  def debug(msg: => String): Unit = {
-    if (logger.isDebugEnabled())
-      logger.debug(msgWithLogIdent(msg))
-  }
-  def debug(e: => Throwable): Any = {
-    if (logger.isDebugEnabled())
-      logger.debug(logIdent,e)
-  }
-  def debug(msg: => String, e: => Throwable) = {
-    if (logger.isDebugEnabled())
-      logger.debug(msgWithLogIdent(msg),e)
-  }
-  def swallowDebug(action: => Unit) {
-    CoreUtils.swallow(logger.debug, action)
-  }
+  def isTraceEnabled: Boolean = logger.underlying.isTraceEnabled
 
-  def info(msg: => String): Unit = {
-    if (logger.isInfoEnabled())
-      logger.info(msgWithLogIdent(msg))
-  }
-  def info(e: => Throwable): Any = {
-    if (logger.isInfoEnabled())
-      logger.info(logIdent,e)
-  }
-  def info(msg: => String,e: => Throwable) = {
-    if (logger.isInfoEnabled())
-      logger.info(msgWithLogIdent(msg),e)
-  }
-  def swallowInfo(action: => Unit) {
-    CoreUtils.swallow(logger.info, action)
-  }
+  def debug(msg: => String): Unit = logger.debug(msgWithLogIdent(msg))
 
-  def warn(msg: => String): Unit = {
-    logger.warn(msgWithLogIdent(msg))
-  }
-  def warn(e: => Throwable): Any = {
-    logger.warn(logIdent,e)
-  }
-  def warn(msg: => String, e: => Throwable) = {
-    logger.warn(msgWithLogIdent(msg),e)
-  }
-  def swallowWarn(action: => Unit) {
-    CoreUtils.swallow(logger.warn, action)
-  }
-  def swallow(action: => Unit) = swallowWarn(action)
+  def debug(msg: => String, e: => Throwable): Unit = logger.debug(msgWithLogIdent(msg), e)
 
-  def error(msg: => String): Unit = {
-    logger.error(msgWithLogIdent(msg))
-  }		
-  def error(e: => Throwable): Any = {
-    logger.error(logIdent,e)
-  }
-  def error(msg: => String, e: => Throwable) = {
-    logger.error(msgWithLogIdent(msg),e)
-  }
-  def swallowError(action: => Unit) {
-    CoreUtils.swallow(logger.error, action)
-  }
+  def info(msg: => String): Unit = logger.info(msgWithLogIdent(msg))
 
-  def fatal(msg: => String): Unit = {
-    logger.fatal(msgWithLogIdent(msg))
-  }
-  def fatal(e: => Throwable): Any = {
-    logger.fatal(logIdent,e)
-  }	
-  def fatal(msg: => String, e: => Throwable) = {
-    logger.fatal(msgWithLogIdent(msg),e)
-  }
+  def info(msg: => String, e: => Throwable): Unit = logger.info(msgWithLogIdent(msg), e)
+
+  def warn(msg: => String): Unit = logger.warn(msgWithLogIdent(msg))
+
+  def warn(msg: => String, e: => Throwable): Unit = logger.warn(msgWithLogIdent(msg), e)
+
+  def error(msg: => String): Unit = logger.error(msgWithLogIdent(msg))
+
+  def error(msg: => String, e: => Throwable): Unit = logger.error(msgWithLogIdent(msg), e)
+
+  def fatal(msg: => String): Unit =
+    logger.error(Logging.FatalMarker, msgWithLogIdent(msg))
+
+  def fatal(msg: => String, e: => Throwable): Unit =
+    logger.error(Logging.FatalMarker, msgWithLogIdent(msg), e)
 }
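
Note that the explicit is*Enabled guards from the old trait are gone but the laziness is preserved: scala-logging's Logger methods are macros that compile the level check in around the call, so the by-name message parameters above are evaluated only when the level is enabled. A minimal usage sketch (the class name, id and messages are hypothetical):

    import kafka.utils.Logging

    class ReplicaMover extends Logging {
      // logIdent, when set, is prepended to every message from this instance
      this.logIdent = "[ReplicaMover id=0] "

      def run(): Unit = {
        trace(s"starting at ${System.currentTimeMillis()}") // built only if TRACE is enabled
        info("moving replicas")
        fatal("giving up") // emitted at ERROR with the "FATAL" marker
      }
    }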

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
index 6c964d7..9b30581 100644
--- a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
+++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
@@ -205,7 +205,7 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
           builder.append(line).append("\n")
         addEntriesToDirectoryService(StrSubstitutor.replace(builder, map.asJava))
       }
-      finally CoreUtils.swallow(reader.close())
+      finally CoreUtils.swallow(reader.close(), this)
     }
 
     val bindAddress = config.getProperty(MiniKdc.KdcBindAddress)
@@ -254,7 +254,7 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
       while ({line = reader.readLine(); line != null}) {
         stringBuilder.append(line).append("{3}")
       }
-    } finally CoreUtils.swallow(reader.close())
+    } finally CoreUtils.swallow(reader.close(), this)
     val output = MessageFormat.format(stringBuilder.toString, realm, host, port.toString, System.lineSeparator())
     Files.write(krb5conf.toPath, output.getBytes(StandardCharsets.UTF_8))
   }
@@ -337,7 +337,7 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
     try {
       for (ldifEntry <- reader.asScala)
         ds.getAdminSession.add(new DefaultEntry(ds.getSchemaManager, ldifEntry.getEntry))
-    } finally CoreUtils.swallow(reader.close())
+    } finally CoreUtils.swallow(reader.close(), this)
   }
 
 }
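
With the Logging.swallow* methods gone, call sites now pass the calling Logging instance to CoreUtils.swallow so the swallowed exception is reported through the caller's own logger. A minimal sketch of both call forms that appear in this diff (ResourceOwner is hypothetical, and the default level of the two-argument form is assumed to be WARN):

    import kafka.utils.{CoreUtils, Logging}
    import org.slf4j.event.Level

    class ResourceOwner extends Logging {
      def shutdown(closeable: java.io.Closeable): Unit = {
        // Any exception from close() is logged via this class's logger, not rethrown
        CoreUtils.swallow(closeable.close(), this)
        // An explicit slf4j level may be passed, as CoreUtilsTest does below:
        // CoreUtils.swallow(closeable.close(), this, Level.INFO)
      }
    }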

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/test/scala/kafka/utils/LoggingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/utils/LoggingTest.scala b/core/src/test/scala/kafka/utils/LoggingTest.scala
new file mode 100644
index 0000000..c0600f8
--- /dev/null
+++ b/core/src/test/scala/kafka/utils/LoggingTest.scala
@@ -0,0 +1,37 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.utils
+
+import java.lang.management.ManagementFactory
+import javax.management.ObjectName
+
+import org.junit.Test
+import org.junit.Assert.{assertEquals, assertTrue}
+
+
+class LoggingTest extends Logging {
+
+  @Test
+  def testLog4jControllerIsRegistered(): Unit = {
+    val mbs = ManagementFactory.getPlatformMBeanServer()
+    val log4jControllerName = ObjectName.getInstance("kafka:type=kafka.Log4jController")
+    assertTrue("kafka.utils.Log4jController is not registered", mbs.isRegistered(log4jControllerName))
+    val instance = mbs.getObjectInstance(log4jControllerName)
+    assertEquals("kafka.utils.Log4jController", instance.getClassName)
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index afd619d..bf1ee35 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -79,7 +79,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
       val kafkaConfig = KafkaConfig.fromProps(props)
       val logDir = new File(kafkaConfig.logDirs.head)
       // Make log directory of the partition on the leader broker inaccessible by replacing it with a file
-      CoreUtils.swallow(Utils.delete(logDir))
+      CoreUtils.swallow(Utils.delete(logDir), this)
       logDir.createNewFile()
       assertTrue(logDir.isFile)
 
@@ -144,7 +144,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
     // Make log directory of the partition on the leader broker inaccessible by replacing it with a file
     val replica = leaderServer.replicaManager.getReplicaOrException(partition)
     val logDir = replica.log.get.dir.getParentFile
-    CoreUtils.swallow(Utils.delete(logDir))
+    CoreUtils.swallow(Utils.delete(logDir), this)
     logDir.createNewFile()
     assertTrue(logDir.isFile)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 75adf55..bcddd40 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -135,7 +135,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
 
   @Test
   def testCleanShutdownAfterFailedStartupDueToCorruptLogs() {
-    var server = new KafkaServer(config)
+    val server = new KafkaServer(config)
     server.startup()
     createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
     server.shutdown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
index 5e607be..9ff47dd 100755
--- a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
@@ -24,26 +24,25 @@ import java.util.concurrent.locks.ReentrantLock
 import java.nio.ByteBuffer
 import java.util.regex.Pattern
 
-import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
 import org.junit.Assert._
 import kafka.common.KafkaException
 import kafka.utils.CoreUtils.inLock
 import org.junit.Test
 import org.apache.kafka.common.utils.{Base64, Utils}
+import org.slf4j.event.Level
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, ExecutionContext, Future}
 
-class CoreUtilsTest extends JUnitSuite {
+class CoreUtilsTest extends JUnitSuite with Logging {
 
-  private val logger = Logger.getLogger(classOf[CoreUtilsTest])
   val clusterIdPattern = Pattern.compile("[a-zA-Z0-9_\\-]+")
 
   @Test
   def testSwallow() {
-    CoreUtils.swallow(logger.info, throw new KafkaException("test"))
+    CoreUtils.swallow(throw new KafkaException("test"), this, Level.INFO)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index adc8d05..d4a829d 100755
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -19,10 +19,9 @@ package kafka.zk
 
 import org.apache.zookeeper.server.ZooKeeperServer
 import org.apache.zookeeper.server.NIOServerCnxnFactory
-import kafka.utils.TestUtils
+import kafka.utils.{CoreUtils, Logging, TestUtils}
 import java.net.InetSocketAddress
 
-import kafka.utils.CoreUtils
 import org.apache.kafka.common.utils.Utils
 
 /**
@@ -35,7 +34,7 @@ import org.apache.kafka.common.utils.Utils
 // This should be named EmbeddedZooKeeper for consistency with other classes, but since this is widely used by other
 // projects (even though it's internal), we keep the name as it is until we have a publicly supported test library for
 // others to use.
-class EmbeddedZookeeper() {
+class EmbeddedZookeeper() extends Logging {
 
   val snapshotDir = TestUtils.tempDir()
   val logDir = TestUtils.tempDir()
@@ -48,8 +47,8 @@ class EmbeddedZookeeper() {
   val port = zookeeper.getClientPort
 
   def shutdown() {
-    CoreUtils.swallow(zookeeper.shutdown())
-    CoreUtils.swallow(factory.shutdown())
+    CoreUtils.swallow(zookeeper.shutdown(), this)
+    CoreUtils.swallow(factory.shutdown(), this)
 
     def isDown(): Boolean = {
       try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 03741ef..cc1b9c1 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -63,11 +63,11 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
   @After
   def tearDown() {
     if (zkUtils != null)
-     CoreUtils.swallow(zkUtils.close())
+     CoreUtils.swallow(zkUtils.close(), this)
     if (zkClient != null)
      zkClient.close()
     if (zookeeper != null)
-      CoreUtils.swallow(zookeeper.shutdown())
+      CoreUtils.swallow(zookeeper.shutdown(), this)
     Configuration.setConfiguration(null)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/gradle/dependencies.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index cfb0b9b..2436241 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -57,6 +57,7 @@ versions += [
   jersey: "2.25.1",
   jmh: "1.19",
   log4j: "1.2.17",
+  scalaLogging: "3.7.2",
   jopt: "5.0.4",
   junit: "4.12",
   lz4: "1.4",
@@ -99,6 +100,7 @@ libs += [
   jmhCoreBenchmarks: "org.openjdk.jmh:jmh-core-benchmarks:$versions.jmh",
   junit: "junit:junit:$versions.junit",
   log4j: "log4j:log4j:$versions.log4j",
+  scalaLogging: "com.typesafe.scala-logging:scala-logging_$versions.baseScala:$versions.scalaLogging",
   joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
   lz4: "org.lz4:lz4-java:$versions.lz4",
   metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
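
For the new libs.scalaLogging entry to take effect, a project must also declare it as a dependency; a sketch of how the core project would reference it (the exact dependencies block is assumed, not shown here):

    // build.gradle -- sketch; exact location of core's dependencies block assumed
    project(':core') {
      dependencies {
        compile libs.scalaLogging
      }
    }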