Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2015/12/08 12:04:11 UTC

[jira] [Updated] (SPARK-12103) Clarify documentation of KafkaUtils createStream with multiple topics

     [ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen updated SPARK-12103:
------------------------------
    Summary: Clarify documentation of KafkaUtils createStream with multiple topics  (was: KafkaUtils createStream with multiple topics -- does not work as expected)

> Clarify documentation of KafkaUtils createStream with multiple topics
> ---------------------------------------------------------------------
>
>                 Key: SPARK-12103
>                 URL: https://issues.apache.org/jira/browse/SPARK-12103
>             Project: Spark
>          Issue Type: Improvement
>          Components: Documentation, Streaming
>    Affects Versions: 1.4.1
>            Reporter: Dan Dutrow
>            Assignee: Cody Koeninger
>            Priority: Minor
>             Fix For: 1.6.1, 2.0.0
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the easiest thing to get started with. The Kafka Receiver API still needs to work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case where you would want to use one (or a few) receivers to pool resources: I have 10+ topics and don't want to dedicate 10 cores to processing all of them. However, in the data produced by KafkaUtils.createStream, the DStream[(String,String)] does not carry the topic name in the tuple. The key (the left element) is always null, making it impossible to tell which topic a record came from, short of stashing the topic name into the value yourself. Is there a way around that problem?
> //// CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>     ssc, consumerProperties,
>     topics,
>     StorageLevel.MEMORY_ONLY_SER))
> val unioned: DStream[(String, String)] = ssc.union(streams)
> unioned.flatMap(x => {
>   val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>       ......
>   }
> })
> //// END CODE
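> One way around it looks to be the Direct API mentioned in the note above: createDirectStream has an overload that takes a messageHandler, which sees the full MessageAndMetadata for each record, including the topic name. A minimal sketch (assumptions: kafkaParams points at the brokers via metadata.broker.list rather than ZooKeeper, and fromOffsets below is a placeholder that starts each topic at partition 0, offset 0 -- real code would look up the actual partitions and offsets):
> //// SKETCH
> import kafka.common.TopicAndPartition
> import kafka.message.MessageAndMetadata
> import kafka.serializer.StringDecoder
> import org.apache.spark.streaming.dstream.DStream
> import org.apache.spark.streaming.kafka.KafkaUtils
> // placeholder offsets: assumes one partition per topic, starting from offset 0
> val fromOffsets: Map[TopicAndPartition, Long] =
>   topics.keys.map(t => TopicAndPartition(t, 0) -> 0L).toMap
> // emit (topic, message) pairs instead of (key, message)
> val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
> val withTopic: DStream[(String, String)] =
>   KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
>     ssc, kafkaParams, fromOffsets, messageHandler)
> //// END SKETCH
> With the Receiver API alone, the fallback seems to be one createStream per topic, mapping each stream to (topicName, value) before the union -- at the cost of a receiver (and a core) per topic, which is exactly what pooling was meant to avoid.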



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org