You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/11/22 15:11:34 UTC
kafka git commit: KAFKA-1044;
Eliminate direct and non-optional log4j references from `core`
Repository: kafka
Updated Branches:
refs/heads/trunk 80038e6d2 -> ed8b0315a
KAFKA-1044; Eliminate direct and non-optional log4j references from `core`
Use slf4j (via scala-logging) instead. Also:
- Log4jController is only initialised if log4j if in the classpath
- Use FATAL marker to support log4j's FATAL level (as the log4j-slf4j bridge does)
- Removed `Logging.swallow` in favour of CoreUtils.swallow, which logs to the
correct logger
Author: Viktor Somogyi <vi...@cloudera.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3477 from viktorsomogyi/KAFKA-1044
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ed8b0315
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ed8b0315
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ed8b0315
Branch: refs/heads/trunk
Commit: ed8b0315a6c3705b2a163ce3ab4723234779264f
Parents: 80038e6
Author: Viktor Somogyi <vi...@cloudera.com>
Authored: Wed Nov 22 12:44:15 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed Nov 22 15:06:56 2017 +0000
----------------------------------------------------------------------
build.gradle | 1 +
core/src/main/scala/kafka/Kafka.scala | 2 +-
.../src/main/scala/kafka/admin/AclCommand.scala | 4 +-
.../ZkNodeChangeNotificationListener.scala | 2 +-
.../kafka/consumer/ConsumerFetcherManager.scala | 2 +-
.../controller/ControllerChannelManager.scala | 3 +-
.../kafka/controller/StateChangeLogger.scala | 6 +-
.../main/scala/kafka/log/AbstractIndex.scala | 4 +-
core/src/main/scala/kafka/log/LogManager.scala | 6 +-
core/src/main/scala/kafka/log/LogSegment.scala | 22 +--
.../scala/kafka/network/BlockingChannel.scala | 8 +-
.../scala/kafka/network/RequestChannel.scala | 9 +-
.../main/scala/kafka/network/SocketServer.scala | 11 +-
.../scala/kafka/producer/SyncProducer.scala | 2 +-
.../producer/async/DefaultEventHandler.scala | 9 +-
.../security/auth/SimpleAclAuthorizer.scala | 4 +-
.../main/scala/kafka/server/AdminManager.scala | 4 +-
.../scala/kafka/server/ClientQuotaManager.scala | 10 +-
.../main/scala/kafka/server/ConfigHandler.scala | 4 +-
.../kafka/server/DynamicConfigManager.scala | 4 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 6 +-
.../main/scala/kafka/server/KafkaServer.scala | 42 +++---
.../kafka/server/ReplicaFetcherThread.scala | 6 +-
.../scala/kafka/tools/ConsoleConsumer.scala | 8 +-
.../scala/kafka/tools/ConsumerPerformance.scala | 6 +-
.../main/scala/kafka/tools/MirrorMaker.scala | 8 +-
.../scala/kafka/tools/ProducerPerformance.scala | 3 +-
.../kafka/tools/SimpleConsumerPerformance.scala | 6 +-
core/src/main/scala/kafka/utils/CoreUtils.scala | 21 ++-
.../scala/kafka/utils/Log4jController.scala | 11 +-
core/src/main/scala/kafka/utils/Logging.scala | 136 +++++++------------
.../scala/kafka/security/minikdc/MiniKdc.scala | 6 +-
.../test/scala/kafka/utils/LoggingTest.scala | 37 +++++
.../unit/kafka/server/LogDirFailureTest.scala | 4 +-
.../unit/kafka/server/ServerShutdownTest.scala | 2 +-
.../scala/unit/kafka/utils/CoreUtilsTest.scala | 7 +-
.../scala/unit/kafka/zk/EmbeddedZookeeper.scala | 9 +-
.../unit/kafka/zk/ZooKeeperTestHarness.scala | 4 +-
gradle/dependencies.gradle | 2 +
39 files changed, 218 insertions(+), 223 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 03426cd..1223150 100644
--- a/build.gradle
+++ b/build.gradle
@@ -550,6 +550,7 @@ project(':core') {
compile libs.metrics
compile libs.scala
compile libs.slf4jlog4j
+ compile libs.scalaLogging
compile libs.zkclient
compile libs.zookeeper
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/Kafka.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index 9651038..25a7216 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -94,7 +94,7 @@ object Kafka extends Logging {
}
catch {
case e: Throwable =>
- fatal(e)
+ fatal("Exiting Kafka due to fatal exception", e)
Exit.exit(1)
}
Exit.exit(0)
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/admin/AclCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 4522135..2732f6c 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConverters._
-object AclCommand {
+object AclCommand extends Logging {
val Newline = scala.util.Properties.lineSeparator
val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] (
@@ -77,7 +77,7 @@ object AclCommand {
authZ.configure(authorizerProperties.asJava)
f(authZ)
}
- finally CoreUtils.swallow(authZ.close())
+ finally CoreUtils.swallow(authZ.close(), this)
}
private def addAcl(opts: AclCommandOptions) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index f0d4b1b..4cae80c 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -87,7 +87,7 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
val (data, _) = zkClient.getDataAndStat(changeZnode)
data match {
case Some(d) => notificationHandler.processNotification(d)
- case None => logger.warn(s"read null data from $changeZnode when processing notification $notification")
+ case None => warn(s"read null data from $changeZnode when processing notification $notification")
}
lastExecutedChange = changeId
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 7cccfe1..0a6b82e 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -69,7 +69,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
config.clientId,
config.socketTimeoutMs,
correlationId.getAndIncrement).topicsMetadata
- if(logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString()))
+ if(isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString()))
topicsMetadata.foreach { tmd =>
val topic = tmd.topic
tmd.partitionsMetadata.foreach { pmd =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 7314679..d5bd4e6 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{Node, TopicPartition}
+import org.slf4j.event.Level
import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
@@ -210,7 +211,7 @@ class RequestSendThread(val controllerId: Int,
override def doWork(): Unit = {
- def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(100))
+ def backoff(): Unit = CoreUtils.swallow(Thread.sleep(100), this, Level.TRACE)
val QueueItem(apiKey, requestBuilder, callback) = queue.take()
var clientResponse: ClientResponse = null
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/controller/StateChangeLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/StateChangeLogger.scala b/core/src/main/scala/kafka/controller/StateChangeLogger.scala
index 21c70c3..a1d1bb2 100644
--- a/core/src/main/scala/kafka/controller/StateChangeLogger.scala
+++ b/core/src/main/scala/kafka/controller/StateChangeLogger.scala
@@ -17,11 +17,11 @@
package kafka.controller
+import com.typesafe.scalalogging.Logger
import kafka.utils.Logging
-import org.apache.log4j
object StateChangeLogger {
- private val Logger = log4j.Logger.getLogger("state.change.logger")
+ private val logger = Logger("state.change.logger")
}
/**
@@ -34,7 +34,7 @@ class StateChangeLogger(brokerId: Int, inControllerContext: Boolean, controllerE
if (controllerEpoch.isDefined && !inControllerContext)
throw new IllegalArgumentException("Controller epoch should only be defined if inControllerContext is true")
- override lazy val logger = StateChangeLogger.Logger
+ override lazy val logger = StateChangeLogger.logger
locally {
val prefix = if (inControllerContext) "Controller" else "Broker"
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/log/AbstractIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index 899c107..9696d8d 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -75,7 +75,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
idx
} finally {
- CoreUtils.swallow(raf.close())
+ CoreUtils.swallow(raf.close(), this)
}
}
@@ -130,7 +130,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
mmap.position(position)
true
} finally {
- CoreUtils.swallow(raf.close())
+ CoreUtils.swallow(raf.close(), this)
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index f1e2fc2..9a61be3 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -206,7 +206,7 @@ class LogManager(logDirs: Seq[File],
info(s"Logs for partitions ${offlineCurrentTopicPartitions.mkString(",")} are offline and " +
s"logs for future partitions ${offlineFutureTopicPartitions.mkString(",")} are offline due to failure on log directory $dir")
- dirLocks.filter(_.file.getParent == dir).foreach(dir => CoreUtils.swallow(dir.destroy()))
+ dirLocks.filter(_.file.getParent == dir).foreach(dir => CoreUtils.swallow(dir.destroy(), this))
}
}
@@ -412,7 +412,7 @@ class LogManager(logDirs: Seq[File],
// stop the cleaner first
if (cleaner != null) {
- CoreUtils.swallow(cleaner.shutdown())
+ CoreUtils.swallow(cleaner.shutdown(), this)
}
// close logs in each dir
@@ -448,7 +448,7 @@ class LogManager(logDirs: Seq[File],
// mark that the shutdown was clean by creating marker file
debug("Writing clean shutdown marker at " + dir)
- CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath))
+ CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath), this)
}
} catch {
case e: ExecutionException =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 845f08f..6db2a50 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -287,12 +287,12 @@ class LogSegment(val log: FileRecords,
}
} catch {
case e: CorruptRecordException =>
- logger.warn("Found invalid messages in log segment %s at byte offset %d: %s."
+ warn("Found invalid messages in log segment %s at byte offset %d: %s."
.format(log.file.getAbsolutePath, validBytes, e.getMessage))
}
val truncated = log.sizeInBytes - validBytes
if (truncated > 0)
- logger.debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")
+ debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")
log.truncateTo(validBytes)
index.trimToValidSize()
@@ -467,21 +467,21 @@ class LogSegment(val log: FileRecords,
* Close this log segment
*/
def close() {
- CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true))
- CoreUtils.swallow(index.close())
- CoreUtils.swallow(timeIndex.close())
- CoreUtils.swallow(log.close())
- CoreUtils.swallow(txnIndex.close())
+ CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true), this)
+ CoreUtils.swallow(index.close(), this)
+ CoreUtils.swallow(timeIndex.close(), this)
+ CoreUtils.swallow(log.close(), this)
+ CoreUtils.swallow(txnIndex.close(), this)
}
/**
* Close file handlers used by the log segment but don't write to disk. This is used when the disk may have failed
*/
def closeHandlers() {
- CoreUtils.swallow(index.closeHandler())
- CoreUtils.swallow(timeIndex.closeHandler())
- CoreUtils.swallow(log.closeHandlers())
- CoreUtils.swallow(txnIndex.close())
+ CoreUtils.swallow(index.closeHandler(), this)
+ CoreUtils.swallow(timeIndex.closeHandler(), this)
+ CoreUtils.swallow(log.closeHandlers(), this)
+ CoreUtils.swallow(txnIndex.close(), this)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/network/BlockingChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index 69fd054..3493ad3 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -21,7 +21,7 @@ import java.net.InetSocketAddress
import java.nio.channels._
import kafka.api.RequestOrResponse
-import kafka.utils.{Logging, nonthreadsafe}
+import kafka.utils.{CoreUtils, Logging, nonthreadsafe}
import org.apache.kafka.common.network.NetworkReceive
@@ -91,15 +91,15 @@ class BlockingChannel( val host: String,
def disconnect() = lock synchronized {
if(channel != null) {
- swallow(channel.close())
- swallow(channel.socket.close())
+ CoreUtils.swallow(channel.close(), this)
+ CoreUtils.swallow(channel.socket.close(), this)
channel = null
writeChannel = null
}
// closing the main socket channel *should* close the read channel
// but let's do it to be sure.
if(readChannel != null) {
- swallow(readChannel.close())
+ CoreUtils.swallow(readChannel.close(), this)
readChannel = null
}
connected = false
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index a50af45..7cc8619 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -21,6 +21,7 @@ import java.net.InetAddress
import java.nio.ByteBuffer
import java.util.concurrent._
+import com.typesafe.scalalogging.Logger
import com.yammer.metrics.core.{Gauge, Meter}
import kafka.metrics.KafkaMetricsGroup
import kafka.network.RequestChannel.{BaseRequest, SendAction, ShutdownRequest, NoOpAction, CloseConnectionAction}
@@ -31,15 +32,14 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Sanitizer, Time}
-import org.apache.log4j.Logger
import scala.collection.mutable
import scala.reflect.ClassTag
object RequestChannel extends Logging {
- private val requestLogger = Logger.getLogger("kafka.request.logger")
+ private val requestLogger = Logger("kafka.request.logger")
- def isRequestLoggingEnabled: Boolean = requestLogger.isDebugEnabled
+ def isRequestLoggingEnabled: Boolean = requestLogger.underlying.isDebugEnabled
sealed trait BaseRequest
case object ShutdownRequest extends BaseRequest
@@ -176,7 +176,7 @@ object RequestChannel extends Logging {
recordNetworkThreadTimeCallback.foreach(record => record(networkThreadTimeNanos))
if (isRequestLoggingEnabled) {
- val detailsEnabled = requestLogger.isTraceEnabled
+ val detailsEnabled = requestLogger.underlying.isTraceEnabled
val responseString = response.responseAsString.getOrElse(
throw new IllegalStateException("responseAsString should always be defined if request logging is enabled"))
@@ -325,6 +325,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
}
def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest)
+
}
object RequestMetrics {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 4366fea..200bfe2 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaCh
import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time}
+import org.slf4j.event.Level
import scala.collection._
import JavaConverters._
@@ -252,8 +253,8 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
if (channel != null) {
debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
connectionQuotas.dec(channel.socket.getInetAddress)
- swallowError(channel.socket().close())
- swallowError(channel.close())
+ CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
+ CoreUtils.swallow(channel.close(), this, Level.ERROR)
}
}
}
@@ -319,8 +320,8 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
}
} finally {
debug("Closing server socket and selector.")
- swallowError(serverChannel.close())
- swallowError(nioSelector.close())
+ CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
+ CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
shutdownComplete()
}
}
@@ -481,7 +482,7 @@ private[kafka] class Processor(val id: Int,
}
} finally {
debug("Closing selector - processor " + id)
- swallowError(closeAll())
+ CoreUtils.swallow(closeAll(), this, Level.ERROR)
shutdownComplete()
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 04527c8..b132293 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -56,7 +56,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
* Also, when verification is turned on, care should be taken to see that the logs don't fill up with unnecessary
* data. So, leaving the rest of the logging at TRACE, while errors should be logged at ERROR level
*/
- if (logger.isDebugEnabled) {
+ if (isDebugEnabled) {
val buffer = new RequestOrResponseSend("", request).buffer
trace("verifying sendbuffer of size " + buffer.limit())
val requestTypeId = buffer.getShort()
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 3e4eaa3..8c7465f 100755
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic._
import kafka.api.{ProducerRequest, TopicMetadata}
import org.apache.kafka.common.utils.{Time, Utils}
+import org.slf4j.event.Level
@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class DefaultEventHandler[K,V](config: ProducerConfig,
@@ -72,7 +73,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
if (topicMetadataRefreshInterval >= 0 &&
Time.SYSTEM.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
- CoreUtils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
+ CoreUtils.swallow(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement), this, Level.ERROR)
sendPartitionPerTopicCache.clear()
topicMetadataToRefresh.clear
lastTopicMetadataRefreshTime = Time.SYSTEM.milliseconds
@@ -83,7 +84,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
// back off and update the topic metadata cache before attempting another send operation
Thread.sleep(config.retryBackoffMs)
// get topics of the outstanding produce requests and refresh metadata for those
- CoreUtils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
+ CoreUtils.swallow(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement), this, Level.ERROR)
sendPartitionPerTopicCache.clear()
remainingRetries -= 1
producerStats.resendRate.mark()
@@ -105,7 +106,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
case Some(partitionedData) =>
val failedProduceRequests = new ArrayBuffer[KeyedMessage[K, Message]]
for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
- if (logger.isTraceEnabled) {
+ if (isTraceEnabled) {
messagesPerBrokerMap.foreach(partitionAndEvent =>
trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
}
@@ -277,7 +278,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
if(response != null) {
if (response.status.size != producerRequest.data.size)
throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest))
- if (logger.isTraceEnabled) {
+ if (isTraceEnabled) {
val successfullySentData = response.status.filter(_._2.error == Errors.NONE)
successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
trace("Successfully sent message: %s".format(if(message.message.isNull) null else message.message.toString()))))
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index aa25653..e1befc7 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -19,6 +19,7 @@ package kafka.security.auth
import java.util
import java.util.concurrent.locks.ReentrantReadWriteLock
+import com.typesafe.scalalogging.Logger
import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
import kafka.network.RequestChannel.Session
import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
@@ -29,7 +30,6 @@ import kafka.zk.{AclChangeNotificationSequenceZNode, AclChangeNotificationZNode,
import kafka.zookeeper.ZooKeeperClient
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.SecurityUtils
-import org.apache.log4j.Logger
import scala.collection.JavaConverters._
import scala.util.Random
@@ -51,7 +51,7 @@ object SimpleAclAuthorizer {
}
class SimpleAclAuthorizer extends Authorizer with Logging {
- private val authorizerLogger = Logger.getLogger("kafka.authorizer.logger")
+ private val authorizerLogger = Logger("kafka.authorizer.logger")
private var superUsers = Set.empty[KafkaPrincipal]
private var shouldAllowEveryoneIfNoAclIsFound = false
private var zkClient: KafkaZkClient = null
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index aefdefd..935fade 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -387,7 +387,7 @@ class AdminManager(val config: KafkaConfig,
def shutdown() {
topicPurgatory.shutdown()
- CoreUtils.swallow(createTopicPolicy.foreach(_.close()))
- CoreUtils.swallow(alterConfigPolicy.foreach(_.close()))
+ CoreUtils.swallow(createTopicPolicy.foreach(_.close()), this)
+ CoreUtils.swallow(alterConfigPolicy.foreach(_.close()), this)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index a6efa92..8ec27a3 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -194,7 +194,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
// If delayed, add the element to the delayQueue
delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
delayQueueSensor.record()
- logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
+ debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
}
throttleTimeMs
}
@@ -434,7 +434,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}
quota match {
case Some(newQuota) =>
- logger.info(s"Changing ${quotaType} quota for ${userInfo}${clientIdInfo} to $newQuota.bound}")
+ info(s"Changing ${quotaType} quota for ${userInfo}${clientIdInfo} to $newQuota.bound}")
overriddenQuota.put(quotaId, newQuota)
(sanitizedUser, clientId) match {
case (Some(_), Some(_)) => quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
@@ -443,7 +443,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
case (None, None) =>
}
case None =>
- logger.info(s"Removing ${quotaType} quota for ${userInfo}${clientIdInfo}")
+ info(s"Removing ${quotaType} quota for ${userInfo}${clientIdInfo}")
overriddenQuota.remove(quotaId)
}
@@ -463,7 +463,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
if (metric != null) {
val metricConfigEntity = quotaEntity(sanitizedUser.getOrElse(""), clientId.getOrElse(""), sanitizedClientId.getOrElse(""))
val newQuota = metricConfigEntity.quota
- logger.info(s"Sensor for ${userInfo}${clientIdInfo} already exists. Changing quota to ${newQuota.bound()} in MetricConfig")
+ info(s"Sensor for ${userInfo}${clientIdInfo} already exists. Changing quota to ${newQuota.bound()} in MetricConfig")
metric.config(getQuotaMetricConfig(newQuota))
}
} else {
@@ -474,7 +474,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
val metricConfigEntity = quotaEntity(userTag, clientIdTag, Sanitizer.sanitize(clientIdTag))
if (metricConfigEntity.quota != metric.config.quota) {
val newQuota = metricConfigEntity.quota
- logger.info(s"Sensor for quota-id ${metricConfigEntity.quotaId} already exists. Setting quota to ${newQuota.bound} in MetricConfig")
+ info(s"Sensor for quota-id ${metricConfigEntity.quotaId} already exists. Setting quota to ${newQuota.bound} in MetricConfig")
metric.config(getQuotaMetricConfig(newQuota))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 0edd638..390222d 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -74,10 +74,10 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
if (topicConfig.containsKey(prop) && topicConfig.getProperty(prop).length > 0) {
val partitions = parseThrottledPartitions(topicConfig, kafkaConfig.brokerId, prop)
quotaManager.markThrottled(topic, partitions)
- logger.debug(s"Setting $prop on broker ${kafkaConfig.brokerId} for topic: $topic and partitions $partitions")
+ debug(s"Setting $prop on broker ${kafkaConfig.brokerId} for topic: $topic and partitions $partitions")
} else {
quotaManager.removeThrottle(topic)
- logger.debug(s"Removing $prop from broker ${kafkaConfig.brokerId} for topic $topic")
+ debug(s"Removing $prop from broker ${kafkaConfig.brokerId} for topic $topic")
}
}
updateThrottledList(LogConfig.LeaderReplicationThrottledReplicasProp, quotas.leader)
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index 0f1abfc..6392723 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -121,7 +121,7 @@ class DynamicConfigManager(private val oldZkUtils: ZkUtils,
}
val entityConfig = AdminUtils.fetchEntityConfig(oldZkUtils, entityType, entity)
- logger.info(s"Processing override for entityType: $entityType, entity: $entity with config: $entityConfig")
+ info(s"Processing override for entityType: $entityType, entity: $entity with config: $entityConfig")
configHandlers(entityType).processConfigChanges(entity, entityConfig)
}
@@ -145,7 +145,7 @@ class DynamicConfigManager(private val oldZkUtils: ZkUtils,
val loggableConfig = entityConfig.asScala.map {
case (k, v) => (k, if (ScramMechanism.isScram(k)) Password.HIDDEN else v)
}
- logger.info(s"Processing override for entityPath: $entityPath with config: $loggableConfig")
+ info(s"Processing override for entityPath: $entityPath with config: $loggableConfig")
configHandlers(rootEntityType).processConfigChanges(fullSanitizedEntityName, entityConfig)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ced3f0b..de56986 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -205,7 +205,7 @@ class KafkaApis(val requestChannel: RequestChannel,
new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava))
}
- CoreUtils.swallow(replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads())
+ CoreUtils.swallow(replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads(), this)
}
def handleUpdateMetadataRequest(request: RequestChannel.Request) {
@@ -1823,12 +1823,12 @@ class KafkaApis(val requestChannel: RequestChannel,
throw new InvalidRequestException("Invalid empty resource name")
auth.addAcls(immutable.Set(acl), resource)
- logger.debug(s"Added acl $acl to $resource")
+ debug(s"Added acl $acl to $resource")
new AclCreationResponse(ApiError.NONE)
} catch {
case throwable: Throwable =>
- logger.debug(s"Failed to add acl $acl to $resource", throwable)
+ debug(s"Failed to add acl $acl to $resource", throwable)
new AclCreationResponse(ApiError.fromThrowable(throwable))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index a13f5af..1812eb0 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -517,64 +517,64 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
// last in the `if` block. If the order is reversed, we could shutdown twice or leave `isShuttingDown` set to
// `true` at the end of this method.
if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
- CoreUtils.swallow(controlledShutdown())
+ CoreUtils.swallow(controlledShutdown(), this)
brokerState.newState(BrokerShuttingDown)
if (kafkaHealthcheck != null)
- CoreUtils.swallow(kafkaHealthcheck.shutdown())
+ CoreUtils.swallow(kafkaHealthcheck.shutdown(), this)
if (dynamicConfigManager != null)
- CoreUtils.swallow(dynamicConfigManager.shutdown())
+ CoreUtils.swallow(dynamicConfigManager.shutdown(), this)
// Stop socket server to stop accepting any more connections and requests.
// Socket server will be shutdown towards the end of the sequence.
if (socketServer != null)
- CoreUtils.swallow(socketServer.stopProcessingRequests())
+ CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
if (requestHandlerPool != null)
- CoreUtils.swallow(requestHandlerPool.shutdown())
+ CoreUtils.swallow(requestHandlerPool.shutdown(), this)
- CoreUtils.swallow(kafkaScheduler.shutdown())
+ CoreUtils.swallow(kafkaScheduler.shutdown(), this)
if (apis != null)
- CoreUtils.swallow(apis.close())
- CoreUtils.swallow(authorizer.foreach(_.close()))
+ CoreUtils.swallow(apis.close(), this)
+ CoreUtils.swallow(authorizer.foreach(_.close()), this)
if (adminManager != null)
- CoreUtils.swallow(adminManager.shutdown())
+ CoreUtils.swallow(adminManager.shutdown(), this)
if (transactionCoordinator != null)
- CoreUtils.swallow(transactionCoordinator.shutdown())
+ CoreUtils.swallow(transactionCoordinator.shutdown(), this)
if (groupCoordinator != null)
- CoreUtils.swallow(groupCoordinator.shutdown())
+ CoreUtils.swallow(groupCoordinator.shutdown(), this)
if (replicaManager != null)
- CoreUtils.swallow(replicaManager.shutdown())
+ CoreUtils.swallow(replicaManager.shutdown(), this)
if (logManager != null)
- CoreUtils.swallow(logManager.shutdown())
+ CoreUtils.swallow(logManager.shutdown(), this)
if (kafkaController != null)
- CoreUtils.swallow(kafkaController.shutdown())
+ CoreUtils.swallow(kafkaController.shutdown(), this)
if (zkUtils != null)
- CoreUtils.swallow(zkUtils.close())
+ CoreUtils.swallow(zkUtils.close(), this)
if (zkClient != null)
- CoreUtils.swallow(zkClient.close())
+ CoreUtils.swallow(zkClient.close(), this)
if (quotaManagers != null)
- CoreUtils.swallow(quotaManagers.shutdown())
+ CoreUtils.swallow(quotaManagers.shutdown(), this)
// Even though socket server is stopped much earlier, controller can generate
// response for controlled shutdown request. Shutdown server at the end to
// avoid any failures (e.g. when metrics are recorded)
if (socketServer != null)
- CoreUtils.swallow(socketServer.shutdown())
+ CoreUtils.swallow(socketServer.shutdown(), this)
if (metrics != null)
- CoreUtils.swallow(metrics.close())
+ CoreUtils.swallow(metrics.close(), this)
if (brokerTopicStats != null)
- CoreUtils.swallow(brokerTopicStats.close())
+ CoreUtils.swallow(brokerTopicStats.close(), this)
brokerState.newState(NotRunning)
startupComplete.set(false)
isShuttingDown.set(false)
- CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString, metrics))
+ CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString, metrics), this)
shutdownLatch.countDown()
info("shut down completed")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 3bc68da..4413165 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -99,14 +99,14 @@ class ReplicaFetcherThread(name: String,
throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
topicPartition, fetchOffset, replica.logEndOffset.messageOffset))
- if (logger.isTraceEnabled)
+ if (isTraceEnabled)
trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
.format(replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
// Append the leader's messages to the log
partition.appendRecordsToFollower(records)
- if (logger.isTraceEnabled)
+ if (isTraceEnabled)
trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
.format(replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition))
val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
@@ -116,7 +116,7 @@ class ReplicaFetcherThread(name: String,
// these values will be computed upon making the leader
replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
replica.maybeIncrementLogStartOffset(leaderLogStartOffset)
- if (logger.isTraceEnabled)
+ if (isTraceEnabled)
trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark")
if (quota.isThrottled(topicPartition))
quota.record(records.sizeInBytes)
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 02690f1..7d2d371 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets
import java.util.concurrent.CountDownLatch
import java.util.{Locale, Properties, Random}
+import com.typesafe.scalalogging.LazyLogging
import joptsimple._
import kafka.api.OffsetRequest
import kafka.common.{MessageFormatter, StreamEndException}
@@ -35,7 +36,6 @@ import org.apache.kafka.common.errors.{AuthenticationException, WakeupException}
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.utils.Utils
-import org.apache.log4j.Logger
import scala.collection.JavaConverters._
@@ -568,17 +568,15 @@ class DefaultMessageFormatter extends MessageFormatter {
}
}
-class LoggingMessageFormatter extends MessageFormatter {
+class LoggingMessageFormatter extends MessageFormatter with LazyLogging {
private val defaultWriter: DefaultMessageFormatter = new DefaultMessageFormatter
- val logger = Logger.getLogger(getClass().getName)
override def init(props: Properties): Unit = defaultWriter.init(props)
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
import consumerRecord._
defaultWriter.writeTo(consumerRecord, output)
- if (logger.isInfoEnabled)
- logger.info({if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) s"$timestampType:$timestamp, " else ""} +
+ logger.info({if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) s"$timestampType:$timestamp, " else ""} +
s"key:${if (key == null) "null" else new String(key, StandardCharsets.UTF_8)}, " +
s"value:${if (value == null) "null" else new String(value, StandardCharsets.UTF_8)}")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index bdec41f..a3e60e6 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -23,7 +23,6 @@ import scala.collection.JavaConverters._
import java.util.concurrent.atomic.AtomicLong
import java.nio.channels.ClosedByInterruptException
-import org.apache.log4j.Logger
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.utils.Utils
@@ -38,13 +37,14 @@ import kafka.consumer.ConsumerTimeoutException
import java.text.SimpleDateFormat
import java.util.concurrent.atomic.AtomicBoolean
+import com.typesafe.scalalogging.LazyLogging
+
import scala.collection.mutable
/**
* Performance test for the full zookeeper consumer
*/
-object ConsumerPerformance {
- private val logger = Logger.getLogger(getClass())
+object ConsumerPerformance extends LazyLogging {
def main(args: Array[String]): Unit = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index b5b1540..618fd2a 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -444,18 +444,18 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
exitingOnSendFailure = true
fatal("Mirror maker thread failure due to ", t)
} finally {
- CoreUtils.swallow {
+ CoreUtils.swallow ({
info("Flushing producer.")
producer.flush()
// note that this commit is skipped if flush() fails which ensures that we don't lose messages
info("Committing consumer offsets.")
commitOffsets(mirrorMakerConsumer)
- }
+ }, this)
info("Shutting down consumer connectors.")
- CoreUtils.swallow(mirrorMakerConsumer.stop())
- CoreUtils.swallow(mirrorMakerConsumer.cleanup())
+ CoreUtils.swallow(mirrorMakerConsumer.stop(), this)
+ CoreUtils.swallow(mirrorMakerConsumer.cleanup(), this)
shutdownLatch.countDown()
info("Mirror maker thread stopped")
// if it exits accidentally, stop the entire mirror maker
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/tools/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index 77f560b..365c9af 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -31,7 +31,7 @@ import java.math.BigInteger
import java.nio.charset.StandardCharsets
import org.apache.kafka.common.utils.Utils
-import org.apache.log4j.Logger
+import org.slf4j.LoggerFactory
/**
* Load test for the producer
@@ -40,7 +40,6 @@ import org.apache.log4j.Logger
object ProducerPerformance extends Logging {
def main(args: Array[String]) {
- val logger = Logger.getLogger(getClass)
val config = new ProducerPerfConfig(args)
if (!config.isFixedSize)
logger.info("WARN: Throughput will be slower due to changing message size per request")
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
index 1d090b3..888d462 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
@@ -20,10 +20,10 @@ package kafka.tools
import java.net.URI
import java.text.SimpleDateFormat
+import com.typesafe.scalalogging.LazyLogging
import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
import kafka.consumer.SimpleConsumer
import kafka.utils._
-import org.apache.log4j.Logger
import kafka.common.TopicAndPartition
import org.apache.kafka.common.utils.Time
@@ -32,9 +32,7 @@ import org.apache.kafka.common.utils.Time
* Performance test for the simple consumer
*/
@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
-object SimpleConsumerPerformance {
-
- private val logger = Logger.getLogger(getClass())
+object SimpleConsumerPerformance extends LazyLogging {
def main(args: Array[String]) {
logger.warn("WARNING: SimpleConsumerPerformance is deprecated and will be dropped in a future release following 0.11.0.0.")
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 7a853d5..efd4d1e 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -31,6 +31,7 @@ import kafka.cluster.EndPoint
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{Base64, KafkaThread, Utils}
+import org.slf4j.event.Level
/**
* General helper functions!
@@ -73,15 +74,23 @@ object CoreUtils extends Logging {
new KafkaThread(name, runnable(fun), daemon)
/**
- * Do the given action and log any exceptions thrown without rethrowing them
- * @param log The log method to use for logging. E.g. logger.warn
- * @param action The action to execute
- */
- def swallow(log: (Object, Throwable) => Unit, action: => Unit) {
+ * Do the given action and log any exceptions thrown without rethrowing them.
+ *
+ * @param action The action to execute.
+ * @param logging The logging instance to use for logging the thrown exception.
+ * @param logLevel The log level to use for logging.
+ */
+ def swallow(action: => Unit, logging: Logging, logLevel: Level = Level.WARN) {
try {
action
} catch {
- case e: Throwable => log(e.getMessage(), e)
+ case e: Throwable => logLevel match {
+ case Level.ERROR => logger.error(e.getMessage, e)
+ case Level.WARN => logger.warn(e.getMessage, e)
+ case Level.INFO => logger.info(e.getMessage, e)
+ case Level.DEBUG => logger.debug(e.getMessage, e)
+ case Level.TRACE => logger.trace(e.getMessage, e)
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/utils/Log4jController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Log4jController.scala b/core/src/main/scala/kafka/utils/Log4jController.scala
index 026fbae..95d0733 100755
--- a/core/src/main/scala/kafka/utils/Log4jController.scala
+++ b/core/src/main/scala/kafka/utils/Log4jController.scala
@@ -17,19 +17,10 @@
package kafka.utils
-
-import org.apache.log4j.{Logger, Level, LogManager}
import java.util
import java.util.Locale
-
-object Log4jController {
-
- private val controller = new Log4jController
-
- CoreUtils.registerMBean(controller, "kafka:type=kafka.Log4jController")
-
-}
+import org.apache.log4j.{Level, LogManager, Logger}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/utils/Logging.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Logging.scala b/core/src/main/scala/kafka/utils/Logging.scala
index c2585ad..e409bba 100755
--- a/core/src/main/scala/kafka/utils/Logging.scala
+++ b/core/src/main/scala/kafka/utils/Logging.scala
@@ -17,106 +17,64 @@
package kafka.utils
-import org.apache.log4j.Logger
+import com.typesafe.scalalogging.{LazyLogging, Logger}
+import org.slf4j.{Marker, MarkerFactory}
-trait Logging {
- val loggerName = this.getClass.getName
- lazy val logger = Logger.getLogger(loggerName)
- protected var logIdent: String = null
+object Log4jControllerRegistration {
+ private val logger = Logger(this.getClass.getName)
- // Force initialization to register Log4jControllerMBean
- private val log4jController = Log4jController
+ try {
+ val log4jController = Class.forName("kafka.utils.Log4jController").asInstanceOf[Class[Object]]
+ val instance = log4jController.getDeclaredConstructor().newInstance()
+ CoreUtils.registerMBean(instance, "kafka:type=kafka.Log4jController")
+ logger.info("Registered kafka:type=kafka.Log4jController MBean")
+ } catch {
+ case _: Exception => logger.info("Couldn't register kafka:type=kafka.Log4jController MBean")
+ }
+}
+
+private object Logging {
+ private val FatalMarker: Marker = MarkerFactory.getMarker("FATAL")
+}
+
+trait Logging extends LazyLogging {
+ def loggerName: String = logger.underlying.getName
- protected def msgWithLogIdent(msg: String) =
+ protected var logIdent: String = _
+
+ Log4jControllerRegistration
+
+ protected def msgWithLogIdent(msg: String): String =
if (logIdent == null) msg else logIdent + msg
- def trace(msg: => String): Unit = {
- if (logger.isTraceEnabled())
- logger.trace(msgWithLogIdent(msg))
- }
- def trace(e: => Throwable): Any = {
- if (logger.isTraceEnabled())
- logger.trace(logIdent,e)
- }
- def trace(msg: => String, e: => Throwable) = {
- if (logger.isTraceEnabled())
- logger.trace(msgWithLogIdent(msg),e)
- }
- def swallowTrace(action: => Unit) {
- CoreUtils.swallow(logger.trace, action)
- }
+ def trace(msg: => String): Unit = logger.trace(msgWithLogIdent(msg))
- def isDebugEnabled: Boolean = logger.isDebugEnabled
+ def trace(msg: => String, e: => Throwable): Unit = logger.trace(msgWithLogIdent(msg),e)
- def isTraceEnabled: Boolean = logger.isTraceEnabled
+ def isDebugEnabled: Boolean = logger.underlying.isDebugEnabled
- def debug(msg: => String): Unit = {
- if (logger.isDebugEnabled())
- logger.debug(msgWithLogIdent(msg))
- }
- def debug(e: => Throwable): Any = {
- if (logger.isDebugEnabled())
- logger.debug(logIdent,e)
- }
- def debug(msg: => String, e: => Throwable) = {
- if (logger.isDebugEnabled())
- logger.debug(msgWithLogIdent(msg),e)
- }
- def swallowDebug(action: => Unit) {
- CoreUtils.swallow(logger.debug, action)
- }
+ def isTraceEnabled: Boolean = logger.underlying.isTraceEnabled
- def info(msg: => String): Unit = {
- if (logger.isInfoEnabled())
- logger.info(msgWithLogIdent(msg))
- }
- def info(e: => Throwable): Any = {
- if (logger.isInfoEnabled())
- logger.info(logIdent,e)
- }
- def info(msg: => String,e: => Throwable) = {
- if (logger.isInfoEnabled())
- logger.info(msgWithLogIdent(msg),e)
- }
- def swallowInfo(action: => Unit) {
- CoreUtils.swallow(logger.info, action)
- }
+ def debug(msg: => String): Unit = logger.debug(msgWithLogIdent(msg))
- def warn(msg: => String): Unit = {
- logger.warn(msgWithLogIdent(msg))
- }
- def warn(e: => Throwable): Any = {
- logger.warn(logIdent,e)
- }
- def warn(msg: => String, e: => Throwable) = {
- logger.warn(msgWithLogIdent(msg),e)
- }
- def swallowWarn(action: => Unit) {
- CoreUtils.swallow(logger.warn, action)
- }
- def swallow(action: => Unit) = swallowWarn(action)
+ def debug(msg: => String, e: => Throwable): Unit = logger.debug(msgWithLogIdent(msg),e)
- def error(msg: => String): Unit = {
- logger.error(msgWithLogIdent(msg))
- }
- def error(e: => Throwable): Any = {
- logger.error(logIdent,e)
- }
- def error(msg: => String, e: => Throwable) = {
- logger.error(msgWithLogIdent(msg),e)
- }
- def swallowError(action: => Unit) {
- CoreUtils.swallow(logger.error, action)
- }
+ def info(msg: => String): Unit = logger.info(msgWithLogIdent(msg))
- def fatal(msg: => String): Unit = {
- logger.fatal(msgWithLogIdent(msg))
- }
- def fatal(e: => Throwable): Any = {
- logger.fatal(logIdent,e)
- }
- def fatal(msg: => String, e: => Throwable) = {
- logger.fatal(msgWithLogIdent(msg),e)
- }
+ def info(msg: => String,e: => Throwable): Unit = logger.info(msgWithLogIdent(msg),e)
+
+ def warn(msg: => String): Unit = logger.warn(msgWithLogIdent(msg))
+
+ def warn(msg: => String, e: => Throwable): Unit = logger.warn(msgWithLogIdent(msg),e)
+
+ def error(msg: => String): Unit = logger.error(msgWithLogIdent(msg))
+
+ def error(msg: => String, e: => Throwable): Unit = logger.error(msgWithLogIdent(msg),e)
+
+ def fatal(msg: => String): Unit =
+ logger.error(Logging.FatalMarker, msgWithLogIdent(msg))
+
+ def fatal(msg: => String, e: => Throwable): Unit =
+ logger.error(Logging.FatalMarker, msgWithLogIdent(msg), e)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
index 6c964d7..9b30581 100644
--- a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
+++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
@@ -205,7 +205,7 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
builder.append(line).append("\n")
addEntriesToDirectoryService(StrSubstitutor.replace(builder, map.asJava))
}
- finally CoreUtils.swallow(reader.close())
+ finally CoreUtils.swallow(reader.close(), this)
}
val bindAddress = config.getProperty(MiniKdc.KdcBindAddress)
@@ -254,7 +254,7 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
while ({line = reader.readLine(); line != null}) {
stringBuilder.append(line).append("{3}")
}
- } finally CoreUtils.swallow(reader.close())
+ } finally CoreUtils.swallow(reader.close(), this)
val output = MessageFormat.format(stringBuilder.toString, realm, host, port.toString, System.lineSeparator())
Files.write(krb5conf.toPath, output.getBytes(StandardCharsets.UTF_8))
}
@@ -337,7 +337,7 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
try {
for (ldifEntry <- reader.asScala)
ds.getAdminSession.add(new DefaultEntry(ds.getSchemaManager, ldifEntry.getEntry))
- } finally CoreUtils.swallow(reader.close())
+ } finally CoreUtils.swallow(reader.close(), this)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/test/scala/kafka/utils/LoggingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/utils/LoggingTest.scala b/core/src/test/scala/kafka/utils/LoggingTest.scala
new file mode 100644
index 0000000..c0600f8
--- /dev/null
+++ b/core/src/test/scala/kafka/utils/LoggingTest.scala
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.lang.management.ManagementFactory
+import javax.management.ObjectName
+
+import org.junit.Test
+import org.junit.Assert.{assertEquals, assertTrue}
+
+
+class LoggingTest extends Logging {
+
+ @Test
+ def testLog4jControllerIsRegistered(): Unit = {
+ val mbs = ManagementFactory.getPlatformMBeanServer()
+ val log4jControllerName = ObjectName.getInstance("kafka:type=kafka.Log4jController")
+ assertTrue("kafka.utils.Log4jController is not registered", mbs.isRegistered(log4jControllerName))
+ val instance = mbs.getObjectInstance(log4jControllerName)
+ assertEquals("kafka.utils.Log4jController", instance.getClassName)
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index afd619d..bf1ee35 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -79,7 +79,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
val kafkaConfig = KafkaConfig.fromProps(props)
val logDir = new File(kafkaConfig.logDirs.head)
// Make log directory of the partition on the leader broker inaccessible by replacing it with a file
- CoreUtils.swallow(Utils.delete(logDir))
+ CoreUtils.swallow(Utils.delete(logDir), this)
logDir.createNewFile()
assertTrue(logDir.isFile)
@@ -144,7 +144,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
// Make log directory of the partition on the leader broker inaccessible by replacing it with a file
val replica = leaderServer.replicaManager.getReplicaOrException(partition)
val logDir = replica.log.get.dir.getParentFile
- CoreUtils.swallow(Utils.delete(logDir))
+ CoreUtils.swallow(Utils.delete(logDir), this)
logDir.createNewFile()
assertTrue(logDir.isFile)
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 75adf55..bcddd40 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -135,7 +135,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
@Test
def testCleanShutdownAfterFailedStartupDueToCorruptLogs() {
- var server = new KafkaServer(config)
+ val server = new KafkaServer(config)
server.startup()
createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
server.shutdown()
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
index 5e607be..9ff47dd 100755
--- a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
@@ -24,26 +24,25 @@ import java.util.concurrent.locks.ReentrantLock
import java.nio.ByteBuffer
import java.util.regex.Pattern
-import org.apache.log4j.Logger
import org.scalatest.junit.JUnitSuite
import org.junit.Assert._
import kafka.common.KafkaException
import kafka.utils.CoreUtils.inLock
import org.junit.Test
import org.apache.kafka.common.utils.{Base64, Utils}
+import org.slf4j.event.Level
import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
-class CoreUtilsTest extends JUnitSuite {
+class CoreUtilsTest extends JUnitSuite with Logging {
- private val logger = Logger.getLogger(classOf[CoreUtilsTest])
val clusterIdPattern = Pattern.compile("[a-zA-Z0-9_\\-]+")
@Test
def testSwallow() {
- CoreUtils.swallow(logger.info, throw new KafkaException("test"))
+ CoreUtils.swallow(throw new KafkaException("test"), this, Level.INFO)
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index adc8d05..d4a829d 100755
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -19,10 +19,9 @@ package kafka.zk
import org.apache.zookeeper.server.ZooKeeperServer
import org.apache.zookeeper.server.NIOServerCnxnFactory
-import kafka.utils.TestUtils
+import kafka.utils.{CoreUtils, Logging, TestUtils}
import java.net.InetSocketAddress
-import kafka.utils.CoreUtils
import org.apache.kafka.common.utils.Utils
/**
@@ -35,7 +34,7 @@ import org.apache.kafka.common.utils.Utils
// This should be named EmbeddedZooKeeper for consistency with other classes, but since this is widely used by other
// projects (even though it's internal), we keep the name as it is until we have a publicly supported test library for
// others to use.
-class EmbeddedZookeeper() {
+class EmbeddedZookeeper() extends Logging {
val snapshotDir = TestUtils.tempDir()
val logDir = TestUtils.tempDir()
@@ -48,8 +47,8 @@ class EmbeddedZookeeper() {
val port = zookeeper.getClientPort
def shutdown() {
- CoreUtils.swallow(zookeeper.shutdown())
- CoreUtils.swallow(factory.shutdown())
+ CoreUtils.swallow(zookeeper.shutdown(), this)
+ CoreUtils.swallow(factory.shutdown(), this)
def isDown(): Boolean = {
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 03741ef..cc1b9c1 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -63,11 +63,11 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
@After
def tearDown() {
if (zkUtils != null)
- CoreUtils.swallow(zkUtils.close())
+ CoreUtils.swallow(zkUtils.close(), this)
if (zkClient != null)
zkClient.close()
if (zookeeper != null)
- CoreUtils.swallow(zookeeper.shutdown())
+ CoreUtils.swallow(zookeeper.shutdown(), this)
Configuration.setConfiguration(null)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/gradle/dependencies.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index cfb0b9b..2436241 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -57,6 +57,7 @@ versions += [
jersey: "2.25.1",
jmh: "1.19",
log4j: "1.2.17",
+ scalaLogging: "3.7.2",
jopt: "5.0.4",
junit: "4.12",
lz4: "1.4",
@@ -99,6 +100,7 @@ libs += [
jmhCoreBenchmarks: "org.openjdk.jmh:jmh-core-benchmarks:$versions.jmh",
junit: "junit:junit:$versions.junit",
log4j: "log4j:log4j:$versions.log4j",
+ scalaLogging: "com.typesafe.scala-logging:scala-logging_$versions.baseScala:$versions.scalaLogging",
joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
lz4: "org.lz4:lz4-java:$versions.lz4",
metrics: "com.yammer.metrics:metrics-core:$versions.metrics",