Posted to user@spark.apache.org by Dominik Safaric <do...@gmail.com> on 2017/03/16 19:34:49 UTC

Streaming 2.1.0 - window vs. batch duration

Hi all,

I’ve implemented a streaming application that pulls data from Kafka every second (the batch interval), and I am observing some quite strange behaviour. (I haven’t used Spark extensively in the past; I have mostly worked with continuous-operator-based engines instead.)

Namely, the windowed stream produced by dstream.window(Seconds(60)) contains more messages than were consumed when it is written back to Kafka. (For debugging purposes I am using a small dataset of one million messages deserialized from Kafka byte arrays.) In particular, I streamed exactly 1 million messages in total, whereas upon window expiry 60 million messages are written back to Kafka.

I’ve read in the official docs that both the window duration and the slide duration must be multiples of the batch interval. Does this mean that, as the window slides every batch interval, the RDD of a given batch interval t is ingested 59 more times into the windowed stream?

If I would like to achieve this behaviour (a batch interval of one second and a window duration of 60 seconds), how might I do so?

I would appreciate it if anyone could correct me if I got the internals of Spark’s windowed operations wrong, and elaborate a bit.

Thanks,
Dominik

Re: Streaming 2.1.0 - window vs. batch duration

Posted by Michael Armbrust <mi...@databricks.com>.
Have you considered trying event-time aggregation in Structured Streaming
instead?
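
For readers unfamiliar with the suggestion, here is a minimal sketch of an event-time window aggregation in Structured Streaming. It is only an illustration, not code from this thread: the broker address `localhost:9092`, the topic name `input-topic`, and the use of the Kafka record `timestamp` column as event time are all assumptions, and it needs the `spark-sql-kafka-0-10` artifact on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

object EventTimeWindowSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("event-time-window-sketch")
      .getOrCreate()

    // Assumed broker address and topic name; replace with your own.
    val messages = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "input-topic")
      .load()

    // Tumbling 60-second windows keyed on the Kafka record timestamp
    // (assumed here to stand in for event time). Each record falls into
    // exactly one window, so nothing is counted twice.
    val countsPerWindow = messages
      .withWatermark("timestamp", "1 minute")
      .groupBy(window(col("timestamp"), "60 seconds"))
      .count()

    // Append mode emits a window's count once the watermark passes its end.
    val query = countsPerWindow.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

Because the windows here do not overlap, the per-window counts should sum to the number of records consumed, which is the property the original question was after.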


Re: Streaming 2.1.0 - window vs. batch duration

Posted by Dominik Safaric <do...@gmail.com>.
Correct - that is the part that I understood nicely. 

However, what alternative transformation might I apply to iterate through the RDDs, given a window duration of 60 seconds that I cannot change?

> On 17 Mar 2017, at 16:57, Cody Koeninger <co...@koeninger.org> wrote:
> 
> Probably easier if you show some more code, but if you just call
> dstream.window(Seconds(60))
> you didn't specify a slide duration, so it's going to default to your
> batch duration of 1 second.
> So yeah, if you're just using e.g. foreachRDD to output every message
> in the window, every second it's going to output the last 60 seconds
> of messages... which does mean each message will be output a total of
> 60 times.
> 


---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Streaming 2.1.0 - window vs. batch duration

Posted by Dominik Safaric <do...@gmail.com>.
If I were to set the window duration to 60 seconds, with a batch interval of one second and a slide duration of 59 seconds, I would get the desired behaviour.

However, would the Receiver pull messages from Kafka only at the 59th-second slide interval, or would it constantly pull messages throughout the entire 60-second window duration?

Thanks,
Dominik
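
The slide arithmetic can be checked without a cluster. The following is a toy model in plain Scala, not Spark code: batches are numbered by the second in which they arrive, and a window is assumed to be evaluated every `slide` seconds and to cover the preceding `window` seconds. The object and method names are made up for illustration.

```scala
object SlideComparison {
  /** For batches numbered 1..batches (one per second), returns how many
    * windows each batch lands in. Batches that fall after the last
    * evaluated window are absent from the result.
    */
  def outputsPerBatch(batches: Int, window: Int, slide: Int): Map[Int, Int] = {
    // Windows are evaluated at every multiple of the slide duration.
    val windowEnds = slide to batches by slide
    // Each window covers the `window` seconds up to and including its end.
    val hits = for {
      end <- windowEnds
      t   <- math.max(1, end - window + 1) to end
    } yield t
    hits.groupBy(identity).map { case (t, occurrences) => t -> occurrences.size }
  }

  def main(args: Array[String]): Unit = {
    for (slide <- Seq(1, 59, 60)) {
      val counts = outputsPerBatch(batches = 600, window = 60, slide = slide)
      println(s"slide=$slide: batch 59 -> ${counts(59)}, batch 60 -> ${counts(60)}")
    }
  }
}
```

Under these assumptions a 59-second slide still leaves a one-second overlap between consecutive 60-second windows, so one batch in every 59 is output twice; a slide equal to the window duration outputs each batch once. In the DStream API that would correspond to the two-argument overload, `dstream.window(Seconds(60), Seconds(60))`.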





Re: Streaming 2.1.0 - window vs. batch duration

Posted by Cody Koeninger <co...@koeninger.org>.
Probably easier if you show some more code, but if you just call
dstream.window(Seconds(60))
you didn't specify a slide duration, so it's going to default to your
batch duration of 1 second.
So yeah, if you're just using e.g. foreachRDD to output every message
in the window, every second it's going to output the last 60 seconds
of messages... which does mean each message will be output a total of
60 times.

Using a smaller window of 5 seconds as an example, with 1 message per
second, notice that message 1489765610 will be output a total of 5
times:

Window:
1489765605
1489765606
1489765607
1489765608
1489765609
Window:
1489765606
1489765607
1489765608
1489765609
1489765610
Window:
1489765607
1489765608
1489765609
1489765610
1489765611
Window:
1489765608
1489765609
1489765610
1489765611
1489765612
Window:
1489765609
1489765610
1489765611
1489765612
1489765613
Window:
1489765610
1489765611
1489765612
1489765613
1489765614
Window:
1489765611
1489765612
1489765613
1489765614
1489765615
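
The listing above can be reproduced with a small plain-Scala simulation (no Spark involved). It assumes one message per second, identified by its timestamp, and uses the standard library's `sliding` to stand in for a 5-second window with a 1-second slide; the object and value names are illustrative only.

```scala
object WindowListing {
  // One message per second, as in the example above.
  val messages: Vector[Long] = (1489765605L to 1489765615L).toVector

  // sliding(size, step) yields only full windows: 5 messages wide, advancing by 1.
  val windows: Vector[Vector[Long]] = messages.sliding(5, 1).toVector

  // How many windows contain a given message.
  def timesOutput(message: Long): Int = windows.count(_.contains(message))

  def main(args: Array[String]): Unit = {
    windows.foreach { w =>
      println("Window:")
      w.foreach(println)
    }
    println(s"1489765610 is output ${timesOutput(1489765610L)} times")
  }
}
```

Scaling the same reasoning up to a 60-second window with a 1-second slide gives 60 outputs per message, which matches the 60 million writes observed for 1 million consumed messages.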

