You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2012/12/18 18:44:12 UTC

[19/30] git commit: KAFKA-633 Fix mostly consistent test failure in testShutdownBroker; patched by Joel Koshy; reviewed by Jun Rao

KAFKA-633 Fix mostly consistent test failure in testShutdownBroker; patched by Joel Koshy; reviewed by Jun Rao

git-svn-id: https://svn.apache.org/repos/asf/kafka/branches/0.8@1418155 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a4c76758
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a4c76758
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a4c76758

Branch: refs/heads/trunk
Commit: a4c767582991787ca4a06b6e2844b23a2010def9
Parents: 98042e9
Author: Joel Jacob Koshy <jj...@apache.org>
Authored: Fri Dec 7 01:09:11 2012 +0000
Committer: Joel Jacob Koshy <jj...@apache.org>
Committed: Fri Dec 7 01:09:11 2012 +0000

----------------------------------------------------------------------
 .../test/scala/unit/kafka/admin/AdminTest.scala    |   61 ++++++++-------
 1 files changed, 32 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a4c76758/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index e1f68dd..7a31b51 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -374,35 +374,38 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
     var controllerId = ZkUtils.getController(zkClient)
     var controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
     var partitionsRemaining = controller.shutdownBroker(2)
-    assertEquals(0, partitionsRemaining)
-    var topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
-    var leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
-    assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
-    assertEquals(2, topicMetadata.partitionsMetadata.head.isr.size)
-
-    leaderBeforeShutdown = leaderAfterShutdown
-    controllerId = ZkUtils.getController(zkClient)
-    controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
-    partitionsRemaining = controller.shutdownBroker(1)
-    assertEquals(0, partitionsRemaining)
-    topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
-    leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
-    assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
-    assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
-
-    leaderBeforeShutdown = leaderAfterShutdown
-    controllerId = ZkUtils.getController(zkClient)
-    controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
-    partitionsRemaining = controller.shutdownBroker(0)
-    assertEquals(1, partitionsRemaining)
-    topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
-    leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
-    assertTrue(leaderAfterShutdown == leaderBeforeShutdown)
-    assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
-
-    servers.foreach(_.shutdown())
-
-
+    try {
+      assertEquals(0, partitionsRemaining)
+      var topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
+      var leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
+      assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
+      // assertEquals(2, topicMetadata.partitionsMetadata.head.isr.size)
+      assertEquals(2, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+
+      leaderBeforeShutdown = leaderAfterShutdown
+      controllerId = ZkUtils.getController(zkClient)
+      controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
+      partitionsRemaining = controller.shutdownBroker(1)
+      assertEquals(0, partitionsRemaining)
+      topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
+      leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
+      assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
+      // assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
+      assertEquals(1, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+
+      leaderBeforeShutdown = leaderAfterShutdown
+      controllerId = ZkUtils.getController(zkClient)
+      controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
+      partitionsRemaining = controller.shutdownBroker(0)
+      assertEquals(1, partitionsRemaining)
+      topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
+      leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
+      assertTrue(leaderAfterShutdown == leaderBeforeShutdown)
+      assertEquals(1, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+    }
+    finally {
+      servers.foreach(_.shutdown())
+    }
   }
 
   private def checkIfReassignPartitionPathExists(): Boolean = {