You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/28 20:59:22 UTC
[1/3] kafka git commit: KAFKA-5746;
Add new metrics to support health checks (KIP-188)
Repository: kafka
Updated Branches:
refs/heads/trunk dd6347a5d -> 021d8a8e9
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6434d23..6781dc9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -44,7 +44,7 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch}
+import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch, RecordsProcessingStats}
import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse}
import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
@@ -424,7 +424,7 @@ class KafkaApis(val requestChannel: RequestChannel,
s"from client id ${request.header.clientId} with ack=0\n" +
s"Topic and partition to exceptions: $exceptionsSummary"
)
- closeConnection(request)
+ closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)
} else {
sendNoOpResponseExemptThrottle(request)
}
@@ -444,6 +444,12 @@ class KafkaApis(val requestChannel: RequestChannel,
produceResponseCallback)
}
+ def processingStatsCallback(processingStats: Map[TopicPartition, RecordsProcessingStats]): Unit = {
+ processingStats.foreach { case (tp, info) =>
+ updateRecordsProcessingStats(request, tp, info)
+ }
+ }
+
if (authorizedRequestInfo.isEmpty)
sendResponseCallback(Map.empty)
else {
@@ -456,7 +462,8 @@ class KafkaApis(val requestChannel: RequestChannel,
internalTopicsAllowed = internalTopicsAllowed,
isFromClient = true,
entriesPerPartition = authorizedRequestInfo,
- responseCallback = sendResponseCallback)
+ responseCallback = sendResponseCallback,
+ processingStatsCallback = processingStatsCallback)
// if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
// hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
@@ -511,9 +518,11 @@ class KafkaApis(val requestChannel: RequestChannel,
downConvertMagic.map { magic =>
trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
- val converted = data.records.downConvert(magic, fetchRequest.fetchData.get(tp).fetchOffset)
+ val startNanos = time.nanoseconds
+ val converted = data.records.downConvert(magic, fetchRequest.fetchData.get(tp).fetchOffset, time)
+ updateRecordsProcessingStats(request, tp, converted.recordsProcessingStats)
new FetchResponse.PartitionData(data.error, data.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET,
- data.logStartOffset, data.abortedTransactions, converted)
+ data.logStartOffset, data.abortedTransactions, converted.records)
}
}.getOrElse(data)
@@ -2002,6 +2011,25 @@ class KafkaApis(val requestChannel: RequestChannel,
throw new ClusterAuthorizationException(s"Request $request is not authorized.")
}
+ private def updateRecordsProcessingStats(request: RequestChannel.Request, tp: TopicPartition,
+ processingStats: RecordsProcessingStats): Unit = {
+ val conversionCount = processingStats.conversionCount
+ if (conversionCount > 0) {
+ request.header.apiKey match {
+ case ApiKeys.PRODUCE =>
+ brokerTopicStats.topicStats(tp.topic).produceMessageConversionsRate.mark(conversionCount)
+ brokerTopicStats.allTopicsStats.produceMessageConversionsRate.mark(conversionCount)
+ case ApiKeys.FETCH =>
+ brokerTopicStats.topicStats(tp.topic).fetchMessageConversionsRate.mark(conversionCount)
+ brokerTopicStats.allTopicsStats.fetchMessageConversionsRate.mark(conversionCount)
+ case _ =>
+ throw new IllegalStateException("Message conversion info is recorded only for Produce/Fetch requests")
+ }
+ request.messageConversionsTimeNanos = processingStats.conversionTimeNanos
+ }
+ request.temporaryMemoryBytes = processingStats.temporaryMemoryBytes
+ }
+
private def handleError(request: RequestChannel.Request, e: Throwable) {
val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !request.header.apiKey.clusterAction
error("Error when handling request %s".format(request.body[AbstractRequest]), e)
@@ -2031,9 +2059,10 @@ class KafkaApis(val requestChannel: RequestChannel,
}
private def sendErrorOrCloseConnection(request: RequestChannel.Request, error: Throwable)(throttleMs: Int): Unit = {
- val response = request.body[AbstractRequest].getErrorResponse(throttleMs, error)
+ val requestBody = request.body[AbstractRequest]
+ val response = requestBody.getErrorResponse(throttleMs, error)
if (response == null)
- closeConnection(request)
+ closeConnection(request, requestBody.errorCounts(error))
else
sendResponse(request, Some(response))
}
@@ -2043,13 +2072,17 @@ class KafkaApis(val requestChannel: RequestChannel,
sendResponse(request, None)
}
- private def closeConnection(request: RequestChannel.Request): Unit = {
+ private def closeConnection(request: RequestChannel.Request, errorCounts: java.util.Map[Errors, Integer]): Unit = {
// This case is used when the request handler has encountered an error, but the client
// does not expect a response (e.g. when produce request has acks set to 0)
+ requestChannel.updateErrorMetrics(request.header.apiKey, errorCounts.asScala)
requestChannel.sendResponse(new RequestChannel.Response(request, None, CloseConnectionAction, None))
}
private def sendResponse(request: RequestChannel.Request, responseOpt: Option[AbstractResponse]): Unit = {
+ // Update error metrics for each error code in the response including Errors.NONE
+ responseOpt.foreach(response => requestChannel.updateErrorMetrics(request.header.apiKey, response.errorCounts.asScala))
+
responseOpt match {
case Some(response) =>
val responseSend = request.context.buildResponse(response)
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index b108bf6..43c81ab 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -25,10 +25,13 @@ import kafka.api.ApiVersion
import kafka.cluster.EndPoint
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
+import com.yammer.metrics.core.Gauge
import org.I0Itec.zkclient.IZkStateListener
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.zookeeper.Watcher.Event.KeeperState
+import scala.collection.mutable.Set
+
/**
* This class registers the broker in zookeeper to allow
* other brokers and consumers to detect failures. It uses an ephemeral znode with the path:
@@ -71,6 +74,8 @@ class KafkaHealthcheck(brokerId: Int,
interBrokerProtocolVersion)
}
+ def shutdown(): Unit = sessionExpireListener.shutdown()
+
/**
* When we get a SessionExpired event, it means that we have lost all ephemeral nodes and ZKClient has re-established
* a connection for us. We need to re-register this broker in the broker registry. We rely on `handleStateChanged`
@@ -78,6 +83,8 @@ class KafkaHealthcheck(brokerId: Int,
*/
class SessionExpireListener extends IZkStateListener with KafkaMetricsGroup {
+ private val metricNames = Set[String]()
+
private[server] val stateToMeterMap = {
import KeeperState._
val stateToEventTypeMap = Map(
@@ -89,10 +96,20 @@ class KafkaHealthcheck(brokerId: Int,
Expired -> "Expires"
)
stateToEventTypeMap.map { case (state, eventType) =>
- state -> newMeter(s"ZooKeeper${eventType}PerSec", eventType.toLowerCase(Locale.ROOT), TimeUnit.SECONDS)
+ val name = s"ZooKeeper${eventType}PerSec"
+ metricNames += name
+ state -> newMeter(name, eventType.toLowerCase(Locale.ROOT), TimeUnit.SECONDS)
}
}
+ private[server] val sessionStateGauge =
+ newGauge("SessionState", new Gauge[String] {
+ override def value: String =
+ Option(zkUtils.zkConnection.getZookeeperState.toString).getOrElse("DISCONNECTED")
+ })
+
+ metricNames += "SessionState"
+
@throws[Exception]
override def handleStateChanged(state: KeeperState) {
stateToMeterMap.get(state).foreach(_.mark())
@@ -110,6 +127,8 @@ class KafkaHealthcheck(brokerId: Int,
fatal("Could not establish session with zookeeper", error)
}
+ def shutdown(): Unit = metricNames.foreach(removeMetric(_))
+
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index f055762..a498781 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -129,6 +129,8 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
val failedFetchRequestRate = newMeter(BrokerTopicStats.FailedFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
val totalProduceRequestRate = newMeter(BrokerTopicStats.TotalProduceRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
val totalFetchRequestRate = newMeter(BrokerTopicStats.TotalFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
+ val fetchMessageConversionsRate = newMeter(BrokerTopicStats.FetchMessageConversionsPerSec, "requests", TimeUnit.SECONDS, tags)
+ val produceMessageConversionsRate = newMeter(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests", TimeUnit.SECONDS, tags)
def close() {
removeMetric(BrokerTopicStats.MessagesInPerSec, tags)
@@ -143,6 +145,8 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
removeMetric(BrokerTopicStats.FailedFetchRequestsPerSec, tags)
removeMetric(BrokerTopicStats.TotalProduceRequestsPerSec, tags)
removeMetric(BrokerTopicStats.TotalFetchRequestsPerSec, tags)
+ removeMetric(BrokerTopicStats.FetchMessageConversionsPerSec, tags)
+ removeMetric(BrokerTopicStats.ProduceMessageConversionsPerSec, tags)
}
}
@@ -157,6 +161,8 @@ object BrokerTopicStats {
val FailedFetchRequestsPerSec = "FailedFetchRequestsPerSec"
val TotalProduceRequestsPerSec = "TotalProduceRequestsPerSec"
val TotalFetchRequestsPerSec = "TotalFetchRequestsPerSec"
+ val FetchMessageConversionsPerSec = "FetchMessageConversionsPerSec"
+ val ProduceMessageConversionsPerSec = "ProduceMessageConversionsPerSec"
private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 88b1d23..f8af7a2 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -293,7 +293,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
- AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
+ AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString, metrics)
info("started")
}
}
@@ -333,19 +333,21 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
chrootOption.foreach { chroot =>
val zkConnForChrootCreation = config.zkConnect.substring(0, chrootIndex)
- val zkClientForChrootCreation = ZkUtils(zkConnForChrootCreation,
+ val zkClientForChrootCreation = ZkUtils.withMetrics(zkConnForChrootCreation,
sessionTimeout = config.zkSessionTimeoutMs,
connectionTimeout = config.zkConnectionTimeoutMs,
- secureAclsEnabled)
+ secureAclsEnabled,
+ time)
zkClientForChrootCreation.makeSurePersistentPathExists(chroot)
info(s"Created zookeeper path $chroot")
zkClientForChrootCreation.close()
}
- val zkUtils = ZkUtils(config.zkConnect,
+ val zkUtils = ZkUtils.withMetrics(config.zkConnect,
sessionTimeout = config.zkSessionTimeoutMs,
connectionTimeout = config.zkConnectionTimeoutMs,
- secureAclsEnabled)
+ secureAclsEnabled,
+ time)
zkUtils.setupCommonPaths()
zkUtils
}
@@ -512,6 +514,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
CoreUtils.swallow(controlledShutdown())
brokerState.newState(BrokerShuttingDown)
+ if (kafkaHealthcheck != null)
+ CoreUtils.swallow(kafkaHealthcheck.shutdown())
+
if (socketServer != null)
CoreUtils.swallow(socketServer.shutdown())
if (requestHandlerPool != null)
@@ -549,7 +554,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
startupComplete.set(false)
isShuttingDown.set(false)
- CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString))
+ CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString, metrics))
shutdownLatch.countDown()
info("shut down completed")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 9cc6317..3acb88b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -434,7 +434,8 @@ class ReplicaManager(val config: KafkaConfig,
isFromClient: Boolean,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
- delayedProduceLock: Option[Object] = None) {
+ delayedProduceLock: Option[Object] = None,
+ processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) {
if (isValidRequiredAcks(requiredAcks)) {
val sTime = time.milliseconds
val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
@@ -448,6 +449,8 @@ class ReplicaManager(val config: KafkaConfig,
new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime, result.info.logStartOffset)) // response status
}
+ processingStatsCallback(localProduceResults.mapValues(_.info.recordsProcessingStats))
+
if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
// create delayed produce operation
val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 9582c50..755f500 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -17,7 +17,7 @@
package kafka.utils
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import kafka.admin._
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
@@ -25,12 +25,16 @@ import kafka.cluster._
import kafka.common.{KafkaException, NoEpochForPartitionException, TopicAndPartition}
import kafka.consumer.{ConsumerThreadId, TopicCount}
import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch, ReassignedPartitionsContext}
+import kafka.metrics.KafkaMetricsGroup
import kafka.server.ConfigType
import kafka.utils.ZkUtils._
+
+import com.yammer.metrics.core.MetricName
import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, ZkMarshallingError, ZkNoNodeException, ZkNodeExistsException}
import org.I0Itec.zkclient.serialize.ZkSerializer
-import org.I0Itec.zkclient.{ZkClient, ZkConnection, IZkDataListener, IZkChildListener, IZkStateListener}
+import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection}
import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback}
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.data.{ACL, Stat}
@@ -84,6 +88,12 @@ object ZkUtils {
// sensitive information that should not be world readable to the Seq
val SensitiveZkRootPaths = Seq(ConfigUsersPath)
+ def withMetrics(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean,
+ time: Time): ZkUtils = {
+ val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
+ new ZkUtils(new ZooKeeperClientMetrics(zkClient, time), zkConnection, isZkSecurityEnabled)
+ }
+
def apply(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean): ZkUtils = {
val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
new ZkUtils(zkClient, zkConnection, isZkSecurityEnabled)
@@ -212,7 +222,38 @@ object ZkUtils {
}
-class ZkUtils(val zkClient: ZkClient,
+class ZooKeeperClientWrapper(val zkClient: ZkClient) {
+ def apply[T](method: ZkClient => T): T = method(zkClient)
+ def close(): Unit = {
+ if(zkClient != null)
+ zkClient.close()
+ }
+}
+
+class ZooKeeperClientMetrics(zkClient: ZkClient, val time: Time)
+ extends ZooKeeperClientWrapper(zkClient) with KafkaMetricsGroup {
+ val latencyMetric = newHistogram("ZooKeeperRequestLatencyMs")
+
+ override protected def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = {
+ explicitMetricName("kafka.server", "ZooKeeperClientMetrics", name, metricTags)
+ }
+
+ override def apply[T](method: ZkClient => T): T = {
+ val startNs = time.nanoseconds
+ val ret =
+ try method(zkClient)
+ finally latencyMetric.update(TimeUnit.NANOSECONDS.toMillis(time.nanoseconds - startNs))
+ ret
+ }
+
+ override def close(): Unit = {
+ if (latencyMetric != null)
+ removeMetric("ZooKeeperRequestLatencyMs")
+ super.close()
+ }
+}
+
+class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
val zkConnection: ZkConnection,
val isSecure: Boolean) extends Logging {
// These are persistent ZK paths that should exist on kafka broker startup.
@@ -228,8 +269,12 @@ class ZkUtils(val zkClient: ZkClient,
ProducerIdBlockPath,
LogDirEventNotificationPath)
+ /** Present for compatibility */
+ def this(zkClient: ZkClient, zkConnection: ZkConnection, isSecure: Boolean) =
+ this(new ZooKeeperClientWrapper(zkClient), zkConnection, isSecure)
+
// Visible for testing
- val zkPath = new ZkPath(zkClient)
+ val zkPath = new ZkPath(zkClientWrap)
import ZkUtils._
@@ -238,6 +283,8 @@ class ZkUtils(val zkClient: ZkClient,
def defaultAcls(path: String): java.util.List[ACL] = ZkUtils.defaultAcls(isSecure, path)
+ def zkClient: ZkClient = zkClientWrap.zkClient
+
def getController(): Int = {
readDataMaybeNull(ControllerPath)._1 match {
case Some(controller) => KafkaController.parseControllerId(controller)
@@ -388,7 +435,7 @@ class ZkUtils(val zkClient: ZkClient,
brokerInfo,
zkConnection.getZookeeper,
isSecure)
- zkCheckedEphemeral.create()
+ zkClientWrap(_ => zkCheckedEphemeral.create())
} catch {
case _: ZkNodeExistsException =>
throw new RuntimeException("A broker is already registered on the path " + brokerIdPath
@@ -429,7 +476,7 @@ class ZkUtils(val zkClient: ZkClient,
acls
}
- if (!zkClient.exists(path))
+ if (!zkClientWrap(zkClient => zkClient.exists(path)))
zkPath.createPersistent(path, createParents = true, acl) //won't throw NoNodeException or NodeExistsException
}
@@ -512,7 +559,7 @@ class ZkUtils(val zkClient: ZkClient,
def updatePersistentPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls) = {
val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
try {
- zkClient.writeData(path, data)
+ zkClientWrap(_.writeData(path, data))
} catch {
case _: ZkNoNodeException =>
createParentPath(path)
@@ -520,7 +567,7 @@ class ZkUtils(val zkClient: ZkClient,
zkPath.createPersistent(path, data, acl)
} catch {
case _: ZkNodeExistsException =>
- zkClient.writeData(path, data)
+ zkClientWrap(_.writeData(path, data))
}
}
}
@@ -536,7 +583,7 @@ class ZkUtils(val zkClient: ZkClient,
def conditionalUpdatePersistentPath(path: String, data: String, expectVersion: Int,
optionalChecker:Option[(ZkUtils, String, String) => (Boolean,Int)] = None): (Boolean, Int) = {
try {
- val stat = zkClient.writeDataReturnStat(path, data, expectVersion)
+ val stat = zkClientWrap(_.writeDataReturnStat(path, data, expectVersion))
debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
.format(path, data, expectVersion, stat.getVersion))
(true, stat.getVersion)
@@ -563,7 +610,7 @@ class ZkUtils(val zkClient: ZkClient,
*/
def conditionalUpdatePersistentPathIfExists(path: String, data: String, expectVersion: Int): (Boolean, Int) = {
try {
- val stat = zkClient.writeDataReturnStat(path, data, expectVersion)
+ val stat = zkClientWrap(_.writeDataReturnStat(path, data, expectVersion))
debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
.format(path, data, expectVersion, stat.getVersion))
(true, stat.getVersion)
@@ -583,7 +630,7 @@ class ZkUtils(val zkClient: ZkClient,
def updateEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
try {
- zkClient.writeData(path, data)
+ zkClientWrap(_.writeData(path, data))
} catch {
case _: ZkNoNodeException =>
createParentPath(path)
@@ -592,7 +639,7 @@ class ZkUtils(val zkClient: ZkClient,
}
def deletePath(path: String): Boolean = {
- zkClient.delete(path)
+ zkClientWrap(_.delete(path))
}
/**
@@ -601,7 +648,7 @@ class ZkUtils(val zkClient: ZkClient,
*/
def conditionalDeletePath(path: String, expectedVersion: Int): Boolean = {
try {
- zkClient.delete(path, expectedVersion)
+ zkClientWrap(_.delete(path, expectedVersion))
true
} catch {
case _: ZkBadVersionException => false
@@ -609,37 +656,38 @@ class ZkUtils(val zkClient: ZkClient,
}
def deletePathRecursive(path: String) {
- zkClient.deleteRecursive(path)
+ zkClientWrap(_.deleteRecursive(path))
}
def subscribeDataChanges(path: String, listener: IZkDataListener): Unit =
- zkClient.subscribeDataChanges(path, listener)
+ zkClientWrap(_.subscribeDataChanges(path, listener))
def unsubscribeDataChanges(path: String, dataListener: IZkDataListener): Unit =
- zkClient.unsubscribeDataChanges(path, dataListener)
+ zkClientWrap(_.unsubscribeDataChanges(path, dataListener))
def subscribeStateChanges(listener: IZkStateListener): Unit =
- zkClient.subscribeStateChanges(listener)
+ zkClientWrap(_.subscribeStateChanges(listener))
def subscribeChildChanges(path: String, listener: IZkChildListener): Option[Seq[String]] =
- Option(zkClient.subscribeChildChanges(path, listener)).map(_.asScala)
+ Option(zkClientWrap(_.subscribeChildChanges(path, listener))).map(_.asScala)
def unsubscribeChildChanges(path: String, childListener: IZkChildListener): Unit =
- zkClient.unsubscribeChildChanges(path, childListener)
+ zkClientWrap(_.unsubscribeChildChanges(path, childListener))
def unsubscribeAll(): Unit =
- zkClient.unsubscribeAll()
+ zkClientWrap(_.unsubscribeAll())
def readData(path: String): (String, Stat) = {
val stat: Stat = new Stat()
- val dataStr: String = zkClient.readData(path, stat)
+ val dataStr: String = zkClientWrap(_.readData[String](path, stat))
(dataStr, stat)
}
def readDataMaybeNull(path: String): (Option[String], Stat) = {
val stat = new Stat()
val dataAndStat = try {
- (Some(zkClient.readData(path, stat)), stat)
+ val dataStr = zkClientWrap(_.readData[String](path, stat))
+ (Some(dataStr), stat)
} catch {
case _: ZkNoNodeException =>
(None, stat)
@@ -650,18 +698,18 @@ class ZkUtils(val zkClient: ZkClient,
def readDataAndVersionMaybeNull(path: String): (Option[String], Int) = {
val stat = new Stat()
try {
- val data: String = zkClient.readData(path, stat)
+ val data = zkClientWrap(_.readData[String](path, stat))
(Option(data), stat.getVersion)
} catch {
case _: ZkNoNodeException => (None, stat.getVersion)
}
}
- def getChildren(path: String): Seq[String] = zkClient.getChildren(path).asScala
+ def getChildren(path: String): Seq[String] = zkClientWrap(_.getChildren(path)).asScala
def getChildrenParentMayNotExist(path: String): Seq[String] = {
try {
- zkClient.getChildren(path).asScala
+ zkClientWrap(_.getChildren(path)).asScala
} catch {
case _: ZkNoNodeException => Nil
}
@@ -671,7 +719,7 @@ class ZkUtils(val zkClient: ZkClient,
* Check if the given path exists
*/
def pathExists(path: String): Boolean = {
- zkClient.exists(path)
+ zkClientWrap(_.exists(path))
}
def isTopicMarkedForDeletion(topic: String): Boolean = {
@@ -789,9 +837,9 @@ class ZkUtils(val zkClient: ZkClient,
def deletePartition(brokerId: Int, topic: String) {
val brokerIdPath = BrokerIdsPath + "/" + brokerId
- zkClient.delete(brokerIdPath)
+ zkClientWrap(_.delete(brokerIdPath))
val brokerPartTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic + "/" + brokerId
- zkClient.delete(brokerPartTopicPath)
+ zkClientWrap(_.delete(brokerPartTopicPath))
}
@deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
@@ -851,7 +899,7 @@ class ZkUtils(val zkClient: ZkClient,
*/
def getSequenceId(path: String, acls: java.util.List[ACL] = UseDefaultAcls): Int = {
val acl = if (acls == UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
- def writeToZk: Int = zkClient.writeDataReturnStat(path, "", -1).getVersion
+ def writeToZk: Int = zkClientWrap(_.writeDataReturnStat(path, "", -1)).getVersion
try {
writeToZk
} catch {
@@ -915,9 +963,7 @@ class ZkUtils(val zkClient: ZkClient,
}
def close() {
- if(zkClient != null) {
- zkClient.close()
- }
+ zkClientWrap.close()
}
}
@@ -973,7 +1019,7 @@ class ZKConfig(props: VerifiableProperties) {
val zkSyncTimeMs = props.getInt(ZkSyncTimeMsProp, 2000)
}
-class ZkPath(client: ZkClient) {
+class ZkPath(clientWrap: ZooKeeperClientWrapper) {
@volatile private var isNamespacePresent: Boolean = false
@@ -981,7 +1027,7 @@ class ZkPath(client: ZkClient) {
if (isNamespacePresent)
return
- if (!client.exists("/")) {
+ if (!clientWrap(_.exists("/"))) {
throw new ConfigException("Zookeeper namespace does not exist")
}
isNamespacePresent = true
@@ -993,22 +1039,22 @@ class ZkPath(client: ZkClient) {
def createPersistent(path: String, data: Object, acls: java.util.List[ACL]) {
checkNamespace()
- client.createPersistent(path, data, acls)
+ clientWrap(_.createPersistent(path, data, acls))
}
def createPersistent(path: String, createParents: Boolean, acls: java.util.List[ACL]) {
checkNamespace()
- client.createPersistent(path, createParents, acls)
+ clientWrap(_.createPersistent(path, createParents, acls))
}
def createEphemeral(path: String, data: Object, acls: java.util.List[ACL]) {
checkNamespace()
- client.createEphemeral(path, data, acls)
+ clientWrap(_.createEphemeral(path, data, acls))
}
def createPersistentSequential(path: String, data: Object, acls: java.util.List[ACL]): String = {
checkNamespace()
- client.createPersistentSequential(path, data, acls)
+ clientWrap(_.createPersistentSequential(path, data, acls))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/integration/kafka/api/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
new file mode 100644
index 0000000..f71a36a
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package kafka.api
+
+import java.util.{Locale, Properties}
+
+import kafka.log.LogConfig
+import kafka.network.RequestMetrics
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{JaasTestUtils, TestUtils}
+
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.{Gauge, Histogram, Meter}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.errors.InvalidTopicException
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
+
+import scala.collection.JavaConverters._
+
+class MetricsTest extends IntegrationTestHarness with SaslSetup {
+
+ override val producerCount = 1
+ override val consumerCount = 1
+ override val serverCount = 1
+
+ override protected def listenerName = new ListenerName("CLIENT")
+ private val kafkaClientSaslMechanism = "PLAIN"
+ private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
+ private val kafkaServerJaasEntryName =
+ s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}"
+ this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false")
+ this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableDoc, "false")
+ override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+ override protected val serverSaslProperties =
+ Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
+ override protected val clientSaslProperties =
+ Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
+
+ @Before
+ override def setUp(): Unit = {
+ startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName))
+ super.setUp()
+ }
+
+ @After
+ override def tearDown(): Unit = {
+ super.tearDown()
+ closeSasl()
+ }
+
+ /**
+ * Verifies some of the metrics of producer, consumer as well as server.
+ */
+ @Test
+ def testMetrics(): Unit = {
+ val topic = "topicWithOldMessageFormat"
+ val props = new Properties
+ props.setProperty(LogConfig.MessageFormatVersionProp, "0.9.0")
+ TestUtils.createTopic(this.zkUtils, topic, numPartitions = 1, replicationFactor = 1, this.servers, props)
+ val tp = new TopicPartition(topic, 0)
+
+ // Clear static state
+ RequestMetrics.clearErrorMeters()
+
+ // Produce and consume some records
+ val numRecords = 10
+ val recordSize = 1000
+ val producer = producers.head
+ sendRecords(producer, numRecords, recordSize, tp)
+
+ val consumer = this.consumers.head
+ consumer.assign(List(tp).asJava)
+ consumer.seek(tp, 0)
+ TestUtils.consumeRecords(consumer, numRecords)
+
+ verifyKafkaRateMetricsHaveCumulativeCount()
+ verifyClientVersionMetrics(consumer.metrics, "Consumer")
+ verifyClientVersionMetrics(this.producers.head.metrics, "Producer")
+
+ val server = servers.head
+ verifyBrokerMessageConversionMetrics(server, recordSize)
+ verifyBrokerErrorMetrics(servers.head)
+ verifyBrokerZkMetrics(server, topic)
+
+ generateAuthenticationFailure(tp)
+ verifyBrokerAuthenticationMetrics(server)
+ }
+
+ private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int,
+ recordSize: Int, tp: TopicPartition) = {
+ val bytes = new Array[Byte](recordSize)
+ (0 until numRecords).map { i =>
+ producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key $i".getBytes, bytes))
+ }
+ producer.flush()
+ }
+
+ // Create a producer that fails authentication to verify authentication failure metrics
+ private def generateAuthenticationFailure(tp: TopicPartition): Unit = {
+ val producerProps = new Properties()
+ val saslProps = new Properties()
+ // Temporary limit to reduce blocking before KIP-152 client-side changes are merged
+ saslProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000")
+ saslProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000")
+ saslProps.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256")
+ // Use acks=0 to verify error metric when connection is closed without a response
+ saslProps.put(ProducerConfig.ACKS_CONFIG, "0")
+ val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol,
+ trustStoreFile = trustStoreFile, saslProperties = Some(saslProps), props = Some(producerProps))
+
+ try {
+ producer.send(new ProducerRecord(tp.topic, tp.partition, "key".getBytes, "value".getBytes)).get
+ } catch {
+ case _: Exception => // expected exception
+ } finally {
+ producer.close()
+ }
+ }
+
+ private def verifyKafkaRateMetricsHaveCumulativeCount(): Unit = {
+
+ def exists(name: String, rateMetricName: MetricName, allMetricNames: Set[MetricName]): Boolean = {
+ allMetricNames.contains(new MetricName(name, rateMetricName.group, "", rateMetricName.tags))
+ }
+
+ def verify(rateMetricName: MetricName, allMetricNames: Set[MetricName]): Unit = {
+ val name = rateMetricName.name
+ val totalExists = exists(name.replace("-rate", "-total"), rateMetricName, allMetricNames)
+ val totalTimeExists = exists(name.replace("-rate", "-time"), rateMetricName, allMetricNames)
+ assertTrue(s"No cumulative count/time metric for rate metric $rateMetricName",
+ totalExists || totalTimeExists)
+ }
+
+ val consumer = this.consumers.head
+ val consumerMetricNames = consumer.metrics.keySet.asScala.toSet
+ consumerMetricNames.filter(_.name.endsWith("-rate"))
+ .foreach(verify(_, consumerMetricNames))
+
+ val producer = this.producers.head
+ val producerMetricNames = producer.metrics.keySet.asScala.toSet
+ val producerExclusions = Set("compression-rate") // compression-rate is an Average metric, not Rate
+ producerMetricNames.filter(_.name.endsWith("-rate"))
+ .filterNot(metricName => producerExclusions.contains(metricName.name))
+ .foreach(verify(_, producerMetricNames))
+
+ // Check a couple of metrics of consumer and producer to ensure that values are set
+ verifyKafkaMetricRecorded("records-consumed-rate", consumer.metrics, "Consumer")
+ verifyKafkaMetricRecorded("records-consumed-total", consumer.metrics, "Consumer")
+ verifyKafkaMetricRecorded("record-send-rate", producer.metrics, "Producer")
+ verifyKafkaMetricRecorded("record-send-total", producer.metrics, "Producer")
+ }
+
+ private def verifyClientVersionMetrics(metrics: java.util.Map[MetricName, _ <: Metric], entity: String): Unit = {
+ Seq("commit-id", "version").foreach { name =>
+ verifyKafkaMetric(name, metrics, entity) { matchingMetrics =>
+ assertEquals(1, matchingMetrics.size)
+ val metric = matchingMetrics.head
+ val value = metric.metricValue
+ assertNotNull(s"$entity metric not recorded $name", value)
+ assertNotNull(s"$entity metric $name should be a non-empty String",
+ value.isInstanceOf[String] && !value.asInstanceOf[String].isEmpty)
+ assertTrue("Client-id not specified", metric.metricName.tags.containsKey("client-id"))
+ }
+ }
+ }
+
+ private def verifyBrokerAuthenticationMetrics(server: KafkaServer): Unit = {
+ val metrics = server.metrics.metrics
+ TestUtils.waitUntilTrue(() =>
+ maxKafkaMetricValue("failed-authentication-total", metrics, "Broker", Some("socket-server-metrics")) > 0,
+ "failed-authentication-total not updated")
+ verifyKafkaMetricRecorded("successful-authentication-rate", metrics, "Broker", Some("socket-server-metrics"))
+ verifyKafkaMetricRecorded("successful-authentication-total", metrics, "Broker", Some("socket-server-metrics"))
+ verifyKafkaMetricRecorded("failed-authentication-rate", metrics, "Broker", Some("socket-server-metrics"))
+ verifyKafkaMetricRecorded("failed-authentication-total", metrics, "Broker", Some("socket-server-metrics"))
+ }
+
+ private def verifyBrokerMessageConversionMetrics(server: KafkaServer, recordSize: Int): Unit = {
+ val requestMetricsPrefix = "kafka.network:type=RequestMetrics"
+ val requestBytes = verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Produce")
+ val tempBytes = verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Produce")
+ assertTrue(s"Unexpected temporary memory size requestBytes $requestBytes tempBytes $tempBytes",
+ tempBytes >= recordSize)
+
+ verifyYammerMetricRecorded(s"kafka.server:type=BrokerTopicMetrics,name=ProduceMessageConversionsPerSec")
+ verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=MessageConversionsTimeMs,request=Produce", value => value > 0.0)
+
+ verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Fetch")
+ // Temporary size for fetch should be zero after KAFKA-5968 is fixed
+ verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Fetch", value => value >= 0.0)
+
+ // request size recorded for all request types, check one
+ verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Metadata")
+ }
+
+ private def verifyBrokerZkMetrics(server: KafkaServer, topic: String): Unit = {
+ // Latency is rounded to milliseconds, so we may need to retry some operations to get latency > 0.
+ val (_, recorded) = TestUtils.computeUntilTrue({
+ servers.head.zkUtils.getLeaderAndIsrForPartition(topic, 0)
+ yammerMetricValue("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs").asInstanceOf[Double]
+ })(latency => latency > 0.0)
+ assertTrue("ZooKeeper latency not recorded", recorded)
+
+ assertEquals(s"Unexpected ZK state ${server.zkUtils.zkConnection.getZookeeperState}",
+ "CONNECTED", yammerMetricValue("SessionState"))
+ }
+
+ private def verifyBrokerErrorMetrics(server: KafkaServer): Unit = {
+
+ def errorMetricCount = Metrics.defaultRegistry.allMetrics.keySet.asScala.filter(_.getName == "ErrorsPerSec").size
+
+ val startErrorMetricCount = errorMetricCount
+ val errorMetricPrefix = "kafka.network:type=RequestMetrics,name=ErrorsPerSec"
+ verifyYammerMetricRecorded(s"$errorMetricPrefix,request=Metadata,error=NONE")
+
+ try {
+ consumers.head.partitionsFor("12{}!")
+ } catch {
+ case _: InvalidTopicException => // expected
+ }
+ verifyYammerMetricRecorded(s"$errorMetricPrefix,request=Metadata,error=INVALID_TOPIC_EXCEPTION")
+
+ // Check that error metrics are registered dynamically
+ val currentErrorMetricCount = errorMetricCount
+ assertEquals(startErrorMetricCount + 1, currentErrorMetricCount)
+ assertTrue(s"Too many error metrics $currentErrorMetricCount" , currentErrorMetricCount < 10)
+
+ // Verify that error metric is updated with producer acks=0 when no response is sent
+ sendRecords(producers.head, 1, 100, new TopicPartition("non-existent", 0))
+ verifyYammerMetricRecorded(s"$errorMetricPrefix,request=Metadata,error=LEADER_NOT_AVAILABLE")
+ }
+
+ private def verifyKafkaMetric[T](name: String, metrics: java.util.Map[MetricName, _ <: Metric], entity: String,
+ group: Option[String] = None)(verify: Iterable[Metric] => T) : T = {
+ val matchingMetrics = metrics.asScala.filter {
+ case (metricName, _) => metricName.name == name && group.forall(_ == metricName.group)
+ }
+ assertTrue(s"Metric not found $name", matchingMetrics.size > 0)
+ verify(matchingMetrics.values)
+ }
+
+ private def maxKafkaMetricValue(name: String, metrics: java.util.Map[MetricName, _ <: Metric], entity: String,
+ group: Option[String]): Double = {
+ // Use max value of all matching metrics since Selector metrics are recorded for each Processor
+ verifyKafkaMetric(name, metrics, entity, group) { matchingMetrics =>
+ matchingMetrics.foldLeft(0.0)((max, metric) => Math.max(max, metric.value))
+ }
+ }
+
+ private def verifyKafkaMetricRecorded(name: String, metrics: java.util.Map[MetricName, _ <: Metric], entity: String,
+ group: Option[String] = None): Unit = {
+ val value = maxKafkaMetricValue(name, metrics, entity, group)
+ assertTrue(s"$entity metric not recorded correctly for $name value $value", value > 0.0)
+ }
+
+ private def yammerMetricValue(name: String): Any = {
+ val allMetrics = Metrics.defaultRegistry.allMetrics.asScala
+ val (_, metric) = allMetrics.find { case (n, _) => n.getMBeanName.endsWith(name) }
+ .getOrElse(fail(s"Unable to find broker metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}"))
+ metric match {
+ case m: Meter => m.count.toDouble
+ case m: Histogram => m.max
+ case m: Gauge[_] => m.value
+ case m => fail(s"Unexpected broker metric of class ${m.getClass}")
+ }
+ }
+
+ private def verifyYammerMetricRecorded(name: String, verify: Double => Boolean = d => d > 0): Double = {
+ val metricValue = yammerMetricValue(name).asInstanceOf[Double]
+ assertTrue(s"Broker metric not recorded correctly for $name value $metricValue", verify(metricValue))
+ metricValue
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 604bbf3..c1b26f1 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -1508,55 +1508,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
servers.foreach(assertNoExemptRequestMetric(_))
}
- // Rate metrics of both Producer and Consumer are verified by this test
- @Test
- def testRateMetricsHaveCumulativeCount() {
- val numRecords = 100
- sendRecords(numRecords)
-
- val consumer = this.consumers.head
- consumer.assign(List(tp).asJava)
- consumer.seek(tp, 0)
- consumeAndVerifyRecords(consumer, numRecords = numRecords, startingOffset = 0)
-
- def exists(name: String, rateMetricName: MetricName, allMetricNames: Set[MetricName]): Boolean = {
- allMetricNames.contains(new MetricName(name, rateMetricName.group, "", rateMetricName.tags))
- }
-
- def verify(rateMetricName: MetricName, allMetricNames: Set[MetricName]): Unit = {
- val name = rateMetricName.name
- val totalExists = exists(name.replace("-rate", "-total"), rateMetricName, allMetricNames)
- val totalTimeExists = exists(name.replace("-rate", "-time"), rateMetricName, allMetricNames)
- assertTrue(s"No cumulative count/time metric for rate metric $rateMetricName",
- totalExists || totalTimeExists)
- }
-
- val consumerMetricNames = consumer.metrics.keySet.asScala.toSet
- consumerMetricNames.filter(_.name.endsWith("-rate"))
- .foreach(verify(_, consumerMetricNames))
-
- val producer = this.producers.head
- val producerMetricNames = producer.metrics.keySet.asScala.toSet
- val producerExclusions = Set("compression-rate") // compression-rate is an Average metric, not Rate
- producerMetricNames.filter(_.name.endsWith("-rate"))
- .filterNot(metricName => producerExclusions.contains(metricName.name))
- .foreach(verify(_, producerMetricNames))
-
- def verifyMetric(name: String, metrics: java.util.Map[MetricName, _ <: Metric], entity: String): Unit = {
- val entry = metrics.asScala.find { case (metricName, _) => metricName.name == name }
- assertTrue(s"$entity metric not defined $name", entry.nonEmpty)
- entry.foreach { case (metricName, metric) =>
- assertTrue(s"$entity metric not recorded $metricName", metric.value > 0.0)
- }
- }
-
- // Check a couple of metrics of consumer and producer to ensure that values are set
- verifyMetric("records-consumed-rate", consumer.metrics, "Consumer")
- verifyMetric("records-consumed-total", consumer.metrics, "Consumer")
- verifyMetric("record-send-rate", producer.metrics, "Producer")
- verifyMetric("record-send-total", producer.metrics, "Producer")
- }
-
def runMultiConsumerSessionTimeoutTest(closeConsumer: Boolean): Unit = {
// use consumers defined in this class plus one additional consumer
// Use topic defined in this class + one additional topic
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 2ffd828..95abb33 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -1351,7 +1351,8 @@ class GroupCoordinatorTest extends JUnitSuite {
isFromClient = EasyMock.eq(false),
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
EasyMock.capture(capturedArgument),
- EasyMock.anyObject().asInstanceOf[Option[Object]])).andAnswer(new IAnswer[Unit] {
+ EasyMock.anyObject().asInstanceOf[Option[Object]],
+ EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
override def answer = capturedArgument.getValue.apply(
Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)
@@ -1434,7 +1435,8 @@ class GroupCoordinatorTest extends JUnitSuite {
isFromClient = EasyMock.eq(false),
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
EasyMock.capture(capturedArgument),
- EasyMock.anyObject().asInstanceOf[Option[Object]])
+ EasyMock.anyObject().asInstanceOf[Option[Object]],
+ EasyMock.anyObject())
).andAnswer(new IAnswer[Unit] {
override def answer = capturedArgument.getValue.apply(
Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
@@ -1463,7 +1465,8 @@ class GroupCoordinatorTest extends JUnitSuite {
isFromClient = EasyMock.eq(false),
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
EasyMock.capture(capturedArgument),
- EasyMock.anyObject().asInstanceOf[Option[Object]])
+ EasyMock.anyObject().asInstanceOf[Option[Object]],
+ EasyMock.anyObject())
).andAnswer(new IAnswer[Unit] {
override def answer = capturedArgument.getValue.apply(
Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) ->
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index a2f5f92..0def9ce 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -1306,7 +1306,8 @@ class GroupMetadataManagerTest {
isFromClient = EasyMock.eq(false),
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
EasyMock.capture(capturedArgument),
- EasyMock.anyObject().asInstanceOf[Option[Object]])
+ EasyMock.anyObject().asInstanceOf[Option[Object]],
+ EasyMock.anyObject())
)
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
capturedArgument
@@ -1320,7 +1321,8 @@ class GroupMetadataManagerTest {
isFromClient = EasyMock.eq(false),
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
EasyMock.capture(capturedArgument),
- EasyMock.anyObject().asInstanceOf[Option[Object]])
+ EasyMock.anyObject().asInstanceOf[Option[Object]],
+ EasyMock.anyObject())
).andAnswer(new IAnswer[Unit] {
override def answer = capturedArgument.getValue.apply(
Map(groupTopicPartition ->
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 49c8e6a..ed1636c 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -498,7 +498,8 @@ class TransactionStateManagerTest {
EasyMock.eq(false),
EasyMock.eq(recordsByPartition),
EasyMock.capture(capturedArgument),
- EasyMock.eq(None)
+ EasyMock.eq(None),
+ EasyMock.anyObject()
)).andAnswer(new IAnswer[Unit] {
override def answer(): Unit = {
capturedArgument.getValue.apply(
@@ -598,6 +599,7 @@ class TransactionStateManagerTest {
isFromClient = EasyMock.eq(false),
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
EasyMock.capture(capturedArgument),
+ EasyMock.anyObject(),
EasyMock.anyObject())
).andAnswer(new IAnswer[Unit] {
override def answer(): Unit = capturedArgument.getValue.apply(
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 96f7bfc..b64371e 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -22,6 +22,7 @@ import kafka.common.LongRef
import kafka.message.{CompressionCodec, DefaultCompressionCodec, GZIPCompressionCodec, NoCompressionCodec, SnappyCompressionCodec}
import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.record._
+import org.apache.kafka.common.utils.Time
import org.apache.kafka.test.TestUtils
import org.junit.Assert._
import org.junit.Test
@@ -30,6 +31,8 @@ import scala.collection.JavaConverters._
class LogValidatorTest {
+ val time = Time.SYSTEM
+
@Test
def testLogAppendTimeNonCompressedV1() {
checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1)
@@ -41,6 +44,7 @@ class LogValidatorTest {
val records = createRecords(magicValue = magic, timestamp = 1234L, codec = CompressionType.NONE)
val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(0),
+ time= time,
now = now,
sourceCodec = NoCompressionCodec,
targetCodec = NoCompressionCodec,
@@ -56,6 +60,8 @@ class LogValidatorTest {
assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
assertEquals(s"The offset of max timestamp should be 0", 0, validatedResults.shallowOffsetOfMaxTimestamp)
assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
+
+ verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 0, records, compressed = false)
}
def testLogAppendTimeNonCompressedV2() {
@@ -74,6 +80,7 @@ class LogValidatorTest {
val validatedResults = LogValidator.validateMessagesAndAssignOffsets(
records,
offsetCounter = new LongRef(0),
+ time = time,
now = now,
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
@@ -92,6 +99,9 @@ class LogValidatorTest {
assertEquals(s"The offset of max timestamp should be ${records.records.asScala.size - 1}",
records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
assertTrue("Message size may have been changed", validatedResults.messageSizeMaybeChanged)
+
+ val stats = validatedResults.recordsProcessingStats
+ verifyRecordsProcessingStats(stats, 3, records, compressed = true)
}
@Test
@@ -111,6 +121,7 @@ class LogValidatorTest {
val validatedResults = LogValidator.validateMessagesAndAssignOffsets(
records,
offsetCounter = new LongRef(0),
+ time = time,
now = now,
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
@@ -130,6 +141,8 @@ class LogValidatorTest {
assertEquals(s"The offset of max timestamp should be ${records.records.asScala.size - 1}",
records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
+
+ verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 0, records, compressed = true)
}
@Test
@@ -161,6 +174,7 @@ class LogValidatorTest {
val validatingResults = LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(0),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = NoCompressionCodec,
targetCodec = NoCompressionCodec,
@@ -192,6 +206,8 @@ class LogValidatorTest {
assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingResults.maxTimestamp)
assertEquals(s"Offset of max timestamp should be 1", 1, validatingResults.shallowOffsetOfMaxTimestamp)
assertFalse("Message size should not have been changed", validatingResults.messageSizeMaybeChanged)
+
+ verifyRecordsProcessingStats(validatingResults.recordsProcessingStats, 0, records, compressed = false)
}
@Test
@@ -223,6 +239,7 @@ class LogValidatorTest {
val validatingResults = LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(0),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = NoCompressionCodec,
targetCodec = GZIPCompressionCodec,
@@ -253,6 +270,8 @@ class LogValidatorTest {
assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingResults.maxTimestamp)
assertEquals("Offset of max timestamp should be 2", 2, validatingResults.shallowOffsetOfMaxTimestamp)
assertTrue("Message size should have been changed", validatingResults.messageSizeMaybeChanged)
+
+ verifyRecordsProcessingStats(validatingResults.recordsProcessingStats, 3, records, compressed = true)
}
@Test
@@ -269,6 +288,7 @@ class LogValidatorTest {
val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(0),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
@@ -293,6 +313,8 @@ class LogValidatorTest {
assertEquals(s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}",
validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
assertTrue("Message size should have been changed", validatedResults.messageSizeMaybeChanged)
+
+ verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 3, records, compressed = true)
}
@Test
@@ -306,6 +328,7 @@ class LogValidatorTest {
val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, codec = CompressionType.GZIP, timestamp = timestamp)
val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(0),
+ time = time,
now = timestamp,
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
@@ -330,6 +353,8 @@ class LogValidatorTest {
assertEquals(s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}",
validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
assertTrue("Message size should have been changed", validatedResults.messageSizeMaybeChanged)
+
+ verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 3, records, compressed = true)
}
@Test
@@ -356,6 +381,7 @@ class LogValidatorTest {
val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(0),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
@@ -387,6 +413,8 @@ class LogValidatorTest {
assertEquals(s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}",
validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
+
+ verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 0, records, compressed = true)
}
@Test
@@ -402,6 +430,7 @@ class LogValidatorTest {
LogValidator.validateMessagesAndAssignOffsets(
records,
offsetCounter = new LongRef(0),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = NoCompressionCodec,
targetCodec = NoCompressionCodec,
@@ -421,6 +450,7 @@ class LogValidatorTest {
LogValidator.validateMessagesAndAssignOffsets(
records,
offsetCounter = new LongRef(0),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = NoCompressionCodec,
targetCodec = NoCompressionCodec,
@@ -440,6 +470,7 @@ class LogValidatorTest {
LogValidator.validateMessagesAndAssignOffsets(
records,
offsetCounter = new LongRef(0),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
@@ -459,6 +490,7 @@ class LogValidatorTest {
LogValidator.validateMessagesAndAssignOffsets(
records,
offsetCounter = new LongRef(0),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
@@ -477,6 +509,7 @@ class LogValidatorTest {
checkOffsets(records, 0)
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = NoCompressionCodec,
targetCodec = NoCompressionCodec,
@@ -495,6 +528,7 @@ class LogValidatorTest {
checkOffsets(records, 0)
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
@@ -514,6 +548,7 @@ class LogValidatorTest {
checkOffsets(records, 0)
val messageWithOffset = LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = NoCompressionCodec,
targetCodec = NoCompressionCodec,
@@ -534,6 +569,7 @@ class LogValidatorTest {
checkOffsets(records, 0)
val messageWithOffset = LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = NoCompressionCodec,
targetCodec = NoCompressionCodec,
@@ -555,6 +591,7 @@ class LogValidatorTest {
val compressedMessagesWithOffset = LogValidator.validateMessagesAndAssignOffsets(
records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
@@ -576,6 +613,7 @@ class LogValidatorTest {
val compressedMessagesWithOffset = LogValidator.validateMessagesAndAssignOffsets(
records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
@@ -595,6 +633,7 @@ class LogValidatorTest {
val offset = 1234567
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = NoCompressionCodec,
targetCodec = NoCompressionCodec,
@@ -613,6 +652,7 @@ class LogValidatorTest {
val offset = 1234567
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = NoCompressionCodec,
targetCodec = NoCompressionCodec,
@@ -631,6 +671,7 @@ class LogValidatorTest {
checkOffsets(records, 0)
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
@@ -649,6 +690,7 @@ class LogValidatorTest {
checkOffsets(records, 0)
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
@@ -667,6 +709,7 @@ class LogValidatorTest {
val records = MemoryRecords.withEndTransactionMarker(23423L, 5, endTxnMarker)
LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = NoCompressionCodec,
targetCodec = NoCompressionCodec,
@@ -685,6 +728,7 @@ class LogValidatorTest {
val records = MemoryRecords.withEndTransactionMarker(23423L, 5, endTxnMarker)
val result = LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = NoCompressionCodec,
targetCodec = SnappyCompressionCodec,
@@ -708,6 +752,7 @@ class LogValidatorTest {
checkOffsets(records, 0)
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = NoCompressionCodec,
targetCodec = NoCompressionCodec,
@@ -727,6 +772,7 @@ class LogValidatorTest {
checkOffsets(records, 0)
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
@@ -745,6 +791,7 @@ class LogValidatorTest {
val offset = 1234567
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = NoCompressionCodec,
targetCodec = NoCompressionCodec,
@@ -763,6 +810,7 @@ class LogValidatorTest {
checkOffsets(records, 0)
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
@@ -782,6 +830,7 @@ class LogValidatorTest {
checkOffsets(records, 0)
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = NoCompressionCodec,
targetCodec = NoCompressionCodec,
@@ -801,6 +850,7 @@ class LogValidatorTest {
checkOffsets(records, 0)
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
@@ -822,6 +872,7 @@ class LogValidatorTest {
new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes), new SimpleRecord("beautiful".getBytes))
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
@@ -843,6 +894,7 @@ class LogValidatorTest {
new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes), new SimpleRecord("beautiful".getBytes))
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
@@ -862,6 +914,7 @@ class LogValidatorTest {
checkOffsets(records, 0)
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = NoCompressionCodec,
targetCodec = NoCompressionCodec,
@@ -881,6 +934,7 @@ class LogValidatorTest {
checkOffsets(records, 0)
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
@@ -898,6 +952,7 @@ class LogValidatorTest {
val records = recordsWithInvalidInnerMagic(offset)
LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = SnappyCompressionCodec,
targetCodec = SnappyCompressionCodec,
@@ -936,6 +991,7 @@ class LogValidatorTest {
val records = MemoryRecords.readableRecords(buffer)
LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
+ time = time,
now = System.currentTimeMillis(),
sourceCodec = sourceCodec,
targetCodec = targetCodec,
@@ -1010,4 +1066,19 @@ class LogValidatorTest {
}
}
+ def verifyRecordsProcessingStats(stats: RecordsProcessingStats, convertedCount: Int,
+ records: MemoryRecords, compressed: Boolean): Unit = {
+ assertNotNull("Records processing info is null", stats)
+ assertEquals(convertedCount, stats.conversionCount)
+ if (stats.conversionCount > 0)
+ assertTrue(s"Conversion time not recorded $stats", stats.conversionTimeNanos > 0)
+ val originalSize = records.sizeInBytes
+ val tempBytes = stats.temporaryMemoryBytes
+ if (convertedCount > 0)
+ assertTrue(s"Temp bytes too small, orig=$originalSize actual=$tempBytes", tempBytes > originalSize)
+ else if (compressed)
+ assertTrue("Temp bytes not updated", tempBytes > 0)
+ else
+ assertEquals(0, tempBytes)
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 8b611f2..077950d 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -519,7 +519,7 @@ class SocketServerTest extends JUnitSuite {
val channel = overrideServer.requestChannel
val request = receiveRequest(channel)
- val requestMetrics = RequestMetrics.metricsMap(request.header.apiKey.name)
+ val requestMetrics = RequestMetrics(request.header.apiKey.name)
def totalTimeHistCount(): Long = requestMetrics.totalTimeHist.count
val expectedTotalTimeCount = totalTimeHistCount() + 1
@@ -561,7 +561,7 @@ class SocketServerTest extends JUnitSuite {
TestUtils.waitUntilTrue(() => overrideServer.processor(request.processor).channel(request.context.connectionId).isEmpty,
s"Idle connection `${request.context.connectionId}` was not closed by selector")
- val requestMetrics = RequestMetrics.metricsMap(request.header.apiKey.name)
+ val requestMetrics = RequestMetrics(request.header.apiKey.name)
def totalTimeHistCount(): Long = requestMetrics.totalTimeHist.count
val expectedTotalTimeCount = totalTimeHistCount() + 1
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index deea586..fde2ae1 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -179,6 +179,7 @@ class KafkaApisTest {
EasyMock.eq(false),
EasyMock.anyObject(),
EasyMock.capture(responseCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
override def answer(): Unit = {
responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE)))
@@ -217,6 +218,7 @@ class KafkaApisTest {
EasyMock.eq(false),
EasyMock.anyObject(),
EasyMock.capture(responseCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
override def answer(): Unit = {
responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE)))
@@ -247,6 +249,7 @@ class KafkaApisTest {
EasyMock.eq(false),
EasyMock.anyObject(),
EasyMock.anyObject(),
+ EasyMock.anyObject(),
EasyMock.anyObject()))
EasyMock.replay(replicaManager)
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 344ef2e..1806cb0 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -68,6 +68,12 @@
<li>The Java clients and tools now accept any string as a client-id.</li>
<li>The deprecated tool <code>kafka-consumer-offset-checker.sh</code> has been removed. Use <code>kafka-consumer-groups.sh</code> to get consumer group details.</li>
<li>SimpleAclAuthorizer now logs access denials to the authorizer log by default.</li>
+ <li>The <code>app-info</code> mbean registered with JMX to provide version and commit id will be deprecated and replaced with
+ metrics providing these attributes.</li>
+ <li>Kafka metrics may now contain non-numeric values. <code>org.apache.kafka.common.Metric#value()</code> has been deprecated and
+ may throw an <code>IllegalStateException</code> when iterating over metrics of <code>KafkaProducer/KafkaConsumer/KafkaAdminClient</code>.
+ <code>org.apache.kafka.common.Metric#metricValue()</code> can be used to safely iterate over any metric value.</code>
+
</ul>
<h5><a id="upgrade_100_new_protocols" href="#upgrade_100_new_protocols">New Protocol Versions</a></h5>
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
index e02bbb0..0e5d130 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
@@ -32,7 +32,7 @@ public class ToolsUtils {
public static void printMetrics(Map<MetricName, ? extends Metric> metrics) {
if (metrics != null && !metrics.isEmpty()) {
int maxLengthOfDisplayName = 0;
- TreeMap<String, Double> sortedMetrics = new TreeMap<>(new Comparator<String>() {
+ TreeMap<String, Object> sortedMetrics = new TreeMap<>(new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
return o1.compareTo(o2);
@@ -42,12 +42,18 @@ public class ToolsUtils {
MetricName mName = metric.metricName();
String mergedName = mName.group() + ":" + mName.name() + ":" + mName.tags();
maxLengthOfDisplayName = maxLengthOfDisplayName < mergedName.length() ? mergedName.length() : maxLengthOfDisplayName;
- sortedMetrics.put(mergedName, metric.value());
+ sortedMetrics.put(mergedName, metric.metricValue());
}
- String outputFormat = "%-" + maxLengthOfDisplayName + "s : %.3f";
+ String doubleOutputFormat = "%-" + maxLengthOfDisplayName + "s : %.3f";
+ String defaultOutputFormat = "%-" + maxLengthOfDisplayName + "s : %s";
System.out.println(String.format("\n%-" + maxLengthOfDisplayName + "s %s", "Metric Name", "Value"));
- for (Map.Entry<String, Double> entry : sortedMetrics.entrySet()) {
+ for (Map.Entry<String, Object> entry : sortedMetrics.entrySet()) {
+ String outputFormat;
+ if (entry.getValue() instanceof Double)
+ outputFormat = doubleOutputFormat;
+ else
+ outputFormat = defaultOutputFormat;
System.out.println(String.format(outputFormat, entry.getKey(), entry.getValue()));
}
}
[3/3] kafka git commit: KAFKA-5746;
Add new metrics to support health checks (KIP-188)
Posted by ij...@apache.org.
KAFKA-5746; Add new metrics to support health checks (KIP-188)
Adds new metrics to support health checks:
1. Error rates for each request type, per-error code
2. Request size and temporary memory size
3. Message conversion rate and time
4. Successful and failed authentication rates
5. ZooKeeper latency and status
6. Client version
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3705 from rajinisivaram/KAFKA-5746-new-metrics
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/021d8a8e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/021d8a8e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/021d8a8e
Branch: refs/heads/trunk
Commit: 021d8a8e9698dce454e0e801092460b98f0a8a4d
Parents: dd6347a
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Thu Sep 28 21:58:47 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Sep 28 21:58:59 2017 +0100
----------------------------------------------------------------------
checkstyle/import-control.xml | 4 +
checkstyle/suppressions.xml | 2 +-
.../kafka/clients/admin/KafkaAdminClient.java | 4 +-
.../kafka/clients/consumer/KafkaConsumer.java | 4 +-
.../kafka/clients/producer/KafkaProducer.java | 4 +-
.../clients/producer/internals/Sender.java | 2 +-
.../java/org/apache/kafka/common/Metric.java | 12 +-
.../org/apache/kafka/common/metrics/Gauge.java | 31 ++
.../kafka/common/metrics/JmxReporter.java | 2 +-
.../kafka/common/metrics/KafkaMetric.java | 40 ++-
.../apache/kafka/common/metrics/Measurable.java | 4 +-
.../common/metrics/MetricValueProvider.java | 28 ++
.../apache/kafka/common/metrics/Metrics.java | 36 ++-
.../org/apache/kafka/common/metrics/Sensor.java | 2 +-
.../apache/kafka/common/network/Selector.java | 33 ++-
.../kafka/common/record/AbstractRecords.java | 46 ++-
.../kafka/common/record/ConvertedRecords.java | 36 +++
.../apache/kafka/common/record/FileRecords.java | 17 +-
.../kafka/common/record/MemoryRecords.java | 5 +-
.../common/record/MemoryRecordsBuilder.java | 40 +--
.../org/apache/kafka/common/record/Records.java | 9 +-
.../common/record/RecordsProcessingStats.java | 60 ++++
.../kafka/common/requests/AbstractRequest.java | 14 +
.../kafka/common/requests/AbstractResponse.java | 29 ++
.../requests/AddOffsetsToTxnResponse.java | 6 +
.../requests/AddPartitionsToTxnResponse.java | 5 +
.../common/requests/AlterConfigsResponse.java | 6 +
.../requests/AlterReplicaDirResponse.java | 5 +
.../common/requests/ApiVersionsResponse.java | 5 +
.../requests/ControlledShutdownResponse.java | 6 +
.../common/requests/CreateAclsResponse.java | 11 +
.../requests/CreatePartitionsRequest.java | 2 +-
.../requests/CreatePartitionsResponse.java | 6 +
.../common/requests/CreateTopicsResponse.java | 6 +
.../common/requests/DeleteAclsResponse.java | 11 +
.../common/requests/DeleteRecordsResponse.java | 8 +
.../common/requests/DeleteTopicsResponse.java | 5 +
.../common/requests/DescribeAclsResponse.java | 6 +
.../requests/DescribeConfigsResponse.java | 9 +
.../common/requests/DescribeGroupsResponse.java | 7 +
.../requests/DescribeLogDirsResponse.java | 8 +
.../kafka/common/requests/EndTxnResponse.java | 6 +
.../kafka/common/requests/FetchResponse.java | 9 +
.../requests/FindCoordinatorResponse.java | 6 +
.../common/requests/HeartbeatResponse.java | 6 +
.../common/requests/InitProducerIdResponse.java | 6 +
.../common/requests/JoinGroupResponse.java | 5 +
.../common/requests/LeaderAndIsrResponse.java | 5 +
.../common/requests/LeaveGroupResponse.java | 6 +
.../common/requests/ListGroupsResponse.java | 6 +
.../common/requests/ListOffsetResponse.java | 8 +
.../kafka/common/requests/MetadataResponse.java | 8 +
.../common/requests/OffsetCommitResponse.java | 5 +
.../common/requests/OffsetFetchResponse.java | 5 +
.../requests/OffsetsForLeaderEpochResponse.java | 8 +
.../kafka/common/requests/ProduceRequest.java | 7 +
.../kafka/common/requests/ProduceResponse.java | 8 +
.../requests/SaslAuthenticateResponse.java | 6 +
.../common/requests/SaslHandshakeResponse.java | 6 +
.../common/requests/StopReplicaResponse.java | 5 +
.../common/requests/SyncGroupResponse.java | 6 +
.../requests/TxnOffsetCommitResponse.java | 5 +
.../common/requests/UpdateMetadataResponse.java | 6 +
.../requests/WriteTxnMarkersResponse.java | 10 +
.../authenticator/SaslServerAuthenticator.java | 13 +-
.../kafka/common/utils/AppInfoParser.java | 43 ++-
.../clients/consumer/internals/FetcherTest.java | 1 +
.../clients/producer/internals/SenderTest.java | 3 +
.../kafka/common/metrics/MetricsTest.java | 1 +
.../common/metrics/stats/FrequenciesTest.java | 3 +-
.../kafka/common/network/NioEchoServer.java | 51 +++-
.../kafka/common/network/SelectorTest.java | 2 +-
.../common/network/SslTransportLayerTest.java | 7 +
.../kafka/common/record/FileRecordsTest.java | 12 +-
.../common/record/MemoryRecordsBuilderTest.java | 43 ++-
.../authenticator/SaslAuthenticatorTest.java | 11 +
.../SaslServerAuthenticatorTest.java | 5 +-
.../kafka/connect/runtime/ConnectMetrics.java | 4 +-
.../runtime/distributed/WorkerGroupMember.java | 4 +-
.../main/scala/kafka/admin/ConfigCommand.scala | 2 +-
.../kafka/controller/KafkaController.scala | 4 +-
core/src/main/scala/kafka/log/Log.scala | 12 +-
.../src/main/scala/kafka/log/LogValidator.scala | 52 +++-
.../scala/kafka/metrics/KafkaMetricsGroup.scala | 15 +-
.../scala/kafka/network/RequestChannel.scala | 97 ++++++-
.../main/scala/kafka/server/AdminManager.scala | 2 +-
.../scala/kafka/server/ClientQuotaManager.scala | 2 -
.../src/main/scala/kafka/server/KafkaApis.scala | 49 +++-
.../scala/kafka/server/KafkaHealthcheck.scala | 21 +-
.../kafka/server/KafkaRequestHandler.scala | 6 +
.../main/scala/kafka/server/KafkaServer.scala | 17 +-
.../scala/kafka/server/ReplicaManager.scala | 5 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 122 +++++---
.../integration/kafka/api/MetricsTest.scala | 288 +++++++++++++++++++
.../kafka/api/PlaintextConsumerTest.scala | 49 ----
.../group/GroupCoordinatorTest.scala | 9 +-
.../group/GroupMetadataManagerTest.scala | 6 +-
.../TransactionStateManagerTest.scala | 4 +-
.../scala/unit/kafka/log/LogValidatorTest.scala | 71 +++++
.../unit/kafka/network/SocketServerTest.scala | 4 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 3 +
docs/upgrade.html | 6 +
.../java/org/apache/kafka/tools/ToolsUtils.java | 14 +-
103 files changed, 1544 insertions(+), 243 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index f4d9655..ddb13bc 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -140,6 +140,10 @@
<allow class="org.apache.kafka.common.errors.SerializationException" />
<allow class="org.apache.kafka.common.header.Headers" />
</subpackage>
+
+ <subpackage name="utils">
+ <allow pkg="org.apache.kafka.common.metrics" />
+ </subpackage>
</subpackage>
<subpackage name="clients">
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 734b07e..b747e4c 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -60,7 +60,7 @@
files="AbstractRequest.java"/>
<suppress checks="NPathComplexity"
- files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|SslTransportLayer|Sender|Serdes|Agent).java"/>
+ files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|Agent).java"/>
<!-- clients tests -->
<suppress checks="ClassDataAbstractionCoupling"
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 88339bf..5f37b8e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -377,7 +377,7 @@ public class KafkaAdminClient extends AdminClient {
new TimeoutProcessorFactory() : timeoutProcessorFactory;
this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG);
config.logUnused();
- AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId);
+ AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId, metrics);
log.debug("Kafka admin client initialized");
thread.start();
}
@@ -418,7 +418,7 @@ public class KafkaAdminClient extends AdminClient {
// Wait for the thread to be joined.
thread.join();
- AppInfoParser.unregisterAppInfo(JMX_PREFIX, Sanitizer.sanitize(clientId));
+ AppInfoParser.unregisterAppInfo(JMX_PREFIX, Sanitizer.sanitize(clientId), metrics);
log.debug("Kafka admin client closed.");
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f4af39c..d6764ca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -771,7 +771,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
isolationLevel);
config.logUnused();
- AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId);
+ AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId, metrics);
log.debug("Kafka consumer initialized");
} catch (Throwable t) {
@@ -1726,7 +1726,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
ClientUtils.closeQuietly(client, "consumer network client", firstException);
ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException);
ClientUtils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException);
- AppInfoParser.unregisterAppInfo(JMX_PREFIX, Sanitizer.sanitize(clientId));
+ AppInfoParser.unregisterAppInfo(JMX_PREFIX, Sanitizer.sanitize(clientId), metrics);
log.debug("Kafka consumer has been closed");
Throwable exception = firstException.get();
if (exception != null && !swallowException) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index d40a16c..a202217 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -427,7 +427,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
this.ioThread.start();
this.errors = this.metrics.sensor("errors");
config.logUnused();
- AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId);
+ AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId, metrics);
log.debug("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
@@ -1075,7 +1075,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
ClientUtils.closeQuietly(keySerializer, "producer keySerializer", firstException);
ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
ClientUtils.closeQuietly(partitioner, "producer partitioner", firstException);
- AppInfoParser.unregisterAppInfo(JMX_PREFIX, Sanitizer.sanitize(clientId));
+ AppInfoParser.unregisterAppInfo(JMX_PREFIX, Sanitizer.sanitize(clientId), metrics);
log.debug("Kafka producer has been closed");
if (firstException.get() != null && !swallowException)
throw new KafkaException("Failed to close kafka producer", firstException.get());
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 6d9021c..a15a250 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -675,7 +675,7 @@ public class Sender implements Runnable {
// not all support the same message format version. For example, if a partition migrates from a broker
// which is supporting the new magic version to one which doesn't, then we will need to convert.
if (!records.hasMatchingMagic(minUsedMagic))
- records = batch.records().downConvert(minUsedMagic, 0);
+ records = batch.records().downConvert(minUsedMagic, 0, time).records();
produceRecordsByPartition.put(tp, records);
recordsByPartition.put(tp, batch);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/Metric.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Metric.java b/clients/src/main/java/org/apache/kafka/common/Metric.java
index 908c5c7..1f8d618 100644
--- a/clients/src/main/java/org/apache/kafka/common/Metric.java
+++ b/clients/src/main/java/org/apache/kafka/common/Metric.java
@@ -17,7 +17,7 @@
package org.apache.kafka.common;
/**
- * A numerical metric tracked for monitoring purposes
+ * A metric tracked for monitoring purposes.
*/
public interface Metric {
@@ -27,8 +27,16 @@ public interface Metric {
public MetricName metricName();
/**
- * The value of the metric
+ * The value of the metric as double if the metric is measurable
+ * @throws IllegalStateException if this metric does not have a measurable double value
+ * @deprecated As of 1.0.0, use {@link #metricValue()} instead. This will be removed in a future major release.
*/
+ @Deprecated
public double value();
+ /**
+ * The value of the metric, which may be measurable or a non-measurable gauge
+ */
+ public Object metricValue();
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/metrics/Gauge.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Gauge.java b/clients/src/main/java/org/apache/kafka/common/metrics/Gauge.java
new file mode 100644
index 0000000..647942b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Gauge.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+/**
+ * A gauge metric is an instantaneous reading of a particular value.
+ */
+public interface Gauge<T> extends MetricValueProvider<T> {
+
+ /**
+ * Returns the current value associated with this gauge.
+ * @param config The configuration for this metric
+ * @param now The POSIX time in milliseconds the measurement is being taken
+ */
+ T value(MetricConfig config, long now);
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
index de02a23..294e1d8 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
@@ -184,7 +184,7 @@ public class JmxReporter implements MetricsReporter {
@Override
public Object getAttribute(String name) throws AttributeNotFoundException, MBeanException, ReflectionException {
if (this.metrics.containsKey(name))
- return this.metrics.get(name).value();
+ return this.metrics.get(name).metricValue();
else
throw new AttributeNotFoundException("Could not find attribute " + name);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
index ef53b89..0cdd844 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
@@ -25,13 +25,16 @@ public final class KafkaMetric implements Metric {
private MetricName metricName;
private final Object lock;
private final Time time;
- private final Measurable measurable;
+ private final MetricValueProvider<?> metricValueProvider;
private MetricConfig config;
- KafkaMetric(Object lock, MetricName metricName, Measurable measurable, MetricConfig config, Time time) {
+ KafkaMetric(Object lock, MetricName metricName, MetricValueProvider<?> valueProvider,
+ MetricConfig config, Time time) {
this.metricName = metricName;
this.lock = lock;
- this.measurable = measurable;
+ if (!(valueProvider instanceof Measurable) && !(valueProvider instanceof Gauge))
+ throw new IllegalArgumentException("Unsupported metric value provider of class " + valueProvider.getClass());
+ this.metricValueProvider = valueProvider;
this.config = config;
this.time = time;
}
@@ -45,19 +48,42 @@ public final class KafkaMetric implements Metric {
return this.metricName;
}
+ /**
+ * See {@link Metric#value()} for the details on why this is deprecated.
+ */
@Override
+ @Deprecated
public double value() {
synchronized (this.lock) {
- return value(time.milliseconds());
+ return measurableValue(time.milliseconds());
+ }
+ }
+
+ @Override
+ public Object metricValue() {
+ long now = time.milliseconds();
+ synchronized (this.lock) {
+ if (this.metricValueProvider instanceof Measurable)
+ return ((Measurable) metricValueProvider).measure(config, now);
+ else if (this.metricValueProvider instanceof Gauge)
+ return ((Gauge<?>) metricValueProvider).value(config, now);
+ else
+ throw new IllegalStateException("Not a valid metric: " + this.metricValueProvider.getClass());
}
}
public Measurable measurable() {
- return this.measurable;
+ if (this.metricValueProvider instanceof Measurable)
+ return (Measurable) metricValueProvider;
+ else
+ throw new IllegalStateException("Not a measurable: " + this.metricValueProvider.getClass());
}
- double value(long timeMs) {
- return this.measurable.measure(config, timeMs);
+ double measurableValue(long timeMs) {
+ if (this.metricValueProvider instanceof Measurable)
+ return ((Measurable) metricValueProvider).measure(config, timeMs);
+ else
+ throw new IllegalStateException("Not a measurable metric");
}
public void config(MetricConfig config) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java b/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java
index d068895..866caba 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java
@@ -19,7 +19,7 @@ package org.apache.kafka.common.metrics;
/**
* A measurable quantity that can be registered as a metric
*/
-public interface Measurable {
+public interface Measurable extends MetricValueProvider<Double> {
/**
* Measure this quantity and return the result as a double
@@ -27,6 +27,6 @@ public interface Measurable {
* @param now The POSIX time in milliseconds the measurement is being taken
* @return The measured value
*/
- public double measure(MetricConfig config, long now);
+ double measure(MetricConfig config, long now);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/metrics/MetricValueProvider.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricValueProvider.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricValueProvider.java
new file mode 100644
index 0000000..68028e7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricValueProvider.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+/**
+ * Super-interface for {@link Measurable} or {@link Gauge} that provides
+ * metric values.
+ * <p>
+ * In the future for Java8 and above, {@link Gauge#value(MetricConfig, long)} will be
+ * moved to this interface with a default implementation in {@link Measurable} that returns
+ * {@link Measurable#measure(MetricConfig, long)}.
+ * </p>
+ */
+public interface MetricValueProvider<T> { }
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 0b4507b..8127022 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -449,6 +449,10 @@ public class Metrics implements Closeable {
/**
* Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
* This is a way to expose existing values as metrics.
+ *
+ * This method is kept for binary compatibility purposes, it has the same behaviour as
+ * {@link #addMetric(MetricName, MetricValue)}.
+ *
* @param metricName The name of the metric
* @param measurable The measurable that will be measured by this metric
*/
@@ -457,22 +461,48 @@ public class Metrics implements Closeable {
}
/**
- * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
+ * Add a metric to monitor an object that implements Measurable. This metric won't be associated with any sensor.
* This is a way to expose existing values as metrics.
+ *
+ * This method is kept for binary compatibility purposes, it has the same behaviour as
+ * {@link #addMetric(MetricName, MetricConfig, MetricValueProvider)}.
+ *
* @param metricName The name of the metric
* @param config The configuration to use when measuring this measurable
* @param measurable The measurable that will be measured by this metric
*/
- public synchronized void addMetric(MetricName metricName, MetricConfig config, Measurable measurable) {
+ public void addMetric(MetricName metricName, MetricConfig config, Measurable measurable) {
+ addMetric(metricName, config, (MetricValueProvider<?>) measurable);
+ }
+
+ /**
+ * Add a metric to monitor an object that implements MetricValueProvider. This metric won't be associated with any
+ * sensor. This is a way to expose existing values as metrics.
+ *
+ * @param metricName The name of the metric
+ * @param metricValueProvider The metric value provider associated with this metric
+ */
+ public void addMetric(MetricName metricName, MetricConfig config, MetricValueProvider<?> metricValueProvider) {
KafkaMetric m = new KafkaMetric(new Object(),
Utils.notNull(metricName),
- Utils.notNull(measurable),
+ Utils.notNull(metricValueProvider),
config == null ? this.config : config,
time);
registerMetric(m);
}
/**
+ * Add a metric to monitor an object that implements MetricValueProvider. This metric won't be associated with any
+ * sensor. This is a way to expose existing values as metrics.
+ *
+ * @param metricName The name of the metric
+ * @param metricValueProvider The metric value provider associated with this metric
+ */
+ public void addMetric(MetricName metricName, MetricValueProvider<?> metricValueProvider) {
+ addMetric(metricName, null, metricValueProvider);
+ }
+
+ /**
* Remove a metric if it exists and return it. Return null otherwise. If a metric is removed, `metricRemoval`
* will be invoked for each reporter.
*
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 55d75b1..321fab6 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -199,7 +199,7 @@ public final class Sensor {
if (config != null) {
Quota quota = config.quota();
if (quota != null) {
- double value = metric.value(timeMs);
+ double value = metric.measurableValue(timeMs);
if (!quota.acceptable(value)) {
throw new QuotaViolationException(metric.metricName(), value,
quota.bound());
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index b753745..f70afe0 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -456,7 +456,14 @@ public class Selector implements Selectable, AutoCloseable {
/* if channel is not ready finish prepare */
if (channel.isConnected() && !channel.ready()) {
- channel.prepare();
+ try {
+ channel.prepare();
+ } catch (AuthenticationException e) {
+ sensors.failedAuthentication.record();
+ throw e;
+ }
+ if (channel.ready())
+ sensors.successfulAuthentication.record();
}
attemptRead(key, channel);
@@ -839,6 +846,8 @@ public class Selector implements Selectable, AutoCloseable {
public final Sensor connectionClosed;
public final Sensor connectionCreated;
+ public final Sensor successfulAuthentication;
+ public final Sensor failedAuthentication;
public final Sensor bytesTransferred;
public final Sensor bytesSent;
public final Sensor bytesReceived;
@@ -863,19 +872,27 @@ public class Selector implements Selectable, AutoCloseable {
tagsSuffix.append(tag.getValue());
}
- this.connectionClosed = sensor("connections-closed:" + tagsSuffix.toString());
+ this.connectionClosed = sensor("connections-closed:" + tagsSuffix);
this.connectionClosed.add(createMeter(metrics, metricGrpName, metricTags,
"connection-close", "connections closed"));
- this.connectionCreated = sensor("connections-created:" + tagsSuffix.toString());
+ this.connectionCreated = sensor("connections-created:" + tagsSuffix);
this.connectionCreated.add(createMeter(metrics, metricGrpName, metricTags,
"connection-creation", "new connections established"));
- this.bytesTransferred = sensor("bytes-sent-received:" + tagsSuffix.toString());
+ this.successfulAuthentication = sensor("successful-authentication:" + tagsSuffix);
+ this.successfulAuthentication.add(createMeter(metrics, metricGrpName, metricTags,
+ "successful-authentication", "connections with successful authentication"));
+
+ this.failedAuthentication = sensor("failed-authentication:" + tagsSuffix);
+ this.failedAuthentication.add(createMeter(metrics, metricGrpName, metricTags,
+ "failed-authentication", "connections with failed authentication"));
+
+ this.bytesTransferred = sensor("bytes-sent-received:" + tagsSuffix);
bytesTransferred.add(createMeter(metrics, metricGrpName, metricTags, new Count(),
"network-io", "network operations (reads or writes) on all connections"));
- this.bytesSent = sensor("bytes-sent:" + tagsSuffix.toString(), bytesTransferred);
+ this.bytesSent = sensor("bytes-sent:" + tagsSuffix, bytesTransferred);
this.bytesSent.add(createMeter(metrics, metricGrpName, metricTags,
"outgoing-byte", "outgoing bytes sent to all servers"));
this.bytesSent.add(createMeter(metrics, metricGrpName, metricTags, new Count(),
@@ -885,20 +902,20 @@ public class Selector implements Selectable, AutoCloseable {
metricName = metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent.", metricTags);
this.bytesSent.add(metricName, new Max());
- this.bytesReceived = sensor("bytes-received:" + tagsSuffix.toString(), bytesTransferred);
+ this.bytesReceived = sensor("bytes-received:" + tagsSuffix, bytesTransferred);
this.bytesReceived.add(createMeter(metrics, metricGrpName, metricTags,
"incoming-byte", "bytes read off all sockets"));
this.bytesReceived.add(createMeter(metrics, metricGrpName, metricTags,
new Count(), "response", "responses received"));
- this.selectTime = sensor("select-time:" + tagsSuffix.toString());
+ this.selectTime = sensor("select-time:" + tagsSuffix);
this.selectTime.add(createMeter(metrics, metricGrpName, metricTags,
new Count(), "select", "times the I/O layer checked for new I/O to perform"));
metricName = metrics.metricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
this.selectTime.add(metricName, new Avg());
this.selectTime.add(createIOThreadRatioMeter(metrics, metricGrpName, metricTags, "io-wait", "waiting"));
- this.ioTime = sensor("io-time:" + tagsSuffix.toString());
+ this.ioTime = sensor("io-time:" + tagsSuffix);
metricName = metrics.metricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags);
this.ioTime.add(metricName, new Avg());
this.ioTime.add(createIOThreadRatioMeter(metrics, metricGrpName, metricTags, "io", "doing I/O"));
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 9ba2048..6a1b95c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
@@ -59,10 +60,12 @@ public abstract class AbstractRecords implements Records {
* need to drop records from the batch during the conversion. Some versions of librdkafka rely on this for
* correctness.
*/
- protected MemoryRecords downConvert(Iterable<? extends RecordBatch> batches, byte toMagic, long firstOffset) {
+ protected ConvertedRecords<MemoryRecords> downConvert(Iterable<? extends RecordBatch> batches, byte toMagic,
+ long firstOffset, Time time) {
// maintain the batch along with the decompressed records to avoid the need to decompress again
List<RecordBatchAndRecords> recordBatchAndRecordsList = new ArrayList<>();
int totalSizeEstimate = 0;
+ long startNanos = time.nanoseconds();
for (RecordBatch batch : batches) {
if (toMagic < RecordBatch.MAGIC_VALUE_V2 && batch.isControlBatch())
@@ -91,22 +94,31 @@ public abstract class AbstractRecords implements Records {
}
ByteBuffer buffer = ByteBuffer.allocate(totalSizeEstimate);
+ long temporaryMemoryBytes = 0;
+ long conversionCount = 0;
for (RecordBatchAndRecords recordBatchAndRecords : recordBatchAndRecordsList) {
- if (recordBatchAndRecords.batch.magic() <= toMagic)
+ temporaryMemoryBytes += recordBatchAndRecords.batch.sizeInBytes();
+ if (recordBatchAndRecords.batch.magic() <= toMagic) {
recordBatchAndRecords.batch.writeTo(buffer);
- else
- buffer = convertRecordBatch(toMagic, buffer, recordBatchAndRecords);
+ } else {
+ MemoryRecordsBuilder builder = convertRecordBatch(toMagic, buffer, recordBatchAndRecords);
+ buffer = builder.buffer();
+ temporaryMemoryBytes += builder.uncompressedBytesWritten();
+ conversionCount++;
+ }
}
buffer.flip();
- return MemoryRecords.readableRecords(buffer);
+ RecordsProcessingStats stats = new RecordsProcessingStats(temporaryMemoryBytes, conversionCount,
+ time.nanoseconds() - startNanos);
+ return new ConvertedRecords<>(MemoryRecords.readableRecords(buffer), stats);
}
/**
* Return a buffer containing the converted record batches. The returned buffer may not be the same as the received
* one (e.g. it may require expansion).
*/
- private ByteBuffer convertRecordBatch(byte magic, ByteBuffer buffer, RecordBatchAndRecords recordBatchAndRecords) {
+ private MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, RecordBatchAndRecords recordBatchAndRecords) {
RecordBatch batch = recordBatchAndRecords.batch;
final TimestampType timestampType = batch.timestampType();
long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
@@ -117,7 +129,7 @@ public abstract class AbstractRecords implements Records {
builder.append(record);
builder.close();
- return builder.buffer();
+ return builder;
}
/**
@@ -192,7 +204,8 @@ public abstract class AbstractRecords implements Records {
* Get an upper bound estimate on the batch size needed to hold a record with the given fields. This is only
* an estimate because it does not take into account overhead from the compression algorithm.
*/
- public static int estimateSizeInBytesUpperBound(byte magic, CompressionType compressionType, ByteBuffer key, ByteBuffer value, Header[] headers) {
+ public static int estimateSizeInBytesUpperBound(byte magic, CompressionType compressionType, ByteBuffer key,
+ ByteBuffer value, Header[] headers) {
if (magic >= RecordBatch.MAGIC_VALUE_V2)
return DefaultRecordBatch.estimateBatchSizeUpperBound(key, value, headers);
else if (compressionType != CompressionType.NONE)
@@ -201,6 +214,23 @@ public abstract class AbstractRecords implements Records {
return Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
}
+ /**
+ * Return the size of the record batch header.
+ *
+ * For V0 and V1 with no compression, it's unclear if Records.LOG_OVERHEAD or 0 should be chosen. There is no header
+ * per batch, but a sequence of batches is preceded by the offset and size. This method returns `0` as it's what
+ * `MemoryRecordsBuilder` requires.
+ */
+ public static int recordBatchHeaderSizeInBytes(byte magic, CompressionType compressionType) {
+ if (magic > RecordBatch.MAGIC_VALUE_V1) {
+ return DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+ } else if (compressionType != CompressionType.NONE) {
+ return Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic);
+ } else {
+ return 0;
+ }
+ }
+
private static class RecordBatchAndRecords {
private final RecordBatch batch;
private final List<Record> records;
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java b/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
new file mode 100644
index 0000000..fe37c48
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+public class ConvertedRecords<T extends Records> {
+
+ private final T records;
+ private final RecordsProcessingStats recordsProcessingStats;
+
+ public ConvertedRecords(T records, RecordsProcessingStats recordsProcessingStats) {
+ this.records = records;
+ this.recordsProcessingStats = recordsProcessingStats;
+ }
+
+ public T records() {
+ return records;
+ }
+
+ public RecordsProcessingStats recordsProcessingStats() {
+ return recordsProcessingStats;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index a898634..ae99db3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import java.io.Closeable;
@@ -237,19 +238,19 @@ public class FileRecords extends AbstractRecords implements Closeable {
}
@Override
- public Records downConvert(byte toMagic, long firstOffset) {
+ public ConvertedRecords<? extends Records> downConvert(byte toMagic, long firstOffset, Time time) {
List<? extends RecordBatch> batches = Utils.toList(batches().iterator());
if (batches.isEmpty()) {
// This indicates that the message is too large, which means that the buffer is not large
- // enough to hold a full record batch. We just return all the bytes in the file message set.
- // Even though the message set does not have the right format version, we expect old clients
- // to raise an error to the user after reading the message size and seeing that there
- // are not enough available bytes in the response to read the full message. Note that this is
+ // enough to hold a full record batch. We just return all the bytes in this instance.
+ // Even though the record batch does not have the right format version, we expect old clients
+ // to raise an error to the user after reading the record batch size and seeing that there
+ // are not enough available bytes in the response to read it fully. Note that this is
// only possible prior to KIP-74, after which the broker was changed to always return at least
- // one full message, even if it requires exceeding the max fetch size requested by the client.
- return this;
+ // one full record batch, even if it requires exceeding the max fetch size requested by the client.
+ return new ConvertedRecords<>(this, RecordsProcessingStats.EMPTY);
} else {
- return downConvert(batches, toMagic, firstOffset);
+ return downConvert(batches, toMagic, firstOffset, time);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index d3f2444..2a25aad 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,8 +112,8 @@ public class MemoryRecords extends AbstractRecords {
}
@Override
- public MemoryRecords downConvert(byte toMagic, long firstOffset) {
- return downConvert(batches(), toMagic, firstOffset);
+ public ConvertedRecords<MemoryRecords> downConvert(byte toMagic, long firstOffset, Time time) {
+ return downConvert(batches(), toMagic, firstOffset, time);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index fc83134..ad0bab7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -54,6 +54,7 @@ public class MemoryRecordsBuilder {
private final boolean isControlBatch;
private final int partitionLeaderEpoch;
private final int writeLimit;
+ private final int batchHeaderSizeInBytes;
// Use a conservative estimate of the compression ratio. The producer overrides this using statistics
// from previous batches before appending any records.
@@ -64,8 +65,7 @@ public class MemoryRecordsBuilder {
private long producerId;
private short producerEpoch;
private int baseSequence;
- private int writtenUncompressed = 0; // Number of bytes (excluding the header) written before compression
- private int batchHeaderSize;
+ private int uncompressedRecordsSizeInBytes = 0; // Number of bytes (excluding the header) written before compression
private int numRecords = 0;
private float actualCompressionRatio = 1;
private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
@@ -104,7 +104,7 @@ public class MemoryRecordsBuilder {
this.baseOffset = baseOffset;
this.logAppendTime = logAppendTime;
this.numRecords = 0;
- this.writtenUncompressed = 0;
+ this.uncompressedRecordsSizeInBytes = 0;
this.actualCompressionRatio = 1;
this.maxTimestamp = RecordBatch.NO_TIMESTAMP;
this.producerId = producerId;
@@ -115,18 +115,9 @@ public class MemoryRecordsBuilder {
this.partitionLeaderEpoch = partitionLeaderEpoch;
this.writeLimit = writeLimit;
this.initialPosition = bufferStream.position();
+ this.batchHeaderSizeInBytes = AbstractRecords.recordBatchHeaderSizeInBytes(magic, compressionType);
- if (magic > RecordBatch.MAGIC_VALUE_V1) {
- batchHeaderSize = DefaultRecordBatch.RECORDS_OFFSET;
- } else if (compressionType != CompressionType.NONE) {
- // for compressed records, leave space for the header and the shallow message metadata
- // and move the starting position to the value payload offset
- batchHeaderSize = Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic);
- } else {
- batchHeaderSize = 0;
- }
-
- bufferStream.position(initialPosition + batchHeaderSize);
+ bufferStream.position(initialPosition + batchHeaderSizeInBytes);
this.bufferStream = bufferStream;
this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
}
@@ -238,6 +229,17 @@ public class MemoryRecordsBuilder {
}
}
+ public int numRecords() {
+ return numRecords;
+ }
+
+ /**
+ * Return the sum of the size of the batch header (always uncompressed) and the records (before compression).
+ */
+ public int uncompressedBytesWritten() {
+ return uncompressedRecordsSizeInBytes + batchHeaderSizeInBytes;
+ }
+
public void setProducerState(long producerId, short producerEpoch, int baseSequence, boolean isTransactional) {
if (isClosed()) {
// Sequence numbers are assigned when the batch is closed while the accumulator is being drained.
@@ -306,9 +308,9 @@ public class MemoryRecordsBuilder {
builtRecords = MemoryRecords.EMPTY;
} else {
if (magic > RecordBatch.MAGIC_VALUE_V1)
- this.actualCompressionRatio = (float) writeDefaultBatchHeader() / this.writtenUncompressed;
+ this.actualCompressionRatio = (float) writeDefaultBatchHeader() / this.uncompressedRecordsSizeInBytes;
else if (compressionType != CompressionType.NONE)
- this.actualCompressionRatio = (float) writeLegacyCompressedWrapperHeader() / this.writtenUncompressed;
+ this.actualCompressionRatio = (float) writeLegacyCompressedWrapperHeader() / this.uncompressedRecordsSizeInBytes;
ByteBuffer buffer = buffer().duplicate();
buffer.flip();
@@ -651,7 +653,7 @@ public class MemoryRecordsBuilder {
", last offset: " + offset);
numRecords += 1;
- writtenUncompressed += size;
+ uncompressedRecordsSizeInBytes += size;
lastOffset = offset;
if (magic > RecordBatch.MAGIC_VALUE_V0 && timestamp > maxTimestamp) {
@@ -678,10 +680,10 @@ public class MemoryRecordsBuilder {
*/
private int estimatedBytesWritten() {
if (compressionType == CompressionType.NONE) {
- return batchHeaderSize + writtenUncompressed;
+ return batchHeaderSizeInBytes + uncompressedRecordsSizeInBytes;
} else {
// estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
- return batchHeaderSize + (int) (writtenUncompressed * estimatedCompressionRatio * COMPRESSION_RATE_ESTIMATION_FACTOR);
+ return batchHeaderSizeInBytes + (int) (uncompressedRecordsSizeInBytes * estimatedCompressionRatio * COMPRESSION_RATE_ESTIMATION_FACTOR);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index ec2e717..19152ba 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -19,6 +19,8 @@ package org.apache.kafka.common.record;
import java.io.IOException;
import java.nio.channels.GatheringByteChannel;
+import org.apache.kafka.common.utils.Time;
+
/**
* Interface for accessing the records contained in a log. The log itself is represented as a sequence of record
* batches (see {@link RecordBatch}).
@@ -97,10 +99,11 @@ public interface Records {
* deep iteration since all of the deep records must also be converted to the desired format.
* @param toMagic The magic value to convert to
* @param firstOffset The starting offset for returned records. This only impacts some cases. See
- * {@link AbstractRecords#downConvert(Iterable, byte, long)} for an explanation.
- * @return A Records instance (which may or may not be the same instance)
+ * {@link AbstractRecords#downConvert(Iterable, byte, long, Time) for an explanation.
+ * @param time instance used for reporting stats
+ * @return A ConvertedRecords instance which may or may not contain the same instance in its records field.
*/
- Records downConvert(byte toMagic, long firstOffset);
+ ConvertedRecords<? extends Records> downConvert(byte toMagic, long firstOffset, Time time);
/**
* Get an iterator over the records in this log. Note that this generally requires decompression,
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java
new file mode 100644
index 0000000..cbb76b7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+public class RecordsProcessingStats {
+
+ public static final RecordsProcessingStats EMPTY = new RecordsProcessingStats(0L, 0L, -1);
+
+ private final long temporaryMemoryBytes;
+ private final long conversionCount;
+ private final long conversionTimeNanos;
+
+ public RecordsProcessingStats(long temporaryMemoryBytes, long conversionCount, long conversionTimeNanos) {
+ this.temporaryMemoryBytes = temporaryMemoryBytes;
+ this.conversionCount = conversionCount;
+ this.conversionTimeNanos = conversionTimeNanos;
+ }
+
+ /**
+ * Returns the number of temporary memory bytes allocated to process the records.
+ * This size depends on whether the records need decompression and/or conversion:
+ * <ul>
+ * <li>Non compressed, no conversion: zero</li>
+ * <li>Non compressed, with conversion: size of the converted buffer</li>
+ * <li>Compressed, no conversion: size of the original buffer after decompression</li>
+ * <li>Compressed, with conversion: size of the original buffer after decompression + size of the converted buffer uncompressed</li>
+ * </ul>
+ */
+ public long temporaryMemoryBytes() {
+ return temporaryMemoryBytes;
+ }
+
+ public long conversionCount() {
+ return conversionCount;
+ }
+
+ public long conversionTimeNanos() {
+ return conversionTimeNanos;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("RecordsProcessingStats(temporaryMemoryBytes=%d, conversionCount=%d, conversionTimeNanos=%d)",
+ temporaryMemoryBytes, conversionCount, conversionTimeNanos);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index da1d147..e6dd6da 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -19,9 +19,11 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import java.util.Map;
public abstract class AbstractRequest extends AbstractRequestResponse {
@@ -120,6 +122,18 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
public abstract AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e);
/**
+ * Get the error counts corresponding to an error response. This is overridden for requests
+ * where response may be null (e.g produce with acks=0).
+ */
+ public Map<Errors, Integer> errorCounts(Throwable e) {
+ AbstractResponse response = getErrorResponse(0, e);
+ if (response == null)
+ throw new IllegalStateException("Error counts could not be obtained for request " + this);
+ else
+ return response.errorCounts();
+ }
+
+ /**
* Factory method for getting a request object based on ApiKey ID and a version
*/
public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, Struct struct) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 4cff798..12fe3c8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -19,9 +19,13 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
public abstract class AbstractResponse extends AbstractRequestResponse {
public static final int DEFAULT_THROTTLE_TIME = 0;
@@ -37,6 +41,31 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
return serialize(responseHeader.toStruct(), toStruct(version));
}
+ public abstract Map<Errors, Integer> errorCounts();
+
+ protected Map<Errors, Integer> errorCounts(Errors error) {
+ return Collections.singletonMap(error, 1);
+ }
+
+ protected Map<Errors, Integer> errorCounts(Map<?, Errors> errors) {
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ for (Errors error : errors.values())
+ updateErrorCounts(errorCounts, error);
+ return errorCounts;
+ }
+
+ protected Map<Errors, Integer> apiErrorCounts(Map<?, ApiError> errors) {
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ for (ApiError apiError : errors.values())
+ updateErrorCounts(errorCounts, apiError.error());
+ return errorCounts;
+ }
+
+ protected void updateErrorCounts(Map<Errors, Integer> errorCounts, Errors error) {
+ Integer count = errorCounts.get(error);
+ errorCounts.put(error, count == null ? 1 : count + 1);
+ }
+
protected abstract Struct toStruct(short version);
public static AbstractResponse parseResponse(ApiKeys apiKey, Struct struct) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
index 10dc279..69b8ad4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -67,6 +68,11 @@ public class AddOffsetsToTxnResponse extends AbstractResponse {
}
@Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
+ @Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.responseSchema(version));
struct.set(THROTTLE_TIME_MS, throttleTimeMs);
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
index e9f6088..4472449 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
@@ -95,6 +95,11 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
}
@Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(errors);
+ }
+
+ @Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.responseSchema(version));
struct.set(THROTTLE_TIME_MS, throttleTimeMs);
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
index df9416e..f292ef6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
@@ -81,6 +82,11 @@ public class AlterConfigsResponse extends AbstractResponse {
return errors;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return apiErrorCounts(errors);
+ }
+
public int throttleTimeMs() {
return throttleTimeMs;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
index 1767d45..b875104 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
@@ -124,6 +124,11 @@ public class AlterReplicaDirResponse extends AbstractResponse {
return this.responses;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(responses);
+ }
+
public static AlterReplicaDirResponse parse(ByteBuffer buffer, short version) {
return new AlterReplicaDirResponse(ApiKeys.ALTER_REPLICA_DIR.responseSchema(version).read(buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 2bdc8aa..753db9e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -159,6 +159,11 @@ public class ApiVersionsResponse extends AbstractResponse {
return error;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
public static ApiVersionsResponse parse(ByteBuffer buffer, short version) {
return new ApiVersionsResponse(ApiKeys.API_VERSIONS.parseResponse(version, buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
index dfd68e7..f9852dd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
@@ -85,6 +86,11 @@ public class ControlledShutdownResponse extends AbstractResponse {
return error;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
public Set<TopicPartition> partitionsRemaining() {
return partitionsRemaining;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
index 836215e..6c35798 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
@@ -20,11 +20,14 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
@@ -101,6 +104,14 @@ public class CreateAclsResponse extends AbstractResponse {
return aclCreationResponses;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ for (AclCreationResponse response : aclCreationResponses)
+ updateErrorCounts(errorCounts, response.error.error());
+ return errorCounts;
+ }
+
public static CreateAclsResponse parse(ByteBuffer buffer, short version) {
return new CreateAclsResponse(ApiKeys.CREATE_ACLS.responseSchema(version).read(buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
index 3e3cc30..41c5327 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
@@ -122,7 +122,7 @@ public class CreatePartitionsRequest extends AbstractRequest {
Object[] assignmentsArray = partitionCountStruct.getArray(ASSIGNMENT_KEY_NAME);
NewPartitions newPartition;
if (assignmentsArray != null) {
- List<List<Integer>> assignments = new ArrayList(assignmentsArray.length);
+ List<List<Integer>> assignments = new ArrayList<>(assignmentsArray.length);
for (Object replicas : assignmentsArray) {
Object[] replicasArray = (Object[]) replicas;
List<Integer> replicasList = new ArrayList<>(replicasArray.length);
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java
index 390221f..73a6b56 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java
@@ -18,6 +18,7 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
@@ -94,6 +95,11 @@ public class CreatePartitionsResponse extends AbstractResponse {
return errors;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return apiErrorCounts(errors);
+ }
+
public int throttleTimeMs() {
return throttleTimeMs;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
index b3c052b..1f73eda 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
@@ -125,6 +126,11 @@ public class CreateTopicsResponse extends AbstractResponse {
return errors;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return apiErrorCounts(errors);
+ }
+
public static CreateTopicsResponse parse(ByteBuffer buffer, short version) {
return new CreateTopicsResponse(ApiKeys.CREATE_TOPICS.responseSchema(version).read(buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
index 0857287..7ae25da 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.resource.Resource;
import org.apache.kafka.common.utils.Utils;
@@ -31,7 +32,9 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
@@ -182,6 +185,14 @@ public class DeleteAclsResponse extends AbstractResponse {
return responses;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ for (AclFilterResponse response : responses)
+ updateErrorCounts(errorCounts, response.error.error());
+ return errorCounts;
+ }
+
public static DeleteAclsResponse parse(ByteBuffer buffer, short version) {
return new DeleteAclsResponse(ApiKeys.DELETE_ACLS.responseSchema(version).read(buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
index 5bfdec8..b839bf5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
@@ -161,6 +161,14 @@ public class DeleteRecordsResponse extends AbstractResponse {
return this.responses;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ for (PartitionResponse response : responses.values())
+ updateErrorCounts(errorCounts, response.error);
+ return errorCounts;
+ }
+
public static DeleteRecordsResponse parse(ByteBuffer buffer, short version) {
return new DeleteRecordsResponse(ApiKeys.DELETE_RECORDS.responseSchema(version).read(buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
index 9c84c11..d13e441 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
@@ -110,6 +110,11 @@ public class DeleteTopicsResponse extends AbstractResponse {
return errors;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(errors);
+ }
+
public static DeleteTopicsResponse parse(ByteBuffer buffer, short version) {
return new DeleteTopicsResponse(ApiKeys.DELETE_TOPICS.responseSchema(version).read(buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
index f8b9695..a21230b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.resource.Resource;
@@ -133,6 +134,11 @@ public class DescribeAclsResponse extends AbstractResponse {
return error;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error.error());
+ }
+
public Collection<AclBinding> acls() {
return acls;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
index 9b2289d..91bf30e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
@@ -176,6 +177,14 @@ public class DescribeConfigsResponse extends AbstractResponse {
}
@Override
+ public Map<Errors, Integer> errorCounts() {
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ for (Config response : configs.values())
+ updateErrorCounts(errorCounts, response.error.error());
+ return errorCounts;
+ }
+
+ @Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.DESCRIBE_CONFIGS.responseSchema(version));
struct.set(THROTTLE_TIME_MS, throttleTimeMs);
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 9241165..61c5a36 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -141,6 +141,13 @@ public class DescribeGroupsResponse extends AbstractResponse {
return groups;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ for (GroupMetadata response : groups.values())
+ updateErrorCounts(errorCounts, response.error);
+ return errorCounts;
+ }
public static class GroupMetadata {
private final Errors error;
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
index dc226d8..6613dfe 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
@@ -162,6 +162,14 @@ public class DescribeLogDirsResponse extends AbstractResponse {
return throttleTimeMs;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ for (LogDirInfo logDirInfo : logDirInfos.values())
+ updateErrorCounts(errorCounts, logDirInfo.error);
+ return errorCounts;
+ }
+
public Map<String, LogDirInfo> logDirInfos() {
return logDirInfos;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
index a3bae58..a0a453d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -66,6 +67,11 @@ public class EndTxnResponse extends AbstractResponse {
}
@Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
+ @Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.END_TXN.responseSchema(version));
struct.set(THROTTLE_TIME_MS, throttleTimeMs);
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index f8d3090..0d09027 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.record.Records;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -353,6 +354,14 @@ public class FetchResponse extends AbstractResponse {
return this.throttleTimeMs;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ for (PartitionData response : responseData.values())
+ updateErrorCounts(errorCounts, response.error);
+ return errorCounts;
+ }
+
public static FetchResponse parse(ByteBuffer buffer, short version) {
return new FetchResponse(ApiKeys.FETCH.responseSchema(version).read(buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index f5d9805..bddc41f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
@@ -104,6 +105,11 @@ public class FindCoordinatorResponse extends AbstractResponse {
return error;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
public Node node() {
return node;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index e41d937..b52a993 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -74,6 +75,11 @@ public class HeartbeatResponse extends AbstractResponse {
}
@Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
+ @Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.HEARTBEAT.responseSchema(version));
struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
[2/3] kafka git commit: KAFKA-5746;
Add new metrics to support health checks (KIP-188)
Posted by ij...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
index 8799ad7..9ecb21f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.RecordBatch;
import java.nio.ByteBuffer;
+import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -88,6 +89,11 @@ public class InitProducerIdResponse extends AbstractResponse {
return error;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
public short epoch() {
return epoch;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index a4431b9..56491eb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -147,6 +147,11 @@ public class JoinGroupResponse extends AbstractResponse {
return error;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
public int generationId() {
return generationId;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
index 39b8c37..921c8ad 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
@@ -87,6 +87,11 @@ public class LeaderAndIsrResponse extends AbstractResponse {
return error;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
public static LeaderAndIsrResponse parse(ByteBuffer buffer, short version) {
return new LeaderAndIsrResponse(ApiKeys.LEADER_AND_ISR.parseResponse(version, buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index bef21e9..f8682ec 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -73,6 +74,11 @@ public class LeaveGroupResponse extends AbstractResponse {
}
@Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
+ @Override
public Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.LEAVE_GROUP.responseSchema(version));
struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index 8f48f39..afc5ebd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -97,6 +98,11 @@ public class ListGroupsResponse extends AbstractResponse {
return error;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
public static class Group {
private final String groupId;
private final String protocolType;
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index 732fb49..13f2dfb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -191,6 +191,14 @@ public class ListOffsetResponse extends AbstractResponse {
return responseData;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ for (PartitionData response : responseData.values())
+ updateErrorCounts(errorCounts, response.error);
+ return errorCounts;
+ }
+
public static ListOffsetResponse parse(ByteBuffer buffer, short version) {
return new ListOffsetResponse(ApiKeys.LIST_OFFSETS.parseResponse(version, buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index fb69cef..99c4ffb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -294,6 +294,14 @@ public class MetadataResponse extends AbstractResponse {
return errors;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ for (TopicMetadata metadata : topicMetadata)
+ updateErrorCounts(errorCounts, metadata.error);
+ return errorCounts;
+ }
+
/**
* Returns the set of topics with the specified error
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 13484ed..b439034 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -145,6 +145,11 @@ public class OffsetCommitResponse extends AbstractResponse {
return responseData;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(responseData);
+ }
+
public static OffsetCommitResponse parse(ByteBuffer buffer, short version) {
return new OffsetCommitResponse(ApiKeys.OFFSET_COMMIT.parseResponse(version, buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index c3341e0..4d069fe 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -189,6 +189,11 @@ public class OffsetFetchResponse extends AbstractResponse {
return this.error;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
public Map<TopicPartition, PartitionData> responseData() {
return responseData;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
index 13d70b7..4a91533 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
@@ -82,6 +82,14 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse {
return epochEndOffsetsByPartition;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ for (EpochEndOffset response : epochEndOffsetsByPartition.values())
+ updateErrorCounts(errorCounts, response.error());
+ return errorCounts;
+ }
+
public static OffsetsForLeaderEpochResponse parse(ByteBuffer buffer, short versionId) {
return new OffsetsForLeaderEpochResponse(ApiKeys.OFFSET_FOR_LEADER_EPOCH.responseSchema(versionId).read(buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index fbc7f76..ee4a2e2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -334,6 +335,12 @@ public class ProduceRequest extends AbstractRequest {
}
}
+ @Override
+ public Map<Errors, Integer> errorCounts(Throwable e) {
+ Errors error = Errors.forException(e);
+ return Collections.singletonMap(error, partitions().size());
+ }
+
private Collection<TopicPartition> partitions() {
return partitionSizes.keySet();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index e1978dd..afedc9d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -233,6 +233,14 @@ public class ProduceResponse extends AbstractResponse {
return this.throttleTime;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ for (PartitionResponse response : responses.values())
+ updateErrorCounts(errorCounts, response.error);
+ return errorCounts;
+ }
+
public static final class PartitionResponse {
public Errors error;
public long baseOffset;
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
index c950cb9..e244182 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
@@ -85,6 +86,11 @@ public class SaslAuthenticateResponse extends AbstractResponse {
}
@Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
+ @Override
public Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.SASL_AUTHENTICATE.responseSchema(version));
struct.set(ERROR_CODE, error.code());
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
index c9f6369..252c62d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
@@ -76,6 +77,11 @@ public class SaslHandshakeResponse extends AbstractResponse {
}
@Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
+ @Override
public Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.SASL_HANDSHAKE.responseSchema(version));
struct.set(ERROR_CODE, error.code());
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
index 4196b83..8ad6222 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
@@ -84,6 +84,11 @@ public class StopReplicaResponse extends AbstractResponse {
return error;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
public static StopReplicaResponse parse(ByteBuffer buffer, short version) {
return new StopReplicaResponse(ApiKeys.STOP_REPLICA.parseResponse(version, buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index d68b2cd..77f9512 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -82,6 +83,11 @@ public class SyncGroupResponse extends AbstractResponse {
return error;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
public ByteBuffer memberAssignment() {
return memberState;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index 53804d9..ff0f8ce 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -125,6 +125,11 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
return errors;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(errors);
+ }
+
public static TxnOffsetCommitResponse parse(ByteBuffer buffer, short version) {
return new TxnOffsetCommitResponse(ApiKeys.TXN_OFFSET_COMMIT.parseResponse(version, buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
index 9ff8e27..4c21cde 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
@@ -56,6 +57,11 @@ public class UpdateMetadataResponse extends AbstractResponse {
return error;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
public static UpdateMetadataResponse parse(ByteBuffer buffer, short version) {
return new UpdateMetadataResponse(ApiKeys.UPDATE_METADATA.parseResponse(version, buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
index 797fb59..f4bf157 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
@@ -150,6 +150,16 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
return errors.get(producerId);
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ for (Map<TopicPartition, Errors> allErrors : errors.values()) {
+ for (Errors error : allErrors.values())
+ updateErrorCounts(errorCounts, error);
+ }
+ return errorCounts;
+ }
+
public static WriteTxnMarkersResponse parse(ByteBuffer buffer, short version) {
return new WriteTxnMarkersResponse(ApiKeys.WRITE_TXN_MARKERS.parseResponse(version, buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index fe57d27..739e0cd 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -113,7 +113,7 @@ public class SaslServerAuthenticator implements Authenticator {
// Next SASL state to be set when outgoing writes associated with the current SASL state complete
private SaslState pendingSaslState = null;
// Exception that will be thrown by `authenticate()` when SaslState is set to FAILED after outbound writes complete
- private IOException pendingException = null;
+ private AuthenticationException pendingException = null;
private SaslServer saslServer;
private String saslMechanism;
private AuthCallbackHandler callbackHandler;
@@ -272,8 +272,15 @@ public class SaslServerAuthenticator implements Authenticator {
default:
break;
}
+ } catch (SaslException | AuthenticationException e) {
+ // Exception will be propagated after response is sent to client
+ AuthenticationException authException = (e instanceof AuthenticationException) ?
+ (AuthenticationException) e : new AuthenticationException("SASL authentication failed", e);
+ setSaslState(SaslState.FAILED, authException);
} catch (Exception e) {
- setSaslState(SaslState.FAILED, new IOException(e));
+ // In the case of IOExceptions and other unexpected exceptions, fail immediately
+ saslState = SaslState.FAILED;
+ throw e;
}
}
}
@@ -303,7 +310,7 @@ public class SaslServerAuthenticator implements Authenticator {
setSaslState(saslState, null);
}
- private void setSaslState(SaslState saslState, IOException exception) throws IOException {
+ private void setSaslState(SaslState saslState, AuthenticationException exception) throws IOException {
if (netOutBuffer != null && !netOutBuffer.completed()) {
pendingSaslState = saslState;
pendingException = exception;
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
index 25e09e1..6053da6 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
@@ -24,6 +24,10 @@ import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,27 +55,49 @@ public class AppInfoParser {
return COMMIT_ID;
}
- public static synchronized void registerAppInfo(String prefix, String id) {
+ public static synchronized void registerAppInfo(String prefix, String id, Metrics metrics) {
try {
ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + id);
AppInfo mBean = new AppInfo();
ManagementFactory.getPlatformMBeanServer().registerMBean(mBean, name);
+
+ registerMetrics(metrics); // prefix will be added later by JmxReporter
} catch (JMException e) {
log.warn("Error registering AppInfo mbean", e);
}
}
- public static synchronized void unregisterAppInfo(String prefix, String id) {
+ public static synchronized void unregisterAppInfo(String prefix, String id, Metrics metrics) {
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
try {
ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + id);
if (server.isRegistered(name))
server.unregisterMBean(name);
+
+ unregisterMetrics(metrics);
} catch (JMException e) {
log.warn("Error unregistering AppInfo mbean", e);
}
}
+ private static MetricName metricName(Metrics metrics, String name) {
+ return metrics.metricName(name, "app-info", "Metric indicating " + name);
+ }
+
+ private static void registerMetrics(Metrics metrics) {
+ if (metrics != null) {
+ metrics.addMetric(metricName(metrics, "version"), new ImmutableValue<>(VERSION));
+ metrics.addMetric(metricName(metrics, "commit-id"), new ImmutableValue<>(COMMIT_ID));
+ }
+ }
+
+ private static void unregisterMetrics(Metrics metrics) {
+ if (metrics != null) {
+ metrics.removeMetric(metricName(metrics, "version"));
+ metrics.removeMetric(metricName(metrics, "commit-id"));
+ }
+ }
+
public interface AppInfoMBean {
public String getVersion();
public String getCommitId();
@@ -95,4 +121,17 @@ public class AppInfoParser {
}
}
+
+ static class ImmutableValue<T> implements Gauge<T> {
+ private final T value;
+
+ public ImmutableValue(T value) {
+ this.value = value;
+ }
+
+ @Override
+ public T value(MetricConfig config, long now) {
+ return value;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 5263d3b..cab385c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -109,6 +109,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+@SuppressWarnings("deprecation")
public class FetcherTest {
private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
private String topicName = "test";
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index afccee0..3a92e63 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -240,6 +240,7 @@ public class SenderTest {
* Send multiple requests. Verify that the client side quota metrics have the right values
*/
@Test
+ @SuppressWarnings("deprecation")
public void testQuotaMetrics() throws Exception {
MockSelector selector = new MockSelector(time);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(this.senderMetricsRegistry);
@@ -1619,6 +1620,7 @@ public class SenderTest {
}
@Test
+ @SuppressWarnings("deprecation")
public void testAbortRetryWhenProducerIdChanges() throws InterruptedException {
final long producerId = 343434L;
TransactionManager transactionManager = new TransactionManager();
@@ -1711,6 +1713,7 @@ public class SenderTest {
testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp);
}
+ @SuppressWarnings("deprecation")
private void testSplitBatchAndSend(TransactionManager txnManager,
ProducerIdAndEpoch producerIdAndEpoch,
TopicPartition tp) throws Exception {
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index e24c3d7..55f8e23 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -46,6 +46,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+@SuppressWarnings("deprecation")
public class MetricsTest {
private static final double EPS = 0.000001;
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
index 9db18e2..9b6f686 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
@@ -102,6 +102,7 @@ public class FrequenciesTest {
}
@Test
+ @SuppressWarnings("deprecation")
public void testUseWithMetrics() {
MetricName name1 = name("1");
MetricName name2 = name("2");
@@ -156,4 +157,4 @@ public class FrequenciesTest {
return new Frequency(name(name), value);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 190fa3d..8d510f5 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -16,7 +16,11 @@
*/
package org.apache.kafka.common.network;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.CredentialCache;
@@ -24,6 +28,8 @@ import org.apache.kafka.common.security.scram.ScramCredentialUtils;
import org.apache.kafka.common.security.scram.ScramMechanism;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -36,6 +42,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
/**
* Non-blocking EchoServer implementation that uses ChannelBuilder to create channels
@@ -43,6 +50,9 @@ import java.util.List;
*
*/
public class NioEchoServer extends Thread {
+
+ private static final double EPS = 0.0001;
+
private final int port;
private final ServerSocketChannel serverSocketChannel;
private final List<SocketChannel> newChannels;
@@ -51,6 +61,7 @@ public class NioEchoServer extends Thread {
private final Selector selector;
private volatile WritableByteChannel outputChannel;
private final CredentialCache credentialCache;
+ private final Metrics metrics;
public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig config,
String serverHost, ChannelBuilder channelBuilder, CredentialCache credentialCache) throws Exception {
@@ -67,7 +78,8 @@ public class NioEchoServer extends Thread {
ScramCredentialUtils.createCache(credentialCache, ScramMechanism.mechanismNames());
if (channelBuilder == null)
channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialCache);
- this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder, new LogContext());
+ this.metrics = new Metrics();
+ this.selector = new Selector(5000, metrics, new MockTime(), "MetricGroup", channelBuilder, new LogContext());
acceptorThread = new AcceptorThread();
}
@@ -79,6 +91,43 @@ public class NioEchoServer extends Thread {
return credentialCache;
}
+ @SuppressWarnings("deprecation")
+ public double metricValue(String name) {
+ for (Map.Entry<MetricName, KafkaMetric> entry : metrics.metrics().entrySet()) {
+ if (entry.getKey().name().equals(name))
+ return entry.getValue().value();
+ }
+ throw new IllegalStateException("Metric not found, " + name + ", found=" + metrics.metrics().keySet());
+ }
+
+ public void verifyAuthenticationMetrics(int successfulAuthentications, final int failedAuthentications)
+ throws InterruptedException {
+ waitForMetric("successful-authentication", successfulAuthentications);
+ waitForMetric("failed-authentication", failedAuthentications);
+ }
+
+ private void waitForMetric(String name, final double expectedValue) throws InterruptedException {
+ final String totalName = name + "-total";
+ final String rateName = name + "-rate";
+ if (expectedValue == 0.0) {
+ assertEquals(expectedValue, metricValue(totalName), EPS);
+ assertEquals(expectedValue, metricValue(rateName), EPS);
+ } else {
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return Math.abs(metricValue(totalName) - expectedValue) <= EPS;
+ }
+ }, "Metric not updated " + totalName);
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return metricValue(rateName) > 0.0;
+ }
+ }, "Metric not updated " + rateName);
+ }
+ }
+
@Override
public void run() {
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 8894873..be4dbc7 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -506,7 +506,7 @@ public class SelectorTest {
// record void method invocations
kafkaChannel.disconnect();
kafkaChannel.close();
- expect(kafkaChannel.ready()).andReturn(false);
+ expect(kafkaChannel.ready()).andReturn(false).anyTimes();
// prepare throws an exception
kafkaChannel.prepare();
expectLastCall().andThrow(new IOException());
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 90c8cd5..440140b 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -105,6 +105,7 @@ public class SslTransportLayerTest {
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
+ server.verifyAuthenticationMetrics(1, 0);
}
/**
@@ -230,6 +231,7 @@ public class SslTransportLayerTest {
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
@@ -323,6 +325,7 @@ public class SslTransportLayerTest {
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
@@ -343,6 +346,7 @@ public class SslTransportLayerTest {
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
@@ -495,6 +499,7 @@ public class SslTransportLayerTest {
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
@@ -512,6 +517,7 @@ public class SslTransportLayerTest {
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
@@ -530,6 +536,7 @@ public class SslTransportLayerTest {
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index b41db67..6df7c2d 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -17,6 +17,8 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
@@ -47,10 +49,12 @@ public class FileRecordsTest {
"ijkl".getBytes()
};
private FileRecords fileRecords;
+ private Time time;
@Before
public void setup() throws IOException {
this.fileRecords = createFileRecords(values);
+ this.time = new MockTime();
}
/**
@@ -310,7 +314,7 @@ public class FileRecordsTest {
int start = fileRecords.searchForOffsetWithSize(1, 0).position;
int size = batch.sizeInBytes();
FileRecords slice = fileRecords.read(start, size - 1);
- Records messageV0 = slice.downConvert(RecordBatch.MAGIC_VALUE_V0, 0);
+ Records messageV0 = slice.downConvert(RecordBatch.MAGIC_VALUE_V0, 0, time).records();
assertTrue("No message should be there", batches(messageV0).isEmpty());
assertEquals("There should be " + (size - 1) + " bytes", size - 1, messageV0.sizeInBytes());
}
@@ -362,7 +366,7 @@ public class FileRecordsTest {
try (FileRecords fileRecords = FileRecords.open(tempFile())) {
fileRecords.append(MemoryRecords.readableRecords(buffer));
fileRecords.flush();
- Records convertedRecords = fileRecords.downConvert(toMagic, 0L);
+ Records convertedRecords = fileRecords.downConvert(toMagic, 0L, time).records();
verifyConvertedRecords(records, offsets, convertedRecords, compressionType, toMagic);
if (toMagic <= RecordBatch.MAGIC_VALUE_V1 && compressionType == CompressionType.NONE) {
@@ -371,7 +375,7 @@ public class FileRecordsTest {
firstOffset = 11L; // v1 record
else
firstOffset = 17; // v2 record
- Records convertedRecords2 = fileRecords.downConvert(toMagic, firstOffset);
+ Records convertedRecords2 = fileRecords.downConvert(toMagic, firstOffset, time).records();
List<Long> filteredOffsets = new ArrayList<>(offsets);
List<SimpleRecord> filteredRecords = new ArrayList<>(records);
int index = filteredOffsets.indexOf(firstOffset) - 1;
@@ -380,7 +384,7 @@ public class FileRecordsTest {
verifyConvertedRecords(filteredRecords, filteredOffsets, convertedRecords2, compressionType, toMagic);
} else {
// firstOffset doesn't have any effect in this case
- Records convertedRecords2 = fileRecords.downConvert(toMagic, 10L);
+ Records convertedRecords2 = fileRecords.downConvert(toMagic, 10L, time).records();
verifyConvertedRecords(records, offsets, convertedRecords2, compressionType, toMagic);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index cc2bf79..4df3492 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
@@ -30,6 +31,7 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -38,10 +40,12 @@ public class MemoryRecordsBuilderTest {
private final CompressionType compressionType;
private final int bufferOffset;
+ private final Time time;
public MemoryRecordsBuilderTest(int bufferOffset, CompressionType compressionType) {
this.bufferOffset = bufferOffset;
this.compressionType = compressionType;
+ this.time = Time.SYSTEM;
}
@Test
@@ -456,7 +460,11 @@ public class MemoryRecordsBuilderTest {
buffer.flip();
- Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 0);
+ ConvertedRecords<MemoryRecords> convertedRecords = MemoryRecords.readableRecords(buffer)
+ .downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time);
+ MemoryRecords records = convertedRecords.records();
+ verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(),
+ 3, 2, records.sizeInBytes(), buffer.limit());
List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
if (compressionType != CompressionType.NONE) {
@@ -493,7 +501,11 @@ public class MemoryRecordsBuilderTest {
buffer.flip();
- Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 0);
+ ConvertedRecords<MemoryRecords> convertedRecords = MemoryRecords.readableRecords(buffer)
+ .downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time);
+ MemoryRecords records = convertedRecords.records();
+ verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 1,
+ records.sizeInBytes(), buffer.limit());
List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
if (compressionType != CompressionType.NONE) {
@@ -517,7 +529,10 @@ public class MemoryRecordsBuilderTest {
assertEquals("2", utf8(logRecords.get(1).key()));
assertEquals("3", utf8(logRecords.get(2).key()));
- records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 2L);
+ convertedRecords = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 2L, time);
+ records = convertedRecords.records();
+ verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 1,
+ records.sizeInBytes(), buffer.limit());
batches = Utils.toList(records.batches().iterator());
logRecords = Utils.toList(records.records().iterator());
@@ -619,4 +634,26 @@ public class MemoryRecordsBuilderTest {
return values;
}
+ private void verifyRecordsProcessingStats(RecordsProcessingStats processingStats, int recordCount, int convertedCount,
+ long finalBytes, long preConvertedBytes) {
+ assertNotNull("Records processing info is null", processingStats);
+ assertEquals(convertedCount, processingStats.conversionCount());
+ assertTrue("Processing time not recorded", processingStats.conversionTimeNanos() > 0);
+ long tempBytes = processingStats.temporaryMemoryBytes();
+ if (compressionType == CompressionType.NONE) {
+ if (convertedCount == 0)
+ assertEquals(finalBytes, tempBytes);
+ else if (convertedCount == recordCount)
+ assertEquals(preConvertedBytes + finalBytes, tempBytes);
+ else {
+ assertTrue(String.format("Unexpected temp bytes %d final %d pre %d", tempBytes, finalBytes, preConvertedBytes),
+ tempBytes > finalBytes && tempBytes < finalBytes + preConvertedBytes);
+ }
+ } else {
+ long compressedBytes = finalBytes - Records.LOG_OVERHEAD - LegacyRecord.RECORD_OVERHEAD_V0;
+ assertTrue(String.format("Uncompressed size expected temp=%d, compressed=%d", tempBytes, compressedBytes),
+ tempBytes > compressedBytes);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index c59f2c9..c2c8d81 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -126,6 +126,7 @@ public class SaslAuthenticatorTest {
server = createEchoServer(securityProtocol);
createAndCheckClientConnection(securityProtocol, node);
+ server.verifyAuthenticationMetrics(1, 0);
}
/**
@@ -139,6 +140,7 @@ public class SaslAuthenticatorTest {
server = createEchoServer(securityProtocol);
createAndCheckClientConnection(securityProtocol, node);
+ server.verifyAuthenticationMetrics(1, 0);
}
/**
@@ -153,6 +155,7 @@ public class SaslAuthenticatorTest {
server = createEchoServer(securityProtocol);
createAndCheckClientConnectionFailure(securityProtocol, node);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
@@ -167,6 +170,7 @@ public class SaslAuthenticatorTest {
server = createEchoServer(securityProtocol);
createAndCheckClientConnectionFailure(securityProtocol, node);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
@@ -268,6 +272,7 @@ public class SaslAuthenticatorTest {
server = createEchoServer(securityProtocol);
updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
createAndCheckClientConnection(securityProtocol, "0");
+ server.verifyAuthenticationMetrics(1, 0);
}
/**
@@ -303,6 +308,7 @@ public class SaslAuthenticatorTest {
server = createEchoServer(securityProtocol);
updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
createAndCheckClientConnectionFailure(securityProtocol, node);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
@@ -321,6 +327,7 @@ public class SaslAuthenticatorTest {
server = createEchoServer(securityProtocol);
updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
createAndCheckClientConnectionFailure(securityProtocol, node);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
@@ -338,9 +345,11 @@ public class SaslAuthenticatorTest {
String node = "1";
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
createAndCheckClientConnectionFailure(securityProtocol, node);
+ server.verifyAuthenticationMetrics(0, 1);
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
createAndCheckClientConnection(securityProtocol, "2");
+ server.verifyAuthenticationMetrics(1, 1);
}
/**
@@ -643,6 +652,7 @@ public class SaslAuthenticatorTest {
server = createEchoServer(securityProtocol);
createAndCheckClientConnectionFailure(securityProtocol, node);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
@@ -657,6 +667,7 @@ public class SaslAuthenticatorTest {
server = createEchoServer(securityProtocol);
createAndCheckClientConnectionFailure(securityProtocol, node);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index 9d8be8d..4abff84 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -40,7 +40,6 @@ import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.common.security.scram.ScramMechanism.SCRAM_SHA_256;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class SaslServerAuthenticatorTest {
@@ -100,8 +99,8 @@ public class SaslServerAuthenticatorTest {
try {
authenticator.authenticate();
fail("Expected authenticate() to raise an exception");
- } catch (IOException e) {
- assertTrue(e.getCause() instanceof IllegalSaslStateException);
+ } catch (IllegalSaslStateException e) {
+ // expected exception
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index 8c8ba6d..eb01337 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -73,7 +73,7 @@ public class ConnectMetrics {
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
LOG.debug("Registering Connect metrics with JMX for worker '{}'", workerId);
- AppInfoParser.registerAppInfo(JMX_PREFIX, workerId);
+ AppInfoParser.registerAppInfo(JMX_PREFIX, workerId, metrics);
}
/**
@@ -164,7 +164,7 @@ public class ConnectMetrics {
public void stop() {
metrics.close();
LOG.debug("Unregistering Connect metrics with JMX for worker '{}'", workerId);
- AppInfoParser.unregisterAppInfo(JMX_PREFIX, workerId);
+ AppInfoParser.unregisterAppInfo(JMX_PREFIX, workerId, metrics);
}
public static class MetricGroupId {
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index d61a29c..6e19d11 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -133,7 +133,7 @@ public class WorkerGroupMember {
configStorage,
listener);
- AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
+ AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
log.debug("Connect group member created");
} catch (Throwable t) {
// call close methods if internal objects are already constructed
@@ -200,7 +200,7 @@ public class WorkerGroupMember {
ClientUtils.closeQuietly(coordinator, "coordinator", firstException);
ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
ClientUtils.closeQuietly(client, "consumer network client", firstException);
- AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
+ AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
if (firstException.get() != null && !swallowException)
throw new KafkaException("Failed to stop the Connect group member", firstException.get());
else
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index bd193c7..306d64a 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -22,7 +22,7 @@ import joptsimple._
import kafka.common.Config
import kafka.common.InvalidConfigException
import kafka.log.LogConfig
-import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig, QuotaId}
+import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
import kafka.utils.{CommandLineUtils, ZkUtils}
import kafka.utils.Implicits._
import org.apache.kafka.common.security.JaasUtils
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 5db2c5d..4ea61ab 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -964,12 +964,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
private def registerLogDirEventNotificationListener() = {
debug("Registering logDirEventNotificationListener")
- zkUtils.zkClient.subscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
+ zkUtils.subscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
}
private def deregisterLogDirEventNotificationListener() = {
debug("De-registering logDirEventNotificationListener")
- zkUtils.zkClient.unsubscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
+ zkUtils.unsubscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
}
private def readControllerEpochFromZookeeper() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index f32da4b..e6c774a 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -48,11 +48,11 @@ import java.util.regex.Pattern
object LogAppendInfo {
val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
- NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+ RecordsProcessingStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
def unknownLogAppendInfoWithLogStartOffset(logStartOffset: Long): LogAppendInfo =
LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
- NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+ RecordsProcessingStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
}
/**
@@ -65,6 +65,7 @@ object LogAppendInfo {
* @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp.
* @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
* @param logStartOffset The start offset of the log at the time of this append.
+ * @param recordsProcessingStats Statistics collected during record processing, `null` if `assignOffsets` is `false`
* @param sourceCodec The source codec used in the message set (send by the producer)
* @param targetCodec The target codec of the message set(after applying the broker compression configuration if any)
* @param shallowCount The number of shallow messages
@@ -77,6 +78,7 @@ case class LogAppendInfo(var firstOffset: Long,
var offsetOfMaxTimestamp: Long,
var logAppendTime: Long,
var logStartOffset: Long,
+ var recordsProcessingStats: RecordsProcessingStats,
sourceCodec: CompressionCodec,
targetCodec: CompressionCodec,
shallowCount: Int,
@@ -617,6 +619,7 @@ class Log(@volatile var dir: File,
val validateAndOffsetAssignResult = try {
LogValidator.validateMessagesAndAssignOffsets(validRecords,
offset,
+ time,
now,
appendInfo.sourceCodec,
appendInfo.targetCodec,
@@ -633,6 +636,7 @@ class Log(@volatile var dir: File,
appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
appendInfo.lastOffset = offset.value - 1
+ appendInfo.recordsProcessingStats = validateAndOffsetAssignResult.recordsProcessingStats
if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
appendInfo.logAppendTime = now
@@ -868,8 +872,8 @@ class Log(@volatile var dir: File,
// Apply broker-side compression if any
val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
- LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset, sourceCodec,
- targetCodec, shallowMessageCount, validBytesCount, monotonic)
+ LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset,
+ RecordsProcessingStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
}
private def updateProducers(batch: RecordBatch,
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index b6b20e3..15750e9 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -23,6 +23,7 @@ import kafka.message.{CompressionCodec, NoCompressionCodec}
import kafka.utils.Logging
import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.record._
+import org.apache.kafka.common.utils.Time
import scala.collection.mutable
import scala.collection.JavaConverters._
@@ -46,6 +47,7 @@ private[kafka] object LogValidator extends Logging {
*/
private[kafka] def validateMessagesAndAssignOffsets(records: MemoryRecords,
offsetCounter: LongRef,
+ time: Time,
now: Long,
sourceCodec: CompressionCodec,
targetCodec: CompressionCodec,
@@ -58,14 +60,14 @@ private[kafka] object LogValidator extends Logging {
if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
// check the magic value
if (!records.hasMatchingMagic(magic))
- convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, now, timestampType,
+ convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, time, now, timestampType,
timestampDiffMaxMs, magic, partitionLeaderEpoch, isFromClient)
else
// Do in-place validation, offset assignment and maybe set timestamp
assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs,
partitionLeaderEpoch, isFromClient, magic)
} else {
- validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic,
+ validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, time, now, sourceCodec, targetCodec, compactedTopic,
magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient)
}
}
@@ -109,6 +111,7 @@ private[kafka] object LogValidator extends Logging {
private def convertAndAssignOffsetsNonCompressed(records: MemoryRecords,
offsetCounter: LongRef,
compactedTopic: Boolean,
+ time: Time,
now: Long,
timestampType: TimestampType,
timestampDiffMaxMs: Long,
@@ -137,12 +140,16 @@ private[kafka] object LogValidator extends Logging {
}
val convertedRecords = builder.build()
+
val info = builder.info
+ val recordsProcessingStats = new RecordsProcessingStats(builder.uncompressedBytesWritten,
+ builder.numRecords, time.nanoseconds - now)
ValidationAndOffsetAssignResult(
validatedRecords = convertedRecords,
maxTimestamp = info.maxTimestamp,
shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
- messageSizeMaybeChanged = true)
+ messageSizeMaybeChanged = true,
+ recordsProcessingStats = recordsProcessingStats)
}
private def assignOffsetsNonCompressed(records: MemoryRecords,
@@ -203,7 +210,8 @@ private[kafka] object LogValidator extends Logging {
validatedRecords = records,
maxTimestamp = maxTimestamp,
shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp,
- messageSizeMaybeChanged = false)
+ messageSizeMaybeChanged = false,
+ recordsProcessingStats = RecordsProcessingStats.EMPTY)
}
/**
@@ -215,6 +223,7 @@ private[kafka] object LogValidator extends Logging {
*/
def validateMessagesAndAssignOffsetsCompressed(records: MemoryRecords,
offsetCounter: LongRef,
+ time: Time,
now: Long,
sourceCodec: CompressionCodec,
targetCodec: CompressionCodec,
@@ -232,8 +241,11 @@ private[kafka] object LogValidator extends Logging {
val expectedInnerOffset = new LongRef(0)
val validatedRecords = new mutable.ArrayBuffer[Record]
+ var uncompressedSizeInBytes = 0
+
for (batch <- records.batches.asScala) {
validateBatch(batch, isFromClient, toMagic)
+ uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
// Do not compress control records unless they are written compressed
if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
@@ -244,6 +256,8 @@ private[kafka] object LogValidator extends Logging {
if (sourceCodec != NoCompressionCodec && record.isCompressed)
throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
s"compression attribute set: $record")
+
+ uncompressedSizeInBytes += record.sizeInBytes()
if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
// Check if we need to overwrite offset
// No in place assignment situation 3
@@ -269,8 +283,9 @@ private[kafka] object LogValidator extends Logging {
val first = records.batches.asScala.head
(first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
}
- buildRecordsAndAssignOffsets(toMagic, offsetCounter, timestampType, CompressionType.forId(targetCodec.codec), now,
- validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch)
+ buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, CompressionType.forId(targetCodec.codec), now,
+ validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch, isFromClient,
+ uncompressedSizeInBytes)
} else {
// we can update the batch only and write the compressed payload as is
val batch = records.batches.iterator.next()
@@ -287,15 +302,18 @@ private[kafka] object LogValidator extends Logging {
if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
+ val recordsProcessingStats = new RecordsProcessingStats(uncompressedSizeInBytes, 0, -1)
ValidationAndOffsetAssignResult(validatedRecords = records,
maxTimestamp = maxTimestamp,
shallowOffsetOfMaxTimestamp = lastOffset,
- messageSizeMaybeChanged = false)
+ messageSizeMaybeChanged = false,
+ recordsProcessingStats = recordsProcessingStats)
}
}
private def buildRecordsAndAssignOffsets(magic: Byte,
offsetCounter: LongRef,
+ time: Time,
timestampType: TimestampType,
compressionType: CompressionType,
logAppendTime: Long,
@@ -304,7 +322,10 @@ private[kafka] object LogValidator extends Logging {
producerEpoch: Short,
baseSequence: Int,
isTransactional: Boolean,
- partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
+ partitionLeaderEpoch: Int,
+ isFromClient: Boolean,
+ uncompresssedSizeInBytes: Int): ValidationAndOffsetAssignResult = {
+ val startNanos = time.nanoseconds
val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType,
validatedRecords.asJava)
val buffer = ByteBuffer.allocate(estimatedSize)
@@ -316,13 +337,23 @@ private[kafka] object LogValidator extends Logging {
}
val records = builder.build()
+
val info = builder.info
+ // This is not strictly correct, it represents the number of records where in-place assignment is not possible
+ // instead of the number of records that were converted. It will over-count cases where the source and target are
+ // message format V0 or if the inner offsets are not consecutive. This is OK since the impact is the same: we have
+ // to rebuild the records (including recompression if enabled).
+ val conversionCount = builder.numRecords
+ val recordsProcessingStats = new RecordsProcessingStats(uncompresssedSizeInBytes + builder.uncompressedBytesWritten,
+ conversionCount, time.nanoseconds - startNanos)
+
ValidationAndOffsetAssignResult(
validatedRecords = records,
maxTimestamp = info.maxTimestamp,
shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
- messageSizeMaybeChanged = true)
+ messageSizeMaybeChanged = true,
+ recordsProcessingStats = recordsProcessingStats)
}
private def validateKey(record: Record, compactedTopic: Boolean) {
@@ -352,6 +383,7 @@ private[kafka] object LogValidator extends Logging {
case class ValidationAndOffsetAssignResult(validatedRecords: MemoryRecords,
maxTimestamp: Long,
shallowOffsetOfMaxTimestamp: Long,
- messageSizeMaybeChanged: Boolean)
+ messageSizeMaybeChanged: Boolean,
+ recordsProcessingStats: RecordsProcessingStats)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 1894213..b9ab486 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -40,19 +40,22 @@ trait KafkaMetricsGroup extends Logging {
* @param tags Additional attributes which mBean will have.
* @return Sanitized metric name object.
*/
- private def metricName(name: String, tags: scala.collection.Map[String, String]) = {
+ protected def metricName(name: String, tags: scala.collection.Map[String, String]): MetricName = {
val klass = this.getClass
val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
- // Tags may contain ipv6 address with ':', which is not valid in JMX ObjectName
- def quoteIfRequired(value: String) = if (value.contains(':')) ObjectName.quote(value) else value
- val metricTags = tags.map(kv => (kv._1, quoteIfRequired(kv._2)))
- explicitMetricName(pkg, simpleName, name, metricTags)
+ explicitMetricName(pkg, simpleName, name, tags)
}
- private def explicitMetricName(group: String, typeName: String, name: String, tags: scala.collection.Map[String, String]) = {
+ protected def explicitMetricName(group: String, typeName: String, name: String,
+ tags: scala.collection.Map[String, String]): MetricName = {
+
+ // Tags may contain ipv6 address with ':', which is not valid in JMX ObjectName
+ def quoteIfRequired(value: String) = if (value.contains(':')) ObjectName.quote(value) else value
+ val metricTags = tags.map(kv => (kv._1, quoteIfRequired(kv._2)))
+
val nameBuilder: StringBuilder = new StringBuilder
nameBuilder.append(group)
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index b2ef615..e5f115c 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -21,19 +21,20 @@ import java.net.InetAddress
import java.nio.ByteBuffer
import java.util.concurrent._
-import com.yammer.metrics.core.Gauge
+import com.yammer.metrics.core.{Gauge, Meter}
import kafka.metrics.KafkaMetricsGroup
import kafka.network.RequestChannel.{BaseRequest, SendAction, ShutdownRequest, NoOpAction, CloseConnectionAction}
import kafka.utils.{Logging, NotNothing}
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.metrics.Sanitizer
import org.apache.kafka.common.network.Send
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Time
import org.apache.log4j.Logger
+import scala.collection.mutable
import scala.reflect.ClassTag
object RequestChannel extends Logging {
@@ -60,6 +61,8 @@ object RequestChannel extends Logging {
@volatile var responseCompleteTimeNanos = -1L
@volatile var responseDequeueTimeNanos = -1L
@volatile var apiRemoteCompleteTimeNanos = -1L
+ @volatile var messageConversionsTimeNanos = 0L
+ @volatile var temporaryMemoryBytes = 0L
@volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
val session = Session(context.principal, context.clientAddress)
@@ -121,6 +124,7 @@ object RequestChannel extends Logging {
val apiThrottleTimeMs = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos)
val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
+ val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
val fetchMetricNames =
if (header.apiKey == ApiKeys.FETCH) {
@@ -133,7 +137,7 @@ object RequestChannel extends Logging {
else Seq.empty
val metricNames = fetchMetricNames :+ header.apiKey.name
metricNames.foreach { metricName =>
- val m = RequestMetrics.metricsMap(metricName)
+ val m = RequestMetrics(metricName)
m.requestRate.mark()
m.requestQueueTimeHist.update(Math.round(requestQueueTimeMs))
m.localTimeHist.update(Math.round(apiLocalTimeMs))
@@ -142,6 +146,9 @@ object RequestChannel extends Logging {
m.responseQueueTimeHist.update(Math.round(responseQueueTimeMs))
m.responseSendTimeHist.update(Math.round(responseSendTimeMs))
m.totalTimeHist.update(Math.round(totalTimeMs))
+ m.requestBytesHist.update(sizeOfBodyInBytes)
+ m.messageConversionsTimeHist.foreach(_.update(Math.round(messageConversionsTimeMs)))
+ m.tempMemoryBytesHist.foreach(_.update(temporaryMemoryBytes))
}
// Records network handler thread usage. This is included towards the request quota for the
@@ -171,6 +178,10 @@ object RequestChannel extends Logging {
.append(",securityProtocol:").append(context.securityProtocol)
.append(",principal:").append(session.principal)
.append(",listener:").append(context.listenerName.value)
+ if (temporaryMemoryBytes > 0)
+ builder.append(",temporaryMemoryBytes:").append(temporaryMemoryBytes)
+ if (messageConversionsTimeMs > 0)
+ builder.append(",messageConversionsTime:").append(messageConversionsTimeMs)
requestLogger.debug(builder.toString)
}
}
@@ -281,6 +292,12 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
responseListeners ::= onResponse
}
+ def updateErrorMetrics(apiKey: ApiKeys, errors: collection.Map[Errors, Integer]) {
+ errors.foreach { case (error, count) =>
+ RequestMetrics.markErrorMeter(apiKey.name, error, count)
+ }
+ }
+
def shutdown() {
requestQueue.clear()
}
@@ -290,11 +307,30 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
}
object RequestMetrics {
- val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics]
+
+ private val metricsMap = mutable.Map[String, RequestMetrics]()
+
val consumerFetchMetricName = ApiKeys.FETCH.name + "Consumer"
val followFetchMetricName = ApiKeys.FETCH.name + "Follower"
- (ApiKeys.values().toList.map(e => e.name)
- ++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name)))
+
+ (ApiKeys.values.toSeq.map(_.name) ++ Seq(consumerFetchMetricName, followFetchMetricName)).foreach { name =>
+ metricsMap.put(name, new RequestMetrics(name))
+ }
+
+ def apply(metricName: String) = metricsMap(metricName)
+
+ def markErrorMeter(name: String, error: Errors, count: Int) {
+ val errorMeter = metricsMap(name).errorMeters(error)
+ errorMeter.getOrCreateMeter().mark(count.toLong)
+ }
+
+ // Used for testing until these metrics are moved to a class
+ private[kafka] def clearErrorMeters(): Unit = {
+ metricsMap.values.foreach { requestMetrics =>
+ requestMetrics.errorMeters.values.foreach(_.removeMeter())
+ }
+ }
+
}
class RequestMetrics(name: String) extends KafkaMetricsGroup {
@@ -306,11 +342,58 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
val localTimeHist = newHistogram("LocalTimeMs", biased = true, tags)
// time a request takes to wait on remote brokers (currently only relevant to fetch and produce requests)
val remoteTimeHist = newHistogram("RemoteTimeMs", biased = true, tags)
- // time a request is throttled (only relevant to fetch and produce requests)
+ // time a request is throttled
val throttleTimeHist = newHistogram("ThrottleTimeMs", biased = true, tags)
// time a response spent in a response queue
val responseQueueTimeHist = newHistogram("ResponseQueueTimeMs", biased = true, tags)
// time to send the response to the requester
val responseSendTimeHist = newHistogram("ResponseSendTimeMs", biased = true, tags)
val totalTimeHist = newHistogram("TotalTimeMs", biased = true, tags)
+ // request size in bytes
+ val requestBytesHist = newHistogram("RequestBytes", biased = true, tags)
+ // time for message conversions (only relevant to fetch and produce requests)
+ val messageConversionsTimeHist =
+ if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name)
+ Some(newHistogram("MessageConversionsTimeMs", biased = true, tags))
+ else
+ None
+ // Temporary memory allocated for processing request (only populated for fetch and produce requests)
+ // This shows the memory allocated for compression/conversions excluding the actual request size
+ val tempMemoryBytesHist =
+ if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name)
+ Some(newHistogram("TemporaryMemoryBytes", biased = true, tags))
+ else
+ None
+
+ private val errorMeters = mutable.Map[Errors, ErrorMeter]()
+ Errors.values.foreach(error => errorMeters.put(error, new ErrorMeter(name, error)))
+
+ class ErrorMeter(name: String, error: Errors) {
+ private val tags = Map("request" -> name, "error" -> error.name)
+
+ @volatile private var meter: Meter = null
+
+ def getOrCreateMeter(): Meter = {
+ if (meter != null)
+ meter
+ else {
+ synchronized {
+ if (meter == null)
+ meter = newMeter("ErrorsPerSec", "requests", TimeUnit.SECONDS, tags)
+ meter
+ }
+ }
+ }
+
+ // This is currently used only in tests.
+ def removeMeter(): Unit = {
+ synchronized {
+ if (meter != null) {
+ removeMetric("ErrorsPerSec", tags)
+ meter = null
+ }
+ }
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index fd31fc4..2c83c1b 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -25,7 +25,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
import org.apache.kafka.clients.admin.NewPartitions
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
-import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, InvalidTopicException, PolicyViolationException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, PolicyViolationException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index c84fbcb..afaa5dd 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -16,8 +16,6 @@
*/
package kafka.server
-import java.net.{URLEncoder, URLDecoder}
-import java.nio.charset.StandardCharsets
import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}
import java.util.concurrent.locks.ReentrantReadWriteLock