You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jg...@apache.org on 2013/11/11 20:07:43 UTC

git commit: SAMZA-83: Discard rather than throw exception for Kafka topics with non-existent partitions

Updated Branches:
  refs/heads/master ac11fd92d -> 49134a47b


SAMZA-83: Discard rather than throw exception for Kafka topics with
non-existent partitions


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/49134a47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/49134a47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/49134a47

Branch: refs/heads/master
Commit: 49134a47bc6916c1ab705ac21b8a97d11d211c8c
Parents: ac11fd9
Author: Jakob Glen Homan <jg...@apache.org>
Authored: Mon Nov 11 11:03:26 2013 -0800
Committer: Jakob Glen Homan <jg...@apache.org>
Committed: Mon Nov 11 11:03:26 2013 -0800

----------------------------------------------------------------------
 .../org/apache/samza/system/kafka/KafkaSystemConsumer.scala    | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/49134a47/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index b8e17ce..9b83259 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -38,6 +38,7 @@ import org.apache.samza.util.BlockingEnvelopeMap
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.IncomingMessageEnvelope
 import java.nio.charset.Charset
+import kafka.api.PartitionMetadata
 
 object KafkaSystemConsumer {
   def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
@@ -117,8 +118,7 @@ private[kafka] class KafkaSystemConsumer(
             val brokerOption = partitionMetadata(topicAndPartition.topic)
               .partitionsMetadata
               .find(_.partitionId == topicAndPartition.partition)
-              .getOrElse(toss("Can't find leader for %s" format topicAndPartition))
-              .leader
+              .flatMap(_.leader)
 
             brokerOption match {
               case Some(broker) =>
@@ -127,7 +127,7 @@ private[kafka] class KafkaSystemConsumer(
                 })
 
                 brokerProxy.addTopicPartition(topicAndPartition, lastOffset)
-              case _ => warn("Broker for %s not defined! " format topicAndPartition)
+              case _ => warn("No such topic-partition: %s, dropping." format topicAndPartition)
             }
         }