You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2019/06/07 06:46:43 UTC

[kafka] branch 2.3 updated: KAFKA-8461: Wait for follower to join the ISR in testUncleanLeaderElectionDisabled Test

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new d6e42be  KAFKA-8461: Wait for follower to join the ISR in testUncleanLeaderElectionDisabled Test
d6e42be is described below

commit d6e42be90053c2d5285c1b0053f5deb49cc88855
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Fri Jun 7 12:15:03 2019 +0530

    KAFKA-8461: Wait for follower to join the ISR in testUncleanLeaderElectionDisabled Test
    
    Author: Manikumar Reddy <ma...@gmail.com>
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Boyang Chen <bo...@confluent.io>
    
    Closes #6887 from omkreddy/unclean-leader
    
    (cherry picked from commit 060701fb6f000046e19ccb94a5e366377d0a2599)
    Signed-off-by: Manikumar Reddy <ma...@confluent.io>
---
 .../kafka/integration/UncleanLeaderElectionTest.scala    | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 2974761..c2cd367 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -219,16 +219,16 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     assertEquals(List("first"), consumeAllMessages(topic, 1))
 
     // shutdown follower server
-    servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
+    servers.filter(server => server.config.brokerId == followerId).foreach(server => shutdownServer(server))
 
     produceMessage(servers, topic, "second")
     assertEquals(List("first", "second"), consumeAllMessages(topic, 2))
 
     //remove any previous unclean election metric
-    servers.map(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
+    servers.foreach(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
 
     // shutdown leader and then restart follower
-    servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
+    servers.filter(server => server.config.brokerId == leaderId).foreach(server => shutdownServer(server))
     val followerServer = servers.find(_.config.brokerId == followerId).get
     followerServer.startup()
 
@@ -247,13 +247,17 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     assertEquals(List.empty[String], consumeAllMessages(topic, 0))
 
     // restart leader temporarily to send a successfully replicated message
-    servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup())
+    servers.filter(server => server.config.brokerId == leaderId).foreach(server => server.startup())
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(leaderId))
 
     produceMessage(servers, topic, "third")
-    waitUntilMetadataIsPropagated(servers, topic, partitionId)
-    servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
+    //make sure follower server joins the ISR
+    TestUtils.waitUntilTrue(() => {
+      val partitionInfoOpt = followerServer.metadataCache.getPartitionInfo(topic, partitionId)
+      partitionInfoOpt.isDefined && partitionInfoOpt.get.basePartitionState.isr.contains(followerId)
+    }, "Inconsistent metadata after first server startup")
 
+    servers.filter(server => server.config.brokerId == leaderId).foreach(server => shutdownServer(server))
     // verify clean leader transition to ISR follower
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId))