You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/04/28 23:59:45 UTC
git commit: kafka-1424;
(followup patch) transient unit test failure in
testSendWithDeadBroker; patched by Jun Rao;
reviewed by Guozhang Wang and Neha Narkhede
Repository: kafka
Updated Branches:
refs/heads/trunk 8a0314d01 -> 631536327
kafka-1424; (followup patch) transient unit test failure in testSendWithDeadBroker; patched by Jun Rao; reviewed by Guozhang Wang and Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/63153632
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/63153632
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/63153632
Branch: refs/heads/trunk
Commit: 631536327e3aad866d7c797984794a8800fa9fc3
Parents: 8a0314d
Author: Jun Rao <ju...@gmail.com>
Authored: Mon Apr 28 14:59:35 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Apr 28 14:59:35 2014 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/server/MetadataCache.scala | 13 +++----------
core/src/test/scala/unit/kafka/admin/AdminTest.scala | 10 +++++-----
core/src/test/scala/unit/kafka/utils/TestUtils.scala | 10 +++++++++-
3 files changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/63153632/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index a8b7bf7..3198cdf 100644
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -96,18 +96,11 @@ private[server] class MetadataCache {
}
}
- def getPartitionInfos(topic: String) = {
- inLock(partitionMetadataLock.readLock()) {
- cache(topic)
- }
- }
-
- def containsTopicAndPartition(topic: String,
- partitionId: Int): Boolean = {
+ def getPartitionInfo(topic: String, partitionId: Int): Option[PartitionStateInfo] = {
inLock(partitionMetadataLock.readLock()) {
cache.get(topic) match {
- case Some(partitionInfos) => partitionInfos.contains(partitionId)
- case None => false
+ case Some(partitionInfos) => partitionInfos.get(partitionId)
+ case None => None
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/63153632/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index a8d92f6..4f6ddca 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -330,10 +330,10 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
try {
// wait for the update metadata request to trickle to the brokers
TestUtils.waitUntilTrue(() =>
- activeServers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3),
+ activeServers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3),
"Topic test not created after timeout")
assertEquals(0, partitionsRemaining.size)
- var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfos(topic)(partition)
+ var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
assertEquals(0, leaderAfterShutdown)
assertEquals(2, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size)
@@ -342,15 +342,15 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
partitionsRemaining = controller.shutdownBroker(1)
assertEquals(0, partitionsRemaining.size)
activeServers = servers.filter(s => s.config.brokerId == 0)
- partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfos(topic)(partition)
+ partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
assertEquals(0, leaderAfterShutdown)
- assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+ assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
partitionsRemaining = controller.shutdownBroker(0)
assertEquals(1, partitionsRemaining.size)
// leader doesn't change since all the replicas are shut down
- assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+ assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
}
finally {
servers.foreach(_.shutdown())
http://git-wip-us.apache.org/repos/asf/kafka/blob/63153632/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index f5a7a5b..384c74e 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -581,7 +581,15 @@ object TestUtils extends Logging {
def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long = 5000L) = {
TestUtils.waitUntilTrue(() =>
- servers.foldLeft(true)(_ && _.apis.metadataCache.containsTopicAndPartition(topic, partition)),
+ servers.foldLeft(true) {
+ (result, server) =>
+ val partitionStateOpt = server.apis.metadataCache.getPartitionInfo(topic, partition)
+ partitionStateOpt match {
+ case None => false
+ case Some(partitionState) =>
+ result && Request.isValidBrokerId(partitionState.leaderIsrAndControllerEpoch.leaderAndIsr.leader)
+ }
+ },
"Partition [%s,%d] metadata not propagated after %d ms".format(topic, partition, timeout),
waitTime = timeout)
}