Posted to issues@flink.apache.org by "Simon Su (JIRA)" <ji...@apache.org> on 2019/07/30 08:49:00 UTC
[jira] [Created] (FLINK-13492) BoundedOutOfOrderTimestamps cause Watermark's timestamp leak
Simon Su created FLINK-13492:
--------------------------------
Summary: BoundedOutOfOrderTimestamps cause Watermark's timestamp leak
Key: FLINK-13492
URL: https://issues.apache.org/jira/browse/FLINK-13492
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.9.0
Reporter: Simon Su
Attachments: Watermark_timestamp_leak.diff
{code:java}
// conf, topic, properties, names and types are defined elsewhere (not shown)
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, conf);
// Use event time; the default autoWatermarkInterval is 200 ms
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Kafka kafka = new Kafka()
    .version("0.11")
    .topic(topic)
    .startFromLatest()
    .properties(properties);
Schema schema = new Schema();
for (int i = 0; i < names.length; i++) {
    if ("timestamp".equalsIgnoreCase(names[i])) {
        // Bounded out-of-orderness: watermark delay of 1000 ms
        schema.field("rowtime", types[i])
            .rowtime(new Rowtime().timestampsFromField("timestamp").watermarksPeriodicBounded(1000));
    } else {
        schema.field(names[i], types[i]);
    }
}
/** ..... */
tableEnv
    .connect(kafka)
    // Protobuf is assumed to be a custom format descriptor (not part of Flink 1.9)
    .withFormat(new Protobuf().protobufName("order_sink"))
    .withSchema(schema)
    .inAppendMode()
    .registerTableSource("orderStream");
{code}
We registered the upstream table and applied a 10 s tumbling window to it. After feeding in a sequence of normal data, we got no output at all.
We then debugged whether watermarks were being emitted normally and eventually found the cause:
# maxTimestamp is initialized to Long.MIN_VALUE in BoundedOutOfOrderTimestamps.
# The nextTimestamp method receives the timestamp extracted from each source record and stores it in maxTimestamp when it is larger.
# The getWatermark() method computes the watermark's timestamp as maxTimestamp - delay.
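The three steps above can be modeled in a few lines of plain Java. This is a hypothetical, simplified stand-in (the class name BoundedOutOfOrderModel is made up, and it returns a raw long instead of Flink's Watermark type), not Flink's actual code; it only shows what the watermark computation yields when it runs before any record has been seen.

```java
// Hypothetical, simplified model of steps 1-3 (not Flink's implementation).
class BoundedOutOfOrderModel {
    private final long delay;
    // Step 1: maxTimestamp starts at Long.MIN_VALUE.
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrderModel(long delay) {
        this.delay = delay;
    }

    // Step 2: keep the largest timestamp seen so far.
    void nextTimestamp(long timestamp) {
        if (timestamp > maxTimestamp) {
            maxTimestamp = timestamp;
        }
    }

    // Step 3: watermark = maxTimestamp - delay, plain long arithmetic with no overflow check.
    long getWatermark() {
        return maxTimestamp - delay;
    }

    public static void main(String[] args) {
        BoundedOutOfOrderModel assigner = new BoundedOutOfOrderModel(1000L);
        // Called before any record has arrived: Long.MIN_VALUE - 1000 wraps around.
        long early = assigner.getWatermark();
        System.out.println(early);                           // 9223372036854774808
        System.out.println(early == Long.MAX_VALUE - 999L);  // true
    }
}
```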
When +{color:#205081}TimestampsAndPeriodicWatermarksOperator{color}+ is initialized, its open() method registers a processing-time timer (via SystemProcessingTimeService) that generates a watermark every watermarkInterval. That is the problem: the timer thread can call getCurrentWatermark, which computes the watermark from BoundedOutOfOrderTimestamps, before any record has updated maxTimestamp. {color:#d04437}The result, Long.MIN_VALUE - delay, overflows and wraps around to a very large positive value, so every subsequent watermark is smaller than it and is dropped.{color}
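The effect on downstream operators can be sketched with a second hypothetical model. It assumes, as described above, that the periodic watermark operator emits a new watermark only when it is strictly greater than the last one it emitted; MonotonicWatermarkEmitter and onPeriodicCheck are invented names, and the event-time values are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: a watermark is emitted only if it advances the current one.
class MonotonicWatermarkEmitter {
    private long currentWatermark = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    // Called once per watermark interval with the assigner's latest value.
    void onPeriodicCheck(long newWatermark) {
        if (newWatermark > currentWatermark) {
            currentWatermark = newWatermark;
            emitted.add(newWatermark);
        }
    }

    List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        long delay = 1000L;
        MonotonicWatermarkEmitter emitter = new MonotonicWatermarkEmitter();
        // First timer fires before any record: Long.MIN_VALUE - delay wraps to a huge value.
        emitter.onPeriodicCheck(Long.MIN_VALUE - delay);
        // Later checks carry real event times (maxTimestamp - delay), but none is larger.
        emitter.onPeriodicCheck(1_564_476_540_000L - delay);
        emitter.onPeriodicCheck(1_564_476_550_000L - delay);
        System.out.println(emitter.emitted().size()); // 1
    }
}
```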
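{color:#d04437}A workaround is to set a large autoWatermarkInterval, which delays the first watermark timer so that records arrive and update maxTimestamp before getCurrentWatermark is first called. The timer is registered as follows:{color}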
{code:java}
public void onProcessingTime(long timestamp) throws Exception {
...
// Schedule the next watermark check one watermarkInterval from now
getProcessingTimeService().registerTimer(now + watermarkInterval, this);
...
}
{code}
{code:java}
public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
...
// Milliseconds until the timer fires: never negative, plus 1 ms
long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;
...
}
{code}
{color:#d04437}I think we can fix this by adding the delay in BoundedOutOfOrderTimestamps's constructor, i.e. initializing maxTimestamp to Long.MIN_VALUE + delay, so that maxTimestamp - delay can no longer overflow.{color}
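A minimal sketch of the proposed fix, assuming that "adding the delay in the constructor" means starting maxTimestamp at Long.MIN_VALUE + delay and that delay is non-negative. FixedBoundedOutOfOrderModel is a hypothetical stand-in, not Flink's code. Before any record arrives, the watermark is then exactly Long.MIN_VALUE, so nothing wraps around and the first real watermark is larger than it.

```java
// Hypothetical model of the proposed fix: the delay is folded into the initial maxTimestamp.
class FixedBoundedOutOfOrderModel {
    private final long delay;
    private long maxTimestamp;

    // Assumes delay >= 0; a negative delay would overflow Long.MIN_VALUE + delay instead.
    FixedBoundedOutOfOrderModel(long delay) {
        this.delay = delay;
        // Start at Long.MIN_VALUE + delay so that maxTimestamp - delay >= Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + delay;
    }

    void nextTimestamp(long timestamp) {
        if (timestamp > maxTimestamp) {
            maxTimestamp = timestamp;
        }
    }

    long getWatermark() {
        return maxTimestamp - delay;
    }

    public static void main(String[] args) {
        FixedBoundedOutOfOrderModel assigner = new FixedBoundedOutOfOrderModel(1000L);
        System.out.println(assigner.getWatermark() == Long.MIN_VALUE); // true
        assigner.nextTimestamp(5000L);
        System.out.println(assigner.getWatermark()); // 4000
    }
}
```

An alternative with the same effect would be to guard the subtraction inside getWatermark(), returning Long.MIN_VALUE while maxTimestamp is below Long.MIN_VALUE + delay; initializing in the constructor keeps getWatermark() a single subtraction.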
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)