You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2019/03/11 18:43:57 UTC
[kafka] branch trunk updated: KAFKA-8091;
Wait for processor shutdown before testing removed listeners (#6425)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9aaa32b KAFKA-8091; Wait for processor shutdown before testing removed listeners (#6425)
9aaa32b is described below
commit 9aaa32b64d70646c429cd657980b330f7b85d542
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Mon Mar 11 18:43:46 2019 +0000
KAFKA-8091; Wait for processor shutdown before testing removed listeners (#6425)
DynamicBrokerReconfigurationTest.testAddRemoveSaslListeners removes a listener, waits for the config to be propagated to all brokers and then validates that connections to the removed listener fail. But there is a small timing window between config update and Processor shutdown. Before validating that connections to a removed listener fail, this commit waits for all metrics of the removed listener to be deleted, ensuring that the Processors of the listener have been shut down.
Reviewers: Manikumar Reddy <ma...@gmail.com>
---
.../kafka/server/DynamicBrokerReconfigurationTest.scala | 10 ++++++++++
1 file changed, 10 insertions(+)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 80ed131..798961e 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -908,6 +908,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
private def verifyAddListener(listenerName: String, securityProtocol: SecurityProtocol,
saslMechanisms: Seq[String]): Unit = {
addListener(servers, listenerName, securityProtocol, saslMechanisms)
+ TestUtils.waitUntilTrue(() => servers.forall(hasListenerMetric(_, listenerName)),
+ "Processors not started for new listener")
if (saslMechanisms.nonEmpty)
saslMechanisms.foreach { mechanism =>
verifyListener(securityProtocol, Some(mechanism), s"add-listener-group-$securityProtocol-$mechanism")
@@ -954,6 +956,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount - 1),
"Listeners not updated")
+ // Wait until metrics of the listener have been removed to ensure that processors have been shutdown before
+ // verifying that connections to the removed listener fail.
+ TestUtils.waitUntilTrue(() => !servers.exists(hasListenerMetric(_, listenerName)),
+ "Processors not shutdown for removed listener")
// Test that connections using deleted listener don't work
val producerFuture = verifyConnectionFailure(producer1)
@@ -992,6 +998,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
verifyProduceConsume(producer, consumer, numRecords = 10, topic)
}
+ private def hasListenerMetric(server: KafkaServer, listenerName: String): Boolean = {
+ server.socketServer.metrics.metrics.keySet.asScala.exists(_.tags.get("listener") == listenerName)
+ }
+
private def fetchBrokerConfigsFromZooKeeper(server: KafkaServer): Properties = {
val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString)
server.config.dynamicConfig.fromPersistentProps(props, perBrokerConfig = true)