You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2019/03/06 09:55:18 UTC
[kafka] branch trunk updated: KAFKA-7976 - Fix DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable (#6374)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3747f55 KAFKA-7976 - Fix DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable (#6374)
3747f55 is described below
commit 3747f553366a50ae123c6176cd339e078bed33c1
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Wed Mar 6 09:55:05 2019 +0000
KAFKA-7976 - Fix DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable (#6374)
Ensure that the controller is not shut down in the test.
Reviewers: Manikumar Reddy <ma...@gmail.com>
---
.../kafka/server/DynamicBrokerReconfigurationTest.scala | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index c13b0a3..80ed131 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -32,7 +32,6 @@ import com.yammer.metrics.Metrics
import com.yammer.metrics.core.MetricName
import kafka.admin.ConfigCommand
import kafka.api.{KafkaSasl, SaslSetup}
-import kafka.coordinator.group.OffsetConfig
import kafka.log.LogConfig
import kafka.message.ProducerCompressionCodec
import kafka.network.{Processor, RequestChannel}
@@ -133,7 +132,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor = numServers, servers)
- TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DefaultOffsetsTopicNumPartitions,
+ TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, servers.head.config.offsetsTopicPartitions,
replicationFactor = numServers, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
createAdminClient(SecurityProtocol.SSL, SecureInternal)
@@ -445,8 +444,14 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
@Test
def testUncleanLeaderElectionEnable(): Unit = {
+ val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get
+ val controllerId = controller.config.brokerId
+
+ // Create a topic with two replicas on brokers other than the controller
val topic = "testtopic2"
- TestUtils.createTopic(zkClient, topic, 1, replicationFactor = 2, servers)
+ val assignment = Map(0 -> Seq((controllerId + 1) % servers.size, (controllerId + 2) % servers.size))
+ TestUtils.createTopic(zkClient, topic, assignment, servers)
+
val producer = ProducerBuilder().acks(1).build()
val consumer = ConsumerBuilder("unclean-leader-test").enableAutoCommit(false).topic(topic).build()
verifyProduceConsume(producer, consumer, numRecords = 10, topic)
@@ -472,7 +477,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
leaderBroker.shutdown()
leaderBroker.awaitShutdown()
followerBroker.startup()
- val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get
// Verify that new leader is not elected with unclean leader disabled since there are no ISRs
TestUtils.waitUntilTrue(() => partitionInfo.leader == null, "Unclean leader elected")
@@ -928,7 +932,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
.build()
verifyProduceConsume(producer1, consumer1, numRecords = 10, topic)
// send another message to check consumer later
- producer1.send(new ProducerRecord(topic, "key", "value")).get(100, TimeUnit.MILLISECONDS)
+ producer1.send(new ProducerRecord(topic, "key", "value")).get(1, TimeUnit.SECONDS)
val config = servers.head.config
val existingListenerCount = config.listeners.size