You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jg...@apache.org on 2013/11/11 20:07:43 UTC
git commit: SAMZA-83: Discard rather than throw exception for Kafka
topics with non-existent partitions
Updated Branches:
refs/heads/master ac11fd92d -> 49134a47b
SAMZA-83: Discard rather than throw exception for Kafka topics with
non-existent partitions
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/49134a47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/49134a47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/49134a47
Branch: refs/heads/master
Commit: 49134a47bc6916c1ab705ac21b8a97d11d211c8c
Parents: ac11fd9
Author: Jakob Glen Homan <jg...@apache.org>
Authored: Mon Nov 11 11:03:26 2013 -0800
Committer: Jakob Glen Homan <jg...@apache.org>
Committed: Mon Nov 11 11:03:26 2013 -0800
----------------------------------------------------------------------
.../org/apache/samza/system/kafka/KafkaSystemConsumer.scala | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/49134a47/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index b8e17ce..9b83259 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -38,6 +38,7 @@ import org.apache.samza.util.BlockingEnvelopeMap
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.system.IncomingMessageEnvelope
import java.nio.charset.Charset
+import kafka.api.PartitionMetadata
object KafkaSystemConsumer {
def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
@@ -117,8 +118,7 @@ private[kafka] class KafkaSystemConsumer(
val brokerOption = partitionMetadata(topicAndPartition.topic)
.partitionsMetadata
.find(_.partitionId == topicAndPartition.partition)
- .getOrElse(toss("Can't find leader for %s" format topicAndPartition))
- .leader
+ .flatMap(_.leader)
brokerOption match {
case Some(broker) =>
@@ -127,7 +127,7 @@ private[kafka] class KafkaSystemConsumer(
})
brokerProxy.addTopicPartition(topicAndPartition, lastOffset)
- case _ => warn("Broker for %s not defined! " format topicAndPartition)
+ case _ => warn("No such topic-partition: %s, dropping." format topicAndPartition)
}
}