Posted to issues@spark.apache.org by "Dan Dutrow (JIRA)" <ji...@apache.org> on 2015/12/02 23:39:10 UTC

[jira] [Comment Edited] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

    [ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15036625#comment-15036625 ] 

Dan Dutrow edited comment on SPARK-12103 at 12/2/15 10:38 PM:
--------------------------------------------------------------

After digging into the Kafka code some more (specifically kafka.consumer.KafkaStream, kafka.consumer.ConsumerIterator and kafka.message.MessageAndMetadata), it appears that the Left value of the tuple is not the topic name but rather a key that Kafka puts on each message. See http://kafka.apache.org/documentation.html#impl_producer

I don't see a way around this without hacking KafkaStream and ConsumerIterator to return the topic name instead of the message key.

The return value should probably be clarified in the documentation.
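The distinction can be sketched in plain Scala with a stand-in case class (a hypothetical simplified stand-in for kafka.message.MessageAndMetadata, used here only to illustrate the point; it is not the real Kafka class):

```scala
// Hypothetical stand-in for kafka.message.MessageAndMetadata,
// reduced to the three fields relevant to this discussion.
case class MessageAndMetadata(topic: String, key: String, message: String)

// What createStream effectively emits today:
// (producer-assigned message key, payload) -- the key is often null.
def asEmitted(m: MessageAndMetadata): (String, String) = (m.key, m.message)

// What a multi-topic consumer might expect instead: (topic name, payload).
def asExpected(m: MessageAndMetadata): (String, String) = (m.topic, m.message)
```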



> KafkaUtils createStream with multiple topics -- does not work as expected
> -------------------------------------------------------------------------
>
>                 Key: SPARK-12103
>                 URL: https://issues.apache.org/jira/browse/SPARK-12103
>             Project: Spark
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.0.0, 1.1.0, 1.2.0, 1.3.0, 1.4.0, 1.4.1
>            Reporter: Dan Dutrow
>             Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the easiest thing to get started with. The Kafka Receiver API still needs to work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case where you would want to use one (or a few) Kafka streaming receivers to pool resources. I have 10+ topics and don't want to dedicate 10 cores to processing all of them. However, when reading the data produced by KafkaUtils.createStream, the DStream[(String,String)] does not properly insert the topic name into the tuple. The left key is always null, making it impossible to know which topic the data came from other than stashing your key into the value. Is there a way around that problem?
> //// CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String,String)]] = (1 to 3).map( i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>     ssc, consumerProperties,
>     topics,
>     StorageLevel.MEMORY_ONLY_SER))
> val unioned: DStream[(String,String)] = ssc.union(streams)
> unioned.flatMap(x => {
>   val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match {
>       ......
>   }
> })
> //// END CODE
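
The "stashing your key into the value" workaround mentioned in the description can be sketched in plain Scala (hypothetical helper names, no Spark or Kafka dependency): prefix each payload with its topic on the producer side and split it back out in the consumer. The '\u0001' separator is an assumption; pick any character that cannot appear in your payloads.

```scala
// Separator character -- an assumption; must never occur in real payloads.
val Sep = '\u0001'

// Producer side: embed the topic name in the message value.
def stashTopic(topic: String, payload: String): String =
  s"$topic$Sep$payload"

// Consumer side: recover (topic, payload) from the stashed value.
def unstashTopic(value: String): (String, String) = {
  val i = value.indexOf(Sep)
  (value.substring(0, i), value.substring(i + 1))
}
```

This trades a small amount of per-message overhead for topic visibility without modifying KafkaStream or ConsumerIterator.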



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org