You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/10/28 04:46:08 UTC

[kafka] branch trunk updated: MINOR: Remove duplicate `subscribe` call in ConsumerPerformance (#5828)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5d7cb43  MINOR: Remove duplicate `subscribe` call in ConsumerPerformance (#5828)
5d7cb43 is described below

commit 5d7cb438a5607fd1bba35ee7a7cf1b2924bae45d
Author: huxi <hu...@hotmail.com>
AuthorDate: Sun Oct 28 12:45:59 2018 +0800

    MINOR: Remove duplicate `subscribe` call in ConsumerPerformance (#5828)
    
    In the `consume` method, the consumer subscribes the topic, so no need to do
    the same thing before the method call. Also include minor clean-up in `consume`.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 core/src/main/scala/kafka/tools/ConsumerPerformance.scala | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 2e7b8dd..a065204 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
 import kafka.utils.{CommandLineUtils, ToolsUtils}
-import java.util.{Collections, Properties, Random}
+import java.util.{Properties, Random}
 import java.text.SimpleDateFormat
 import java.time.Duration
 
@@ -54,7 +54,6 @@ object ConsumerPerformance extends LazyLogging {
 
     var startMs, endMs = 0L
     val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props)
-    consumer.subscribe(Collections.singletonList(config.topic))
     startMs = System.currentTimeMillis
     consume(consumer, List(config.topic), config.numMessages, config.recordFetchTimeoutMs, config, totalMessagesRead, totalBytesRead, joinGroupTimeInMs, startMs)
     endMs = System.currentTimeMillis
@@ -121,10 +120,9 @@ object ConsumerPerformance extends LazyLogging {
       }})
 
     // Now start the benchmark
-    val startMs = System.currentTimeMillis
-    var lastReportTime: Long = startMs
-    var lastConsumedTime = System.currentTimeMillis
-    var currentTimeMillis = lastConsumedTime
+    var currentTimeMillis = System.currentTimeMillis
+    var lastReportTime: Long = currentTimeMillis
+    var lastConsumedTime = currentTimeMillis
 
     while (messagesRead < count && currentTimeMillis - lastConsumedTime <= timeout) {
       val records = consumer.poll(Duration.ofMillis(100)).asScala