Posted to user@spark.apache.org by vidhan <vi...@kitboard.co> on 2016/08/17 20:40:00 UTC

How to combine two DStreams (pyspark)?

I have a *Kafka* stream coming in on some input topic.
This is the code I wrote for accepting the *Kafka* stream.

>>> from pyspark import SparkConf, SparkContext
>>> from pyspark.streaming import StreamingContext
>>> from pyspark.streaming.kafka import KafkaUtils
>>> conf = SparkConf().setAppName(appname)
>>> sc = SparkContext(conf=conf)
>>> ssc = StreamingContext(sc, 10)  # batch interval (in seconds) is required
>>> kvs = KafkaUtils.createDirectStream(ssc, topics,
...                 {"metadata.broker.list": brokers})

Then I create two DStreams of the keys and values of the original stream.

>>> keys = kvs.map(lambda x: x[0].split(" "))
>>> values = kvs.map(lambda x: x[1].split(" "))

Then I perform some computation on the values DStream.
For example:

>>> val = values.flatMap(lambda x: x*2)

Now I need to combine the *keys* and *val* DStreams and return the
result in the form of a *Kafka* stream.

How do I combine each element of *val* with its corresponding key?



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-combine-two-DStreams-pyspark-tp27552.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: How to combine two DStreams (pyspark)?

Posted by ayan guha <gu...@gmail.com>.
Wondering why you are creating separate DStreams? You should apply the
logic directly on the input DStream.
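
A minimal sketch of that approach, assuming the same kvs and brokers
variables as in the original post (the output topic name and the
kafka-python producer used below are illustrative, not something the
thread specifies): transform only the value of each record so it stays
paired with its key, then push the results back out via foreachRDD.

    # Keep each record as a (key, value) pair and transform only the value,
    # instead of splitting kvs into separate keys/values DStreams.
    # Here the value transformation is the word-doubling from the question,
    # kept as one list of words per record.
    val = kvs.mapValues(lambda v: v.split(" ") * 2)

    # Hypothetical write-back to Kafka using the kafka-python package,
    # one producer per partition.
    def send_partition(records):
        from kafka import KafkaProducer  # assumes kafka-python is installed
        producer = KafkaProducer(bootstrap_servers=brokers.split(","))
        for key, words in records:
            producer.send("output-topic",           # illustrative topic name
                          key=key.encode("utf-8"),
                          value=" ".join(words).encode("utf-8"))
        producer.flush()

    val.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition))

Because the key rides along untouched through mapValues, there is
nothing to join back together afterwards.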