You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/03/06 22:10:41 UTC
git commit: SAMZA-169;
retry on topic metadata refresh failure in Kafka system admin. Topic-level
metadata errors were previously ignored, so metadata was silently lost.
Repository: incubator-samza
Updated Branches:
refs/heads/master 12594fb71 -> 38e828832
SAMZA-169; retry on topic metadata refresh failure in Kafka system admin. Topic-level metadata errors were previously ignored, so metadata was silently lost.
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/38e82883
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/38e82883
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/38e82883
Branch: refs/heads/master
Commit: 38e82883236de40a544d9f7a18cf49c7238dc258
Parents: 12594fb
Author: Martin Kleppmann <ma...@rapportive.com>
Authored: Thu Mar 6 13:10:37 2014 -0800
Committer: Chris Riccomini <cr...@criccomi-ld.linkedin.biz>
Committed: Thu Mar 6 13:10:37 2014 -0800
----------------------------------------------------------------------
.../util/TestExponentialSleepStrategy.scala | 1 -
.../samza/system/kafka/KafkaSystemAdmin.scala | 34 +++++++------
.../system/kafka/TestKafkaSystemAdmin.scala | 50 ++++++++++++++++++--
3 files changed, 66 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/38e82883/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
index 962ca40..3036da9 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
@@ -23,7 +23,6 @@ package org.apache.samza.util
import org.junit.Assert._
import org.junit.Test
-import org.apache.samza.util.ExponentialSleepStrategy
class TestExponentialSleepStrategy {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/38e82883/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index ad5f2fa..5325549 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -109,6 +109,9 @@ class KafkaSystemAdmin(
import KafkaSystemAdmin._
+ def getSystemStreamMetadata(streams: java.util.Set[String]) =
+ getSystemStreamMetadata(streams, new ExponentialSleepStrategy(initialDelayMs = 500))
+
/**
* Given a set of stream names (topics), fetch metadata from Kafka for each
* stream, and return a map from stream name to SystemStreamMetadata for
@@ -116,14 +119,13 @@ class KafkaSystemAdmin(
* if a given SystemStreamPartition is empty. This method will block and
* retry indefinitely until it gets a successful response from Kafka.
*/
- def getSystemStreamMetadata(streams: java.util.Set[String]) = {
+ def getSystemStreamMetadata(streams: java.util.Set[String], retryBackoff: ExponentialSleepStrategy) = {
var partitions = Map[String, Set[Partition]]()
var oldestOffsets = Map[SystemStreamPartition, String]()
var newestOffsets = Map[SystemStreamPartition, String]()
var upcomingOffsets = Map[SystemStreamPartition, String]()
var done = false
var consumer: SimpleConsumer = null
- val retryBackoff = new ExponentialSleepStrategy(initialDelayMs = 500)
debug("Fetching offsets for: %s" format streams)
@@ -189,7 +191,7 @@ class KafkaSystemAdmin(
* Helper method to use topic metadata cache when fetching metadata, so we
* don't hammer Kafka more than we need to.
*/
- private def getTopicMetadata(topics: Set[String]) = {
+ protected def getTopicMetadata(topics: Set[String]) = {
new ClientUtilTopicMetadataStore(brokerListString, clientId)
.getTopicInfo(topics)
}
@@ -202,17 +204,21 @@ class KafkaSystemAdmin(
val brokersToTopicPartitions = metadata
.values
// Convert the topic metadata to a Seq[(Broker, TopicAndPartition)]
- .flatMap(topicMetadata => topicMetadata
- .partitionsMetadata
- // Convert Seq[PartitionMetadata] to Seq[(Broker, TopicAndPartition)]
- .map(partitionMetadata => {
- ErrorMapping.maybeThrowException(partitionMetadata.errorCode)
- val topicAndPartition = new TopicAndPartition(topicMetadata.topic, partitionMetadata.partitionId)
- val leader = partitionMetadata
- .leader
- .getOrElse(throw new SamzaException("Need leaders for all partitions when fetching offsets. No leader available for TopicAndPartition: %s" format topicAndPartition))
- (leader, topicAndPartition)
- }))
+ .flatMap(topicMetadata => {
+ ErrorMapping.maybeThrowException(topicMetadata.errorCode)
+ topicMetadata
+ .partitionsMetadata
+ // Convert Seq[PartitionMetadata] to Seq[(Broker, TopicAndPartition)]
+ .map(partitionMetadata => {
+ ErrorMapping.maybeThrowException(partitionMetadata.errorCode)
+ val topicAndPartition = new TopicAndPartition(topicMetadata.topic, partitionMetadata.partitionId)
+ val leader = partitionMetadata
+ .leader
+ .getOrElse(throw new SamzaException("Need leaders for all partitions when fetching offsets. No leader available for TopicAndPartition: %s" format topicAndPartition))
+ (leader, topicAndPartition)
+ })
+ })
+
// Convert to a Map[Broker, Seq[(Broker, TopicAndPartition)]]
.groupBy(_._1)
// Convert to a Map[Broker, Set[TopicAndPartition]]
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/38e82883/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index aaf11d0..eaa9e53 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -46,7 +46,9 @@ import kafka.consumer.Consumer
import kafka.consumer.ConsumerConfig
import java.util.Properties
import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.SystemStreamMetadata
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.util.ExponentialSleepStrategy
object TestKafkaSystemAdmin {
val TOPIC = "input"
@@ -205,10 +207,6 @@ class TestKafkaSystemAdmin {
val systemName = "test"
val systemAdmin = new KafkaSystemAdmin(systemName, brokers)
- // Get a non-existent topic.
- val initialInputOffsets = systemAdmin.getSystemStreamMetadata(Set("foo"))
- assertEquals(0, initialInputOffsets.size)
-
// Create an empty topic with 50 partitions, but with no offsets.
createTopic
validateTopic(50)
@@ -269,4 +267,48 @@ class TestKafkaSystemAdmin {
assertEquals(sspMetadata.get(new Partition(48)).getNewestOffset, message.offset.toString)
assertEquals("val2", text)
}
+
+ @Test
+ def testNonExistentTopic {
+ val systemAdmin = new KafkaSystemAdmin("test", brokers)
+ val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic"))
+ val metadata = initialOffsets.getOrElse("non-existent-topic", fail("missing metadata"))
+ assertEquals(metadata, new SystemStreamMetadata("non-existent-topic", Map(
+ new Partition(0) -> new SystemStreamPartitionMetadata(null, null, "0")
+ )))
+ }
+
+ class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers) {
+ import kafka.api.{TopicMetadata, TopicMetadataResponse}
+
+ // Simulate Kafka telling us that the leader for the topic is not available
+ override def getTopicMetadata(topics: Set[String]) = {
+ val topicMetadata = TopicMetadata(topic = "quux", partitionsMetadata = Seq(), errorCode = ErrorMapping.LeaderNotAvailableCode)
+ Map("quux" -> topicMetadata)
+ }
+ }
+
+ class CallLimitReached extends Exception
+
+ class MockSleepStrategy(maxCalls: Int) extends ExponentialSleepStrategy {
+ var countCalls = 0
+
+ override def sleep() = {
+ if (countCalls >= maxCalls) throw new CallLimitReached
+ countCalls += 1
+ }
+ }
+
+ @Test
+ def testShouldRetryOnTopicMetadataError {
+ val systemAdmin = new KafkaSystemAdminWithTopicMetadataError
+ val retryBackoff = new MockSleepStrategy(maxCalls = 3)
+ try {
+ systemAdmin.getSystemStreamMetadata(Set("quux"), retryBackoff)
+ } catch {
+ case e: CallLimitReached => () // this would be less ugly if we were using scalatest
+ case e: Throwable => throw e
+ }
+ assertEquals(retryBackoff.countCalls, 3)
+ }
}
\ No newline at end of file