Posted to user@spark.apache.org by Sanjay Awatramani <sa...@yahoo.com> on 2014/03/20 06:20:52 UTC

Relation between DStream and RDDs

Hi,

As I understand it, a DStream consists of one or more RDDs, and foreachRDD will run a given function on each and every RDD inside a DStream.

I created a simple program which reads log files from a folder every hour:
JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(60 * 60 * 1000)); //1 hour
JavaDStream<String> obj = stcObj.textFileStream("/Users/path/to/Input");

When the interval is reached, Spark reads all the files and creates one and only one RDD (as I verified with a sysout inside foreachRDD).
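
The check looked roughly like this (simplified, not my exact code; foreachRDD in the Java API of that era takes a Function returning Void):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

obj.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) {
        // Invoked once per batch interval; one print per hour
        // therefore means one RDD per batch.
        System.out.println("foreachRDD invoked; records: " + rdd.count());
        return null;
    }
});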

The streaming doc indicates in many places that operations (e.g. flatMap) on a DStream are applied individually to each RDD, and that the resulting DStream consists of the mapped RDDs, equal in number to the RDDs of the input DStream.
ref: https://spark.apache.org/docs/latest/streaming-programming-guide.html#dstreams

If that is the case, how can I construct a scenario in which I have multiple RDDs inside a DStream in my example?

Regards,
Sanjay

Re: Relation between DStream and RDDs

Posted by Azuryy <az...@gmail.com>.
Thanks for sharing here.

Sent from my iPhone5s


Re: Relation between DStream and RDDs

Posted by Sanjay Awatramani <sa...@yahoo.com>.
Hi,

I searched more articles, ran a few examples, and have clarified my doubts. This answer by TD in another thread ( https://groups.google.com/d/msg/spark-users/GQoxJHAAtX4/0kiRX0nm1xsJ ) helped me a lot.

Here is the summary of my findings:
1) A DStream can consist of 0, 1, or more RDDs.
2) Even if you have multiple files to be read in a time interval, the DStream will have only 1 RDD.
3) Functions like reduce & count return as many RDDs as there were in the input DStream. Since the internal computation in every batch has only 1 RDD, these functions will return 1 RDD in the returned DStream. However, if you use window functions to get more RDDs and run reduce/count on the windowed DStream, the returned DStream will have more than 1 RDD (sketched below).
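
A hypothetical illustration of point 3, reusing obj from my first mail (the durations are invented):

// Each windowed RDD combines the RDDs of the last 3 hourly batches;
// reduce/count then run per slide over the windowed data.
JavaDStream<String> windowed = obj.window(
        new Duration(3 * 60 * 60 * 1000),  // window length: 3 batches
        new Duration(60 * 60 * 1000));     // slide interval: 1 batch
JavaDStream<Long> counts = windowed.count();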

Hope this helps someone.
Thanks everyone for the answers.

Regards,
Sanjay




Re: Relation between DStream and RDDs

Posted by andy petrella <an...@gmail.com>.
I don't see an example, but conceptually it looks like you'll need a suitable algebraic structure, such as a Monoid. I mean: if it's not tied to a window, it's an overall computation that has to be grown incrementally over time (otherwise it would land in the batch world, see below), and that is the purpose of a Monoid, and especially of probabilistic sets (they avoid eating up the whole memory).

If it falls into the batch job world, because you have enough information encapsulated in one conceptual RDD, it might be helpful to have the DStream store the data in HDFS, then use the SparkContext within the StreamingContext to run a batch job on the data (sketched below).
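
A hypothetical sketch of that idea, reusing obj and stcObj from the first mail (the HDFS path is invented, and I'm assuming the sparkContext() accessor on JavaStreamingContext):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Time;

// Persist each batch to HDFS as it arrives...
obj.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd, Time time) {
        rdd.saveAsTextFile("hdfs:///archive/batch-" + time.milliseconds());
        return null;
    }
});

// ...then, separately, run an ordinary batch job over everything accumulated so far.
JavaRDD<String> all = stcObj.sparkContext().textFile("hdfs:///archive/batch-*");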

But I'm only thinking out loud, so I might be completely wrong.

hth

Andy Petrella
Belgium (Liège)

 Data Engineer in NextLab sprl (owner): http://nextlab.be/
 Engaged Citizen Coder for WAJUG (co-founder): http://wajug.be/
 Author of Learning Play! Framework 2: http://www.packtpub.com/learning-play-framework-2/book
 Bio: on Vizify: https://www.vizify.com/es/52c3feec2163aa0010001eaa

Mobile: +32 495 99 11 04
Mails:
   - andy.petrella@nextlab.be
   - andy.petrella@gmail.com
Socials:
   - Twitter: https://twitter.com/#!/noootsab
   - LinkedIn: http://be.linkedin.com/in/andypetrella
   - Blogger: http://ska-la.blogspot.com/
   - GitHub:  https://github.com/andypetrella
   - Masterbranch: https://masterbranch.com/andy.petrella

Re: Relation between DStream and RDDs

Posted by Pascal Voitot Dev <pa...@gmail.com>.
On Thu, Mar 20, 2014 at 11:57 AM, andy petrella <an...@gmail.com> wrote:

> also consider creating pairs and use *byKey* operators, and then the key
> will be the structure that will be used to consolidate or deduplicate your
> data
> my2c
>
>
One thing I wonder: imagine I want to sub-divide the RDDs in a DStream into
several RDDs, but not according to a time window; I don't see any trivial way
to do it...



Re: Relation between DStream and RDDs

Posted by andy petrella <an...@gmail.com>.
Also consider creating pairs and using *byKey* operators; the key will then be
the structure used to consolidate or deduplicate your data (sketch below).
my2c
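
A hypothetical sketch, reusing obj from the first mail (extractKey is an invented helper, and mapToPair is the newer Java API name; older releases used map with a PairFunction):

import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

// Key each record, then let reduceByKey consolidate duplicates within the batch.
JavaPairDStream<String, String> keyed = obj.mapToPair(
        new PairFunction<String, String, String>() {
            @Override
            public Tuple2<String, String> call(String line) {
                return new Tuple2<String, String>(extractKey(line), line); // extractKey is hypothetical
            }
        });
JavaPairDStream<String, String> deduped = keyed.reduceByKey(
        new Function2<String, String, String>() {
            @Override
            public String call(String a, String b) {
                return a; // keep the first occurrence per key
            }
        });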



Re: Relation between DStream and RDDs

Posted by Pascal Voitot Dev <pa...@gmail.com>.
Actually it's quite simple...

DStream[T] is a stream of RDD[T].
So applying count on a DStream is just applying count on each RDD of this DStream.
So at the end of count, you have a DStream[Long] containing the same number of RDDs as before, but each RDD contains just one element: the count result for the corresponding original RDD.
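
In the Java API, reusing obj from the first mail, that is roughly:

JavaDStream<Long> counts = obj.count(); // one single-element RDD per batch
counts.print();                         // prints one count per batch interval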

For reduce, it's the same, using the reduce operation...

The only operations that are a bit more complex are reduceByWindow & countByValueAndWindow, which union RDDs over the time window...

On Thu, Mar 20, 2014 at 9:51 AM, Sanjay Awatramani <sa...@yahoo.com> wrote:

> @TD: I do not need multiple RDDs in a DStream in every batch. On the
> contrary my logic would work fine if there is only 1 RDD. But then the
> description for functions like reduce & count (Return a new DStream of
> single-element RDDs by counting the number of elements in each RDD of the
> source DStream.) left me confused whether I should account for the fact
> that a DStream can have multiple RDDs. My streaming code processes a batch
> every hour. In the 2nd batch, i checked that the DStream contains only 1
> RDD, i.e. the 2nd batch's RDD. I verified this using sysout in foreachRDD.
> Does that mean that the DStream will always contain only 1 RDD ?
>

A DStream creates an RDD for each window corresponding to your batch duration
(maybe if there is no data in the current time window, no RDD is created, but
I'm not sure about that).
So no, there is not one single RDD in a DStream; it depends on the batch
duration and the collected data.



> Is there a way to access the RDD of the 1st batch in the 2nd batch ? The
> 1st batch may contain some records which were not relevant to the first
> batch and are to be processed in the 2nd batch. I know i can use the
> sliding window mechanism of streaming, but if i'm not using it and there is
> no way to access the previous batch's RDD, then it means that functions
> like count will always return a DStream containing only 1 RDD, am i correct
> ?
>
>
count will be executed for each RDD in the DStream, as explained above.

If you want to do operations on several RDDs in the same DStream, you should
try using reduceByWindow, for example, to "union" several RDDs and perform
operations on them (see the sketch below). But it really depends on what you
want to do, and I advise you to test different approaches.
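
For instance, a hypothetical sketch reusing obj from the first mail (the reduce function here just concatenates records):

import org.apache.spark.api.java.function.Function2;

// Union the RDDs of the last 3 hourly batches, reduced to one value per slide.
JavaDStream<String> combined = obj.reduceByWindow(
        new Function2<String, String, String>() {
            @Override
            public String call(String a, String b) {
                return a + "\n" + b;
            }
        },
        new Duration(3 * 60 * 60 * 1000),  // window: last 3 batches
        new Duration(60 * 60 * 1000));     // slide: every batch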

Maybe other people more skilled than me will have better answers?



Re: Relation between DStream and RDDs

Posted by Sanjay Awatramani <sa...@yahoo.com>.
@TD: I do not need multiple RDDs in a DStream in every batch. On the contrary, my logic would work fine if there is only 1 RDD. But the description of functions like reduce & count ("Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream.") left me confused about whether I should account for the fact that a DStream can have multiple RDDs. My streaming code processes a batch every hour. In the 2nd batch, I checked that the DStream contains only 1 RDD, i.e. the 2nd batch's RDD; I verified this using a sysout in foreachRDD. Does that mean that the DStream will always contain only 1 RDD?

Is there a way to access the RDD of the 1st batch in the 2nd batch? The 1st batch may contain some records which were not relevant to the first batch and are to be processed in the 2nd batch. I know I can use the sliding window mechanism of streaming, but if I'm not using it and there is no way to access the previous batch's RDD, then it means that functions like count will always return a DStream containing only 1 RDD, am I correct?

@Pascal: yes, your answer resolves my question partially, but the other part of the question (which I've clarified in the paragraphs above) still remains.

Thanks for your answers!

Regards,
Sanjay




Re: Relation between DStream and RDDs

Posted by Pascal Voitot Dev <pa...@gmail.com>.
If I may add my contribution to this discussion, if I understand your question well...

A DStream is a discretized stream: it discretizes the data stream over windows of time (according to the project code and the paper I've read). So when you write:

JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(60 * 60 * 1000)); // 1 hour

it means you are discretizing over a 1h window. Each batch, and therefore each RDD of the DStream, will collect data for 1h before moving on to the next RDD.
So if you want to have more RDDs, you should reduce the batch size/duration...
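
For example (hypothetical numbers):

// A 10-minute batch duration yields six RDDs per hour instead of one.
JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(10 * 60 * 1000));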

Pascal



Re: Relation between DStream and RDDs

Posted by Tathagata Das <ta...@gmail.com>.
That is a good question. If I understand correctly, you need multiple RDDs
from a DStream in *every batch*. Can you elaborate on why you need multiple
RDDs every batch?

TD

