You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Yu Yang <yu...@gmail.com> on 2020/06/09 21:58:30 UTC

Tumbling window with timestamp out-of-range events

Hi,


We implement a flink application that uses TumblingWindow, and uses even
time as time characteristics. In the TumblingWindow's process function, we
has the implementation below that checks whether the event's timestamp is
in the tumbling window's timestamp range.  We expected that all events
shall be in the range. However, the application reports events with
out-of-range timestamps.  Any insights on how this happens?


@Override
public void process(EventStreamPartitionKey key,
                  Context context, Iterable<Event> elements,
Collector<EventResult> out) {

for(Event event : elements) {
    if ( event.getTimestamp() >= context.window().getEnd() ||
   event.getTimestamp() < context.window().getStart() )

    System.out.println("NOT in RANGE: " + context.window().getStart()

        + ", " + event.getTimestamp() + ", " + context.window().getEnd());
...

}
out.collect(res);
}


Thanks!


Regards,

-Yu

Re: Tumbling window with timestamp out-of-range events

Posted by Yu Yang <yu...@gmail.com>.
Please ignore this message. The issue was that a different timestamp
extractor was used when the kafka source was setup. That caused the issue.

On Tue, Jun 9, 2020 at 2:58 PM Yu Yang <yu...@gmail.com> wrote:

> Hi,
>
>
> We implement a flink application that uses TumblingWindow, and uses even
> time as time characteristics. In the TumblingWindow's process function, we
> has the implementation below that checks whether the event's timestamp is
> in the tumbling window's timestamp range.  We expected that all events
> shall be in the range. However, the application reports events with
> out-of-range timestamps.  Any insights on how this happens?
>
>
> @Override
> public void process(EventStreamPartitionKey key,
>                   Context context, Iterable<Event> elements,
> Collector<EventResult> out) {
>
> for(Event event : elements) {
>     if ( event.getTimestamp() >= context.window().getEnd() ||
>    event.getTimestamp() < context.window().getStart() )
>
>     System.out.println("NOT in RANGE: " + context.window().getStart()
>
>         + ", " + event.getTimestamp() + ", " + context.window().getEnd());
> ...
>
> }
> out.collect(res);
> }
>
>
> Thanks!
>
>
> Regards,
>
> -Yu
>