You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2019/03/06 09:55:18 UTC

[kafka] branch trunk updated: KAFKA-7976 - Fix DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable (#6374)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3747f55  KAFKA-7976 - Fix DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable (#6374)
3747f55 is described below

commit 3747f553366a50ae123c6176cd339e078bed33c1
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Wed Mar 6 09:55:05 2019 +0000

    KAFKA-7976 - Fix DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable (#6374)
    
    Ensure that the controller is not shut down in the test.
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
---
 .../kafka/server/DynamicBrokerReconfigurationTest.scala    | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index c13b0a3..80ed131 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -32,7 +32,6 @@ import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.MetricName
 import kafka.admin.ConfigCommand
 import kafka.api.{KafkaSasl, SaslSetup}
-import kafka.coordinator.group.OffsetConfig
 import kafka.log.LogConfig
 import kafka.message.ProducerCompressionCodec
 import kafka.network.{Processor, RequestChannel}
@@ -133,7 +132,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     }
 
     TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor = numServers, servers)
-    TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DefaultOffsetsTopicNumPartitions,
+    TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, servers.head.config.offsetsTopicPartitions,
       replicationFactor = numServers, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
 
     createAdminClient(SecurityProtocol.SSL, SecureInternal)
@@ -445,8 +444,14 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
 
   @Test
   def testUncleanLeaderElectionEnable(): Unit = {
+    val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get
+    val controllerId = controller.config.brokerId
+
+    // Create a topic with two replicas on brokers other than the controller
     val topic = "testtopic2"
-    TestUtils.createTopic(zkClient, topic, 1, replicationFactor = 2, servers)
+    val assignment = Map(0 -> Seq((controllerId + 1) % servers.size, (controllerId + 2) % servers.size))
+    TestUtils.createTopic(zkClient, topic, assignment, servers)
+
     val producer = ProducerBuilder().acks(1).build()
     val consumer = ConsumerBuilder("unclean-leader-test").enableAutoCommit(false).topic(topic).build()
     verifyProduceConsume(producer, consumer, numRecords = 10, topic)
@@ -472,7 +477,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     leaderBroker.shutdown()
     leaderBroker.awaitShutdown()
     followerBroker.startup()
-    val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get
 
     // Verify that new leader is not elected with unclean leader disabled since there are no ISRs
     TestUtils.waitUntilTrue(() => partitionInfo.leader == null, "Unclean leader elected")
@@ -928,7 +932,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       .build()
     verifyProduceConsume(producer1, consumer1, numRecords = 10, topic)
     // send another message to check consumer later
-    producer1.send(new ProducerRecord(topic, "key", "value")).get(100, TimeUnit.MILLISECONDS)
+    producer1.send(new ProducerRecord(topic, "key", "value")).get(1, TimeUnit.SECONDS)
 
     val config = servers.head.config
     val existingListenerCount = config.listeners.size