You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/28 20:59:22 UTC

[1/3] kafka git commit: KAFKA-5746; Add new metrics to support health checks (KIP-188)

Repository: kafka
Updated Branches:
  refs/heads/trunk dd6347a5d -> 021d8a8e9


http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6434d23..6781dc9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -44,7 +44,7 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch}
+import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch, RecordsProcessingStats}
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
 import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse}
 import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
@@ -424,7 +424,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 s"from client id ${request.header.clientId} with ack=0\n" +
                 s"Topic and partition to exceptions: $exceptionsSummary"
             )
-            closeConnection(request)
+            closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)
           } else {
             sendNoOpResponseExemptThrottle(request)
           }
@@ -444,6 +444,12 @@ class KafkaApis(val requestChannel: RequestChannel,
         produceResponseCallback)
     }
 
+    def processingStatsCallback(processingStats: Map[TopicPartition, RecordsProcessingStats]): Unit = {
+      processingStats.foreach { case (tp, info) =>
+        updateRecordsProcessingStats(request, tp, info)
+      }
+    }
+
     if (authorizedRequestInfo.isEmpty)
       sendResponseCallback(Map.empty)
     else {
@@ -456,7 +462,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         internalTopicsAllowed = internalTopicsAllowed,
         isFromClient = true,
         entriesPerPartition = authorizedRequestInfo,
-        responseCallback = sendResponseCallback)
+        responseCallback = sendResponseCallback,
+        processingStatsCallback = processingStatsCallback)
 
       // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
       // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
@@ -511,9 +518,11 @@ class KafkaApis(val requestChannel: RequestChannel,
 
         downConvertMagic.map { magic =>
           trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
-          val converted = data.records.downConvert(magic, fetchRequest.fetchData.get(tp).fetchOffset)
+          val startNanos = time.nanoseconds
+          val converted = data.records.downConvert(magic, fetchRequest.fetchData.get(tp).fetchOffset, time)
+          updateRecordsProcessingStats(request, tp, converted.recordsProcessingStats)
           new FetchResponse.PartitionData(data.error, data.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-            data.logStartOffset, data.abortedTransactions, converted)
+            data.logStartOffset, data.abortedTransactions, converted.records)
         }
 
       }.getOrElse(data)
@@ -2002,6 +2011,25 @@ class KafkaApis(val requestChannel: RequestChannel,
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")
   }
 
+  private def updateRecordsProcessingStats(request: RequestChannel.Request, tp: TopicPartition,
+                                           processingStats: RecordsProcessingStats): Unit = {
+    val conversionCount = processingStats.conversionCount
+    if (conversionCount > 0) {
+      request.header.apiKey match {
+        case ApiKeys.PRODUCE =>
+          brokerTopicStats.topicStats(tp.topic).produceMessageConversionsRate.mark(conversionCount)
+          brokerTopicStats.allTopicsStats.produceMessageConversionsRate.mark(conversionCount)
+        case ApiKeys.FETCH =>
+          brokerTopicStats.topicStats(tp.topic).fetchMessageConversionsRate.mark(conversionCount)
+          brokerTopicStats.allTopicsStats.fetchMessageConversionsRate.mark(conversionCount)
+        case _ =>
+          throw new IllegalStateException("Message conversion info is recorded only for Produce/Fetch requests")
+      }
+      request.messageConversionsTimeNanos = processingStats.conversionTimeNanos
+    }
+    request.temporaryMemoryBytes = processingStats.temporaryMemoryBytes
+  }
+
   private def handleError(request: RequestChannel.Request, e: Throwable) {
     val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !request.header.apiKey.clusterAction
     error("Error when handling request %s".format(request.body[AbstractRequest]), e)
@@ -2031,9 +2059,10 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def sendErrorOrCloseConnection(request: RequestChannel.Request, error: Throwable)(throttleMs: Int): Unit = {
-    val response = request.body[AbstractRequest].getErrorResponse(throttleMs, error)
+    val requestBody = request.body[AbstractRequest]
+    val response = requestBody.getErrorResponse(throttleMs, error)
     if (response == null)
-      closeConnection(request)
+      closeConnection(request, requestBody.errorCounts(error))
     else
       sendResponse(request, Some(response))
   }
@@ -2043,13 +2072,17 @@ class KafkaApis(val requestChannel: RequestChannel,
     sendResponse(request, None)
   }
 
-  private def closeConnection(request: RequestChannel.Request): Unit = {
+  private def closeConnection(request: RequestChannel.Request, errorCounts: java.util.Map[Errors, Integer]): Unit = {
     // This case is used when the request handler has encountered an error, but the client
     // does not expect a response (e.g. when produce request has acks set to 0)
+    requestChannel.updateErrorMetrics(request.header.apiKey, errorCounts.asScala)
     requestChannel.sendResponse(new RequestChannel.Response(request, None, CloseConnectionAction, None))
   }
 
   private def sendResponse(request: RequestChannel.Request, responseOpt: Option[AbstractResponse]): Unit = {
+    // Update error metrics for each error code in the response including Errors.NONE
+    responseOpt.foreach(response => requestChannel.updateErrorMetrics(request.header.apiKey, response.errorCounts.asScala))
+
     responseOpt match {
       case Some(response) =>
         val responseSend = request.context.buildResponse(response)

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index b108bf6..43c81ab 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -25,10 +25,13 @@ import kafka.api.ApiVersion
 import kafka.cluster.EndPoint
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
+import com.yammer.metrics.core.Gauge
 import org.I0Itec.zkclient.IZkStateListener
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.zookeeper.Watcher.Event.KeeperState
 
+import scala.collection.mutable.Set
+
 /**
  * This class registers the broker in zookeeper to allow 
  * other brokers and consumers to detect failures. It uses an ephemeral znode with the path:
@@ -71,6 +74,8 @@ class KafkaHealthcheck(brokerId: Int,
       interBrokerProtocolVersion)
   }
 
+  def shutdown(): Unit = sessionExpireListener.shutdown()
+
   /**
    *  When we get a SessionExpired event, it means that we have lost all ephemeral nodes and ZKClient has re-established
    *  a connection for us. We need to re-register this broker in the broker registry. We rely on `handleStateChanged`
@@ -78,6 +83,8 @@ class KafkaHealthcheck(brokerId: Int,
    */
   class SessionExpireListener extends IZkStateListener with KafkaMetricsGroup {
 
+    private val metricNames = Set[String]()
+
     private[server] val stateToMeterMap = {
       import KeeperState._
       val stateToEventTypeMap = Map(
@@ -89,10 +96,20 @@ class KafkaHealthcheck(brokerId: Int,
         Expired -> "Expires"
       )
       stateToEventTypeMap.map { case (state, eventType) =>
-        state -> newMeter(s"ZooKeeper${eventType}PerSec", eventType.toLowerCase(Locale.ROOT), TimeUnit.SECONDS)
+        val name = s"ZooKeeper${eventType}PerSec"
+        metricNames += name
+        state -> newMeter(name, eventType.toLowerCase(Locale.ROOT), TimeUnit.SECONDS)
       }
     }
 
+    private[server] val sessionStateGauge =
+      newGauge("SessionState", new Gauge[String] {
+        override def value: String =
+          Option(zkUtils.zkConnection.getZookeeperState.toString).getOrElse("DISCONNECTED")
+      })
+
+    metricNames += "SessionState"
+
     @throws[Exception]
     override def handleStateChanged(state: KeeperState) {
       stateToMeterMap.get(state).foreach(_.mark())
@@ -110,6 +127,8 @@ class KafkaHealthcheck(brokerId: Int,
       fatal("Could not establish session with zookeeper", error)
     }
 
+    def shutdown(): Unit = metricNames.foreach(removeMetric(_))
+
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index f055762..a498781 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -129,6 +129,8 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
   val failedFetchRequestRate = newMeter(BrokerTopicStats.FailedFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
   val totalProduceRequestRate = newMeter(BrokerTopicStats.TotalProduceRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
   val totalFetchRequestRate = newMeter(BrokerTopicStats.TotalFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
+  val fetchMessageConversionsRate = newMeter(BrokerTopicStats.FetchMessageConversionsPerSec, "requests", TimeUnit.SECONDS, tags)
+  val produceMessageConversionsRate = newMeter(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests", TimeUnit.SECONDS, tags)
 
   def close() {
     removeMetric(BrokerTopicStats.MessagesInPerSec, tags)
@@ -143,6 +145,8 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
     removeMetric(BrokerTopicStats.FailedFetchRequestsPerSec, tags)
     removeMetric(BrokerTopicStats.TotalProduceRequestsPerSec, tags)
     removeMetric(BrokerTopicStats.TotalFetchRequestsPerSec, tags)
+    removeMetric(BrokerTopicStats.FetchMessageConversionsPerSec, tags)
+    removeMetric(BrokerTopicStats.ProduceMessageConversionsPerSec, tags)
   }
 }
 
@@ -157,6 +161,8 @@ object BrokerTopicStats {
   val FailedFetchRequestsPerSec = "FailedFetchRequestsPerSec"
   val TotalProduceRequestsPerSec = "TotalProduceRequestsPerSec"
   val TotalFetchRequestsPerSec = "TotalFetchRequestsPerSec"
+  val FetchMessageConversionsPerSec = "FetchMessageConversionsPerSec"
+  val ProduceMessageConversionsPerSec = "ProduceMessageConversionsPerSec"
   private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k))
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 88b1d23..f8af7a2 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -293,7 +293,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         shutdownLatch = new CountDownLatch(1)
         startupComplete.set(true)
         isStartingUp.set(false)
-        AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
+        AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString, metrics)
         info("started")
       }
     }
@@ -333,19 +333,21 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
     chrootOption.foreach { chroot =>
       val zkConnForChrootCreation = config.zkConnect.substring(0, chrootIndex)
-      val zkClientForChrootCreation = ZkUtils(zkConnForChrootCreation,
+      val zkClientForChrootCreation = ZkUtils.withMetrics(zkConnForChrootCreation,
                                               sessionTimeout = config.zkSessionTimeoutMs,
                                               connectionTimeout = config.zkConnectionTimeoutMs,
-                                              secureAclsEnabled)
+                                              secureAclsEnabled,
+                                              time)
       zkClientForChrootCreation.makeSurePersistentPathExists(chroot)
       info(s"Created zookeeper path $chroot")
       zkClientForChrootCreation.close()
     }
 
-    val zkUtils = ZkUtils(config.zkConnect,
+    val zkUtils = ZkUtils.withMetrics(config.zkConnect,
                           sessionTimeout = config.zkSessionTimeoutMs,
                           connectionTimeout = config.zkConnectionTimeoutMs,
-                          secureAclsEnabled)
+                          secureAclsEnabled,
+                          time)
     zkUtils.setupCommonPaths()
     zkUtils
   }
@@ -512,6 +514,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         CoreUtils.swallow(controlledShutdown())
         brokerState.newState(BrokerShuttingDown)
 
+        if (kafkaHealthcheck != null)
+          CoreUtils.swallow(kafkaHealthcheck.shutdown())
+
         if (socketServer != null)
           CoreUtils.swallow(socketServer.shutdown())
         if (requestHandlerPool != null)
@@ -549,7 +554,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
         startupComplete.set(false)
         isShuttingDown.set(false)
-        CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString))
+        CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString, metrics))
         shutdownLatch.countDown()
         info("shut down completed")
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 9cc6317..3acb88b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -434,7 +434,8 @@ class ReplicaManager(val config: KafkaConfig,
                     isFromClient: Boolean,
                     entriesPerPartition: Map[TopicPartition, MemoryRecords],
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
-                    delayedProduceLock: Option[Object] = None) {
+                    delayedProduceLock: Option[Object] = None,
+                    processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) {
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
       val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
@@ -448,6 +449,8 @@ class ReplicaManager(val config: KafkaConfig,
                   new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime, result.info.logStartOffset)) // response status
       }
 
+      processingStatsCallback(localProduceResults.mapValues(_.info.recordsProcessingStats))
+
       if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
         // create delayed produce operation
         val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 9582c50..755f500 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -17,7 +17,7 @@
 
 package kafka.utils
 
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import kafka.admin._
 import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
@@ -25,12 +25,16 @@ import kafka.cluster._
 import kafka.common.{KafkaException, NoEpochForPartitionException, TopicAndPartition}
 import kafka.consumer.{ConsumerThreadId, TopicCount}
 import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch, ReassignedPartitionsContext}
+import kafka.metrics.KafkaMetricsGroup
 import kafka.server.ConfigType
 import kafka.utils.ZkUtils._
+
+import com.yammer.metrics.core.MetricName
 import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, ZkMarshallingError, ZkNoNodeException, ZkNodeExistsException}
 import org.I0Itec.zkclient.serialize.ZkSerializer
