Posted to user@spark.apache.org by Aries Kong <ar...@gmail.com> on 2014/02/19 07:05:50 UTC

Spark Streaming windowing Driven by absolutely time?

Hi all,

It seems that windowing in Spark Streaming is driven by absolute time
rather than, as is conventional, by the timestamps of the data. Can
anybody kindly explain why? What can I do if I need windowing driven by
the data timestamp?

Thanks!


Aries.Kong

Re: Spark Streaming windowing Driven by absolutely time?

Posted by Aries Kong <ar...@gmail.com>.
A larger window does not work in my case; I will try to change the
algorithm to avoid a sliding window.

Thanks very much!

2014-02-21 6:17 GMT+08:00 Tathagata Das <ta...@gmail.com>:
> We chose to define windows based on time because of the underlying
> system design of Spark Streaming. Spark Streaming essentially divides
> received data into batches of a fixed time interval and then runs a Spark
> job on each batch. So the system naturally maintains a mapping of <time
> interval> to <RDD containing data received in that interval>. It is
> therefore conceptually simple to define windows based on this batch
> interval and combine RDDs accordingly. For example, if you are using
> 1-second batches and have generated RDD1 for data in the 0-1 second
> interval, RDD2 for the 1-2 second interval, and so on, defining a window
> of 4 seconds means combining 4 RDDs together. This keeps the system simple
> while providing windowing functionality that works for many use cases.
>
> Defining windows based on the data timestamp is actually a non-trivial
> problem to solve if you want to get all the semantics right. For example,
> what happens if some records arrive late / out of order and the
> corresponding window has already been closed? We may look at solving such
> problems in the future.
>
> In the meantime, here is a way of defining windows based on timestamps.
> Let's say you want to define a window of 5 minutes, and your data can be at
> most 1 minute late (i.e. out of order) based on your data timestamp. Then
> you can define a DStream window (based on arrival time) of 10 minutes (more
> than 5 + 1 = 6 minutes) and then filter it down to only the timestamp-based
> window that you want.
>
> |------------------- 10 minute DStream.window -------------------|
>           |____ filtered 5 minute window based on timestamp ____|
>
> In terms of code it will probably look something like this (in Scala)
>
> yourInputDStream.window(Minutes(10)).transform(windowedRDD => {
>      // define the window over the timestamp that you want to process
>      val timeStampBasedWindow = ....
>      // filter and retain only the records that fall in the
>      // timestamp-based window
>      val filteredRDD = windowedRDD.filter( .... )
>      // do your processing on that filtered dataset
>      ....
> })
>
> To answer what Mayur asked, there is no built-in way to batch by number of
> files. However, for files, you can write your own batching logic to define
> the batches of files and convert each batch to an RDD (using the standard
> sparkContext.hadoopFile, etc.). Then you can use
> StreamingContext.queueStream to push those manually generated RDDs into a
> DStream.
>
> Hope this helps!
>
>
> On Wed, Feb 19, 2014 at 5:30 AM, dachuan <hd...@gmail.com> wrote:
>>
>> How about existing solutions, for example the MapReduce Online model?
>>
>>
>> On Wed, Feb 19, 2014 at 8:15 AM, Mayur Rustagi <ma...@gmail.com>
>> wrote:
>>>
>>> Also, can we batch by other criteria, such as number of files, as well
>>> as by time?
>>>
>>> Mayur Rustagi
>>> Ph: +919632149971
>>> http://www.sigmoidanalytics.com
>>> https://twitter.com/mayur_rustagi
>>>
>>>
>>>
>>> On Wed, Feb 19, 2014 at 5:05 AM, dachuan <hd...@gmail.com> wrote:
>>>>
>>>> I don't have a conclusive answer but I would like to discuss this.
>>>>
>>>> If one node's CPU is slower than another's, windowing in absolute time
>>>> won't cause any trouble, because the data are well partitioned.
>>>>
>>>> On Feb 19, 2014 1:06 AM, "Aries Kong" <ar...@gmail.com> wrote:
>>>>>
>>>>> Hi all,
>>>>>
>>>>> It seems that windowing in Spark Streaming is driven by absolute time
>>>>> rather than, as is conventional, by the timestamps of the data. Can
>>>>> anybody kindly explain why? What can I do if I need windowing driven
>>>>> by the data timestamp?
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>> Aries.Kong
>>>
>>>
>>
>>
>>
>> --
>> Dachuan Huang
>> Cellphone: 614-390-7234
>> 2015 Neil Avenue
>> Ohio State University
>> Columbus, Ohio
>> U.S.A.
>> 43210
>
>

Re: Spark Streaming windowing Driven by absolutely time?

Posted by Tathagata Das <ta...@gmail.com>.
We chose to define windows based on time because of the underlying system
design of Spark Streaming. Spark Streaming essentially divides received data
into batches of a fixed time interval and then runs a Spark job on each
batch. So the system naturally maintains a mapping of <time interval> to
<RDD containing data received in that interval>. It is therefore
conceptually simple to define windows based on this batch interval and
combine RDDs accordingly. For example, if you are using 1-second batches
and have generated RDD1 for data in the 0-1 second interval, RDD2 for the
1-2 second interval, and so on, defining a window of 4 seconds means
combining 4 RDDs together. This keeps the system simple while providing
windowing functionality that works for many use cases.
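
As a rough illustration of that mapping (not Spark's actual implementation),
the sketch below models each 1-second batch as an entry in a plain Scala Map
keyed by the end of its interval, and builds a 4-second window by combining
four consecutive entries. The object name, the window helper and the sample
data are all made up for this example; in Spark Streaming itself the
equivalent call would be along the lines of dstream.window(Seconds(4)).

```scala
// Hypothetical stand-in for the <time interval> -> <RDD> mapping:
// one "batch" of records per 1-second interval, keyed by the interval's end.
object BatchWindowSketch {
  // Combine the batches whose end times are windowEnd - windowLen + 1 .. windowEnd.
  def window[A](batches: Map[Int, Seq[A]], windowEnd: Int, windowLen: Int): Seq[A] =
    ((windowEnd - windowLen + 1) to windowEnd)
      .flatMap(t => batches.getOrElse(t, Seq.empty))

  def main(args: Array[String]): Unit = {
    val batches = Map(
      1 -> Seq("a"),           // "RDD1": data received in the 0-1 second interval
      2 -> Seq("b", "c"),      // "RDD2": data received in the 1-2 second interval
      3 -> Seq.empty[String],  // an interval in which nothing arrived
      4 -> Seq("d"),
      5 -> Seq("e")
    )
    // A 4-second window ending at second 4 combines batches 1 to 4.
    println(window(batches, windowEnd = 4, windowLen = 4))
    // One batch interval later, the window slides to batches 2 to 5.
    println(window(batches, windowEnd = 5, windowLen = 4))
  }
}
```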

Defining windows based on the data timestamp is actually a non-trivial
problem to solve if you want to get all the semantics right. For example,
what happens if some records arrive late / out of order and the
corresponding window has already been closed? We may look at solving such
problems in the future.
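
To make the late-record problem concrete, here is a small plain-Scala sketch
(no Spark involved). The Record type, its field names and the 60-second
window length are assumptions chosen only for illustration. It shows a record
whose own timestamp belongs to one window while its arrival time falls in the
next one.

```scala
object LateRecordSketch {
  // Hypothetical record: the timestamp carried in the data and the time it
  // reached the receiver, both in seconds.
  final case class Record(id: String, eventTime: Long, arrivalTime: Long)

  // Index of the fixed-size window a time falls into,
  // e.g. 0 for [0, 60) and 1 for [60, 120) when windowLen is 60.
  def windowIndex(time: Long, windowLen: Long): Long = time / windowLen

  def main(args: Array[String]): Unit = {
    val records = Seq(
      Record("on-time", eventTime = 50, arrivalTime = 52),
      // Generated during window 0 but received during window 1.
      Record("late", eventTime = 55, arrivalTime = 70)
    )
    records.foreach { r =>
      val byEvent = windowIndex(r.eventTime, 60)
      val byArrival = windowIndex(r.arrivalTime, 60)
      println(s"${r.id}: event-time window $byEvent, arrival-time window $byArrival")
    }
  }
}
```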

In the meantime, here is a way of defining windows based on timestamps.
Let's say you want to define a window of 5 minutes, and your data can be at
most 1 minute late (i.e. out of order) based on your data timestamp. Then
you can define a DStream window (based on arrival time) of 10 minutes (more
than 5 + 1 = 6 minutes) and then filter it down to only the timestamp-based
window that you want.

|------------------- 10 minute DStream.window -------------------|
          |____ filtered 5 minute window based on timestamp ____|

In terms of code it will probably look something like this (in Scala)

yourInputDStream.window(Minutes(10)).transform(windowedRDD => {
     // define the window over the timestamp that you want to process
     val timeStampBasedWindow = ....
     // filter and retain only the records that fall in the
     // timestamp-based window
     val filteredRDD = windowedRDD.filter( .... )
     // do your processing on that filtered dataset
     ....
})
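
One possible way to fill in the elided parts is sketched below. It is only a
sketch under several assumptions: the Event record type, its eventTimeMs
field, the inWindow helper and the choice to end the 5-minute window one
minute before the batch time are all hypothetical, and it assumes the data
timestamps and Spark's batch times come from roughly synchronized clocks. It
uses the transform overload that also passes the batch Time to the function.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Minutes, Time}
import org.apache.spark.streaming.dstream.DStream

object TimestampWindowSketch {
  // Hypothetical record type: eventTimeMs is the timestamp carried in the data.
  final case class Event(eventTimeMs: Long, payload: String)

  val windowMs: Long = 5 * 60 * 1000L    // the 5-minute timestamp-based window
  val maxDelayMs: Long = 1 * 60 * 1000L  // records assumed at most 1 minute late

  // True if the record's own timestamp falls in the 5-minute window that
  // ended maxDelayMs before the current batch time.
  def inWindow(eventTimeMs: Long, batchTimeMs: Long): Boolean = {
    val windowEnd = batchTimeMs - maxDelayMs
    val windowStart = windowEnd - windowMs
    eventTimeMs >= windowStart && eventTimeMs < windowEnd
  }

  // Take a 10-minute arrival-time window, then keep only the records whose
  // data timestamp lies in the 5-minute window of interest.
  def timestampWindow(events: DStream[Event]): DStream[Event] =
    events.window(Minutes(10)).transform { (windowedRDD: RDD[Event], batchTime: Time) =>
      val now = batchTime.milliseconds
      windowedRDD.filter(e => inWindow(e.eventTimeMs, now))
    }
}
```

Because the window stops one minute before the batch time, every record that
belongs to it should already have arrived, provided the 1-minute lateness
bound really holds.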

To answer what Mayur asked, there is no built-in way to batch by number of
files. However, for files, you can write your own batching logic to define
the batches of files and convert each batch to an RDD (using the standard
sparkContext.hadoopFile, etc.). Then you can use
StreamingContext.queueStream to push those manually generated RDDs into a
DStream.
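
A minimal sketch of that idea follows, assuming the file paths are passed on
the command line and that three files per batch is what you want. The object
name and the fileBatches helper are hypothetical, and textFile is used here
in place of hadoopFile purely to keep the example short.

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileBatchSketch {
  // Split a list of paths into groups of at most filesPerBatch files.
  def fileBatches(paths: Seq[String], filesPerBatch: Int): Seq[Seq[String]] =
    paths.grouped(filesPerBatch).toList

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FileBatchSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // One manually built RDD per group of files.
    val rddQueue = new mutable.Queue[RDD[String]]()
    fileBatches(args.toSeq, 3).foreach { batch =>
      // textFile accepts a comma-separated list of paths.
      rddQueue += ssc.sparkContext.textFile(batch.mkString(","))
    }

    // With oneAtATime = true, each batch interval consumes one queued RDD.
    val stream = ssc.queueStream(rddQueue, oneAtATime = true)
    stream.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```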

Hope this helps!


Re: Spark Streaming windowing Driven by absolutely time?

Posted by dachuan <hd...@gmail.com>.
How about existing solutions, for example the MapReduce Online model?


-- 
Dachuan Huang
Cellphone: 614-390-7234
2015 Neil Avenue
Ohio State University
Columbus, Ohio
U.S.A.
43210

Re: Spark Streaming windowing Driven by absolutely time?

Posted by Mayur Rustagi <ma...@gmail.com>.
Also, can we batch by other criteria, such as number of files, as well as
by time?

Mayur Rustagi
Ph: +919632149971
http://www.sigmoidanalytics.com
https://twitter.com/mayur_rustagi



Re: Spark Streaming windowing Driven by absolutely time?

Posted by dachuan <hd...@gmail.com>.
I don't have a conclusive answer but I would like to discuss this.

If one node's CPU is slower than another's, windowing in absolute time won't
cause any trouble, because the data are well partitioned.