Posted to issues@flink.apache.org by "Till Rohrmann (Jira)" <ji...@apache.org> on 2021/10/24 21:37:00 UTC

[jira] [Commented] (FLINK-24623) Prevent usage of EventTimeWindows when EventTime is disabled

    [ https://issues.apache.org/jira/browse/FLINK-24623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17433513#comment-17433513 ] 

Till Rohrmann commented on FLINK-24623:
---------------------------------------

Thanks for creating this ticket [~Dario]. cc [~arvid].

> Prevent usage of EventTimeWindows when EventTime is disabled
> ------------------------------------------------------------
>
>                 Key: FLINK-24623
>                 URL: https://issues.apache.org/jira/browse/FLINK-24623
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>            Reporter: Dario Heinisch
>            Priority: Not a Priority
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The following pipeline never emits any values after the window operator, because event time has effectively been disabled via the watermark strategy:
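> {code:java}
> import java.util.Iterator;
> 
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
> 
> public class PlaygroundJob {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
> 
>         DataStreamSource<Tuple2<Long, Integer>> source =
>                 env.addSource(new SourceFunction<Tuple2<Long, Integer>>() {
>                     // Lets cancel() stop the otherwise endless loop.
>                     private volatile boolean running = true;
> 
>                     @Override
>                     public void run(SourceContext<Tuple2<Long, Integer>> sourceContext) throws Exception {
>                         int i = 0;
>                         while (running) {
>                             // (current wall-clock time, key in 0..9)
>                             Tuple2<Long, Integer> tuple = Tuple2.of(System.currentTimeMillis(), i++ % 10);
>                             sourceContext.collect(tuple);
>                         }
>                     }
> 
>                     @Override
>                     public void cancel() {
>                         running = false;
>                     }
>                 });
> 
>         source.assignTimestampsAndWatermarks(
>                         // Switching noWatermarks() to forMonotonousTimestamps()
>                         // makes the job print values.
>                         WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks()
>                                 .withTimestampAssigner((t, timestamp) -> t.f0))
>                 .keyBy(t -> t.f1)
>                 .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>                 .process(new ProcessWindowFunction<Tuple2<Long, Integer>, String, Integer, TimeWindow>() {
>                     @Override
>                     public void process(Integer key, Context context,
>                             Iterable<Tuple2<Long, Integer>> iterable, Collector<String> out) throws Exception {
>                         // Count the elements that fell into this window for this key.
>                         int count = 0;
>                         Iterator<Tuple2<Long, Integer>> iter = iterable.iterator();
>                         while (iter.hasNext()) {
>                             count++;
>                             iter.next();
>                         }
>                         out.collect("Key: " + key + " count: " + count);
>                     }
>                 })
>                 .print();
> 
>         env.execute();
>     }
> }
> {code}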
>  
> The issue is that the stream uses _noWatermarks()_, which effectively disables event-time windowing. No watermark is ever emitted, so the event-time trigger never fires and the windows are never evaluated.
> Since this pipeline can never produce output, it is faulty, and Flink should throw an exception at startup.
>  
> --------------------
> Proposed change:
> We extend the interface [WatermarkStrategy|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L55] with the method _boolean isEventTime()_.
> We create a new class named _EventTimeWindowPreconditions_ and add the following method to it, which uses _isEventTime()_:
>  
> {code:java}
> import java.util.List;
> 
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.dag.Transformation;
> import org.apache.flink.streaming.api.transformations.TimestampsAndWatermarksTransformation;
> 
> public final class EventTimeWindowPreconditions {
> 
>     private EventTimeWindowPreconditions() {}
> 
>     public static void hasPrecedingEventTimeGenerator(final List<Transformation<?>> predecessors) {
>         for (int i = predecessors.size() - 1; i >= 0; i--) {
>             final Transformation<?> pre = predecessors.get(i);
>             if (pre instanceof TimestampsAndWatermarksTransformation) {
>                 final TimestampsAndWatermarksTransformation<?> timestampsAndWatermarksTransformation =
>                         (TimestampsAndWatermarksTransformation<?>) pre;
>                 final WatermarkStrategy<?> watermarkStrategy =
>                         timestampsAndWatermarksTransformation.getWatermarkStrategy();
>                 // Assert that the strategy produces event time, or throw an exception.
>                 // isEventTime() is the newly proposed method on WatermarkStrategy.
>                 if (!watermarkStrategy.isEventTime()) {
>                     // TODO: Custom exception
>                     throw new IllegalArgumentException(
>                             "Cannot use an event-time window with a preceding watermark strategy that"
>                                     + " does not generate watermarks. Did you use noWatermarks() as the"
>                                     + " WatermarkStrategy together with an event-time window such as"
>                                     + " TumblingEventTimeWindows or SlidingEventTimeWindows?"
>                                     + " Such windows never fire, because the watermark never advances.");
>                 }
>                 // Terminate the check here: we have found the most recent watermark strategy
>                 // preceding this window and ensured that it produces event time. An earlier
>                 // strategy such as noWatermarks() further up the chain can safely be ignored,
>                 // because a valid event-time strategy sits between it and this window.
>                 break;
>             }
>         }
>     }
> }
> {code}
>  
> Then we can add the following check to the constructors of [AllWindowedStream|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L112] and [WindowedStream|https://github.com/apache/flink/blob/2cb477343de5dce70978c0add5ec58edbaec157c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java#L79]:
> {code:java}
> if (windowAssigner.isEventTime()) {
>    EventTimeWindowPreconditions.hasPrecedingEventTimeGenerator(input.getTransformation().getInputs());
> }
> {code}
> This is the approach I currently have in mind, but I am not sure whether it is the best one.
> Best regards, 
> Dario
>  
>  

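The reproducer in the description never prints anything. The following is a simplified, standalone model of why this happens. It is not Flink code. It assumes the behaviour of Flink's default event-time trigger: a tumbling window [start, end) is evaluated once the watermark reaches the window's max timestamp (end - 1). It also assumes that noWatermarks() leaves the watermark at Long.MIN_VALUE, and that forMonotonousTimestamps() emits "max timestamp seen - 1". To keep things short, the model updates the watermark after every element, whereas Flink emits watermarks periodically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Simplified model (not Flink code) of tumbling event-time windows firing on watermarks. */
public class EventTimeFiringModel {

    /** Start of the tumbling window containing ts (offset 0, non-negative ts). */
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    /**
     * Feeds timestamps in order and returns the end of every window that fired.
     * emitWatermarks == false models noWatermarks(): the watermark stays at Long.MIN_VALUE.
     * emitWatermarks == true models forMonotonousTimestamps(): watermark = max timestamp - 1.
     */
    static List<Long> firedWindowEnds(long[] timestamps, long size, boolean emitWatermarks) {
        List<Long> fired = new ArrayList<>();
        TreeSet<Long> pendingEnds = new TreeSet<>();
        long watermark = Long.MIN_VALUE;
        long maxTs = Long.MIN_VALUE;
        for (long ts : timestamps) {
            pendingEnds.add(windowStart(ts, size) + size);
            maxTs = Math.max(maxTs, ts);
            if (emitWatermarks) {
                watermark = maxTs - 1;
            }
            // A window [start, end) fires once the watermark reaches its max timestamp, end - 1.
            while (!pendingEnds.isEmpty() && pendingEnds.first() - 1 <= watermark) {
                fired.add(pendingEnds.pollFirst());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] ts = {100, 900, 1_000, 1_500, 2_000};
        System.out.println(firedWindowEnds(ts, 1_000, false)); // []
        System.out.println(firedWindowEnds(ts, 1_000, true)); // [1000, 2000]
    }
}
```

With noWatermarks() the list of fired windows stays empty no matter how many elements arrive. This is exactly the silent failure the ticket wants to turn into a startup error.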

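One detail of the proposed _isEventTime()_ method on WatermarkStrategy seems worth checking. The reproducer calls withTimestampAssigner(...) on noWatermarks(), and as far as I can tell that call returns a separate wrapping strategy in Flink. If _isEventTime()_ gets a default implementation returning true, the wrapper would therefore report true and hide noWatermarks(), unless the decorators delegate to the strategy they wrap. The sketch below uses a hypothetical, simplified interface, SimpleWatermarkStrategy. It is not Flink's real API; it only shows that delegation.

```java
public class IsEventTimeSketch {

    /** Hypothetical, heavily simplified stand-in for WatermarkStrategy; not Flink's real interface. */
    interface SimpleWatermarkStrategy {

        // Proposed addition. Defaulting to true keeps existing user-defined strategies compiling.
        default boolean isEventTime() {
            return true;
        }

        // Decorator in the spirit of withTimestampAssigner(): the wrapper forwards isEventTime()
        // to the wrapped strategy; otherwise the default above would mask noWatermarks().
        default SimpleWatermarkStrategy withTimestampAssigner() {
            final SimpleWatermarkStrategy base = this;
            return new SimpleWatermarkStrategy() {
                @Override
                public boolean isEventTime() {
                    return base.isEventTime();
                }
            };
        }
    }

    /** Models noWatermarks(): never advances the watermark. */
    static final class NoWatermarks implements SimpleWatermarkStrategy {
        @Override
        public boolean isEventTime() {
            return false;
        }
    }

    /** Models forMonotonousTimestamps(): keeps the default of true. */
    static final class Monotonous implements SimpleWatermarkStrategy {}

    public static void main(String[] args) {
        System.out.println(new NoWatermarks().withTimestampAssigner().isEventTime()); // false
        System.out.println(new Monotonous().withTimestampAssigner().isEventTime()); // true
    }
}
```

The same forwarding would presumably be needed for every other decorating strategy, such as withIdleness().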

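_hasPrecedingEventTimeGenerator_ only inspects the direct inputs of the windowed stream's input transformation. An operator between assignTimestampsAndWatermarks(...) and keyBy(...), such as a map, would therefore hide a noWatermarks() strategy from the check. A recursive upstream walk is one possible alternative, sketched below under these assumptions:

- A hypothetical Node class stands in for Flink's Transformation.
- eventTime == null means "not a watermark assigner"; otherwise it holds the value of the proposed isEventTime().
- Paths that reach a source without any assigner are accepted, since a legacy SourceFunction can emit its own watermarks through its SourceContext.

Sources created via fromSource(...) receive their WatermarkStrategy directly, so a complete check would probably need to consider those as well.

```java
import java.util.Arrays;
import java.util.List;

public class PreconditionWalkSketch {

    /** Hypothetical stand-in for Flink's Transformation, reduced to what the check needs. */
    static final class Node {
        final String name;
        // null: not a watermark assigner; otherwise the value of the proposed isEventTime().
        final Boolean eventTime;
        final List<Node> inputs;

        Node(String name, Boolean eventTime, Node... inputs) {
            this.name = name;
            this.eventTime = eventTime;
            this.inputs = Arrays.asList(inputs);
        }
    }

    /** Walks every upstream path until it reaches the nearest watermark assigner on that path. */
    static void checkUpstream(Node node) {
        for (Node in : node.inputs) {
            if (in.eventTime == null) {
                // map, keyBy, union, ...: keep walking upstream on this path.
                checkUpstream(in);
            } else if (!in.eventTime) {
                throw new IllegalArgumentException(
                        "Event-time window is fed by a non-event-time watermark strategy: " + in.name);
            }
            // Otherwise the nearest assigner on this path produces event time, so anything further
            // upstream on this path is irrelevant. A path ending at a source without any assigner
            // is accepted, because a legacy source may emit its own watermarks.
        }
    }

    static boolean passes(Node windowInput) {
        try {
            checkUpstream(windowInput);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Node source = new Node("source", null);
        Node noWatermarks = new Node("noWatermarks", false, source);
        Node map = new Node("map", null, noWatermarks);
        // Found even through the intermediate map, which a direct-inputs-only check would miss.
        System.out.println(passes(new Node("keyBy", null, map))); // false

        Node monotonous = new Node("forMonotonousTimestamps", true, noWatermarks);
        // The nearest assigner wins, so the earlier noWatermarks() is ignored.
        System.out.println(passes(new Node("keyBy", null, monotonous))); // true
    }
}
```

As in the proposal, the real check would only run when windowAssigner.isEventTime() is true. A pipeline that combines noWatermarks() with processing-time windows therefore stays valid.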
--
This message was sent by Atlassian Jira
(v8.3.4#803005)