You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/07/14 21:08:41 UTC

kafka git commit: MINOR: Correct the ConsumerPerformance print format

Repository: kafka
Updated Branches:
  refs/heads/trunk 873eeae9f -> d0ce0a95d


MINOR: Correct the ConsumerPerformance print format

Currently, the output of `ConsumerPerformance` looks strange. The `header` format is as follows:
```
"time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec"
```
while the `body` is printed as follows:
```
println("%s, %d, %.4f, %.4f, %d, %.4f".format(dateFormat.format(endMs), id, totalMBRead,
        1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0))
```

So we get the following result:
```
time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
09:52:00, 0, 1100.3086, 220.0177, 563358, 112649.0702
```
So the `header` and `body` do not match.

This PR also reformats the function signatures to make them more readable.

Author: Xianyang Liu <xi...@intel.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3417 from ConeyLiu/consumertest


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0ce0a95
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0ce0a95
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0ce0a95

Branch: refs/heads/trunk
Commit: d0ce0a95de8c5451d73dcc87f3f9b6c03f5f85de
Parents: 873eeae
Author: Xianyang Liu <xi...@intel.com>
Authored: Fri Jul 14 11:51:52 2017 -0700
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Jul 14 13:50:38 2017 -0700

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsumerPerformance.scala | 39 +++++++++++++----
 .../kafka/tools/ConsumerPerformanceTest.scala   | 46 ++++++++++++++++++++
 2 files changed, 76 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ce0a95/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index a5d4d5d..c818661 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -56,10 +56,7 @@ object ConsumerPerformance {
     var metrics: mutable.Map[MetricName, _ <: Metric] = null
 
     if (!config.hideHeader) {
-      if (!config.showDetailedStats)
-        println("start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
-      else
-        println("time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+      printHeader(!config.showDetailedStats)
     }
 
     var startMs, endMs = 0L
@@ -110,7 +107,20 @@ object ConsumerPerformance {
 
   }
 
-  def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], topics: List[String], count: Long, timeout: Long, config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) {
+  private[tools] def printHeader(showDetailedStats: Boolean): Unit = {
+    if (showDetailedStats)
+      println("start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+    else
+      println("time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+  }
+
+  def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
+              topics: List[String],
+              count: Long,
+              timeout: Long,
+              config: ConsumerPerfConfig,
+              totalMessagesRead: AtomicLong,
+              totalBytesRead: AtomicLong) {
     var bytesRead = 0L
     var messagesRead = 0L
     var lastBytesRead = 0L
@@ -167,8 +177,14 @@ object ConsumerPerformance {
     totalBytesRead.set(bytesRead)
   }
 
-  def printProgressMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long, lastMessagesRead: Long,
-    startMs: Long, endMs: Long, dateFormat: SimpleDateFormat) = {
+  def printProgressMessage(id: Int,
+                           bytesRead: Long,
+                           lastBytesRead: Long,
+                           messagesRead: Long,
+                           lastMessagesRead: Long,
+                           startMs: Long,
+                           endMs: Long,
+                           dateFormat: SimpleDateFormat) = {
     val elapsedMs: Double = endMs - startMs
     val totalMBRead = (bytesRead * 1.0) / (1024 * 1024)
     val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
@@ -271,8 +287,13 @@ object ConsumerPerformance {
     val hideHeader = options.has(hideHeaderOpt)
   }
 
-  class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Array[Byte], Array[Byte]],
-    config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong, consumerTimeout: AtomicBoolean)
+  class ConsumerPerfThread(threadId: Int,
+                           name: String,
+                           stream: KafkaStream[Array[Byte], Array[Byte]],
+                           config: ConsumerPerfConfig,
+                           totalMessagesRead: AtomicLong,
+                           totalBytesRead: AtomicLong,
+                           consumerTimeout: AtomicBoolean)
     extends Thread(name) {
 
     override def run() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ce0a95/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
new file mode 100644
index 0000000..2fa774c
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.ByteArrayOutputStream
+import java.text.SimpleDateFormat
+
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class ConsumerPerformanceTest {
+
+  private val outContent = new ByteArrayOutputStream()
+
+  @Test
+  def testHeaderMatchBody(): Unit = {
+    Console.withOut(outContent) {
+      ConsumerPerformance.printHeader(true)
+      ConsumerPerformance.printProgressMessage(1, 1024 * 1024, 0, 1, 0, 0, 1,
+        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS")
+      )
+    }
+
+    val contents = outContent.toString.split("\n")
+    assertEquals(2, contents.length)
+    val header = contents(0)
+    val body = contents(1)
+
+    assertEquals(header.split(",").length, body.split(",").length)
+  }
+}