You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/06/08 03:59:42 UTC

[kafka] branch trunk updated: MINOR: Convert `ReassignPartitionsIntegrationTest` to KRaft (#12258)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4542acdc14 MINOR: Convert `ReassignPartitionsIntegrationTest` to KRaft (#12258)
4542acdc14 is described below

commit 4542acdc14d5ec3daa1f36d8dc24abc244ee24ff
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Jun 7 20:59:24 2022 -0700

    MINOR: Convert `ReassignPartitionsIntegrationTest` to KRaft (#12258)
    
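    Updates the relevant tests in `ReassignPartitionsIntegrationTest` so that they run with KRaft as well as ZooKeeper. The JBOD tests are skipped for KRaft because JBOD is not yet supported there, and the `AlterPartition` upgrade tests are skipped because they are not relevant to KRaft, which requires `AlterPartition`.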
    
    Reviewers: Kvicii <Ka...@gmail.com>, David Arthur <mu...@gmail.com>
---
 .../admin/ReassignPartitionsIntegrationTest.scala  | 121 ++++++++++++---------
 .../kafka/server/QuorumTestHarness.scala           |  22 ++--
 2 files changed, 87 insertions(+), 56 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index 769389a5ef..29b4c82740 100644
--- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -17,23 +17,23 @@
 
 package kafka.admin
 
-import java.io.Closeable
-import java.util.{Collections, HashMap, List}
-
 import kafka.admin.ReassignPartitionsCommand._
-import kafka.server.{IsrChangePropagationConfig, KafkaConfig, KafkaServer, ZkAlterPartitionManager}
+import kafka.server._
 import kafka.utils.Implicits._
-import kafka.utils.TestUtils
-import kafka.server.QuorumTestHarness
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, ConfigEntry, DescribeLogDirsResult, NewTopic}
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
-import org.junit.jupiter.api.{AfterEach, Test, Timeout}
+import org.junit.jupiter.api.{AfterEach, Timeout}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
+import java.io.Closeable
+import java.util.{Collections, HashMap, List}
 import scala.collection.{Map, Seq, mutable}
 import scala.jdk.CollectionConverters._
 
@@ -54,33 +54,36 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
     }.toMap
 
 
-  @Test
-  def testReassignment(): Unit = {
-    cluster = new ReassignPartitionsTestCluster(zkConnect)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testReassignment(quorum: String): Unit = {
+    cluster = new ReassignPartitionsTestCluster()
     cluster.setup()
     executeAndVerifyReassignment()
   }
 
-  @Test
-  def testReassignmentWithAlterIsrDisabled(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk")) // Note: KRaft requires AlterPartition
+  def testReassignmentWithAlterPartitionDisabled(quorum: String): Unit = {
     // Test reassignment when the IBP is on an older version which does not use
-    // the `AlterIsr` API. In this case, the controller will register individual
+    // the `AlterPartition` API. In this case, the controller will register individual
     // watches for each reassigning partition so that the reassignment can be
     // completed as soon as the ISR is expanded.
     val configOverrides = Map(KafkaConfig.InterBrokerProtocolVersionProp -> IBP_2_7_IV1.version)
-    cluster = new ReassignPartitionsTestCluster(zkConnect, configOverrides = configOverrides)
+    cluster = new ReassignPartitionsTestCluster(configOverrides = configOverrides)
     cluster.setup()
     executeAndVerifyReassignment()
   }
 
-  @Test
-  def testReassignmentCompletionDuringPartialUpgrade(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk")) // Note: KRaft requires AlterPartition
+  def testReassignmentCompletionDuringPartialUpgrade(quorum: String): Unit = {
     // Test reassignment during a partial upgrade when some brokers are relying on
-    // `AlterIsr` and some rely on the old notification logic through Zookeeper.
+    // `AlterPartition` and some rely on the old notification logic through Zookeeper.
     // In this test case, broker 0 starts up first on the latest IBP and is typically
     // elected as controller. The three remaining brokers start up on the older IBP.
     // We want to ensure that reassignment can still complete through the ISR change
-    // notification path even though the controller expects `AlterIsr`.
+    // notification path even though the controller expects `AlterPartition`.
 
     // Override change notification settings so that test is not delayed by ISR
     // change notification delay
@@ -93,13 +96,13 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
     val oldIbpConfig = Map(KafkaConfig.InterBrokerProtocolVersionProp -> IBP_2_7_IV1.version)
     val brokerConfigOverrides = Map(1 -> oldIbpConfig, 2 -> oldIbpConfig, 3 -> oldIbpConfig)
 
-    cluster = new ReassignPartitionsTestCluster(zkConnect, brokerConfigOverrides = brokerConfigOverrides)
+    cluster = new ReassignPartitionsTestCluster(brokerConfigOverrides = brokerConfigOverrides)
     cluster.setup()
 
     executeAndVerifyReassignment()
   }
 
-  def executeAndVerifyReassignment(): Unit = {
+  private def executeAndVerifyReassignment(): Unit = {
     val assignment = """{"version":1,"partitions":""" +
       """[{"topic":"foo","partition":0,"replicas":[0,1,3],"log_dirs":["any","any","any"]},""" +
       """{"topic":"bar","partition":0,"replicas":[3,2,0],"log_dirs":["any","any","any"]}""" +
@@ -136,9 +139,10 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
       describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
   }
 
-  @Test
-  def testHighWaterMarkAfterPartitionReassignment(): Unit = {
-    cluster = new ReassignPartitionsTestCluster(zkConnect)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testHighWaterMarkAfterPartitionReassignment(quorum: String): Unit = {
+    cluster = new ReassignPartitionsTestCluster()
     cluster.setup()
     val assignment = """{"version":1,"partitions":""" +
       """[{"topic":"foo","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]}""" +
@@ -165,9 +169,10 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
       s"Expected broker 3 to have the correct high water mark for the partition.")
   }
 
-  @Test
-  def testAlterReassignmentThrottle(): Unit = {
-    cluster = new ReassignPartitionsTestCluster(zkConnect)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAlterReassignmentThrottle(quorum: String): Unit = {
+    cluster = new ReassignPartitionsTestCluster()
     cluster.setup()
     cluster.produceMessages("foo", 0, 50)
     cluster.produceMessages("baz", 2, 60)
@@ -201,9 +206,10 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
   /**
    * Test running a reassignment with the interBrokerThrottle set.
    */
-  @Test
-  def testThrottledReassignment(): Unit = {
-    cluster = new ReassignPartitionsTestCluster(zkConnect)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testThrottledReassignment(quorum: String): Unit = {
+    cluster = new ReassignPartitionsTestCluster()
     cluster.setup()
     cluster.produceMessages("foo", 0, 50)
     cluster.produceMessages("baz", 2, 60)
@@ -258,9 +264,10 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
     waitForBrokerLevelThrottles(unthrottledBrokerConfigs)
   }
 
-  @Test
-  def testProduceAndConsumeWithReassignmentInProgress(): Unit = {
-    cluster = new ReassignPartitionsTestCluster(zkConnect)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testProduceAndConsumeWithReassignmentInProgress(quorum: String): Unit = {
+    cluster = new ReassignPartitionsTestCluster()
     cluster.setup()
     cluster.produceMessages("baz", 2, 60)
     val assignment = """{"version":1,"partitions":""" +
@@ -286,9 +293,10 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
   /**
    * Test running a reassignment and then cancelling it.
    */
-  @Test
-  def testCancellation(): Unit = {
-    cluster = new ReassignPartitionsTestCluster(zkConnect)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCancellation(quorum: String): Unit = {
+    cluster = new ReassignPartitionsTestCluster()
     cluster.setup()
     cluster.produceMessages("foo", 0, 200)
     cluster.produceMessages("baz", 1, 200)
@@ -369,9 +377,16 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
    */
   private def describeBrokerLevelThrottles(brokerIds: Seq[Int]): Map[Int, Map[String, Long]] = {
     brokerIds.map { brokerId =>
-      val props = zkClient.getEntityConfigs("brokers", brokerId.toString)
+      val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.toString)
+      val brokerConfigs = cluster.adminClient.describeConfigs(Collections.singleton(brokerResource)).values()
+        .get(brokerResource)
+        .get()
+
       val throttles = brokerLevelThrottles.map { throttleName =>
-        (throttleName, props.getOrDefault(throttleName, "-1").asInstanceOf[String].toLong)
+        val configValue = Option(brokerConfigs.get(throttleName))
+          .map(_.value)
+          .getOrElse("-1")
+        (throttleName, configValue.toLong)
       }.toMap
       brokerId -> throttles
     }.toMap
@@ -380,11 +395,12 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
   /**
    * Test moving partitions between directories.
    */
-  @Test
-  def testLogDirReassignment(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk")) // JBOD not yet implemented for KRaft
+  def testLogDirReassignment(quorum: String): Unit = {
     val topicPartition = new TopicPartition("foo", 0)
 
-    cluster = new ReassignPartitionsTestCluster(zkConnect)
+    cluster = new ReassignPartitionsTestCluster()
     cluster.setup()
     cluster.produceMessages(topicPartition.topic, topicPartition.partition, 700)
 
@@ -430,11 +446,12 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
     assertEquals(reassignment.targetDir, info1.curLogDirs.getOrElse(topicPartition, ""))
   }
 
-  @Test
-  def testAlterLogDirReassignmentThrottle(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk")) // JBOD not yet implemented for KRaft
+  def testAlterLogDirReassignmentThrottle(quorum: String): Unit = {
     val topicPartition = new TopicPartition("foo", 0)
 
-    cluster = new ReassignPartitionsTestCluster(zkConnect)
+    cluster = new ReassignPartitionsTestCluster()
     cluster.setup()
     cluster.produceMessages(topicPartition.topic, topicPartition.partition, 700)
 
@@ -560,7 +577,6 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
   }
 
   class ReassignPartitionsTestCluster(
-    val zkConnect: String,
     configOverrides: Map[String, String] = Map.empty,
     brokerConfigOverrides: Map[Int, Map[String, String]] = Map.empty
   ) extends Closeable {
@@ -582,7 +598,7 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
       case (brokerId, rack) =>
         val config = TestUtils.createBrokerConfig(
           nodeId = brokerId,
-          zkConnect = zkConnect,
+          zkConnect = zkConnectOrNull,
           rack = Some(rack),
           enableControlledShutdown = false, // shorten test time
           logDirCount = 3)
@@ -597,10 +613,10 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
           overrides.forKeyValue(config.setProperty)
         }
 
-        config
+        new KafkaConfig(config)
     }.toBuffer
 
-    var servers = new mutable.ArrayBuffer[KafkaServer]
+    var servers = new mutable.ArrayBuffer[KafkaBroker]
 
     var brokerList: String = null
 
@@ -613,7 +629,7 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
 
     def createServers(): Unit = {
       brokers.keySet.foreach { brokerId =>
-        servers += TestUtils.createServer(KafkaConfig(brokerConfigs(brokerId)))
+        servers += createBroker(brokerConfigs(brokerId))
       }
     }
 
@@ -635,6 +651,13 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
         case (topicName, parts) =>
           TestUtils.waitForAllPartitionsMetadata(servers, topicName, parts.size)
       }
+
+      if (isKRaftTest()) {
+        TestUtils.ensureConsistentKRaftMetadata(
+          cluster.servers,
+          controllerServer
+        )
+      }
     }
 
     def produceMessages(topic: String, partition: Int, numMessages: Int): Unit = {
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index b4ec1dbc87..920b24441e 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -53,10 +53,13 @@ trait QuorumImplementation {
   def shutdown(): Unit
 }
 
-class ZooKeeperQuorumImplementation(val zookeeper: EmbeddedZookeeper,
-                                    val zkClient: KafkaZkClient,
-                                    val adminZkClient: AdminZkClient,
-                                    val log: Logging) extends QuorumImplementation {
+class ZooKeeperQuorumImplementation(
+  val zookeeper: EmbeddedZookeeper,
+  val zkConnect: String,
+  val zkClient: KafkaZkClient,
+  val adminZkClient: AdminZkClient,
+  val log: Logging
+) extends QuorumImplementation {
   override def createBroker(config: KafkaConfig,
                             time: Time,
                             startup: Boolean): KafkaBroker = {
@@ -320,8 +323,10 @@ abstract class QuorumTestHarness extends Logging {
     val zookeeper = new EmbeddedZookeeper()
     var zkClient: KafkaZkClient = null
     var adminZkClient: AdminZkClient = null
+    val zkConnect = s"127.0.0.1:${zookeeper.port}"
     try {
-      zkClient = KafkaZkClient(s"127.0.0.1:${zookeeper.port}",
+      zkClient = KafkaZkClient(
+        zkConnect,
         zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled),
         zkSessionTimeout,
         zkConnectionTimeout,
@@ -336,10 +341,13 @@ abstract class QuorumTestHarness extends Logging {
         if (zkClient != null) CoreUtils.swallow(zkClient.close(), this)
         throw t
     }
-    new ZooKeeperQuorumImplementation(zookeeper,
+    new ZooKeeperQuorumImplementation(
+      zookeeper,
+      zkConnect,
       zkClient,
       adminZkClient,
-      this)
+      this
+    )
   }
 
   @AfterEach