You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Bill Jay <bi...@gmail.com> on 2014/07/08 20:03:09 UTC

Join two Spark Streaming

Hi all,

I am working on a pipeline that needs to join two Spark streams. The input
is a stream of integers. And the output is the number of integer's
appearance divided by the total number of unique integers. Suppose the
input is:

1
2
3
1
2
2

There are 3 unique integers and 1 appears twice. Therefore, the output for
the integer 1 will be:
1 0.67

Since the input is from a stream, it seems we need to first join the
appearance of the integers and the total number of unique integers and then
do a calculation using map. I am thinking of adding a dummy key to both
streams and use join. However, a Cartesian product matches the application
here better. How to do this effectively? Thanks!

Bill

Re: Join two Spark Streaming

Posted by vinay453 <vi...@gmail.com>.
I am working on window dstreams wherein each dstream contains 3 rdd with
following keys:

a,b,c
b,c,d
c,d,e
d,e,f

I want to get only unique keys across all dstream

a,b,c,d,e,f
How to do it in pyspark streaming?



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Join-two-Spark-Streaming-tp9052p27108.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Join two Spark Streaming

Posted by Tathagata Das <ta...@gmail.com>.
1. Since the RDD of the previous batch is used to create the RDD of the
next batch, the lineage of dependencies in the RDDs continues to grow
infinitely. Thats not good because of it increases fault-recover times,
task sizes, etc. Checkpointing saves the data of an RDD to HDFS and
truncates the lineage.


2. The code should have been the following. Sorry about the confusion.

var uniqueValuesRDD: RDD[Int] = ...

dstreamOfIntegers.transform(newDataRDD => {
   val newUniqueValuesRDD  = newDataRDD.union(*uniqueValuesRDD*).distinct
   uniqueValuesRDD = newUniqueValuesRDD

   // periodically call uniqueValuesRDD.checkpoint()

   val uniqueCount = uniqueValuesRDD.count()
   newDataRDD.map(x => x / count)
})




On Fri, Jul 11, 2014 at 12:10 AM, Bill Jay <bi...@gmail.com>
wrote:

> Hi Tathagata,
>
> Thanks for the solution. Actually, I will use the number of unique
> integers in the batch instead of accumulative number of unique integers.
>
> I do have two questions about your code:
>
> 1. Why do we need uniqueValuesRDD?  Why do we need to call
> uniqueValuesRDD.checkpoint()?
>
> 2. Where is distinctValues defined?
>
> Bill
>
>
> On Thu, Jul 10, 2014 at 8:46 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
>> Do you want to continuously maintain the set of unique integers seen
>> since the beginning of stream?
>>
>> var uniqueValuesRDD: RDD[Int] = ...
>>
>> dstreamOfIntegers.transform(newDataRDD => {
>>    val newUniqueValuesRDD  = newDataRDD.union(distinctValues).distinct
>>    uniqueValuesRDD = newUniqueValuesRDD
>>
>>    // periodically call uniqueValuesRDD.checkpoint()
>>
>>    val uniqueCount = uniqueValuesRDD.count()
>>    newDataRDD.map(x => x / count)
>> })
>>
>>
>>
>>
>>
>> On Tue, Jul 8, 2014 at 11:03 AM, Bill Jay <bi...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I am working on a pipeline that needs to join two Spark streams. The
>>> input is a stream of integers. And the output is the number of integer's
>>> appearance divided by the total number of unique integers. Suppose the
>>> input is:
>>>
>>> 1
>>> 2
>>> 3
>>> 1
>>> 2
>>> 2
>>>
>>> There are 3 unique integers and 1 appears twice. Therefore, the output
>>> for the integer 1 will be:
>>> 1 0.67
>>>
>>> Since the input is from a stream, it seems we need to first join the
>>> appearance of the integers and the total number of unique integers and then
>>> do a calculation using map. I am thinking of adding a dummy key to both
>>> streams and use join. However, a Cartesian product matches the application
>>> here better. How to do this effectively? Thanks!
>>>
>>> Bill
>>>
>>
>>
>

Re: Join two Spark Streaming

Posted by Bill Jay <bi...@gmail.com>.
Hi Tathagata,

Thanks for the solution. Actually, I will use the number of unique integers
in the batch instead of accumulative number of unique integers.

I do have two questions about your code:

1. Why do we need uniqueValuesRDD?  Why do we need to call
uniqueValuesRDD.checkpoint()?

2. Where is distinctValues defined?

Bill


On Thu, Jul 10, 2014 at 8:46 PM, Tathagata Das <ta...@gmail.com>
wrote:

> Do you want to continuously maintain the set of unique integers seen since
> the beginning of stream?
>
> var uniqueValuesRDD: RDD[Int] = ...
>
> dstreamOfIntegers.transform(newDataRDD => {
>    val newUniqueValuesRDD  = newDataRDD.union(distinctValues).distinct
>    uniqueValuesRDD = newUniqueValuesRDD
>
>    // periodically call uniqueValuesRDD.checkpoint()
>
>    val uniqueCount = uniqueValuesRDD.count()
>    newDataRDD.map(x => x / count)
> })
>
>
>
>
>
> On Tue, Jul 8, 2014 at 11:03 AM, Bill Jay <bi...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I am working on a pipeline that needs to join two Spark streams. The
>> input is a stream of integers. And the output is the number of integer's
>> appearance divided by the total number of unique integers. Suppose the
>> input is:
>>
>> 1
>> 2
>> 3
>> 1
>> 2
>> 2
>>
>> There are 3 unique integers and 1 appears twice. Therefore, the output
>> for the integer 1 will be:
>> 1 0.67
>>
>> Since the input is from a stream, it seems we need to first join the
>> appearance of the integers and the total number of unique integers and then
>> do a calculation using map. I am thinking of adding a dummy key to both
>> streams and use join. However, a Cartesian product matches the application
>> here better. How to do this effectively? Thanks!
>>
>> Bill
>>
>
>

Re: Join two Spark Streaming

Posted by Tathagata Das <ta...@gmail.com>.
Do you want to continuously maintain the set of unique integers seen since
the beginning of stream?

var uniqueValuesRDD: RDD[Int] = ...

dstreamOfIntegers.transform(newDataRDD => {
   val newUniqueValuesRDD  = newDataRDD.union(distinctValues).distinct
   uniqueValuesRDD = newUniqueValuesRDD

   // periodically call uniqueValuesRDD.checkpoint()

   val uniqueCount = uniqueValuesRDD.count()
   newDataRDD.map(x => x / count)
})





On Tue, Jul 8, 2014 at 11:03 AM, Bill Jay <bi...@gmail.com>
wrote:

> Hi all,
>
> I am working on a pipeline that needs to join two Spark streams. The input
> is a stream of integers. And the output is the number of integer's
> appearance divided by the total number of unique integers. Suppose the
> input is:
>
> 1
> 2
> 3
> 1
> 2
> 2
>
> There are 3 unique integers and 1 appears twice. Therefore, the output for
> the integer 1 will be:
> 1 0.67
>
> Since the input is from a stream, it seems we need to first join the
> appearance of the integers and the total number of unique integers and then
> do a calculation using map. I am thinking of adding a dummy key to both
> streams and use join. However, a Cartesian product matches the application
> here better. How to do this effectively? Thanks!
>
> Bill
>