Posted to commits@kafka.apache.org by gu...@apache.org on 2017/06/08 18:50:32 UTC

kafka git commit: KAFKA-5357 follow-up: Yammer metrics, not Kafka Metrics

Repository: kafka
Updated Branches:
  refs/heads/trunk eff79849a -> b4b65e9ea


KAFKA-5357 follow-up: Yammer metrics, not Kafka Metrics

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Damian Guy <da...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #3242 from guozhangwang/K5357-yammer-metrics
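
This follow-up replaces the Kafka-client Metrics/Sensor accounting added in KAFKA-5357 with
broker-side Yammer metrics: TransactionMarkerChannelManager now mixes in
kafka.metrics.KafkaMetricsGroup and registers gauges that read the queue sizes directly, so the
metrics: Metrics constructor parameter can be dropped. A minimal sketch of that registration
pattern, assuming the Kafka core classpath of this commit; the class name, queue, and metric name
below are illustrative, not part of the change:

    import java.util.concurrent.LinkedBlockingQueue

    import com.yammer.metrics.core.Gauge
    import kafka.metrics.KafkaMetricsGroup

    // Illustrative component that owns a queue and exposes its depth as a Yammer gauge.
    class QueueDepthExample(brokerId: Int) extends KafkaMetricsGroup {
      private val retryQueue = new LinkedBlockingQueue[String]()

      // The gauge is polled on read, so the reported size always matches the queue
      // and no explicit record(+/-n) bookkeeping is needed.
      newGauge(
        "RetryQueueSize",
        new Gauge[Int] {
          def value: Int = retryQueue.size
        },
        Map("broker-id" -> brokerId.toString)
      )
    }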


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b4b65e9e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b4b65e9e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b4b65e9e

Branch: refs/heads/trunk
Commit: b4b65e9eafc45cadca093b023361bd15e77ccb21
Parents: eff7984
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Jun 8 11:50:29 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jun 8 11:50:29 2017 -0700

----------------------------------------------------------------------
 .../TransactionMarkerChannelManager.scala       | 66 ++++++++++----------
 .../TransactionMarkerChannelManagerTest.scala   | 45 +++++++------
 2 files changed, 58 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b4b65e9e/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index d3f65a0..63a382f 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -17,17 +17,13 @@
 package kafka.coordinator.transaction
 
 
-import java.util
-import java.util.Collections
-import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
-
 import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import kafka.metrics.KafkaMetricsGroup
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache}
 import kafka.utils.Logging
 import org.apache.kafka.clients._
 import org.apache.kafka.common.{Node, TopicPartition}
-import org.apache.kafka.common.metrics.{Metrics, Sensor}
-import org.apache.kafka.common.metrics.stats.Total
+import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersRequest}
 import org.apache.kafka.common.security.JaasContext
@@ -35,6 +31,11 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry
 
+import com.yammer.metrics.core.Gauge
+
+import java.util
+import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQueue}
+
 import collection.JavaConverters._
 import scala.collection.{concurrent, immutable, mutable}
 
@@ -84,8 +85,7 @@ object TransactionMarkerChannelManager {
       networkClient,
       txnStateManager,
       txnMarkerPurgatory,
-      time,
-      metrics
+      time
     )
   }
 
@@ -117,7 +117,7 @@ class TxnMarkerQueue(@volatile private var destination: Node) {
 
   def node: Node = destination
 
-  def totalNumMarkers(): Int = markersPerTxnTopicPartition.map { case(_, queue) => queue.size()}.sum
+  def totalNumMarkers: Int = markersPerTxnTopicPartition.map { case(_, queue) => queue.size()}.sum
 
   // visible for testing
   def totalNumMarkers(txnTopicPartition: Int): Int = markersPerTxnTopicPartition.get(txnTopicPartition).fold(0)(_.size())
@@ -128,22 +128,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
                                       networkClient: NetworkClient,
                                       txnStateManager: TransactionStateManager,
                                       txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker],
-                                      time: Time,
-                                      metrics:Metrics) extends Logging {
-
-  private def addQueueLengthMetric(sensor: Sensor, namePrefix: String, description: String): Unit = {
-    sensor.add(metrics.metricName(s"$namePrefix-queue-length",
-      "transaction-marker-channel-metrics",
-      description,
-      Collections.singletonMap("broker-id",config.brokerId.toString)),
-      new Total())
-  }
-
-  private val markersQueuePerBroker: concurrent.Map[Int, TxnMarkerQueue] = concurrent.TrieMap.empty[Int, TxnMarkerQueue]
-
-  private val markersQueueForUnknownBroker = new TxnMarkerQueue(Node.noNode)
-  private val unknownBrokerQueueSensor = metrics.sensor("unknown-broker-queue")
-  addQueueLengthMetric(unknownBrokerQueueSensor, "unknown-broker", "the number of WriteTxnMarker requests with unknown brokers")
+                                      time: Time) extends Logging with KafkaMetricsGroup {
 
   private val interBrokerListenerName: ListenerName = config.interBrokerListenerName
 
@@ -151,10 +136,27 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
     new InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, networkClient, drainQueuedTransactionMarkers, time)
   }
 
+  private val markersQueuePerBroker: concurrent.Map[Int, TxnMarkerQueue] = new ConcurrentHashMap[Int, TxnMarkerQueue]().asScala
+
+  private val markersQueueForUnknownBroker = new TxnMarkerQueue(Node.noNode)
+
   private val txnLogAppendRetryQueue = new LinkedBlockingQueue[TxnLogAppend]()
 
-  private val txnLogAppendRetryQueueSensor = metrics.sensor("txn-log-append-retry-queue")
-  addQueueLengthMetric(txnLogAppendRetryQueueSensor, "txn-log-append-retry", "the number of txn log appends that need to be retried")
+  newGauge(
+    "UnknownDestinationQueueSize",
+    new Gauge[Int] {
+      def value: Int = markersQueueForUnknownBroker.totalNumMarkers
+    },
+    Map("broker-id" -> config.brokerId.toString)
+  )
+
+  newGauge(
+    "CompleteTxnLogAppendQueueSize",
+    new Gauge[Int] {
+      def value: Int = txnLogAppendRetryQueue.size
+    },
+    Map("broker-id" -> config.brokerId.toString)
+  )
 
   def start(): Unit = {
     txnMarkerSendThread.start()
@@ -195,7 +197,6 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
   def retryLogAppends(): Unit = {
     val txnLogAppendRetries: java.util.List[TxnLogAppend] = new util.ArrayList[TxnLogAppend]()
     txnLogAppendRetryQueue.drainTo(txnLogAppendRetries)
-    txnLogAppendRetryQueueSensor.record(-txnLogAppendRetries.size())
     debug(s"retrying: ${txnLogAppendRetries.size} transaction log appends")
     txnLogAppendRetries.asScala.foreach { txnLogAppend =>
       tryAppendToLog(txnLogAppend)
@@ -210,7 +211,6 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
       queue.drainTo(txnIdAndMarkerEntries)
     }
 
-    unknownBrokerQueueSensor.record(-txnIdAndMarkerEntries.size())
     for (txnIdAndMarker: TxnIdAndMarkerEntry <- txnIdAndMarkerEntries.asScala) {
       val transactionalId = txnIdAndMarker.txnId
       val producerId = txnIdAndMarker.txnMarkerEntry.producerId
@@ -222,7 +222,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
       addTxnMarkersToBrokerQueue(transactionalId, producerId, producerEpoch, txnResult, coordinatorEpoch, topicPartitions)
     }
 
-    markersQueuePerBroker.map { case (brokerId: Int, brokerRequestQueue: TxnMarkerQueue) =>
+    markersQueuePerBroker.map { case (_, brokerRequestQueue: TxnMarkerQueue) =>
       val txnIdAndMarkerEntries: java.util.List[TxnIdAndMarkerEntry] = new util.ArrayList[TxnIdAndMarkerEntry]()
       brokerRequestQueue.forEachTxnTopicPartition { case (_, queue) =>
         queue.drainTo(txnIdAndMarkerEntries)
@@ -284,7 +284,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
 
   private def tryAppendToLog(txnLogAppend: TxnLogAppend) = {
     // try to append to the transaction log
-    def retryAppendCallback(error: Errors): Unit =
+    def appendCallback(error: Errors): Unit =
       error match {
         case Errors.NONE =>
           trace(s"Completed transaction for ${txnLogAppend.transactionalId} with coordinator epoch ${txnLogAppend.coordinatorEpoch}, final state: state after commit: ${txnLogAppend.txnMetadata.state}")
@@ -294,7 +294,6 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
 
         case Errors.COORDINATOR_NOT_AVAILABLE =>
           warn(s"Failed updating transaction state for ${txnLogAppend.transactionalId} when appending to transaction log due to ${error.exceptionName}. retrying")
-          txnLogAppendRetryQueueSensor.record(1)
           // enqueue for retry
           txnLogAppendRetryQueue.add(txnLogAppend)
 
@@ -302,7 +301,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
           throw new IllegalStateException(s"Unexpected error ${errors.exceptionName} while appending to transaction log for ${txnLogAppend.transactionalId}")
       }
 
-    txnStateManager.appendTransactionToLog(txnLogAppend.transactionalId, txnLogAppend.coordinatorEpoch, txnLogAppend.newMetadata, retryAppendCallback)
+    txnStateManager.appendTransactionToLog(txnLogAppend.transactionalId, txnLogAppend.coordinatorEpoch, txnLogAppend.newMetadata, appendCallback)
   }
 
   def addTxnMarkersToBrokerQueue(transactionalId: String, producerId: Long, producerEpoch: Short,
@@ -320,7 +319,6 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
           val txnIdAndMarker = TxnIdAndMarkerEntry(transactionalId, marker)
 
           if (brokerNode == Node.noNode) {
-            unknownBrokerQueueSensor.record(1)
             // if the leader of the partition is known but node not available, put it into an unknown broker queue
             // and let the sender thread to look for its broker and migrate them later
             markersQueueForUnknownBroker.addMarkers(txnTopicPartition, txnIdAndMarker)
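
For comparison, the code removed above tracked these counts with a client-side Sensor backed by a
Total stat, which meant every enqueue and drain had to be mirrored by an explicit record() call;
the new gauges make that bookkeeping unnecessary because they poll the queues on read. A minimal,
self-contained sketch of the older pattern, reconstructed from the deleted lines (the metric group,
description, and tag values are illustrative only):

    import java.util.Collections
    import java.util.concurrent.LinkedBlockingQueue

    import org.apache.kafka.common.metrics.Metrics
    import org.apache.kafka.common.metrics.stats.Total

    object SensorCountingExample extends App {
      val metrics = new Metrics()
      val queue = new LinkedBlockingQueue[String]()

      // A Total stat accumulates recorded deltas, so the caller must record +1 on every
      // add and -n on every drain to keep the metric in sync with the queue.
      val sensor = metrics.sensor("retry-queue")
      sensor.add(metrics.metricName("retry-queue-length",
        "example-metrics",
        "number of entries waiting to be retried",
        Collections.singletonMap("broker-id", "1")),
        new Total())

      queue.add("txn-log-append")
      sensor.record(1)

      val drained = new java.util.ArrayList[String]()
      queue.drainTo(drained)
      sensor.record(-drained.size())

      metrics.close()
    }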

http://git-wip-us.apache.org/repos/asf/kafka/blob/b4b65e9e/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index e390ecd..5e328ae 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -20,14 +20,16 @@ import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache}
 import kafka.utils.timer.MockTimer
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.NetworkClient
-import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersRequest}
 import org.apache.kafka.common.utils.{MockTime, Utils}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.easymock.EasyMock
 import org.junit.Assert._
-import org.junit.{After, Test}
+import org.junit.Test
 
+import com.yammer.metrics.Metrics
+
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 class TransactionMarkerChannelManagerTest {
@@ -60,23 +62,16 @@ class TransactionMarkerChannelManagerTest {
     reaperEnabled = false)
   private val time = new MockTime
 
-  private val metrics = new Metrics()
   private val channelManager = new TransactionMarkerChannelManager(
     KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:2181")),
     metadataCache,
     networkClient,
     txnStateManager,
     txnMarkerPurgatory,
-    time,
-    metrics)
+    time)
 
   private val senderThread = channelManager.senderThread
 
-  @After
-  def after(): Unit = {
-    metrics.close()
-  }
-
   private def mockCache(): Unit = {
     EasyMock.expect(txnStateManager.partitionFor(transactionalId1))
       .andReturn(txnTopicPartition1)
@@ -120,10 +115,10 @@ class TransactionMarkerChannelManagerTest {
     channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds()))
 
     assertEquals(2, txnMarkerPurgatory.watched)
-    assertEquals(2, channelManager.queueForBroker(broker1.id).get.totalNumMarkers())
+    assertEquals(2, channelManager.queueForBroker(broker1.id).get.totalNumMarkers)
     assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition1))
     assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition2))
-    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers())
+    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers)
     assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition1))
     assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
 
@@ -162,7 +157,7 @@ class TransactionMarkerChannelManagerTest {
     channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds()))
 
     assertEquals(1, txnMarkerPurgatory.watched)
-    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers())
+    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers)
     assertTrue(channelManager.queueForBroker(broker1.id).isEmpty)
     assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition1))
     assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
@@ -194,11 +189,11 @@ class TransactionMarkerChannelManagerTest {
     channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds()))
 
     assertEquals(2, txnMarkerPurgatory.watched)
-    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers())
+    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers)
     assertTrue(channelManager.queueForBroker(broker1.id).isEmpty)
     assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition1))
     assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
-    assertEquals(2, channelManager.queueForUnknownBroker.totalNumMarkers())
+    assertEquals(2, channelManager.queueForUnknownBroker.totalNumMarkers)
     assertEquals(1, channelManager.queueForUnknownBroker.totalNumMarkers(txnTopicPartition1))
     assertEquals(1, channelManager.queueForUnknownBroker.totalNumMarkers(txnTopicPartition2))
 
@@ -242,21 +237,33 @@ class TransactionMarkerChannelManagerTest {
     channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds()))
 
     assertEquals(2, txnMarkerPurgatory.watched)
-    assertEquals(2, channelManager.queueForBroker(broker1.id).get.totalNumMarkers())
+    assertEquals(2, channelManager.queueForBroker(broker1.id).get.totalNumMarkers)
     assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition1))
     assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition2))
-    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers())
+    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers)
     assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition1))
     assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
 
     channelManager.removeMarkersForTxnTopicPartition(txnTopicPartition1)
 
     assertEquals(1, txnMarkerPurgatory.watched)
-    assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers())
+    assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers)
     assertEquals(0, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition1))
     assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition2))
-    assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers())
+    assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers)
     assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition1))
     assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
   }
+
+  @Test
+  def shouldCreateMetricsOnStarting(): Unit = {
+    val metrics = Metrics.defaultRegistry.allMetrics
+
+    assertEquals(1, Metrics.defaultRegistry.allMetrics.asScala
+      .filterKeys(_.getMBeanName == "kafka.coordinator.transaction:type=TransactionMarkerChannelManager,name=UnknownDestinationQueueSize,broker-id=1")
+      .size)
+    assertEquals(1, Metrics.defaultRegistry.allMetrics.asScala
+      .filterKeys(_.getMBeanName == "kafka.coordinator.transaction:type=TransactionMarkerChannelManager,name=CompleteTxnLogAppendQueueSize,broker-id=1")
+      .size)
+  }
 }
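
The new gauges land in the shared Yammer default registry, which is what shouldCreateMetricsOnStarting
asserts above via the generated MBean names. A minimal sketch of reading one of those gauges back
from the registry; it assumes a TransactionMarkerChannelManager with broker-id 1 has already been
constructed in the same JVM, since construction is what registers the gauge:

    import com.yammer.metrics.Metrics
    import com.yammer.metrics.core.Gauge

    import scala.collection.JavaConverters._

    object ReadGaugeExample extends App {
      val mbeanName = "kafka.coordinator.transaction:type=TransactionMarkerChannelManager," +
        "name=UnknownDestinationQueueSize,broker-id=1"

      // Look the gauge up by its MBean name and print its current value.
      Metrics.defaultRegistry.allMetrics.asScala
        .find { case (name, _) => name.getMBeanName == mbeanName }
        .foreach { case (_, metric) =>
          println(s"queue depth: ${metric.asInstanceOf[Gauge[Int]].value}")
        }
    }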