You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/04/28 23:59:45 UTC

git commit: kafka-1424; (followup patch) transient unit test failure in testSendWithDeadBroker; patched by Jun Rao; reviewed by Guozhang Wang and Neha Narkhede

Repository: kafka
Updated Branches:
  refs/heads/trunk 8a0314d01 -> 631536327


kafka-1424; (followup patch) transient unit test failure in testSendWithDeadBroker; patched by Jun Rao; reviewed by Guozhang Wang and Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/63153632
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/63153632
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/63153632

Branch: refs/heads/trunk
Commit: 631536327e3aad866d7c797984794a8800fa9fc3
Parents: 8a0314d
Author: Jun Rao <ju...@gmail.com>
Authored: Mon Apr 28 14:59:35 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Apr 28 14:59:35 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/MetadataCache.scala | 13 +++----------
 core/src/test/scala/unit/kafka/admin/AdminTest.scala | 10 +++++-----
 core/src/test/scala/unit/kafka/utils/TestUtils.scala | 10 +++++++++-
 3 files changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/63153632/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index a8b7bf7..3198cdf 100644
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -96,18 +96,11 @@ private[server] class MetadataCache {
     }
   }
 
-  def getPartitionInfos(topic: String) = {
-    inLock(partitionMetadataLock.readLock()) {
-      cache(topic)
-    }
-  }
-
-  def containsTopicAndPartition(topic: String,
-                                partitionId: Int): Boolean = {
+  def getPartitionInfo(topic: String, partitionId: Int): Option[PartitionStateInfo] = {
     inLock(partitionMetadataLock.readLock()) {
       cache.get(topic) match {
-        case Some(partitionInfos) => partitionInfos.contains(partitionId)
-        case None => false
+        case Some(partitionInfos) => partitionInfos.get(partitionId)
+        case None => None
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/63153632/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index a8d92f6..4f6ddca 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -330,10 +330,10 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     try {
       // wait for the update metadata request to trickle to the brokers
       TestUtils.waitUntilTrue(() =>
-        activeServers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3),
+        activeServers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3),
         "Topic test not created after timeout")
       assertEquals(0, partitionsRemaining.size)
-      var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfos(topic)(partition)
+      var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
       var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
       assertEquals(0, leaderAfterShutdown)
       assertEquals(2, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size)
@@ -342,15 +342,15 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       partitionsRemaining = controller.shutdownBroker(1)
       assertEquals(0, partitionsRemaining.size)
       activeServers = servers.filter(s => s.config.brokerId == 0)
-      partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfos(topic)(partition)
+      partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
       leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
       assertEquals(0, leaderAfterShutdown)
 
-      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
       partitionsRemaining = controller.shutdownBroker(0)
       assertEquals(1, partitionsRemaining.size)
       // leader doesn't change since all the replicas are shut down
-      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
     }
     finally {
       servers.foreach(_.shutdown())

http://git-wip-us.apache.org/repos/asf/kafka/blob/63153632/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index f5a7a5b..384c74e 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -581,7 +581,15 @@ object TestUtils extends Logging {
 
   def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long = 5000L) = {
     TestUtils.waitUntilTrue(() =>
-      servers.foldLeft(true)(_ && _.apis.metadataCache.containsTopicAndPartition(topic, partition)),
+      servers.foldLeft(true) {
+        (result, server) =>
+          val partitionStateOpt = server.apis.metadataCache.getPartitionInfo(topic, partition)
+          partitionStateOpt match {
+            case None => false
+            case Some(partitionState) =>
+            result && Request.isValidBrokerId(partitionState.leaderIsrAndControllerEpoch.leaderAndIsr.leader)
+          }
+      },
       "Partition [%s,%d] metadata not propagated after %d ms".format(topic, partition, timeout),
       waitTime = timeout)
   }