You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/10/28 04:46:08 UTC
[kafka] branch trunk updated: MINOR: Remove duplicate `subscribe`
call in ConsumerPerformance (#5828)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5d7cb43 MINOR: Remove duplicate `subscribe` call in ConsumerPerformance (#5828)
5d7cb43 is described below
commit 5d7cb438a5607fd1bba35ee7a7cf1b2924bae45d
Author: huxi <hu...@hotmail.com>
AuthorDate: Sun Oct 28 12:45:59 2018 +0800
MINOR: Remove duplicate `subscribe` call in ConsumerPerformance (#5828)
In the `consume` method, the consumer subscribes the topic, so no need to do
the same thing before the method call. Also include minor clean-up in `consume`.
Reviewers: Ismael Juma <is...@juma.me.uk>
---
core/src/main/scala/kafka/tools/ConsumerPerformance.scala | 10 ++++------
1 file changed, 4 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 2e7b8dd..a065204 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
import kafka.utils.{CommandLineUtils, ToolsUtils}
-import java.util.{Collections, Properties, Random}
+import java.util.{Properties, Random}
import java.text.SimpleDateFormat
import java.time.Duration
@@ -54,7 +54,6 @@ object ConsumerPerformance extends LazyLogging {
var startMs, endMs = 0L
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props)
- consumer.subscribe(Collections.singletonList(config.topic))
startMs = System.currentTimeMillis
consume(consumer, List(config.topic), config.numMessages, config.recordFetchTimeoutMs, config, totalMessagesRead, totalBytesRead, joinGroupTimeInMs, startMs)
endMs = System.currentTimeMillis
@@ -121,10 +120,9 @@ object ConsumerPerformance extends LazyLogging {
}})
// Now start the benchmark
- val startMs = System.currentTimeMillis
- var lastReportTime: Long = startMs
- var lastConsumedTime = System.currentTimeMillis
- var currentTimeMillis = lastConsumedTime
+ var currentTimeMillis = System.currentTimeMillis
+ var lastReportTime: Long = currentTimeMillis
+ var lastConsumedTime = currentTimeMillis
while (messagesRead < count && currentTimeMillis - lastConsumedTime <= timeout) {
val records = consumer.poll(Duration.ofMillis(100)).asScala