-import org.I0Itec.zkclient.{ZkClient, ZkConnection, IZkDataListener, IZkChildListener, IZkStateListener}
+import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection}
 import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback}
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.{ACL, Stat}
@@ -84,6 +88,12 @@ object ZkUtils {
   //            sensitive information that should not be world readable to the Seq
   val SensitiveZkRootPaths = Seq(ConfigUsersPath)
 
+  def withMetrics(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean,
+                  time: Time): ZkUtils = {
+    val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
+    new ZkUtils(new ZooKeeperClientMetrics(zkClient, time), zkConnection, isZkSecurityEnabled)
+  }
+
   def apply(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean): ZkUtils = {
     val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
     new ZkUtils(zkClient, zkConnection, isZkSecurityEnabled)
@@ -212,7 +222,38 @@ object ZkUtils {
 
 }
 
-class ZkUtils(val zkClient: ZkClient,
+class ZooKeeperClientWrapper(val zkClient: ZkClient) {
+  def apply[T](method: ZkClient => T): T = method(zkClient)
+  def close(): Unit = {
+    if(zkClient != null)
+      zkClient.close()
+  }
+}
+
+class ZooKeeperClientMetrics(zkClient: ZkClient, val time: Time)
+    extends ZooKeeperClientWrapper(zkClient) with KafkaMetricsGroup {
+  val latencyMetric = newHistogram("ZooKeeperRequestLatencyMs")
+
+  override protected def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = {
+    explicitMetricName("kafka.server", "ZooKeeperClientMetrics", name, metricTags)
+  }
+
+  override def apply[T](method: ZkClient => T): T = {
+    val startNs = time.nanoseconds
+    val ret =
+      try method(zkClient)
+      finally latencyMetric.update(TimeUnit.NANOSECONDS.toMillis(time.nanoseconds - startNs))
+    ret
+  }
+
+  override def close(): Unit = {
+    if (latencyMetric != null)
+      removeMetric("ZooKeeperRequestLatencyMs")
+    super.close()
+  }
+}
+
+class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
               val zkConnection: ZkConnection,
               val isSecure: Boolean) extends Logging {
   // These are persistent ZK paths that should exist on kafka broker startup.
@@ -228,8 +269,12 @@ class ZkUtils(val zkClient: ZkClient,
                               ProducerIdBlockPath,
                               LogDirEventNotificationPath)
 
+  /** Present for compatibility */
+  def this(zkClient: ZkClient, zkConnection: ZkConnection, isSecure: Boolean) =
+    this(new ZooKeeperClientWrapper(zkClient), zkConnection, isSecure)
+
   // Visible for testing
-  val zkPath = new ZkPath(zkClient)
+  val zkPath = new ZkPath(zkClientWrap)
 
   import ZkUtils._
 
@@ -238,6 +283,8 @@ class ZkUtils(val zkClient: ZkClient,
 
   def defaultAcls(path: String): java.util.List[ACL] = ZkUtils.defaultAcls(isSecure, path)
 
+  def zkClient: ZkClient = zkClientWrap.zkClient
+
   def getController(): Int = {
     readDataMaybeNull(ControllerPath)._1 match {
       case Some(controller) => KafkaController.parseControllerId(controller)
@@ -388,7 +435,7 @@ class ZkUtils(val zkClient: ZkClient,
                                                       brokerInfo,
                                                       zkConnection.getZookeeper,
                                                       isSecure)
-      zkCheckedEphemeral.create()
+      zkClientWrap(_ => zkCheckedEphemeral.create())
     } catch {
       case _: ZkNodeExistsException =>
         throw new RuntimeException("A broker is already registered on the path " + brokerIdPath
@@ -429,7 +476,7 @@ class ZkUtils(val zkClient: ZkClient,
       acls
     }
 
-    if (!zkClient.exists(path))
+    if (!zkClientWrap(zkClient => zkClient.exists(path)))
       zkPath.createPersistent(path, createParents = true, acl) //won't throw NoNodeException or NodeExistsException
   }
 
@@ -512,7 +559,7 @@ class ZkUtils(val zkClient: ZkClient,
   def updatePersistentPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls) = {
     val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
     try {
-      zkClient.writeData(path, data)
+      zkClientWrap(_.writeData(path, data))
     } catch {
       case _: ZkNoNodeException =>
         createParentPath(path)
@@ -520,7 +567,7 @@ class ZkUtils(val zkClient: ZkClient,
           zkPath.createPersistent(path, data, acl)
         } catch {
           case _: ZkNodeExistsException =>
-            zkClient.writeData(path, data)
+            zkClientWrap(_.writeData(path, data))
         }
     }
   }
@@ -536,7 +583,7 @@ class ZkUtils(val zkClient: ZkClient,
   def conditionalUpdatePersistentPath(path: String, data: String, expectVersion: Int,
     optionalChecker:Option[(ZkUtils, String, String) => (Boolean,Int)] = None): (Boolean, Int) = {
     try {
-      val stat = zkClient.writeDataReturnStat(path, data, expectVersion)
+      val stat = zkClientWrap(_.writeDataReturnStat(path, data, expectVersion))
       debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
         .format(path, data, expectVersion, stat.getVersion))
       (true, stat.getVersion)
@@ -563,7 +610,7 @@ class ZkUtils(val zkClient: ZkClient,
    */
   def conditionalUpdatePersistentPathIfExists(path: String, data: String, expectVersion: Int): (Boolean, Int) = {
     try {
-      val stat = zkClient.writeDataReturnStat(path, data, expectVersion)
+      val stat = zkClientWrap(_.writeDataReturnStat(path, data, expectVersion))
       debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
         .format(path, data, expectVersion, stat.getVersion))
       (true, stat.getVersion)
@@ -583,7 +630,7 @@ class ZkUtils(val zkClient: ZkClient,
   def updateEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
     val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
     try {
-      zkClient.writeData(path, data)
+      zkClientWrap(_.writeData(path, data))
     } catch {
       case _: ZkNoNodeException =>
         createParentPath(path)
@@ -592,7 +639,7 @@ class ZkUtils(val zkClient: ZkClient,
   }
 
   def deletePath(path: String): Boolean = {
-    zkClient.delete(path)
+    zkClientWrap(_.delete(path))
   }
 
   /**
@@ -601,7 +648,7 @@ class ZkUtils(val zkClient: ZkClient,
     */
    def conditionalDeletePath(path: String, expectedVersion: Int): Boolean = {
     try {
-      zkClient.delete(path, expectedVersion)
+      zkClientWrap(_.delete(path, expectedVersion))
       true
     } catch {
       case _: ZkBadVersionException => false
@@ -609,37 +656,38 @@ class ZkUtils(val zkClient: ZkClient,
   }
 
   def deletePathRecursive(path: String) {
-    zkClient.deleteRecursive(path)
+    zkClientWrap(_.deleteRecursive(path))
   }
 
   def subscribeDataChanges(path: String, listener: IZkDataListener): Unit =
-    zkClient.subscribeDataChanges(path, listener)
+    zkClientWrap(_.subscribeDataChanges(path, listener))
 
   def unsubscribeDataChanges(path: String, dataListener: IZkDataListener): Unit =
-    zkClient.unsubscribeDataChanges(path, dataListener)
+    zkClientWrap(_.unsubscribeDataChanges(path, dataListener))
 
   def subscribeStateChanges(listener: IZkStateListener): Unit =
-    zkClient.subscribeStateChanges(listener)
+    zkClientWrap(_.subscribeStateChanges(listener))
 
   def subscribeChildChanges(path: String, listener: IZkChildListener): Option[Seq[String]] =
-    Option(zkClient.subscribeChildChanges(path, listener)).map(_.asScala)
+    Option(zkClientWrap(_.subscribeChildChanges(path, listener))).map(_.asScala)
 
   def unsubscribeChildChanges(path: String, childListener: IZkChildListener): Unit =
-    zkClient.unsubscribeChildChanges(path, childListener)
+    zkClientWrap(_.unsubscribeChildChanges(path, childListener))
 
   def unsubscribeAll(): Unit =
-    zkClient.unsubscribeAll()
+    zkClientWrap(_.unsubscribeAll())
 
   def readData(path: String): (String, Stat) = {
     val stat: Stat = new Stat()
-    val dataStr: String = zkClient.readData(path, stat)
+    val dataStr: String = zkClientWrap(_.readData[String](path, stat))
     (dataStr, stat)
   }
 
   def readDataMaybeNull(path: String): (Option[String], Stat) = {
     val stat = new Stat()
     val dataAndStat = try {
-                        (Some(zkClient.readData(path, stat)), stat)
+                        val dataStr = zkClientWrap(_.readData[String](path, stat))
+                        (Some(dataStr), stat)
                       } catch {
                         case _: ZkNoNodeException =>
                           (None, stat)
@@ -650,18 +698,18 @@ class ZkUtils(val zkClient: ZkClient,
   def readDataAndVersionMaybeNull(path: String): (Option[String], Int) = {
     val stat = new Stat()
     try {
-      val data: String = zkClient.readData(path, stat)
+      val data = zkClientWrap(_.readData[String](path, stat))
       (Option(data), stat.getVersion)
     } catch {
       case _: ZkNoNodeException => (None, stat.getVersion)
     }
   }
 
-  def getChildren(path: String): Seq[String] = zkClient.getChildren(path).asScala
+  def getChildren(path: String): Seq[String] = zkClientWrap(_.getChildren(path)).asScala
 
   def getChildrenParentMayNotExist(path: String): Seq[String] = {
     try {
-      zkClient.getChildren(path).asScala
+      zkClientWrap(_.getChildren(path)).asScala
     } catch {
       case _: ZkNoNodeException => Nil
     }
@@ -671,7 +719,7 @@ class ZkUtils(val zkClient: ZkClient,
    * Check if the given path exists
    */
   def pathExists(path: String): Boolean = {
-    zkClient.exists(path)
+    zkClientWrap(_.exists(path))
   }
 
   def isTopicMarkedForDeletion(topic: String): Boolean = {
@@ -789,9 +837,9 @@ class ZkUtils(val zkClient: ZkClient,
 
   def deletePartition(brokerId: Int, topic: String) {
     val brokerIdPath = BrokerIdsPath + "/" + brokerId
-    zkClient.delete(brokerIdPath)
+    zkClientWrap(_.delete(brokerIdPath))
     val brokerPartTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic + "/" + brokerId
-    zkClient.delete(brokerPartTopicPath)
+    zkClientWrap(_.delete(brokerPartTopicPath))
   }
 
   @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
@@ -851,7 +899,7 @@ class ZkUtils(val zkClient: ZkClient,
     */
   def getSequenceId(path: String, acls: java.util.List[ACL] = UseDefaultAcls): Int = {
     val acl = if (acls == UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
-    def writeToZk: Int = zkClient.writeDataReturnStat(path, "", -1).getVersion
+    def writeToZk: Int = zkClientWrap(_.writeDataReturnStat(path, "", -1)).getVersion
     try {
       writeToZk
     } catch {
@@ -915,9 +963,7 @@ class ZkUtils(val zkClient: ZkClient,
   }
 
   def close() {
-    if(zkClient != null) {
-      zkClient.close()
-    }
+    zkClientWrap.close()
   }
 }
 
@@ -973,7 +1019,7 @@ class ZKConfig(props: VerifiableProperties) {
   val zkSyncTimeMs = props.getInt(ZkSyncTimeMsProp, 2000)
 }
 
-class ZkPath(client: ZkClient) {
+class ZkPath(clientWrap: ZooKeeperClientWrapper) {
 
   @volatile private var isNamespacePresent: Boolean = false
 
@@ -981,7 +1027,7 @@ class ZkPath(client: ZkClient) {
     if (isNamespacePresent)
       return
 
-    if (!client.exists("/")) {
+    if (!clientWrap(_.exists("/"))) {
       throw new ConfigException("Zookeeper namespace does not exist")
     }
     isNamespacePresent = true
@@ -993,22 +1039,22 @@ class ZkPath(client: ZkClient) {
 
   def createPersistent(path: String, data: Object, acls: java.util.List[ACL]) {
     checkNamespace()
-    client.createPersistent(path, data, acls)
+    clientWrap(_.createPersistent(path, data, acls))
   }
 
   def createPersistent(path: String, createParents: Boolean, acls: java.util.List[ACL]) {
     checkNamespace()
-    client.createPersistent(path, createParents, acls)
+    clientWrap(_.createPersistent(path, createParents, acls))
   }
 
   def createEphemeral(path: String, data: Object, acls: java.util.List[ACL]) {
     checkNamespace()
-    client.createEphemeral(path, data, acls)
+    clientWrap(_.createEphemeral(path, data, acls))
   }
 
   def createPersistentSequential(path: String, data: Object, acls: java.util.List[ACL]): String = {
     checkNamespace()
-    client.createPersistentSequential(path, data, acls)
+    clientWrap(_.createPersistentSequential(path, data, acls))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/integration/kafka/api/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
new file mode 100644
index 0000000..f71a36a
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -0,0 +1,288 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package kafka.api
+
+import java.util.{Locale, Properties}
+
+import kafka.log.LogConfig
+import kafka.network.RequestMetrics
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{JaasTestUtils, TestUtils}
+
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.{Gauge, Histogram, Meter}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.errors.InvalidTopicException
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
+
+import scala.collection.JavaConverters._
+
+class MetricsTest extends IntegrationTestHarness with SaslSetup {
+
+  override val producerCount = 1
+  override val consumerCount = 1
+  override val serverCount = 1
+
+  override protected def listenerName = new ListenerName("CLIENT")
+  private val kafkaClientSaslMechanism = "PLAIN"
+  private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
+  private val kafkaServerJaasEntryName =
+    s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}"
+  this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false")
+  this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableDoc, "false")
+  override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+  override protected val serverSaslProperties =
+    Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
+  override protected val clientSaslProperties =
+    Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
+
+  @Before
+  override def setUp(): Unit = {
+    startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName))
+    super.setUp()
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    super.tearDown()
+    closeSasl()
+  }
+
+  /**
+   * Verifies some of the metrics of producer, consumer as well as server.
+   */
+  @Test
+  def testMetrics(): Unit = {
+    val topic = "topicWithOldMessageFormat"
+    val props = new Properties
+    props.setProperty(LogConfig.MessageFormatVersionProp, "0.9.0")
+    TestUtils.createTopic(this.zkUtils, topic, numPartitions = 1, replicationFactor = 1, this.servers, props)
+    val tp = new TopicPartition(topic, 0)
+
+    // Clear static state
+    RequestMetrics.clearErrorMeters()
+
+    // Produce and consume some records
+    val numRecords = 10
+    val recordSize = 1000
+    val producer = producers.head
+    sendRecords(producer, numRecords, recordSize, tp)
+
+    val consumer = this.consumers.head
+    consumer.assign(List(tp).asJava)
+    consumer.seek(tp, 0)
+    TestUtils.consumeRecords(consumer, numRecords)
+
+    verifyKafkaRateMetricsHaveCumulativeCount()
+    verifyClientVersionMetrics(consumer.metrics, "Consumer")
+    verifyClientVersionMetrics(this.producers.head.metrics, "Producer")
+
+    val server = servers.head
+    verifyBrokerMessageConversionMetrics(server, recordSize)
+    verifyBrokerErrorMetrics(servers.head)
+    verifyBrokerZkMetrics(server, topic)
+
+    generateAuthenticationFailure(tp)
+    verifyBrokerAuthenticationMetrics(server)
+  }
+
+  private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int,
+      recordSize: Int, tp: TopicPartition) = {
+    val bytes = new Array[Byte](recordSize)
+    (0 until numRecords).map { i =>
+      producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key $i".getBytes, bytes))
+    }
+    producer.flush()
+  }
+
+  // Create a producer that fails authentication to verify authentication failure metrics
+  private def generateAuthenticationFailure(tp: TopicPartition): Unit = {
+    val producerProps = new Properties()
+    val saslProps = new Properties()
+     // Temporary limit to reduce blocking before KIP-152 client-side changes are merged
+    saslProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000")
+    saslProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000")
+    saslProps.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256")
+    // Use acks=0 to verify error metric when connection is closed without a response
+    saslProps.put(ProducerConfig.ACKS_CONFIG, "0")
+    val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol,
+        trustStoreFile = trustStoreFile, saslProperties = Some(saslProps), props = Some(producerProps))
+
+    try {
+      producer.send(new ProducerRecord(tp.topic, tp.partition, "key".getBytes, "value".getBytes)).get
+    } catch {
+      case _: Exception => // expected exception
+    } finally {
+      producer.close()
+    }
+  }
+
+  private def verifyKafkaRateMetricsHaveCumulativeCount(): Unit =  {
+
+    def exists(name: String, rateMetricName: MetricName, allMetricNames: Set[MetricName]): Boolean = {
+      allMetricNames.contains(new MetricName(name, rateMetricName.group, "", rateMetricName.tags))
+    }
+
+    def verify(rateMetricName: MetricName, allMetricNames: Set[MetricName]): Unit = {
+      val name = rateMetricName.name
+      val totalExists = exists(name.replace("-rate", "-total"), rateMetricName, allMetricNames)
+      val totalTimeExists = exists(name.replace("-rate", "-time"), rateMetricName, allMetricNames)
+      assertTrue(s"No cumulative count/time metric for rate metric $rateMetricName",
+          totalExists || totalTimeExists)
+    }
+
+    val consumer = this.consumers.head
+    val consumerMetricNames = consumer.metrics.keySet.asScala.toSet
+    consumerMetricNames.filter(_.name.endsWith("-rate"))
+        .foreach(verify(_, consumerMetricNames))
+
+    val producer = this.producers.head
+    val producerMetricNames = producer.metrics.keySet.asScala.toSet
+    val producerExclusions = Set("compression-rate") // compression-rate is an Average metric, not Rate
+    producerMetricNames.filter(_.name.endsWith("-rate"))
+        .filterNot(metricName => producerExclusions.contains(metricName.name))
+        .foreach(verify(_, producerMetricNames))
+
+    // Check a couple of metrics of consumer and producer to ensure that values are set
+    verifyKafkaMetricRecorded("records-consumed-rate", consumer.metrics, "Consumer")
+    verifyKafkaMetricRecorded("records-consumed-total", consumer.metrics, "Consumer")
+    verifyKafkaMetricRecorded("record-send-rate", producer.metrics, "Producer")
+    verifyKafkaMetricRecorded("record-send-total", producer.metrics, "Producer")
+  }
+
+  private def verifyClientVersionMetrics(metrics: java.util.Map[MetricName, _ <: Metric], entity: String): Unit = {
+    Seq("commit-id", "version").foreach { name =>
+      verifyKafkaMetric(name, metrics, entity) { matchingMetrics =>
+        assertEquals(1, matchingMetrics.size)
+        val metric = matchingMetrics.head
+        val value = metric.metricValue
+        assertNotNull(s"$entity metric not recorded $name", value)
+        assertNotNull(s"$entity metric $name should be a non-empty String",
+            value.isInstanceOf[String] && !value.asInstanceOf[String].isEmpty)
+        assertTrue("Client-id not specified", metric.metricName.tags.containsKey("client-id"))
+      }
+    }
+  }
+
+  private def verifyBrokerAuthenticationMetrics(server: KafkaServer): Unit = {
+    val metrics = server.metrics.metrics
+    TestUtils.waitUntilTrue(() =>
+      maxKafkaMetricValue("failed-authentication-total", metrics, "Broker", Some("socket-server-metrics")) > 0,
+      "failed-authentication-total not updated")
+    verifyKafkaMetricRecorded("successful-authentication-rate", metrics, "Broker", Some("socket-server-metrics"))
+    verifyKafkaMetricRecorded("successful-authentication-total", metrics, "Broker", Some("socket-server-metrics"))
+    verifyKafkaMetricRecorded("failed-authentication-rate", metrics, "Broker", Some("socket-server-metrics"))
+    verifyKafkaMetricRecorded("failed-authentication-total", metrics, "Broker", Some("socket-server-metrics"))
+  }
+
+  private def verifyBrokerMessageConversionMetrics(server: KafkaServer, recordSize: Int): Unit = {
+    val requestMetricsPrefix = "kafka.network:type=RequestMetrics"
+    val requestBytes = verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Produce")
+    val tempBytes = verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Produce")
+    assertTrue(s"Unexpected temporary memory size requestBytes $requestBytes tempBytes $tempBytes",
+        tempBytes >= recordSize)
+
+    verifyYammerMetricRecorded(s"kafka.server:type=BrokerTopicMetrics,name=ProduceMessageConversionsPerSec")
+    verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=MessageConversionsTimeMs,request=Produce", value => value > 0.0)
+
+    verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Fetch")
+    // Temporary size for fetch should be zero after KAFKA-5968 is fixed
+    verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Fetch", value => value >= 0.0)
+
+     // request size recorded for all request types, check one
+    verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Metadata")
+  }
+
+  private def verifyBrokerZkMetrics(server: KafkaServer, topic: String): Unit = {
+    // Latency is rounded to milliseconds, so we may need to retry some operations to get latency > 0.
+    val (_, recorded) = TestUtils.computeUntilTrue({
+      servers.head.zkUtils.getLeaderAndIsrForPartition(topic, 0)
+      yammerMetricValue("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs").asInstanceOf[Double]
+    })(latency => latency > 0.0)
+    assertTrue("ZooKeeper latency not recorded", recorded)
+
+    assertEquals(s"Unexpected ZK state ${server.zkUtils.zkConnection.getZookeeperState}",
+        "CONNECTED", yammerMetricValue("SessionState"))
+  }
+
+  private def verifyBrokerErrorMetrics(server: KafkaServer): Unit = {
+
+    def errorMetricCount = Metrics.defaultRegistry.allMetrics.keySet.asScala.filter(_.getName == "ErrorsPerSec").size
+
+    val startErrorMetricCount = errorMetricCount
+    val errorMetricPrefix = "kafka.network:type=RequestMetrics,name=ErrorsPerSec"
+    verifyYammerMetricRecorded(s"$errorMetricPrefix,request=Metadata,error=NONE")
+
+    try {
+      consumers.head.partitionsFor("12{}!")
+    } catch {
+      case _: InvalidTopicException => // expected
+    }
+    verifyYammerMetricRecorded(s"$errorMetricPrefix,request=Metadata,error=INVALID_TOPIC_EXCEPTION")
+
+    // Check that error metrics are registered dynamically
+    val currentErrorMetricCount = errorMetricCount
+    assertEquals(startErrorMetricCount + 1, currentErrorMetricCount)
+    assertTrue(s"Too many error metrics $currentErrorMetricCount" , currentErrorMetricCount < 10)
+
+    // Verify that error metric is updated with producer acks=0 when no response is sent
+    sendRecords(producers.head, 1, 100, new TopicPartition("non-existent", 0))
+    verifyYammerMetricRecorded(s"$errorMetricPrefix,request=Metadata,error=LEADER_NOT_AVAILABLE")
+  }
+
+  private def verifyKafkaMetric[T](name: String, metrics: java.util.Map[MetricName, _ <: Metric], entity: String,
+      group: Option[String] = None)(verify: Iterable[Metric] => T) : T = {
+    val matchingMetrics = metrics.asScala.filter {
+      case (metricName, _) => metricName.name == name && group.forall(_ == metricName.group)
+    }
+    assertTrue(s"Metric not found $name", matchingMetrics.size > 0)
+    verify(matchingMetrics.values)
+  }
+
+  private def maxKafkaMetricValue(name: String, metrics: java.util.Map[MetricName, _ <: Metric], entity: String,
+      group: Option[String]): Double = {
+    // Use max value of all matching metrics since Selector metrics are recorded for each Processor
+    verifyKafkaMetric(name, metrics, entity, group) { matchingMetrics =>
+      matchingMetrics.foldLeft(0.0)((max, metric) => Math.max(max, metric.value))
+    }
+  }
+
+  private def verifyKafkaMetricRecorded(name: String, metrics: java.util.Map[MetricName, _ <: Metric], entity: String,
+      group: Option[String] = None): Unit = {
+    val value = maxKafkaMetricValue(name, metrics, entity, group)
+    assertTrue(s"$entity metric not recorded correctly for $name value $value", value > 0.0)
+  }
+
+  private def yammerMetricValue(name: String): Any = {
+    val allMetrics = Metrics.defaultRegistry.allMetrics.asScala
+    val (_, metric) = allMetrics.find { case (n, _) => n.getMBeanName.endsWith(name) }
+      .getOrElse(fail(s"Unable to find broker metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}"))
+    metric match {
+      case m: Meter => m.count.toDouble
+      case m: Histogram => m.max
+      case m: Gauge[_] => m.value
+      case m => fail(s"Unexpected broker metric of class ${m.getClass}")
+    }
+  }
+
+  private def verifyYammerMetricRecorded(name: String, verify: Double => Boolean = d => d > 0): Double = {
+    val metricValue = yammerMetricValue(name).asInstanceOf[Double]
+    assertTrue(s"Broker metric not recorded correctly for $name value $metricValue", verify(metricValue))
+    metricValue
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 604bbf3..c1b26f1 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -1508,55 +1508,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     servers.foreach(assertNoExemptRequestMetric(_))
   }
 
-  // Rate metrics of both Producer and Consumer are verified by this test
-  @Test
-  def testRateMetricsHaveCumulativeCount() {
-    val numRecords = 100
-    sendRecords(numRecords)
-
-    val consumer = this.consumers.head
-    consumer.assign(List(tp).asJava)
-    consumer.seek(tp, 0)
-    consumeAndVerifyRecords(consumer, numRecords = numRecords, startingOffset = 0)
-
-    def exists(name: String, rateMetricName: MetricName, allMetricNames: Set[MetricName]): Boolean = {
-      allMetricNames.contains(new MetricName(name, rateMetricName.group, "", rateMetricName.tags))
-    }
-
-    def verify(rateMetricName: MetricName, allMetricNames: Set[MetricName]): Unit = {
-      val name = rateMetricName.name
-      val totalExists = exists(name.replace("-rate", "-total"), rateMetricName, allMetricNames)
-      val totalTimeExists = exists(name.replace("-rate", "-time"), rateMetricName, allMetricNames)
-      assertTrue(s"No cumulative count/time metric for rate metric $rateMetricName",
-          totalExists || totalTimeExists)
-    }
-
-    val consumerMetricNames = consumer.metrics.keySet.asScala.toSet
-    consumerMetricNames.filter(_.name.endsWith("-rate"))
-        .foreach(verify(_, consumerMetricNames))
-
-    val producer = this.producers.head
-    val producerMetricNames = producer.metrics.keySet.asScala.toSet
-    val producerExclusions = Set("compression-rate") // compression-rate is an Average metric, not Rate
-    producerMetricNames.filter(_.name.endsWith("-rate"))
-        .filterNot(metricName => producerExclusions.contains(metricName.name))
-        .foreach(verify(_, producerMetricNames))
-
-    def verifyMetric(name: String, metrics: java.util.Map[MetricName, _ <: Metric], entity: String): Unit = {
-      val entry = metrics.asScala.find { case (metricName, _) => metricName.name == name }
-      assertTrue(s"$entity metric not defined $name", entry.nonEmpty)
-      entry.foreach { case (metricName, metric) =>
-        assertTrue(s"$entity metric not recorded $metricName", metric.value > 0.0)
-      }
-    }
-
-    // Check a couple of metrics of consumer and producer to ensure that values are set
-    verifyMetric("records-consumed-rate", consumer.metrics, "Consumer")
-    verifyMetric("records-consumed-total", consumer.metrics, "Consumer")
-    verifyMetric("record-send-rate", producer.metrics, "Producer")
-    verifyMetric("record-send-total", producer.metrics, "Producer")
-  }
-
   def runMultiConsumerSessionTimeoutTest(closeConsumer: Boolean): Unit = {
     // use consumers defined in this class plus one additional consumer
     // Use topic defined in this class + one additional topic

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 2ffd828..95abb33 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -1351,7 +1351,8 @@ class GroupCoordinatorTest extends JUnitSuite {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
-      EasyMock.anyObject().asInstanceOf[Option[Object]])).andAnswer(new IAnswer[Unit] {
+      EasyMock.anyObject().asInstanceOf[Option[Object]],
+      EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
           new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)
@@ -1434,7 +1435,8 @@ class GroupCoordinatorTest extends JUnitSuite {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
-      EasyMock.anyObject().asInstanceOf[Option[Object]])
+      EasyMock.anyObject().asInstanceOf[Option[Object]],
+      EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
           Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
@@ -1463,7 +1465,8 @@ class GroupCoordinatorTest extends JUnitSuite {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
-      EasyMock.anyObject().asInstanceOf[Option[Object]])
+      EasyMock.anyObject().asInstanceOf[Option[Object]],
+      EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) ->

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index a2f5f92..0def9ce 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -1306,7 +1306,8 @@ class GroupMetadataManagerTest {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
-      EasyMock.anyObject().asInstanceOf[Option[Object]])
+      EasyMock.anyObject().asInstanceOf[Option[Object]],
+      EasyMock.anyObject())
     )
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
     capturedArgument
@@ -1320,7 +1321,8 @@ class GroupMetadataManagerTest {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
-      EasyMock.anyObject().asInstanceOf[Option[Object]])
+      EasyMock.anyObject().asInstanceOf[Option[Object]],
+      EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(groupTopicPartition ->

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 49c8e6a..ed1636c 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -498,7 +498,8 @@ class TransactionStateManagerTest {
           EasyMock.eq(false),
           EasyMock.eq(recordsByPartition),
           EasyMock.capture(capturedArgument),
-          EasyMock.eq(None)
+          EasyMock.eq(None),
+          EasyMock.anyObject()
         )).andAnswer(new IAnswer[Unit] {
           override def answer(): Unit = {
             capturedArgument.getValue.apply(
@@ -598,6 +599,7 @@ class TransactionStateManagerTest {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
+      EasyMock.anyObject(),
       EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
         override def answer(): Unit = capturedArgument.getValue.apply(

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 96f7bfc..b64371e 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -22,6 +22,7 @@ import kafka.common.LongRef
 import kafka.message.{CompressionCodec, DefaultCompressionCodec, GZIPCompressionCodec, NoCompressionCodec, SnappyCompressionCodec}
 import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
+import org.apache.kafka.common.utils.Time
 import org.apache.kafka.test.TestUtils
 import org.junit.Assert._
 import org.junit.Test
@@ -30,6 +31,8 @@ import scala.collection.JavaConverters._
 
 class LogValidatorTest {
 
+  val time = Time.SYSTEM
+
   @Test
   def testLogAppendTimeNonCompressedV1() {
     checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1)
@@ -41,6 +44,7 @@ class LogValidatorTest {
     val records = createRecords(magicValue = magic, timestamp = 1234L, codec = CompressionType.NONE)
     val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(0),
+      time= time,
       now = now,
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -56,6 +60,8 @@ class LogValidatorTest {
     assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
     assertEquals(s"The offset of max timestamp should be 0", 0, validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
+
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 0, records, compressed = false)
   }
 
   def testLogAppendTimeNonCompressedV2() {
@@ -74,6 +80,7 @@ class LogValidatorTest {
     val validatedResults = LogValidator.validateMessagesAndAssignOffsets(
       records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = now,
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -92,6 +99,9 @@ class LogValidatorTest {
     assertEquals(s"The offset of max timestamp should be ${records.records.asScala.size - 1}",
       records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
     assertTrue("Message size may have been changed", validatedResults.messageSizeMaybeChanged)
+
+    val stats = validatedResults.recordsProcessingStats
+    verifyRecordsProcessingStats(stats, 3, records, compressed = true)
   }
 
   @Test
@@ -111,6 +121,7 @@ class LogValidatorTest {
     val validatedResults = LogValidator.validateMessagesAndAssignOffsets(
       records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = now,
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -130,6 +141,8 @@ class LogValidatorTest {
     assertEquals(s"The offset of max timestamp should be ${records.records.asScala.size - 1}",
       records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
+
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 0, records, compressed = true)
   }
 
   @Test
@@ -161,6 +174,7 @@ class LogValidatorTest {
 
     val validatingResults = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -192,6 +206,8 @@ class LogValidatorTest {
     assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingResults.maxTimestamp)
     assertEquals(s"Offset of max timestamp should be 1", 1, validatingResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", validatingResults.messageSizeMaybeChanged)
+
+    verifyRecordsProcessingStats(validatingResults.recordsProcessingStats, 0, records, compressed = false)
   }
 
   @Test
@@ -223,6 +239,7 @@ class LogValidatorTest {
 
     val validatingResults = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = GZIPCompressionCodec,
@@ -253,6 +270,8 @@ class LogValidatorTest {
     assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingResults.maxTimestamp)
     assertEquals("Offset of max timestamp should be 2", 2, validatingResults.shallowOffsetOfMaxTimestamp)
     assertTrue("Message size should have been changed", validatingResults.messageSizeMaybeChanged)
+
+    verifyRecordsProcessingStats(validatingResults.recordsProcessingStats, 3, records, compressed = true)
   }
 
   @Test
@@ -269,6 +288,7 @@ class LogValidatorTest {
     val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
     val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -293,6 +313,8 @@ class LogValidatorTest {
     assertEquals(s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}",
       validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
     assertTrue("Message size should have been changed", validatedResults.messageSizeMaybeChanged)
+
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 3, records, compressed = true)
   }
 
   @Test
@@ -306,6 +328,7 @@ class LogValidatorTest {
     val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, codec = CompressionType.GZIP, timestamp = timestamp)
     val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = timestamp,
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -330,6 +353,8 @@ class LogValidatorTest {
     assertEquals(s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}",
       validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
     assertTrue("Message size should have been changed", validatedResults.messageSizeMaybeChanged)
+
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 3, records, compressed = true)
   }
 
   @Test
@@ -356,6 +381,7 @@ class LogValidatorTest {
 
     val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -387,6 +413,8 @@ class LogValidatorTest {
     assertEquals(s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}",
       validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
+
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 0, records, compressed = true)
   }
 
   @Test
@@ -402,6 +430,7 @@ class LogValidatorTest {
     LogValidator.validateMessagesAndAssignOffsets(
       records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -421,6 +450,7 @@ class LogValidatorTest {
     LogValidator.validateMessagesAndAssignOffsets(
       records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -440,6 +470,7 @@ class LogValidatorTest {
     LogValidator.validateMessagesAndAssignOffsets(
       records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -459,6 +490,7 @@ class LogValidatorTest {
     LogValidator.validateMessagesAndAssignOffsets(
       records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -477,6 +509,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -495,6 +528,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -514,6 +548,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     val messageWithOffset = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -534,6 +569,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     val messageWithOffset = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -555,6 +591,7 @@ class LogValidatorTest {
     val compressedMessagesWithOffset = LogValidator.validateMessagesAndAssignOffsets(
       records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -576,6 +613,7 @@ class LogValidatorTest {
     val compressedMessagesWithOffset = LogValidator.validateMessagesAndAssignOffsets(
       records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -595,6 +633,7 @@ class LogValidatorTest {
     val offset = 1234567
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -613,6 +652,7 @@ class LogValidatorTest {
     val offset = 1234567
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -631,6 +671,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -649,6 +690,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -667,6 +709,7 @@ class LogValidatorTest {
     val records = MemoryRecords.withEndTransactionMarker(23423L, 5, endTxnMarker)
     LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -685,6 +728,7 @@ class LogValidatorTest {
     val records = MemoryRecords.withEndTransactionMarker(23423L, 5, endTxnMarker)
     val result = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = SnappyCompressionCodec,
@@ -708,6 +752,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -727,6 +772,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -745,6 +791,7 @@ class LogValidatorTest {
     val offset = 1234567
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -763,6 +810,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -782,6 +830,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -801,6 +850,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -822,6 +872,7 @@ class LogValidatorTest {
       new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes), new SimpleRecord("beautiful".getBytes))
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -843,6 +894,7 @@ class LogValidatorTest {
       new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes), new SimpleRecord("beautiful".getBytes))
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -862,6 +914,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -881,6 +934,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -898,6 +952,7 @@ class LogValidatorTest {
     val records = recordsWithInvalidInnerMagic(offset)
     LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = SnappyCompressionCodec,
       targetCodec = SnappyCompressionCodec,
@@ -936,6 +991,7 @@ class LogValidatorTest {
     val records = MemoryRecords.readableRecords(buffer)
     LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = sourceCodec,
       targetCodec = targetCodec,
@@ -1010,4 +1066,19 @@ class LogValidatorTest {
     }
   }
 
+  def verifyRecordsProcessingStats(stats: RecordsProcessingStats, convertedCount: Int,
+            records: MemoryRecords, compressed: Boolean): Unit = {
+    assertNotNull("Records processing info is null", stats)
+    assertEquals(convertedCount, stats.conversionCount)
+    if (stats.conversionCount > 0)
+      assertTrue(s"Conversion time not recorded $stats", stats.conversionTimeNanos > 0)
+    val originalSize = records.sizeInBytes
+    val tempBytes = stats.temporaryMemoryBytes
+    if (convertedCount > 0)
+      assertTrue(s"Temp bytes too small, orig=$originalSize actual=$tempBytes", tempBytes > originalSize)
+    else if (compressed)
+      assertTrue("Temp bytes not updated", tempBytes > 0)
+    else
+      assertEquals(0, tempBytes)
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 8b611f2..077950d 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -519,7 +519,7 @@ class SocketServerTest extends JUnitSuite {
       val channel = overrideServer.requestChannel
       val request = receiveRequest(channel)
 
-      val requestMetrics = RequestMetrics.metricsMap(request.header.apiKey.name)
+      val requestMetrics = RequestMetrics(request.header.apiKey.name)
       def totalTimeHistCount(): Long = requestMetrics.totalTimeHist.count
       val expectedTotalTimeCount = totalTimeHistCount() + 1
 
@@ -561,7 +561,7 @@ class SocketServerTest extends JUnitSuite {
       TestUtils.waitUntilTrue(() => overrideServer.processor(request.processor).channel(request.context.connectionId).isEmpty,
         s"Idle connection `${request.context.connectionId}` was not closed by selector")
 
-      val requestMetrics = RequestMetrics.metricsMap(request.header.apiKey.name)
+      val requestMetrics = RequestMetrics(request.header.apiKey.name)
       def totalTimeHistCount(): Long = requestMetrics.totalTimeHist.count
       val expectedTotalTimeCount = totalTimeHistCount() + 1
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index deea586..fde2ae1 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -179,6 +179,7 @@ class KafkaApisTest {
       EasyMock.eq(false),
       EasyMock.anyObject(),
       EasyMock.capture(responseCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
       override def answer(): Unit = {
         responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE)))
@@ -217,6 +218,7 @@ class KafkaApisTest {
       EasyMock.eq(false),
       EasyMock.anyObject(),
       EasyMock.capture(responseCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
       override def answer(): Unit = {
         responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE)))
@@ -247,6 +249,7 @@ class KafkaApisTest {
       EasyMock.eq(false),
       EasyMock.anyObject(),
       EasyMock.anyObject(),
+      EasyMock.anyObject(),
       EasyMock.anyObject()))
 
     EasyMock.replay(replicaManager)

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 344ef2e..1806cb0 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -68,6 +68,12 @@
     <li>The Java clients and tools now accept any string as a client-id.</li>
     <li>The deprecated tool <code>kafka-consumer-offset-checker.sh</code> has been removed. Use <code>kafka-consumer-groups.sh</code> to get consumer group details.</li>
     <li>SimpleAclAuthorizer now logs access denials to the authorizer log by default.</li>
+    <li>The <code>app-info</code> mbean registered with JMX to provide version and commit id will be deprecated and replaced with
+        metrics providing these attributes.</li>
+    <li>Kafka metrics may now contain non-numeric values. <code>org.apache.kafka.common.Metric#value()</code> has been deprecated and
+        may throw an <code>IllegalStateException</code> when iterating over metrics of <code>KafkaProducer/KafkaConsumer/KafkaAdminClient</code>.
+        <code>org.apache.kafka.common.Metric#metricValue()</code> can be used to safely iterate over any metric value.</code>
+
 </ul>
 
 <h5><a id="upgrade_100_new_protocols" href="#upgrade_100_new_protocols">New Protocol Versions</a></h5>

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
index e02bbb0..0e5d130 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
@@ -32,7 +32,7 @@ public class ToolsUtils {
     public static void printMetrics(Map<MetricName, ? extends Metric> metrics) {
         if (metrics != null && !metrics.isEmpty()) {
             int maxLengthOfDisplayName = 0;
-            TreeMap<String, Double> sortedMetrics = new TreeMap<>(new Comparator<String>() {
+            TreeMap<String, Object> sortedMetrics = new TreeMap<>(new Comparator<String>() {
                 @Override
                 public int compare(String o1, String o2) {
                     return o1.compareTo(o2);
@@ -42,12 +42,18 @@ public class ToolsUtils {
                 MetricName mName = metric.metricName();
                 String mergedName = mName.group() + ":" + mName.name() + ":" + mName.tags();
                 maxLengthOfDisplayName = maxLengthOfDisplayName < mergedName.length() ? mergedName.length() : maxLengthOfDisplayName;
-                sortedMetrics.put(mergedName, metric.value());
+                sortedMetrics.put(mergedName, metric.metricValue());
             }
-            String outputFormat = "%-" + maxLengthOfDisplayName + "s : %.3f";
+            String doubleOutputFormat = "%-" + maxLengthOfDisplayName + "s : %.3f";
+            String defaultOutputFormat = "%-" + maxLengthOfDisplayName + "s : %s";
             System.out.println(String.format("\n%-" + maxLengthOfDisplayName + "s   %s", "Metric Name", "Value"));
 
-            for (Map.Entry<String, Double> entry : sortedMetrics.entrySet()) {
+            for (Map.Entry<String, Object> entry : sortedMetrics.entrySet()) {
+                String outputFormat;
+                if (entry.getValue() instanceof Double)
+                    outputFormat = doubleOutputFormat;
+                else
+                    outputFormat = defaultOutputFormat;
                 System.out.println(String.format(outputFormat, entry.getKey(), entry.getValue()));
             }
         }


[3/3] kafka git commit: KAFKA-5746; Add new metrics to support health checks (KIP-188)

Posted by ij...@apache.org.
KAFKA-5746; Add new metrics to support health checks (KIP-188)

Adds new metrics to support health checks:
1. Error rates for each request type, per-error code
2. Request size and temporary memory size
3. Message conversion rate and time
4. Successful and failed authentication rates
5. ZooKeeper latency and status
6. Client version

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3705 from rajinisivaram/KAFKA-5746-new-metrics


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/021d8a8e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/021d8a8e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/021d8a8e

Branch: refs/heads/trunk
Commit: 021d8a8e9698dce454e0e801092460b98f0a8a4d
Parents: dd6347a
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Thu Sep 28 21:58:47 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Sep 28 21:58:59 2017 +0100

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   4 +
 checkstyle/suppressions.xml                     |   2 +-
 .../kafka/clients/admin/KafkaAdminClient.java   |   4 +-
 .../kafka/clients/consumer/KafkaConsumer.java   |   4 +-
 .../kafka/clients/producer/KafkaProducer.java   |   4 +-
 .../clients/producer/internals/Sender.java      |   2 +-
 .../java/org/apache/kafka/common/Metric.java    |  12 +-
 .../org/apache/kafka/common/metrics/Gauge.java  |  31 ++
 .../kafka/common/metrics/JmxReporter.java       |   2 +-
 .../kafka/common/metrics/KafkaMetric.java       |  40 ++-
 .../apache/kafka/common/metrics/Measurable.java |   4 +-
 .../common/metrics/MetricValueProvider.java     |  28 ++
 .../apache/kafka/common/metrics/Metrics.java    |  36 ++-
 .../org/apache/kafka/common/metrics/Sensor.java |   2 +-
 .../apache/kafka/common/network/Selector.java   |  33 ++-
 .../kafka/common/record/AbstractRecords.java    |  46 ++-
 .../kafka/common/record/ConvertedRecords.java   |  36 +++
 .../apache/kafka/common/record/FileRecords.java |  17 +-
 .../kafka/common/record/MemoryRecords.java      |   5 +-
 .../common/record/MemoryRecordsBuilder.java     |  40 +--
 .../org/apache/kafka/common/record/Records.java |   9 +-
 .../common/record/RecordsProcessingStats.java   |  60 ++++
 .../kafka/common/requests/AbstractRequest.java  |  14 +
 .../kafka/common/requests/AbstractResponse.java |  29 ++
 .../requests/AddOffsetsToTxnResponse.java       |   6 +
 .../requests/AddPartitionsToTxnResponse.java    |   5 +
 .../common/requests/AlterConfigsResponse.java   |   6 +
 .../requests/AlterReplicaDirResponse.java       |   5 +
 .../common/requests/ApiVersionsResponse.java    |   5 +
 .../requests/ControlledShutdownResponse.java    |   6 +
 .../common/requests/CreateAclsResponse.java     |  11 +
 .../requests/CreatePartitionsRequest.java       |   2 +-
 .../requests/CreatePartitionsResponse.java      |   6 +
 .../common/requests/CreateTopicsResponse.java   |   6 +
 .../common/requests/DeleteAclsResponse.java     |  11 +
 .../common/requests/DeleteRecordsResponse.java  |   8 +
 .../common/requests/DeleteTopicsResponse.java   |   5 +
 .../common/requests/DescribeAclsResponse.java   |   6 +
 .../requests/DescribeConfigsResponse.java       |   9 +
 .../common/requests/DescribeGroupsResponse.java |   7 +
 .../requests/DescribeLogDirsResponse.java       |   8 +
 .../kafka/common/requests/EndTxnResponse.java   |   6 +
 .../kafka/common/requests/FetchResponse.java    |   9 +
 .../requests/FindCoordinatorResponse.java       |   6 +
 .../common/requests/HeartbeatResponse.java      |   6 +
 .../common/requests/InitProducerIdResponse.java |   6 +
 .../common/requests/JoinGroupResponse.java      |   5 +
 .../common/requests/LeaderAndIsrResponse.java   |   5 +
 .../common/requests/LeaveGroupResponse.java     |   6 +
 .../common/requests/ListGroupsResponse.java     |   6 +
 .../common/requests/ListOffsetResponse.java     |   8 +
 .../kafka/common/requests/MetadataResponse.java |   8 +
 .../common/requests/OffsetCommitResponse.java   |   5 +
 .../common/requests/OffsetFetchResponse.java    |   5 +
 .../requests/OffsetsForLeaderEpochResponse.java |   8 +
 .../kafka/common/requests/ProduceRequest.java   |   7 +
 .../kafka/common/requests/ProduceResponse.java  |   8 +
 .../requests/SaslAuthenticateResponse.java      |   6 +
 .../common/requests/SaslHandshakeResponse.java  |   6 +
 .../common/requests/StopReplicaResponse.java    |   5 +
 .../common/requests/SyncGroupResponse.java      |   6 +
 .../requests/TxnOffsetCommitResponse.java       |   5 +
 .../common/requests/UpdateMetadataResponse.java |   6 +
 .../requests/WriteTxnMarkersResponse.java       |  10 +
 .../authenticator/SaslServerAuthenticator.java  |  13 +-
 .../kafka/common/utils/AppInfoParser.java       |  43 ++-
 .../clients/consumer/internals/FetcherTest.java |   1 +
 .../clients/producer/internals/SenderTest.java  |   3 +
 .../kafka/common/metrics/MetricsTest.java       |   1 +
 .../common/metrics/stats/FrequenciesTest.java   |   3 +-
 .../kafka/common/network/NioEchoServer.java     |  51 +++-
 .../kafka/common/network/SelectorTest.java      |   2 +-
 .../common/network/SslTransportLayerTest.java   |   7 +
 .../kafka/common/record/FileRecordsTest.java    |  12 +-
 .../common/record/MemoryRecordsBuilderTest.java |  43 ++-
 .../authenticator/SaslAuthenticatorTest.java    |  11 +
 .../SaslServerAuthenticatorTest.java            |   5 +-
 .../kafka/connect/runtime/ConnectMetrics.java   |   4 +-
 .../runtime/distributed/WorkerGroupMember.java  |   4 +-
 .../main/scala/kafka/admin/ConfigCommand.scala  |   2 +-
 .../kafka/controller/KafkaController.scala      |   4 +-
 core/src/main/scala/kafka/log/Log.scala         |  12 +-
 .../src/main/scala/kafka/log/LogValidator.scala |  52 +++-
 .../scala/kafka/metrics/KafkaMetricsGroup.scala |  15 +-
 .../scala/kafka/network/RequestChannel.scala    |  97 ++++++-
 .../main/scala/kafka/server/AdminManager.scala  |   2 +-
 .../scala/kafka/server/ClientQuotaManager.scala |   2 -
 .../src/main/scala/kafka/server/KafkaApis.scala |  49 +++-
 .../scala/kafka/server/KafkaHealthcheck.scala   |  21 +-
 .../kafka/server/KafkaRequestHandler.scala      |   6 +
 .../main/scala/kafka/server/KafkaServer.scala   |  17 +-
 .../scala/kafka/server/ReplicaManager.scala     |   5 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 122 +++++---
 .../integration/kafka/api/MetricsTest.scala     | 288 +++++++++++++++++++
 .../kafka/api/PlaintextConsumerTest.scala       |  49 ----
 .../group/GroupCoordinatorTest.scala            |   9 +-
 .../group/GroupMetadataManagerTest.scala        |   6 +-
 .../TransactionStateManagerTest.scala           |   4 +-
 .../scala/unit/kafka/log/LogValidatorTest.scala |  71 +++++
 .../unit/kafka/network/SocketServerTest.scala   |   4 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala |   3 +
 docs/upgrade.html                               |   6 +
 .../java/org/apache/kafka/tools/ToolsUtils.java |  14 +-
 103 files changed, 1544 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index f4d9655..ddb13bc 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -140,6 +140,10 @@
       <allow class="org.apache.kafka.common.errors.SerializationException" />
       <allow class="org.apache.kafka.common.header.Headers" />
     </subpackage>
+
+    <subpackage name="utils">
+      <allow pkg="org.apache.kafka.common.metrics" />
+    </subpackage>
   </subpackage>
 
   <subpackage name="clients">

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 734b07e..b747e4c 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -60,7 +60,7 @@
               files="AbstractRequest.java"/>
 
     <suppress checks="NPathComplexity"
-              files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|SslTransportLayer|Sender|Serdes|Agent).java"/>
+              files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|Agent).java"/>
 
     <!-- clients tests -->
     <suppress checks="ClassDataAbstractionCoupling"

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 88339bf..5f37b8e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -377,7 +377,7 @@ public class KafkaAdminClient extends AdminClient {
             new TimeoutProcessorFactory() : timeoutProcessorFactory;
         this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG);
         config.logUnused();
-        AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId);
+        AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId, metrics);
         log.debug("Kafka admin client initialized");
         thread.start();
     }
@@ -418,7 +418,7 @@ public class KafkaAdminClient extends AdminClient {
             // Wait for the thread to be joined.
             thread.join();
 
-            AppInfoParser.unregisterAppInfo(JMX_PREFIX, Sanitizer.sanitize(clientId));
+            AppInfoParser.unregisterAppInfo(JMX_PREFIX, Sanitizer.sanitize(clientId), metrics);
 
             log.debug("Kafka admin client closed.");
         } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f4af39c..d6764ca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -771,7 +771,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     isolationLevel);
 
             config.logUnused();
-            AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId);
+            AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId, metrics);
 
             log.debug("Kafka consumer initialized");
         } catch (Throwable t) {
@@ -1726,7 +1726,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         ClientUtils.closeQuietly(client, "consumer network client", firstException);
         ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException);
         ClientUtils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException);
-        AppInfoParser.unregisterAppInfo(JMX_PREFIX, Sanitizer.sanitize(clientId));
+        AppInfoParser.unregisterAppInfo(JMX_PREFIX, Sanitizer.sanitize(clientId), metrics);
         log.debug("Kafka consumer has been closed");
         Throwable exception = firstException.get();
         if (exception != null && !swallowException) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index d40a16c..a202217 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -427,7 +427,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.ioThread.start();
             this.errors = this.metrics.sensor("errors");
             config.logUnused();
-            AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId);
+            AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId, metrics);
             log.debug("Kafka producer started");
         } catch (Throwable t) {
             // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
@@ -1075,7 +1075,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         ClientUtils.closeQuietly(keySerializer, "producer keySerializer", firstException);
         ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
         ClientUtils.closeQuietly(partitioner, "producer partitioner", firstException);
-        AppInfoParser.unregisterAppInfo(JMX_PREFIX, Sanitizer.sanitize(clientId));
+        AppInfoParser.unregisterAppInfo(JMX_PREFIX, Sanitizer.sanitize(clientId), metrics);
         log.debug("Kafka producer has been closed");
         if (firstException.get() != null && !swallowException)
             throw new KafkaException("Failed to close kafka producer", firstException.get());

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 6d9021c..a15a250 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -675,7 +675,7 @@ public class Sender implements Runnable {
             // not all support the same message format version. For example, if a partition migrates from a broker
             // which is supporting the new magic version to one which doesn't, then we will need to convert.
             if (!records.hasMatchingMagic(minUsedMagic))
-                records = batch.records().downConvert(minUsedMagic, 0);
+                records = batch.records().downConvert(minUsedMagic, 0, time).records();
             produceRecordsByPartition.put(tp, records);
             recordsByPartition.put(tp, batch);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/Metric.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Metric.java b/clients/src/main/java/org/apache/kafka/common/Metric.java
index 908c5c7..1f8d618 100644
--- a/clients/src/main/java/org/apache/kafka/common/Metric.java
+++ b/clients/src/main/java/org/apache/kafka/common/Metric.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.common;
 
 /**
- * A numerical metric tracked for monitoring purposes
+ * A metric tracked for monitoring purposes.
  */
 public interface Metric {
 
@@ -27,8 +27,16 @@ public interface Metric {
     public MetricName metricName();
 
     /**
-     * The value of the metric
+     * The value of the metric as double if the metric is measurable
+     * @throws IllegalStateException if this metric does not have a measurable double value
+     * @deprecated As of 1.0.0, use {@link #metricValue()} instead. This will be removed in a future major release.
      */
+    @Deprecated
     public double value();
 
+    /**
+     * The value of the metric, which may be measurable or a non-measurable gauge
+     */
+    public Object metricValue();
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/metrics/Gauge.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Gauge.java b/clients/src/main/java/org/apache/kafka/common/metrics/Gauge.java
new file mode 100644
index 0000000..647942b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Gauge.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+/**
+ * A gauge metric is an instantaneous reading of a particular value.
+ */
+public interface Gauge<T> extends MetricValueProvider<T> {
+
+    /**
+     * Returns the current value associated with this gauge.
+     * @param config The configuration for this metric
+     * @param now The POSIX time in milliseconds the measurement is being taken
+     */
+    T value(MetricConfig config, long now);
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
index de02a23..294e1d8 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
@@ -184,7 +184,7 @@ public class JmxReporter implements MetricsReporter {
         @Override
         public Object getAttribute(String name) throws AttributeNotFoundException, MBeanException, ReflectionException {
             if (this.metrics.containsKey(name))
-                return this.metrics.get(name).value();
+                return this.metrics.get(name).metricValue();
             else
                 throw new AttributeNotFoundException("Could not find attribute " + name);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
index ef53b89..0cdd844 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
@@ -25,13 +25,16 @@ public final class KafkaMetric implements Metric {
     private MetricName metricName;
     private final Object lock;
     private final Time time;
-    private final Measurable measurable;
+    private final MetricValueProvider<?> metricValueProvider;
     private MetricConfig config;
 
-    KafkaMetric(Object lock, MetricName metricName, Measurable measurable, MetricConfig config, Time time) {
+    KafkaMetric(Object lock, MetricName metricName, MetricValueProvider<?> valueProvider,
+            MetricConfig config, Time time) {
         this.metricName = metricName;
         this.lock = lock;
-        this.measurable = measurable;
+        if (!(valueProvider instanceof Measurable) && !(valueProvider instanceof Gauge))
+            throw new IllegalArgumentException("Unsupported metric value provider of class " + valueProvider.getClass());
+        this.metricValueProvider = valueProvider;
         this.config = config;
         this.time = time;
     }
@@ -45,19 +48,42 @@ public final class KafkaMetric implements Metric {
         return this.metricName;
     }
 
+    /**
+     * See {@link Metric#value()} for the details on why this is deprecated.
+     */
     @Override
+    @Deprecated
     public double value() {
         synchronized (this.lock) {
-            return value(time.milliseconds());
+            return measurableValue(time.milliseconds());
+        }
+    }
+
+    @Override
+    public Object metricValue() {
+        long now = time.milliseconds();
+        synchronized (this.lock) {
+            if (this.metricValueProvider instanceof Measurable)
+                return ((Measurable) metricValueProvider).measure(config, now);
+            else if (this.metricValueProvider instanceof Gauge)
+                return ((Gauge<?>) metricValueProvider).value(config, now);
+            else
+                throw new IllegalStateException("Not a valid metric: " + this.metricValueProvider.getClass());
         }
     }
 
     public Measurable measurable() {
-        return this.measurable;
+        if (this.metricValueProvider instanceof Measurable)
+            return (Measurable) metricValueProvider;
+        else
+            throw new IllegalStateException("Not a measurable: " + this.metricValueProvider.getClass());
     }
 
-    double value(long timeMs) {
-        return this.measurable.measure(config, timeMs);
+    double measurableValue(long timeMs) {
+        if (this.metricValueProvider instanceof Measurable)
+            return ((Measurable) metricValueProvider).measure(config, timeMs);
+        else
+            throw new IllegalStateException("Not a measurable metric");
     }
 
     public void config(MetricConfig config) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java b/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java
index d068895..866caba 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java
@@ -19,7 +19,7 @@ package org.apache.kafka.common.metrics;
 /**
  * A measurable quantity that can be registered as a metric
  */
-public interface Measurable {
+public interface Measurable extends MetricValueProvider<Double> {
 
     /**
      * Measure this quantity and return the result as a double
@@ -27,6 +27,6 @@ public interface Measurable {
      * @param now The POSIX time in milliseconds the measurement is being taken
      * @return The measured value
      */
-    public double measure(MetricConfig config, long now);
+    double measure(MetricConfig config, long now);
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/metrics/MetricValueProvider.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricValueProvider.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricValueProvider.java
new file mode 100644
index 0000000..68028e7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricValueProvider.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+/**
+ * Super-interface for {@link Measurable} or {@link Gauge} that provides
+ * metric values.
+ * <p>
+ * In the future for Java8 and above, {@link Gauge#value(MetricConfig, long)} will be
+ * moved to this interface with a default implementation in {@link Measurable} that returns
+ * {@link Measurable#measure(MetricConfig, long)}.
+ * </p>
+ */
+public interface MetricValueProvider<T> { }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 0b4507b..8127022 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -449,6 +449,10 @@ public class Metrics implements Closeable {
     /**
      * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
      * This is a way to expose existing values as metrics.
+     *
+     * This method is kept for binary compatibility purposes, it has the same behaviour as
+     * {@link #addMetric(MetricName, MetricValue)}.
+     *
      * @param metricName The name of the metric
      * @param measurable The measurable that will be measured by this metric
      */
@@ -457,22 +461,48 @@ public class Metrics implements Closeable {
     }
 
     /**
-     * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
+     * Add a metric to monitor an object that implements Measurable. This metric won't be associated with any sensor.
      * This is a way to expose existing values as metrics.
+     *
+     * This method is kept for binary compatibility purposes, it has the same behaviour as
+     * {@link #addMetric(MetricName, MetricConfig, MetricValueProvider)}.
+     *
      * @param metricName The name of the metric
      * @param config The configuration to use when measuring this measurable
      * @param measurable The measurable that will be measured by this metric
      */
-    public synchronized void addMetric(MetricName metricName, MetricConfig config, Measurable measurable) {
+    public void addMetric(MetricName metricName, MetricConfig config, Measurable measurable) {
+        addMetric(metricName, config, (MetricValueProvider<?>) measurable);
+    }
+
+    /**
+     * Add a metric to monitor an object that implements MetricValueProvider. This metric won't be associated with any
+     * sensor. This is a way to expose existing values as metrics.
+     *
+     * @param metricName The name of the metric
+     * @param metricValueProvider The metric value provider associated with this metric
+     */
+    public void addMetric(MetricName metricName, MetricConfig config, MetricValueProvider<?> metricValueProvider) {
         KafkaMetric m = new KafkaMetric(new Object(),
                                         Utils.notNull(metricName),
-                                        Utils.notNull(measurable),
+                                        Utils.notNull(metricValueProvider),
                                         config == null ? this.config : config,
                                         time);
         registerMetric(m);
     }
 
     /**
+     * Add a metric to monitor an object that implements MetricValueProvider. This metric won't be associated with any
+     * sensor. This is a way to expose existing values as metrics.
+     *
+     * @param metricName The name of the metric
+     * @param metricValueProvider The metric value provider associated with this metric
+     */
+    public void addMetric(MetricName metricName, MetricValueProvider<?> metricValueProvider) {
+        addMetric(metricName, null, metricValueProvider);
+    }
+
+    /**
      * Remove a metric if it exists and return it. Return null otherwise. If a metric is removed, `metricRemoval`
      * will be invoked for each reporter.
      *

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 55d75b1..321fab6 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -199,7 +199,7 @@ public final class Sensor {
             if (config != null) {
                 Quota quota = config.quota();
                 if (quota != null) {
-                    double value = metric.value(timeMs);
+                    double value = metric.measurableValue(timeMs);
                     if (!quota.acceptable(value)) {
                         throw new QuotaViolationException(metric.metricName(), value,
                             quota.bound());

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index b753745..f70afe0 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -456,7 +456,14 @@ public class Selector implements Selectable, AutoCloseable {
 
                 /* if channel is not ready finish prepare */
                 if (channel.isConnected() && !channel.ready()) {
-                    channel.prepare();
+                    try {
+                        channel.prepare();
+                    } catch (AuthenticationException e) {
+                        sensors.failedAuthentication.record();
+                        throw e;
+                    }
+                    if (channel.ready())
+                        sensors.successfulAuthentication.record();
                 }
 
                 attemptRead(key, channel);
@@ -839,6 +846,8 @@ public class Selector implements Selectable, AutoCloseable {
 
         public final Sensor connectionClosed;
         public final Sensor connectionCreated;
+        public final Sensor successfulAuthentication;
+        public final Sensor failedAuthentication;
         public final Sensor bytesTransferred;
         public final Sensor bytesSent;
         public final Sensor bytesReceived;
@@ -863,19 +872,27 @@ public class Selector implements Selectable, AutoCloseable {
                 tagsSuffix.append(tag.getValue());
             }
 
-            this.connectionClosed = sensor("connections-closed:" + tagsSuffix.toString());
+            this.connectionClosed = sensor("connections-closed:" + tagsSuffix);
             this.connectionClosed.add(createMeter(metrics, metricGrpName, metricTags,
                     "connection-close", "connections closed"));
 
-            this.connectionCreated = sensor("connections-created:" + tagsSuffix.toString());
+            this.connectionCreated = sensor("connections-created:" + tagsSuffix);
             this.connectionCreated.add(createMeter(metrics, metricGrpName, metricTags,
                     "connection-creation", "new connections established"));
 
-            this.bytesTransferred = sensor("bytes-sent-received:" + tagsSuffix.toString());
+            this.successfulAuthentication = sensor("successful-authentication:" + tagsSuffix);
+            this.successfulAuthentication.add(createMeter(metrics, metricGrpName, metricTags,
+                    "successful-authentication", "connections with successful authentication"));
+
+            this.failedAuthentication = sensor("failed-authentication:" + tagsSuffix);
+            this.failedAuthentication.add(createMeter(metrics, metricGrpName, metricTags,
+                    "failed-authentication", "connections with failed authentication"));
+
+            this.bytesTransferred = sensor("bytes-sent-received:" + tagsSuffix);
             bytesTransferred.add(createMeter(metrics, metricGrpName, metricTags, new Count(),
                     "network-io", "network operations (reads or writes) on all connections"));
 
-            this.bytesSent = sensor("bytes-sent:" + tagsSuffix.toString(), bytesTransferred);
+            this.bytesSent = sensor("bytes-sent:" + tagsSuffix, bytesTransferred);
             this.bytesSent.add(createMeter(metrics, metricGrpName, metricTags,
                     "outgoing-byte", "outgoing bytes sent to all servers"));
             this.bytesSent.add(createMeter(metrics, metricGrpName, metricTags, new Count(),
@@ -885,20 +902,20 @@ public class Selector implements Selectable, AutoCloseable {
             metricName = metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent.", metricTags);
             this.bytesSent.add(metricName, new Max());
 
-            this.bytesReceived = sensor("bytes-received:" + tagsSuffix.toString(), bytesTransferred);
+            this.bytesReceived = sensor("bytes-received:" + tagsSuffix, bytesTransferred);
             this.bytesReceived.add(createMeter(metrics, metricGrpName, metricTags,
                     "incoming-byte", "bytes read off all sockets"));
             this.bytesReceived.add(createMeter(metrics, metricGrpName, metricTags,
                     new Count(), "response", "responses received"));
 
-            this.selectTime = sensor("select-time:" + tagsSuffix.toString());
+            this.selectTime = sensor("select-time:" + tagsSuffix);
             this.selectTime.add(createMeter(metrics, metricGrpName, metricTags,
                     new Count(), "select", "times the I/O layer checked for new I/O to perform"));
             metricName = metrics.metricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
             this.selectTime.add(metricName, new Avg());
             this.selectTime.add(createIOThreadRatioMeter(metrics, metricGrpName, metricTags, "io-wait", "waiting"));
 
-            this.ioTime = sensor("io-time:" + tagsSuffix.toString());
+            this.ioTime = sensor("io-time:" + tagsSuffix);
             metricName = metrics.metricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags);
             this.ioTime.add(metricName, new Avg());
             this.ioTime.add(createIOThreadRatioMeter(metrics, metricGrpName, metricTags, "io", "doing I/O"));

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 9ba2048..6a1b95c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
@@ -59,10 +60,12 @@ public abstract class AbstractRecords implements Records {
      * need to drop records from the batch during the conversion. Some versions of librdkafka rely on this for
      * correctness.
      */
-    protected MemoryRecords downConvert(Iterable<? extends RecordBatch> batches, byte toMagic, long firstOffset) {
+    protected ConvertedRecords<MemoryRecords> downConvert(Iterable<? extends RecordBatch> batches, byte toMagic,
+            long firstOffset, Time time) {
         // maintain the batch along with the decompressed records to avoid the need to decompress again
         List<RecordBatchAndRecords> recordBatchAndRecordsList = new ArrayList<>();
         int totalSizeEstimate = 0;
+        long startNanos = time.nanoseconds();
 
         for (RecordBatch batch : batches) {
             if (toMagic < RecordBatch.MAGIC_VALUE_V2 && batch.isControlBatch())
@@ -91,22 +94,31 @@ public abstract class AbstractRecords implements Records {
         }
 
         ByteBuffer buffer = ByteBuffer.allocate(totalSizeEstimate);
+        long temporaryMemoryBytes = 0;
+        long conversionCount = 0;
         for (RecordBatchAndRecords recordBatchAndRecords : recordBatchAndRecordsList) {
-            if (recordBatchAndRecords.batch.magic() <= toMagic)
+            temporaryMemoryBytes += recordBatchAndRecords.batch.sizeInBytes();
+            if (recordBatchAndRecords.batch.magic() <= toMagic) {
                 recordBatchAndRecords.batch.writeTo(buffer);
-            else
-                buffer = convertRecordBatch(toMagic, buffer, recordBatchAndRecords);
+            } else {
+                MemoryRecordsBuilder builder = convertRecordBatch(toMagic, buffer, recordBatchAndRecords);
+                buffer = builder.buffer();
+                temporaryMemoryBytes += builder.uncompressedBytesWritten();
+                conversionCount++;
+            }
         }
 
         buffer.flip();
-        return MemoryRecords.readableRecords(buffer);
+        RecordsProcessingStats stats = new RecordsProcessingStats(temporaryMemoryBytes, conversionCount,
+                time.nanoseconds() - startNanos);
+        return new ConvertedRecords<>(MemoryRecords.readableRecords(buffer), stats);
     }
 
     /**
      * Return a buffer containing the converted record batches. The returned buffer may not be the same as the received
      * one (e.g. it may require expansion).
      */
-    private ByteBuffer convertRecordBatch(byte magic, ByteBuffer buffer, RecordBatchAndRecords recordBatchAndRecords) {
+    private MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, RecordBatchAndRecords recordBatchAndRecords) {
         RecordBatch batch = recordBatchAndRecords.batch;
         final TimestampType timestampType = batch.timestampType();
         long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
@@ -117,7 +129,7 @@ public abstract class AbstractRecords implements Records {
             builder.append(record);
 
         builder.close();
-        return builder.buffer();
+        return builder;
     }
 
     /**
@@ -192,7 +204,8 @@ public abstract class AbstractRecords implements Records {
      * Get an upper bound estimate on the batch size needed to hold a record with the given fields. This is only
      * an estimate because it does not take into account overhead from the compression algorithm.
      */
-    public static int estimateSizeInBytesUpperBound(byte magic, CompressionType compressionType, ByteBuffer key, ByteBuffer value, Header[] headers) {
+    public static int estimateSizeInBytesUpperBound(byte magic, CompressionType compressionType, ByteBuffer key,
+                                                    ByteBuffer value, Header[] headers) {
         if (magic >= RecordBatch.MAGIC_VALUE_V2)
             return DefaultRecordBatch.estimateBatchSizeUpperBound(key, value, headers);
         else if (compressionType != CompressionType.NONE)
@@ -201,6 +214,23 @@ public abstract class AbstractRecords implements Records {
             return Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
     }
 
+    /**
+     * Return the size of the record batch header.
+     *
+     * For V0 and V1 with no compression, it's unclear if Records.LOG_OVERHEAD or 0 should be chosen. There is no header
+     * per batch, but a sequence of batches is preceded by the offset and size. This method returns `0` as it's what
+     * `MemoryRecordsBuilder` requires.
+     */
+    public static int recordBatchHeaderSizeInBytes(byte magic, CompressionType compressionType) {
+        if (magic > RecordBatch.MAGIC_VALUE_V1) {
+            return DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+        } else if (compressionType != CompressionType.NONE) {
+            return Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic);
+        } else {
+            return 0;
+        }
+    }
+
     private static class RecordBatchAndRecords {
         private final RecordBatch batch;
         private final List<Record> records;

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java b/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
new file mode 100644
index 0000000..fe37c48
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+public class ConvertedRecords<T extends Records> {
+
+    private final T records;
+    private final RecordsProcessingStats recordsProcessingStats;
+
+    public ConvertedRecords(T records, RecordsProcessingStats recordsProcessingStats) {
+        this.records = records;
+        this.recordsProcessingStats = recordsProcessingStats;
+    }
+
+    public T records() {
+        return records;
+    }
+
+    public RecordsProcessingStats recordsProcessingStats() {
+        return recordsProcessingStats;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index a898634..ae99db3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.record;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 
 import java.io.Closeable;
@@ -237,19 +238,19 @@ public class FileRecords extends AbstractRecords implements Closeable {
     }
 
     @Override
-    public Records downConvert(byte toMagic, long firstOffset) {
+    public ConvertedRecords<? extends Records> downConvert(byte toMagic, long firstOffset, Time time) {
         List<? extends RecordBatch> batches = Utils.toList(batches().iterator());
         if (batches.isEmpty()) {
             // This indicates that the message is too large, which means that the buffer is not large
-            // enough to hold a full record batch. We just return all the bytes in the file message set.
-            // Even though the message set does not have the right format version, we expect old clients
-            // to raise an error to the user after reading the message size and seeing that there
-            // are not enough available bytes in the response to read the full message. Note that this is
+            // enough to hold a full record batch. We just return all the bytes in this instance.
+            // Even though the record batch does not have the right format version, we expect old clients
+            // to raise an error to the user after reading the record batch size and seeing that there
+            // are not enough available bytes in the response to read it fully. Note that this is
             // only possible prior to KIP-74, after which the broker was changed to always return at least
-            // one full message, even if it requires exceeding the max fetch size requested by the client.
-            return this;
+            // one full record batch, even if it requires exceeding the max fetch size requested by the client.
+            return new ConvertedRecords<>(this, RecordsProcessingStats.EMPTY);
         } else {
-            return downConvert(batches, toMagic, firstOffset);
+            return downConvert(batches, toMagic, firstOffset, time);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index d3f2444..2a25aad 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -111,8 +112,8 @@ public class MemoryRecords extends AbstractRecords {
     }
 
     @Override
-    public MemoryRecords downConvert(byte toMagic, long firstOffset) {
-        return downConvert(batches(), toMagic, firstOffset);
+    public ConvertedRecords<MemoryRecords> downConvert(byte toMagic, long firstOffset, Time time) {
+        return downConvert(batches(), toMagic, firstOffset, time);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index fc83134..ad0bab7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -54,6 +54,7 @@ public class MemoryRecordsBuilder {
     private final boolean isControlBatch;
     private final int partitionLeaderEpoch;
     private final int writeLimit;
+    private final int batchHeaderSizeInBytes;
 
     // Use a conservative estimate of the compression ratio. The producer overrides this using statistics
     // from previous batches before appending any records.
@@ -64,8 +65,7 @@ public class MemoryRecordsBuilder {
     private long producerId;
     private short producerEpoch;
     private int baseSequence;
-    private int writtenUncompressed = 0; // Number of bytes (excluding the header) written before compression
-    private int batchHeaderSize;
+    private int uncompressedRecordsSizeInBytes = 0; // Number of bytes (excluding the header) written before compression
     private int numRecords = 0;
     private float actualCompressionRatio = 1;
     private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
@@ -104,7 +104,7 @@ public class MemoryRecordsBuilder {
         this.baseOffset = baseOffset;
         this.logAppendTime = logAppendTime;
         this.numRecords = 0;
-        this.writtenUncompressed = 0;
+        this.uncompressedRecordsSizeInBytes = 0;
         this.actualCompressionRatio = 1;
         this.maxTimestamp = RecordBatch.NO_TIMESTAMP;
         this.producerId = producerId;
@@ -115,18 +115,9 @@ public class MemoryRecordsBuilder {
         this.partitionLeaderEpoch = partitionLeaderEpoch;
         this.writeLimit = writeLimit;
         this.initialPosition = bufferStream.position();
+        this.batchHeaderSizeInBytes = AbstractRecords.recordBatchHeaderSizeInBytes(magic, compressionType);
 
-        if (magic > RecordBatch.MAGIC_VALUE_V1) {
-            batchHeaderSize = DefaultRecordBatch.RECORDS_OFFSET;
-        } else if (compressionType != CompressionType.NONE) {
-            // for compressed records, leave space for the header and the shallow message metadata
-            // and move the starting position to the value payload offset
-            batchHeaderSize = Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic);
-        } else {
-            batchHeaderSize = 0;
-        }
-
-        bufferStream.position(initialPosition + batchHeaderSize);
+        bufferStream.position(initialPosition + batchHeaderSizeInBytes);
         this.bufferStream = bufferStream;
         this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
     }
@@ -238,6 +229,17 @@ public class MemoryRecordsBuilder {
         }
     }
 
+    public int numRecords() {
+        return numRecords;
+    }
+
+    /**
+     * Return the sum of the size of the batch header (always uncompressed) and the records (before compression).
+     */
+    public int uncompressedBytesWritten() {
+        return uncompressedRecordsSizeInBytes + batchHeaderSizeInBytes;
+    }
+
     public void setProducerState(long producerId, short producerEpoch, int baseSequence, boolean isTransactional) {
         if (isClosed()) {
             // Sequence numbers are assigned when the batch is closed while the accumulator is being drained.
@@ -306,9 +308,9 @@ public class MemoryRecordsBuilder {
             builtRecords = MemoryRecords.EMPTY;
         } else {
             if (magic > RecordBatch.MAGIC_VALUE_V1)
-                this.actualCompressionRatio = (float) writeDefaultBatchHeader() / this.writtenUncompressed;
+                this.actualCompressionRatio = (float) writeDefaultBatchHeader() / this.uncompressedRecordsSizeInBytes;
             else if (compressionType != CompressionType.NONE)
-                this.actualCompressionRatio = (float) writeLegacyCompressedWrapperHeader() / this.writtenUncompressed;
+                this.actualCompressionRatio = (float) writeLegacyCompressedWrapperHeader() / this.uncompressedRecordsSizeInBytes;
 
             ByteBuffer buffer = buffer().duplicate();
             buffer.flip();
@@ -651,7 +653,7 @@ public class MemoryRecordsBuilder {
                     ", last offset: " + offset);
 
         numRecords += 1;
-        writtenUncompressed += size;
+        uncompressedRecordsSizeInBytes += size;
         lastOffset = offset;
 
         if (magic > RecordBatch.MAGIC_VALUE_V0 && timestamp > maxTimestamp) {
@@ -678,10 +680,10 @@ public class MemoryRecordsBuilder {
      */
     private int estimatedBytesWritten() {
         if (compressionType == CompressionType.NONE) {
-            return batchHeaderSize + writtenUncompressed;
+            return batchHeaderSizeInBytes + uncompressedRecordsSizeInBytes;
         } else {
             // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
-            return batchHeaderSize + (int) (writtenUncompressed * estimatedCompressionRatio * COMPRESSION_RATE_ESTIMATION_FACTOR);
+            return batchHeaderSizeInBytes + (int) (uncompressedRecordsSizeInBytes * estimatedCompressionRatio * COMPRESSION_RATE_ESTIMATION_FACTOR);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index ec2e717..19152ba 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -19,6 +19,8 @@ package org.apache.kafka.common.record;
 import java.io.IOException;
 import java.nio.channels.GatheringByteChannel;
 
+import org.apache.kafka.common.utils.Time;
+
 /**
  * Interface for accessing the records contained in a log. The log itself is represented as a sequence of record
  * batches (see {@link RecordBatch}).
@@ -97,10 +99,11 @@ public interface Records {
      * deep iteration since all of the deep records must also be converted to the desired format.
      * @param toMagic The magic value to convert to
      * @param firstOffset The starting offset for returned records. This only impacts some cases. See
-     *                    {@link AbstractRecords#downConvert(Iterable, byte, long)} for an explanation.
-     * @return A Records instance (which may or may not be the same instance)
+     *                    {@link AbstractRecords#downConvert(Iterable, byte, long, Time) for an explanation.
+     * @param time instance used for reporting stats
+     * @return A ConvertedRecords instance which may or may not contain the same instance in its records field.
      */
-    Records downConvert(byte toMagic, long firstOffset);
+    ConvertedRecords<? extends Records> downConvert(byte toMagic, long firstOffset, Time time);
 
     /**
      * Get an iterator over the records in this log. Note that this generally requires decompression,

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java
new file mode 100644
index 0000000..cbb76b7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+public class RecordsProcessingStats {
+
+    public static final RecordsProcessingStats EMPTY = new RecordsProcessingStats(0L, 0L, -1);
+
+    private final long temporaryMemoryBytes;
+    private final long conversionCount;
+    private final long conversionTimeNanos;
+
+    public RecordsProcessingStats(long temporaryMemoryBytes, long conversionCount, long conversionTimeNanos) {
+        this.temporaryMemoryBytes = temporaryMemoryBytes;
+        this.conversionCount = conversionCount;
+        this.conversionTimeNanos = conversionTimeNanos;
+    }
+
+    /**
+     * Returns the number of temporary memory bytes allocated to process the records.
+     * This size depends on whether the records need decompression and/or conversion:
+     * <ul>
+     *   <li>Non compressed, no conversion: zero</li>
+     *   <li>Non compressed, with conversion: size of the converted buffer</li>
+     *   <li>Compressed, no conversion: size of the original buffer after decompression</li>
+     *   <li>Compressed, with conversion: size of the original buffer after decompression + size of the converted buffer uncompressed</li>
+     * </ul>
+     */
+    public long temporaryMemoryBytes() {
+        return temporaryMemoryBytes;
+    }
+
+    public long conversionCount() {
+        return conversionCount;
+    }
+
+    public long conversionTimeNanos() {
+        return conversionTimeNanos;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("RecordsProcessingStats(temporaryMemoryBytes=%d, conversionCount=%d, conversionTimeNanos=%d)",
+                temporaryMemoryBytes, conversionCount, conversionTimeNanos);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index da1d147..e6dd6da 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -19,9 +19,11 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 public abstract class AbstractRequest extends AbstractRequestResponse {
 
@@ -120,6 +122,18 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
     public abstract AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e);
 
     /**
+     * Get the error counts corresponding to an error response. This is overridden for requests
+     * where response may be null (e.g produce with acks=0).
+     */
+    public Map<Errors, Integer> errorCounts(Throwable e) {
+        AbstractResponse response = getErrorResponse(0, e);
+        if (response == null)
+            throw new IllegalStateException("Error counts could not be obtained for request " + this);
+        else
+            return response.errorCounts();
+    }
+
+    /**
      * Factory method for getting a request object based on ApiKey ID and a version
      */
     public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, Struct struct) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 4cff798..12fe3c8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -19,9 +19,13 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 public abstract class AbstractResponse extends AbstractRequestResponse {
     public static final int DEFAULT_THROTTLE_TIME = 0;
@@ -37,6 +41,31 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
         return serialize(responseHeader.toStruct(), toStruct(version));
     }
 
+    public abstract Map<Errors, Integer> errorCounts();
+
+    protected Map<Errors, Integer> errorCounts(Errors error) {
+        return Collections.singletonMap(error, 1);
+    }
+
+    protected Map<Errors, Integer> errorCounts(Map<?, Errors> errors) {
+        Map<Errors, Integer> errorCounts = new HashMap<>();
+        for (Errors error : errors.values())
+            updateErrorCounts(errorCounts, error);
+        return errorCounts;
+    }
+
+    protected Map<Errors, Integer> apiErrorCounts(Map<?, ApiError> errors) {
+        Map<Errors, Integer> errorCounts = new HashMap<>();
+        for (ApiError apiError : errors.values())
+            updateErrorCounts(errorCounts, apiError.error());
+        return errorCounts;
+    }
+
+    protected void updateErrorCounts(Map<Errors, Integer> errorCounts, Errors error) {
+        Integer count = errorCounts.get(error);
+        errorCounts.put(error, count == null ? 1 : count + 1);
+    }
+
     protected abstract Struct toStruct(short version);
 
     public static AbstractResponse parseResponse(ApiKeys apiKey, Struct struct) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
index 10dc279..69b8ad4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -67,6 +68,11 @@ public class AddOffsetsToTxnResponse extends AbstractResponse {
     }
 
     @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
+    @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.responseSchema(version));
         struct.set(THROTTLE_TIME_MS, throttleTimeMs);

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
index e9f6088..4472449 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
@@ -95,6 +95,11 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
     }
 
     @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(errors);
+    }
+
+    @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.responseSchema(version));
         struct.set(THROTTLE_TIME_MS, throttleTimeMs);

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
index df9416e..f292ef6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -81,6 +82,11 @@ public class AlterConfigsResponse extends AbstractResponse {
         return errors;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return apiErrorCounts(errors);
+    }
+
     public int throttleTimeMs() {
         return throttleTimeMs;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
index 1767d45..b875104 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
@@ -124,6 +124,11 @@ public class AlterReplicaDirResponse extends AbstractResponse {
         return this.responses;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(responses);
+    }
+
     public static AlterReplicaDirResponse parse(ByteBuffer buffer, short version) {
         return new AlterReplicaDirResponse(ApiKeys.ALTER_REPLICA_DIR.responseSchema(version).read(buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 2bdc8aa..753db9e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -159,6 +159,11 @@ public class ApiVersionsResponse extends AbstractResponse {
         return error;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
     public static ApiVersionsResponse parse(ByteBuffer buffer, short version) {
         return new ApiVersionsResponse(ApiKeys.API_VERSIONS.parseResponse(version, buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
index dfd68e7..f9852dd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
@@ -85,6 +86,11 @@ public class ControlledShutdownResponse extends AbstractResponse {
         return error;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
     public Set<TopicPartition> partitionsRemaining() {
         return partitionsRemaining;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
index 836215e..6c35798 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
@@ -20,11 +20,14 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
@@ -101,6 +104,14 @@ public class CreateAclsResponse extends AbstractResponse {
         return aclCreationResponses;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> errorCounts = new HashMap<>();
+        for (AclCreationResponse response : aclCreationResponses)
+            updateErrorCounts(errorCounts, response.error.error());
+        return errorCounts;
+    }
+
     public static CreateAclsResponse parse(ByteBuffer buffer, short version) {
         return new CreateAclsResponse(ApiKeys.CREATE_ACLS.responseSchema(version).read(buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
index 3e3cc30..41c5327 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
@@ -122,7 +122,7 @@ public class CreatePartitionsRequest extends AbstractRequest {
             Object[] assignmentsArray = partitionCountStruct.getArray(ASSIGNMENT_KEY_NAME);
             NewPartitions newPartition;
             if (assignmentsArray != null) {
-                List<List<Integer>> assignments = new ArrayList(assignmentsArray.length);
+                List<List<Integer>> assignments = new ArrayList<>(assignmentsArray.length);
                 for (Object replicas : assignmentsArray) {
                     Object[] replicasArray = (Object[]) replicas;
                     List<Integer> replicasList = new ArrayList<>(replicasArray.length);

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java
index 390221f..73a6b56 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
@@ -94,6 +95,11 @@ public class CreatePartitionsResponse extends AbstractResponse {
         return errors;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return apiErrorCounts(errors);
+    }
+
     public int throttleTimeMs() {
         return throttleTimeMs;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
index b3c052b..1f73eda 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -125,6 +126,11 @@ public class CreateTopicsResponse extends AbstractResponse {
         return errors;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return apiErrorCounts(errors);
+    }
+
     public static CreateTopicsResponse parse(ByteBuffer buffer, short version) {
         return new CreateTopicsResponse(ApiKeys.CREATE_TOPICS.responseSchema(version).read(buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
index 0857287..7ae25da 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.utils.Utils;
@@ -31,7 +32,9 @@ import org.slf4j.LoggerFactory;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
@@ -182,6 +185,14 @@ public class DeleteAclsResponse extends AbstractResponse {
         return responses;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> errorCounts = new HashMap<>();
+        for (AclFilterResponse response : responses)
+            updateErrorCounts(errorCounts, response.error.error());
+        return errorCounts;
+    }
+
     public static DeleteAclsResponse parse(ByteBuffer buffer, short version) {
         return new DeleteAclsResponse(ApiKeys.DELETE_ACLS.responseSchema(version).read(buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
index 5bfdec8..b839bf5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
@@ -161,6 +161,14 @@ public class DeleteRecordsResponse extends AbstractResponse {
         return this.responses;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> errorCounts = new HashMap<>();
+        for (PartitionResponse response : responses.values())
+            updateErrorCounts(errorCounts, response.error);
+        return errorCounts;
+    }
+
     public static DeleteRecordsResponse parse(ByteBuffer buffer, short version) {
         return new DeleteRecordsResponse(ApiKeys.DELETE_RECORDS.responseSchema(version).read(buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
index 9c84c11..d13e441 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
@@ -110,6 +110,11 @@ public class DeleteTopicsResponse extends AbstractResponse {
         return errors;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(errors);
+    }
+
     public static DeleteTopicsResponse parse(ByteBuffer buffer, short version) {
         return new DeleteTopicsResponse(ApiKeys.DELETE_TOPICS.responseSchema(version).read(buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
index f8b9695..a21230b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.Resource;
 
@@ -133,6 +134,11 @@ public class DescribeAclsResponse extends AbstractResponse {
         return error;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error.error());
+    }
+
     public Collection<AclBinding> acls() {
         return acls;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
index 9b2289d..91bf30e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -176,6 +177,14 @@ public class DescribeConfigsResponse extends AbstractResponse {
     }
 
     @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> errorCounts = new HashMap<>();
+        for (Config response : configs.values())
+            updateErrorCounts(errorCounts, response.error.error());
+        return errorCounts;
+    }
+
+    @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.DESCRIBE_CONFIGS.responseSchema(version));
         struct.set(THROTTLE_TIME_MS, throttleTimeMs);

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 9241165..61c5a36 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -141,6 +141,13 @@ public class DescribeGroupsResponse extends AbstractResponse {
         return groups;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> errorCounts = new HashMap<>();
+        for (GroupMetadata response : groups.values())
+            updateErrorCounts(errorCounts, response.error);
+        return errorCounts;
+    }
 
     public static class GroupMetadata {
         private final Errors error;

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
index dc226d8..6613dfe 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
@@ -162,6 +162,14 @@ public class DescribeLogDirsResponse extends AbstractResponse {
         return throttleTimeMs;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> errorCounts = new HashMap<>();
+        for (LogDirInfo logDirInfo : logDirInfos.values())
+            updateErrorCounts(errorCounts, logDirInfo.error);
+        return errorCounts;
+    }
+
     public Map<String, LogDirInfo> logDirInfos() {
         return logDirInfos;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
index a3bae58..a0a453d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -66,6 +67,11 @@ public class EndTxnResponse extends AbstractResponse {
     }
 
     @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
+    @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.END_TXN.responseSchema(version));
         struct.set(THROTTLE_TIME_MS, throttleTimeMs);

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index f8d3090..0d09027 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.record.Records;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -353,6 +354,14 @@ public class FetchResponse extends AbstractResponse {
         return this.throttleTimeMs;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> errorCounts = new HashMap<>();
+        for (PartitionData response : responseData.values())
+            updateErrorCounts(errorCounts, response.error);
+        return errorCounts;
+    }
+
     public static FetchResponse parse(ByteBuffer buffer, short version) {
         return new FetchResponse(ApiKeys.FETCH.responseSchema(version).read(buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index f5d9805..bddc41f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
@@ -104,6 +105,11 @@ public class FindCoordinatorResponse extends AbstractResponse {
         return error;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
     public Node node() {
         return node;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index e41d937..b52a993 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -74,6 +75,11 @@ public class HeartbeatResponse extends AbstractResponse {
     }
 
     @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
+    @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.HEARTBEAT.responseSchema(version));
         struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);


[2/3] kafka git commit: KAFKA-5746; Add new metrics to support health checks (KIP-188)

Posted by ij...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
index 8799ad7..9ecb21f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.RecordBatch;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -88,6 +89,11 @@ public class InitProducerIdResponse extends AbstractResponse {
         return error;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
     public short epoch() {
         return epoch;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index a4431b9..56491eb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -147,6 +147,11 @@ public class JoinGroupResponse extends AbstractResponse {
         return error;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
     public int generationId() {
         return generationId;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
index 39b8c37..921c8ad 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
@@ -87,6 +87,11 @@ public class LeaderAndIsrResponse extends AbstractResponse {
         return error;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
     public static LeaderAndIsrResponse parse(ByteBuffer buffer, short version) {
         return new LeaderAndIsrResponse(ApiKeys.LEADER_AND_ISR.parseResponse(version, buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index bef21e9..f8682ec 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -73,6 +74,11 @@ public class LeaveGroupResponse extends AbstractResponse {
     }
 
     @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
+    @Override
     public Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.LEAVE_GROUP.responseSchema(version));
         struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index 8f48f39..afc5ebd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -97,6 +98,11 @@ public class ListGroupsResponse extends AbstractResponse {
         return error;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
     public static class Group {
         private final String groupId;
         private final String protocolType;

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index 732fb49..13f2dfb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -191,6 +191,14 @@ public class ListOffsetResponse extends AbstractResponse {
         return responseData;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> errorCounts = new HashMap<>();
+        for (PartitionData response : responseData.values())
+            updateErrorCounts(errorCounts, response.error);
+        return errorCounts;
+    }
+
     public static ListOffsetResponse parse(ByteBuffer buffer, short version) {
         return new ListOffsetResponse(ApiKeys.LIST_OFFSETS.parseResponse(version, buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index fb69cef..99c4ffb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -294,6 +294,14 @@ public class MetadataResponse extends AbstractResponse {
         return errors;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> errorCounts = new HashMap<>();
+        for (TopicMetadata metadata : topicMetadata)
+            updateErrorCounts(errorCounts, metadata.error);
+        return errorCounts;
+    }
+
     /**
      * Returns the set of topics with the specified error
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 13484ed..b439034 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -145,6 +145,11 @@ public class OffsetCommitResponse extends AbstractResponse {
         return responseData;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(responseData);
+    }
+
     public static OffsetCommitResponse parse(ByteBuffer buffer, short version) {
         return new OffsetCommitResponse(ApiKeys.OFFSET_COMMIT.parseResponse(version, buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index c3341e0..4d069fe 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -189,6 +189,11 @@ public class OffsetFetchResponse extends AbstractResponse {
         return this.error;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
     public Map<TopicPartition, PartitionData> responseData() {
         return responseData;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
index 13d70b7..4a91533 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
@@ -82,6 +82,14 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse {
         return epochEndOffsetsByPartition;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> errorCounts = new HashMap<>();
+        for (EpochEndOffset response : epochEndOffsetsByPartition.values())
+            updateErrorCounts(errorCounts, response.error());
+        return errorCounts;
+    }
+
     public static OffsetsForLeaderEpochResponse parse(ByteBuffer buffer, short versionId) {
         return new OffsetsForLeaderEpochResponse(ApiKeys.OFFSET_FOR_LEADER_EPOCH.responseSchema(versionId).read(buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index fbc7f76..ee4a2e2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.Utils;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -334,6 +335,12 @@ public class ProduceRequest extends AbstractRequest {
         }
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts(Throwable e) {
+        Errors error = Errors.forException(e);
+        return Collections.singletonMap(error, partitions().size());
+    }
+
     private Collection<TopicPartition> partitions() {
         return partitionSizes.keySet();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index e1978dd..afedc9d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -233,6 +233,14 @@ public class ProduceResponse extends AbstractResponse {
         return this.throttleTime;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> errorCounts = new HashMap<>();
+        for (PartitionResponse response : responses.values())
+            updateErrorCounts(errorCounts, response.error);
+        return errorCounts;
+    }
+
     public static final class PartitionResponse {
         public Errors error;
         public long baseOffset;

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
index c950cb9..e244182 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
@@ -85,6 +86,11 @@ public class SaslAuthenticateResponse extends AbstractResponse {
     }
 
     @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
+    @Override
     public Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.SASL_AUTHENTICATE.responseSchema(version));
         struct.set(ERROR_CODE, error.code());

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
index c9f6369..252c62d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 
@@ -76,6 +77,11 @@ public class SaslHandshakeResponse extends AbstractResponse {
     }
 
     @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
+    @Override
     public Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.SASL_HANDSHAKE.responseSchema(version));
         struct.set(ERROR_CODE, error.code());

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
index 4196b83..8ad6222 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
@@ -84,6 +84,11 @@ public class StopReplicaResponse extends AbstractResponse {
         return error;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
     public static StopReplicaResponse parse(ByteBuffer buffer, short version) {
         return new StopReplicaResponse(ApiKeys.STOP_REPLICA.parseResponse(version, buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index d68b2cd..77f9512 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -82,6 +83,11 @@ public class SyncGroupResponse extends AbstractResponse {
         return error;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
     public ByteBuffer memberAssignment() {
         return memberState;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index 53804d9..ff0f8ce 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -125,6 +125,11 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
         return errors;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(errors);
+    }
+
     public static TxnOffsetCommitResponse parse(ByteBuffer buffer, short version) {
         return new TxnOffsetCommitResponse(ApiKeys.TXN_OFFSET_COMMIT.parseResponse(version, buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
index 9ff8e27..4c21cde 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 
@@ -56,6 +57,11 @@ public class UpdateMetadataResponse extends AbstractResponse {
         return error;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
     public static UpdateMetadataResponse parse(ByteBuffer buffer, short version) {
         return new UpdateMetadataResponse(ApiKeys.UPDATE_METADATA.parseResponse(version, buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
index 797fb59..f4bf157 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
@@ -150,6 +150,16 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
         return errors.get(producerId);
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> errorCounts = new HashMap<>();
+        for (Map<TopicPartition, Errors> allErrors : errors.values()) {
+            for (Errors error : allErrors.values())
+                updateErrorCounts(errorCounts, error);
+        }
+        return errorCounts;
+    }
+
     public static WriteTxnMarkersResponse parse(ByteBuffer buffer, short version) {
         return new WriteTxnMarkersResponse(ApiKeys.WRITE_TXN_MARKERS.parseResponse(version, buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index fe57d27..739e0cd 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -113,7 +113,7 @@ public class SaslServerAuthenticator implements Authenticator {
     // Next SASL state to be set when outgoing writes associated with the current SASL state complete
     private SaslState pendingSaslState = null;
     // Exception that will be thrown by `authenticate()` when SaslState is set to FAILED after outbound writes complete
-    private IOException pendingException = null;
+    private AuthenticationException pendingException = null;
     private SaslServer saslServer;
     private String saslMechanism;
     private AuthCallbackHandler callbackHandler;
@@ -272,8 +272,15 @@ public class SaslServerAuthenticator implements Authenticator {
                     default:
                         break;
                 }
+            } catch (SaslException | AuthenticationException e) {
+                // Exception will be propagated after response is sent to client
+                AuthenticationException authException = (e instanceof AuthenticationException) ?
+                        (AuthenticationException) e : new AuthenticationException("SASL authentication failed", e);
+                setSaslState(SaslState.FAILED, authException);
             } catch (Exception e) {
-                setSaslState(SaslState.FAILED, new IOException(e));
+                // In the case of IOExceptions and other unexpected exceptions, fail immediately
+                saslState = SaslState.FAILED;
+                throw e;
             }
         }
     }
@@ -303,7 +310,7 @@ public class SaslServerAuthenticator implements Authenticator {
         setSaslState(saslState, null);
     }
 
-    private void setSaslState(SaslState saslState, IOException exception) throws IOException {
+    private void setSaslState(SaslState saslState, AuthenticationException exception) throws IOException {
         if (netOutBuffer != null && !netOutBuffer.completed()) {
             pendingSaslState = saslState;
             pendingException = exception;

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
index 25e09e1..6053da6 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
@@ -24,6 +24,10 @@ import javax.management.JMException;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,27 +55,49 @@ public class AppInfoParser {
         return COMMIT_ID;
     }
 
-    public static synchronized void registerAppInfo(String prefix, String id) {
+    public static synchronized void registerAppInfo(String prefix, String id, Metrics metrics) {
         try {
             ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + id);
             AppInfo mBean = new AppInfo();
             ManagementFactory.getPlatformMBeanServer().registerMBean(mBean, name);
+
+            registerMetrics(metrics); // prefix will be added later by JmxReporter
         } catch (JMException e) {
             log.warn("Error registering AppInfo mbean", e);
         }
     }
 
-    public static synchronized void unregisterAppInfo(String prefix, String id) {
+    public static synchronized void unregisterAppInfo(String prefix, String id, Metrics metrics) {
         MBeanServer server = ManagementFactory.getPlatformMBeanServer();
         try {
             ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + id);
             if (server.isRegistered(name))
                 server.unregisterMBean(name);
+
+            unregisterMetrics(metrics);
         } catch (JMException e) {
             log.warn("Error unregistering AppInfo mbean", e);
         }
     }
 
+    private static MetricName metricName(Metrics metrics, String name) {
+        return metrics.metricName(name, "app-info", "Metric indicating " + name);
+    }
+
+    private static void registerMetrics(Metrics metrics) {
+        if (metrics != null) {
+            metrics.addMetric(metricName(metrics, "version"), new ImmutableValue<>(VERSION));
+            metrics.addMetric(metricName(metrics, "commit-id"), new ImmutableValue<>(COMMIT_ID));
+        }
+    }
+
+    private static void unregisterMetrics(Metrics metrics) {
+        if (metrics != null) {
+            metrics.removeMetric(metricName(metrics, "version"));
+            metrics.removeMetric(metricName(metrics, "commit-id"));
+        }
+    }
+
     public interface AppInfoMBean {
         public String getVersion();
         public String getCommitId();
@@ -95,4 +121,17 @@ public class AppInfoParser {
         }
 
     }
+
+    static class ImmutableValue<T> implements Gauge<T> {
+        private final T value;
+
+        public ImmutableValue(T value) {
+            this.value = value;
+        }
+
+        @Override
+        public T value(MetricConfig config, long now) {
+            return value;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 5263d3b..cab385c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -109,6 +109,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("deprecation")
 public class FetcherTest {
     private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
     private String topicName = "test";

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index afccee0..3a92e63 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -240,6 +240,7 @@ public class SenderTest {
      * Send multiple requests. Verify that the client side quota metrics have the right values
      */
     @Test
+    @SuppressWarnings("deprecation")
     public void testQuotaMetrics() throws Exception {
         MockSelector selector = new MockSelector(time);
         Sensor throttleTimeSensor = Sender.throttleTimeSensor(this.senderMetricsRegistry);
@@ -1619,6 +1620,7 @@ public class SenderTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void testAbortRetryWhenProducerIdChanges() throws InterruptedException {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
@@ -1711,6 +1713,7 @@ public class SenderTest {
         testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp);
     }
 
+    @SuppressWarnings("deprecation")
     private void testSplitBatchAndSend(TransactionManager txnManager,
                                        ProducerIdAndEpoch producerIdAndEpoch,
                                        TopicPartition tp) throws Exception {

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index e24c3d7..55f8e23 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -46,6 +46,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+@SuppressWarnings("deprecation")
 public class MetricsTest {
 
     private static final double EPS = 0.000001;

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
index 9db18e2..9b6f686 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
@@ -102,6 +102,7 @@ public class FrequenciesTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void testUseWithMetrics() {
         MetricName name1 = name("1");
         MetricName name2 = name("2");
@@ -156,4 +157,4 @@ public class FrequenciesTest {
         return new Frequency(name(name), value);
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 190fa3d..8d510f5 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -16,7 +16,11 @@
  */
 package org.apache.kafka.common.network;
 
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
@@ -24,6 +28,8 @@ import org.apache.kafka.common.security.scram.ScramCredentialUtils;
 import org.apache.kafka.common.security.scram.ScramMechanism;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -36,6 +42,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Non-blocking EchoServer implementation that uses ChannelBuilder to create channels
@@ -43,6 +50,9 @@ import java.util.List;
  *
  */
 public class NioEchoServer extends Thread {
+
+    private static final double EPS = 0.0001;
+
     private final int port;
     private final ServerSocketChannel serverSocketChannel;
     private final List<SocketChannel> newChannels;
@@ -51,6 +61,7 @@ public class NioEchoServer extends Thread {
     private final Selector selector;
     private volatile WritableByteChannel outputChannel;
     private final CredentialCache credentialCache;
+    private final Metrics metrics;
 
     public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig config,
             String serverHost, ChannelBuilder channelBuilder, CredentialCache credentialCache) throws Exception {
@@ -67,7 +78,8 @@ public class NioEchoServer extends Thread {
             ScramCredentialUtils.createCache(credentialCache, ScramMechanism.mechanismNames());
         if (channelBuilder == null)
             channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialCache);
-        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder, new LogContext());
+        this.metrics = new Metrics();
+        this.selector = new Selector(5000, metrics, new MockTime(), "MetricGroup", channelBuilder, new LogContext());
         acceptorThread = new AcceptorThread();
     }
 
@@ -79,6 +91,43 @@ public class NioEchoServer extends Thread {
         return credentialCache;
     }
 
+    @SuppressWarnings("deprecation")
+    public double metricValue(String name) {
+        for (Map.Entry<MetricName, KafkaMetric> entry : metrics.metrics().entrySet()) {
+            if (entry.getKey().name().equals(name))
+                return entry.getValue().value();
+        }
+        throw new IllegalStateException("Metric not found, " + name + ", found=" + metrics.metrics().keySet());
+    }
+
+    public void verifyAuthenticationMetrics(int successfulAuthentications, final int failedAuthentications)
+            throws InterruptedException {
+        waitForMetric("successful-authentication", successfulAuthentications);
+        waitForMetric("failed-authentication", failedAuthentications);
+    }
+
+    private void waitForMetric(String name, final double expectedValue) throws InterruptedException {
+        final String totalName = name + "-total";
+        final String rateName = name + "-rate";
+        if (expectedValue == 0.0) {
+            assertEquals(expectedValue, metricValue(totalName), EPS);
+            assertEquals(expectedValue, metricValue(rateName), EPS);
+        } else {
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return Math.abs(metricValue(totalName) - expectedValue) <= EPS;
+                }
+            }, "Metric not updated " + totalName);
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return metricValue(rateName) > 0.0;
+                }
+            }, "Metric not updated " + rateName);
+        }
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 8894873..be4dbc7 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -506,7 +506,7 @@ public class SelectorTest {
         // record void method invocations
         kafkaChannel.disconnect();
         kafkaChannel.close();
-        expect(kafkaChannel.ready()).andReturn(false);
+        expect(kafkaChannel.ready()).andReturn(false).anyTimes();
         // prepare throws an exception
         kafkaChannel.prepare();
         expectLastCall().andThrow(new IOException());

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 90c8cd5..440140b 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -105,6 +105,7 @@ public class SslTransportLayerTest {
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
         NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
+        server.verifyAuthenticationMetrics(1, 0);
     }
 
     /**
@@ -230,6 +231,7 @@ public class SslTransportLayerTest {
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
         NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**
@@ -323,6 +325,7 @@ public class SslTransportLayerTest {
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
         NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**
@@ -343,6 +346,7 @@ public class SslTransportLayerTest {
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
         NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**
@@ -495,6 +499,7 @@ public class SslTransportLayerTest {
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
         NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**
@@ -512,6 +517,7 @@ public class SslTransportLayerTest {
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
         NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**
@@ -530,6 +536,7 @@ public class SslTransportLayerTest {
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
         NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index b41db67..6df7c2d 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
@@ -47,10 +49,12 @@ public class FileRecordsTest {
             "ijkl".getBytes()
     };
     private FileRecords fileRecords;
+    private Time time;
 
     @Before
     public void setup() throws IOException {
         this.fileRecords = createFileRecords(values);
+        this.time = new MockTime();
     }
 
     /**
@@ -310,7 +314,7 @@ public class FileRecordsTest {
         int start = fileRecords.searchForOffsetWithSize(1, 0).position;
         int size = batch.sizeInBytes();
         FileRecords slice = fileRecords.read(start, size - 1);
-        Records messageV0 = slice.downConvert(RecordBatch.MAGIC_VALUE_V0, 0);
+        Records messageV0 = slice.downConvert(RecordBatch.MAGIC_VALUE_V0, 0, time).records();
         assertTrue("No message should be there", batches(messageV0).isEmpty());
         assertEquals("There should be " + (size - 1) + " bytes", size - 1, messageV0.sizeInBytes());
     }
@@ -362,7 +366,7 @@ public class FileRecordsTest {
         try (FileRecords fileRecords = FileRecords.open(tempFile())) {
             fileRecords.append(MemoryRecords.readableRecords(buffer));
             fileRecords.flush();
-            Records convertedRecords = fileRecords.downConvert(toMagic, 0L);
+            Records convertedRecords = fileRecords.downConvert(toMagic, 0L, time).records();
             verifyConvertedRecords(records, offsets, convertedRecords, compressionType, toMagic);
 
             if (toMagic <= RecordBatch.MAGIC_VALUE_V1 && compressionType == CompressionType.NONE) {
@@ -371,7 +375,7 @@ public class FileRecordsTest {
                     firstOffset = 11L; // v1 record
                 else
                     firstOffset = 17; // v2 record
-                Records convertedRecords2 = fileRecords.downConvert(toMagic, firstOffset);
+                Records convertedRecords2 = fileRecords.downConvert(toMagic, firstOffset, time).records();
                 List<Long> filteredOffsets = new ArrayList<>(offsets);
                 List<SimpleRecord> filteredRecords = new ArrayList<>(records);
                 int index = filteredOffsets.indexOf(firstOffset) - 1;
@@ -380,7 +384,7 @@ public class FileRecordsTest {
                 verifyConvertedRecords(filteredRecords, filteredOffsets, convertedRecords2, compressionType, toMagic);
             } else {
                 // firstOffset doesn't have any effect in this case
-                Records convertedRecords2 = fileRecords.downConvert(toMagic, 10L);
+                Records convertedRecords2 = fileRecords.downConvert(toMagic, 10L, time).records();
                 verifyConvertedRecords(records, offsets, convertedRecords2, compressionType, toMagic);
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index cc2bf79..4df3492 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
@@ -30,6 +31,7 @@ import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -38,10 +40,12 @@ public class MemoryRecordsBuilderTest {
 
     private final CompressionType compressionType;
     private final int bufferOffset;
+    private final Time time;
 
     public MemoryRecordsBuilderTest(int bufferOffset, CompressionType compressionType) {
         this.bufferOffset = bufferOffset;
         this.compressionType = compressionType;
+        this.time = Time.SYSTEM;
     }
 
     @Test
@@ -456,7 +460,11 @@ public class MemoryRecordsBuilderTest {
 
         buffer.flip();
 
-        Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 0);
+        ConvertedRecords<MemoryRecords> convertedRecords = MemoryRecords.readableRecords(buffer)
+                .downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time);
+        MemoryRecords records = convertedRecords.records();
+        verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(),
+                3, 2, records.sizeInBytes(), buffer.limit());
 
         List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
         if (compressionType != CompressionType.NONE) {
@@ -493,7 +501,11 @@ public class MemoryRecordsBuilderTest {
 
         buffer.flip();
 
-        Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 0);
+        ConvertedRecords<MemoryRecords> convertedRecords = MemoryRecords.readableRecords(buffer)
+                .downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time);
+        MemoryRecords records = convertedRecords.records();
+        verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 1,
+                records.sizeInBytes(), buffer.limit());
 
         List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
         if (compressionType != CompressionType.NONE) {
@@ -517,7 +529,10 @@ public class MemoryRecordsBuilderTest {
         assertEquals("2", utf8(logRecords.get(1).key()));
         assertEquals("3", utf8(logRecords.get(2).key()));
 
-        records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 2L);
+        convertedRecords = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 2L, time);
+        records = convertedRecords.records();
+        verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 1,
+                records.sizeInBytes(), buffer.limit());
 
         batches = Utils.toList(records.batches().iterator());
         logRecords = Utils.toList(records.records().iterator());
@@ -619,4 +634,26 @@ public class MemoryRecordsBuilderTest {
         return values;
     }
 
+    private void verifyRecordsProcessingStats(RecordsProcessingStats processingStats, int recordCount, int convertedCount,
+            long finalBytes, long preConvertedBytes) {
+        assertNotNull("Records processing info is null", processingStats);
+        assertEquals(convertedCount, processingStats.conversionCount());
+        assertTrue("Processing time not recorded", processingStats.conversionTimeNanos() > 0);
+        long tempBytes = processingStats.temporaryMemoryBytes();
+        if (compressionType == CompressionType.NONE) {
+            if (convertedCount == 0)
+                assertEquals(finalBytes, tempBytes);
+            else if (convertedCount == recordCount)
+                assertEquals(preConvertedBytes + finalBytes, tempBytes);
+            else {
+                assertTrue(String.format("Unexpected temp bytes %d final %d pre %d", tempBytes, finalBytes, preConvertedBytes),
+                        tempBytes > finalBytes && tempBytes < finalBytes + preConvertedBytes);
+            }
+        } else {
+            long compressedBytes = finalBytes - Records.LOG_OVERHEAD - LegacyRecord.RECORD_OVERHEAD_V0;
+            assertTrue(String.format("Uncompressed size expected temp=%d, compressed=%d", tempBytes, compressedBytes),
+                    tempBytes > compressedBytes);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index c59f2c9..c2c8d81 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -126,6 +126,7 @@ public class SaslAuthenticatorTest {
 
         server = createEchoServer(securityProtocol);
         createAndCheckClientConnection(securityProtocol, node);
+        server.verifyAuthenticationMetrics(1, 0);
     }
 
     /**
@@ -139,6 +140,7 @@ public class SaslAuthenticatorTest {
 
         server = createEchoServer(securityProtocol);
         createAndCheckClientConnection(securityProtocol, node);
+        server.verifyAuthenticationMetrics(1, 0);
     }
 
     /**
@@ -153,6 +155,7 @@ public class SaslAuthenticatorTest {
 
         server = createEchoServer(securityProtocol);
         createAndCheckClientConnectionFailure(securityProtocol, node);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**
@@ -167,6 +170,7 @@ public class SaslAuthenticatorTest {
 
         server = createEchoServer(securityProtocol);
         createAndCheckClientConnectionFailure(securityProtocol, node);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**
@@ -268,6 +272,7 @@ public class SaslAuthenticatorTest {
         server = createEchoServer(securityProtocol);
         updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
         createAndCheckClientConnection(securityProtocol, "0");
+        server.verifyAuthenticationMetrics(1, 0);
     }
 
     /**
@@ -303,6 +308,7 @@ public class SaslAuthenticatorTest {
         server = createEchoServer(securityProtocol);
         updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
         createAndCheckClientConnectionFailure(securityProtocol, node);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**
@@ -321,6 +327,7 @@ public class SaslAuthenticatorTest {
         server = createEchoServer(securityProtocol);
         updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
         createAndCheckClientConnectionFailure(securityProtocol, node);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**
@@ -338,9 +345,11 @@ public class SaslAuthenticatorTest {
         String node = "1";
         saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
         createAndCheckClientConnectionFailure(securityProtocol, node);
+        server.verifyAuthenticationMetrics(0, 1);
 
         saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
         createAndCheckClientConnection(securityProtocol, "2");
+        server.verifyAuthenticationMetrics(1, 1);
     }
 
     /**
@@ -643,6 +652,7 @@ public class SaslAuthenticatorTest {
 
         server = createEchoServer(securityProtocol);
         createAndCheckClientConnectionFailure(securityProtocol, node);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**
@@ -657,6 +667,7 @@ public class SaslAuthenticatorTest {
 
         server = createEchoServer(securityProtocol);
         createAndCheckClientConnectionFailure(securityProtocol, node);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index 9d8be8d..4abff84 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -40,7 +40,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.kafka.common.security.scram.ScramMechanism.SCRAM_SHA_256;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class SaslServerAuthenticatorTest {
@@ -100,8 +99,8 @@ public class SaslServerAuthenticatorTest {
         try {
             authenticator.authenticate();
             fail("Expected authenticate() to raise an exception");
-        } catch (IOException e) {
-            assertTrue(e.getCause() instanceof IllegalSaslStateException);
+        } catch (IllegalSaslStateException e) {
+            // expected exception
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index 8c8ba6d..eb01337 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -73,7 +73,7 @@ public class ConnectMetrics {
         reporters.add(new JmxReporter(JMX_PREFIX));
         this.metrics = new Metrics(metricConfig, reporters, time);
         LOG.debug("Registering Connect metrics with JMX for worker '{}'", workerId);
-        AppInfoParser.registerAppInfo(JMX_PREFIX, workerId);
+        AppInfoParser.registerAppInfo(JMX_PREFIX, workerId, metrics);
     }
 
     /**
@@ -164,7 +164,7 @@ public class ConnectMetrics {
     public void stop() {
         metrics.close();
         LOG.debug("Unregistering Connect metrics with JMX for worker '{}'", workerId);
-        AppInfoParser.unregisterAppInfo(JMX_PREFIX, workerId);
+        AppInfoParser.unregisterAppInfo(JMX_PREFIX, workerId, metrics);
     }
 
     public static class MetricGroupId {

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index d61a29c..6e19d11 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -133,7 +133,7 @@ public class WorkerGroupMember {
                     configStorage,
                     listener);
 
-            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
+            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
             log.debug("Connect group member created");
         } catch (Throwable t) {
             // call close methods if internal objects are already constructed
@@ -200,7 +200,7 @@ public class WorkerGroupMember {
         ClientUtils.closeQuietly(coordinator, "coordinator", firstException);
         ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
         ClientUtils.closeQuietly(client, "consumer network client", firstException);
-        AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
+        AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
         if (firstException.get() != null && !swallowException)
             throw new KafkaException("Failed to stop the Connect group member", firstException.get());
         else

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index bd193c7..306d64a 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -22,7 +22,7 @@ import joptsimple._
 import kafka.common.Config
 import kafka.common.InvalidConfigException
 import kafka.log.LogConfig
-import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig, QuotaId}
+import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
 import kafka.utils.{CommandLineUtils, ZkUtils}
 import kafka.utils.Implicits._
 import org.apache.kafka.common.security.JaasUtils

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 5db2c5d..4ea61ab 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -964,12 +964,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
 
   private def registerLogDirEventNotificationListener() = {
     debug("Registering logDirEventNotificationListener")
-    zkUtils.zkClient.subscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
+    zkUtils.subscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
   }
 
   private def deregisterLogDirEventNotificationListener() = {
     debug("De-registering logDirEventNotificationListener")
-    zkUtils.zkClient.unsubscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
+    zkUtils.unsubscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
   }
 
   private def readControllerEpochFromZookeeper() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index f32da4b..e6c774a 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -48,11 +48,11 @@ import java.util.regex.Pattern
 
 object LogAppendInfo {
   val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
-    NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+    RecordsProcessingStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
 
   def unknownLogAppendInfoWithLogStartOffset(logStartOffset: Long): LogAppendInfo =
     LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
-      NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+      RecordsProcessingStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
 }
 
 /**
@@ -65,6 +65,7 @@ object LogAppendInfo {
  * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp.
  * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
  * @param logStartOffset The start offset of the log at the time of this append.
+ * @param recordsProcessingStats Statistics collected during record processing, `null` if `assignOffsets` is `false`
  * @param sourceCodec The source codec used in the message set (send by the producer)
  * @param targetCodec The target codec of the message set(after applying the broker compression configuration if any)
  * @param shallowCount The number of shallow messages
@@ -77,6 +78,7 @@ case class LogAppendInfo(var firstOffset: Long,
                          var offsetOfMaxTimestamp: Long,
                          var logAppendTime: Long,
                          var logStartOffset: Long,
+                         var recordsProcessingStats: RecordsProcessingStats,
                          sourceCodec: CompressionCodec,
                          targetCodec: CompressionCodec,
                          shallowCount: Int,
@@ -617,6 +619,7 @@ class Log(@volatile var dir: File,
           val validateAndOffsetAssignResult = try {
             LogValidator.validateMessagesAndAssignOffsets(validRecords,
               offset,
+              time,
               now,
               appendInfo.sourceCodec,
               appendInfo.targetCodec,
@@ -633,6 +636,7 @@ class Log(@volatile var dir: File,
           appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
           appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
           appendInfo.lastOffset = offset.value - 1
+          appendInfo.recordsProcessingStats = validateAndOffsetAssignResult.recordsProcessingStats
           if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
             appendInfo.logAppendTime = now
 
@@ -868,8 +872,8 @@ class Log(@volatile var dir: File,
 
     // Apply broker-side compression if any
     val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
-    LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset, sourceCodec,
-      targetCodec, shallowMessageCount, validBytesCount, monotonic)
+    LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset,
+      RecordsProcessingStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
   }
 
   private def updateProducers(batch: RecordBatch,

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index b6b20e3..15750e9 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -23,6 +23,7 @@ import kafka.message.{CompressionCodec, NoCompressionCodec}
 import kafka.utils.Logging
 import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
+import org.apache.kafka.common.utils.Time
 
 import scala.collection.mutable
 import scala.collection.JavaConverters._
@@ -46,6 +47,7 @@ private[kafka] object LogValidator extends Logging {
    */
   private[kafka] def validateMessagesAndAssignOffsets(records: MemoryRecords,
                                                       offsetCounter: LongRef,
+                                                      time: Time,
                                                       now: Long,
                                                       sourceCodec: CompressionCodec,
                                                       targetCodec: CompressionCodec,
@@ -58,14 +60,14 @@ private[kafka] object LogValidator extends Logging {
     if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
       // check the magic value
       if (!records.hasMatchingMagic(magic))
-        convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, now, timestampType,
+        convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, time, now, timestampType,
           timestampDiffMaxMs, magic, partitionLeaderEpoch, isFromClient)
       else
         // Do in-place validation, offset assignment and maybe set timestamp
         assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs,
           partitionLeaderEpoch, isFromClient, magic)
     } else {
-      validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic,
+      validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, time, now, sourceCodec, targetCodec, compactedTopic,
         magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient)
     }
   }
@@ -109,6 +111,7 @@ private[kafka] object LogValidator extends Logging {
   private def convertAndAssignOffsetsNonCompressed(records: MemoryRecords,
                                                    offsetCounter: LongRef,
                                                    compactedTopic: Boolean,
+                                                   time: Time,
                                                    now: Long,
                                                    timestampType: TimestampType,
                                                    timestampDiffMaxMs: Long,
@@ -137,12 +140,16 @@ private[kafka] object LogValidator extends Logging {
     }
 
     val convertedRecords = builder.build()
+
     val info = builder.info
+    val recordsProcessingStats = new RecordsProcessingStats(builder.uncompressedBytesWritten,
+      builder.numRecords, time.nanoseconds - now)
     ValidationAndOffsetAssignResult(
       validatedRecords = convertedRecords,
       maxTimestamp = info.maxTimestamp,
       shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
-      messageSizeMaybeChanged = true)
+      messageSizeMaybeChanged = true,
+      recordsProcessingStats = recordsProcessingStats)
   }
 
   private def assignOffsetsNonCompressed(records: MemoryRecords,
@@ -203,7 +210,8 @@ private[kafka] object LogValidator extends Logging {
       validatedRecords = records,
       maxTimestamp = maxTimestamp,
       shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp,
-      messageSizeMaybeChanged = false)
+      messageSizeMaybeChanged = false,
+      recordsProcessingStats = RecordsProcessingStats.EMPTY)
   }
 
   /**
@@ -215,6 +223,7 @@ private[kafka] object LogValidator extends Logging {
    */
   def validateMessagesAndAssignOffsetsCompressed(records: MemoryRecords,
                                                  offsetCounter: LongRef,
+                                                 time: Time,
                                                  now: Long,
                                                  sourceCodec: CompressionCodec,
                                                  targetCodec: CompressionCodec,
@@ -232,8 +241,11 @@ private[kafka] object LogValidator extends Logging {
       val expectedInnerOffset = new LongRef(0)
       val validatedRecords = new mutable.ArrayBuffer[Record]
 
+      var uncompressedSizeInBytes = 0
+
       for (batch <- records.batches.asScala) {
         validateBatch(batch, isFromClient, toMagic)
+        uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
 
         // Do not compress control records unless they are written compressed
         if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
@@ -244,6 +256,8 @@ private[kafka] object LogValidator extends Logging {
           if (sourceCodec != NoCompressionCodec && record.isCompressed)
             throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
               s"compression attribute set: $record")
+
+          uncompressedSizeInBytes += record.sizeInBytes()
           if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
             // Check if we need to overwrite offset
             // No in place assignment situation 3
@@ -269,8 +283,9 @@ private[kafka] object LogValidator extends Logging {
           val first = records.batches.asScala.head
           (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
         }
-        buildRecordsAndAssignOffsets(toMagic, offsetCounter, timestampType, CompressionType.forId(targetCodec.codec), now,
-          validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch)
+        buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, CompressionType.forId(targetCodec.codec), now,
+          validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch, isFromClient,
+          uncompressedSizeInBytes)
       } else {
         // we can update the batch only and write the compressed payload as is
         val batch = records.batches.iterator.next()
@@ -287,15 +302,18 @@ private[kafka] object LogValidator extends Logging {
         if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
           batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
 
+        val recordsProcessingStats = new RecordsProcessingStats(uncompressedSizeInBytes, 0, -1)
         ValidationAndOffsetAssignResult(validatedRecords = records,
           maxTimestamp = maxTimestamp,
           shallowOffsetOfMaxTimestamp = lastOffset,
-          messageSizeMaybeChanged = false)
+          messageSizeMaybeChanged = false,
+          recordsProcessingStats = recordsProcessingStats)
       }
   }
 
   private def buildRecordsAndAssignOffsets(magic: Byte,
                                            offsetCounter: LongRef,
+                                           time: Time,
                                            timestampType: TimestampType,
                                            compressionType: CompressionType,
                                            logAppendTime: Long,
@@ -304,7 +322,10 @@ private[kafka] object LogValidator extends Logging {
                                            producerEpoch: Short,
                                            baseSequence: Int,
                                            isTransactional: Boolean,
-                                           partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
+                                           partitionLeaderEpoch: Int,
+                                           isFromClient: Boolean,
+                                           uncompresssedSizeInBytes: Int): ValidationAndOffsetAssignResult = {
+    val startNanos = time.nanoseconds
     val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType,
       validatedRecords.asJava)
     val buffer = ByteBuffer.allocate(estimatedSize)
@@ -316,13 +337,23 @@ private[kafka] object LogValidator extends Logging {
     }
 
     val records = builder.build()
+
     val info = builder.info
 
+    // This is not strictly correct, it represents the number of records where in-place assignment is not possible
+    // instead of the number of records that were converted. It will over-count cases where the source and target are
+    // message format V0 or if the inner offsets are not consecutive. This is OK since the impact is the same: we have
+    // to rebuild the records (including recompression if enabled).
+    val conversionCount = builder.numRecords
+    val recordsProcessingStats = new RecordsProcessingStats(uncompresssedSizeInBytes + builder.uncompressedBytesWritten,
+      conversionCount, time.nanoseconds - startNanos)
+
     ValidationAndOffsetAssignResult(
       validatedRecords = records,
       maxTimestamp = info.maxTimestamp,
       shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
-      messageSizeMaybeChanged = true)
+      messageSizeMaybeChanged = true,
+      recordsProcessingStats = recordsProcessingStats)
   }
 
   private def validateKey(record: Record, compactedTopic: Boolean) {
@@ -352,6 +383,7 @@ private[kafka] object LogValidator extends Logging {
   case class ValidationAndOffsetAssignResult(validatedRecords: MemoryRecords,
                                              maxTimestamp: Long,
                                              shallowOffsetOfMaxTimestamp: Long,
-                                             messageSizeMaybeChanged: Boolean)
+                                             messageSizeMaybeChanged: Boolean,
+                                             recordsProcessingStats: RecordsProcessingStats)
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 1894213..b9ab486 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -40,19 +40,22 @@ trait KafkaMetricsGroup extends Logging {
    * @param tags Additional attributes which mBean will have.
    * @return Sanitized metric name object.
    */
-  private def metricName(name: String, tags: scala.collection.Map[String, String]) = {
+  protected def metricName(name: String, tags: scala.collection.Map[String, String]): MetricName = {
     val klass = this.getClass
     val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
     val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
-    // Tags may contain ipv6 address with ':', which is not valid in JMX ObjectName
-    def quoteIfRequired(value: String) = if (value.contains(':')) ObjectName.quote(value) else value
-    val metricTags = tags.map(kv => (kv._1, quoteIfRequired(kv._2)))
 
-    explicitMetricName(pkg, simpleName, name, metricTags)
+    explicitMetricName(pkg, simpleName, name, tags)
   }
 
 
-  private def explicitMetricName(group: String, typeName: String, name: String, tags: scala.collection.Map[String, String]) = {
+  protected def explicitMetricName(group: String, typeName: String, name: String,
+      tags: scala.collection.Map[String, String]): MetricName = {
+
+    // Tags may contain ipv6 address with ':', which is not valid in JMX ObjectName
+    def quoteIfRequired(value: String) = if (value.contains(':')) ObjectName.quote(value) else value
+    val metricTags = tags.map(kv => (kv._1, quoteIfRequired(kv._2)))
+
     val nameBuilder: StringBuilder = new StringBuilder
 
     nameBuilder.append(group)

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index b2ef615..e5f115c 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -21,19 +21,20 @@ import java.net.InetAddress
 import java.nio.ByteBuffer
 import java.util.concurrent._
 
-import com.yammer.metrics.core.Gauge
+import com.yammer.metrics.core.{Gauge, Meter}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.network.RequestChannel.{BaseRequest, SendAction, ShutdownRequest, NoOpAction, CloseConnectionAction}
 import kafka.utils.{Logging, NotNothing}
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.metrics.Sanitizer
 import org.apache.kafka.common.network.Send
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Time
 import org.apache.log4j.Logger
 
+import scala.collection.mutable
 import scala.reflect.ClassTag
 
 object RequestChannel extends Logging {
@@ -60,6 +61,8 @@ object RequestChannel extends Logging {
     @volatile var responseCompleteTimeNanos = -1L
     @volatile var responseDequeueTimeNanos = -1L
     @volatile var apiRemoteCompleteTimeNanos = -1L
+    @volatile var messageConversionsTimeNanos = 0L
+    @volatile var temporaryMemoryBytes = 0L
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val session = Session(context.principal, context.clientAddress)
@@ -121,6 +124,7 @@ object RequestChannel extends Logging {
       val apiThrottleTimeMs = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos)
       val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
       val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
+      val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
       val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
       val fetchMetricNames =
         if (header.apiKey == ApiKeys.FETCH) {
@@ -133,7 +137,7 @@ object RequestChannel extends Logging {
         else Seq.empty
       val metricNames = fetchMetricNames :+ header.apiKey.name
       metricNames.foreach { metricName =>
-        val m = RequestMetrics.metricsMap(metricName)
+        val m = RequestMetrics(metricName)
         m.requestRate.mark()
         m.requestQueueTimeHist.update(Math.round(requestQueueTimeMs))
         m.localTimeHist.update(Math.round(apiLocalTimeMs))
@@ -142,6 +146,9 @@ object RequestChannel extends Logging {
         m.responseQueueTimeHist.update(Math.round(responseQueueTimeMs))
         m.responseSendTimeHist.update(Math.round(responseSendTimeMs))
         m.totalTimeHist.update(Math.round(totalTimeMs))
+        m.requestBytesHist.update(sizeOfBodyInBytes)
+        m.messageConversionsTimeHist.foreach(_.update(Math.round(messageConversionsTimeMs)))
+        m.tempMemoryBytesHist.foreach(_.update(temporaryMemoryBytes))
       }
 
       // Records network handler thread usage. This is included towards the request quota for the
@@ -171,6 +178,10 @@ object RequestChannel extends Logging {
           .append(",securityProtocol:").append(context.securityProtocol)
           .append(",principal:").append(session.principal)
           .append(",listener:").append(context.listenerName.value)
+        if (temporaryMemoryBytes > 0)
+          builder.append(",temporaryMemoryBytes:").append(temporaryMemoryBytes)
+        if (messageConversionsTimeMs > 0)
+          builder.append(",messageConversionsTime:").append(messageConversionsTimeMs)
         requestLogger.debug(builder.toString)
       }
     }
@@ -281,6 +292,12 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
     responseListeners ::= onResponse
   }
 
+  def updateErrorMetrics(apiKey: ApiKeys, errors: collection.Map[Errors, Integer]) {
+    errors.foreach { case (error, count) =>
+      RequestMetrics.markErrorMeter(apiKey.name, error, count)
+    }
+  }
+
   def shutdown() {
     requestQueue.clear()
   }
@@ -290,11 +307,30 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
 }
 
 object RequestMetrics {
-  val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics]
+
+  private val metricsMap = mutable.Map[String, RequestMetrics]()
+
   val consumerFetchMetricName = ApiKeys.FETCH.name + "Consumer"
   val followFetchMetricName = ApiKeys.FETCH.name + "Follower"
-  (ApiKeys.values().toList.map(e => e.name)
-    ++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name)))
+
+  (ApiKeys.values.toSeq.map(_.name) ++ Seq(consumerFetchMetricName, followFetchMetricName)).foreach { name =>
+    metricsMap.put(name, new RequestMetrics(name))
+  }
+
+  def apply(metricName: String) = metricsMap(metricName)
+
+  def markErrorMeter(name: String, error: Errors, count: Int) {
+      val errorMeter = metricsMap(name).errorMeters(error)
+      errorMeter.getOrCreateMeter().mark(count.toLong)
+  }
+
+  // Used for testing until these metrics are moved to a class
+  private[kafka] def clearErrorMeters(): Unit = {
+    metricsMap.values.foreach { requestMetrics =>
+      requestMetrics.errorMeters.values.foreach(_.removeMeter())
+    }
+  }
+
 }
 
 class RequestMetrics(name: String) extends KafkaMetricsGroup {
@@ -306,11 +342,58 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
   val localTimeHist = newHistogram("LocalTimeMs", biased = true, tags)
   // time a request takes to wait on remote brokers (currently only relevant to fetch and produce requests)
   val remoteTimeHist = newHistogram("RemoteTimeMs", biased = true, tags)
-  // time a request is throttled (only relevant to fetch and produce requests)
+  // time a request is throttled
   val throttleTimeHist = newHistogram("ThrottleTimeMs", biased = true, tags)
   // time a response spent in a response queue
   val responseQueueTimeHist = newHistogram("ResponseQueueTimeMs", biased = true, tags)
   // time to send the response to the requester
   val responseSendTimeHist = newHistogram("ResponseSendTimeMs", biased = true, tags)
   val totalTimeHist = newHistogram("TotalTimeMs", biased = true, tags)
+  // request size in bytes
+  val requestBytesHist = newHistogram("RequestBytes", biased = true, tags)
+  // time for message conversions (only relevant to fetch and produce requests)
+  val messageConversionsTimeHist =
+    if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name)
+      Some(newHistogram("MessageConversionsTimeMs", biased = true, tags))
+    else
+      None
+  // Temporary memory allocated for processing request (only populated for fetch and produce requests)
+  // This shows the memory allocated for compression/conversions excluding the actual request size
+  val tempMemoryBytesHist =
+    if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name)
+      Some(newHistogram("TemporaryMemoryBytes", biased = true, tags))
+    else
+      None
+
+  private val errorMeters = mutable.Map[Errors, ErrorMeter]()
+  Errors.values.foreach(error => errorMeters.put(error, new ErrorMeter(name, error)))
+
+  class ErrorMeter(name: String, error: Errors) {
+    private val tags = Map("request" -> name, "error" -> error.name)
+
+    @volatile private var meter: Meter = null
+
+    def getOrCreateMeter(): Meter = {
+      if (meter != null)
+        meter
+      else {
+        synchronized {
+          if (meter == null)
+             meter = newMeter("ErrorsPerSec", "requests", TimeUnit.SECONDS, tags)
+          meter
+        }
+      }
+    }
+
+    // This is currently used only in tests.
+    def removeMeter(): Unit = {
+      synchronized {
+        if (meter != null) {
+          removeMetric("ErrorsPerSec", tags)
+          meter = null
+        }
+      }
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index fd31fc4..2c83c1b 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -25,7 +25,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
 import org.apache.kafka.clients.admin.NewPartitions
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
-import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, InvalidTopicException, PolicyViolationException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, PolicyViolationException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index c84fbcb..afaa5dd 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -16,8 +16,6 @@
  */
 package kafka.server
 
-import java.net.{URLEncoder, URLDecoder}
-import java.nio.charset.StandardCharsets
 import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}
 import java.util.concurrent.locks.ReentrantReadWriteLock