Posted to user-zh@flink.apache.org by 李占阳 <15...@126.com> on 2021/08/13 09:14:05 UTC

Flink 1.13.2: aggregation results are not correct when using a windowing TVF

Hi all:
  When using a windowing TVF for aggregation on Flink 1.13.2, I found that the tumbling-window results do not match the daily totals computed offline from the detail records. Here is my SQL:
    String message = "CREATE TABLE test (\n" +
            "    gid VARCHAR COMMENT 'uuid, unique identifier',\n" +
            "    ip VARCHAR COMMENT 'ip address',\n" +
            "    business_no VARCHAR COMMENT 'merchant number',\n" +
            "    rtime BIGINT,\n" +
            "    event_time AS TO_TIMESTAMP_LTZ(rtime, 3),\n" +
            "    WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE,\n" +
            "    ts AS PROCTIME(),\n" +
            "    `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'\n" +
            ") WITH (\n" +
            "    'connector' = 'kafka',\n" +
            "    'topic' = 'test',\n" +
            "    'properties.group.id' = 'consumer-02',\n" +
            "    'properties.bootstrap.servers' = 'XXX:9092',\n" +
            "    'properties.security.protocol' = 'SASL_PLAINTEXT',\n" +
            "    'properties.sasl.mechanism' = 'GSSAPI',\n" +
            "    'properties.sasl.kerberos.service.name' = 'kafka',\n" +
            "    'scan.startup.mode' = 'earliest-offset',\n" +
            "    'format' = 'json'\n" +
            ")";




String message_cnts = "SELECT\n" +
        "    ip,\n" +
        "    business_no,\n" +
        "    MIN(record_time) AS record_time,\n" +
        "    COUNT(DISTINCT gid) AS total_call_num,\n" +
        "    window_start,\n" +
        "    window_end\n" +
        "FROM TABLE(\n" +
        "    TUMBLE(TABLE test, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))\n" +
        "GROUP BY window_start, window_end, GROUPING SETS ((business_no, ip))";




Re: Flink 1.13.2: aggregation results are not correct when using a windowing TVF

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

In general, when a streaming window aggregation disagrees with the batch result, the most likely cause is late data. Please check whether any records arrive late; if so, consider increasing the watermark delay.
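
A minimal sketch of the adjusted watermark declaration; the 10-minute delay here is only an assumption and has to be tuned against the lateness actually observed in the data:

    -- allow records to arrive up to 10 minutes behind the fastest ones
    -- before the watermark closes their window
    WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE

To check whether late records exist at all, one option on Flink 1.13+ is the CURRENT_WATERMARK function (a sketch, assuming the test table above):

    -- rows whose event time is already behind the current watermark
    -- would be dropped by the tumbling window
    SELECT gid, ip, business_no, event_time
    FROM test
    WHERE event_time < CURRENT_WATERMARK(event_time);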

李占阳 <15...@126.com> wrote on Friday, August 13, 2021 at 5:21 PM:

> [original message quoted in full above]