You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/09/20 16:34:46 UTC

kafka git commit: KAFKA-5358; Consumer perf tool should count rebalance time (KIP-177)

Repository: kafka
Updated Branches:
  refs/heads/trunk c82be0f30 -> 239dad1b9


KAFKA-5358; Consumer perf tool should count rebalance time (KIP-177)

Author: huxihx <hu...@hotmail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #3188 from huxihx/KAKFA-5358


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/239dad1b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/239dad1b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/239dad1b

Branch: refs/heads/trunk
Commit: 239dad1b9fb6803842067dd588f679ba6ae5efe7
Parents: c82be0f
Author: huxihx <hu...@hotmail.com>
Authored: Wed Sep 20 09:28:31 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Sep 20 09:29:39 2017 -0700

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsumerPerformance.scala | 143 +++++++++++++------
 .../kafka/tools/ConsumerPerformanceTest.scala   |  37 +++--
 2 files changed, 129 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/239dad1b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index ed1b440..bdec41f 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -54,9 +54,10 @@ object ConsumerPerformance {
     val totalBytesRead = new AtomicLong(0)
     val consumerTimeout = new AtomicBoolean(false)
     var metrics: mutable.Map[MetricName, _ <: Metric] = null
+    val joinGroupTimeInMs = new AtomicLong(0)
 
     if (!config.hideHeader) {
-      printHeader(!config.showDetailedStats)
+      printHeader(config.showDetailedStats, config.useOldConsumer)
     }
 
     var startMs, endMs = 0L
@@ -64,7 +65,7 @@ object ConsumerPerformance {
       val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props)
       consumer.subscribe(Collections.singletonList(config.topic))
       startMs = System.currentTimeMillis
-      consume(consumer, List(config.topic), config.numMessages, 1000, config, totalMessagesRead, totalBytesRead)
+      consume(consumer, List(config.topic), config.numMessages, 1000, config, totalMessagesRead, totalBytesRead, joinGroupTimeInMs, startMs)
       endMs = System.currentTimeMillis
 
       if (config.printMetrics) {
@@ -86,19 +87,35 @@ object ConsumerPerformance {
       logger.info("starting threads")
       startMs = System.currentTimeMillis
       for (thread <- threadList)
-        thread.start
+        thread.start()
       for (thread <- threadList)
-        thread.join
+        thread.join()
       endMs =
         if (consumerTimeout.get()) System.currentTimeMillis - consumerConfig.consumerTimeoutMs
         else System.currentTimeMillis
       consumerConnector.shutdown()
     }
     val elapsedSecs = (endMs - startMs) / 1000.0
+    val fetchTimeInMs = (endMs - startMs) - joinGroupTimeInMs.get
     if (!config.showDetailedStats) {
       val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024)
-      println("%s, %s, %.4f, %.4f, %d, %.4f".format(config.dateFormat.format(startMs), config.dateFormat.format(endMs),
-        totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get, totalMessagesRead.get / elapsedSecs))
+      print("%s, %s, %.4f, %.4f, %d, %.4f".format(
+        config.dateFormat.format(startMs),
+        config.dateFormat.format(endMs),
+        totalMBRead,
+        totalMBRead / elapsedSecs,
+        totalMessagesRead.get,
+        totalMessagesRead.get / elapsedSecs
+      ))
+      if (!config.useOldConsumer) {
+        print(", %d, %d, %.4f, %.4f".format(
+          joinGroupTimeInMs.get,
+          fetchTimeInMs,
+          totalMBRead / (fetchTimeInMs / 1000.0),
+          totalMessagesRead.get / (fetchTimeInMs / 1000.0)
+        ))
+      }
+      println()
     }
 
     if (metrics != null) {
@@ -107,11 +124,13 @@ object ConsumerPerformance {
 
   }
 
-  private[tools] def printHeader(showDetailedStats: Boolean): Unit = {
-    if (showDetailedStats)
-      println("start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
-    else
-      println("time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+  private[tools] def printHeader(showDetailedStats: Boolean, useOldConsumer: Boolean): Unit = {
+    val newFieldsInHeader = if (!useOldConsumer) ", rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec" else ""
+    if (!showDetailedStats) {
+        println("start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec" + newFieldsInHeader)
+      } else {
+        println("time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec" + newFieldsInHeader)
+    }
   }
 
   def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
@@ -120,29 +139,25 @@ object ConsumerPerformance {
               timeout: Long,
               config: ConsumerPerfConfig,
               totalMessagesRead: AtomicLong,
-              totalBytesRead: AtomicLong) {
+              totalBytesRead: AtomicLong,
+              joinTime: AtomicLong,
+              testStartTime: Long) {
     var bytesRead = 0L
     var messagesRead = 0L
     var lastBytesRead = 0L
     var lastMessagesRead = 0L
+    var joinStart = 0L
+    var joinTimeMsInSingleRound = 0L
 
-    // Wait for group join, metadata fetch, etc
-    val joinTimeout = 10000
-    val isAssigned = new AtomicBoolean(false)
     consumer.subscribe(topics.asJava, new ConsumerRebalanceListener {
       def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) {
-        isAssigned.set(true)
+        joinTime.addAndGet(System.currentTimeMillis - joinStart)
+        joinTimeMsInSingleRound += System.currentTimeMillis - joinStart
       }
       def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) {
-        isAssigned.set(false)
+        joinStart = System.currentTimeMillis
       }})
-    val joinStart = System.currentTimeMillis()
-    while (!isAssigned.get()) {
-      if (System.currentTimeMillis() - joinStart >= joinTimeout) {
-        throw new Exception("Timed out waiting for initial group join.")
-      }
-      consumer.poll(100)
-    }
+    consumer.poll(0)
     consumer.seekToBeginning(Collections.emptyList())
 
     // Now start the benchmark
@@ -165,7 +180,9 @@ object ConsumerPerformance {
 
         if (currentTimeMillis - lastReportTime >= config.reportingInterval) {
           if (config.showDetailedStats)
-            printProgressMessage(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, currentTimeMillis, config.dateFormat)
+            printNewConsumerProgress(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead,
+              lastReportTime, currentTimeMillis, config.dateFormat, joinTimeMsInSingleRound)
+          joinTimeMsInSingleRound = 0L
           lastReportTime = currentTimeMillis
           lastMessagesRead = messagesRead
           lastBytesRead = bytesRead
@@ -177,19 +194,64 @@ object ConsumerPerformance {
     totalBytesRead.set(bytesRead)
   }
 
-  def printProgressMessage(id: Int,
-                           bytesRead: Long,
-                           lastBytesRead: Long,
-                           messagesRead: Long,
-                           lastMessagesRead: Long,
-                           startMs: Long,
-                           endMs: Long,
-                           dateFormat: SimpleDateFormat) = {
+  def printOldConsumerProgress(id: Int,
+                               bytesRead: Long,
+                               lastBytesRead: Long,
+                               messagesRead: Long,
+                               lastMessagesRead: Long,
+                               startMs: Long,
+                               endMs: Long,
+                               dateFormat: SimpleDateFormat): Unit = {
+    printBasicProgress(id, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, dateFormat)
+    println()
+  }
+
+  def printNewConsumerProgress(id: Int,
+                               bytesRead: Long,
+                               lastBytesRead: Long,
+                               messagesRead: Long,
+                               lastMessagesRead: Long,
+                               startMs: Long,
+                               endMs: Long,
+                               dateFormat: SimpleDateFormat,
+                               periodicJoinTimeInMs: Long): Unit = {
+    printBasicProgress(id, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, dateFormat)
+    printExtendedProgress(bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, periodicJoinTimeInMs)
+    println()
+  }
+
+  private def printBasicProgress(id: Int,
+                                 bytesRead: Long,
+                                 lastBytesRead: Long,
+                                 messagesRead: Long,
+                                 lastMessagesRead: Long,
+                                 startMs: Long,
+                                 endMs: Long,
+                                 dateFormat: SimpleDateFormat): Unit = {
     val elapsedMs: Double = endMs - startMs
-    val totalMBRead = (bytesRead * 1.0) / (1024 * 1024)
-    val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
-    println("%s, %d, %.4f, %.4f, %d, %.4f".format(dateFormat.format(endMs), id, totalMBRead,
-        1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0))
+    val totalMbRead = (bytesRead * 1.0) / (1024 * 1024)
+    val intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
+    val intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs
+    val intervalMessagesPerSec = ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0
+    print("%s, %d, %.4f, %.4f, %d, %.4f".format(dateFormat.format(endMs), id, totalMbRead,
+      intervalMbPerSec, messagesRead, intervalMessagesPerSec))
+  }
+
+  private def printExtendedProgress(bytesRead: Long,
+                                    lastBytesRead: Long,
+                                    messagesRead: Long,
+                                    lastMessagesRead: Long,
+                                    startMs: Long,
+                                    endMs: Long,
+                                    periodicJoinTimeInMs: Long): Unit = {
+    val fetchTimeMs = endMs - startMs - periodicJoinTimeInMs
+    val intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
+    val intervalMessagesRead = messagesRead - lastMessagesRead
+    val (intervalMbPerSec, intervalMessagesPerSec) = if (fetchTimeMs <= 0)
+      (0.0, 0.0)
+    else
+      (1000.0 * intervalMbRead / fetchTimeMs, 1000.0 * intervalMessagesRead / fetchTimeMs)
+    print(", %d, %d, %.4f, %.4f".format(periodicJoinTimeInMs, fetchTimeMs, intervalMbPerSec, intervalMessagesPerSec))
   }
 
   class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {
@@ -316,14 +378,14 @@ object ConsumerPerformance {
       try {
         val iter = stream.iterator
         while (iter.hasNext && messagesRead < config.numMessages) {
-          val messageAndMetadata = iter.next
+          val messageAndMetadata = iter.next()
           messagesRead += 1
           bytesRead += messageAndMetadata.message.length
           val currentTimeMillis = System.currentTimeMillis
 
           if (currentTimeMillis - lastReportTime >= config.reportingInterval) {
             if (config.showDetailedStats)
-              printProgressMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, currentTimeMillis, config.dateFormat)
+              printOldConsumerProgress(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, currentTimeMillis, config.dateFormat)
             lastReportTime = currentTimeMillis
             lastMessagesRead = messagesRead
             lastBytesRead = bytesRead
@@ -339,7 +401,8 @@ object ConsumerPerformance {
       totalMessagesRead.addAndGet(messagesRead)
       totalBytesRead.addAndGet(bytesRead)
       if (config.showDetailedStats)
-        printProgressMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis, config.dateFormat)
+        printOldConsumerProgress(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis, config.dateFormat)
+
     }
 
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/239dad1b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
index 2fa774c..bafe8ed 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
@@ -26,21 +26,36 @@ import org.junit.Test
 class ConsumerPerformanceTest {
 
   private val outContent = new ByteArrayOutputStream()
+  private val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS")
 
   @Test
-  def testHeaderMatchBody(): Unit = {
+  def testDetailedHeaderMatchBody(): Unit = {
+    testHeaderMatchContent(detailed = true, useOldConsumer = false, 2,
+      () => ConsumerPerformance.printNewConsumerProgress(1, 1024 * 1024, 0, 1, 0, 0, 1, dateFormat, 1L))
+    testHeaderMatchContent(detailed = true, useOldConsumer = true, 4,
+      () => ConsumerPerformance.printOldConsumerProgress(1, 1024 * 1024, 0, 1, 0, 0, 1,
+      dateFormat))
+  }
+
+  @Test
+  def testNonDetailedHeaderMatchBody(): Unit = {
+    testHeaderMatchContent(detailed = false, useOldConsumer = false, 2, () => println(s"${dateFormat.format(System.currentTimeMillis)}, " +
+      s"${dateFormat.format(System.currentTimeMillis)}, 1.0, 1.0, 1, 1.0, 1, 1, 1.1, 1.1"))
+    testHeaderMatchContent(detailed = false, useOldConsumer = true, 4, () => println(s"${dateFormat.format(System.currentTimeMillis)}, " +
+      s"${dateFormat.format(System.currentTimeMillis)}, 1.0, 1.0, 1, 1.0"))
+  }
+
+  private def testHeaderMatchContent(detailed: Boolean, useOldConsumer: Boolean, expectedOutputLineCount: Int, fun: () => Unit): Unit = {
     Console.withOut(outContent) {
-      ConsumerPerformance.printHeader(true)
-      ConsumerPerformance.printProgressMessage(1, 1024 * 1024, 0, 1, 0, 0, 1,
-        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS")
-      )
-    }
+      ConsumerPerformance.printHeader(detailed, useOldConsumer)
+      fun()
 
-    val contents = outContent.toString.split("\n")
-    assertEquals(2, contents.length)
-    val header = contents(0)
-    val body = contents(1)
+      val contents = outContent.toString.split("\n")
+      assertEquals(expectedOutputLineCount, contents.length)
+      val header = contents(0)
+      val body = contents(1)
 
-    assertEquals(header.split(",").length, body.split(",").length)
+      assertEquals(header.split(",").length, body.split(",").length)
+    }
   }
 }