You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/03/06 22:10:41 UTC

git commit: SAMZA-169; retry on topic metadata refresh failure in Kafka system admin. Previously, such failures were ignored and the metadata was lost.

Repository: incubator-samza
Updated Branches:
  refs/heads/master 12594fb71 -> 38e828832


SAMZA-169; retry on topic metadata refresh failure in Kafka system admin. Previously, such failures were ignored and the metadata was lost.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/38e82883
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/38e82883
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/38e82883

Branch: refs/heads/master
Commit: 38e82883236de40a544d9f7a18cf49c7238dc258
Parents: 12594fb
Author: Martin Kleppmann <ma...@rapportive.com>
Authored: Thu Mar 6 13:10:37 2014 -0800
Committer: Chris Riccomini <cr...@criccomi-ld.linkedin.biz>
Committed: Thu Mar 6 13:10:37 2014 -0800

----------------------------------------------------------------------
 .../util/TestExponentialSleepStrategy.scala     |  1 -
 .../samza/system/kafka/KafkaSystemAdmin.scala   | 34 +++++++------
 .../system/kafka/TestKafkaSystemAdmin.scala     | 50 ++++++++++++++++++--
 3 files changed, 66 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/38e82883/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
index 962ca40..3036da9 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
@@ -23,7 +23,6 @@ package org.apache.samza.util
 
 import org.junit.Assert._
 import org.junit.Test
