You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2024/01/31 10:34:48 UTC
(kafka) branch trunk updated: KAFKA-15711: KRaft support in LogRecoveryTest (#14693)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cdd9c62c553 KAFKA-15711: KRaft support in LogRecoveryTest (#14693)
cdd9c62c553 is described below
commit cdd9c62c5533e67a9057a47005fa1d296b903a11
Author: Gantigmaa Selenge <39...@users.noreply.github.com>
AuthorDate: Wed Jan 31 10:34:42 2024 +0000
KAFKA-15711: KRaft support in LogRecoveryTest (#14693)
Reviewers: Mickael Maison <mi...@gmail.com>, Zihao Lin
---
.../scala/unit/kafka/server/LogRecoveryTest.scala | 59 +++++++++++++---------
.../test/scala/unit/kafka/utils/TestUtils.scala | 16 ++++++
2 files changed, 51 insertions(+), 24 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 37c5a097bc0..53110040885 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -20,17 +20,22 @@ import java.util.Properties
import scala.collection.Seq
-import kafka.utils.TestUtils
+import kafka.utils.{TestUtils, TestInfoUtils}
import TestUtils._
import kafka.server.QuorumTestHarness
import java.io.File
import kafka.server.checkpoints.OffsetCheckpointFile
+import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
class LogRecoveryTest extends QuorumTestHarness {
@@ -49,18 +54,19 @@ class LogRecoveryTest extends QuorumTestHarness {
val partitionId = 0
val topicPartition = new TopicPartition(topic, partitionId)
- var server1: KafkaServer = _
- var server2: KafkaServer = _
+ var server1: KafkaBroker = _
+ var server2: KafkaBroker = _
def configProps1 = configs.head
def configProps2 = configs.last
val message = "hello"
+ var admin: Admin = _
var producer: KafkaProducer[Integer, String] = _
def hwFile1 = new OffsetCheckpointFile(new File(configProps1.logDirs.head, ReplicaManager.HighWatermarkFilename))
def hwFile2 = new OffsetCheckpointFile(new File(configProps2.logDirs.head, ReplicaManager.HighWatermarkFilename))
- var servers = Seq.empty[KafkaServer]
+ var servers = Seq.empty[KafkaBroker]
// Some tests restart the brokers then produce more data. But since test brokers use random ports, we need
// to use a new producer that knows the new ports
@@ -78,15 +84,15 @@ class LogRecoveryTest extends QuorumTestHarness {
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
- configs = TestUtils.createBrokerConfigs(2, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
+ configs = TestUtils.createBrokerConfigs(2, zkConnectOrNull, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
// start both servers
- server1 = TestUtils.createServer(configProps1)
- server2 = TestUtils.createServer(configProps2)
+ server1 = createBroker(configProps1)
+ server2 = createBroker(configProps2)
servers = List(server1, server2)
- // create topic with 1 partition, 2 replicas, one on each broker
- createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(0,1)), servers = servers)
+ admin = createAdminClient(servers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+ createTopicWithAdmin(admin, topic, servers, controllerServers, replicaAssignment = Map(0 -> Seq(0, 1)))
// create the producer
updateProducer()
@@ -95,12 +101,14 @@ class LogRecoveryTest extends QuorumTestHarness {
@AfterEach
override def tearDown(): Unit = {
producer.close()
+ if (admin != null) admin.close()
TestUtils.shutdownServers(servers)
super.tearDown()
}
- @Test
- def testHWCheckpointNoFailuresSingleLogSegment(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testHWCheckpointNoFailuresSingleLogSegment(quorum: String): Unit = {
val numMessages = 2L
sendMessages(numMessages.toInt)
@@ -116,9 +124,10 @@ class LogRecoveryTest extends QuorumTestHarness {
assertEquals(numMessages, followerHW)
}
- @Test
- def testHWCheckpointWithFailuresSingleLogSegment(): Unit = {
- var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testHWCheckpointWithFailuresSingleLogSegment(quorum: String): Unit = {
+ var leader = getLeaderIdForPartition(servers, topicPartition)
assertEquals(0L, hwFile1.read().getOrElse(topicPartition, 0L))
@@ -131,7 +140,7 @@ class LogRecoveryTest extends QuorumTestHarness {
assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L))
// check if leader moves to the other server
- leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader))
+ leader = awaitLeaderChange(servers, topicPartition, leader)
assertEquals(1, leader, "Leader must move to broker 1")
// bring the preferred replica back
@@ -139,7 +148,7 @@ class LogRecoveryTest extends QuorumTestHarness {
// Update producer with new server settings
updateProducer()
- leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
+ leader = getLeaderIdForPartition(servers, topicPartition)
assertTrue(leader == 0 || leader == 1,
"Leader must remain on broker 1, in case of ZooKeeper session expiration it can move to broker 0")
@@ -159,7 +168,7 @@ class LogRecoveryTest extends QuorumTestHarness {
server2.startup()
updateProducer()
- leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader))
+ leader = awaitLeaderChange(servers, topicPartition, leader)
assertTrue(leader == 0 || leader == 1,
"Leader must remain on broker 0, in case of ZooKeeper session expiration it can move to broker 1")
@@ -176,8 +185,9 @@ class LogRecoveryTest extends QuorumTestHarness {
assertEquals(hw, hwFile2.read().getOrElse(topicPartition, 0L))
}
- @Test
- def testHWCheckpointNoFailuresMultipleLogSegments(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testHWCheckpointNoFailuresMultipleLogSegments(quorum: String): Unit = {
sendMessages(20)
val hw = 20L
// give some time for follower 1 to record leader HW of 600
@@ -192,9 +202,10 @@ class LogRecoveryTest extends QuorumTestHarness {
assertEquals(hw, followerHW)
}
- @Test
- def testHWCheckpointWithFailuresMultipleLogSegments(): Unit = {
- var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testHWCheckpointWithFailuresMultipleLogSegments(quorum: String): Unit = {
+ var leader = getLeaderIdForPartition(servers, topicPartition)
sendMessages(2)
var hw = 2L
@@ -212,7 +223,7 @@ class LogRecoveryTest extends QuorumTestHarness {
server2.startup()
updateProducer()
// check if leader moves to the other server
- leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader))
+ leader = awaitLeaderChange(servers, topicPartition, leader)
assertEquals(1, leader, "Leader must move to broker 1")
assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L))
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 3e179f5d04c..16a7e95c0bf 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1359,6 +1359,22 @@ object TestUtils extends Logging {
newLeaderExists.get
}
+ def getLeaderIdForPartition[B <: KafkaBroker](
+ brokers: Seq[B],
+ tp: TopicPartition,
+ timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
+ def leaderExists: Option[Int] = {
+ brokers.find { broker =>
+ broker.replicaManager.onlinePartition(tp).exists(_.leaderLogIfLocal.isDefined)
+ }.map(_.config.brokerId)
+ }
+
+ waitUntilTrue(() => leaderExists.isDefined,
+ s"Did not find a leader for partition $tp after $timeout ms", waitTimeMs = timeout)
+
+ leaderExists.get
+ }
+
def waitUntilLeaderIsKnown[B <: KafkaBroker](
brokers: Seq[B],
tp: TopicPartition,