You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2019/03/11 18:43:57 UTC

[kafka] branch trunk updated: KAFKA-8091; Wait for processor shutdown before testing removed listeners (#6425)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9aaa32b  KAFKA-8091; Wait for processor shutdown before testing removed listeners (#6425)
9aaa32b is described below

commit 9aaa32b64d70646c429cd657980b330f7b85d542
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Mon Mar 11 18:43:46 2019 +0000

    KAFKA-8091; Wait for processor shutdown before testing removed listeners (#6425)
    
    DynamicBrokerReconfigurationTest.testAddRemoveSaslListeners removes a listener, waits for the config to be propagated to all brokers and then validates that connections to the removed listener fail. But there is a small timing window between config update and Processor shutdown. Before validating that connections to a removed listener fail, this commit waits for all metrics of the removed listener to be deleted, ensuring that the Processors of the listener have been shutdown.
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
---
 .../kafka/server/DynamicBrokerReconfigurationTest.scala        | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 80ed131..798961e 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -908,6 +908,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
   private def verifyAddListener(listenerName: String, securityProtocol: SecurityProtocol,
                                 saslMechanisms: Seq[String]): Unit = {
     addListener(servers, listenerName, securityProtocol, saslMechanisms)
+    TestUtils.waitUntilTrue(() => servers.forall(hasListenerMetric(_, listenerName)),
+      "Processors not started for new listener")
     if (saslMechanisms.nonEmpty)
       saslMechanisms.foreach { mechanism =>
         verifyListener(securityProtocol, Some(mechanism), s"add-listener-group-$securityProtocol-$mechanism")
@@ -954,6 +956,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
 
     TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount - 1),
       "Listeners not updated")
+    // Wait until metrics of the listener have been removed to ensure that processors have been shutdown before
+    // verifying that connections to the removed listener fail.
+    TestUtils.waitUntilTrue(() => !servers.exists(hasListenerMetric(_, listenerName)),
+      "Processors not shutdown for removed listener")
 
     // Test that connections using deleted listener don't work
     val producerFuture = verifyConnectionFailure(producer1)
@@ -992,6 +998,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     verifyProduceConsume(producer, consumer, numRecords = 10, topic)
   }
 
+  private def hasListenerMetric(server: KafkaServer, listenerName: String): Boolean = {
+    server.socketServer.metrics.metrics.keySet.asScala.exists(_.tags.get("listener") == listenerName)
+  }
+
   private def fetchBrokerConfigsFromZooKeeper(server: KafkaServer): Properties = {
     val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString)
     server.config.dynamicConfig.fromPersistentProps(props, perBrokerConfig = true)