Posted to user-zh@flink.apache.org by jianjianjianjianjianjianjianjian <72...@qq.com.INVALID> on 2022/03/03 12:02:46 UTC
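With watermarks enabled, consuming backlog data from Kafka gives different output from consuming Kafka data in real time
Hello everyone,
While using watermarks, we found that consuming data already stored in Kafka (a backlog) produces different output from consuming data as it arrives in real time. With real-time data, the watermark advances with each incoming record. With a topic that already holds a backlog, the watermark is instead generated from the last record of that batch. Yet when we step through the code in a debugger, watermarks are generated the same way as in real-time consumption.
Looking at the source code, the logic of the emitRecordsWithTimestamps method in AbstractFetcher.java suggests that backlog data should behave the same as real-time data: the loop visits every record and may generate a watermark for each one. So we are writing to ask: is the current behavior the intended design of the watermark logic?
The emitRecordsWithTimestamps method of the AbstractFetcher class is as follows: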
protected void emitRecordsWithTimestamps(
        Queue<T> records,
        KafkaTopicPartitionState<T, KPH> partitionState,
        long offset,
        long kafkaEventTimestamp) {
    // emit the records, using the checkpoint lock to guarantee
    // atomicity of record emission and offset state update
    synchronized (checkpointLock) {
        T record;
        while ((record = records.poll()) != null) {
            long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);
            sourceContext.collectWithTimestamp(record, timestamp);
            // this might emit a watermark, so do it after emitting the record
            partitionState.onEvent(record, timestamp);
        }
        partitionState.setOffset(offset);
    }
}
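To make the question concrete, here is a minimal sketch of the two behaviors being compared. It is a plain-Java model, not Flink code: the class and method names are hypothetical, and the "last record wins" rule in periodic() is an assumption taken from the observations in this post, not something confirmed from the Flink source. Timestamps are the seconds part of f2 from the first backlog example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model for discussion only; none of these names are Flink APIs.
class WatermarkEmissionSketch {

    /** Per-record emission: every record may advance the watermark immediately. */
    static List<Long> perRecord(long[] timestamps) {
        List<Long> emitted = new ArrayList<>();
        long current = Long.MIN_VALUE;
        for (long ts : timestamps) {
            if (ts > current) { // a watermark never moves backwards
                current = ts;
                emitted.add(current);
            }
        }
        return emitted;
    }

    /**
     * Periodic emission (assumed): each record only overwrites a candidate value,
     * and the candidate is emitted once, when a timer ticks after the burst.
     */
    static List<Long> periodic(long[][] bursts) {
        List<Long> emitted = new ArrayList<>();
        long current = Long.MIN_VALUE;
        for (long[] burst : bursts) {
            long candidate = Long.MIN_VALUE;
            for (long ts : burst) {
                candidate = ts; // assumption: the last record wins, not the maximum
            }
            if (candidate > current) { // timer tick; still never moves backwards
                current = candidate;
                emitted.add(current);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] backlog = {1, 4, 5, 2, 2, 2, 3}; // f2 seconds of the first backlog example
        System.out.println(perRecord(backlog));               // [1, 4, 5]
        System.out.println(periodic(new long[][] {backlog})); // [3]
    }
}
```

Under this model the per-record variant reaches watermark 5 inside the backlog, while the periodic variant only ever reports 3, which is the difference described above.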
The verification process is as follows:
Setup
Single-partition Kafka topic, 0 delay, 5-second window size
CREATE TABLE kafka_watermark (
    `f1` INTEGER,
    `f2` TIMESTAMP(3),
    WATERMARK FOR f2 AS f2 - INTERVAL '0' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'test_watermark',
    'properties.group.id' = 'groupId',
    'properties.max.poll.records' = '10',
    'properties.max.poll.interval.ms' = '1000',
    'properties.fetch.max.bytes' = '52428800',
    'properties.max.partition.fetch.bytes' = '1048576',
    'scan.startup.mode' = 'earliest-offset',
    'properties.enable.auto.commit' = 'false',
    'format' = 'json',
    'json.ignore-parse-errors' = 'false',
    'json.timestamp-format.standard' = 'SQL',
    'json.map-null-key.mode' = 'FAIL'
);
SELECT
    f1,
    f2
FROM kafka_watermark
GROUP BY
    TUMBLE(f2, INTERVAL '5' SECOND),
    f1, f2
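For reference, the window arithmetic used in the walkthroughs can be sketched as below. This is an illustrative helper, not Flink's implementation: the names are hypothetical, timestamps are written as milliseconds past 21:00:00 (which is aligned to the 5-second window size), and the firing rule "watermark >= window end" is the one used in this post.

```java
// Hypothetical helper for illustration; not part of Flink.
class TumblingWindowSketch {

    /** Start of the tumbling window that contains the given timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Firing rule as used in this post: the window closes once the watermark reaches its end. */
    static boolean canFire(long watermarkMs, long windowStartMs, long sizeMs) {
        return watermarkMs >= windowStartMs + sizeMs;
    }

    public static void main(String[] args) {
        long size = 5_000; // 5-second window
        for (long ts : new long[] {1_000, 4_000, 5_000, 10_000}) {
            long start = windowStart(ts, size);
            System.out.println(ts + " ms -> window [" + start + ", " + (start + size) + ")");
        }
        System.out.println(canFire(4_000, 0, size)); // false
        System.out.println(canFire(5_000, 0, size)); // true
    }
}
```

With a 0-second delay the watermark equals the event time it was derived from, so a record at 21:00:05 is enough to close window 0-5.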
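Real-time data
Start the job and insert data
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
Watermark is 1; nothing is output (window 0-5 holds 1).
Observed: no output
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
Watermark is 4; nothing is output (window 0-5 holds 1, 4).
Observed: no output
{"f1":5,"f2":"2022-02-28 21:00:05.000"}
Watermark is 5, which is >= 5, so window 0-5 closes and emits 1, 4 (window 5-10 holds 5).
Observed: output 1, 4
{"f1":2,"f2":"2022-02-28 21:00:02.000"}
Watermark is still 5. With 0 delay and window 0-5 already closed, record 2 is dropped; nothing is output (window 5-10 holds 5).
Observed: no new output
{"f1":3,"f2":"2022-02-28 21:00:03.000"}
Watermark is still 5. With 0 delay and window 0-5 already closed, record 3 is dropped; nothing is output (window 5-10 holds 5).
Observed: no new output
{"f1":7,"f2":"2022-02-28 21:00:07.000"}
Watermark is 7; nothing is output (window 5-10 holds 5, 7).
Observed: no new output
{"f1":6,"f2":"2022-02-28 21:00:06.000"}
Watermark is still 7; nothing is output (window 5-10 holds 5, 7, 6).
Observed: no new output
{"f1":10,"f2":"2022-02-28 21:00:10.000"}
Watermark is 10, which is >= 10, so window 5-10 closes and emits 5, 7, 6 (window 10-15 holds 10).
Observed: new output 5, 7, 6; all output so far is 1, 4, 5, 7, 6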
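The real-time walkthrough above can be replayed with a small simulation. This is only a model of the behavior described in this post (watermark updated after every record, 0 delay, 5-second tumbling windows, late records dropped); the class name and the {f1, seconds} record encoding are made up for illustration, and it does not use any Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical simulation of per-record watermarks; not Flink code.
class PerRecordWatermarkSim {
    static final long SIZE = 5; // window size in seconds

    /** Each record is {f1, event-time seconds}. Returns the emitted f1 values in firing order. */
    static List<Long> run(long[][] records) {
        TreeMap<Long, List<Long>> open = new TreeMap<>(); // window start -> buffered f1 values
        List<Long> out = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        for (long[] r : records) {
            long f1 = r[0];
            long ts = r[1];
            long start = ts - Math.floorMod(ts, SIZE);
            if (start + SIZE <= watermark) {
                continue; // window already closed: the record is late and is dropped
            }
            open.computeIfAbsent(start, k -> new ArrayList<>()).add(f1);
            watermark = Math.max(watermark, ts); // 0 delay: watermark follows event time
            // fire every window whose end has been reached by the watermark
            while (!open.isEmpty() && open.firstKey() + SIZE <= watermark) {
                out.addAll(open.pollFirstEntry().getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] realTime = {{1, 1}, {4, 4}, {5, 5}, {2, 2}, {3, 3}, {7, 7}, {6, 6}, {10, 10}};
        System.out.println(run(realTime)); // [1, 4, 5, 7, 6]
    }
}
```

Records 2 and 3 never appear because window 0-5 has already fired by the time they arrive, matching the observation above. The order of rows within one window is just insertion order in this model.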
Backlog (pre-existing batch) data
Verification with several examples:
Example 1
The backlog is as follows
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":5,"f2":"2022-02-28 21:00:05.000"}
{"f1":2,"f2":"2022-02-28 21:00:02.000"}
{"f1":23,"f2":"2022-02-28 21:00:02.000"}
{"f1":2,"f2":"2022-02-28 21:00:02.000"}
{"f1":3,"f2":"2022-02-28 21:00:03.000"}
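Start the job: the watermark is 3, so nothing is output (window 0-5 holds 1, 4, 2, 23, 3; window 5-10 holds 5).
Observed: no output
Insert new data
{"f1":6,"f2":"2022-02-28 21:00:06.000"}
Watermark is 6, which is >= 5, so window 0-5 closes and emits 1, 4, 2, 23, 3 (window 5-10 holds 6, 5).
Observed: output 1, 4, 2, 23, 3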
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":2,"f2":"2022-02-28 21:00:02.000"}
{"f1":23,"f2":"2022-02-28 21:00:02.000"}
{"f1":3,"f2":"2022-02-28 21:00:03.000"}
Insert new data
{"f1":12,"f2":"2022-02-28 21:00:12.000"}
Watermark is 12, which is >= 10, so window 5-10 closes and emits 6, 5 (window 10-15 holds 12).
Observed: new output 6, 5; all output so far is 1, 4, 2, 23, 3, 6, 5
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":2,"f2":"2022-02-28 21:00:02.000"}
{"f1":23,"f2":"2022-02-28 21:00:02.000"}
{"f1":3,"f2":"2022-02-28 21:00:03.000"}
{"f1":6,"f2":"2022-02-28 21:00:06.000"}
{"f1":5,"f2":"2022-02-28 21:00:05.000"}
Insert new data
Example 2
The backlog is as follows
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":5,"f2":"2022-02-28 21:00:05.000"}
{"f1":3,"f2":"2022-02-28 21:00:03.000"}
{"f1":5,"f2":"2022-02-28 21:00:05.000"}
{"f1":10,"f2":"2022-02-28 21:00:10.000"}
{"f1":9,"f2":"2022-02-28 21:00:09.000"}
{"f1":6,"f2":"2022-02-28 21:00:06.000"}
Start the job: the watermark is 6, which is >= 5, so window 0-5 closes and emits 1, 4, 3 (window 5-10 holds 9, 6, 5; window 10-15 holds 10).
Observed: output 1, 4, 3
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":3,"f2":"2022-02-28 21:00:03.000"}
Insert new data
{"f1":7,"f2":"2022-02-28 21:00:07.000"}
{"f1":8,"f2":"2022-02-28 21:00:08.000"}
Watermark is 8; no new output (window 5-10 holds 9, 6, 8, 7, 5; window 10-15 holds 10).
Observed: no new output; output so far is still 1, 4, 3
Insert new data
{"f1":11,"f2":"2022-02-28 21:00:11.000"}
Watermark is 11, which is >= 10, so window 5-10 closes and emits 9, 6, 8, 7, 5 (window 10-15 holds 10, 11).
Observed: new output 9, 6, 8, 7, 5; all output so far is 1, 4, 3, 9, 6, 8, 7, 5
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":3,"f2":"2022-02-28 21:00:03.000"}
{"f1":9,"f2":"2022-02-28 21:00:09.000"}
{"f1":6,"f2":"2022-02-28 21:00:06.000"}
{"f1":8,"f2":"2022-02-28 21:00:08.000"}
{"f1":7,"f2":"2022-02-28 21:00:07.000"}
{"f1":5,"f2":"2022-02-28 21:00:05.000"}
Example 3
The backlog is as follows
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":5,"f2":"2022-02-28 21:00:05.000"}
{"f1":7,"f2":"2022-02-28 21:00:07.000"}
{"f1":10,"f2":"2022-02-28 21:00:10.000"}
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
Start the job: the watermark is 1, so nothing is output (window 0-5 holds 1, 4; window 5-10 holds 7, 5; window 10-15 holds 10).
Observed: no output
Insert new data
{"f1":10,"f2":"2022-02-28 21:00:10.000"}
Watermark is 10, which is >= 5 and >= 10, so windows 0-5 and 5-10 both close, emitting 1, 4 from window 0-5 and 7, 5 from window 5-10 (window 10-15 holds 10).
Observed: output 1, 4, 7, 5
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":7,"f2":"2022-02-28 21:00:07.000"}
{"f1":5,"f2":"2022-02-28 21:00:05.000"}
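The three backlog examples are consistent with a simple model in which the watermark moves only once per batch, to the timestamp of the batch's last record. The sketch below replays that model. It describes the observations in this post, not Flink's confirmed internals: the class, the feed method and the {f1, seconds} encoding are hypothetical. Duplicate rows collapse because the query groups by f1 and f2, and since f1 determines f2 in this test data, a set of f1 values is enough here.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.TreeMap;

// Hypothetical simulation of "one watermark per batch, taken from the last record"; not Flink code.
class BacklogWatermarkSim {
    static final long SIZE = 5; // window size in seconds

    private final TreeMap<Long, LinkedHashSet<Long>> open = new TreeMap<>(); // window start -> f1 values
    private long watermark = Long.MIN_VALUE;

    /** Feeds one batch of {f1, event-time seconds} records and returns the f1 values emitted. */
    List<Long> feed(long[][] batch) {
        for (long[] r : batch) {
            long start = r[1] - Math.floorMod(r[1], SIZE);
            if (start + SIZE <= watermark) {
                continue; // late with respect to the watermark of earlier batches
            }
            open.computeIfAbsent(start, k -> new LinkedHashSet<>()).add(r[0]);
        }
        if (batch.length > 0) {
            // assumption: only the last record of the batch moves the watermark, never backwards
            watermark = Math.max(watermark, batch[batch.length - 1][1]);
        }
        List<Long> out = new ArrayList<>();
        while (!open.isEmpty() && open.firstKey() + SIZE <= watermark) {
            out.addAll(open.pollFirstEntry().getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        BacklogWatermarkSim example3 = new BacklogWatermarkSim();
        long[][] backlog = {{1, 1}, {4, 4}, {5, 5}, {7, 7}, {10, 10}, {1, 1}};
        System.out.println(example3.feed(backlog));                   // []
        System.out.println(example3.feed(new long[][] {{10, 10}}));  // [1, 4, 5, 7]
    }
}
```

In Example 3 the backlog already contains a record at 21:00:10, yet nothing fires until a new record arrives, because the batch ends with a record at 21:00:01. The order of rows within a window differs from the observed output, which does not appear to follow insertion order.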