You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/11/30 06:21:52 UTC
[kafka] branch 2.1 updated: MINOR: Fix handling of dummy record in
EndToEndLatency tool
This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new b0860dc MINOR: Fix handling of dummy record in EndToEndLatency tool
b0860dc is described below
commit b0860dcb3fd00b8099d43ee941ead1f6a82167c5
Author: Anna Povzner <an...@confluent.io>
AuthorDate: Thu Nov 29 22:21:20 2018 -0800
MINOR: Fix handling of dummy record in EndToEndLatency tool
The EndToEndLatency tool produces a dummy record if the topic does not exist. This behavior was introduced in PR https://github.com/apache/kafka/pull/5319 as part of updating the tool to use the latest consumer API. However, if we run the tool with producer acks == 1, the high watermark may not be updated before we reset consumer offsets to latest. In rare cases when this happens, the tool will throw an exception in the for loop where the consumer will unexpectedly consume the dummy record [...]
This PR checks whether the topic exists and, if it does not, creates it using AdminClient.
Author: Anna Povzner <an...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #5950 from apovzner/fix-EndToEndLatency
(cherry picked from commit 3acebe63836b4a30d21f8c2ca2934e1a0fcad2f5)
Signed-off-by: Ewen Cheslack-Postava <me...@ewencp.org>
---
.../main/scala/kafka/tools/EndToEndLatency.scala | 41 +++++++++++++++++-----
1 file changed, 32 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index 4849b1e..8107584 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -19,9 +19,11 @@ package kafka.tools
import java.nio.charset.StandardCharsets
import java.time.Duration
-import java.util.{Arrays, Properties}
+import java.util.{Collections, Arrays, Properties}
import kafka.utils.Exit
+import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.clients.{admin, CommonClientConfigs}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.TopicPartition
@@ -44,6 +46,8 @@ import scala.util.Random
object EndToEndLatency {
private val timeout: Long = 60000
+ private val defaultReplicationFactor: Short = 1
+ private val defaultNumPartitions: Int = 1
def main(args: Array[String]) {
if (args.length != 5 && args.length != 6) {
@@ -61,10 +65,13 @@ object EndToEndLatency {
if (!List("1", "all").contains(producerAcks))
throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all")
- def loadProps: Properties = propsFile.map(Utils.loadProps).getOrElse(new Properties())
+ def loadPropsWithBootstrapServers: Properties = {
+ val props = propsFile.map(Utils.loadProps).getOrElse(new Properties())
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ props
+ }
- val consumerProps = loadProps
- consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ val consumerProps = loadPropsWithBootstrapServers
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + System.currentTimeMillis())
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
@@ -73,8 +80,7 @@ object EndToEndLatency {
consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0") //ensure we have no temporal batching
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
- val producerProps = loadProps
- producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ val producerProps = loadPropsWithBootstrapServers
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0") //ensure writes are synchronous
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString)
@@ -82,15 +88,22 @@ object EndToEndLatency {
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
- // sends a dummy message to create the topic if it doesn't exist
- producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, Array[Byte]())).get()
-
def finalise() {
consumer.commitSync()
producer.close()
consumer.close()
}
+ // create topic if it does not exist
+ if (!consumer.listTopics().containsKey(topic)) {
+ try {
+ createTopic(topic, loadPropsWithBootstrapServers)
+ } catch {
+ case t: Throwable =>
+ finalise()
+ throw new RuntimeException(s"Failed to create topic $topic", t)
+ }
+ }
val topicPartitions = consumer.partitionsFor(topic).asScala
.map(p => new TopicPartition(p.topic(), p.partition())).asJava
@@ -153,4 +166,14 @@ object EndToEndLatency {
def randomBytesOfLen(random: Random, len: Int): Array[Byte] = { // Returns `len` random bytes, each an ASCII uppercase letter ('A'..'Z').
Array.fill(len)((random.nextInt(26) + 65).toByte) // nextInt(26) yields 0..25, so adding 65 gives 65..90, i.e. 'A'..'Z'
}
+ /**
+ * Creates `topic` with defaultNumPartitions partition(s) and replication factor
+ * defaultReplicationFactor (both 1, declared at the top of this object) through a
+ * short-lived AdminClient, blocking until the creation request completes.
+ *
+ * main calls this only after consumer.listTopics() did not contain `topic`.
+ * NOTE(review): if another client creates the topic between that check and this
+ * call, the request presumably fails with TopicExistsException and the tool
+ * aborts; consider treating that case as success.
+ *
+ * @param topic name of the topic to create
+ * @param props client properties used to build the AdminClient; main passes
+ * loadPropsWithBootstrapServers, so bootstrap.servers is set
+ * @throws java.lang.Exception whatever `createTopics(...).all().get()` throws when
+ * creation fails (per the KafkaFuture contract, typically an ExecutionException
+ * wrapping the broker error); main wraps it in a RuntimeException
+ */
+ def createTopic(topic: String, props: Properties): Unit = {
+ println("Topic \"%s\" does not exist. Will create topic with %d partition(s) and replication factor = %d"
+ .format(topic, defaultNumPartitions, defaultReplicationFactor)) // let the CLI user know the topic is being auto-created, and with which settings
+ // The AdminClient is used for this single request only; it is closed in the finally below.
+ val adminClient = admin.AdminClient.create(props)
+ val newTopic = new NewTopic(topic, defaultNumPartitions, defaultReplicationFactor)
+ try adminClient.createTopics(Collections.singleton(newTopic)).all().get() // block until creation completes or fails
+ finally Utils.closeQuietly(adminClient, "AdminClient") // closeQuietly logs close errors instead of throwing, so they cannot mask a creation failure
+ }
}