Posted to issues@flink.apache.org by "Till Rohrmann (Jira)" <ji...@apache.org> on 2021/10/24 21:37:00 UTC
[jira] [Commented] (FLINK-24623) Prevent usage of EventTimeWindows when EventTime is disabled
[ https://issues.apache.org/jira/browse/FLINK-24623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17433513#comment-17433513 ]
Till Rohrmann commented on FLINK-24623:
---------------------------------------
Thanks for creating this ticket [~Dario]. cc [~arvid].
> Prevent usage of EventTimeWindows when EventTime is disabled
> ------------------------------------------------------------
>
> Key: FLINK-24623
> URL: https://issues.apache.org/jira/browse/FLINK-24623
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Reporter: Dario Heinisch
> Priority: Not a Priority
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> The following stream never emits any values after the window operator, because event-time processing has been disabled via the watermark strategy:
> {code:java}
> public class PlaygroundJob {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
>
>         DataStreamSource<Tuple2<Long, Integer>> source =
>                 env.addSource(new SourceFunction<Tuple2<Long, Integer>>() {
>                     @Override
>                     public void run(SourceContext<Tuple2<Long, Integer>> sourceContext) throws Exception {
>                         int i = 0;
>                         while (true) {
>                             Tuple2<Long, Integer> tuple = Tuple2.of(System.currentTimeMillis(), i++ % 10);
>                             sourceContext.collect(tuple);
>                         }
>                     }
>
>                     @Override
>                     public void cancel() {
>                     }
>                 });
>
>         source.assignTimestampsAndWatermarks(
>                         // Switch noWatermarks() to forMonotonousTimestamps()
>                         // and values are being printed.
>                         WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks()
>                                 .withTimestampAssigner((t, timestamp) -> t.f0))
>                 .keyBy(t -> t.f1)
>                 .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>                 .process(new ProcessWindowFunction<Tuple2<Long, Integer>, String, Integer, TimeWindow>() {
>                     @Override
>                     public void process(Integer key, Context context,
>                             Iterable<Tuple2<Long, Integer>> iterable, Collector<String> out) throws Exception {
>                         int count = 0;
>                         Iterator<Tuple2<Long, Integer>> iter = iterable.iterator();
>                         while (iter.hasNext()) {
>                             count++;
>                             iter.next();
>                         }
>                         out.collect("Key: " + key + " count: " + count);
>                     }
>                 })
>                 .print();
>
>         env.execute();
>     }
> }
> {code}
>
> The issue is that the stream uses _noWatermarks()_, which effectively disables event-time windowing: without watermarks, the event-time windows are never triggered.
> Because this pipeline can never emit any values, it is faulty, and Flink should throw an exception at startup.
>
> --------------------
> Proposed change:
> We extend the interface [WatermarkStrategy|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L55] with the method _boolean isEventTime()_.
> We create a new class named _EventTimeWindowPreconditions_ and add the following method to it where we make use of _isEventTime()_:
>
> {code:java}
> public static void hasPrecedingEventTimeGenerator(final List<Transformation<?>> predecessors) {
>     for (int i = predecessors.size() - 1; i >= 0; i--) {
>         final Transformation<?> pre = predecessors.get(i);
>         if (pre instanceof TimestampsAndWatermarksTransformation) {
>             TimestampsAndWatermarksTransformation<?> timestampsAndWatermarksTransformation =
>                     (TimestampsAndWatermarksTransformation<?>) pre;
>             final WatermarkStrategy<?> waStrat = timestampsAndWatermarksTransformation.getWatermarkStrategy();
>             // Assert that it generates timestamps, or throw an exception.
>             if (!waStrat.isEventTime()) {
>                 // TODO: Custom exception
>                 throw new IllegalArgumentException(
>                         "Cannot use an EventTime window with a preceding watermark generator which"
>                                 + " does not ingest event times. Did you use noWatermarks() as the WatermarkStrategy"
>                                 + " and use EventTime windows such as TumblingEventTimeWindows/SlidingEventTimeWindows?"
>                                 + " These windows will never window any values as your stream does not support event time.");
>             }
>             // We have to terminate the check now, as we have found the most recent
>             // timestamp assigner for this window and ensured that it actually adds event
>             // timestamps. If there is an earlier watermark strategy in the chain, such as
>             // noWatermarks(), we can safely ignore it, because another valid event-time
>             // watermark assigner sits between it and our current event-time window.
>             break;
>         }
>     }
> }
> {code}
>
> Then we can update the constructors of [AllWindowedStream|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L112] and [WindowedStream|https://github.com/apache/flink/blob/2cb477343de5dce70978c0add5ec58edbaec157c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java#L79] to:
> {code:java}
> if (windowAssigner.isEventTime()) {
>     EventTimeWindowPreconditions.hasPrecedingEventTimeGenerator(input.getTransformation().getInputs());
> }
> {code}
> This is the approach I currently have in mind, but I am not sure whether it is the best one.
> Best regards,
> Dario
>
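
As a rough illustration of why the quoted pipeline stays silent, here is a self-contained toy model in plain Java. It is not Flink code: ToyWindowOperator, onElement, onWatermark and run are names made up for this sketch. It assumes only the rule that an event-time window fires once the watermark reaches the window's last timestamp (end minus 1 ms), and it advances the watermark per element rather than periodically to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowFiringSketch {
    static final long WINDOW_SIZE_MS = 1_000L;

    /** Toy model of a tumbling event-time window operator (hypothetical, not Flink code). */
    static final class ToyWindowOperator {
        private final TreeMap<Long, Integer> countsByWindowStart = new TreeMap<>();
        private final List<String> fired = new ArrayList<>();

        /** Buffers one element in the window that contains its timestamp. */
        void onElement(long timestamp) {
            long start = timestamp - Math.floorMod(timestamp, WINDOW_SIZE_MS);
            countsByWindowStart.merge(start, 1, Integer::sum);
        }

        /** Fires every buffered window whose last timestamp (end - 1) is covered by the watermark. */
        void onWatermark(long watermark) {
            while (!countsByWindowStart.isEmpty()) {
                Map.Entry<Long, Integer> first = countsByWindowStart.firstEntry();
                long maxTimestamp = first.getKey() + WINDOW_SIZE_MS - 1;
                if (maxTimestamp > watermark) {
                    break;
                }
                fired.add("window@" + first.getKey() + " count=" + first.getValue());
                countsByWindowStart.pollFirstEntry();
            }
        }

        List<String> fired() {
            return fired;
        }
    }

    /** Feeds timestamps 0, 250, ..., 2750 and optionally advances the watermark. */
    static List<String> run(boolean emitWatermarks) {
        ToyWindowOperator operator = new ToyWindowOperator();
        for (long ts = 0; ts < 3_000; ts += 250) {
            operator.onElement(ts);
            if (emitWatermarks) {
                // Mimics monotonous timestamps: the watermark trails the max timestamp by 1 ms.
                operator.onWatermark(ts - 1);
            }
        }
        return operator.fired();
    }

    public static void main(String[] args) {
        // prints "no watermarks: []"
        System.out.println("no watermarks: " + run(false));
        // prints "with watermarks: [window@0 count=4, window@1000 count=4]"
        System.out.println("with watermarks: " + run(true));
    }
}
```

Without watermarks the elements are buffered but no window is ever emitted, which matches the behaviour described in the ticket. With watermarks, the last window (starting at 2000) is still open at the end because the watermark has not yet passed 2999.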
>
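
The proposed check can also be tried out in isolation. The sketch below is a minimal stand-in written in plain Java, not the Flink API: ToyWatermarkStrategy, ToyTransformation, checkPrecedingEventTimeGenerator and isAccepted are hypothetical names, and the choice of true as the default for isEventTime() is an assumption, since the ticket does not say what the default should be.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class EventTimePreconditionSketch {

    /** Hypothetical stand-in for WatermarkStrategy, reduced to the proposed method. */
    interface ToyWatermarkStrategy {
        // Assumption: strategies count as event-time unless they opt out.
        default boolean isEventTime() {
            return true;
        }

        static ToyWatermarkStrategy noWatermarks() {
            return new ToyWatermarkStrategy() {
                @Override
                public boolean isEventTime() {
                    return false;
                }
            };
        }

        static ToyWatermarkStrategy forMonotonousTimestamps() {
            return new ToyWatermarkStrategy() {};
        }
    }

    /** Hypothetical stand-in for a transformation; strategy is null if it assigns no watermarks. */
    static final class ToyTransformation {
        final String name;
        final ToyWatermarkStrategy strategy;

        ToyTransformation(String name, ToyWatermarkStrategy strategy) {
            this.name = name;
            this.strategy = strategy;
        }
    }

    /** Walks the predecessors backwards; the nearest watermark assigner decides the outcome. */
    static void checkPrecedingEventTimeGenerator(List<ToyTransformation> predecessors) {
        for (int i = predecessors.size() - 1; i >= 0; i--) {
            ToyWatermarkStrategy strategy = predecessors.get(i).strategy;
            if (strategy == null) {
                continue; // not a timestamp/watermark assigner, keep looking
            }
            if (!strategy.isEventTime()) {
                throw new IllegalArgumentException(
                        "Event-time window preceded by a strategy that emits no watermarks: "
                                + predecessors.get(i).name);
            }
            return; // earlier assigners are overridden by this one
        }
        // No assigner found: the check passes, as in the snippet from the ticket.
    }

    static boolean isAccepted(List<ToyTransformation> predecessors) {
        try {
            checkPrecedingEventTimeGenerator(predecessors);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ToyTransformation source = new ToyTransformation("source", null);
        ToyTransformation none = new ToyTransformation("noWatermarks", ToyWatermarkStrategy.noWatermarks());
        ToyTransformation mono =
                new ToyTransformation("monotonous", ToyWatermarkStrategy.forMonotonousTimestamps());

        System.out.println(isAccepted(Arrays.asList(source, none)));       // false
        System.out.println(isAccepted(Arrays.asList(none, mono)));         // true
        System.out.println(isAccepted(Collections.singletonList(source))); // true
    }
}
```

One design point the sketch makes visible: when no watermark assigner is found among the predecessors, the check passes silently. That seems reasonable if watermarks can originate in the source itself, but it also means the check only catches the explicit noWatermarks() case.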
--
This message was sent by Atlassian Jira
(v8.3.4#803005)