Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/29 00:11:41 UTC

[GitHub] [kafka] mumrah commented on a diff in pull request #12456: MINOR: convert some more junit tests to support KRaft

mumrah commented on code in PR #12456:
URL: https://github.com/apache/kafka/pull/12456#discussion_r932766733


##########
core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala:
##########
@@ -345,8 +353,15 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     * Producer will attempt to send messages to the partition specified in each record, and should
     * succeed as long as the partition is included in the metadata.
     */
-  @Test
-  def testSendBeforeAndAfterPartitionExpansion(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendBeforeAndAfterPartitionExpansion(quorum: String): Unit = {
+    resource(createAdminClient(brokers, listenerName)) { admin =>

Review Comment:
   Interesting, I never knew about `TestUtils.resource`.
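
For readers who have not seen this kind of helper either: `resource` in the diff above is used in the style of the loan pattern, where a closeable object is handed to a block and closed when the block finishes. The following is a minimal, self-contained sketch of that pattern, not a copy of the Kafka source. The signature is an assumption modelled on how the helper is called in the diff, and `FakeAdmin` is a hypothetical stand-in for a real `Admin` client so that the sketch runs without a broker.

```scala
object ResourceSketch {
  // Loan pattern: run `body` with the resource, then close the resource
  // whether `body` returns normally or throws.
  // Assumed shape only; the real TestUtils.resource may differ in detail.
  def resource[R <: AutoCloseable, A](res: R)(body: R => A): A = {
    try {
      body(res)
    } finally {
      res.close()
    }
  }

  // Hypothetical stand-in for an Admin client, used only to make the sketch runnable.
  final class FakeAdmin extends AutoCloseable {
    var closed: Boolean = false
    def partitionCount(topic: String): Int = 2
    override def close(): Unit = { closed = true }
  }
}
```

With this shape, `ResourceSketch.resource(new ResourceSketch.FakeAdmin) { admin => admin.partitionCount("topic") }` returns the block's result and leaves the client closed, which is why the test body in the diff does not need its own `try`/`finally`.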



##########
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##########
@@ -918,6 +918,76 @@ object TestUtils extends Logging {
     }
   }
 
+  /**
+   *  If neither oldLeaderOpt nor newLeaderOpt is defined, wait until the leader of a partition is elected.
+   *  If oldLeaderOpt is defined, it waits until the new leader is different from the old leader.
+   *  If newLeaderOpt is defined, it waits until the new leader becomes the expected new leader.
+   *
+   * @return The new leader (note that negative values are used to indicate conditions like NoLeader and
+   *         LeaderDuringDelete).
+   * @throws AssertionError if the expected condition is not true within the timeout.
+   */
+  def waitUntilLeaderIsElectedOrChangedWithAdmin(
+    admin: Admin,
+    topic: String,
+    partition: Int,
+    timeoutMs: Long = 30000L,
+    oldLeaderOpt: Option[Int] = None,
+    newLeaderOpt: Option[Int] = None
+  ): Int = {
+    require(!(oldLeaderOpt.isDefined && newLeaderOpt.isDefined), "Can't define both the old and the new leader")
+    val startTime = System.currentTimeMillis()
+    val topicPartition = new TopicPartition(topic, partition)
+
+    trace(s"Waiting for leader to be elected or changed for partition $topicPartition, old leader is $oldLeaderOpt, " +
+      s"new leader is $newLeaderOpt")
+
+    var leader: Option[Int] = None
+    var electedOrChangedLeader: Option[Int] = None
+    while (electedOrChangedLeader.isEmpty && System.currentTimeMillis() < startTime + timeoutMs) {

Review Comment:
   Would it simplify any of this to use `computeUntilTrue` or one of the other helper "until" methods in TestUtils?
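
As a rough illustration of the suggestion, the sketch below replaces the hand-written `while` loop and the two mutable `var`s with a single polling helper. Everything here is an assumption made for illustration: the local `computeUntilTrue` is a stand-in written for this example (the signature of the real helper in TestUtils should be checked before relying on it), `waitForLeader` is a hypothetical name, and `currentLeader` is a hypothetical lookup that a real implementation would presumably back with the `Admin` client. The rest of the method body is not shown in the diff, so this covers only the three documented wait conditions.

```scala
object LeaderWaitSketch {
  // Stand-in for a "compute until true" helper: re-evaluate `compute` until
  // `predicate` holds or `waitTimeMs` elapses, and report whether it held.
  def computeUntilTrue[T](compute: => T, waitTimeMs: Long = 15000L, pauseMs: Long = 100L)(
    predicate: T => Boolean
  ): (T, Boolean) = {
    val deadline = System.currentTimeMillis() + waitTimeMs
    var result = compute
    while (!predicate(result) && System.currentTimeMillis() < deadline) {
      Thread.sleep(pauseMs)
      result = compute
    }
    (result, predicate(result))
  }

  // `currentLeader` is a hypothetical lookup returning the partition's leader, if any.
  def waitForLeader(
    currentLeader: () => Option[Int],
    timeoutMs: Long = 30000L,
    oldLeaderOpt: Option[Int] = None,
    newLeaderOpt: Option[Int] = None
  ): Int = {
    require(!(oldLeaderOpt.isDefined && newLeaderOpt.isDefined), "Can't define both the old and the new leader")
    val (leader, ok) = computeUntilTrue(currentLeader(), timeoutMs) { leaderOpt =>
      // A leader must exist; it must equal newLeaderOpt if that is defined,
      // and differ from oldLeaderOpt if that is defined.
      leaderOpt.exists(l => newLeaderOpt.forall(_ == l) && oldLeaderOpt.forall(_ != l))
    }
    if (!ok) throw new AssertionError(s"Timed out after $timeoutMs ms; last seen leader: $leader")
    leader.get
  }
}
```

The three cases from the scaladoc collapse into one predicate, and the timeout bookkeeping moves into the helper, which is the simplification the comment is asking about.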


