Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2016/11/26 08:11:58 UTC

[jira] [Resolved] (SPARK-18525) Kafka DirectInputStream cannot be aware of new partition

     [ https://issues.apache.org/jira/browse/SPARK-18525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-18525.
-------------------------------
    Resolution: Won't Fix

> Kafka DirectInputStream cannot be aware of new partition
> --------------------------------------------------------
>
>                 Key: SPARK-18525
>                 URL: https://issues.apache.org/jira/browse/SPARK-18525
>             Project: Spark
>          Issue Type: Improvement
>          Components: Input/Output
>    Affects Versions: 2.0.2
>            Reporter: Zhiwen Sun
>
> It seems that DirectKafkaInputDStream does not read from newly added partitions while a Spark Streaming job is running.
> Related spark code:
> https://github.com/apache/spark/blob/v2.0.2/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala#L101
> How to reproduce it:
> {code:title=KafkaDirectTest.scala|borderStyle=solid}
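> import kafka.common.TopicAndPartition
> import kafka.message.MessageAndMetadata
> import kafka.serializer.StringDecoder
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
>
> object KafkaDirectTest {
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("kafka direct test 5")
>     conf.setIfMissing("spark.master", "local[3]")
>     conf.set("spark.streaming.kafka.maxRatePerPartition", "10")
>     val ssc = new StreamingContext(conf, Seconds(1))
>     // Unused, and `Config` is not defined in this snippet:
>     // val zkQuorum = Config("common").getString("kafka.zkquorum")
>     val topic = "test_use"
>     val groupId = "stream-test-0809"
>     val kafkaParams = Map(
>       "metadata.broker.list" -> "dev-002:9092,dev-004:9092",
>       "group.id" -> groupId
>     )
>     // fromOffsets and messageHandler are not used below: the createDirectStream
>     // overload called here takes a set of topics, not explicit starting offsets.
>     val fromOffsets: Map[TopicAndPartition, Long] = Map(
>       new TopicAndPartition(topic, 0) -> 0L,
>       new TopicAndPartition(topic, 1) -> 0L,
>       new TopicAndPartition(topic, 2) -> 0L,
>       new TopicAndPartition(topic, 3) -> 0L
>     )
>     val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
>     val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic))
>     lines.foreachRDD { rdd =>
>       // Print every record of the batch (runs on the executors).
>       rdd.foreach { row =>
>         println(s"\n row: ${row} ")
>       }
>       // Print the offset range consumed from each partition in this batch.
>       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>       offsetRanges.foreach { offset =>
>         println(s"\n----- offset: ${offset.topic} ${offset.partition} ${offset.fromOffset} ${offset.untilOffset}")
>       }
>     }
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }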
> {code}
> 1. Start the job.
> 2. Add a new partition to the test_use topic.
> The job does not read data from the new partition.
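
One way to picture the reported behavior is with a small toy model, assuming the stream only tracks the partitions it resolved when it was created. Nothing here is Spark or Kafka code: `TopicPartition`, `FakeBroker`, `FixedPartitionStream` and `FixedPartitionDemo` are hypothetical names made up for this sketch, and the offsets are arbitrary.

```scala
// Toy model only: hypothetical stand-ins, not Spark or Kafka classes.
final case class TopicPartition(topic: String, partition: Int)

// Stand-in for the broker: its partition set can grow while the job runs.
final class FakeBroker(private var latest: Map[TopicPartition, Long]) {
  def addPartition(tp: TopicPartition, latestOffset: Long): Unit =
    latest += (tp -> latestOffset)

  // Returns latest offsets only for the partitions that were asked about.
  def latestOffsets(tps: Set[TopicPartition]): Map[TopicPartition, Long] =
    latest.filter { case (tp, _) => tps.contains(tp) }

  def allPartitions: Set[TopicPartition] = latest.keySet
}

// Stand-in for a stream whose partition set is fixed at creation (assumption).
final class FixedPartitionStream(broker: FakeBroker, fromOffsets: Map[TopicPartition, Long]) {
  private var currentOffsets = fromOffsets

  // Each batch asks the broker only about partitions it already tracks.
  def nextBatch(): Map[TopicPartition, (Long, Long)] = {
    val until = broker.latestOffsets(currentOffsets.keySet)
    val ranges = currentOffsets.collect {
      case (tp, from) if until.contains(tp) => tp -> (from, until(tp))
    }
    currentOffsets = currentOffsets ++ until
    ranges
  }
}

object FixedPartitionDemo {
  // Returns (partitions known to the broker, partitions covered by the batch).
  def run(): (Set[Int], Set[Int]) = {
    val topic = "test_use"
    val initial = (0 to 3).map(p => TopicPartition(topic, p) -> 10L).toMap
    val broker = new FakeBroker(initial)
    val stream = new FixedPartitionStream(broker, initial.keySet.map(_ -> 0L).toMap)

    // Step 2 of the report: a partition is added after the stream was created.
    broker.addPartition(TopicPartition(topic, 4), 10L)

    val inBatch = stream.nextBatch().keySet.map(_.partition)
    (broker.allPartitions.map(_.partition), inBatch)
  }

  def main(args: Array[String]): Unit = {
    val (onBroker, inBatch) = run()
    println(s"partitions on broker: ${onBroker.toList.sorted}")
    println(s"partitions in batch:  ${inBatch.toList.sorted}")
  }
}
```

In this model partition 4 never shows up in a batch, because nothing ever asks the broker which partitions exist after startup. Whether the real stream behaves this way for the same reason is an assumption of the sketch, not something this message confirms.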



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
