Posted to user@flink.apache.org by Dario Heinisch <da...@gmail.com> on 2022/01/28 15:00:13 UTC

Flink test late elements

Hey there,

Hope everyone is well!

I have a question:
```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

DataStream<Integer> dataStream = env.addSource(new CustomSource());

OutputTag<Integer> outputTag = new OutputTag<Integer>("late") {};

WatermarkStrategy<Integer> waStrat = WatermarkStrategy
        .<Integer>forMonotonousTimestamps()
        .withTimestampAssigner((i, timestamp) -> Long.valueOf(i));

SingleOutputStreamOperator<Integer> windowOperator =
        dataStream
                .assignTimestampsAndWatermarks(waStrat)
                .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(1)))
                .sideOutputLateData(outputTag)
                .apply(
                        new AllWindowFunction<Integer, Integer, TimeWindow>() {
                            @Override
                            public void apply(TimeWindow window, Iterable<Integer> values, Collector<Integer> out) {
                                System.out.println(window.getStart() + " -> " + window.getEnd());
                                for (Integer val : values) {
                                    System.out.println(val + " in window ");
                                }
                            }
                        });

windowOperator
        .getSideOutput(outputTag)
        .flatMap(
                new FlatMapFunction<Integer, String>() {
                    @Override
                    public void flatMap(Integer value, Collector<String> out) {
                        System.out.println("LATE: " + value);
                    }
                });

env.execute();
```

And my custom source:
```
static class CustomSource implements SourceFunction<Integer> {

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        int i = 0;
        while (i++ < 10) {
            Thread.sleep(100);
            if (i < 5) {
                ctx.collect(i * 10);
            } else {
                ctx.collect(i);
            }
        }
    }

    @Override
    public void cancel() {
    }
}
```
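
For reference, the order in which this source emits its values can be
checked without Flink. The minimal sketch below copies the loop from `run`
into a plain method. The class name `EmissionOrder` and the method
`emitted` are made up for illustration, and the sleep is left out because
it does not change the values:

```java
import java.util.ArrayList;
import java.util.List;

public class EmissionOrder {

    // Same loop as CustomSource.run, collecting into a list
    // instead of a SourceContext.
    static List<Integer> emitted() {
        List<Integer> out = new ArrayList<>();
        int i = 0;
        while (i++ < 10) {
            if (i < 5) {
                out.add(i * 10);
            } else {
                out.add(i);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [10, 20, 30, 40, 5, 6, 7, 8, 9, 10]
        System.out.println(emitted());
    }
}
```

So the timestamps rise to 40 and then fall back to 5 through 10.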

If I leave `Thread.sleep(100)` in the source, I get the following output:
```
10 -> 11
10 in window
20 -> 21
20 in window
LATE: 5
30 -> 31
30 in window
LATE: 6
LATE: 7
LATE: 8
LATE: 9
LATE: 10
40 -> 41
40 in window
```

If I comment out the sleep, I get:
```
5 -> 6
5 in window
6 -> 7
6 in window
7 -> 8
7 in window
8 -> 9
8 in window
9 -> 10
9 in window
10 -> 11
10 in window
10 in window
20 -> 21
20 in window
30 -> 31
30 in window
40 -> 41
40 in window
```

I thought that with `forMonotonousTimestamps` the late events would
automatically be dropped from the window and emitted to the side output,
since the first 4 elements have timestamps up to 40 while the remaining 6
only range from 5 to 10. Instead, it seems that all elements emitted by
the source are evaluated at once, and the watermark is only emitted after
the whole batch of events from the source has been processed.
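
The two outputs are at least consistent with that reading. Below is a
minimal plain-Java model of it, not Flink code. It assumes that the
watermark is the highest timestamp seen so far minus 1, and that an
element with timestamp t belongs to the 1 ms window [t, t+1) and counts as
late once the watermark has reached t. The class `LatenessModel`, the
method `lateElements` and the parameter `watermarkEvery` are hypothetical
names used only in this sketch:

```java
import java.util.ArrayList;
import java.util.List;

public class LatenessModel {

    // Returns the elements that this model treats as late when the
    // watermark is refreshed after every `watermarkEvery` elements.
    // Assumption: watermark = highest timestamp seen so far - 1.
    static List<Integer> lateElements(int[] timestamps, int watermarkEvery) {
        List<Integer> late = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (int n = 0; n < timestamps.length; n++) {
            int ts = timestamps[n];
            // Window [ts, ts + 1) has max timestamp ts, so the element
            // is late once the watermark has reached ts.
            if (ts <= watermark) {
                late.add(ts);
            }
            maxSeen = Math.max(maxSeen, ts);
            // Refresh the watermark only at the configured cadence.
            if ((n + 1) % watermarkEvery == 0) {
                watermark = maxSeen - 1;
            }
        }
        return late;
    }

    public static void main(String[] args) {
        int[] emitted = {10, 20, 30, 40, 5, 6, 7, 8, 9, 10};
        // Watermark refreshed after every element: prints [5, 6, 7, 8, 9, 10]
        System.out.println(lateElements(emitted, 1));
        // Watermark refreshed only after the whole batch: prints []
        System.out.println(lateElements(emitted, emitted.length));
    }
}
```

With a refresh after every element the model marks 5 through 10 as late,
which matches the `LATE:` lines in the first output. With a single refresh
at the end nothing is late, which matches the second output.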

I assume there is a better approach than sleeping between calls to
`collect` in the source to get the expected result?

(The background here is that I want to write tests for some event-time
processing and assert that the side output is used for late events and
that these are processed correctly. In production the Kafka source will
not always emit data continuously, so I want to test this, but I have not
found anything better than sleeping. Maybe that is the correct approach?)

Best regards,

Dario