You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/06/08 18:50:32 UTC
kafka git commit: KAFKA-5357 follow-up: Yammer metrics, not Kafka Metrics
Repository: kafka
Updated Branches:
refs/heads/trunk eff79849a -> b4b65e9ea
KAFKA-5357 follow-up: Yammer metrics, not Kafka Metrics
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Damian Guy <da...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #3242 from guozhangwang/K5357-yammer-metrics
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b4b65e9e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b4b65e9e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b4b65e9e
Branch: refs/heads/trunk
Commit: b4b65e9eafc45cadca093b023361bd15e77ccb21
Parents: eff7984
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Jun 8 11:50:29 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jun 8 11:50:29 2017 -0700
----------------------------------------------------------------------
.../TransactionMarkerChannelManager.scala | 66 ++++++++++----------
.../TransactionMarkerChannelManagerTest.scala | 45 +++++++------
2 files changed, 58 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b4b65e9e/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index d3f65a0..63a382f 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -17,17 +17,13 @@
package kafka.coordinator.transaction
-import java.util
-import java.util.Collections
-import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
-
import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import kafka.metrics.KafkaMetricsGroup
import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache}
import kafka.utils.Logging
import org.apache.kafka.clients._
import org.apache.kafka.common.{Node, TopicPartition}
-import org.apache.kafka.common.metrics.{Metrics, Sensor}
-import org.apache.kafka.common.metrics.stats.Total
+import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network._
import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersRequest}
import org.apache.kafka.common.security.JaasContext
@@ -35,6 +31,11 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry
+import com.yammer.metrics.core.Gauge
+
+import java.util
+import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQueue}
+
import collection.JavaConverters._
import scala.collection.{concurrent, immutable, mutable}
@@ -84,8 +85,7 @@ object TransactionMarkerChannelManager {
networkClient,
txnStateManager,
txnMarkerPurgatory,
- time,
- metrics
+ time
)
}
@@ -117,7 +117,7 @@ class TxnMarkerQueue(@volatile private var destination: Node) {
def node: Node = destination
- def totalNumMarkers(): Int = markersPerTxnTopicPartition.map { case(_, queue) => queue.size()}.sum
+ def totalNumMarkers: Int = markersPerTxnTopicPartition.map { case(_, queue) => queue.size()}.sum
// visible for testing
def totalNumMarkers(txnTopicPartition: Int): Int = markersPerTxnTopicPartition.get(txnTopicPartition).fold(0)(_.size())
@@ -128,22 +128,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
networkClient: NetworkClient,
txnStateManager: TransactionStateManager,
txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker],
- time: Time,
- metrics:Metrics) extends Logging {
-
- private def addQueueLengthMetric(sensor: Sensor, namePrefix: String, description: String): Unit = {
- sensor.add(metrics.metricName(s"$namePrefix-queue-length",
- "transaction-marker-channel-metrics",
- description,
- Collections.singletonMap("broker-id",config.brokerId.toString)),
- new Total())
- }
-
- private val markersQueuePerBroker: concurrent.Map[Int, TxnMarkerQueue] = concurrent.TrieMap.empty[Int, TxnMarkerQueue]
-
- private val markersQueueForUnknownBroker = new TxnMarkerQueue(Node.noNode)
- private val unknownBrokerQueueSensor = metrics.sensor("unknown-broker-queue")
- addQueueLengthMetric(unknownBrokerQueueSensor, "unknown-broker", "the number of WriteTxnMarker requests with unknown brokers")
+ time: Time) extends Logging with KafkaMetricsGroup {
private val interBrokerListenerName: ListenerName = config.interBrokerListenerName
@@ -151,10 +136,27 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
new InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, networkClient, drainQueuedTransactionMarkers, time)
}
+ private val markersQueuePerBroker: concurrent.Map[Int, TxnMarkerQueue] = new ConcurrentHashMap[Int, TxnMarkerQueue]().asScala
+
+ private val markersQueueForUnknownBroker = new TxnMarkerQueue(Node.noNode)
+
private val txnLogAppendRetryQueue = new LinkedBlockingQueue[TxnLogAppend]()
- private val txnLogAppendRetryQueueSensor = metrics.sensor("txn-log-append-retry-queue")
- addQueueLengthMetric(txnLogAppendRetryQueueSensor, "txn-log-append-retry", "the number of txn log appends that need to be retried")
+ newGauge(
+ "UnknownDestinationQueueSize",
+ new Gauge[Int] {
+ def value: Int = markersQueueForUnknownBroker.totalNumMarkers
+ },
+ Map("broker-id" -> config.brokerId.toString)
+ )
+
+ newGauge(
+ "CompleteTxnLogAppendQueueSize",
+ new Gauge[Int] {
+ def value: Int = txnLogAppendRetryQueue.size
+ },
+ Map("broker-id" -> config.brokerId.toString)
+ )
def start(): Unit = {
txnMarkerSendThread.start()
@@ -195,7 +197,6 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
def retryLogAppends(): Unit = {
val txnLogAppendRetries: java.util.List[TxnLogAppend] = new util.ArrayList[TxnLogAppend]()
txnLogAppendRetryQueue.drainTo(txnLogAppendRetries)
- txnLogAppendRetryQueueSensor.record(-txnLogAppendRetries.size())
debug(s"retrying: ${txnLogAppendRetries.size} transaction log appends")
txnLogAppendRetries.asScala.foreach { txnLogAppend =>
tryAppendToLog(txnLogAppend)
@@ -210,7 +211,6 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
queue.drainTo(txnIdAndMarkerEntries)
}
- unknownBrokerQueueSensor.record(-txnIdAndMarkerEntries.size())
for (txnIdAndMarker: TxnIdAndMarkerEntry <- txnIdAndMarkerEntries.asScala) {
val transactionalId = txnIdAndMarker.txnId
val producerId = txnIdAndMarker.txnMarkerEntry.producerId
@@ -222,7 +222,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
addTxnMarkersToBrokerQueue(transactionalId, producerId, producerEpoch, txnResult, coordinatorEpoch, topicPartitions)
}
- markersQueuePerBroker.map { case (brokerId: Int, brokerRequestQueue: TxnMarkerQueue) =>
+ markersQueuePerBroker.map { case (_, brokerRequestQueue: TxnMarkerQueue) =>
val txnIdAndMarkerEntries: java.util.List[TxnIdAndMarkerEntry] = new util.ArrayList[TxnIdAndMarkerEntry]()
brokerRequestQueue.forEachTxnTopicPartition { case (_, queue) =>
queue.drainTo(txnIdAndMarkerEntries)
@@ -284,7 +284,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
private def tryAppendToLog(txnLogAppend: TxnLogAppend) = {
// try to append to the transaction log
- def retryAppendCallback(error: Errors): Unit =
+ def appendCallback(error: Errors): Unit =
error match {
case Errors.NONE =>
trace(s"Completed transaction for ${txnLogAppend.transactionalId} with coordinator epoch ${txnLogAppend.coordinatorEpoch}, final state: state after commit: ${txnLogAppend.txnMetadata.state}")
@@ -294,7 +294,6 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
case Errors.COORDINATOR_NOT_AVAILABLE =>
warn(s"Failed updating transaction state for ${txnLogAppend.transactionalId} when appending to transaction log due to ${error.exceptionName}. retrying")
- txnLogAppendRetryQueueSensor.record(1)
// enqueue for retry
txnLogAppendRetryQueue.add(txnLogAppend)
@@ -302,7 +301,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
throw new IllegalStateException(s"Unexpected error ${errors.exceptionName} while appending to transaction log for ${txnLogAppend.transactionalId}")
}
- txnStateManager.appendTransactionToLog(txnLogAppend.transactionalId, txnLogAppend.coordinatorEpoch, txnLogAppend.newMetadata, retryAppendCallback)
+ txnStateManager.appendTransactionToLog(txnLogAppend.transactionalId, txnLogAppend.coordinatorEpoch, txnLogAppend.newMetadata, appendCallback)
}
def addTxnMarkersToBrokerQueue(transactionalId: String, producerId: Long, producerEpoch: Short,
@@ -320,7 +319,6 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
val txnIdAndMarker = TxnIdAndMarkerEntry(transactionalId, marker)
if (brokerNode == Node.noNode) {
- unknownBrokerQueueSensor.record(1)
// if the leader of the partition is known but node not available, put it into an unknown broker queue
// and let the sender thread to look for its broker and migrate them later
markersQueueForUnknownBroker.addMarkers(txnTopicPartition, txnIdAndMarker)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b4b65e9e/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index e390ecd..5e328ae 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -20,14 +20,16 @@ import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache}
import kafka.utils.timer.MockTimer
import kafka.utils.TestUtils
import org.apache.kafka.clients.NetworkClient
-import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersRequest}
import org.apache.kafka.common.utils.{MockTime, Utils}
import org.apache.kafka.common.{Node, TopicPartition}
import org.easymock.EasyMock
import org.junit.Assert._
-import org.junit.{After, Test}
+import org.junit.Test
+import com.yammer.metrics.Metrics
+
+import scala.collection.JavaConverters._
import scala.collection.mutable
class TransactionMarkerChannelManagerTest {
@@ -60,23 +62,16 @@ class TransactionMarkerChannelManagerTest {
reaperEnabled = false)
private val time = new MockTime
- private val metrics = new Metrics()
private val channelManager = new TransactionMarkerChannelManager(
KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:2181")),
metadataCache,
networkClient,
txnStateManager,
txnMarkerPurgatory,
- time,
- metrics)
+ time)
private val senderThread = channelManager.senderThread
- @After
- def after(): Unit = {
- metrics.close()
- }
-
private def mockCache(): Unit = {
EasyMock.expect(txnStateManager.partitionFor(transactionalId1))
.andReturn(txnTopicPartition1)
@@ -120,10 +115,10 @@ class TransactionMarkerChannelManagerTest {
channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds()))
assertEquals(2, txnMarkerPurgatory.watched)
- assertEquals(2, channelManager.queueForBroker(broker1.id).get.totalNumMarkers())
+ assertEquals(2, channelManager.queueForBroker(broker1.id).get.totalNumMarkers)
assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition1))
assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition2))
- assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers())
+ assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers)
assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition1))
assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
@@ -162,7 +157,7 @@ class TransactionMarkerChannelManagerTest {
channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds()))
assertEquals(1, txnMarkerPurgatory.watched)
- assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers())
+ assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers)
assertTrue(channelManager.queueForBroker(broker1.id).isEmpty)
assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition1))
assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
@@ -194,11 +189,11 @@ class TransactionMarkerChannelManagerTest {
channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds()))
assertEquals(2, txnMarkerPurgatory.watched)
- assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers())
+ assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers)
assertTrue(channelManager.queueForBroker(broker1.id).isEmpty)
assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition1))
assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
- assertEquals(2, channelManager.queueForUnknownBroker.totalNumMarkers())
+ assertEquals(2, channelManager.queueForUnknownBroker.totalNumMarkers)
assertEquals(1, channelManager.queueForUnknownBroker.totalNumMarkers(txnTopicPartition1))
assertEquals(1, channelManager.queueForUnknownBroker.totalNumMarkers(txnTopicPartition2))
@@ -242,21 +237,33 @@ class TransactionMarkerChannelManagerTest {
channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds()))
assertEquals(2, txnMarkerPurgatory.watched)
- assertEquals(2, channelManager.queueForBroker(broker1.id).get.totalNumMarkers())
+ assertEquals(2, channelManager.queueForBroker(broker1.id).get.totalNumMarkers)
assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition1))
assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition2))
- assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers())
+ assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers)
assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition1))
assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
channelManager.removeMarkersForTxnTopicPartition(txnTopicPartition1)
assertEquals(1, txnMarkerPurgatory.watched)
- assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers())
+ assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers)
assertEquals(0, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition1))
assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition2))
- assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers())
+ assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers)
assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition1))
assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
}
+
+ @Test
+ def shouldCreateMetricsOnStarting(): Unit = {
+ val metrics = Metrics.defaultRegistry.allMetrics
+
+ assertEquals(1, Metrics.defaultRegistry.allMetrics.asScala
+ .filterKeys(_.getMBeanName == "kafka.coordinator.transaction:type=TransactionMarkerChannelManager,name=UnknownDestinationQueueSize,broker-id=1")
+ .size)
+ assertEquals(1, Metrics.defaultRegistry.allMetrics.asScala
+ .filterKeys(_.getMBeanName == "kafka.coordinator.transaction:type=TransactionMarkerChannelManager,name=CompleteTxnLogAppendQueueSize,broker-id=1")
+ .size)
+ }
}