You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/06/10 19:21:11 UTC
[kafka] branch trunk updated: MINOR: Fix transient failure in
PreferredReplicaLeaderElectionCommandTest (#6908)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1e5227c MINOR: Fix transient failure in PreferredReplicaLeaderElectionCommandTest (#6908)
1e5227c is described below
commit 1e5227c230c20af4a8182c6324ffbb481712b718
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Jun 10 12:20:51 2019 -0700
MINOR: Fix transient failure in PreferredReplicaLeaderElectionCommandTest (#6908)
We have seen this failing recently due to the following error:
```
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:366)
at scala.None$.get(Option.scala:364)
at kafka.admin.PreferredReplicaLeaderElectionCommandTest.getLeader(PreferredReplicaLeaderElectionCommandTest.scala:101)
at kafka.admin.PreferredReplicaLeaderElectionCommandTest.testNoopElection(PreferredReplicaLeaderElectionCommandTest.scala:240)
```
We need to wait for the partition leader to be available in the broker's metadata cache, rather than assuming it is already present.
Reviewers: David Arthur <mu...@gmail.com>
---
...PreferredReplicaLeaderElectionCommandTest.scala | 54 ++++++++++++----------
.../test/scala/unit/kafka/utils/TestUtils.scala | 49 +++++++++++++-------
2 files changed, 62 insertions(+), 41 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
index 03ffd2f..38f2430 100644
--- a/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.PreferredLeaderNotAvailableException
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.test
import org.junit.Assert._
import org.junit.{After, Test}
@@ -63,9 +64,9 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
// create brokers
servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// create the topic
- partitionsAndAssignments.foreach { case (tp, assigment) =>
- zkClient.createTopicAssignment(tp.topic(),
- Map(tp -> assigment))
+ partitionsAndAssignments.foreach { case (tp, assignment) =>
+ zkClient.createTopicAssignment(tp.topic,
+ Map(tp -> assignment))
}
// wait until replica log is created on every broker
TestUtils.waitUntilTrue(
@@ -97,8 +98,11 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
servers.find(p => p.kafkaController.isActive)
}
- private def getLeader(topicPartition: TopicPartition) = {
- servers(0).metadataCache.getPartitionInfo(topicPartition.topic(), topicPartition.partition()).get.basePartitionState.leader
+ private def awaitLeader(topicPartition: TopicPartition, timeoutMs: Long = test.TestUtils.DEFAULT_MAX_WAIT_MS): Int = {
+ TestUtils.awaitValue(() => {
+ servers.head.metadataCache.getPartitionInfo(topicPartition.topic, topicPartition.partition)
+ .map(_.basePartitionState.leader)
+ }, s"Timed out waiting to find current leader of $topicPartition", timeoutMs)
}
private def bootstrapServer(broker: Int = 0): String = {
@@ -118,11 +122,11 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
createTestTopicAndCluster(testPartitionAndAssignment)
bounceServer(testPartitionPreferredLeader, testPartition)
// Check the leader for the partition is not the preferred one
- assertNotEquals(testPartitionPreferredLeader, getLeader(testPartition))
+ assertNotEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", s"${bootstrapServer(1)},${bootstrapServer(0)}"))
// Check the leader for the partition IS the preferred one
- assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
+ assertEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
}
/** Test the case when an invalid broker is given for --bootstrap-broker */
@@ -145,11 +149,11 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
createTestTopicAndCluster(testPartitionAndAssignment)
bounceServer(testPartitionPreferredLeader, testPartition)
// Check the leader for the partition is not the preferred one
- assertNotEquals(testPartitionPreferredLeader, getLeader(testPartition))
+ assertNotEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", bootstrapServer()))
// Check the leader for the partition IS the preferred one
- assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
+ assertEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
}
private def toJsonFile(partitions: Set[TopicPartition]): File = {
@@ -167,7 +171,7 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
createTestTopicAndCluster(testPartitionAndAssignment)
bounceServer(testPartitionPreferredLeader, testPartition)
// Check the leader for the partition is not the preferred one
- assertNotEquals(testPartitionPreferredLeader, getLeader(testPartition))
+ assertNotEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
try {
PreferredReplicaLeaderElectionCommand.run(Array(
@@ -177,7 +181,7 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
jsonFile.delete()
}
// Check the leader for the partition IS the preferred one
- assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
+ assertEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
}
/** Test the case where a topic does not exist */
@@ -217,8 +221,8 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
createTestTopicAndCluster(testPartitionAndAssignment)
bounceServer(testPartitionPreferredLeader, testPartitionA)
// Check the leader for the partition is not the preferred one
- assertNotEquals(testPartitionPreferredLeader, getLeader(testPartitionA))
- assertNotEquals(testPartitionPreferredLeader, getLeader(testPartitionB))
+ assertNotEquals(testPartitionPreferredLeader, awaitLeader(testPartitionA))
+ assertNotEquals(testPartitionPreferredLeader, awaitLeader(testPartitionB))
val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
try {
PreferredReplicaLeaderElectionCommand.run(Array(
@@ -228,16 +232,16 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
jsonFile.delete()
}
// Check the leader for the partition IS the preferred one
- assertEquals(testPartitionPreferredLeader, getLeader(testPartitionA))
- assertEquals(testPartitionPreferredLeader, getLeader(testPartitionB))
+ assertEquals(testPartitionPreferredLeader, awaitLeader(testPartitionA))
+ assertEquals(testPartitionPreferredLeader, awaitLeader(testPartitionB))
}
/** What happens when the preferred replica is already the leader? */
@Test
def testNoopElection() {
createTestTopicAndCluster(testPartitionAndAssignment)
- // Don't bounce the server. Doublec heck the leader for the partition is the preferred one
- assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
+ // Don't bounce the server. Doublecheck the leader for the partition is the preferred one
+ assertEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
try {
// Now do the election, even though the preferred replica is *already* the leader
@@ -245,7 +249,7 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
"--bootstrap-server", bootstrapServer(),
"--path-to-json-file", jsonFile.getAbsolutePath))
// Check the leader for the partition still is the preferred one
- assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
+ assertEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
} finally {
jsonFile.delete()
}
@@ -257,7 +261,7 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
createTestTopicAndCluster(testPartitionAndAssignment)
bounceServer(testPartitionPreferredLeader, testPartition)
// Check the leader for the partition is not the preferred one
- val leader = getLeader(testPartition)
+ val leader = awaitLeader(testPartition)
assertNotEquals(testPartitionPreferredLeader, leader)
// Now kill the preferred one
servers(testPartitionPreferredLeader).shutdown()
@@ -275,7 +279,7 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
assertTrue(suppressed.isInstanceOf[PreferredLeaderNotAvailableException])
assertTrue(suppressed.getMessage, suppressed.getMessage.contains("Failed to elect leader for partition test-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
// Check we still have the same leader
- assertEquals(leader, getLeader(testPartition))
+ assertEquals(leader, awaitLeader(testPartition))
} finally {
jsonFile.delete()
}
@@ -287,7 +291,7 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
createTestTopicAndCluster(testPartitionAndAssignment)
bounceServer(testPartitionPreferredLeader, testPartition)
// Check the leader for the partition is not the preferred one
- val leader = getLeader(testPartition)
+ val leader = awaitLeader(testPartition)
assertNotEquals(testPartitionPreferredLeader, leader)
// Now kill the controller just before we trigger the election
val controller = getController().get.config.brokerId
@@ -303,7 +307,7 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
case e: AdminCommandFailedException =>
assertEquals("Timeout waiting for election results", e.getMessage)
// Check we still have the same leader
- assertEquals(leader, getLeader(testPartition))
+ assertEquals(leader, awaitLeader(testPartition))
} finally {
jsonFile.delete()
}
@@ -315,10 +319,10 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
createTestTopicAndCluster(testPartitionAndAssignment, Some(classOf[PreferredReplicaLeaderElectionCommandTestAuthorizer].getName))
bounceServer(testPartitionPreferredLeader, testPartition)
// Check the leader for the partition is not the preferred one
- val leader = getLeader(testPartition)
+ val leader = awaitLeader(testPartition)
assertNotEquals(testPartitionPreferredLeader, leader)
// Check the leader for the partition is not the preferred one
- assertNotEquals(testPartitionPreferredLeader, getLeader(testPartition))
+ assertNotEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
try {
PreferredReplicaLeaderElectionCommand.run(Array(
@@ -330,7 +334,7 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
assertEquals("Not authorized to perform leader election", e.getMessage)
assertTrue(e.getCause().isInstanceOf[ClusterAuthorizationException])
// Check we still have the same leader
- assertEquals(leader, getLeader(testPartition))
+ assertEquals(leader, awaitLeader(testPartition))
} finally {
jsonFile.delete()
}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ca62a79..5c9284f 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -313,7 +313,7 @@ object TestUtils extends Logging {
topicConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] = {
val adminZkClient = new AdminZkClient(zkClient)
// create topic
- TestUtils.waitUntilTrue( () => {
+ waitUntilTrue( () => {
var hasSessionExpirationException = false
try {
adminZkClient.createTopic(topic, numPartitions, replicationFactor, topicConfig)
@@ -355,7 +355,7 @@ object TestUtils extends Logging {
topicConfig: Properties): scala.collection.immutable.Map[Int, Int] = {
val adminZkClient = new AdminZkClient(zkClient)
// create topic
- TestUtils.waitUntilTrue( () => {
+ waitUntilTrue( () => {
var hasSessionExpirationException = false
try {
adminZkClient.createTopicWithAssignment(topic, topicConfig, partitionReplicaAssignment)
@@ -777,6 +777,23 @@ object TestUtils extends Logging {
}
/**
+ * Wait for the presence of an optional value.
+ *
+ * @param func The function defining the optional value
+ * @param msg Error message in the case that the value never appears
+ * @param waitTimeMs Maximum time to wait
+ * @return The unwrapped value returned by the function
+ */
+ def awaitValue[T](func: () => Option[T], msg: => String, waitTimeMs: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): T = {
+ var value: Option[T] = None
+ waitUntilTrue(() => {
+ value = func()
+ value.isDefined
+ }, msg, waitTimeMs)
+ value.get
+ }
+
+ /**
* Wait until the given condition is true or throw an exception if the given wait time elapses.
*
* @param condition condition to check
@@ -865,7 +882,7 @@ object TestUtils extends Logging {
def waitUntilBrokerMetadataIsPropagated(servers: Seq[KafkaServer],
timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Unit = {
val expectedBrokerIds = servers.map(_.config.brokerId).toSet
- TestUtils.waitUntilTrue(() => servers.forall(server =>
+ waitUntilTrue(() => servers.forall(server =>
expectedBrokerIds == server.dataPlaneRequestProcessor.metadataCache.getAliveBrokers.map(_.id).toSet
), "Timed out waiting for broker metadata to propagate to all servers", timeout)
}
@@ -883,7 +900,7 @@ object TestUtils extends Logging {
def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int,
timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
var leader: Int = -1
- TestUtils.waitUntilTrue(() =>
+ waitUntilTrue(() =>
servers.foldLeft(true) {
(result, server) =>
val partitionStateOpt = server.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic, partition)
@@ -916,7 +933,7 @@ object TestUtils extends Logging {
}.map(_.config.brokerId)
}
- TestUtils.waitUntilTrue(() => newLeaderExists.isDefined,
+ waitUntilTrue(() => newLeaderExists.isDefined,
s"Did not observe leader change for partition $tp after $timeout ms", waitTimeMs = timeout)
newLeaderExists.get
@@ -931,7 +948,7 @@ object TestUtils extends Logging {
}.map(_.config.brokerId)
}
- TestUtils.waitUntilTrue(() => leaderIfExists.isDefined,
+ waitUntilTrue(() => leaderIfExists.isDefined,
s"Partition $tp leaders not made yet after $timeout ms", waitTimeMs = timeout)
leaderIfExists.get
@@ -964,18 +981,18 @@ object TestUtils extends Logging {
def ensureNoUnderReplicatedPartitions(zkClient: KafkaZkClient, topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int],
servers: Seq[KafkaServer]) {
val topicPartition = new TopicPartition(topic, partitionToBeReassigned)
- TestUtils.waitUntilTrue(() => {
+ waitUntilTrue(() => {
val inSyncReplicas = zkClient.getInSyncReplicasForPartition(topicPartition)
inSyncReplicas.get.size == assignedReplicas.size
},
"Reassigned partition [%s,%d] is under replicated".format(topic, partitionToBeReassigned))
var leader: Option[Int] = None
- TestUtils.waitUntilTrue(() => {
+ waitUntilTrue(() => {
leader = zkClient.getLeaderForPartition(topicPartition)
leader.isDefined
},
"Reassigned partition [%s,%d] is unavailable".format(topic, partitionToBeReassigned))
- TestUtils.waitUntilTrue(() => {
+ waitUntilTrue(() => {
val leaderBroker = servers.filter(s => s.config.brokerId == leader.get).head
leaderBroker.replicaManager.underReplicatedPartitionCount == 0
},
@@ -1059,33 +1076,33 @@ object TestUtils extends Logging {
def verifyTopicDeletion(zkClient: KafkaZkClient, topic: String, numPartitions: Int, servers: Seq[KafkaServer]) {
val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _))
// wait until admin path for delete topic is deleted, signaling completion of topic deletion
- TestUtils.waitUntilTrue(() => !zkClient.isTopicMarkedForDeletion(topic),
+ waitUntilTrue(() => !zkClient.isTopicMarkedForDeletion(topic),
"Admin path /admin/delete_topics/%s path not deleted even after a replica is restarted".format(topic))
- TestUtils.waitUntilTrue(() => !zkClient.topicExists(topic),
+ waitUntilTrue(() => !zkClient.topicExists(topic),
"Topic path /brokers/topics/%s not deleted after /admin/delete_topics/%s path is deleted".format(topic, topic))
// ensure that the topic-partition has been deleted from all brokers' replica managers
- TestUtils.waitUntilTrue(() =>
+ waitUntilTrue(() =>
servers.forall(server => topicPartitions.forall(tp => server.replicaManager.nonOfflinePartition(tp).isEmpty)),
"Replica manager's should have deleted all of this topic's partitions")
// ensure that logs from all replicas are deleted if delete topic is marked successful in ZooKeeper
assertTrue("Replica logs not deleted after delete topic is complete",
servers.forall(server => topicPartitions.forall(tp => server.getLogManager.getLog(tp).isEmpty)))
// ensure that topic is removed from all cleaner offsets
- TestUtils.waitUntilTrue(() => servers.forall(server => topicPartitions.forall { tp =>
+ waitUntilTrue(() => servers.forall(server => topicPartitions.forall { tp =>
val checkpoints = server.getLogManager.liveLogDirs.map { logDir =>
new OffsetCheckpointFile(new File(logDir, "cleaner-offset-checkpoint")).read()
}
checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
}), "Cleaner offset for deleted partition should have been removed")
import scala.collection.JavaConverters._
- TestUtils.waitUntilTrue(() => servers.forall(server =>
+ waitUntilTrue(() => servers.forall(server =>
server.config.logDirs.forall { logDir =>
topicPartitions.forall { tp =>
!new File(logDir, tp.topic + "-" + tp.partition).exists()
}
}
), "Failed to soft-delete the data to a delete directory")
- TestUtils.waitUntilTrue(() => servers.forall(server =>
+ waitUntilTrue(() => servers.forall(server =>
server.config.logDirs.forall { logDir =>
topicPartitions.forall { tp =>
!java.util.Arrays.asList(new File(logDir).list()).asScala.exists { partitionDirectoryName =>
@@ -1145,7 +1162,7 @@ object TestUtils extends Logging {
def waitAndVerifyAcls(expected: Set[Acl], authorizer: Authorizer, resource: Resource) = {
val newLine = scala.util.Properties.lineSeparator
- TestUtils.waitUntilTrue(() => authorizer.getAcls(resource) == expected,
+ waitUntilTrue(() => authorizer.getAcls(resource) == expected,
s"expected acls:${expected.mkString(newLine + "\t", newLine + "\t", newLine)}" +
s"but got:${authorizer.getAcls(resource).mkString(newLine + "\t", newLine + "\t", newLine)}", waitTimeMs = JTestUtils.DEFAULT_MAX_WAIT_MS)
}