-import org.apache.samza.util.ExponentialSleepStrategy
 
 class TestExponentialSleepStrategy {
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/38e82883/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index ad5f2fa..5325549 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -109,6 +109,9 @@ class KafkaSystemAdmin(
 
   import KafkaSystemAdmin._
 
+  def getSystemStreamMetadata(streams: java.util.Set[String]) =
+    getSystemStreamMetadata(streams, new ExponentialSleepStrategy(initialDelayMs = 500))
+
   /**
    * Given a set of stream names (topics), fetch metadata from Kafka for each
    * stream, and return a map from stream name to SystemStreamMetadata for
@@ -116,14 +119,13 @@ class KafkaSystemAdmin(
    * if a given SystemStreamPartition is empty. This method will block and
    * retry indefinitely until it gets a successful response from Kafka.
    */
-  def getSystemStreamMetadata(streams: java.util.Set[String]) = {
+  def getSystemStreamMetadata(streams: java.util.Set[String], retryBackoff: ExponentialSleepStrategy) = {
     var partitions = Map[String, Set[Partition]]()
     var oldestOffsets = Map[SystemStreamPartition, String]()
     var newestOffsets = Map[SystemStreamPartition, String]()
     var upcomingOffsets = Map[SystemStreamPartition, String]()
     var done = false
     var consumer: SimpleConsumer = null
-    val retryBackoff = new ExponentialSleepStrategy(initialDelayMs = 500)
 
     debug("Fetching offsets for: %s" format streams)
 
@@ -189,7 +191,7 @@ class KafkaSystemAdmin(
    * Helper method to use topic metadata cache when fetching metadata, so we
    * don't hammer Kafka more than we need to.
    */
-  private def getTopicMetadata(topics: Set[String]) = {
+  protected def getTopicMetadata(topics: Set[String]) = {
     new ClientUtilTopicMetadataStore(brokerListString, clientId)
       .getTopicInfo(topics)
   }
@@ -202,17 +204,21 @@ class KafkaSystemAdmin(
     val brokersToTopicPartitions = metadata
       .values
       // Convert the topic metadata to a Seq[(Broker, TopicAndPartition)] 
-      .flatMap(topicMetadata => topicMetadata
-        .partitionsMetadata
-        // Convert Seq[PartitionMetadata] to Seq[(Broker, TopicAndPartition)]
-        .map(partitionMetadata => {
-          ErrorMapping.maybeThrowException(partitionMetadata.errorCode)
-          val topicAndPartition = new TopicAndPartition(topicMetadata.topic, partitionMetadata.partitionId)
-          val leader = partitionMetadata
-            .leader
-            .getOrElse(throw new SamzaException("Need leaders for all partitions when fetching offsets. No leader available for TopicAndPartition: %s" format topicAndPartition))
-          (leader, topicAndPartition)
-        }))
+      .flatMap(topicMetadata => {
+        ErrorMapping.maybeThrowException(topicMetadata.errorCode)
+        topicMetadata
+          .partitionsMetadata
+          // Convert Seq[PartitionMetadata] to Seq[(Broker, TopicAndPartition)]
+          .map(partitionMetadata => {
+            ErrorMapping.maybeThrowException(partitionMetadata.errorCode)
+            val topicAndPartition = new TopicAndPartition(topicMetadata.topic, partitionMetadata.partitionId)
+            val leader = partitionMetadata
+              .leader
+              .getOrElse(throw new SamzaException("Need leaders for all partitions when fetching offsets. No leader available for TopicAndPartition: %s" format topicAndPartition))
+            (leader, topicAndPartition)
+          })
+      })
+
       // Convert to a Map[Broker, Seq[(Broker, TopicAndPartition)]]
       .groupBy(_._1)
       // Convert to a Map[Broker, Set[TopicAndPartition]]

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/38e82883/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index aaf11d0..eaa9e53 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -46,7 +46,9 @@ import kafka.consumer.Consumer
 import kafka.consumer.ConsumerConfig
 import java.util.Properties
 import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.util.ExponentialSleepStrategy
 
 object TestKafkaSystemAdmin {
   val TOPIC = "input"
@@ -205,10 +207,6 @@ class TestKafkaSystemAdmin {
     val systemName = "test"
     val systemAdmin = new KafkaSystemAdmin(systemName, brokers)
 
-    // Get a non-existent topic.
-    val initialInputOffsets = systemAdmin.getSystemStreamMetadata(Set("foo"))
-    assertEquals(0, initialInputOffsets.size)
-
     // Create an empty topic with 50 partitions, but with no offsets.
     createTopic
     validateTopic(50)
@@ -269,4 +267,48 @@ class TestKafkaSystemAdmin {
     assertEquals(sspMetadata.get(new Partition(48)).getNewestOffset, message.offset.toString)
     assertEquals("val2", text)
   }
+
+  @Test
+  def testNonExistentTopic {
+    val systemAdmin = new KafkaSystemAdmin("test", brokers)
+    val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic"))
+    val metadata = initialOffsets.getOrElse("non-existent-topic", fail("missing metadata"))
+    assertEquals(metadata, new SystemStreamMetadata("non-existent-topic", Map(
+      new Partition(0) -> new SystemStreamPartitionMetadata(null, null, "0")
+    )))
+  }
+
+  class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers) {
+    import kafka.api.{TopicMetadata, TopicMetadataResponse}
+
+    // Simulate Kafka telling us that the leader for the topic is not available
+    override def getTopicMetadata(topics: Set[String]) = {
+      val topicMetadata = TopicMetadata(topic = "quux", partitionsMetadata = Seq(), errorCode = ErrorMapping.LeaderNotAvailableCode)
+      Map("quux" -> topicMetadata)
+    }
+  }
+
+  class CallLimitReached extends Exception
+
+  class MockSleepStrategy(maxCalls: Int) extends ExponentialSleepStrategy {
+    var countCalls = 0
+  
+    override def sleep() = {
+      if (countCalls >= maxCalls) throw new CallLimitReached
+      countCalls += 1
+    }
+  }
+
+  @Test
+  def testShouldRetryOnTopicMetadataError {
+    val systemAdmin = new KafkaSystemAdminWithTopicMetadataError
+    val retryBackoff = new MockSleepStrategy(maxCalls = 3)
+    try {
+      systemAdmin.getSystemStreamMetadata(Set("quux"), retryBackoff)
+    } catch {
+      case e: CallLimitReached => () // this would be less ugly if we were using scalatest
+      case e: Throwable => throw e
+    }
+    assertEquals(retryBackoff.countCalls, 3)
+  }
 }
\ No newline at end of file