Posted to user@spark.apache.org by Alan Ngai <al...@opsclarity.com> on 2014/07/23 03:01:59 UTC

streaming window not behaving as advertised (v1.0.1)

I have a sample application emitting one record per second.  The batch interval is set to 5 seconds.  Here’s a list of “observed window intervals” versus what was actually set (all values in seconds):

window=25, slide=25 : observed-window=25, overlapped-batches=0
window=25, slide=20 : observed-window=20, overlapped-batches=0
window=25, slide=15 : observed-window=15, overlapped-batches=0
window=25, slide=10 : observed-window=20, overlapped-batches=2
window=25, slide=5 : observed-window=25, overlapped-batches=3

Can someone explain this behavior to me?  I’m trying to aggregate metrics by time batches, but want to skip partial batches.  Therefore, I’m trying to find a combination that results in 1 overlapped batch, but no combination I tried gets me there.

Alan
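
For comparison, here is a rough sketch of what one would expect from the settings above, assuming each window covers exactly window/batch batches and consecutive windows start `slide` seconds apart. `WindowMath` and its methods are hypothetical helpers written for illustration; they are not part of Spark.

```scala
// Illustrative helper, not a Spark API. All arguments are in seconds.
object WindowMath {
  /** Number of batches covered by a single window. */
  def batchesPerWindow(window: Int, batch: Int): Int = {
    require(batch > 0 && window % batch == 0,
      "window must be a positive multiple of the batch interval")
    window / batch
  }

  /** Number of batches shared by two consecutive windows. */
  def overlappedBatches(window: Int, slide: Int, batch: Int): Int = {
    require(batch > 0 && window % batch == 0 && slide % batch == 0,
      "window and slide must be multiples of the batch interval")
    // Consecutive windows share (window - slide) seconds, never less than zero.
    math.max(0, (window - slide) / batch)
  }

  def main(args: Array[String]): Unit = {
    val batch = 5
    val window = 25
    // Same slide values as in the list above.
    for (slide <- Seq(25, 20, 15, 10, 5)) {
      println("window=%d, slide=%d : expected-overlapped-batches=%d"
        .format(window, slide, overlappedBatches(window, slide, batch)))
    }
  }
}
```

Under these assumptions, window=25 with slide=20 is the combination that should give exactly 1 overlapped batch, which is not what was observed.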


Re: streaming window not behaving as advertised (v1.0.1)

Posted by Tathagata Das <ta...@gmail.com>.
1. updateStateByKey should be called on all keys even if there is no data
corresponding to that key. There is a unit test for that.
https://github.com/apache/spark/blob/master/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala#L337

2. I am increasing the priority for this. Off the top of my head, this is
easy to fix, but hard to test reliably in a unit test. Will fix it
soon after the Spark 1.1 release.

TD
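
As an illustration of point 1, here is a minimal sketch of an update function with the shape that updateStateByKey expects, written so that the state still advances when a key has no new data in a batch. `DurationState`, `batchSeconds`, and the choice of a Long "seconds tracked" state are assumptions made for this example, not code from the thread.

```scala
// Hypothetical example; only the function shape
// (Seq[V], Option[S]) => Option[S] comes from the updateStateByKey API.
object DurationState {
  // Assumed batch interval in seconds (1-second batches).
  val batchSeconds = 1L

  /**
   * newEvents: values received for this key in the current batch
   *            (empty when the key got no data).
   * state:     seconds the key has been tracked so far, if any.
   */
  def updateDuration(newEvents: Seq[String], state: Option[Long]): Option[Long] = {
    // Advance the duration on every call, with or without new events.
    Some(state.getOrElse(0L) + batchSeconds)
  }
}
```

With a keyed stream, say a hypothetical `pairs: DStream[(String, String)]`, this would be wired in as `pairs.updateStateByKey(DurationState.updateDuration _)`.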


On Fri, Aug 1, 2014 at 7:37 AM, RodrigoB <ro...@aspect.com> wrote:

> Hi TD,
>
> I've also been fighting this issue, only to find the exact same solution you
> are suggesting.
> Too bad I didn't find either the post or the issue sooner.
>
> I'm using a 1-second batch with N Kafka events (1 to 1 with the
> state objects) per batch and only calling the updateStateByKey function.
>
> This is my interpretation; please correct me if needed:
> Because of Spark’s lazy computation, the RDDs weren’t being updated as
> expected on each batch interval. The assumption was that as long as I have
> a streaming batch run (with or without new messages), I should get updated
> RDDs, which was not happening. We only get updateStateByKey calls for
> objects which got events or that are forced to compute through an output
> function. I did not run further tests to confirm this, but that's my
> impression.
>
> This doesn't fit our requirements, as we want to do duration updates based
> on the batch interval execution... so I had to force the computation of all
> the objects through the foreachRDD function.
>
> I would also appreciate it if the priority of the issue could be increased.
> I assume the foreachRDD is additional, unnecessary resource allocation
> (although I'm not sure how much) as opposed to doing it somehow by default
> on batch interval execution.
>
> Thanks,
> Rod
>

Re: streaming window not behaving as advertised (v1.0.1)

Posted by RodrigoB <ro...@aspect.com>.
Hi TD,

I've also been fighting this issue, only to find the exact same solution you
are suggesting.
Too bad I didn't find either the post or the issue sooner.

I'm using a 1-second batch with N Kafka events (1 to 1 with the
state objects) per batch and only calling the updateStateByKey function.

This is my interpretation; please correct me if needed:
Because of Spark’s lazy computation, the RDDs weren’t being updated as
expected on each batch interval. The assumption was that as long as I have
a streaming batch run (with or without new messages), I should get updated
RDDs, which was not happening. We only get updateStateByKey calls for
objects which got events or that are forced to compute through an output
function. I did not run further tests to confirm this, but that's my
impression.

This doesn't fit our requirements, as we want to do duration updates based
on the batch interval execution... so I had to force the computation of all
the objects through the foreachRDD function.

I would also appreciate it if the priority of the issue could be increased.
I assume the foreachRDD is additional, unnecessary resource allocation
(although I'm not sure how much) as opposed to doing it somehow by default
on batch interval execution.

Thanks,
Rod



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/streaming-window-not-behaving-as-advertised-v1-0-1-tp10453p11168.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: streaming window not behaving as advertised (v1.0.1)

Posted by Venkat Subramanian <vs...@gmail.com>.
TD,

We are seeing the same issue. We struggled with this until we found this
post and the workaround.
A quick fix in Spark Streaming would help a lot of others who are
encountering this and pulling their hair out over why RDDs on some
partitions are not computed (we ended up spending weeks trying to figure out
what was happening and trying out different things).

This issue has been around from 0.9 to date (1.0.1), at least.

Thanks,

Venkat



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/streaming-window-not-behaving-as-advertised-v1-0-1-tp10453p11163.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: streaming window not behaving as advertised (v1.0.1)

Posted by Tathagata Das <ta...@gmail.com>.
Yeah, maybe I should bump the issue to major. Now that I think about it
after giving my previous answer, this should be easy to fix just by doing a
foreachRDD on all the input streams within the system (rather than
explicitly doing it like I asked you to do).

Thanks, Alan, for testing this out and confirming that this was the
same issue. I was worried that this was a totally new issue that we did
not know of.

TD

On Wed, Jul 23, 2014 at 12:37 AM, Alan Ngai <al...@opsclarity.com> wrote:
> TD, it looks like your instincts were correct.  I misunderstood what you
> meant.  If I force an eval on the input stream using foreachRDD, the
> windowing works correctly.  If I don’t do that, lazy eval somehow screws
> with the window batches I eventually receive.  Any reason the bug is
> categorized as minor?  It seems that anyone who uses the windowing
> functionality would run into this bug.  I imagine this would include anyone
> who wants to use Spark Streaming to aggregate data in fixed time batches,
> which seems like a fairly common use case.
>
> Alan

Re: streaming window not behaving as advertised (v1.0.1)

Posted by Alan Ngai <al...@opsclarity.com>.
TD, it looks like your instincts were correct.  I misunderstood what you meant.  If I force an eval on the input stream using foreachRDD, the windowing works correctly.  If I don’t do that, lazy eval somehow screws with the window batches I eventually receive.  Any reason the bug is categorized as minor?  It seems that anyone who uses the windowing functionality would run into this bug.  I imagine this would include anyone who wants to use Spark Streaming to aggregate data in fixed time batches, which seems like a fairly common use case.

Alan


On Jul 22, 2014, at 11:30 PM, Alan Ngai <al...@opsclarity.com> wrote:

> foreachRDD is how I extracted values in the first place, so that’s not going to make a difference.  I don’t think it’s related to SPARK-1312 because I’m generating data every second in the first place and I’m using foreachRDD right after the window operation.  The code looks something like
> 
> val batchInterval = 5
> val windowInterval = 25
> val slideInterval = 15
> 
> val windowedStream = inputStream.window(Seconds(windowInterval), Seconds(slideInterval))
> 
> val outputFunc = (r: RDD[MetricEvent], t: Time) => {
>   println("======================================== %s".format(t.milliseconds / 1000))
>   r.foreach{metric =>
>     val timeKey = metric.timeStamp / batchInterval * batchInterval
>     println("%s %s %s %s".format(timeKey, metric.timeStamp, metric.name, metric.value))
>   }
> }
> windowedStream.foreachRDD(outputFunc)


Re: streaming window not behaving as advertised (v1.0.1)

Posted by Alan Ngai <al...@opsclarity.com>.
foreachRDD is how I extracted values in the first place, so that’s not going to make a difference.  I don’t think it’s related to SPARK-1312 because I’m generating data every second in the first place and I’m using foreachRDD right after the window operation.  The code looks something like this:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, Time}

// inputStream is a DStream[MetricEvent] created elsewhere (not shown)
val batchInterval = 5
val windowInterval = 25
val slideInterval = 15

val windowedStream = inputStream.window(Seconds(windowInterval), Seconds(slideInterval))

// Print a header with the batch time in seconds, then one line per metric
val outputFunc = (r: RDD[MetricEvent], t: Time) => {
  println("======================================== %s".format(t.milliseconds / 1000))
  r.foreach{metric =>
    // Round the timestamp down to a multiple of batchInterval (assuming an integral timestamp)
    val timeKey = metric.timeStamp / batchInterval * batchInterval
    println("%s %s %s %s".format(timeKey, metric.timeStamp, metric.name, metric.value))
  }
}
windowedStream.foreachRDD(outputFunc)

On Jul 22, 2014, at 10:13 PM, Tathagata Das <ta...@gmail.com> wrote:

> It could be related to this bug that is currently open. 
> https://issues.apache.org/jira/browse/SPARK-1312
> 
> Here is a workaround. Can you add an inputStream.foreachRDD(rdd => { }) call and try these combos again?
> 
> TD


Re: streaming window not behaving as advertised (v1.0.1)

Posted by Tathagata Das <ta...@gmail.com>.
It could be related to this bug that is currently open.
https://issues.apache.org/jira/browse/SPARK-1312

Here is a workaround. Can you add an inputStream.foreachRDD(rdd => { }) call
and try these combos again?

TD
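
A minimal sketch of how this workaround might look in a complete job, for readers who want to try it. The socket source, host, port, app name, and the window/slide values are assumptions chosen for illustration; only the no-op `inputStream.foreachRDD(rdd => { })` call comes from the message above.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object WindowWorkaround {
  def main(args: Array[String]): Unit = {
    // Assumed local setup: 2 threads, one for the receiver and one for processing.
    val conf = new SparkConf().setAppName("WindowWorkaround").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5)) // 5-second batch interval

    // Hypothetical source; any input DStream would do.
    val inputStream = ssc.socketTextStream("localhost", 9999)

    // Workaround: register a no-op output operation directly on the
    // input stream so that every batch of the input is evaluated.
    inputStream.foreachRDD(rdd => { })

    // Windowed view of the same stream (window=25s, slide=20s).
    val windowedStream = inputStream.window(Seconds(25), Seconds(20))
    windowedStream.foreachRDD { (rdd: RDD[String], time: Time) =>
      println("window ending at %d s: %d records"
        .format(time.milliseconds / 1000, rdd.count()))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The no-op call does no work of its own; it is there only so the input stream has an output operation attached to it.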


On Tue, Jul 22, 2014 at 6:01 PM, Alan Ngai <al...@opsclarity.com> wrote:

> I have a sample application emitting one record per second.  The batch
> interval is set to 5 seconds.  Here’s a list of “observed window intervals”
> versus what was actually set (all values in seconds):
>
> window=25, slide=25 : observed-window=25, overlapped-batches=0
> window=25, slide=20 : observed-window=20, overlapped-batches=0
> window=25, slide=15 : observed-window=15, overlapped-batches=0
> window=25, slide=10 : observed-window=20, overlapped-batches=2
> window=25, slide=5 : observed-window=25, overlapped-batches=3
>
> Can someone explain this behavior to me?  I’m trying to aggregate metrics
> by time batches, but want to skip partial batches.  Therefore, I’m trying
> to find a combination that results in 1 overlapped batch, but no
> combination I tried gets me there.
>
> Alan
>
>