You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by steven chen <st...@163.com> on 2020/05/29 06:34:12 UTC

关于flink sql 滚动窗口无法输出结果集合

数据没次都能进来,并且统计,但是为什么结果insert 不会保存到mysql 中?是sql的问题?还是?求大神解答
CREATE TABLE user_behavior (

itemCode VARCHAR,

ts BIGINT COMMENT '时间戳',

t as TO_TIMESTAMP(FROM_UNIXTIME(ts /1000,'yyyy-MM-dd HH:mm:ss')),

proctime as PROCTIME(),

WATERMARK FOR t as t - INTERVAL '5' SECOND

) WITH (

'connector.type' = 'kafka',

'connector.version' = '0.11',

'connector.topic' = 'scan-flink-topic',

'connector.properties.group.id' ='qrcode_pv_five_min',

'connector.startup-mode' = 'latest-offset',

'connector.properties.zookeeper.connect' = 'localhost:2181',

'connector.properties.bootstrap.servers' = 'localhost:9092',

'update-mode' = 'append',

'format.type' = 'json',

'format.derive-schema' = 'true'

);

CREATE TABLE pv_five_min (
item_code VARCHAR,
dt VARCHAR,
dd VARCHAR,
pv BIGINT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://127.0.0.1:3306/qrcode',
'connector.table' = 'qrcode_pv_five_min',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'root',
'connector.password' = 'root',
'connector.write.flush.max-rows' = '1'
);

INSERT INTO pv_five_min
SELECT
itemCode As item_code,
DATE_FORMAT(TUMBLE_START(t, INTERVAL '5' MINUTE),'yyyy-MM-dd HH:mm') dt,
DATE_FORMAT(TUMBLE_END(t, INTERVAL '5' MINUTE),'yyyy-MM-dd HH:mm') dd,
COUNT(*) AS pv
FROM user_behavior
GROUP BY TUMBLE(t, INTERVAL '5' MINUTE),itemCode;




 

Re: 关于flink sql 滚动窗口无法输出结果集合

Posted by Benchao Li <li...@gmail.com>.
CC user-zh

Hi steven,
我刚意识到这个回复只是回复到了私人邮箱,没有抄送社区。现在已经抄送了user-zh邮件列表。
第二个问题是,我才意识到最初你发送的邮件列表是user,而不是user-zh。下次如果是中文的邮件,可以直接发送user-zh,而不是user。
user邮件列表推荐用英文来交流。

关于你的问题,我认为这个watermark的时区其实对你的数据计算过程是没有影响的,不管是不是存在时区偏移,
watermark跟事件时间他们两个是可以对齐的。如果你不需要把时间输出到sink,这个watermark的偏移你可以暂时先不用关注。

steven chen <st...@163.com> 于2020年5月30日周六 上午1:28写道:

