You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by JianWen Huang <ji...@gmail.com> on 2022/05/07 08:41:36 UTC

WaterMark that defined in DDL does not work

Flink version : 1.15.0
Flink sql :
-- Source table over the Kafka topic 'flink.cdc_test', decoded with the
-- 'debezium-json' format and read starting from the earliest offset.
CREATE TABLE custom_kafka (
    name         STRING,
    money        BIGINT,
    status       STRING,
    createtime   TIMESTAMP(3),
    operation_ts TIMESTAMP_LTZ(3),
    -- createtime carries the watermark; the watermark trails it by 5 seconds.
    WATERMARK FOR createtime AS createtime - INTERVAL '5' SECOND
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'flink.cdc_test',
    'scan.startup.mode'            = 'earliest-offset',
    'properties.bootstrap.servers' = '*.70:9092,*.71:9092,*.72:9092',
    'format'                       = 'debezium-json'
)

-- Per-name sum of money over 10-minute tumbling windows on createtime,
-- restricted to rows whose status is '1'.
CREATE VIEW custom_day_sum AS
SELECT
    -- Window start rendered as 'yyyy-MM-dd'.
    DATE_FORMAT(TUMBLE_START(createtime, INTERVAL '10' MINUTES), 'yyyy-MM-dd') AS date_str,
    -- First four characters of the window end's 'HH:mm' text, followed by '0'.
    SUBSTR(DATE_FORMAT(TUMBLE_END(createtime, INTERVAL '10' MINUTES), 'HH:mm'), 1, 4) || '0' AS time_str,
    SUM(money) AS total,
    name
FROM custom_kafka
WHERE status = '1'
GROUP BY
    name,
    TUMBLE(createtime, INTERVAL '10' MINUTES)

-- Sink table using the 'print' connector. Its columns match, in order and
-- type, the columns produced by the custom_day_sum view.
-- Fix: removed the stray double quote after "name STRING", which made the
-- column list unparseable.
CREATE TABLE print_table (
    date_str STRING,
    time_str STRING,
    total    BIGINT,
    name     STRING
) WITH (
    'connector' = 'print'
)

-- Write every row of the custom_day_sum view into print_table.
INSERT INTO print_table
SELECT
    date_str,
    time_str,
    total,
    name
FROM custom_day_sum

Error:
No watermark is shown in the web UI.

Re:WaterMark that defined in DDL does not work

Posted by Xuyang <xy...@163.com>.
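Hi, Huang. I tested the SQL with the 'datagen' connector, and the watermark does show up in the web UI. For testing, you can change "WATERMARK FOR createtime AS createtime - INTERVAL '5' SECOND" to "WATERMARK FOR createtime AS createtime" and make sure that all subtasks receive data (a source subtask that receives no data does not advance its watermark, which holds back the watermark downstream).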
At 2022-05-07 16:41:36, "JianWen Huang" <ji...@gmail.com> wrote:
>Flink version : 1.15.0
>Flink sql :
>create table custom_kafka(
>name STRING,
>money BIGINT,
>status STRING,
>createtime TIMESTAMP(3),
>operation_ts TIMESTAMP_LTZ(3),
>WATERMARK FOR createtime AS createtime - INTERVAL '5' SECOND
>)WITH(
>'connector' = 'kafka',
>'topic' = 'flink.cdc_test',
>'scan.startup.mode' = 'earliest-offset',
>'properties.bootstrap.servers' = '*.70:9092,*.71:9092,*.72:9092',
>'format' = 'debezium-json'
>)
>
>create view  custom_day_sum  AS
>SELECT DATE_FORMAT(TUMBLE_START(createtime,INTERVAL '10'
>MINUTES),'yyyy-MM-dd')as
>date_str,SUBSTR(DATE_FORMAT(TUMBLE_END(createtime,INTERVAL '10'
>MINUTES),'HH:mm'),1,4) || '0' as time_str,sum(money) as total ,name
>FROM  custom_kafka
>where status='1'
>GROUP BY name,TUMBLE(createtime,INTERVAL '10' MINUTES)
>
>CREATE TABLE print_table (
>date_str STRING,
>time_str STRING,
>total BIGINT,
>name STRING"
>)WITH(
>'connector'='print'
>)
>
>INSERT INTO print_table
>SELECT date_str,time_str,total,name
>FROM custom_day_sum
>
>Error:
>There are not watermark shows in web ui .