Posted to issues@flink.apache.org by "Dario Heinisch (Jira)" <ji...@apache.org> on 2021/10/23 13:21:00 UTC
[jira] [Created] (FLINK-24623) Prevent usage of EventTimeWindows when EventTime is disabled
Dario Heinisch created FLINK-24623:
--------------------------------------
Summary: Prevent usage of EventTimeWindows when EventTime is disabled
Key: FLINK-24623
URL: https://issues.apache.org/jira/browse/FLINK-24623
Project: Flink
Issue Type: Improvement
Components: API / DataStream
Reporter: Dario Heinisch
The following job never emits any values after the window operator, because event time has effectively been disabled via the watermark strategy:
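{code:java}
import java.util.Iterator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class PlaygroundJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Endless source emitting (current wall-clock time, key in 0..9).
        DataStreamSource<Tuple2<Long, Integer>> source =
                env.addSource(new SourceFunction<Tuple2<Long, Integer>>() {
                    @Override
                    public void run(SourceContext<Tuple2<Long, Integer>> sourceContext) throws Exception {
                        int i = 0;
                        while (true) {
                            Tuple2<Long, Integer> tuple =
                                    Tuple2.of(System.currentTimeMillis(), i++ % 10);
                            sourceContext.collect(tuple);
                        }
                    }

                    @Override
                    public void cancel() {
                    }
                });

        source.assignTimestampsAndWatermarks(
                        // Switch noWatermarks() to forMonotonousTimestamps()
                        // and values are being printed.
                        WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks()
                                .withTimestampAssigner((t, timestamp) -> t.f0))
                .keyBy(t -> t.f1)
                .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                .process(new ProcessWindowFunction<Tuple2<Long, Integer>, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer key, Context context,
                            Iterable<Tuple2<Long, Integer>> iterable, Collector<String> out) throws Exception {
                        // Count the elements that ended up in this window.
                        int count = 0;
                        Iterator<Tuple2<Long, Integer>> iter = iterable.iterator();
                        while (iter.hasNext()) {
                            count++;
                            iter.next();
                        }
                        out.collect("Key: " + key + " count: " + count);
                    }
                })
                .print();

        env.execute();
    }
}
{code}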
The issue is that the stream uses _noWatermarks()_: without watermarks the event-time clock never advances, so event-time windows never fire.
As such a pipeline can never emit any windowed values, it is faulty and Flink should throw an exception when starting up.
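To illustrate why nothing is ever emitted, here is a simplified, self-contained model of how an event-time window fires. It is not Flink code: the class and method names are made up for this sketch, and watermarks are advanced per element rather than periodically. It assumes that a window fires once the watermark reaches the window's last timestamp (end - 1), and that a monotonous-timestamp strategy emits watermarks of "highest timestamp seen - 1".

```java
import java.util.TreeSet;

/** Simplified model of tumbling event-time windows; NOT Flink code. */
final class WatermarkFiringSketch {

    /** Returns how many tumbling windows fire for the given ascending, non-negative timestamps. */
    static int firedWindows(long[] timestamps, long windowSizeMs, boolean emitWatermarks) {
        long watermark = Long.MIN_VALUE; // nothing has advanced the event-time clock yet
        TreeSet<Long> pendingWindowEnds = new TreeSet<>();
        int fired = 0;
        for (long ts : timestamps) {
            // Assign the element to its tumbling window [start, start + size).
            long windowStart = ts - (ts % windowSizeMs);
            pendingWindowEnds.add(windowStart + windowSizeMs);

            // With noWatermarks() this step never happens, so the watermark stays at MIN_VALUE.
            if (emitWatermarks) {
                watermark = Math.max(watermark, ts - 1);
            }

            // Fire every window whose last timestamp (end - 1) the watermark has reached.
            while (!pendingWindowEnds.isEmpty() && pendingWindowEnds.first() - 1 <= watermark) {
                pendingWindowEnds.pollFirst();
                fired++;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] timestamps = {0L, 500L, 1000L, 1500L, 2000L, 2500L};
        System.out.println(firedWindows(timestamps, 1000L, true));  // prints 2
        System.out.println(firedWindows(timestamps, 1000L, false)); // prints 0
    }
}
```

In this model the elements are still assigned to windows in both cases; only the firing step depends on the watermark, which matches the observation that switching to forMonotonousTimestamps() makes values appear.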
--------------------
Proposed change:
We extend the interface [WatermarkStrategy|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L55] with the method _boolean isEventTime()_.
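A minimal sketch of what such a method could look like, using a hypothetical stand-in interface (_SketchWatermarkStrategy_) instead of the real WatermarkStrategy so that it compiles on its own. Making it a default method that returns true is an assumption of this sketch; it would keep existing user implementations and lambdas compiling. The _wrapping_ factory is also hypothetical and stands for any strategy that decorates another one, which is presumably what _withTimestampAssigner(...)_ produces in the job above.

```java
/** Hypothetical stand-in for Flink's WatermarkStrategy; only the proposed flag is modelled. */
interface SketchWatermarkStrategy<T> {

    /** Proposed method: a strategy is assumed to support event time unless it says otherwise. */
    default boolean isEventTime() {
        return true;
    }

    /** Stand-in for a strategy that does generate watermarks. */
    static <T> SketchWatermarkStrategy<T> forMonotonousTimestamps() {
        return new SketchWatermarkStrategy<T>() {};
    }

    /** Stand-in for noWatermarks(): the only strategy here that opts out of event time. */
    static <T> SketchWatermarkStrategy<T> noWatermarks() {
        return new SketchWatermarkStrategy<T>() {
            @Override
            public boolean isEventTime() {
                return false;
            }
        };
    }

    /** Stand-in for a decorating strategy; it must delegate, or the flag of the base is lost. */
    static <T> SketchWatermarkStrategy<T> wrapping(SketchWatermarkStrategy<T> base) {
        return new SketchWatermarkStrategy<T>() {
            @Override
            public boolean isEventTime() {
                return base.isEventTime();
            }
        };
    }
}

final class IsEventTimeSketch {
    public static void main(String[] args) {
        System.out.println(SketchWatermarkStrategy.forMonotonousTimestamps().isEventTime()); // true
        System.out.println(SketchWatermarkStrategy.noWatermarks().isEventTime());            // false
        System.out.println(
                SketchWatermarkStrategy.wrapping(SketchWatermarkStrategy.noWatermarks())
                        .isEventTime());                                                     // false
    }
}
```

Without the delegation in the wrapper, the default would report true for a wrapped noWatermarks() strategy and the check would miss exactly the case from this issue.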
We create a new class named _EventTimeWindowPreconditions_ and add the following method to it where we make use of _isEventTime()_:
{code:java}
public static void hasPrecedingEventTimeGenerator(final List<Transformation<?>> predecessors) {
    // Walk backwards so that the most recent watermark assigner is checked first.
    for (int i = predecessors.size() - 1; i >= 0; i--) {
        final Transformation<?> pre = predecessors.get(i);
        if (pre instanceof TimestampsAndWatermarksTransformation) {
            TimestampsAndWatermarksTransformation<?> timestampsAndWatermarksTransformation =
                    (TimestampsAndWatermarksTransformation<?>) pre;
            final WatermarkStrategy<?> waStrat = timestampsAndWatermarksTransformation.getWatermarkStrategy();
            // assert that it generates timestamps or throw exception
            if (!waStrat.isEventTime()) {
                // TODO: Custom exception
                throw new IllegalArgumentException(
                        "Cannot use an EventTime window with a preceding watermark generator which"
                                + " does not ingest event times. Did you use noWatermarks() as the WatermarkStrategy"
                                + " and used EventTime windows such as TumblingEventTimeWindows/SlidingEventTimeWindows?"
                                + " These windows will never window any values as your stream does not support event time.");
            }
            // We have to terminate the check now as we have found the most recent
            // timestamp assigner for this window and ensured that it actually adds event
            // timestamps. If there has been a watermark strategy such as noWatermarks()
            // earlier in the chain, we can safely ignore it, as another valid event-time
            // watermark assigner exists between it and our current event-time window.
            break;
        }
    }
}
{code}
Then we can update the constructors of [AllWindowedStream|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L112] and [WindowedStream|https://github.com/apache/flink/blob/2cb477343de5dce70978c0add5ec58edbaec157c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java#L79] to:
{code:java}
if (windowAssigner.isEventTime()) {
    EventTimeWindowPreconditions.hasPrecedingEventTimeGenerator(input.getTransformation().getInputs());
}
{code}
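The behaviour of the check can be tried out in isolation with stand-in types. The following is only a sketch: _Node_ and _WatermarkNode_ are hypothetical replacements for Transformation and TimestampsAndWatermarksTransformation, the boolean field stands for the result of the proposed _isEventTime()_, and the _accepts_ helper exists only to make the outcome easy to print.

```java
import java.util.Arrays;
import java.util.List;

/** Self-contained model of the proposed precondition; the nested types are stand-ins, NOT Flink classes. */
final class PreconditionSketch {

    /** Stand-in for an arbitrary transformation. */
    static class Node {}

    /** Stand-in for a timestamps-and-watermarks transformation. */
    static final class WatermarkNode extends Node {
        final boolean eventTime; // what the proposed isEventTime() would return

        WatermarkNode(boolean eventTime) {
            this.eventTime = eventTime;
        }
    }

    /** Same backward walk as in the proposal: only the most recent watermark node decides. */
    static void hasPrecedingEventTimeGenerator(List<Node> predecessors) {
        for (int i = predecessors.size() - 1; i >= 0; i--) {
            Node pre = predecessors.get(i);
            if (pre instanceof WatermarkNode) {
                if (!((WatermarkNode) pre).eventTime) {
                    throw new IllegalArgumentException(
                            "Event-time window used after a strategy that does not support event time");
                }
                return; // the most recent watermark node is fine, older ones are irrelevant
            }
        }
    }

    /** Convenience wrapper for the demo: true if the check passes, false if it throws. */
    static boolean accepts(List<Node> predecessors) {
        try {
            hasPrecedingEventTimeGenerator(predecessors);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Only a noWatermarks()-like node precedes the window: rejected.
        System.out.println(accepts(Arrays.asList(new WatermarkNode(false)))); // false
        // A later, valid watermark node overrides an earlier noWatermarks()-like node.
        System.out.println(accepts(Arrays.asList(
                new WatermarkNode(false), new Node(), new WatermarkNode(true)))); // true
        // No watermark node in the list at all: the check passes silently.
        System.out.println(accepts(Arrays.asList(new Node()))); // true
    }
}
```

The last case shows a property of the check as written: if no watermark transformation is found among the inspected predecessors, no exception is thrown, so a job that never assigns watermarks at all would still be accepted.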
This is the approach I currently have in mind, but I am not sure whether it is the best one.
Best regards,
Dario