> 1.添加了时区函数,但是水印时间还是提前了8个小时,是否只能从数据源头上去减去8个小时,在输入到flink
>  2.处理事件窗口刚好晚8个小时,这个是否添加时区函数即可恢复,如果添加请问是SQL 如何添加
>
> steven chen
> 邮箱:stevenchen01@163.com
>
> <https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1&name=steven+chen&uid=stevenchen01%40163.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fqiyelogo%2FdefaultAvatar.png&items=%5B%22%E9%82%AE%E7%AE%B1%EF%BC%9Astevenchen01%40163.com%22%5D>
>
> 签名由 网易邮箱大师 <https://mail.163.com/dashi/dlpro.html?from=mail88> 定制
>
> 在2020年05月29日 17:22,steven chen <st...@163.com> 写道:
>
>
>
> 谢谢大佬!我感觉我思路一下清晰了!感谢提供思路和方案
>
>
>
>
> 在 2020-05-29 17:12:45,"Benchao Li" <li...@gmail.com> 写道:
>
> 1,你现在用的是TUMBLE(滚动窗口),不是滑动窗口。 窗口的划分就是按照数据的时间来计算,
>       比如你是5min的窗口,就应该是整数的窗口,比如[1:00, 1:05), [1:05, 1:10)这样子。
> 2, 如果你观察到的是30分钟后才有数据输出,这个大概率是跟你的watermark有关系,比如有些source subtask
>       的watermark要明显小于其他的subtask等情况。
> 3, 你这个情况如果不是必须要求事件时间,直接用处理时间窗口应该是比较符合你的预期的。
>
> steven chen <st...@163.com> 于2020年5月29日周五 下午3:46写道:
>
>>
>>
>> 谢谢!
>> 1.sql 这个滑动窗口的触发器的时间怎么设置?
>> 2.现在有数据写入,但是并不是5分钟后把窗口结果集sink 到mysql
>> 好像每开一个统计窗,都是30分钟后才能写入到mysql,或者下一次一定时间范围数据输入,才能写入, 这个又是什么原因?
>> 3.如果我现在只要有数据进来,开窗统计后,5分钟窗口结束触发将结果集insert到mysql
>>
>>
>>
>>
>>
>> 在 2020-05-29 15:36:08,"Benchao Li" <li...@gmail.com> 写道:
>>
>> 这应该是时区问题吧,TO_TIMESTAMP这个函数是有时区概念的。可以看下社区文档关于这个函数的介绍[1]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/systemFunctions.html
>>
>> steven chen <st...@163.com> 于2020年5月29日周五 下午3:19写道:
>>
>>> 有个疑问就是 我从webui中查看 watermark  为今晚上22点58分?这个是为什么
>>>
>>>
>>>
>>>
>>>
>>>
>>> 在 2020-05-29 15:02:21,"Benchao Li" <li...@gmail.com> 写道:
>>>
>>> Hi,
>>>
>>> Flink里面watermark要求时间字段是毫秒级别的,你可以看下你的watermark是否正常。感觉可能是这里的问题。
>>>
>>>
>>> steven chen <st...@163.com> 于2020年5月29日周五 下午2:34写道:
>>>
>>>> 数据没次都能进来,并且统计,但是为什么结果insert 不会保存到mysql 中?是sql的问题?还是?求大神解答
>>>> CREATE TABLE user_behavior (
>>>>
>>>> itemCode VARCHAR,
>>>>
>>>> ts BIGINT COMMENT '时间戳',
>>>>
>>>> t as TO_TIMESTAMP(FROM_UNIXTIME(ts /1000,'yyyy-MM-dd HH:mm:ss')),
>>>>
>>>> proctime as PROCTIME(),
>>>>
>>>> WATERMARK FOR t as t - INTERVAL '5' SECOND
>>>>
>>>> ) WITH (
>>>>
>>>> 'connector.type' = 'kafka',
>>>>
>>>> 'connector.version' = '0.11',
>>>>
>>>> 'connector.topic' = 'scan-flink-topic',
>>>>
>>>> 'connector.properties.group.id' ='qrcode_pv_five_min',
>>>>
>>>> 'connector.startup-mode' = 'latest-offset',
>>>>
>>>> 'connector.properties.zookeeper.connect' = 'localhost:2181',
>>>>
>>>> 'connector.properties.bootstrap.servers' = 'localhost:9092',
>>>>
>>>> 'update-mode' = 'append',
>>>>
>>>> 'format.type' = 'json',
>>>>
>>>> 'format.derive-schema' = 'true'
>>>> );
>>>> CREATE TABLE pv_five_min (
>>>> item_code VARCHAR,
>>>> dt VARCHAR,
>>>> dd VARCHAR,
>>>> pv BIGINT
>>>> ) WITH (
>>>> 'connector.type' = 'jdbc',
>>>> 'connector.url' = 'jdbc:mysql://127.0.0.1:3306/qrcode',
>>>> 'connector.table' = 'qrcode_pv_five_min',
>>>> 'connector.driver' = 'com.mysql.jdbc.Driver',
>>>> 'connector.username' = 'root',
>>>> 'connector.password' = 'root',
>>>> 'connector.write.flush.max-rows' = '1'
>>>> );
>>>> INSERT INTO pv_five_min
>>>> SELECT
>>>> itemCode As item_code,
>>>> DATE_FORMAT(TUMBLE_START(t, INTERVAL '5' MINUTE),'yyyy-MM-dd HH:mm') dt,
>>>> DATE_FORMAT(TUMBLE_END(t, INTERVAL '5' MINUTE),'yyyy-MM-dd HH:mm') dd,
>>>> COUNT(*) AS pv
>>>> FROM user_behavior
>>>> GROUP BY TUMBLE(t, INTERVAL '5' MINUTE),itemCode;
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>>
>>> Best,
>>> Benchao Li
>>>
>>>
>>>
>>>
>>>
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>>
>>
>>
>>
>
>
> --
>
> Best,
> Benchao Li
>
>
>
>
>
>

-- 

Best,
Benchao Li

Re: 关于flink sql 滚动窗口无法输出结果集合

Posted by Benchao Li <li...@gmail.com>.
Hi,

Flink里面watermark要求时间字段是毫秒级别的,你可以看下你的watermark是否正常。感觉可能是这里的问题。


steven chen <st...@163.com> 于2020年5月29日周五 下午2:34写道:

> 数据没次都能进来,并且统计,但是为什么结果insert 不会保存到mysql 中?是sql的问题?还是?求大神解答
> CREATE TABLE user_behavior (
>
> itemCode VARCHAR,
>
> ts BIGINT COMMENT '时间戳',
>
> t as TO_TIMESTAMP(FROM_UNIXTIME(ts /1000,'yyyy-MM-dd HH:mm:ss')),
>
> proctime as PROCTIME(),
>
> WATERMARK FOR t as t - INTERVAL '5' SECOND
>
> ) WITH (
>
> 'connector.type' = 'kafka',
>
> 'connector.version' = '0.11',
>
> 'connector.topic' = 'scan-flink-topic',
>
> 'connector.properties.group.id' ='qrcode_pv_five_min',
>
> 'connector.startup-mode' = 'latest-offset',
>
> 'connector.properties.zookeeper.connect' = 'localhost:2181',
>
> 'connector.properties.bootstrap.servers' = 'localhost:9092',
>
> 'update-mode' = 'append',
>
> 'format.type' = 'json',
>
> 'format.derive-schema' = 'true'
> );
> CREATE TABLE pv_five_min (
> item_code VARCHAR,
> dt VARCHAR,
> dd VARCHAR,
> pv BIGINT
> ) WITH (
> 'connector.type' = 'jdbc',
> 'connector.url' = 'jdbc:mysql://127.0.0.1:3306/qrcode',
> 'connector.table' = 'qrcode_pv_five_min',
> 'connector.driver' = 'com.mysql.jdbc.Driver',
> 'connector.username' = 'root',
> 'connector.password' = 'root',
> 'connector.write.flush.max-rows' = '1'
> );
> INSERT INTO pv_five_min
> SELECT
> itemCode As item_code,
> DATE_FORMAT(TUMBLE_START(t, INTERVAL '5' MINUTE),'yyyy-MM-dd HH:mm') dt,
> DATE_FORMAT(TUMBLE_END(t, INTERVAL '5' MINUTE),'yyyy-MM-dd HH:mm') dd,
> COUNT(*) AS pv
> FROM user_behavior
> GROUP BY TUMBLE(t, INTERVAL '5' MINUTE),itemCode;
>
>
>
>
>
>
>


-- 

Best,
Benchao Li