Posted to user@flink.apache.org by "NEKRASSOV, ALEXEI" <an...@att.com> on 2018/03/27 14:22:19 UTC

timeWindow emits records before window ends?

Hello,

With the time characteristic set to IngestionTime, I expected "timeWindow(Time.minutes(3))" NOT to produce any records in the first 3 minutes of running the job, yet it emits records before 3 minutes have elapsed.
Am I doing something wrong, or is my understanding of timeWindow incorrect?

For example, in Flink UI I see:

TriggerWindow(TumblingEventTimeWindows(180000), AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@7c810ef9, aggFunction=nextgen.McdrAggregator@7d7758be}, EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:752)) -> Map

Its "duration" is 42s and its "records sent" count is 689516.

I expected no records to be sent out until 180000 ms had elapsed.

Thanks,
Alex Nekrassov
nekrassov@att.com


Re: timeWindow emits records before window ends?

Posted by Nico Kruber <ni...@data-artisans.com>.
Hi Alex,
If you don't set an offset for the window, it is aligned with the epoch
[1]. For example, assume you started your program at 00:02:18: the
window then starts at 00:00:00 and ends at 00:02:59.999, so it
emits records 42 seconds after you started your program.
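To make the arithmetic concrete, here is a small standalone Java sketch (not Flink code itself). As far as I know, the helper mirrors the formula Flink uses in TimeWindow.getWindowStartWithOffset; the class name and the 00:02:18 start time are hypothetical, and timestamps are counted in milliseconds from 00:00:00 for simplicity.

```java
// Sketch: where does an epoch-aligned 3-minute tumbling window start and end?
// Assumption: timestamps are milliseconds counted from 00:00:00; the job start
// time 00:02:18 is a hypothetical value taken from the example above.
public class EpochAlignedWindow {

    // Hypothetical helper mirroring (as far as I know) Flink's
    // TimeWindow.getWindowStartWithOffset: round the timestamp down to a
    // multiple of the window size, shifted by the offset (0 = epoch-aligned).
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    public static void main(String[] args) {
        long size = 3 * 60 * 1000L;            // Time.minutes(3) = 180000 ms
        long jobStart = (2 * 60 + 18) * 1000L; // hypothetical start: 00:02:18

        long start = windowStart(jobStart, 0L, size);
        long end = start + size;               // exclusive; last included ms is end - 1

        System.out.println("window = [" + start + ", " + end + ") ms");
        System.out.println("first firing after " + (end - jobStart) / 1000 + " s");
    }
}
```

Running it should print the window [0, 180000) ms and a first firing after 42 s. In a real job the firing also waits for the watermark to pass the window end, so it can come slightly later than that.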
If you need the 3-minute windows to be aligned to a different point in time,
please use TumblingEventTimeWindows#of(Time size, Time offset).
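A similar hypothetical sketch shows what the offset does: choosing an offset equal to the start time modulo the window size (here 138000 ms) moves the boundaries so that the first window begins at 00:02:18. In an actual job this would correspond to something like TumblingEventTimeWindows.of(Time.minutes(3), Time.milliseconds(138000)); the helper below is only an illustration, not part of Flink's API.

```java
// Sketch: how a window offset shifts tumbling-window boundaries.
// Hypothetical values: 3-minute windows, job started at 00:02:18
// (timestamps in ms counted from 00:00:00 for simplicity).
// Roughly equivalent Flink assigner (assumption):
//   TumblingEventTimeWindows.of(Time.minutes(3), Time.milliseconds(offset))
public class OffsetWindow {

    // Hypothetical helper: window start for a timestamp, given an offset,
    // using the same rounding as the epoch-aligned case.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    public static void main(String[] args) {
        long size = 180_000L;          // 3 minutes
        long jobStart = 138_000L;      // 00:02:18
        long offset = jobStart % size; // 138000 ms, always smaller than size

        long start = windowStart(jobStart, offset, size);
        long end = start + size;       // exclusive end

        System.out.println("window = [" + start + ", " + end + ") ms");
        System.out.println("first firing after " + (end - jobStart) / 1000 + " s");
    }
}
```

With this offset the first window is [138000, 318000) ms and fires after the full 180 s. Note that the offset is a fixed value chosen when the job is built, so windows stay aligned to epoch + offset rather than to whenever the job happens to start.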


Nico


[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#tumbling-windows

-- 
Nico Kruber | Software Engineer
data Artisans
