You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2019/06/07 06:45:41 UTC
[kafka] branch trunk updated: KAFKA-8461: Wait for follower to join
the ISR in testUncleanLeaderElectionDisabled Test
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 060701f KAFKA-8461: Wait for follower to join the ISR in testUncleanLeaderElectionDisabled Test
060701f is described below
commit 060701fb6f000046e19ccb94a5e366377d0a2599
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Fri Jun 7 12:15:03 2019 +0530
KAFKA-8461: Wait for follower to join the ISR in testUncleanLeaderElectionDisabled Test
Author: Manikumar Reddy <ma...@gmail.com>
Reviewers: Jason Gustafson <ja...@confluent.io>, Boyang Chen <bo...@confluent.io>
Closes #6887 from omkreddy/unclean-leader
---
.../kafka/integration/UncleanLeaderElectionTest.scala | 16 ++++++++++------
1 file changed, 10 insertions(+), 6 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index c0c8c95..64d7174 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -219,16 +219,16 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
assertEquals(List("first"), consumeAllMessages(topic, 1))
// shutdown follower server
- servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
+ servers.filter(server => server.config.brokerId == followerId).foreach(server => shutdownServer(server))
produceMessage(servers, topic, "second")
assertEquals(List("first", "second"), consumeAllMessages(topic, 2))
//remove any previous unclean election metric
- servers.map(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
+ servers.foreach(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
// shutdown leader and then restart follower
- servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
+ servers.filter(server => server.config.brokerId == leaderId).foreach(server => shutdownServer(server))
val followerServer = servers.find(_.config.brokerId == followerId).get
followerServer.startup()
@@ -247,13 +247,17 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
assertEquals(List.empty[String], consumeAllMessages(topic, 0))
// restart leader temporarily to send a successfully replicated message
- servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup())
+ servers.filter(server => server.config.brokerId == leaderId).foreach(server => server.startup())
waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(leaderId))
produceMessage(servers, topic, "third")
- waitUntilMetadataIsPropagated(servers, topic, partitionId)
- servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
+ //make sure follower server joins the ISR
+ TestUtils.waitUntilTrue(() => {
+ val partitionInfoOpt = followerServer.metadataCache.getPartitionInfo(topic, partitionId)
+ partitionInfoOpt.isDefined && partitionInfoOpt.get.basePartitionState.isr.contains(followerId)
+ }, "Inconsistent metadata after first server startup")
+ servers.filter(server => server.config.brokerId == leaderId).foreach(server => shutdownServer(server))
// verify clean leader transition to ISR follower
waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId))