Posted to user-zh@flink.apache.org by jianjianjianjianjianjianjianjian <72...@qq.com.INVALID> on 2022/03/03 12:02:46 UTC

With watermarks enabled, consuming pre-existing (backlog) Kafka data and consuming data produced in real time give inconsistent output

Hi all,
    While using watermarks, we found that consuming pre-existing (backlog) data from Kafka and consuming data arriving in real time produce inconsistent output. When consuming real-time data, the watermark advances with every incoming record; but when consuming a Kafka topic that already contains backlog data, the watermark is generated from the last record of that backlog batch. Yet when we stepped through the code with a debugger, the watermark generation matched what we see for real-time consumption.
    Also, when reading the source code, the logic of emitRecordsWithTimestamps in AbstractFetcher.java should in principle treat backlog data the same as real-time data: it loops over every record and generates the watermark per record. Hence this email: is the behaviour we are seeing the intended design of the watermark logic?
    The emitRecordsWithTimestamps method of the AbstractFetcher class is shown below:
protected void emitRecordsWithTimestamps(
        Queue<T> records,
        KafkaTopicPartitionState<T, KPH> partitionState,
        long offset,
        long kafkaEventTimestamp) {
    // emit the records, using the checkpoint lock to guarantee
    // atomicity of record emission and offset state update
    synchronized (checkpointLock) {
        T record;
        while ((record = records.poll()) != null) {
            long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);
            sourceContext.collectWithTimestamp(record, timestamp);

            // this might emit a watermark, so do it after emitting the record
            partitionState.onEvent(record, timestamp);
        }
        partitionState.setOffset(offset);
    }
}
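
Our current guess (please correct us if this is wrong): partitionState.onEvent above may only update the generator's internal candidate value, while the watermark that actually becomes visible downstream is emitted by a timer-driven periodic emit, at the interval configured by pipeline.auto-watermark-interval (200 ms by default). If that is the case, then when a whole backlog batch is polled between two periodic emits, only one watermark is ever emitted for the batch, whereas records arriving slower than the interval each become visible as "their own" watermark. A minimal, standalone model of that mechanism (plain Java, hypothetical, not Flink's generated code):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// A deliberately simplified, hypothetical model of a *periodic* watermark generator,
// loosely patterned on the onEvent / onPeriodicEmit split of Flink's
// WatermarkGenerator interface. It is NOT the code generated for the SQL WATERMARK
// clause; it only illustrates the mechanism we suspect is at play.
public class PeriodicWatermarkModel {
    private volatile long candidate = Long.MIN_VALUE; // updated per record, like partitionState.onEvent(...)
    private volatile long emitted = Long.MIN_VALUE;   // only this value is ever visible downstream

    void onEvent(long eventTimestampMs) {
        // one possible policy (running max); whether the real generator tracks the max
        // or simply the last row is exactly what we are unsure about
        candidate = Math.max(candidate, eventTimestampMs);
    }

    void onPeriodicEmit() {
        if (candidate > emitted) {
            emitted = candidate;
            System.out.println("emit watermark " + emitted + " ms");
        }
    }

    public static void main(String[] args) throws Exception {
        PeriodicWatermarkModel model = new PeriodicWatermarkModel();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        // emit on a timer, like pipeline.auto-watermark-interval (200 ms by default)
        timer.scheduleAtFixedRate(model::onPeriodicEmit, 200, 200, TimeUnit.MILLISECONDS);

        // backlog case: a whole batch is polled between two periodic emits,
        // so only one watermark ever becomes visible for the entire batch
        for (long ts : new long[] {1_000, 4_000, 5_000, 2_000, 3_000}) {
            model.onEvent(ts);
        }
        Thread.sleep(500);

        // real-time case: records arrive slower than the emit interval,
        // so the visible watermark moves with (almost) every record
        for (long ts : new long[] {6_000, 7_000, 10_000}) {
            model.onEvent(ts);
            Thread.sleep(300);
        }
        timer.shutdown();
    }
}

Running it prints a single watermark for the whole backlog batch, but roughly one per record for the slow arrivals, which looks like the difference we observe.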




The verification process is as follows:

Prerequisites

A single Kafka partition, a watermark delay of 0, and a 5-second window size.
CREATE TABLE kafka_watermark (
  `f1` INTEGER,
  `f2` TIMESTAMP(3),
  WATERMARK FOR f2 AS f2 - INTERVAL '0' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_watermark',
  'properties.group.id' = 'groupId',
  'properties.max.poll.records' = '10',
  'properties.max.poll.interval.ms' = '1000',
  'properties.fetch.max.bytes' = '52428800',
  'properties.max.partition.fetch.bytes' = '1048576',
  'scan.startup.mode' = 'earliest-offset',
  'properties.enable.auto.commit' = 'false',
  'format' = 'json',
  'json.ignore-parse-errors' = 'false',
  'json.timestamp-format.standard' = 'SQL',
  'json.map-null-key.mode' = 'FAIL'
);

SELECT
  f1,
  f2
FROM kafka_watermark
GROUP BY
  TUMBLE(f2, INTERVAL '5' SECOND),
  f1,f2
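
In the walk-throughs below, window membership follows the usual tumbling-window arithmetic: a record with event time ts belongs to the window starting at ts - (ts mod 5 s), the window covers [start, start + 5 s), and it can fire once the watermark reaches start + 5 s - 1 ms. A quick standalone check of the test timestamps (plain Java, no Flink dependencies; f1 doubles as the event second):

// Prints, for each test record, the 5 s tumbling window it is assigned to and the
// watermark value needed to fire that window. start = ts - (ts % size) gives the
// same result as TimeWindow.getWindowStartWithOffset(ts, 0, 5000) for these timestamps.
public class WindowAssignment {
    public static void main(String[] args) {
        long windowSizeMs = 5_000;                       // TUMBLE(f2, INTERVAL '5' SECOND)
        long[] eventSeconds = {1, 4, 5, 2, 3, 7, 6, 10}; // f1 values from the test data

        for (long sec : eventSeconds) {
            long ts = sec * 1_000;
            long start = ts - (ts % windowSizeMs);
            System.out.printf("f1=%d -> window [%ds, %ds), fires once watermark >= %d ms%n",
                    sec, start / 1000, (start + windowSizeMs) / 1000, start + windowSizeMs - 1);
        }
    }
}

So 1, 4, 2, 3 belong to the 0-5 window (fires at watermark 4999 ms), 5, 7, 6 to the 5-10 window (fires at 9999 ms), and 10 to the 10-15 window.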


Real-time data

Start the job and insert data

{"f1":1,"f2":"2022-02-28 21:00:01.000"}

The current watermark is 1; no output (the 0-5 window now holds 1)

Observed: no output

{"f1":4,"f2":"2022-02-28 21:00:04.000"}

The current watermark is 4; no output (the 0-5 window now holds 1, 4)

Observed: no output

{"f1":5,"f2":"2022-02-28 21:00:05.000"}

The current watermark is 5, which is >= 5, so the 0-5 window closes and emits its data 1, 4 (the 5-10 window now holds 5)

Observed: output 1, 4

{"f1":2,"f2":"2022-02-28 21:00:02.000"}

The watermark stays at 5; with zero delay and the 0-5 window already closed, record 2 is dropped; no output (the 5-10 window still holds 5)

Observed: no new output

{"f1":3,"f2":"2022-02-28 21:00:03.000"}

The watermark stays at 5; with zero delay and the 0-5 window already closed, record 3 is dropped; no output (the 5-10 window still holds 5)

Observed: no new output

{"f1":7,"f2":"2022-02-28 21:00:07.000"}

The current watermark is 7; no output (the 5-10 window now holds 5, 7)

Observed: no new output

{"f1":6,"f2":"2022-02-28 21:00:06.000"}

The watermark stays at 7; no output (the 5-10 window now holds 5, 7, 6)

Observed: no new output

{"f1":10,"f2":"2022-02-28 21:00:10.000"}

The current watermark is 10, which is >= 10, so the 5-10 window closes and emits its data 5, 7, 6 (the 10-15 window now holds 10)

Observed: new output 5, 7, 6; total output so far: 1, 4, 5, 7, 6
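
For reference, the run above can be reproduced offline with a small bookkeeping simulation of the per-record behaviour: the watermark is the maximum event time seen so far (zero delay), windows are 5 s tumbling windows that fire once the watermark reaches windowEnd - 1 ms, and late records are dropped. A simplified model only, not Flink code:

import java.util.*;

// A simplified, standalone model (no Flink dependencies) of the per-record behaviour
// described above: watermark = max event time seen so far (0 s delay), 5 s tumbling
// windows, a window fires once the watermark reaches windowEnd - 1 ms, and a record
// is dropped as late if its window could already have fired.
public class PerRecordWatermarkSimulation {
    public static void main(String[] args) {
        long[] eventSeconds = {1, 4, 5, 2, 3, 7, 6, 10}; // f1 values double as event seconds
        long windowSizeMs = 5_000;
        long watermark = Long.MIN_VALUE;
        Map<Long, List<Long>> windows = new TreeMap<>(); // windowStart -> buffered f1 values

        for (long sec : eventSeconds) {
            long ts = sec * 1_000;
            long start = ts - (ts % windowSizeMs);

            if (start + windowSizeMs - 1 <= watermark) {
                System.out.printf("record f1=%d is late (watermark=%ds), dropped%n", sec, watermark / 1000);
            } else {
                windows.computeIfAbsent(start, k -> new ArrayList<>()).add(sec);
            }

            // per-record watermark advance; the watermark never goes backwards
            watermark = Math.max(watermark, ts);

            // fire every buffered window whose end the watermark has passed
            for (Iterator<Map.Entry<Long, List<Long>>> it = windows.entrySet().iterator(); it.hasNext(); ) {
                Map.Entry<Long, List<Long>> w = it.next();
                if (watermark >= w.getKey() + windowSizeMs - 1) {
                    System.out.printf("window [%ds, %ds) fires: %s%n",
                            w.getKey() / 1000, (w.getKey() + windowSizeMs) / 1000, w.getValue());
                    it.remove();
                }
            }
        }
    }
}

It prints the 0-5 window firing with [1, 4], records 2 and 3 dropped as late, and the 5-10 window firing with [5, 7, 6], matching the observations above.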








Backlog data

Verification with several examples:

Example 1

The backlog data is as follows:
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":5,"f2":"2022-02-28 21:00:05.000"}
{"f1":2,"f2":"2022-02-28 21:00:02.000"}
{"f1":23,"f2":"2022-02-28 21:00:02.000"}
{"f1":2,"f2":"2022-02-28 21:00:02.000"}
{"f1":3,"f2":"2022-02-28 21:00:03.000"}
Start the job; the current watermark is 3; no output (the 0-5 window holds 1, 4, 2, 23, 3; the 5-10 window holds 5)

Observed: no output

Insert new data
{"f1":6,"f2":"2022-02-28 21:00:06.000"}
The watermark is 6, which is >= 5, so the 0-5 window closes and emits its data 1, 4, 2, 23, 3 (the 5-10 window holds 6, 5)

Observed: output 1, 4, 2, 23, 3
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":2,"f2":"2022-02-28 21:00:02.000"}
{"f1":23,"f2":"2022-02-28 21:00:02.000"}
{"f1":3,"f2":"2022-02-28 21:00:03.000"}
Insert new data
{"f1":12,"f2":"2022-02-28 21:00:12.000"}
The watermark is 12, which is >= 10, so the 5-10 window closes and emits its data 6, 5 (the 10-15 window holds 12)

Observed: new output 6, 5; total output so far: 1, 4, 2, 23, 3, 6, 5
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":2,"f2":"2022-02-28 21:00:02.000"}
{"f1":23,"f2":"2022-02-28 21:00:02.000"}
{"f1":3,"f2":"2022-02-28 21:00:03.000"}
{"f1":6,"f2":"2022-02-28 21:00:06.000"}
{"f1":5,"f2":"2022-02-28 21:00:05.000"}
Insert new data



Example 2

The backlog data is as follows:
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":5,"f2":"2022-02-28 21:00:05.000"}
{"f1":3,"f2":"2022-02-28 21:00:03.000"}
{"f1":5,"f2":"2022-02-28 21:00:05.000"}
{"f1":10,"f2":"2022-02-28 21:00:10.000"}
{"f1":9,"f2":"2022-02-28 21:00:09.000"}
{"f1":6,"f2":"2022-02-28 21:00:06.000"}
Start the job; the current watermark is 6, which is >= 5, so the 0-5 window closes and emits its data 1, 4, 3 (the 5-10 window holds 9, 6, 5; the 10-15 window holds 10)

Observed: output 1, 4, 3
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":3,"f2":"2022-02-28 21:00:03.000"}


Insert new data
{"f1":7,"f2":"2022-02-28 21:00:07.000"}
{"f1":8,"f2":"2022-02-28 21:00:08.000"}
The current watermark is 8; no new output (the 5-10 window holds 9, 6, 8, 7, 5; the 10-15 window holds 10)

Observed: output is still 1, 4, 3 (no new data)



Insert new data
{"f1":11,"f2":"2022-02-28 21:00:11.000"}
The watermark is 11, which is >= 10, so the 5-10 window closes and emits its data 9, 6, 8, 7, 5 (the 10-15 window holds 10, 11)

Observed: new output 9, 6, 8, 7, 5; total output so far: 1, 4, 3, 9, 6, 8, 7, 5
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":3,"f2":"2022-02-28 21:00:03.000"}
{"f1":9,"f2":"2022-02-28 21:00:09.000"}
{"f1":6,"f2":"2022-02-28 21:00:06.000"}
{"f1":8,"f2":"2022-02-28 21:00:08.000"}
{"f1":7,"f2":"2022-02-28 21:00:07.000"}
{"f1":5,"f2":"2022-02-28 21:00:05.000"}


Example 3

The backlog data is as follows:
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":5,"f2":"2022-02-28 21:00:05.000"}
{"f1":7,"f2":"2022-02-28 21:00:07.000"}
{"f1":10,"f2":"2022-02-28 21:00:10.000"}
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
Start the job; the current watermark is 1; no output (the 0-5 window holds 1, 4; the 5-10 window holds 7, 5; the 10-15 window holds 10)

Observed: no output



Insert new data
{"f1":10,"f2":"2022-02-28 21:00:10.000"}
The watermark is 10, which is >= 5 and >= 10, so both the 0-5 and 5-10 windows close, emitting 1, 4 from the 0-5 window and 7, 5 from the 5-10 window (the 10-15 window holds 10)

Observed: output 1, 4, 7, 5
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":7,"f2":"2022-02-28 21:00:07.000"}
{"f1":5,"f2":"2022-02-28 21:00:05.000"}