Posted to issues@flink.apache.org by "Dario Heinisch (Jira)" <ji...@apache.org> on 2021/10/23 13:21:00 UTC

[jira] [Created] (FLINK-24623) Prevent usage of EventTimeWindows when EventTime is disabled

Dario Heinisch created FLINK-24623:
--------------------------------------

             Summary: Prevent usage of EventTimeWindows when EventTime is disabled
                 Key: FLINK-24623
                 URL: https://issues.apache.org/jira/browse/FLINK-24623
             Project: Flink
          Issue Type: Improvement
          Components: API / DataStream
            Reporter: Dario Heinisch


The following pipeline never emits any values downstream of the window operator, because event-time processing has been disabled via the watermark strategy:


{code:java}
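import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.Iterator;

public class PlaygroundJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Endless source emitting (current wall-clock millis, key in 0..9).
        DataStreamSource<Tuple2<Long, Integer>> source =
                env.addSource(new SourceFunction<Tuple2<Long, Integer>>() {
                    private volatile boolean running = true;

                    @Override
                    public void run(SourceContext<Tuple2<Long, Integer>> sourceContext) throws Exception {
                        int i = 0;
                        while (running) {
                            Tuple2<Long, Integer> tuple =
                                    Tuple2.of(System.currentTimeMillis(), i++ % 10);
                            sourceContext.collect(tuple);
                        }
                    }

                    @Override
                    public void cancel() {
                        running = false;
                    }
                });

        source.assignTimestampsAndWatermarks(
                        // Switch noWatermarks() to forMonotonousTimestamps()
                        // and values are being printed.
                        WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks()
                                .withTimestampAssigner((t, timestamp) -> t.f0))
                .keyBy(t -> t.f1)
                .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                .process(new ProcessWindowFunction<Tuple2<Long, Integer>, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer key, Context context,
                            Iterable<Tuple2<Long, Integer>> iterable, Collector<String> out) throws Exception {
                        // Count the elements that ended up in this window.
                        int count = 0;
                        Iterator<Tuple2<Long, Integer>> iter = iterable.iterator();
                        while (iter.hasNext()) {
                            count++;
                            iter.next();
                        }
                        out.collect("Key: " + key + " count: " + count);
                    }
                })
                .print();

        env.execute();
    }
}
{code}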
 

The issue is that the stream uses _noWatermarks()_, which effectively disables event-time windowing: no watermarks are ever emitted, so event-time windows never fire.

Because this pipeline can never emit window results, it is faulty, and Flink should throw an exception when starting up.
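
To illustrate why nothing is ever emitted, here is a minimal, self-contained sketch of the firing rule. It is not Flink code: _WindowFiringSketch_ and _firedWindows_ are hypothetical names, and the model assumes one-second tumbling windows, non-negative timestamps, a watermark of "highest timestamp seen minus 1" when watermarks are enabled, and a window firing once the watermark reaches the window's last timestamp.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Simplified model of event-time window firing; a sketch, NOT Flink's implementation. */
final class WindowFiringSketch {

    /** Returns how many one-second tumbling windows fire for the given (non-negative) timestamps. */
    static int firedWindows(long[] timestamps, boolean emitWatermarks) {
        final long windowSize = 1000L;
        long watermark = Long.MIN_VALUE; // event time has not advanced yet
        List<Long> pendingWindowEnds = new ArrayList<>();
        int fired = 0;

        for (long ts : timestamps) {
            // Assign the element to its tumbling window [start, end).
            long windowEnd = ts - (ts % windowSize) + windowSize;
            if (!pendingWindowEnds.contains(windowEnd)) {
                pendingWindowEnds.add(windowEnd);
            }

            // Assumption: with watermarks enabled, the watermark trails the highest timestamp by 1 ms.
            // With watermarks disabled it stays at Long.MIN_VALUE forever.
            if (emitWatermarks) {
                watermark = Math.max(watermark, ts - 1);
            }

            // A window fires once the watermark has reached its last timestamp (end - 1).
            Iterator<Long> it = pendingWindowEnds.iterator();
            while (it.hasNext()) {
                if (it.next() - 1 <= watermark) {
                    it.remove();
                    fired++;
                }
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] ts = {0L, 500L, 1000L, 1500L, 2000L};
        System.out.println(firedWindows(ts, true));  // prints 2
        System.out.println(firedWindows(ts, false)); // prints 0
    }
}
```

In this model the only difference between the two runs is whether the watermark advances, which mirrors switching between _forMonotonousTimestamps()_ and _noWatermarks()_ in the job above.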

 

--------------------

Proposed change:

We extend the interface [WatermarkStrategy|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L55] with the method _boolean isEventTime()_.
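
One possible shape for this, as a rough sketch only: _SketchWatermarkStrategy_ below is a hypothetical, stripped-down stand-in for the real interface and models nothing but the proposed flag. Making _isEventTime()_ a default method that returns true is my assumption; it would keep existing custom strategies (including lambdas) compiling, and only _noWatermarks()_ would override it.

```java
/** Hypothetical stand-in for WatermarkStrategy; it only models the proposed isEventTime() flag. */
interface SketchWatermarkStrategy {

    /** Proposed method: does this strategy ever advance event time? Assumed to default to true. */
    default boolean isEventTime() {
        return true;
    }

    /** Stand-in for a watermark-emitting factory method; inherits the default. */
    static SketchWatermarkStrategy forMonotonousTimestamps() {
        return new SketchWatermarkStrategy() {};
    }

    /** Stand-in for noWatermarks(): the one strategy that reports no event time. */
    static SketchWatermarkStrategy noWatermarks() {
        return new SketchWatermarkStrategy() {
            @Override
            public boolean isEventTime() {
                return false;
            }
        };
    }
}
```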

We create a new class named _EventTimeWindowPreconditions_ and add the following method to it where we make use of _isEventTime()_:

 
{code:java}
public static void hasPrecedingEventTimeGenerator(final List<Transformation<?>> predecessors) {
    for (int i = predecessors.size() - 1; i >= 0; i--) {
        final Transformation<?> pre = predecessors.get(i);
        if (pre instanceof TimestampsAndWatermarksTransformation) {

            TimestampsAndWatermarksTransformation<?> timestampsAndWatermarksTransformation =
                    (TimestampsAndWatermarksTransformation<?>) pre;

            final WatermarkStrategy<?> waStrat = timestampsAndWatermarksTransformation.getWatermarkStrategy();

            // assert that it generates timestamps or throw exception
            if (!waStrat.isEventTime()) {
                // TODO: Custom exception
                throw new IllegalArgumentException(
                        "Cannot use an EventTime window with a preceding watermark generator which"
                                + " does not ingest event times. Did you use noWatermarks() as the WatermarkStrategy"
                                + " together with EventTime windows such as TumblingEventTimeWindows/SlidingEventTimeWindows?"
                                + " These windows will never window any values as your stream does not support event time."
                );
            }

            // We can stop checking here: we have found the most recent timestamp
            // assigner preceding this window and ensured that it actually produces
            // event time. If a watermark strategy such as noWatermarks() appears
            // earlier in the chain, we can safely ignore it, because a valid event-time
            // watermark strategy sits between it and the current event-time window.
            break;

        }
    }
}
{code}
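
To make the "most recent strategy wins" rule from the comment above concrete, here is a small self-contained model of the backward scan. Everything in it is a hypothetical stand-in: _Node_ replaces _Transformation_, and a nullable _Boolean_ replaces the _instanceof_ check plus _isEventTime()_.

```java
import java.util.Arrays;
import java.util.List;

/** Stand-alone model of the backward scan; Node is a hypothetical stand-in for Transformation. */
final class NearestStrategySketch {

    static final class Node {
        final String name;
        /** null = not a timestamps/watermarks transformation; otherwise the isEventTime() result. */
        final Boolean eventTime;

        Node(String name, Boolean eventTime) {
            this.name = name;
            this.eventTime = eventTime;
        }
    }

    /** Throws if the watermark strategy closest to the window does not produce event time. */
    static void check(List<Node> predecessors) {
        for (int i = predecessors.size() - 1; i >= 0; i--) {
            Node node = predecessors.get(i);
            if (node.eventTime == null) {
                continue; // unrelated transformation, keep scanning backwards
            }
            if (!node.eventTime) {
                throw new IllegalArgumentException(
                        "Event-time window preceded by '" + node.name + "', which produces no event time");
            }
            return; // nearest strategy is valid; anything earlier in the chain is irrelevant
        }
    }

    public static void main(String[] args) {
        // An earlier noWatermarks() is overridden by a later valid strategy: accepted.
        check(Arrays.asList(
                new Node("noWatermarks", false),
                new Node("map", null),
                new Node("forMonotonousTimestamps", true)));

        // The nearest strategy is noWatermarks(): rejected.
        try {
            check(Arrays.asList(
                    new Node("forMonotonousTimestamps", true),
                    new Node("noWatermarks", false)));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Note that in this model a chain without any watermark strategy passes the check, just like in the method above; whether that case should also be rejected is an open question.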
 

Then we can update the constructors of [AllWindowedStream|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L112] and [WindowedStream|https://github.com/apache/flink/blob/2cb477343de5dce70978c0add5ec58edbaec157c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java#L79] to:
{code:java}
if (windowAssigner.isEventTime()) {
   EventTimeWindowPreconditions.hasPrecedingEventTimeGenerator(input.getTransformation().getInputs());
}
{code}
This is the approach I currently have in mind, but I am not sure whether it is the best one.

Best regards, 

Dario

 

 


