Posted to user@spark.apache.org by Adrian Mocanu <am...@verticalscope.com> on 2014/03/24 21:12:41 UTC

[bug?] streaming window unexpected behaviour

I have what I would call unexpected behaviour when using window on a stream.
I have 2 windowed streams with a 5s batch interval: one windowed (5s,5s)=smallWindow and the other (10s,5s)=bigWindow.
What I've noticed is that the 1st RDD produced by bigWindow is incorrect: it covers only 5s of data, not 10s. So instead of waiting 10s and producing one RDD spanning 10s, Spark produced the first "10s" RDD with only 5s of data.
Why is this happening? To me it looks like a bug; Matei or TD, can you verify whether this is the intended behaviour?


I have the following code
val ssc = new StreamingContext(conf, Seconds(5))

val smallWindowStream = ssc.queueStream(smallWindowRddQueue)
val bigWindowStream = ssc.queueStream(bigWindowRddQueue)

// (the "reshaped" streams below are keyed versions of the queue streams;
// that map step is omitted from this snippet)
val smallWindow = smallWindowReshapedStream.window(Seconds(5), Seconds(5))
  .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
val bigWindow = bigWindowReshapedStream.window(Seconds(10), Seconds(5))
  .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))

-Adrian
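To make the timing concrete, here is a minimal timeline sketch (plain Python, no Spark needed; it assumes data starts at t=0 and that a window ending at time t covers the interval from max(0, t - window) to t, which matches the behaviour described above):

```python
# Which time span does each emitted RDD of a (window=10s, slide=5s)
# stream cover, given a 5s batch interval and data starting at t=0?
window, slide = 10, 5

emitted = []
for end in range(slide, 31, slide):      # window end times: 5s, 10s, ..., 30s
    start = max(0, end - window)         # no data exists before t=0
    emitted.append((start, end))

print(emitted[0])   # (0, 5)  -> the first RDD holds only 5s of data
print(emitted[1])   # (0, 10) -> the first full 10s window
```

Only the first RDD is partial here; every window ending at t >= 10s spans the full 10s.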


Re: [bug?] streaming window unexpected behaviour

Posted by Tathagata Das <ta...@gmail.com>.
You can probably do it in a simpler but sort of hacky way!

If your window size is W and sliding interval S, you can do some math to
figure out how many of the first windows are actually partial windows. It's
probably math.ceil(W/S) - 1. So in a windowDStream.foreachRDD() you can
increment a global counter to count how many RDDs have been generated and
ignore the first few RDDs.

windowDStream.foreachRDD { rdd =>
  Global.counter += 1
  // use floating-point division so e.g. W = 10, S = 4 gives ceil(2.5) = 3
  // (avoid a bare `return` here: inside a Scala closure it is a non-local return)
  if (Global.counter < math.ceil(W.toDouble / S).toInt) {
    // still inside the leading partial windows: ignore this RDD
  } else {
    // do something awesome
  }
}
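The skip condition `counter < math.ceil(W/S)` drops exactly math.ceil(W/S) - 1 leading RDDs. A quick sanity check of that count (plain Python, independent of Spark; same assumed semantics as in the thread: data starts at t=0 and a window ending at t covers (t - W, t]):

```python
import math

def partial_window_count(window, slide):
    """Number of leading windows whose start falls before the stream began,
    assuming windows end at t = slide, 2*slide, ... and data starts at t=0."""
    count = 0
    end = slide
    while end < window:          # window start (end - window) is before t=0
        count += 1
        end += slide
    return count

# (10s, 5s) from the thread: one partial window, matching ceil(W/S) - 1
print(partial_window_count(10, 5))   # 1
for w, s in [(10, 5), (5, 5), (10, 4), (30, 10)]:
    assert partial_window_count(w, s) == math.ceil(w / s) - 1
```

Note the (10s, 4s) case: integer division would give ceil(10 // 4) = 2, but the true count is ceil(2.5) - 1 = 2 partial windows, which is why the floating-point division matters in the Scala snippet.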


On Tue, Mar 25, 2014 at 7:29 AM, Adrian Mocanu <am...@verticalscope.com> wrote:

> [earlier messages in the thread quoted in full; trimmed]

RE: [bug?] streaming window unexpected behaviour

Posted by Adrian Mocanu <am...@verticalscope.com>.
Let me rephrase that,
Do you think it is possible to use an accumulator to skip the first few incomplete RDDs?

[earlier messages in the thread quoted in full; trimmed]
RE: [bug?] streaming window unexpected behaviour

Posted by Adrian Mocanu <am...@verticalscope.com>.
Thanks TD!
Is it possible to perhaps add another window method that doesn't generate partial windows? Or is it possible to remove the first few partial windows? I'm thinking of using an accumulator to count how many windows there are.

-A

[earlier messages in the thread quoted in full; trimmed]

Re: [bug?] streaming window unexpected behaviour

Posted by Tathagata Das <ta...@gmail.com>.
Yes, I believe that is current behavior. Essentially, the first few
RDDs will be partial windows (assuming window duration > sliding
interval).

TD


On Mon, Mar 24, 2014 at 1:12 PM, Adrian Mocanu <am...@verticalscope.com> wrote:

> [original message quoted in full; trimmed]