Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/12 11:37:22 UTC

kafka git commit: KAFKA-5194; Include only client traffic in BytesOutPerSec metric (KIP-153)

Repository: kafka
Updated Branches:
  refs/heads/trunk 911c768bc -> d96866243


KAFKA-5194; Include only client traffic in BytesOutPerSec metric (KIP-153)

Also added two new metrics to account for incoming/outgoing traffic due to internal replication:
- ReplicationBytesInPerSec
- ReplicationBytesOutPerSec
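
These meters register alongside the existing BrokerTopicMetrics, so they can be read from the default Yammer registry (or over JMX). A minimal Scala sketch of looking one up, assuming the usual `kafka.server:type=BrokerTopicMetrics,name=...` MBean naming pattern (the exact MBean string is an assumption based on that pattern, not taken verbatim from this commit):

    import com.yammer.metrics.Metrics
    import com.yammer.metrics.core.Meter
    import scala.collection.JavaConverters._

    // Sketch: read the current count of the new replication-in meter from the
    // default Yammer registry that KafkaMetricsGroup registers into.
    // The MBean name below is assumed from the BrokerTopicMetrics pattern.
    def replicationBytesInCount: Option[Long] =
      Metrics.defaultRegistry.allMetrics.asScala.collectFirst {
        case (name, meter: Meter)
          if name.getMBeanName == "kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec" =>
            meter.count
      }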

Author: Mickael Maison <mi...@gmail.com>

Reviewers: Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #3003 from mimaison/KAFKA-5194


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d9686624
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d9686624
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d9686624

Branch: refs/heads/trunk
Commit: d96866243990a4d739ec8fc239f0c2758bba66e8
Parents: 911c768
Author: Mickael Maison <mi...@gmail.com>
Authored: Fri May 12 11:50:19 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri May 12 12:33:48 2017 +0100

----------------------------------------------------------------------
 .../src/main/scala/kafka/server/KafkaApis.scala |  3 +-
 .../kafka/server/KafkaRequestHandler.scala      | 31 +++++++++++
 .../kafka/server/ReplicaFetcherThread.scala     |  1 +
 .../scala/unit/kafka/metrics/MetricsTest.scala  | 56 ++++++++++++++++----
 .../test/scala/unit/kafka/utils/TestUtils.scala | 25 ++++++++-
 5 files changed, 102 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d9686624/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index fbd74ac..150d16d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -516,8 +516,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         fetchedPartitionData.put(topicPartition, data)
 
         // record the bytes out metrics only when the response is being sent
-        BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesOutRate.mark(data.records.sizeInBytes)
-        BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.records.sizeInBytes)
+        BrokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
       }
 
       val response = new FetchResponse(fetchedPartitionData, 0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9686624/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index a1600cb..d1d63f1 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -115,6 +115,12 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
   val bytesInRate = newMeter(BrokerTopicStats.BytesInPerSec, "bytes", TimeUnit.SECONDS, tags)
   val bytesOutRate = newMeter(BrokerTopicStats.BytesOutPerSec, "bytes", TimeUnit.SECONDS, tags)
   val bytesRejectedRate = newMeter(BrokerTopicStats.BytesRejectedPerSec, "bytes", TimeUnit.SECONDS, tags)
+  private[server] val replicationBytesInRate =
+    if (name.isEmpty) Some(newMeter(BrokerTopicStats.ReplicationBytesInPerSec, "bytes", TimeUnit.SECONDS, tags))
+    else None
+  private[server] val replicationBytesOutRate =
+    if (name.isEmpty) Some(newMeter(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes", TimeUnit.SECONDS, tags))
+    else None
   val failedProduceRequestRate = newMeter(BrokerTopicStats.FailedProduceRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
   val failedFetchRequestRate = newMeter(BrokerTopicStats.FailedFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
   val totalProduceRequestRate = newMeter(BrokerTopicStats.TotalProduceRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
@@ -125,6 +131,8 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
     removeMetric(BrokerTopicStats.BytesInPerSec, tags)
     removeMetric(BrokerTopicStats.BytesOutPerSec, tags)
     removeMetric(BrokerTopicStats.BytesRejectedPerSec, tags)
+    removeMetric(BrokerTopicStats.ReplicationBytesInPerSec, tags)
+    removeMetric(BrokerTopicStats.ReplicationBytesOutPerSec, tags)
     removeMetric(BrokerTopicStats.FailedProduceRequestsPerSec, tags)
     removeMetric(BrokerTopicStats.FailedFetchRequestsPerSec, tags)
     removeMetric(BrokerTopicStats.TotalProduceRequestsPerSec, tags)
@@ -137,6 +145,8 @@ object BrokerTopicStats extends Logging {
   val BytesInPerSec = "BytesInPerSec"
   val BytesOutPerSec = "BytesOutPerSec"
   val BytesRejectedPerSec = "BytesRejectedPerSec"
+  val ReplicationBytesInPerSec = "ReplicationBytesInPerSec"
+  val ReplicationBytesOutPerSec = "ReplicationBytesOutPerSec"
   val FailedProduceRequestsPerSec = "FailedProduceRequestsPerSec"
   val FailedFetchRequestsPerSec = "FailedFetchRequestsPerSec"
   val TotalProduceRequestsPerSec = "TotalProduceRequestsPerSec"
@@ -152,9 +162,30 @@ object BrokerTopicStats extends Logging {
     stats.getAndMaybePut(topic)
   }
 
+  def updateReplicationBytesIn(value: Long) {
+    getBrokerAllTopicsStats.replicationBytesInRate.foreach { metric =>
+      metric.mark(value)
+    }
+  }
+
+  private def updateReplicationBytesOut(value: Long) {
+    getBrokerAllTopicsStats.replicationBytesOutRate.foreach { metric =>
+      metric.mark(value)
+    }
+  }
+
   def removeMetrics(topic: String) {
     val metrics = stats.remove(topic)
     if (metrics != null)
       metrics.close()
   }
+
+  def updateBytesOut(topic: String, isFollower: Boolean, value: Long) {
+    if (isFollower) {
+      updateReplicationBytesOut(value)
+    } else {
+      getBrokerTopicStats(topic).bytesOutRate.mark(value)
+      getBrokerAllTopicsStats.bytesOutRate.mark(value)
+    }
+  }
 }
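
To make the routing in `updateBytesOut` concrete, a hypothetical call site (illustration only, not part of the commit; the topic name is made up): a response destined for a follower marks only the replication meter, while one destined for a client marks the per-topic and all-topics BytesOutPerSec meters.

    // Illustration: how the new helper routes a 1 KiB fetch response.
    BrokerTopicStats.updateBytesOut("test-topic", isFollower = true, value = 1024L)
    // -> marks ReplicationBytesOutPerSec only

    BrokerTopicStats.updateBytesOut("test-topic", isFollower = false, value = 1024L)
    // -> marks BytesOutPerSec for "test-topic" and for AllTopics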

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9686624/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 9016fcf..1148e92 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -112,6 +112,7 @@ class ReplicaFetcherThread(name: String,
         trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark")
       if (quota.isThrottled(topicPartition))
         quota.record(records.sizeInBytes)
+      BrokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
     } catch {
       case e: KafkaStorageException =>
         fatal(s"Disk error while replicating data for $topicPartition", e)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9686624/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 16f0636..f33055f 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -20,8 +20,8 @@ package kafka.metrics
 import java.util.Properties
 
 import com.yammer.metrics.Metrics
-import com.yammer.metrics.core.{Metric, MetricName, MetricPredicate}
-import org.junit.{After, Test}
+import com.yammer.metrics.core.{Meter, MetricPredicate}
+import org.junit.Test
 import org.junit.Assert._
 import kafka.integration.KafkaServerTestHarness
 import kafka.server._
@@ -48,11 +48,6 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
 
   val nMessages = 2
 
-  @After
-  override def tearDown() {
-    super.tearDown()
-  }
-
   @Test
   @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
   def testMetricsLeak() {
@@ -93,10 +88,10 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   }
 
   @Test
-  def testClusterIdMetric(): Unit ={
+  def testClusterIdMetric(): Unit = {
     // Check if clusterId metric exists.
     val metrics = Metrics.defaultRegistry().allMetrics
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName().equals("kafka.server:type=KafkaServer,name=ClusterId")), 1)
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=KafkaServer,name=ClusterId"), 1)
   }
 
   @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
@@ -111,10 +106,51 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     zkConsumerConnector1.shutdown()
   }
 
+  @Test
+  def testBrokerTopicMetricsBytesInOut(): Unit = {
+    val replicationBytesIn = BrokerTopicStats.ReplicationBytesInPerSec
+    val replicationBytesOut = BrokerTopicStats.ReplicationBytesOutPerSec
+    val bytesIn = s"${BrokerTopicStats.BytesInPerSec},topic=$topic"
+    val bytesOut = s"${BrokerTopicStats.BytesOutPerSec},topic=$topic"
+
+    createTopic(zkUtils, topic, 1, numNodes, servers)
+    // Produce a few messages to create the metrics
+    TestUtils.produceMessages(servers, topic, nMessages)
+
+    val initialReplicationBytesIn = meterCount(replicationBytesIn)
+    val initialReplicationBytesOut = meterCount(replicationBytesOut)
+    val initialBytesIn = meterCount(bytesIn)
+    val initialBytesOut = meterCount(bytesOut)
+
+    // Produce a few messages to make the metrics tick
+    TestUtils.produceMessages(servers, topic, nMessages)
+
+    assertTrue(meterCount(replicationBytesIn) > initialReplicationBytesIn)
+    assertTrue(meterCount(replicationBytesOut) > initialReplicationBytesOut)
+    assertTrue(meterCount(bytesIn) > initialBytesIn)
+    // BytesOut doesn't include replication, so it shouldn't have changed
+    assertEquals(initialBytesOut, meterCount(bytesOut))
+
+    // Consume messages to make bytesOut tick
+    TestUtils.consumeTopicRecords(servers, topic, nMessages * 2)
+
+    assertTrue(meterCount(bytesOut) > initialBytesOut)
+  }
+
+  private def meterCount(metricName: String): Long = {
+    Metrics.defaultRegistry.allMetrics.asScala
+      .filterKeys(_.getMBeanName.endsWith(metricName))
+      .values
+      .headOption
+      .getOrElse(fail(s"Unable to find metric $metricName"))
+      .asInstanceOf[Meter]
+      .count
+  }
+
   private def checkTopicMetricsExists(topic: String): Boolean = {
     val topicMetricRegex = new Regex(".*("+topic+")$")
     val metricGroups = Metrics.defaultRegistry().groupedMetrics(MetricPredicate.ALL).entrySet()
-    for(metricGroup <- metricGroups.asScala) {
+    for (metricGroup <- metricGroups.asScala) {
       if (topicMetricRegex.pattern.matcher(metricGroup.getKey()).matches)
         return true
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9686624/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index a51a07c..f254ee4 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -22,7 +22,7 @@ import java.nio._
 import java.nio.channels._
 import java.nio.charset.Charset
 import java.security.cert.X509Certificate
-import java.util.Properties
+import java.util.{ArrayList, Collections, Properties}
 import java.util.concurrent.{Callable, Executors, TimeUnit}
 import javax.net.ssl.X509TrustManager
 
@@ -40,7 +40,7 @@ import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils.ZkUtils._
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
+import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, RangeAssignor}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.{ListenerName, Mode}
@@ -1298,6 +1298,27 @@ object TestUtils extends Logging {
     assertTrue(s"$message failed with exception(s) $exceptions", exceptions.isEmpty)
 
   }
+
+  def consumeTopicRecords[K, V](servers: Seq[KafkaServer], topic: String, numMessages: Int,
+                                waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
+    val consumer = createNewConsumer(TestUtils.getBrokerListStrFromServers(servers),
+      securityProtocol = SecurityProtocol.PLAINTEXT)
+    try {
+      consumer.subscribe(Collections.singleton(topic))
+      consumeRecords(consumer, numMessages, waitTime)
+    } finally consumer.close()
+  }
+
+  def consumeRecords[K, V](consumer: KafkaConsumer[K, V], numMessages: Int,
+                           waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Seq[ConsumerRecord[K, V]] = {
+    val records = new ArrayBuffer[ConsumerRecord[K, V]]()
+    waitUntilTrue(() => {
+      records ++= consumer.poll(50).asScala
+      records.size >= numMessages
+    }, s"Consumed ${records.size} records until timeout instead of the expected $numMessages records", waitTime)
+    assertEquals("Consumed more records than expected", numMessages, records.size)
+    records
+  }
 }
 
 class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
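
For reference, a hypothetical usage of the new `consumeTopicRecords` helper, mirroring the call in MetricsTest above (`servers`, `topic`, and `nMessages` are assumed to come from the enclosing KafkaServerTestHarness test):

    // Blocks until nMessages * 2 records are read or the default wait elapses;
    // consumeRecords itself fails the test on timeout or over-consumption.
    val consumed = TestUtils.consumeTopicRecords(servers, topic, nMessages * 2)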