Posted to user-zh@flink.apache.org by 李占阳 <15...@126.com> on 2021/08/13 09:14:05 UTC
Flink 1.13.2: incorrect results when using TVF
Hi all:
When running TVF-based windowed aggregation on Flink 1.13.2, I found that the tumbling-window counts do not match the daily totals computed from the offline detail data. Here is my SQL:
String message = " CREATE TABLE test(\n" +
" gid VARCHAR COMMENT 'uuid, unique identifier',\n" +
" ip VARCHAR COMMENT 'ip address',\n" +
" business_no VARCHAR COMMENT 'merchant number',\n" +
" rtime BIGINT,\n" +
" event_time AS TO_TIMESTAMP_LTZ(rtime, 3),\n" +
" WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE,\n" +
" ts AS PROCTIME(),\n" +
" `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'\n" +
" ) \n" +
" WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'test',\n" +
" 'properties.group.id' = 'consumer-02',\n" +
" 'properties.bootstrap.servers' = 'XXX:9092',\n" +
" 'properties.security.protocol' = 'SASL_PLAINTEXT',\n" +
" 'properties.sasl.mechanism' = 'GSSAPI',\n" +
" 'properties.sasl.kerberos.service.name' = 'kafka',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'json'\n" +
" )";
String message_cnts = "SELECT\n" +
" ip,\n" +
" business_no,\n" +
" min(record_time) AS record_time,\n" +
" count(DISTINCT gid) AS total_call_num,\n" +
" window_start, window_end\n" +
" FROM TABLE(\n" +
" TUMBLE(TABLE test, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))\n" +
" GROUP BY window_start, window_end, GROUPING SETS ((business_no, ip))";
Re: Flink 1.13.2: incorrect results when using TVF
Posted by Caizhi Weng <ts...@gmail.com>.
Hi!
In general, when a streaming window result differs from the batch result, the most likely cause is late data. Please check whether you have late data; if so, consider increasing the watermark delay.
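To make "late data" concrete: a record is late once its event time falls at or below the current watermark, and with `WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE` the watermark trails the newest event time seen so far by two minutes. A minimal plain-Java sketch of that rule (an illustration only, not the Flink API):

```java
public class WatermarkSketch {
    // Watermark = max observed event time minus the out-of-orderness bound.
    static long watermark(long maxSeenTs, long boundMs) {
        return maxSeenTs - boundMs;
    }

    // A record is late when its timestamp is at or below the current watermark,
    // i.e. the window it belongs to may already have fired without it.
    static boolean isLate(long eventTs, long maxSeenTs, long boundMs) {
        return eventTs <= watermark(maxSeenTs, boundMs);
    }

    public static void main(String[] args) {
        long bound = 2 * 60 * 1000L;          // INTERVAL '2' MINUTE, in ms
        long maxSeen = 1_628_840_000_000L;    // latest event time observed so far

        // An event 3 minutes behind the stream head is late under a 2-minute bound:
        System.out.println(isLate(maxSeen - 3 * 60 * 1000L, maxSeen, bound));          // true
        // A 10-minute bound would still count it as on time:
        System.out.println(isLate(maxSeen - 3 * 60 * 1000L, maxSeen, 10 * 60 * 1000L)); // false
    }
}
```

Enlarging the bound in the DDL accordingly, e.g. `WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE`, keeps such records on time, at the cost of windows firing later.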
李占阳 <15...@126.com> wrote on Fri, Aug 13, 2021 at 5:21 PM: