You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/12 11:37:22 UTC
kafka git commit: KAFKA-5194;
Include only client traffic in BytesOutPerSec metric (KIP-153)
Repository: kafka
Updated Branches:
refs/heads/trunk 911c768bc -> d96866243
KAFKA-5194; Include only client traffic in BytesOutPerSec metric (KIP-153)
Also added 2 new metrics to account for incoming/outgoing traffic due to internal replication
- ReplicationBytesInPerSec
- ReplicationBytesOutPerSec
Author: Mickael Maison <mi...@gmail.com>
Reviewers: Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #3003 from mimaison/KAFKA-5194
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d9686624
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d9686624
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d9686624
Branch: refs/heads/trunk
Commit: d96866243990a4d739ec8fc239f0c2758bba66e8
Parents: 911c768
Author: Mickael Maison <mi...@gmail.com>
Authored: Fri May 12 11:50:19 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri May 12 12:33:48 2017 +0100
----------------------------------------------------------------------
.../src/main/scala/kafka/server/KafkaApis.scala | 3 +-
.../kafka/server/KafkaRequestHandler.scala | 31 +++++++++++
.../kafka/server/ReplicaFetcherThread.scala | 1 +
.../scala/unit/kafka/metrics/MetricsTest.scala | 56 ++++++++++++++++----
.../test/scala/unit/kafka/utils/TestUtils.scala | 25 ++++++++-
5 files changed, 102 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d9686624/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index fbd74ac..150d16d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -516,8 +516,7 @@ class KafkaApis(val requestChannel: RequestChannel,
fetchedPartitionData.put(topicPartition, data)
// record the bytes out metrics only when the response is being sent
- BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesOutRate.mark(data.records.sizeInBytes)
- BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.records.sizeInBytes)
+ BrokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
}
val response = new FetchResponse(fetchedPartitionData, 0)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d9686624/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index a1600cb..d1d63f1 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -115,6 +115,12 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
val bytesInRate = newMeter(BrokerTopicStats.BytesInPerSec, "bytes", TimeUnit.SECONDS, tags)
val bytesOutRate = newMeter(BrokerTopicStats.BytesOutPerSec, "bytes", TimeUnit.SECONDS, tags)
val bytesRejectedRate = newMeter(BrokerTopicStats.BytesRejectedPerSec, "bytes", TimeUnit.SECONDS, tags)
+ private[server] val replicationBytesInRate =
+ if (name.isEmpty) Some(newMeter(BrokerTopicStats.ReplicationBytesInPerSec, "bytes", TimeUnit.SECONDS, tags))
+ else None
+ private[server] val replicationBytesOutRate =
+ if (name.isEmpty) Some(newMeter(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes", TimeUnit.SECONDS, tags))
+ else None
val failedProduceRequestRate = newMeter(BrokerTopicStats.FailedProduceRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
val failedFetchRequestRate = newMeter(BrokerTopicStats.FailedFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
val totalProduceRequestRate = newMeter(BrokerTopicStats.TotalProduceRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
@@ -125,6 +131,8 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
removeMetric(BrokerTopicStats.BytesInPerSec, tags)
removeMetric(BrokerTopicStats.BytesOutPerSec, tags)
removeMetric(BrokerTopicStats.BytesRejectedPerSec, tags)
+ removeMetric(BrokerTopicStats.ReplicationBytesInPerSec, tags)
+ removeMetric(BrokerTopicStats.ReplicationBytesOutPerSec, tags)
removeMetric(BrokerTopicStats.FailedProduceRequestsPerSec, tags)
removeMetric(BrokerTopicStats.FailedFetchRequestsPerSec, tags)
removeMetric(BrokerTopicStats.TotalProduceRequestsPerSec, tags)
@@ -137,6 +145,8 @@ object BrokerTopicStats extends Logging {
val BytesInPerSec = "BytesInPerSec"
val BytesOutPerSec = "BytesOutPerSec"
val BytesRejectedPerSec = "BytesRejectedPerSec"
+ val ReplicationBytesInPerSec = "ReplicationBytesInPerSec"
+ val ReplicationBytesOutPerSec = "ReplicationBytesOutPerSec"
val FailedProduceRequestsPerSec = "FailedProduceRequestsPerSec"
val FailedFetchRequestsPerSec = "FailedFetchRequestsPerSec"
val TotalProduceRequestsPerSec = "TotalProduceRequestsPerSec"
@@ -152,9 +162,30 @@ object BrokerTopicStats extends Logging {
stats.getAndMaybePut(topic)
}
+ def updateReplicationBytesIn(value: Long) {
+ getBrokerAllTopicsStats.replicationBytesInRate.foreach { metric =>
+ metric.mark(value)
+ }
+ }
+
+ private def updateReplicationBytesOut(value: Long) {
+ getBrokerAllTopicsStats.replicationBytesOutRate.foreach { metric =>
+ metric.mark(value)
+ }
+ }
+
def removeMetrics(topic: String) {
val metrics = stats.remove(topic)
if (metrics != null)
metrics.close()
}
+
+ def updateBytesOut(topic: String, isFollower: Boolean, value: Long) {
+ if (isFollower) {
+ updateReplicationBytesOut(value)
+ } else {
+ getBrokerTopicStats(topic).bytesOutRate.mark(value)
+ getBrokerAllTopicsStats.bytesOutRate.mark(value)
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d9686624/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 9016fcf..1148e92 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -112,6 +112,7 @@ class ReplicaFetcherThread(name: String,
trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark")
if (quota.isThrottled(topicPartition))
quota.record(records.sizeInBytes)
+ BrokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
} catch {
case e: KafkaStorageException =>
fatal(s"Disk error while replicating data for $topicPartition", e)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d9686624/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 16f0636..f33055f 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -20,8 +20,8 @@ package kafka.metrics
import java.util.Properties
import com.yammer.metrics.Metrics
-import com.yammer.metrics.core.{Metric, MetricName, MetricPredicate}
-import org.junit.{After, Test}
+import com.yammer.metrics.core.{Meter, MetricPredicate}
+import org.junit.Test
import org.junit.Assert._
import kafka.integration.KafkaServerTestHarness
import kafka.server._
@@ -48,11 +48,6 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
val nMessages = 2
- @After
- override def tearDown() {
- super.tearDown()
- }
-
@Test
@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
def testMetricsLeak() {
@@ -93,10 +88,10 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
}
@Test
- def testClusterIdMetric(): Unit ={
+ def testClusterIdMetric(): Unit = {
// Check if clusterId metric exists.
val metrics = Metrics.defaultRegistry().allMetrics
- assertEquals(metrics.keySet.asScala.count(_.getMBeanName().equals("kafka.server:type=KafkaServer,name=ClusterId")), 1)
+ assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=KafkaServer,name=ClusterId"), 1)
}
@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
@@ -111,10 +106,51 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
zkConsumerConnector1.shutdown()
}
+ @Test
+ def testBrokerTopicMetricsBytesInOut(): Unit = {
+ val replicationBytesIn = BrokerTopicStats.ReplicationBytesInPerSec
+ val replicationBytesOut = BrokerTopicStats.ReplicationBytesOutPerSec
+ val bytesIn = s"${BrokerTopicStats.BytesInPerSec},topic=$topic"
+ val bytesOut = s"${BrokerTopicStats.BytesOutPerSec},topic=$topic"
+
+ createTopic(zkUtils, topic, 1, numNodes, servers)
+ // Produce a few messages to create the metrics
+ TestUtils.produceMessages(servers, topic, nMessages)
+
+ val initialReplicationBytesIn = meterCount(replicationBytesIn)
+ val initialReplicationBytesOut = meterCount(replicationBytesOut)
+ val initialBytesIn = meterCount(bytesIn)
+ val initialBytesOut = meterCount(bytesOut)
+
+ // Produce a few messages to make the metrics tick
+ TestUtils.produceMessages(servers, topic, nMessages)
+
+ assertTrue(meterCount(replicationBytesIn) > initialReplicationBytesIn)
+ assertTrue(meterCount(replicationBytesOut) > initialReplicationBytesOut)
+ assertTrue(meterCount(bytesIn) > initialBytesIn)
+ // BytesOut doesn't include replication, so it shouldn't have changed
+ assertEquals(initialBytesOut, meterCount(bytesOut))
+
+ // Consume messages to make bytesOut tick
+ TestUtils.consumeTopicRecords(servers, topic, nMessages * 2)
+
+ assertTrue(meterCount(bytesOut) > initialBytesOut)
+ }
+
+ private def meterCount(metricName: String): Long = {
+ Metrics.defaultRegistry.allMetrics.asScala
+ .filterKeys(_.getMBeanName.endsWith(metricName))
+ .values
+ .headOption
+ .getOrElse(fail(s"Unable to find metric $metricName"))
+ .asInstanceOf[Meter]
+ .count
+ }
+
private def checkTopicMetricsExists(topic: String): Boolean = {
val topicMetricRegex = new Regex(".*("+topic+")$")
val metricGroups = Metrics.defaultRegistry().groupedMetrics(MetricPredicate.ALL).entrySet()
- for(metricGroup <- metricGroups.asScala) {
+ for (metricGroup <- metricGroups.asScala) {
if (topicMetricRegex.pattern.matcher(metricGroup.getKey()).matches)
return true
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d9686624/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index a51a07c..f254ee4 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -22,7 +22,7 @@ import java.nio._
import java.nio.channels._
import java.nio.charset.Charset
import java.security.cert.X509Certificate
-import java.util.Properties
+import java.util.{ArrayList, Collections, Properties}
import java.util.concurrent.{Callable, Executors, TimeUnit}
import javax.net.ssl.X509TrustManager
@@ -40,7 +40,7 @@ import kafka.server._
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils.ZkUtils._
import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
+import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, RangeAssignor}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.{ListenerName, Mode}
@@ -1298,6 +1298,27 @@ object TestUtils extends Logging {
assertTrue(s"$message failed with exception(s) $exceptions", exceptions.isEmpty)
}
+
+ def consumeTopicRecords[K, V](servers: Seq[KafkaServer], topic: String, numMessages: Int,
+ waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
+ val consumer = createNewConsumer(TestUtils.getBrokerListStrFromServers(servers),
+ securityProtocol = SecurityProtocol.PLAINTEXT)
+ try {
+ consumer.subscribe(Collections.singleton(topic))
+ consumeRecords(consumer, numMessages, waitTime)
+ } finally consumer.close()
+ }
+
+ def consumeRecords[K, V](consumer: KafkaConsumer[K, V], numMessages: Int,
+ waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Seq[ConsumerRecord[K, V]] = {
+ val records = new ArrayBuffer[ConsumerRecord[K, V]]()
+ waitUntilTrue(() => {
+ records ++= consumer.poll(50).asScala
+ records.size >= numMessages
+ }, s"Consumed ${records.size} records until timeout instead of the expected $numMessages records", waitTime)
+ assertEquals("Consumed more records than expected", numMessages, records.size)
+ records
+ }
}
class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {