You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/11/30 06:21:52 UTC

[kafka] branch 2.1 updated: MINOR: Fix handling of dummy record in EndToEndLatency tool

This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new b0860dc  MINOR: Fix handling of dummy record in EndToEndLatency tool
b0860dc is described below

commit b0860dcb3fd00b8099d43ee941ead1f6a82167c5
Author: Anna Povzner <an...@confluent.io>
AuthorDate: Thu Nov 29 22:21:20 2018 -0800

    MINOR: Fix handling of dummy record in EndToEndLatency tool
    
    The EndToEndLatency tool produces a dummy record in case the topic does not exist. This behavior was introduced in PR https://github.com/apache/kafka/pull/5319 as part of updating the tool to use the latest consumer API. However, if we run the tool with producer acks == 1, the high watermark may not be updated before we reset consumer offsets to latest. In rare cases when this happens, the tool will throw an exception in the for loop where the consumer will unexpectedly consume the dumm [...]
    
    This PR checks whether the topic exists, and creates the topic using AdminClient if it does not exist.
    
    Author: Anna Povzner <an...@confluent.io>
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>
    
    Closes #5950 from apovzner/fix-EndToEndLatency
    
    (cherry picked from commit 3acebe63836b4a30d21f8c2ca2934e1a0fcad2f5)
    Signed-off-by: Ewen Cheslack-Postava <me...@ewencp.org>
---
 .../main/scala/kafka/tools/EndToEndLatency.scala   | 41 +++++++++++++++++-----
 1 file changed, 32 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index 4849b1e..8107584 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -19,9 +19,11 @@ package kafka.tools
 
 import java.nio.charset.StandardCharsets
 import java.time.Duration
-import java.util.{Arrays, Properties}
+import java.util.{Collections, Arrays, Properties}
 
 import kafka.utils.Exit
+import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.clients.{admin, CommonClientConfigs}
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.TopicPartition
@@ -44,6 +46,8 @@ import scala.util.Random
 
 object EndToEndLatency {
   private val timeout: Long = 60000
+  private val defaultReplicationFactor: Short = 1
+  private val defaultNumPartitions: Int = 1
 
   def main(args: Array[String]) {
     if (args.length != 5 && args.length != 6) {
@@ -61,10 +65,13 @@ object EndToEndLatency {
     if (!List("1", "all").contains(producerAcks))
       throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all")
 
-    def loadProps: Properties = propsFile.map(Utils.loadProps).getOrElse(new Properties())
+    def loadPropsWithBootstrapServers: Properties = {
+      val props = propsFile.map(Utils.loadProps).getOrElse(new Properties())
+      props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+      props
+    }
 
-    val consumerProps = loadProps
-    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    val consumerProps = loadPropsWithBootstrapServers
     consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + System.currentTimeMillis())
     consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
     consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
@@ -73,8 +80,7 @@ object EndToEndLatency {
     consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0") //ensure we have no temporal batching
     val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
 
-    val producerProps = loadProps
-    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    val producerProps = loadPropsWithBootstrapServers
     producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0") //ensure writes are synchronous
     producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
     producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString)
@@ -82,15 +88,22 @@ object EndToEndLatency {
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
 
-    // sends a dummy message to create the topic if it doesn't exist
-    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, Array[Byte]())).get()
-
     def finalise() {
       consumer.commitSync()
       producer.close()
       consumer.close()
     }
 
+    // create topic if it does not exist
+    if (!consumer.listTopics().containsKey(topic)) {
+      try {
+        createTopic(topic, loadPropsWithBootstrapServers)
+      } catch {
+        case t: Throwable =>
+          finalise()
+          throw new RuntimeException(s"Failed to create topic $topic", t)
+      }
+    }
 
     val topicPartitions = consumer.partitionsFor(topic).asScala
       .map(p => new TopicPartition(p.topic(), p.partition())).asJava
@@ -153,4 +166,14 @@ object EndToEndLatency {
   def randomBytesOfLen(random: Random, len: Int): Array[Byte] = {
     Array.fill(len)((random.nextInt(26) + 65).toByte)
   }
+
+  def createTopic(topic: String, props: Properties): Unit = {
+    println("Topic \"%s\" does not exist. Will create topic with %d partition(s) and replication factor = %d"
+              .format(topic, defaultNumPartitions, defaultReplicationFactor))
+
+    val adminClient = admin.AdminClient.create(props)
+    val newTopic = new NewTopic(topic, defaultNumPartitions, defaultReplicationFactor)
+    try adminClient.createTopics(Collections.singleton(newTopic)).all().get()
+    finally Utils.closeQuietly(adminClient, "AdminClient")
+  }
 }