You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/07/14 21:08:41 UTC
kafka git commit: MINOR: Correct the ConsumerPerformance print format
Repository: kafka
Updated Branches:
refs/heads/trunk 873eeae9f -> d0ce0a95d
MINOR: Correct the ConsumerPerformance print format
Currently, the output of `ConsumerPerformance` looks strange. The `header` format as follow:
```
"time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec"
```
while the `body` as follow:
```
println("%s, %d, %.4f, %.4f, %d, %.4f".format(dateFormat.format(endMs), id, totalMBRead,
1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0))
```
So we get the follow result:
```
time, data.consumeed.in.MB, MB.sec, data.consumeed.in.nMsg, nMsg.Sec
09:52:00, 0, 1100.3086, 220.0177, 563358, 112649.0702
```
So the `header` and `body` mismatching.
And also, this pr makes the functions more readable.
Author: Xianyang Liu <xi...@intel.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3417 from ConeyLiu/consumertest
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0ce0a95
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0ce0a95
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0ce0a95
Branch: refs/heads/trunk
Commit: d0ce0a95de8c5451d73dcc87f3f9b6c03f5f85de
Parents: 873eeae
Author: Xianyang Liu <xi...@intel.com>
Authored: Fri Jul 14 11:51:52 2017 -0700
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Jul 14 13:50:38 2017 -0700
----------------------------------------------------------------------
.../scala/kafka/tools/ConsumerPerformance.scala | 39 +++++++++++++----
.../kafka/tools/ConsumerPerformanceTest.scala | 46 ++++++++++++++++++++
2 files changed, 76 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ce0a95/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index a5d4d5d..c818661 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -56,10 +56,7 @@ object ConsumerPerformance {
var metrics: mutable.Map[MetricName, _ <: Metric] = null
if (!config.hideHeader) {
- if (!config.showDetailedStats)
- println("start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
- else
- println("time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+ printHeader(!config.showDetailedStats)
}
var startMs, endMs = 0L
@@ -110,7 +107,20 @@ object ConsumerPerformance {
}
- def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], topics: List[String], count: Long, timeout: Long, config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) {
+ private[tools] def printHeader(showDetailedStats: Boolean): Unit = {
+ if (showDetailedStats)
+ println("start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+ else
+ println("time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+ }
+
+ def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
+ topics: List[String],
+ count: Long,
+ timeout: Long,
+ config: ConsumerPerfConfig,
+ totalMessagesRead: AtomicLong,
+ totalBytesRead: AtomicLong) {
var bytesRead = 0L
var messagesRead = 0L
var lastBytesRead = 0L
@@ -167,8 +177,14 @@ object ConsumerPerformance {
totalBytesRead.set(bytesRead)
}
- def printProgressMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long, lastMessagesRead: Long,
- startMs: Long, endMs: Long, dateFormat: SimpleDateFormat) = {
+ def printProgressMessage(id: Int,
+ bytesRead: Long,
+ lastBytesRead: Long,
+ messagesRead: Long,
+ lastMessagesRead: Long,
+ startMs: Long,
+ endMs: Long,
+ dateFormat: SimpleDateFormat) = {
val elapsedMs: Double = endMs - startMs
val totalMBRead = (bytesRead * 1.0) / (1024 * 1024)
val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
@@ -271,8 +287,13 @@ object ConsumerPerformance {
val hideHeader = options.has(hideHeaderOpt)
}
- class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Array[Byte], Array[Byte]],
- config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong, consumerTimeout: AtomicBoolean)
+ class ConsumerPerfThread(threadId: Int,
+ name: String,
+ stream: KafkaStream[Array[Byte], Array[Byte]],
+ config: ConsumerPerfConfig,
+ totalMessagesRead: AtomicLong,
+ totalBytesRead: AtomicLong,
+ consumerTimeout: AtomicBoolean)
extends Thread(name) {
override def run() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ce0a95/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
new file mode 100644
index 0000000..2fa774c
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.ByteArrayOutputStream
+import java.text.SimpleDateFormat
+
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class ConsumerPerformanceTest {
+
+ private val outContent = new ByteArrayOutputStream()
+
+ @Test
+ def testHeaderMatchBody(): Unit = {
+ Console.withOut(outContent) {
+ ConsumerPerformance.printHeader(true)
+ ConsumerPerformance.printProgressMessage(1, 1024 * 1024, 0, 1, 0, 0, 1,
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS")
+ )
+ }
+
+ val contents = outContent.toString.split("\n")
+ assertEquals(2, contents.length)
+ val header = contents(0)
+ val body = contents(1)
+
+ assertEquals(header.split(",").length, body.split(",").length)
+ }
+}