You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/06/10 19:21:11 UTC

[kafka] branch trunk updated: MINOR: Fix transient failure in PreferredReplicaLeaderElectionCommandTest (#6908)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1e5227c  MINOR: Fix transient failure in PreferredReplicaLeaderElectionCommandTest (#6908)
1e5227c is described below

commit 1e5227c230c20af4a8182c6324ffbb481712b718
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Jun 10 12:20:51 2019 -0700

    MINOR: Fix transient failure in PreferredReplicaLeaderElectionCommandTest (#6908)
    
    We have recently seen this test fail intermittently with the following error:
    ```
    java.util.NoSuchElementException: None.get
    	at scala.None$.get(Option.scala:366)
    	at scala.None$.get(Option.scala:364)
    	at kafka.admin.PreferredReplicaLeaderElectionCommandTest.getLeader(PreferredReplicaLeaderElectionCommandTest.scala:101)
    	at kafka.admin.PreferredReplicaLeaderElectionCommandTest.testNoopElection(PreferredReplicaLeaderElectionCommandTest.scala:240)
    ```
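    The test read the partition leader from the first broker's metadata cache and called `.get` on the result of `getPartitionInfo`, which can still be `None` if that broker has not yet received the partition's metadata. We need to wait for the leader to be available.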
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 ...PreferredReplicaLeaderElectionCommandTest.scala | 54 ++++++++++++----------
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 49 +++++++++++++-------
 2 files changed, 62 insertions(+), 41 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
index 03ffd2f..38f2430 100644
--- a/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.PreferredLeaderNotAvailableException
 import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
 import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.test
 import org.junit.Assert._
 import org.junit.{After, Test}
 
@@ -63,9 +64,9 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
     // create brokers
     servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
-    partitionsAndAssignments.foreach { case (tp, assigment) =>
-      zkClient.createTopicAssignment(tp.topic(),
-      Map(tp -> assigment))
+    partitionsAndAssignments.foreach { case (tp, assignment) =>
+      zkClient.createTopicAssignment(tp.topic,
+      Map(tp -> assignment))
     }
     // wait until replica log is created on every broker
     TestUtils.waitUntilTrue(
@@ -97,8 +98,11 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
     servers.find(p => p.kafkaController.isActive)
   }
 
-  private def getLeader(topicPartition: TopicPartition) = {
-    servers(0).metadataCache.getPartitionInfo(topicPartition.topic(), topicPartition.partition()).get.basePartitionState.leader
+  private def awaitLeader(topicPartition: TopicPartition, timeoutMs: Long = test.TestUtils.DEFAULT_MAX_WAIT_MS): Int = {
+    TestUtils.awaitValue(() => {
+      servers.head.metadataCache.getPartitionInfo(topicPartition.topic, topicPartition.partition)
+          .map(_.basePartitionState.leader)
+    }, s"Timed out waiting to find current leader of $topicPartition", timeoutMs)
   }
 
   private def bootstrapServer(broker: Int = 0): String = {
@@ -118,11 +122,11 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
     createTestTopicAndCluster(testPartitionAndAssignment)
     bounceServer(testPartitionPreferredLeader, testPartition)
     // Check the leader for the partition is not the preferred one
-    assertNotEquals(testPartitionPreferredLeader, getLeader(testPartition))
+    assertNotEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
     PreferredReplicaLeaderElectionCommand.run(Array(
       "--bootstrap-server", s"${bootstrapServer(1)},${bootstrapServer(0)}"))
     // Check the leader for the partition IS the preferred one
-    assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
+    assertEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
   }
 
   /** Test the case when an invalid broker is given for --bootstrap-broker */
@@ -145,11 +149,11 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
     createTestTopicAndCluster(testPartitionAndAssignment)
     bounceServer(testPartitionPreferredLeader, testPartition)
     // Check the leader for the partition is not the preferred one
-    assertNotEquals(testPartitionPreferredLeader, getLeader(testPartition))
+    assertNotEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
     PreferredReplicaLeaderElectionCommand.run(Array(
       "--bootstrap-server", bootstrapServer()))
     // Check the leader for the partition IS the preferred one
-    assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
+    assertEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
   }
 
   private def toJsonFile(partitions: Set[TopicPartition]): File = {
@@ -167,7 +171,7 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
     createTestTopicAndCluster(testPartitionAndAssignment)
     bounceServer(testPartitionPreferredLeader, testPartition)
     // Check the leader for the partition is not the preferred one
-    assertNotEquals(testPartitionPreferredLeader, getLeader(testPartition))
+    assertNotEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
     val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
     try {
       PreferredReplicaLeaderElectionCommand.run(Array(
@@ -177,7 +181,7 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
       jsonFile.delete()
     }
     // Check the leader for the partition IS the preferred one
-    assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
+    assertEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
   }
 
   /** Test the case where a topic does not exist */
@@ -217,8 +221,8 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
     createTestTopicAndCluster(testPartitionAndAssignment)
     bounceServer(testPartitionPreferredLeader, testPartitionA)
     // Check the leader for the partition is not the preferred one
-    assertNotEquals(testPartitionPreferredLeader, getLeader(testPartitionA))
-    assertNotEquals(testPartitionPreferredLeader, getLeader(testPartitionB))
+    assertNotEquals(testPartitionPreferredLeader, awaitLeader(testPartitionA))
+    assertNotEquals(testPartitionPreferredLeader, awaitLeader(testPartitionB))
     val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
     try {
       PreferredReplicaLeaderElectionCommand.run(Array(
@@ -228,16 +232,16 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
       jsonFile.delete()
     }
     // Check the leader for the partition IS the preferred one
-    assertEquals(testPartitionPreferredLeader, getLeader(testPartitionA))
-    assertEquals(testPartitionPreferredLeader, getLeader(testPartitionB))
+    assertEquals(testPartitionPreferredLeader, awaitLeader(testPartitionA))
+    assertEquals(testPartitionPreferredLeader, awaitLeader(testPartitionB))
   }
 
   /** What happens when the preferred replica is already the leader? */
   @Test
   def testNoopElection() {
     createTestTopicAndCluster(testPartitionAndAssignment)
-    // Don't bounce the server. Doublec heck the leader for the partition is the preferred one
-    assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
+    // Don't bounce the server. Doublecheck the leader for the partition is the preferred one
+    assertEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
     val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
     try {
       // Now do the election, even though the preferred replica is *already* the leader
@@ -245,7 +249,7 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
         "--bootstrap-server", bootstrapServer(),
         "--path-to-json-file", jsonFile.getAbsolutePath))
       // Check the leader for the partition still is the preferred one
-      assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
+      assertEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
     } finally {
       jsonFile.delete()
     }
@@ -257,7 +261,7 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
     createTestTopicAndCluster(testPartitionAndAssignment)
     bounceServer(testPartitionPreferredLeader, testPartition)
     // Check the leader for the partition is not the preferred one
-    val leader = getLeader(testPartition)
+    val leader = awaitLeader(testPartition)
     assertNotEquals(testPartitionPreferredLeader, leader)
     // Now kill the preferred one
     servers(testPartitionPreferredLeader).shutdown()
@@ -275,7 +279,7 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
         assertTrue(suppressed.isInstanceOf[PreferredLeaderNotAvailableException])
         assertTrue(suppressed.getMessage, suppressed.getMessage.contains("Failed to elect leader for partition test-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
         // Check we still have the same leader
-        assertEquals(leader, getLeader(testPartition))
+        assertEquals(leader, awaitLeader(testPartition))
     } finally {
       jsonFile.delete()
     }
@@ -287,7 +291,7 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
     createTestTopicAndCluster(testPartitionAndAssignment)
     bounceServer(testPartitionPreferredLeader, testPartition)
     // Check the leader for the partition is not the preferred one
-    val leader = getLeader(testPartition)
+    val leader = awaitLeader(testPartition)
     assertNotEquals(testPartitionPreferredLeader, leader)
     // Now kill the controller just before we trigger the election
     val controller = getController().get.config.brokerId
@@ -303,7 +307,7 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
       case e: AdminCommandFailedException =>
         assertEquals("Timeout waiting for election results", e.getMessage)
         // Check we still have the same leader
-        assertEquals(leader, getLeader(testPartition))
+        assertEquals(leader, awaitLeader(testPartition))
     } finally {
       jsonFile.delete()
     }
@@ -315,10 +319,10 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
     createTestTopicAndCluster(testPartitionAndAssignment, Some(classOf[PreferredReplicaLeaderElectionCommandTestAuthorizer].getName))
     bounceServer(testPartitionPreferredLeader, testPartition)
     // Check the leader for the partition is not the preferred one
-    val leader = getLeader(testPartition)
+    val leader = awaitLeader(testPartition)
     assertNotEquals(testPartitionPreferredLeader, leader)
     // Check the leader for the partition is not the preferred one
-    assertNotEquals(testPartitionPreferredLeader, getLeader(testPartition))
+    assertNotEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
     val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
     try {
       PreferredReplicaLeaderElectionCommand.run(Array(
@@ -330,7 +334,7 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
         assertEquals("Not authorized to perform leader election", e.getMessage)
         assertTrue(e.getCause().isInstanceOf[ClusterAuthorizationException])
         // Check we still have the same leader
-        assertEquals(leader, getLeader(testPartition))
+        assertEquals(leader, awaitLeader(testPartition))
     } finally {
       jsonFile.delete()
     }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ca62a79..5c9284f 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -313,7 +313,7 @@ object TestUtils extends Logging {
                   topicConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] = {
     val adminZkClient = new AdminZkClient(zkClient)
     // create topic
-    TestUtils.waitUntilTrue( () => {
+    waitUntilTrue( () => {
       var hasSessionExpirationException = false
       try {
         adminZkClient.createTopic(topic, numPartitions, replicationFactor, topicConfig)
@@ -355,7 +355,7 @@ object TestUtils extends Logging {
                   topicConfig: Properties): scala.collection.immutable.Map[Int, Int] = {
     val adminZkClient = new AdminZkClient(zkClient)
     // create topic
-    TestUtils.waitUntilTrue( () => {
+    waitUntilTrue( () => {
       var hasSessionExpirationException = false
       try {
         adminZkClient.createTopicWithAssignment(topic, topicConfig, partitionReplicaAssignment)
@@ -777,6 +777,23 @@ object TestUtils extends Logging {
   }
 
   /**
+   * Wait for the presence of an optional value.
+   *
+   * @param func The function defining the optional value
+   * @param msg Error message in the case that the value never appears
+   * @param waitTimeMs Maximum time to wait
+   * @return The unwrapped value returned by the function
+   */
+  def awaitValue[T](func: () => Option[T], msg: => String, waitTimeMs: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): T = {
+    var value: Option[T] = None
+    waitUntilTrue(() => {
+      value = func()
+      value.isDefined
+    }, msg, waitTimeMs)
+    value.get
+  }
+
+  /**
     *  Wait until the given condition is true or throw an exception if the given wait time elapses.
     *
     * @param condition condition to check
@@ -865,7 +882,7 @@ object TestUtils extends Logging {
   def waitUntilBrokerMetadataIsPropagated(servers: Seq[KafkaServer],
                                           timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Unit = {
     val expectedBrokerIds = servers.map(_.config.brokerId).toSet
-    TestUtils.waitUntilTrue(() => servers.forall(server =>
+    waitUntilTrue(() => servers.forall(server =>
       expectedBrokerIds == server.dataPlaneRequestProcessor.metadataCache.getAliveBrokers.map(_.id).toSet
     ), "Timed out waiting for broker metadata to propagate to all servers", timeout)
   }
@@ -883,7 +900,7 @@ object TestUtils extends Logging {
   def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int,
                                     timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
     var leader: Int = -1
-    TestUtils.waitUntilTrue(() =>
+    waitUntilTrue(() =>
       servers.foldLeft(true) {
         (result, server) =>
           val partitionStateOpt = server.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic, partition)
@@ -916,7 +933,7 @@ object TestUtils extends Logging {
       }.map(_.config.brokerId)
     }
 
-    TestUtils.waitUntilTrue(() => newLeaderExists.isDefined,
+    waitUntilTrue(() => newLeaderExists.isDefined,
       s"Did not observe leader change for partition $tp after $timeout ms", waitTimeMs = timeout)
 
     newLeaderExists.get
@@ -931,7 +948,7 @@ object TestUtils extends Logging {
       }.map(_.config.brokerId)
     }
 
-    TestUtils.waitUntilTrue(() => leaderIfExists.isDefined,
+    waitUntilTrue(() => leaderIfExists.isDefined,
       s"Partition $tp leaders not made yet after $timeout ms", waitTimeMs = timeout)
 
     leaderIfExists.get
@@ -964,18 +981,18 @@ object TestUtils extends Logging {
   def ensureNoUnderReplicatedPartitions(zkClient: KafkaZkClient, topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int],
                                                 servers: Seq[KafkaServer]) {
     val topicPartition = new TopicPartition(topic, partitionToBeReassigned)
-    TestUtils.waitUntilTrue(() => {
+    waitUntilTrue(() => {
         val inSyncReplicas = zkClient.getInSyncReplicasForPartition(topicPartition)
         inSyncReplicas.get.size == assignedReplicas.size
       },
       "Reassigned partition [%s,%d] is under replicated".format(topic, partitionToBeReassigned))
     var leader: Option[Int] = None
-    TestUtils.waitUntilTrue(() => {
+    waitUntilTrue(() => {
         leader = zkClient.getLeaderForPartition(topicPartition)
         leader.isDefined
       },
       "Reassigned partition [%s,%d] is unavailable".format(topic, partitionToBeReassigned))
-    TestUtils.waitUntilTrue(() => {
+    waitUntilTrue(() => {
         val leaderBroker = servers.filter(s => s.config.brokerId == leader.get).head
         leaderBroker.replicaManager.underReplicatedPartitionCount == 0
       },
@@ -1059,33 +1076,33 @@ object TestUtils extends Logging {
   def verifyTopicDeletion(zkClient: KafkaZkClient, topic: String, numPartitions: Int, servers: Seq[KafkaServer]) {
     val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _))
     // wait until admin path for delete topic is deleted, signaling completion of topic deletion
-    TestUtils.waitUntilTrue(() => !zkClient.isTopicMarkedForDeletion(topic),
+    waitUntilTrue(() => !zkClient.isTopicMarkedForDeletion(topic),
       "Admin path /admin/delete_topics/%s path not deleted even after a replica is restarted".format(topic))
-    TestUtils.waitUntilTrue(() => !zkClient.topicExists(topic),
+    waitUntilTrue(() => !zkClient.topicExists(topic),
       "Topic path /brokers/topics/%s not deleted after /admin/delete_topics/%s path is deleted".format(topic, topic))
     // ensure that the topic-partition has been deleted from all brokers' replica managers
-    TestUtils.waitUntilTrue(() =>
+    waitUntilTrue(() =>
       servers.forall(server => topicPartitions.forall(tp => server.replicaManager.nonOfflinePartition(tp).isEmpty)),
       "Replica manager's should have deleted all of this topic's partitions")
     // ensure that logs from all replicas are deleted if delete topic is marked successful in ZooKeeper
     assertTrue("Replica logs not deleted after delete topic is complete",
       servers.forall(server => topicPartitions.forall(tp => server.getLogManager.getLog(tp).isEmpty)))
     // ensure that topic is removed from all cleaner offsets
-    TestUtils.waitUntilTrue(() => servers.forall(server => topicPartitions.forall { tp =>
+    waitUntilTrue(() => servers.forall(server => topicPartitions.forall { tp =>
       val checkpoints = server.getLogManager.liveLogDirs.map { logDir =>
         new OffsetCheckpointFile(new File(logDir, "cleaner-offset-checkpoint")).read()
       }
       checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
     }), "Cleaner offset for deleted partition should have been removed")
     import scala.collection.JavaConverters._
-    TestUtils.waitUntilTrue(() => servers.forall(server =>
+    waitUntilTrue(() => servers.forall(server =>
       server.config.logDirs.forall { logDir =>
         topicPartitions.forall { tp =>
           !new File(logDir, tp.topic + "-" + tp.partition).exists()
         }
       }
     ), "Failed to soft-delete the data to a delete directory")
-    TestUtils.waitUntilTrue(() => servers.forall(server =>
+    waitUntilTrue(() => servers.forall(server =>
       server.config.logDirs.forall { logDir =>
         topicPartitions.forall { tp =>
           !java.util.Arrays.asList(new File(logDir).list()).asScala.exists { partitionDirectoryName =>
@@ -1145,7 +1162,7 @@ object TestUtils extends Logging {
   def waitAndVerifyAcls(expected: Set[Acl], authorizer: Authorizer, resource: Resource) = {
     val newLine = scala.util.Properties.lineSeparator
 
-    TestUtils.waitUntilTrue(() => authorizer.getAcls(resource) == expected,
+    waitUntilTrue(() => authorizer.getAcls(resource) == expected,
       s"expected acls:${expected.mkString(newLine + "\t", newLine + "\t", newLine)}" +
         s"but got:${authorizer.getAcls(resource).mkString(newLine + "\t", newLine + "\t", newLine)}", waitTimeMs = JTestUtils.DEFAULT_MAX_WAIT_MS)
   }