You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/14 22:15:10 UTC

[kafka] branch trunk updated: MINOR: Fix flaky ConsumerTopicCreationTest (#6727)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 16d4d8c  MINOR: Fix flaky ConsumerTopicCreationTest (#6727)
16d4d8c is described below

commit 16d4d8cafc7394a35caf3354b449505bde56920f
Author: Dhruvil Shah <dh...@confluent.io>
AuthorDate: Tue May 14 15:14:49 2019 -0700

    MINOR: Fix flaky ConsumerTopicCreationTest (#6727)
    
    `ConsumerTopicCreationTest` relied on `KafkaConsumer#poll` to send a `MetadataRequest` within 100ms to verify if a topic is auto created or not. This is brittle and does not guarantee if the request made it to the broker or was processed successfully. This PR fixes the flaky test by adding another topic; we wait until we consume a previously produced record to this topic. This ensures MetadataRequest was processed and we could then check if the topic we're interested in was created or not.
    
    Reviewers: Boyang Chen <bc...@outlook.com>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/api/ConsumerTopicCreationTest.scala      | 64 ++++++++++------------
 .../kafka/api/IntegrationTestHarness.scala         | 19 +++++++
 2 files changed, 47 insertions(+), 36 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala
index 11fbefd..c145b24 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala
@@ -17,23 +17,22 @@
 
 package integration.kafka.api
 
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
 import java.lang.{Boolean => JBoolean}
 import java.time.Duration
 import java.util
+import java.util.Collections
 
-import scala.collection.JavaConverters._
 import kafka.api.IntegrationTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.clients.admin.NewTopic
 import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.utils.Utils
-import org.junit.{After, Test}
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.junit.Assert._
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
 
 /**
  * Tests behavior of specifying auto topic creation configuration for the consumer and broker
@@ -42,12 +41,10 @@ import org.junit.{After, Test}
 class ConsumerTopicCreationTest(brokerAutoTopicCreationEnable: JBoolean, consumerAllowAutoCreateTopics: JBoolean) extends IntegrationTestHarness {
   override protected def brokerCount: Int = 1
 
-  val topic = "topic"
-  val part = 0
-  val tp = new TopicPartition(topic, part)
+  val topic_1 = "topic-1"
+  val topic_2 = "topic-2"
   val producerClientId = "ConsumerTestProducer"
   val consumerClientId = "ConsumerTestConsumer"
-  var adminClient: AdminClient = null
 
   // configure server properties
   this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
@@ -62,36 +59,31 @@ class ConsumerTopicCreationTest(brokerAutoTopicCreationEnable: JBoolean, consume
   this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")
   this.consumerConfig.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, consumerAllowAutoCreateTopics.toString)
 
-  @After
-  override def tearDown(): Unit = {
-    if (adminClient != null)
-      Utils.closeQuietly(adminClient, "AdminClient")
-    super.tearDown()
-  }
-
   @Test
   def testAutoTopicCreation(): Unit = {
     val consumer = createConsumer()
-    adminClient = AdminClient.create(createConfig())
+    val producer = createProducer()
+    val adminClient = createAdminClient()
+    val record = new ProducerRecord(topic_1, 0, "key".getBytes, "value".getBytes)
+
+    // create `topic_1` and produce a record to it
+    adminClient.createTopics(Collections.singleton(new NewTopic(topic_1, 1, 1))).all.get
+    producer.send(record).get
 
-    consumer.subscribe(util.Arrays.asList(topic))
-    consumer.poll(Duration.ofMillis(100))
+    consumer.subscribe(util.Arrays.asList(topic_1, topic_2))
 
-    val topicCreated = adminClient.listTopics.names.get.contains(topic)
+    // Wait until the produced record was consumed. This guarantees that metadata request for `topic_2` was sent to the
+    // broker.
+    TestUtils.waitUntilTrue(() => {
+      consumer.poll(Duration.ofMillis(100)).count > 0
+    }, "Timed out waiting to consume")
+
+    // MetadataRequest is guaranteed to create the topic znode if creation was required
+    val topicCreated = zkClient.getAllTopicsInCluster.contains(topic_2)
     if (brokerAutoTopicCreationEnable && consumerAllowAutoCreateTopics)
-      assert(topicCreated == true)
+      assertTrue(topicCreated)
     else
-      assert(topicCreated == false)
-  }
-
-  def createConfig(): util.Map[String, Object] = {
-    val config = new util.HashMap[String, Object]
-    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000")
-    val securityProps: util.Map[Object, Object] =
-      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
-    securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
-    config
+      assertFalse(topicCreated)
   }
 }
 
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 5ffbc43..242a305 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -27,6 +27,7 @@ import java.util.Properties
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
 import kafka.server.KafkaConfig
 import kafka.integration.KafkaServerTestHarness
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
 import org.apache.kafka.common.network.{ListenerName, Mode}
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
 import org.junit.{After, Before}
@@ -42,10 +43,12 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
 
   val producerConfig = new Properties
   val consumerConfig = new Properties
+  val adminClientConfig = new Properties
   val serverConfig = new Properties
 
   private val consumers = mutable.Buffer[KafkaConsumer[_, _]]()
   private val producers = mutable.Buffer[KafkaProducer[_, _]]()
+  private val adminClients = mutable.Buffer[AdminClient]()
 
   protected def interBrokerListenerName: ListenerName = listenerName
 
@@ -84,6 +87,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
     // Generate client security properties before starting the brokers in case certs are needed
     producerConfig ++= clientSecurityProps("producer")
     consumerConfig ++= clientSecurityProps("consumer")
+    adminClientConfig ++= clientSecurityProps("adminClient")
 
     super.setUp()
 
@@ -98,6 +102,8 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
     consumerConfig.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
     consumerConfig.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
 
+    adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+
     if (createOffsetsTopic)
       TestUtils.createOffsetsTopic(zkClient, servers)
   }
@@ -131,13 +137,26 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
     consumer
   }
 
+  def createAdminClient(configOverrides: Properties = new Properties): AdminClient = {
+    val props = new Properties
+    props ++= adminClientConfig
+    props ++= configOverrides
+    val adminClient = AdminClient.create(props)
+    adminClients += adminClient
+    adminClient
+  }
+
   @After
   override def tearDown() {
     producers.foreach(_.close(Duration.ZERO))
     consumers.foreach(_.wakeup())
     consumers.foreach(_.close(Duration.ZERO))
+    adminClients.foreach(_.close(Duration.ZERO))
+
     producers.clear()
     consumers.clear()
+    adminClients.clear()
+
     super.tearDown()
   }