You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Adrian Mocanu <am...@verticalscope.com> on 2014/02/19 21:45:03 UTC

Q: Discretized Streams: Fault-Tolerant Streaming Computation paper

I have a question on the following paper
"Discretized Streams: Fault-Tolerant Streaming Computation at Scale"
written by
Matei Zaharia, Tathagata Das, Haoyuan Li, Timothy Hunter, Scott Shenker, Ion Stoica
and available at
http://www.cs.berkeley.edu/~matei/papers/2013/sosp_spark_streaming.pdf

Specifically I'm interested in Section 3.2 on page 5 called "Timing Considerations".
This section talks about external timestamp. For me I'm looking to use method 2 and correct for late records at the
application level.
The paper says "[application] could output a new count for time interval [t, t+1) at time t+5, based on the records for this interval received between t and t+5. This computation can be performed with an efficient incremental reduce operation that adds the old counts computed at t+1 to the counts of new records since then, avoiding wasted work."

Q1:
If my data comes 24 hours late on the same DStream (ie: t=t0+24hr) and I'm recording per minute aggregates, wouldn't the RDD with data which came 24 hours ago be already deleted from disk by Spark? (I'd hope so otherwise it runs out of space)

Q2:
The paper talks about "incremental reduce". I'd like to know what it is. I do use reduce so I could get an aggregate of counts. What is this incremental reduce?

Thanks
-A

Re: Q: Discretized Streams: Fault-Tolerant Streaming Computation paper

Posted by Tathagata Das <ta...@gmail.com>.
1. Spark Streaming automatically keeps track of how long to remember the
RDDs for each DStream (which varies based on window operations etc.). As
Dachuan pointed out correctly, remember allows you to configure that
duration if you want RDDs to be remembered for a great duration. Now, in
the current implementation (Spark 0.9), even though the Spark streaming
dereferences the RDDs, the actual cached data of the RDD is not
automatically uncached. The spark.cleaner.ttl configuration parameter (see
configuration page in Spark online documentation) forcing all RDD data that
are older than the "ttl" value to be cleaned. That needs to be set.
Alternatively, you can also enabled the configuration
spark.streaming.unpersist=true (set to false by default) which make the
system automatically uncache those RDDs.

In future (Spark 1.0), this will improve further; we will be able to
automatically uncache any RDDs (not just Spark Streaming) that are not in
scope any more, without any explicit configuration.

2. Yes! Dachuan's explanation of the incremental reduce is absolutely
correct.


On Wed, Feb 19, 2014 at 2:04 PM, Adrian Mocanu <am...@verticalscope.com>wrote:

>  That makes sense.
>
> I wonder if any of the authors of that paper could comment.
>
>
>
> *From:* dachuan [mailto:hdc1112@gmail.com]
> *Sent:* February-19-14 3:55 PM
> *To:* user@spark.incubator.apache.org
> *Subject:* Re: Q: Discretized Streams: Fault-Tolerant Streaming
> Computation paper
>
>
>
> It seems StreamingContext has a function:
>
>
>
>   def remember(duration: Duration) {
>
>     graph.remember(duration)
>
>   }
>
>
>
> and in my opinion, incremental reduce means:
>
>
>
> 1 2 3 4 5 6
>
> window_size =5
>
> sum_of_first_window = 1+2+3+4+5=15
>
> sum_of_second_window_method_1=2+3+4+5+6=20
>
> sum_of_second_window_method_2=15+6-1=20
>
>
>
> On Wed, Feb 19, 2014 at 3:45 PM, Adrian Mocanu <am...@verticalscope.com>
> wrote:
>
>  I have a question on the following paper
>
> "Discretized Streams: Fault-Tolerant Streaming Computation at Scale"
>
> written by
>
> Matei Zaharia, Tathagata Das, Haoyuan Li, Timothy Hunter, Scott Shenker,
> Ion Stoica
>
> and available at
>
> http://www.cs.berkeley.edu/~matei/papers/2013/sosp_spark_streaming.pdf
>
>
>
> Specifically I'm interested in Section 3.2 on page 5 called "Timing
> Considerations".
>
> This section talks about external timestamp. For me I'm looking to use
> method 2 and correct for late records at the
>
> application level.
>
> The paper says "[application] could output a new count for time interval
> [t, t+1) at time t+5, based on the records for this interval received
> between t and t+5. This computation can be performed with an efficient
> incremental reduce operation that adds the old counts computed at t+1 to
> the counts of new records since then, avoiding wasted work."
>
>
>
> Q1:
>
> If my data comes 24 hours late on the same DStream (ie: t=t0+24hr) and I'm
> recording per minute aggregates, wouldn't the RDD with data which came 24
> hours ago be already deleted from disk by Spark? (I'd hope so otherwise it
> runs out of space)
>
>
>
> Q2:
>
> The paper talks about "incremental reduce". I'd like to know what it is. I
> do use reduce so I could get an aggregate of counts. What is this
> incremental reduce?
>
>
>
> Thanks
>
> -A
>
>
>
>
>
> --
>
> Dachuan Huang
> Cellphone: 614-390-7234
>
> 2015 Neil Avenue
>
> Ohio State University
> Columbus, Ohio
> U.S.A.
> 43210
>

RE: Q: Discretized Streams: Fault-Tolerant Streaming Computation paper

Posted by Adrian Mocanu <am...@verticalscope.com>.
That makes sense.
I wonder if any of the authors of that paper could comment.

From: dachuan [mailto:hdc1112@gmail.com]
Sent: February-19-14 3:55 PM
To: user@spark.incubator.apache.org
Subject: Re: Q: Discretized Streams: Fault-Tolerant Streaming Computation paper

It seems StreamingContext has a function:

  def remember(duration: Duration) {
    graph.remember(duration)
  }

and in my opinion, incremental reduce means:

1 2 3 4 5 6
window_size =5
sum_of_first_window = 1+2+3+4+5=15
sum_of_second_window_method_1=2+3+4+5+6=20
sum_of_second_window_method_2=15+6-1=20

On Wed, Feb 19, 2014 at 3:45 PM, Adrian Mocanu <am...@verticalscope.com>> wrote:
I have a question on the following paper
"Discretized Streams: Fault-Tolerant Streaming Computation at Scale"
written by
Matei Zaharia, Tathagata Das, Haoyuan Li, Timothy Hunter, Scott Shenker, Ion Stoica
and available at
http://www.cs.berkeley.edu/~matei/papers/2013/sosp_spark_streaming.pdf

Specifically I'm interested in Section 3.2 on page 5 called "Timing Considerations".
This section talks about external timestamp. For me I'm looking to use method 2 and correct for late records at the
application level.
The paper says "[application] could output a new count for time interval [t, t+1) at time t+5, based on the records for this interval received between t and t+5. This computation can be performed with an efficient incremental reduce operation that adds the old counts computed at t+1 to the counts of new records since then, avoiding wasted work."

Q1:
If my data comes 24 hours late on the same DStream (ie: t=t0+24hr) and I'm recording per minute aggregates, wouldn't the RDD with data which came 24 hours ago be already deleted from disk by Spark? (I'd hope so otherwise it runs out of space)

Q2:
The paper talks about "incremental reduce". I'd like to know what it is. I do use reduce so I could get an aggregate of counts. What is this incremental reduce?

Thanks
-A



--
Dachuan Huang
Cellphone: 614-390-7234
2015 Neil Avenue
Ohio State University
Columbus, Ohio
U.S.A.
43210

Re: Q: Discretized Streams: Fault-Tolerant Streaming Computation paper

Posted by dachuan <hd...@gmail.com>.
It seems StreamingContext has a function:

  def remember(duration: Duration) {
    graph.remember(duration)
  }

and in my opinion, incremental reduce means:

1 2 3 4 5 6
window_size =5
sum_of_first_window = 1+2+3+4+5=15
sum_of_second_window_method_1=2+3+4+5+6=20
sum_of_second_window_method_2=15+6-1=20


On Wed, Feb 19, 2014 at 3:45 PM, Adrian Mocanu <am...@verticalscope.com>wrote:

>  I have a question on the following paper
>
> "Discretized Streams: Fault-Tolerant Streaming Computation at Scale"
>
> written by
>
> Matei Zaharia, Tathagata Das, Haoyuan Li, Timothy Hunter, Scott Shenker,
> Ion Stoica
>
> and available at
>
> http://www.cs.berkeley.edu/~matei/papers/2013/sosp_spark_streaming.pdf
>
>
>
> Specifically I'm interested in Section 3.2 on page 5 called "Timing
> Considerations".
>
> This section talks about external timestamp. For me I'm looking to use
> method 2 and correct for late records at the
>
> application level.
>
> The paper says "[application] could output a new count for time interval
> [t, t+1) at time t+5, based on the records for this interval received
> between t and t+5. This computation can be performed with an efficient
> incremental reduce operation that adds the old counts computed at t+1 to
> the counts of new records since then, avoiding wasted work."
>
>
>
> Q1:
>
> If my data comes 24 hours late on the same DStream (ie: t=t0+24hr) and I'm
> recording per minute aggregates, wouldn't the RDD with data which came 24
> hours ago be already deleted from disk by Spark? (I'd hope so otherwise it
> runs out of space)
>
>
>
> Q2:
>
> The paper talks about "incremental reduce". I'd like to know what it is. I
> do use reduce so I could get an aggregate of counts. What is this
> incremental reduce?
>
>
>
> Thanks
>
> -A
>



-- 
Dachuan Huang
Cellphone: 614-390-7234
2015 Neil Avenue
Ohio State University
Columbus, Ohio
U.S.A.
43210