Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2015/12/03 14:47:11 UTC
[jira] [Resolved] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-12103.
-------------------------------
Resolution: Not A Problem
> KafkaUtils createStream with multiple topics -- does not work as expected
> -------------------------------------------------------------------------
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
> Issue Type: Improvement
> Components: Documentation, Streaming
> Affects Versions: 1.4.1
> Reporter: Dan Dutrow
> Priority: Minor
> Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the easiest thing to get started with. The Kafka Receiver API still needs to work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case where you would want to use one (or a few) Kafka Streaming Receivers to pool resources. I have 10+ topics and don't want to dedicate 10 cores to processing all of them. However, when reading the data produced by KafkaUtils.createStream, the DStream[(String,String)] does not insert the topic name into the tuple. The key (the left element) is always null, making it impossible to know which topic the data came from other than by stashing the topic name into the value. Is there a way around that problem?
> //// CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, "topicE" -> 1, "topicF" -> 1, ...)
> // Three receivers, each subscribed to every topic
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>     ssc, consumerProperties,
>     topics,
>     StorageLevel.MEMORY_ONLY_SER))
> val unioned: DStream[(String, String)] = ssc.union(streams)
> unioned.flatMap(x => {
>   val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>     ......
>   }
> })
> //// END CODE
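For context on the behaviour described above: KafkaUtils.createStream yields (Kafka message key, Kafka message value) pairs, so the first element is null whenever the producer published messages without a key; it is not the topic name. One possible workaround with the receiver API, sketched below, is to create one stream per topic and tag each record with its topic before the union. The helper name taggedStream and its parameter names are hypothetical, and the sketch assumes the Spark 1.x spark-streaming-kafka artifact is on the classpath. Note the trade-off: each topic gets its own receiver, and each receiver occupies a core, so this does not pool receivers the way the reporter wanted.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical helper: returns (topic, value) pairs instead of (messageKey, value).
def taggedStream(
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topicNames: Seq[String]): DStream[(String, String)] = {
  // One receiver-based stream per topic, so the topic is known at construction time.
  val perTopic: Seq[DStream[(String, String)]] = topicNames.map { topic =>
    KafkaUtils
      .createStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY_SER)
      // Drop the (possibly null) Kafka message key and substitute the topic name.
      .map { case (_, value) => (topic, value) }
  }
  ssc.union(perTopic)
}
```

With this shape, a downstream `match` on the first tuple element sees the topic name rather than null. If the number of receivers is the main concern, the alternatives are to have producers set a message key that identifies the topic, or to use the Direct API mentioned in the note at the top of the description.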