Posted to user@flink.apache.org by Sendoh <un...@gmail.com> on 2017/05/24 17:00:29 UTC

Last event in event time window is not output

Hi Flink users,

We have a unit test for event-time window aggregation. When the job
finishes, the window holding the last event is never output: the job ends
before the watermark advances past that window, because no later event
arrives to push it forward.
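To make the mechanism concrete, here is a plain-Java sketch with no Flink dependency. `WindowFiringSim` and its methods are hypothetical names for illustration. It replays the three test timestamps against one-minute tumbling windows, a watermark that trails the highest timestamp seen by 5 s, and the event-time rule that a window fires once the watermark reaches its last millisecond. It assumes a watermark is taken after every element, which is a simplification: periodic assigners are actually polled on a timer.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation, not Flink code: 1-minute tumbling windows,
// watermark = max timestamp seen - 5 s, a window fires when watermark >= its end - 1.
// Simplification: a watermark is taken after every element.
final class WindowFiringSim {
    static final long WINDOW_SIZE_MS = 60_000L;
    static final long MAX_TIME_LAG_MS = 5_000L;

    // Start of the tumbling window containing ts (offset 0, ts >= 0).
    static long windowStart(long ts) {
        return ts - (ts + WINDOW_SIZE_MS) % WINDOW_SIZE_MS;
    }

    // Last millisecond that still belongs to the window.
    static long windowMaxTimestamp(long ts) {
        return windowStart(ts) + WINDOW_SIZE_MS - 1;
    }

    // Counts fired windows; optionally ends with a Long.MAX_VALUE watermark.
    static int firedWindows(List<Long> timestamps, boolean finalMaxWatermark) {
        Set<Long> pending = new HashSet<>(); // max timestamps of open windows
        long currentMaxTimestamp = 0L;       // same default as the assigner's field
        int fired = 0;
        for (long ts : timestamps) {
            pending.add(windowMaxTimestamp(ts));
            currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
            fired += fire(pending, currentMaxTimestamp - MAX_TIME_LAG_MS);
        }
        if (finalMaxWatermark) {
            fired += fire(pending, Long.MAX_VALUE);
        }
        return fired;
    }

    // Removes and counts every window whose last millisecond the watermark has reached.
    private static int fire(Set<Long> pending, long watermark) {
        int before = pending.size();
        pending.removeIf(maxTs -> watermark >= maxTs);
        return before - pending.size();
    }

    static List<Long> testTimestamps() {
        return Arrays.asList(
                Instant.parse("2017-05-20T19:34:17.097Z").toEpochMilli(),
                Instant.parse("2017-05-20T20:34:17.097Z").toEpochMilli(),
                Instant.parse("2017-05-20T20:38:17.097Z").toEpochMilli());
    }

    public static void main(String[] args) {
        List<Long> ts = testTimestamps();
        System.out.println("without final watermark: " + firedWindows(ts, false)); // 2
        System.out.println("with Long.MAX_VALUE at the end: " + firedWindows(ts, true)); // 3
    }
}
```

Two of the three windows fire while data is flowing; the window [20:38, 20:39) fires only if a final Long.MAX_VALUE watermark arrives.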

Has anyone run into a similar issue and found a solution?

The code looks like this:
// Three events, each falling into a different one-minute window.
DataStream<JsonNode> source = env.fromElements(
        TestData.events("2017-05-20T19:34:17.097Z", "997"),
        TestData.events("2017-05-20T20:34:17.097Z", "998"),
        TestData.events("2017-05-20T20:38:17.097Z", "999"));

DataStream<JsonNode> testResult = source
        .assignTimestampsAndWatermarks(new EventWatermark())
        .keyBy(new KeyByID())
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .trigger(PurgingTrigger.of(EventTimeTrigger103.create()))
        .allowedLateness(Time.minutes(Long.MAX_VALUE))
        .fold(null, new AggFoldFunction());

// Drain the job's output; we expect one aggregate per window.
Iterator<JsonNode> javaObj = DataStreamUtils.collect(testResult);

int count = 0;
while (javaObj.hasNext()) {
    JsonNode current = javaObj.next();
    System.out.println(current);
    count++;
}
Assert.assertEquals(3, count);

The watermark assigner is simply:
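import com.fasterxml.jackson.databind.JsonNode; // assuming Jackson 2
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;

public class EventWatermark implements AssignerWithPeriodicWatermarks<JsonNode> {

    // Watermarks trail the highest timestamp seen so far by 5 seconds.
    private final long maxTimeLag = 5000;

    private long currentMaxTimestamp;

    // Static fields are never serialized with the assigner, so `transient` is not needed.
    public static final DateTimeFormatter parseFromTimeFormatter =
            ISODateTimeFormat.dateTimeParser();

    @Override
    public long extractTimestamp(JsonNode element, long previousElementTimestamp) {
        long occurredAtLong;
        try {
            occurredAtLong = DateTime.parse(
                    element.get("metadata").get("occurred_at").asText(),
                    parseFromTimeFormatter).getMillis();
        } catch (IllegalArgumentException ie) {
            // asText() is empty for object nodes; report the whole record and keep the cause.
            throw new IllegalArgumentException("Cannot parse occurred_at in " + element, ie);
        }

        if (occurredAtLong > currentMaxTimestamp) {
            currentMaxTimestamp = occurredAtLong;
        }
        return occurredAtLong;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxTimeLag);
    }
}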
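The arithmetic behind this assigner can be checked outside Flink. The sketch below is a hypothetical plain-Java mirror: `BoundedLagWatermark` is an illustrative name, and `java.time.Instant` stands in for the Joda parser, so it only handles ISO-8601 UTC strings like the ones in the test. It shows that the highest watermark this data can produce is 20:38:12.097Z, short of the 20:38:59.999Z needed to close the last one-minute window, and that before any element arrives the watermark is -5000 because `currentMaxTimestamp` starts at 0.

```java
import java.time.Instant;

// Hypothetical plain-Java mirror of EventWatermark's arithmetic (no Flink, no Joda).
// Assumption: occurred_at values are ISO-8601 UTC strings that Instant.parse accepts.
final class BoundedLagWatermark {
    private final long maxTimeLag = 5000;   // same 5 s lag as EventWatermark
    private long currentMaxTimestamp;       // defaults to 0, as in the original

    // Parses the event time and tracks the maximum seen so far.
    long extractTimestamp(String occurredAt) {
        long occurredAtLong = Instant.parse(occurredAt).toEpochMilli();
        if (occurredAtLong > currentMaxTimestamp) {
            currentMaxTimestamp = occurredAtLong;
        }
        return occurredAtLong;
    }

    long currentWatermark() {
        return currentMaxTimestamp - maxTimeLag;
    }

    public static void main(String[] args) {
        BoundedLagWatermark wm = new BoundedLagWatermark();
        System.out.println("before any event: " + wm.currentWatermark());
        for (String t : new String[] {
                "2017-05-20T19:34:17.097Z",
                "2017-05-20T20:34:17.097Z",
                "2017-05-20T20:38:17.097Z"}) {
            wm.extractTimestamp(t);
        }
        // Highest watermark the assigner can produce from this data.
        System.out.println("after last event: " + Instant.ofEpochMilli(wm.currentWatermark()));
        // The last event's window [20:38, 20:39) needs a watermark of at least 20:38:59.999.
        long lastWindowMax = Instant.parse("2017-05-20T20:38:59.999Z").toEpochMilli();
        System.out.println("last window can fire: " + (wm.currentWatermark() >= lastWindowMax));
    }
}
```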

Best,

Sendoh



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Last-event-in-event-time-window-is-not-output-tp13305.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Last event in event time window is not output

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

All sources emit a Long.MAX_VALUE watermark when they shut down.
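This matters here because the stream also passes through `assignTimestampsAndWatermarks`. As far as we understand the Flink 1.x operator behind periodic assigners, it ignores watermarks arriving from upstream except the final Long.MAX_VALUE one, which it forwards once so end-of-input still reaches the windows. The following is a hypothetical plain-Java model of that rule (`PeriodicAssignerForwarding` and its methods are illustrative names, not Flink API); check the operator source for your Flink version before relying on it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not Flink code) of an operator running a periodic watermark
// assigner: upstream watermarks are dropped, except the end-of-input
// Long.MAX_VALUE watermark, which is forwarded exactly once.
final class PeriodicAssignerForwarding {
    private long currentWatermark = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>(); // watermarks sent downstream

    // A watermark arrives from upstream (e.g. from a source that just finished).
    void processUpstreamWatermark(long timestamp) {
        if (timestamp == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
            currentWatermark = Long.MAX_VALUE;
            emitted.add(timestamp);
        }
    }

    // Periodic timer tick: emit the assigner's watermark only if it advanced.
    void onPeriodicTick(long assignerWatermark) {
        if (assignerWatermark > currentWatermark) {
            currentWatermark = assignerWatermark;
            emitted.add(assignerWatermark);
        }
    }

    List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        PeriodicAssignerForwarding op = new PeriodicAssignerForwarding();
        op.onPeriodicTick(1_000L);                   // assigner's own watermark
        op.processUpstreamWatermark(5_000L);         // ignored
        op.processUpstreamWatermark(Long.MAX_VALUE); // forwarded: end of input
        op.processUpstreamWatermark(Long.MAX_VALUE); // already forwarded, ignored
        System.out.println(op.emitted());
    }
}
```

Under this model the Long.MAX_VALUE watermark from a finished source still reaches the window operator, and later periodic ticks cannot move it backwards.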

What is the expected output and what is the output that you actually get?

Best,
Aljoscha

> On 27. May 2017, at 00:01, Fabian Hueske <fh...@gmail.com> wrote:
> 
> Hi, 
> 
> the problem might be that your source does not send a watermark with timestamp Long.MAX_VALUE after the last record has been sent.
> So your operators never compute the last window.
> 
> Best, Fabian


Re: Last event in event time window is not output

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

the problem might be that your source does not send a watermark with
timestamp Long.MAX_VALUE after the last record has been sent.
So your operators never compute the last window.

Best, Fabian
