Posted to user@spark.apache.org by Hatch M <ha...@gmail.com> on 2014/06/18 02:19:54 UTC

Issue while trying to aggregate with a sliding window

Trying to aggregate over a sliding window. Playing around with the slide
interval, I can see the aggregation sometimes works but mostly fails with the
error below. The stream has records coming in every 100 ms.

JavaPairDStream<String, AggregateObject> aggregatedDStream =
    pairsDStream.reduceByKeyAndWindow(aggFunc,
        new Duration(60000),    // windowDuration
        new Duration(600000));  // slideDuration

14/06/18 00:14:46 INFO dstream.ShuffledDStream: Time 1403050486900 ms is
invalid as zeroTime is 1403050485800 ms and slideDuration is 60000 ms and
difference is 1100 ms
14/06/18 00:14:46 ERROR actor.OneForOneStrategy: key not found:
1403050486900 ms
java.util.NoSuchElementException: key not found: 1403050486900 ms
at scala.collection.MapLike$class.default(MapLike.scala:228)

Any hints on what's going on here?
Thanks!
Hatch

Re: Issue while trying to aggregate with a sliding window

Posted by Tathagata Das <ta...@gmail.com>.
zeroTime marks the time when the streaming job started, and the first batch
of data is from zeroTime to zeroTime + slideDuration. The validity check that
(time - zeroTime) is a multiple of slideDuration ensures that a given dstream
generates RDDs at the right times. For example, say the batch size is 1
second. For an input DStream, the slideDuration will be 1 second, so it
should generate an RDD of input data every 1 second. However, with window
ops, the slideInterval of a dstream can be, say, 2 seconds (using window(10
seconds, 2 seconds)). In that case, RDDs should be generated every 2 seconds.
This check ensures that.
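
In code, that check is roughly the following (a minimal, hypothetical sketch
in Java using plain millisecond values, not the actual Spark code; the real
check lives in DStream.scala, linked later in this thread):

    // Sketch only: a batch time is valid when it falls after zeroTime and
    // the elapsed time is an exact multiple of the slide duration.
    final class ValidTimeSketch {
        static boolean isValidTime(long timeMs, long zeroTimeMs, long slideMs) {
            long difference = timeMs - zeroTimeMs;
            return difference > 0 && difference % slideMs == 0;
        }

        public static void main(String[] args) {
            // Values from the log in the first message: difference is
            // 1403050486900 - 1403050485800 = 1100 ms, which is not a
            // multiple of the 60000 ms slideDuration, hence "invalid".
            System.out.println(
                isValidTime(1403050486900L, 1403050485800L, 60000L)); // false
        }
    }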

The reduceByKeyAndWindow operation is a sliding window, so the RDDs generated
by the windowed DStream will contain data from (validTime - windowDuration)
to validTime. Now, the way it is implemented is that it unifies (RDD.union)
the RDDs containing data from (validTime - slideDuration to validTime),
(validTime - 2 * slideDuration to validTime - slideDuration), ... up to the
trailing edge of the window (i.e. validTime - windowDuration). Hence, it is
necessary that the windowDuration is a multiple of the slideDuration.
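
If the two Durations in the first message are windowDuration and
slideDuration in that order (as in the JavaPairDStream API), the 60000 ms
window is not a multiple of the 600000 ms slide, which violates exactly this
requirement. A hypothetical corrected call (illustrative values only, and
assuming a batch interval that evenly divides both, e.g. 100 ms or 1 second):

    // Illustrative only: a 60 s window sliding every 10 s. The window
    // (60000 ms) is a multiple of the slide (10000 ms), and both are
    // multiples of the batch interval.
    JavaPairDStream<String, AggregateObject> aggregatedDStream =
        pairsDStream.reduceByKeyAndWindow(aggFunc,
            new Duration(60000),   // windowDuration
            new Duration(10000));  // slideDuration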


TD

Re: Issue while trying to aggregate with a sliding window

Posted by Hatch M <ha...@gmail.com>.
Ok, that patch does fix the key lookup exception. However, I'm curious about
the time validity check, isValidTime (
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L264
).

Why does (time - zeroTime) have to be a multiple of the slide duration?
Shouldn't reduceByKeyAndWindow aggregate every record in a given window
(zeroTime to zeroTime + windowDuration)?


Re: Issue while trying to aggregate with a sliding window

Posted by Hatch M <ha...@gmail.com>.
Thanks! Will try to get the fix and retest.

Re: Issue while trying to aggregate with a sliding window

Posted by onpoq l <on...@gmail.com>.
There is a bug:

https://github.com/apache/spark/pull/961#issuecomment-45125185