Posted to user@spark.apache.org by Bill Jay <bi...@gmail.com> on 2014/07/17 02:39:15 UTC

Spark Streaming timestamps

Hi all,

I am currently using Spark Streaming to conduct real-time data analytics.
We receive data from Kafka. We want to generate output files that contain
results based on the data we receive from a specific time
interval.

I have several questions on Spark Streaming's timestamp:

1) If I use saveAsTextFiles, it seems Spark Streaming will generate files
in complete intervals, such as 5:00:01, 5:00:02 (converted from Unix time),
etc. Does this mean the results are based on the data from 5:00:01 to
5:00:02, 5:00:02 to 5:00:03, etc., or do the timestamps just mean the time
the files are generated?

2) If I do not use saveAsTextFiles, how do I get the exact time interval of
the RDD when I use foreachRDD to do custom output of the results?

3) How can we specify the starting time of the batches?

Thanks!

Bill

Re: Spark Streaming timestamps

Posted by Laeeq Ahmed <la...@yahoo.com>.
Hi Bill,

Hope the following is what you need.

val zerotime = System.currentTimeMillis()


Then in foreachRDD do the following:

// difference = RDDtimeparameter - zerotime
// (measure this once, for the first batch, and use it as a constant)
val starttime = (RDDtimeparameter - (zerotime + difference)) - intervalsize
val endtime   = RDDtimeparameter - (zerotime + difference)


Here zerotime is the time when streaming starts. Find out what the difference (RDDtimeparameter - zerotime) is for the first batch and use it as a constant (in my case it is 5000 ms). In the end I will say it is better to work with Spark timestamps rather than application timestamps, as this approach becomes very messy. If you reach a better solution, please let me know as well.

Regards,
Laeeq




Re: Spark Streaming timestamps

Posted by Bill Jay <bi...@gmail.com>.
Hi Tathagata,




On Thu, Jul 17, 2014 at 6:12 PM, Tathagata Das <ta...@gmail.com>
wrote:

> The RDD parameter in foreachRDD contains raw/transformed data from the
> last batch. So when foreachRDD is called with the time parameter as 5:02:01
> and batch size is 1 minute, then the rdd will contain data based on the
> data received between 5:02:00 and 5:02:01.
>
Do you mean the data between 5:02:02 and 5:02:01? The time parameter is
5:02:01. Moreover, when the program is running, it is very difficult to
specify a starting time, because it is hard to know exactly when the
program executes that line. Also, do we need a different time parameter for
each foreachRDD, or will Spark calculate the next one according to the
batch interval?


Re: Spark Streaming timestamps

Posted by Tathagata Das <ta...@gmail.com>.
The RDD parameter in foreachRDD contains raw/transformed data from the last
batch. So when foreachRDD is called with the time parameter as 5:02:01 and
the batch size is 1 minute, the rdd will contain data based on the data
received between 5:02:00 and 5:02:01.

If you want to do custom intervals, then I suggest the following
1. Do 1 second batch intervals
2. Then in the foreachRDD, from  5:02:30 to 5:03:28, put all the RDDs in an
ArrayBuffer/ListBuffer
3. At 5:03:29, add the RDD to the buffer, and do a union of all the
buffered RDDs, and process them.

So in foreachRDD, based on the time, buffer the RDDs, until you reach the
appropriate time. Then union all the buffered RDDs and process them.
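The buffering approach above can be sketched roughly as follows (a sketch only: it assumes a 1-second batch interval, a DStream[String] called `stream`, and a hypothetical `processWindow` output function; the 30-second offset matches Bill's 5:01:30-to-5:02:29 example):

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

// Collect the 1-second batches and process them together once a
// custom 60-second window boundary is reached.
val buffer = new ArrayBuffer[RDD[String]]()
val windowMs = 60 * 1000L
val offsetMs = 30 * 1000L  // windows run hh:mm:30 to hh:mm+1:29

stream.foreachRDD { (rdd: RDD[String], time: Time) =>
  buffer += rdd
  if ((time.milliseconds - offsetMs) % windowMs == 0) {
    // Boundary batch: union everything buffered so far and process it.
    val windowRDD = rdd.sparkContext.union(buffer)
    processWindow(windowRDD, time)  // hypothetical user-supplied function
    buffer.clear()
  }
}
```

The exact boundary test depends on how the batch times line up with the desired window edges, so it is worth logging `time` for the first few batches to verify the alignment.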

TD



Re: Spark Streaming timestamps

Posted by Bill Jay <bi...@gmail.com>.
Hi Tathagata,

Thanks for your answer. Please see my further question below:


On Wed, Jul 16, 2014 at 6:57 PM, Tathagata Das <ta...@gmail.com>
wrote:

> Answers inline.
>
>
> On Wed, Jul 16, 2014 at 5:39 PM, Bill Jay <bi...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I am currently using Spark Streaming to conduct real-time data
>> analytics. We receive data from Kafka. We want to generate output files
>> that contain results that are based on the data we receive from a specific
>> time interval.
>>
>> I have several questions on Spark Streaming's timestamp:
>>
>> 1) If I use saveAsTextFiles, it seems Spark Streaming will generate files
>> in complete intervals, such as 5:00:01, 5:00:02 (converted from Unix time),
>> etc. Does this mean the results are based on the data from 5:00:01 to
>> 5:00:02, 5:00:02 to 5:00:03, etc. Or the time stamps just mean the time the
>> files are generated?
>>
>> File named  5:00:01 contains results from data received between  5:00:00
> and  5:00:01 (based on system time of the cluster).
>
>
>
>> 2) If I do not use saveAsTextFiles, how do I get the exact time interval
>> of the RDD when I use foreachRDD to do custom output of the results?
>>
>> There is a version of foreachRDD which allows you to specify a function
> that takes in a Time object.
>
>
>> 3) How can we specify the starting time of the batches?
>>
>
> What do you mean? Batches are timed based on the system time of the
> cluster.
>
I would like to control the starting time and ending time of each batch.
For example, if I use saveAsTextFiles as the output method and the batch
size is 1 minute, Spark will align time intervals to complete minutes, such
as 5:01:00, 5:02:00, 5:03:00. It will not have results at 5:01:03,
5:02:03, 5:03:03, etc. My goal is to generate output for a customized
interval, such as from 5:01:30 to 5:02:29, 5:02:30 to 5:03:29, etc.

I checked the API of foreachRDD with the time parameter. It seems there is
no explanation of what that parameter means. Does it mean the starting
time of the first batch?

>
>
>>
>> Thanks!
>>
>> Bill
>>
>
>

Re: Spark Streaming timestamps

Posted by Tathagata Das <ta...@gmail.com>.
Answers inline.


On Wed, Jul 16, 2014 at 5:39 PM, Bill Jay <bi...@gmail.com>
wrote:

> Hi all,
>
> I am currently using Spark Streaming to conduct real-time data
> analytics. We receive data from Kafka. We want to generate output files
> that contain results that are based on the data we receive from a specific
> time interval.
>
> I have several questions on Spark Streaming's timestamp:
>
> 1) If I use saveAsTextFiles, it seems Spark Streaming will generate files
> in complete intervals, such as 5:00:01, 5:00:02 (converted from Unix time),
> etc. Does this mean the results are based on the data from 5:00:01 to
> 5:00:02, 5:00:02 to 5:00:03, etc. Or the time stamps just mean the time the
> files are generated?
>
> A file named 5:00:01 contains results from data received between 5:00:00
and 5:00:01 (based on the system time of the cluster).



> 2) If I do not use saveAsTextFiles, how do I get the exact time interval
> of the RDD when I use foreachRDD to do custom output of the results?
>
> There is a version of foreachRDD which allows you to specify a function
that takes in a Time object.
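As an illustration, a minimal sketch of that overload (the `stream` DStream and the output path scheme are assumptions, not from this thread):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

// foreachRDD can take a (RDD, Time) function; `time` is the batch time,
// i.e. the end of the interval in which the RDD's data was received.
stream.foreachRDD { (rdd: RDD[String], time: Time) =>
  rdd.saveAsTextFile(s"output-${time.milliseconds}")
}
```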


> 3) How can we specify the starting time of the batches?
>

What do you mean? Batches are timed based on the system time of the
cluster.


>
> Thanks!
>
> Bill
>