You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/06/08 03:59:42 UTC
[kafka] branch trunk updated: MINOR: Convert `ReassignPartitionsIntegrationTest` to KRaft (#12258)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4542acdc14 MINOR: Convert `ReassignPartitionsIntegrationTest` to KRaft (#12258)
4542acdc14 is described below
commit 4542acdc14d5ec3daa1f36d8dc24abc244ee24ff
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Jun 7 20:59:24 2022 -0700
MINOR: Convert `ReassignPartitionsIntegrationTest` to KRaft (#12258)
Updates relevant tests in `ReassignPartitionsIntegrationTest` for KRaft. We skip JBOD tests since it is not supported and we skip `AlterPartition` upgrade tests since they are not relevant.
Reviewers: Kvicii <Ka...@gmail.com>, David Arthur <mu...@gmail.com>
---
.../admin/ReassignPartitionsIntegrationTest.scala | 121 ++++++++++++---------
.../kafka/server/QuorumTestHarness.scala | 22 ++--
2 files changed, 87 insertions(+), 56 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index 769389a5ef..29b4c82740 100644
--- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -17,23 +17,23 @@
package kafka.admin
-import java.io.Closeable
-import java.util.{Collections, HashMap, List}
-
import kafka.admin.ReassignPartitionsCommand._
-import kafka.server.{IsrChangePropagationConfig, KafkaConfig, KafkaServer, ZkAlterPartitionManager}
+import kafka.server._
import kafka.utils.Implicits._
-import kafka.utils.TestUtils
-import kafka.server.QuorumTestHarness
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, ConfigEntry, DescribeLogDirsResult, NewTopic}
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
import org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
-import org.junit.jupiter.api.{AfterEach, Test, Timeout}
+import org.junit.jupiter.api.{AfterEach, Timeout}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+import java.io.Closeable
+import java.util.{Collections, HashMap, List}
import scala.collection.{Map, Seq, mutable}
import scala.jdk.CollectionConverters._
@@ -54,33 +54,36 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
}.toMap
- @Test
- def testReassignment(): Unit = {
- cluster = new ReassignPartitionsTestCluster(zkConnect)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testReassignment(quorum: String): Unit = {
+ cluster = new ReassignPartitionsTestCluster()
cluster.setup()
executeAndVerifyReassignment()
}
- @Test
- def testReassignmentWithAlterIsrDisabled(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk")) // Note: KRaft requires AlterPartition
+ def testReassignmentWithAlterPartitionDisabled(quorum: String): Unit = {
// Test reassignment when the IBP is on an older version which does not use
- // the `AlterIsr` API. In this case, the controller will register individual
+ // the `AlterPartition` API. In this case, the controller will register individual
// watches for each reassigning partition so that the reassignment can be
// completed as soon as the ISR is expanded.
val configOverrides = Map(KafkaConfig.InterBrokerProtocolVersionProp -> IBP_2_7_IV1.version)
- cluster = new ReassignPartitionsTestCluster(zkConnect, configOverrides = configOverrides)
+ cluster = new ReassignPartitionsTestCluster(configOverrides = configOverrides)
cluster.setup()
executeAndVerifyReassignment()
}
- @Test
- def testReassignmentCompletionDuringPartialUpgrade(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk")) // Note: KRaft requires AlterPartition
+ def testReassignmentCompletionDuringPartialUpgrade(quorum: String): Unit = {
// Test reassignment during a partial upgrade when some brokers are relying on
- // `AlterIsr` and some rely on the old notification logic through Zookeeper.
+ // `AlterPartition` and some rely on the old notification logic through Zookeeper.
// In this test case, broker 0 starts up first on the latest IBP and is typically
// elected as controller. The three remaining brokers start up on the older IBP.
// We want to ensure that reassignment can still complete through the ISR change
- // notification path even though the controller expects `AlterIsr`.
+ // notification path even though the controller expects `AlterPartition`.
// Override change notification settings so that test is not delayed by ISR
// change notification delay
@@ -93,13 +96,13 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
val oldIbpConfig = Map(KafkaConfig.InterBrokerProtocolVersionProp -> IBP_2_7_IV1.version)
val brokerConfigOverrides = Map(1 -> oldIbpConfig, 2 -> oldIbpConfig, 3 -> oldIbpConfig)
- cluster = new ReassignPartitionsTestCluster(zkConnect, brokerConfigOverrides = brokerConfigOverrides)
+ cluster = new ReassignPartitionsTestCluster(brokerConfigOverrides = brokerConfigOverrides)
cluster.setup()
executeAndVerifyReassignment()
}
- def executeAndVerifyReassignment(): Unit = {
+ private def executeAndVerifyReassignment(): Unit = {
val assignment = """{"version":1,"partitions":""" +
"""[{"topic":"foo","partition":0,"replicas":[0,1,3],"log_dirs":["any","any","any"]},""" +
"""{"topic":"bar","partition":0,"replicas":[3,2,0],"log_dirs":["any","any","any"]}""" +
@@ -136,9 +139,10 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
}
- @Test
- def testHighWaterMarkAfterPartitionReassignment(): Unit = {
- cluster = new ReassignPartitionsTestCluster(zkConnect)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testHighWaterMarkAfterPartitionReassignment(quorum: String): Unit = {
+ cluster = new ReassignPartitionsTestCluster()
cluster.setup()
val assignment = """{"version":1,"partitions":""" +
"""[{"topic":"foo","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]}""" +
@@ -165,9 +169,10 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
s"Expected broker 3 to have the correct high water mark for the partition.")
}
- @Test
- def testAlterReassignmentThrottle(): Unit = {
- cluster = new ReassignPartitionsTestCluster(zkConnect)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAlterReassignmentThrottle(quorum: String): Unit = {
+ cluster = new ReassignPartitionsTestCluster()
cluster.setup()
cluster.produceMessages("foo", 0, 50)
cluster.produceMessages("baz", 2, 60)
@@ -201,9 +206,10 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
/**
* Test running a reassignment with the interBrokerThrottle set.
*/
- @Test
- def testThrottledReassignment(): Unit = {
- cluster = new ReassignPartitionsTestCluster(zkConnect)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testThrottledReassignment(quorum: String): Unit = {
+ cluster = new ReassignPartitionsTestCluster()
cluster.setup()
cluster.produceMessages("foo", 0, 50)
cluster.produceMessages("baz", 2, 60)
@@ -258,9 +264,10 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
waitForBrokerLevelThrottles(unthrottledBrokerConfigs)
}
- @Test
- def testProduceAndConsumeWithReassignmentInProgress(): Unit = {
- cluster = new ReassignPartitionsTestCluster(zkConnect)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testProduceAndConsumeWithReassignmentInProgress(quorum: String): Unit = {
+ cluster = new ReassignPartitionsTestCluster()
cluster.setup()
cluster.produceMessages("baz", 2, 60)
val assignment = """{"version":1,"partitions":""" +
@@ -286,9 +293,10 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
/**
* Test running a reassignment and then cancelling it.
*/
- @Test
- def testCancellation(): Unit = {
- cluster = new ReassignPartitionsTestCluster(zkConnect)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCancellation(quorum: String): Unit = {
+ cluster = new ReassignPartitionsTestCluster()
cluster.setup()
cluster.produceMessages("foo", 0, 200)
cluster.produceMessages("baz", 1, 200)
@@ -369,9 +377,16 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
*/
private def describeBrokerLevelThrottles(brokerIds: Seq[Int]): Map[Int, Map[String, Long]] = {
brokerIds.map { brokerId =>
- val props = zkClient.getEntityConfigs("brokers", brokerId.toString)
+ val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.toString)
+ val brokerConfigs = cluster.adminClient.describeConfigs(Collections.singleton(brokerResource)).values()
+ .get(brokerResource)
+ .get()
+
val throttles = brokerLevelThrottles.map { throttleName =>
- (throttleName, props.getOrDefault(throttleName, "-1").asInstanceOf[String].toLong)
+ val configValue = Option(brokerConfigs.get(throttleName))
+ .map(_.value)
+ .getOrElse("-1")
+ (throttleName, configValue.toLong)
}.toMap
brokerId -> throttles
}.toMap
@@ -380,11 +395,12 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
/**
* Test moving partitions between directories.
*/
- @Test
- def testLogDirReassignment(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk")) // JBOD not yet implemented for KRaft
+ def testLogDirReassignment(quorum: String): Unit = {
val topicPartition = new TopicPartition("foo", 0)
- cluster = new ReassignPartitionsTestCluster(zkConnect)
+ cluster = new ReassignPartitionsTestCluster()
cluster.setup()
cluster.produceMessages(topicPartition.topic, topicPartition.partition, 700)
@@ -430,11 +446,12 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
assertEquals(reassignment.targetDir, info1.curLogDirs.getOrElse(topicPartition, ""))
}
- @Test
- def testAlterLogDirReassignmentThrottle(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk")) // JBOD not yet implemented for KRaft
+ def testAlterLogDirReassignmentThrottle(quorum: String): Unit = {
val topicPartition = new TopicPartition("foo", 0)
- cluster = new ReassignPartitionsTestCluster(zkConnect)
+ cluster = new ReassignPartitionsTestCluster()
cluster.setup()
cluster.produceMessages(topicPartition.topic, topicPartition.partition, 700)
@@ -560,7 +577,6 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
}
class ReassignPartitionsTestCluster(
- val zkConnect: String,
configOverrides: Map[String, String] = Map.empty,
brokerConfigOverrides: Map[Int, Map[String, String]] = Map.empty
) extends Closeable {
@@ -582,7 +598,7 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
case (brokerId, rack) =>
val config = TestUtils.createBrokerConfig(
nodeId = brokerId,
- zkConnect = zkConnect,
+ zkConnect = zkConnectOrNull,
rack = Some(rack),
enableControlledShutdown = false, // shorten test time
logDirCount = 3)
@@ -597,10 +613,10 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
overrides.forKeyValue(config.setProperty)
}
- config
+ new KafkaConfig(config)
}.toBuffer
- var servers = new mutable.ArrayBuffer[KafkaServer]
+ var servers = new mutable.ArrayBuffer[KafkaBroker]
var brokerList: String = null
@@ -613,7 +629,7 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
def createServers(): Unit = {
brokers.keySet.foreach { brokerId =>
- servers += TestUtils.createServer(KafkaConfig(brokerConfigs(brokerId)))
+ servers += createBroker(brokerConfigs(brokerId))
}
}
@@ -635,6 +651,13 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
case (topicName, parts) =>
TestUtils.waitForAllPartitionsMetadata(servers, topicName, parts.size)
}
+
+ if (isKRaftTest()) {
+ TestUtils.ensureConsistentKRaftMetadata(
+ cluster.servers,
+ controllerServer
+ )
+ }
}
def produceMessages(topic: String, partition: Int, numMessages: Int): Unit = {
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index b4ec1dbc87..920b24441e 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -53,10 +53,13 @@ trait QuorumImplementation {
def shutdown(): Unit
}
-class ZooKeeperQuorumImplementation(val zookeeper: EmbeddedZookeeper,
- val zkClient: KafkaZkClient,
- val adminZkClient: AdminZkClient,
- val log: Logging) extends QuorumImplementation {
+class ZooKeeperQuorumImplementation(
+ val zookeeper: EmbeddedZookeeper,
+ val zkConnect: String,
+ val zkClient: KafkaZkClient,
+ val adminZkClient: AdminZkClient,
+ val log: Logging
+) extends QuorumImplementation {
override def createBroker(config: KafkaConfig,
time: Time,
startup: Boolean): KafkaBroker = {
@@ -320,8 +323,10 @@ abstract class QuorumTestHarness extends Logging {
val zookeeper = new EmbeddedZookeeper()
var zkClient: KafkaZkClient = null
var adminZkClient: AdminZkClient = null
+ val zkConnect = s"127.0.0.1:${zookeeper.port}"
try {
- zkClient = KafkaZkClient(s"127.0.0.1:${zookeeper.port}",
+ zkClient = KafkaZkClient(
+ zkConnect,
zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled),
zkSessionTimeout,
zkConnectionTimeout,
@@ -336,10 +341,13 @@ abstract class QuorumTestHarness extends Logging {
if (zkClient != null) CoreUtils.swallow(zkClient.close(), this)
throw t
}
- new ZooKeeperQuorumImplementation(zookeeper,
+ new ZooKeeperQuorumImplementation(
+ zookeeper,
+ zkConnect,
zkClient,
adminZkClient,
- this)
+ this
+ )
}
@AfterEach