Posted to dev@flink.apache.org by "Piotr Nowojski (Jira)" <ji...@apache.org> on 2021/06/16 10:11:00 UTC
[jira] [Created] (FLINK-23011) FLIP-27 sources are generating
non-deterministic results when using event time
Piotr Nowojski created FLINK-23011:
--------------------------------------
Summary: FLIP-27 sources are generating non-deterministic results when using event time
Key: FLINK-23011
URL: https://issues.apache.org/jira/browse/FLINK-23011
Project: Flink
Issue Type: New Feature
Components: API / DataStream
Affects Versions: 1.12.4, 1.13.1, 1.14.0
Environment:
Reporter: Piotr Nowojski
FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and switch to {{ACTIVE}} only after emitting their first {{Watermark}}. Until that happens, downstream operators exclude the {{IDLE}} inputs when calculating the combined (minimum) input watermark.
In the extreme case this leads to completely bogus results, for example when one FLIP-27 source subtask is slower than the others for some reason:
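To make the watermark combination concrete, here is a minimal sketch of how a minimum over input channels behaves when idle channels are skipped. It is a simplified model and not Flink's actual implementation; {{MinWatermarkSketch}}, {{Channel}} and {{combinedWatermark}} are hypothetical names used only for illustration.

```java
import java.util.OptionalLong;

public class MinWatermarkSketch {

    /** Hypothetical per-channel state: an idle flag and the last watermark seen. */
    static final class Channel {
        final boolean idle;
        final long watermark;

        Channel(boolean idle, long watermark) {
            this.idle = idle;
            this.watermark = watermark;
        }
    }

    /** Minimum watermark over the active channels; empty if every channel is idle. */
    static OptionalLong combinedWatermark(Channel... channels) {
        OptionalLong min = OptionalLong.empty();
        for (Channel c : channels) {
            if (c.idle) {
                continue; // idle channels do not hold back the combined watermark
            }
            if (!min.isPresent() || c.watermark < min.getAsLong()) {
                min = OptionalLong.of(c.watermark);
            }
        }
        return min;
    }

    public static void main(String[] args) {
        // Channel 0 has not emitted a watermark yet and is still idle,
        // so the watermark of channel 1 alone becomes the combined watermark.
        System.out.println(combinedWatermark(new Channel(true, 0L), new Channel(false, 5000L)));
        // Once both channels are active, the lower watermark wins.
        System.out.println(combinedWatermark(new Channel(false, 10L), new Channel(false, 5000L)));
    }
}
```

In this model a channel that is merely slow, but still marked idle, has no influence on the result, which is the situation the issue is about.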
{code:java}
env.getConfig().setAutoWatermarkInterval(2000);
env.setParallelism(2);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 10));

DataStream<Long> eventStream =
        env.fromSource(
                        new NumberSequenceSource(0, Long.MAX_VALUE),
                        WatermarkStrategy.<Long>forMonotonousTimestamps()
                                .withTimestampAssigner(new LongTimestampAssigner()),
                        "NumberSequenceSource")
                .map(
                        new RichMapFunction<Long, Long>() {
                            @Override
                            public Long map(Long value) throws Exception {
                                // Throttle only subtask 0 so that it falls behind subtask 1.
                                if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                                    Thread.sleep(1);
                                }
                                // Emit 1 per record so that sum(0) counts the records per window.
                                return 1L;
                            }
                        });

eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();

(...)

private static class LongTimestampAssigner implements SerializableTimestampAssigner<Long> {
    private long counter = 0;

    @Override
    public long extractTimestamp(Long record, long recordTimeStamp) {
        // Ignore the record value; timestamps simply increase by 1 ms per record.
        return counter++;
    }
}
{code}
In this case, after 2 seconds ({{setAutoWatermarkInterval}}) the unthrottled subtask (subTaskId == 1) generates very high watermarks, while the throttled source subtask (subTaskId == 0) emits very low watermarks. If a watermark from the unthrottled subtask reaches the downstream {{WindowOperator}} first, while the other input channel is still idle, the operator takes that high watermark as the combined input watermark for the whole {{WindowOperator}}. When the input channel from the throttled source subtask finally receives its {{ACTIVE}} status and a much lower watermark, it is already too late: the operator's watermark cannot move backwards, so the records from the throttled subtask arrive behind it and are treated as late.
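The sequence above can be sketched with a small model of an operator whose watermark only moves forward. This is an illustration under simplifying assumptions (1 second tumbling windows, no allowed lateness, non-negative timestamps), not the real {{WindowOperator}}; the class {{LateRecordSketch}} and its methods are hypothetical.

```java
public class LateRecordSketch {

    static final long WINDOW_SIZE_MS = 1000L;

    private long currentWatermark = Long.MIN_VALUE;

    /** Applies a new combined input watermark; the operator watermark never regresses. */
    void onWatermark(long combinedInputWatermark) {
        currentWatermark = Math.max(currentWatermark, combinedInputWatermark);
    }

    long currentWatermark() {
        return currentWatermark;
    }

    /** A record is late when the watermark has already passed the end of its window. */
    boolean isLate(long timestamp) {
        long windowStart = timestamp - (timestamp % WINDOW_SIZE_MS);
        long windowMaxTimestamp = windowStart + WINDOW_SIZE_MS - 1;
        return windowMaxTimestamp <= currentWatermark;
    }

    public static void main(String[] args) {
        LateRecordSketch operator = new LateRecordSketch();

        // Only the unthrottled channel is active, so its high watermark is taken as is.
        operator.onWatermark(50_000L);

        // The throttled channel becomes ACTIVE with a much lower watermark.
        // The combined minimum drops, but the operator watermark stays where it is.
        operator.onWatermark(1_500L);

        System.out.println(operator.currentWatermark());
        // A record of the throttled subtask with timestamp 1600 belongs to a window
        // that the watermark has already passed.
        System.out.println(operator.isLate(1_600L));
    }
}
```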
Actual output of the example program:
{noformat}
1596
2000
1000
1000
1000
1000
1000
1000
(...)
{noformat}
The expected output is always "2000" (2000 records fit in every 1-second global window):
{noformat}
2000
2000
2000
2000
(...)
{noformat}
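The figure of 2000 follows from simple arithmetic, assuming each of the two parallel source subtasks works with its own copy of {{LongTimestampAssigner}} and therefore assigns the timestamps 0, 1, 2, ... independently: every subtask contributes 1000 records to each 1000 ms window. The sketch below only reproduces that count; {{ExpectedWindowCounts}} and {{countPerWindow}} are hypothetical helper names.

```java
import java.util.Map;
import java.util.TreeMap;

public class ExpectedWindowCounts {

    /** Counts records per tumbling window when every subtask assigns timestamps 0, 1, 2, ... */
    static Map<Long, Integer> countPerWindow(int subtasks, long recordsPerSubtask, long windowSizeMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (int subtask = 0; subtask < subtasks; subtask++) {
            long counter = 0; // one counter per subtask, as in LongTimestampAssigner
            for (long record = 0; record < recordsPerSubtask; record++) {
                long timestamp = counter++;
                long windowStart = timestamp - (timestamp % windowSizeMs);
                counts.merge(windowStart, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two subtasks, 3000 records each, 1000 ms windows.
        System.out.println(countPerWindow(2, 3000, 1000)); // prints {0=2000, 1000=2000, 2000=2000}
    }
}
```

With a single subtask the same computation yields 1000 per window, which matches the actual output once the records of the throttled subtask no longer count.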