You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ch...@apache.org on 2023/02/23 11:13:37 UTC
[kafka] branch trunk updated: MINOR: stabilize LeaderElectionTest#testLeaderElectionAndEpoch (#13290)
This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 61ece48a862 MINOR: stabilize LeaderElectionTest#testLeaderElectionAndEpoch (#13290)
61ece48a862 is described below
commit 61ece48a86263d8c4275effecd13e6f70f928c07
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Thu Feb 23 19:13:09 2023 +0800
MINOR: stabilize LeaderElectionTest#testLeaderElectionAndEpoch (#13290)
Reviewers: Luke Chen <sh...@gmail.com>
---
core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala | 3 ++-
core/src/test/scala/unit/kafka/utils/TestUtils.scala | 4 +++-
2 files changed, 5 insertions(+), 2 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 259ea91df87..6e7ca0747c9 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -86,7 +86,8 @@ class LeaderElectionTest extends QuorumTestHarness {
// kill the server hosting the preferred replica/initial leader
servers.head.shutdown()
// check if leader moves to the other server
- val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader1))
+ val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId,
+ oldLeaderOpt = Some(leader1), ignoreNoLeader = true)
val leaderEpoch2 = zkClient.getEpochForPartition(new TopicPartition(topic, partitionId)).get
assertEquals(1, leader2, "Leader must move to broker 1")
// new leaderEpoch will be leaderEpoch1+2, one increment during ReplicaStateMachine.startup()-> handleStateChanges
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2c9c4ae6690..2a71f86a453 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -900,10 +900,12 @@ object TestUtils extends Logging {
partition: Int,
timeoutMs: Long = 30000L,
oldLeaderOpt: Option[Int] = None,
- newLeaderOpt: Option[Int] = None
+ newLeaderOpt: Option[Int] = None,
+ ignoreNoLeader: Boolean = false
): Int = {
def getPartitionLeader(topic: String, partition: Int): Option[Int] = {
zkClient.getLeaderForPartition(new TopicPartition(topic, partition))
+ .filter(p => !ignoreNoLeader || p != LeaderAndIsr.NoLeader)
}
doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic, partition, timeoutMs, oldLeaderOpt, newLeaderOpt)
}