You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by x <35...@qq.com> on 2020/06/17 03:14:12 UTC
求助:FLINKSQL1.10实时统计累计UV
需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
CREATE VIEW uv_per_10min AS
SELECT
MAX(DATE_FORMAT(proctime , 'yyyy-MM-dd HH:mm:00')) OVER w AS time_str,
COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
想请教一下,应该如何处理?
PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') 这样可以吗,另外状态应该如何清理?
PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧
多谢
Re: 求助:FLINKSQL1.10实时统计累计UV
Posted by Benchao Li <li...@apache.org>.
你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题,
这个已经在1.11中修复了。
[1] https://issues.apache.org/jira/browse/FLINK-17942
x <35...@qq.com> 于2020年7月3日周五 下午4:34写道:
> 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期,
>
> 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。
>
>
>
>
> ------------------ 原始邮件 ------------------
> 发件人: "Jark Wu"<imjark@gmail.com>;
> 发送时间: 2020年6月18日(星期四) 中午12:16
> 收件人: "user-zh"<user-zh@flink.apache.org>;
>
> 主题: Re: 求助:FLINKSQL1.10实时统计累计UV
>
>
>
> 是的,我觉得这样子是能绕过的。
>
> On Thu, 18 Jun 2020 at 10:34, x <35907418@qq.com> wrote:
>
> > 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗?
> > val resTmpTab: Table = tabEnv.sqlQuery(
> > """
> > SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd
> HH:mm:00'))
> > time_str,COUNT(DISTINCT userkey) uv
> > FROM user_behavior GROUP BY
> DATE_FORMAT(ts, 'yyyy-MM-dd') """)
> >
> > val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
> >
> .filter(line=&gt;line._1==true).map(line=&gt;line._2)
> >
> > val res= tabEnv.fromDataStream(resTmpStream)
> > tabEnv.sqlUpdate(
> > s"""
> > INSERT INTO rt_totaluv
> > SELECT _1,MAX(_2)
> > FROM $res
> > GROUP BY _1
> > """)
> >
> >
> > ------------------&nbsp;原始邮件&nbsp;------------------
> > 发件人:&nbsp;"Jark Wu"<imjark@gmail.com&gt;;
> > 发送时间:&nbsp;2020年6月17日(星期三) 中午1:55
> > 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
> >
> > 主题:&nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
> >
> >
> >
> > 在 Flink 1.11 中,你可以尝试这样:
> >
> > CREATE TABLE mysql (
> > &nbsp;&nbsp; time_str STRING,
> > &nbsp;&nbsp; uv BIGINT,
> > &nbsp;&nbsp; PRIMARY KEY (ts) NOT ENFORCED
> > ) WITH (
> > &nbsp;&nbsp; 'connector' = 'jdbc',
> > &nbsp;&nbsp; 'url' = 'jdbc:mysql://localhost:3306/mydatabase',
> > &nbsp;&nbsp; 'table-name' = 'myuv'
> > );
> >
> > INSERT INTO mysql
> > SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')),
> COUNT(DISTINCT&nbsp;
> > user_id)
> > FROM user_behavior;
> >
> > On Wed, 17 Jun 2020 at 13:49, x <35907418@qq.com&gt; wrote:
> >
> > &gt; 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV,
> > &gt; sink表这个样式
> > &gt; tm uv
> > &gt; 2020/06/17 13:46:00 10000
> > &gt; 2020/06/17 13:47:00 20000
> > &gt; 2020/06/17 13:48:00 30000
> > &gt;
> > &gt;
> > &gt; group by 日期的话,分钟如何获取
> > &gt;
> > &gt;
> > &gt;
> ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> > &gt; 发件人:&amp;nbsp;"Benchao Li"<libenchao@apache.org
> &amp;gt;;
> > &gt; 发送时间:&amp;nbsp;2020年6月17日(星期三) 中午11:46
> > &gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org
> &amp;gt;;
> > &gt;
> > &gt; 主题:&amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
> > &gt;
> > &gt;
> > &gt;
> > &gt; Hi,
> > &gt; 我感觉这种场景可以有两种方式,
> > &gt; 1. 可以直接用group by + mini batch
> > &gt; 2. window聚合 + fast emit
> > &gt;
> > &gt; 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm,
> 'yyyy-MM-dd')。
> > &gt; 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini
> batch的开启也需要
> > &gt; 用参数[2] 来打开。
> > &gt;
> > &gt; 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
> > &gt; fast
> emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
> > &gt; table.exec.emit.early-fire.enabled = true
> > &gt; table.exec.emit.early-fire.delay = 60 s
> > &gt;
> > &gt; [1]
> > &gt;
> > &gt;
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> > &gt; [2]
> > &gt;
> > &gt;
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
> > &gt;
> > &gt; x <35907418@qq.com&amp;gt; 于2020年6月17日周三 上午11:14写道:
> > &gt;
> > &gt; &amp;gt;
> 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
> > &gt; &amp;gt; CREATE VIEW uv_per_10min AS
> > &gt; &amp;gt; SELECT&amp;amp;nbsp;
> > &gt; &amp;gt; &amp;amp;nbsp;
> MAX(DATE_FORMAT(proctime&amp;amp;nbsp;,
> > 'yyyy-MM-dd
> > &gt; HH:mm:00'))&amp;amp;nbsp;OVER w
> > &gt; &amp;gt; AS time_str,&amp;amp;nbsp;
> > &gt; &amp;gt; &amp;amp;nbsp; COUNT(DISTINCT user_id) OVER
> w AS uv
> > &gt; &amp;gt; FROM user_behavior
> > &gt; &amp;gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN
> UNBOUNDED
> > PRECEDING AND
> > &gt; &amp;gt; CURRENT ROW);
> > &gt; &amp;gt;
> > &gt; &amp;gt;
> > &gt; &amp;gt; 想请教一下,应该如何处理?
> > &gt; &amp;gt; PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd')
> > 这样可以吗,另外状态应该如何清理?
> > &gt; &amp;gt; PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧
> > &gt; &amp;gt; 多谢
--
Best,
Benchao Li
回复: 求助:FLINKSQL1.10实时统计累计UV
Posted by x <35...@qq.com>.
您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期,
我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。
------------------ 原始邮件 ------------------
发件人: "Jark Wu"<imjark@gmail.com>;
发送时间: 2020年6月18日(星期四) 中午12:16
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: Re: 求助:FLINKSQL1.10实时统计累计UV
是的,我觉得这样子是能绕过的。
On Thu, 18 Jun 2020 at 10:34, x <35907418@qq.com> wrote:
> 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗?
> val resTmpTab: Table = tabEnv.sqlQuery(
> """
> SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00'))
> time_str,COUNT(DISTINCT userkey) uv
> FROM user_behavior GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd') """)
>
> val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
> .filter(line=&gt;line._1==true).map(line=&gt;line._2)
>
> val res= tabEnv.fromDataStream(resTmpStream)
> tabEnv.sqlUpdate(
> s"""
> INSERT INTO rt_totaluv
> SELECT _1,MAX(_2)
> FROM $res
> GROUP BY _1
> """)
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Jark Wu"<imjark@gmail.com&gt;;
> 发送时间:&nbsp;2020年6月17日(星期三) 中午1:55
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
>
>
>
> 在 Flink 1.11 中,你可以尝试这样:
>
> CREATE TABLE mysql (
> &nbsp;&nbsp; time_str STRING,
> &nbsp;&nbsp; uv BIGINT,
> &nbsp;&nbsp; PRIMARY KEY (ts) NOT ENFORCED
> ) WITH (
> &nbsp;&nbsp; 'connector' = 'jdbc',
> &nbsp;&nbsp; 'url' = 'jdbc:mysql://localhost:3306/mydatabase',
> &nbsp;&nbsp; 'table-name' = 'myuv'
> );
>
> INSERT INTO mysql
> SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), COUNT(DISTINCT&nbsp;
> user_id)
> FROM user_behavior;
>
> On Wed, 17 Jun 2020 at 13:49, x <35907418@qq.com&gt; wrote:
>
> &gt; 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV,
> &gt; sink表这个样式
> &gt; tm uv
> &gt; 2020/06/17 13:46:00 10000
> &gt; 2020/06/17 13:47:00 20000
> &gt; 2020/06/17 13:48:00 30000
> &gt;
> &gt;
> &gt; group by 日期的话,分钟如何获取
> &gt;
> &gt;
> &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> &gt; 发件人:&amp;nbsp;"Benchao Li"<libenchao@apache.org&amp;gt;;
> &gt; 发送时间:&amp;nbsp;2020年6月17日(星期三) 中午11:46
> &gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
> &gt;
> &gt; 主题:&amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
> &gt;
> &gt;
> &gt;
> &gt; Hi,
> &gt; 我感觉这种场景可以有两种方式,
> &gt; 1. 可以直接用group by + mini batch
> &gt; 2. window聚合 + fast emit
> &gt;
> &gt; 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, 'yyyy-MM-dd')。
> &gt; 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要
> &gt; 用参数[2] 来打开。
> &gt;
> &gt; 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
> &gt; fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
> &gt; table.exec.emit.early-fire.enabled = true
> &gt; table.exec.emit.early-fire.delay = 60 s
> &gt;
> &gt; [1]
> &gt;
> &gt;
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> &gt; [2]
> &gt;
> &gt;
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
> &gt;
> &gt; x <35907418@qq.com&amp;gt; 于2020年6月17日周三 上午11:14写道:
> &gt;
> &gt; &amp;gt; 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
> &gt; &amp;gt; CREATE VIEW uv_per_10min AS
> &gt; &amp;gt; SELECT&amp;amp;nbsp;
> &gt; &amp;gt; &amp;amp;nbsp; MAX(DATE_FORMAT(proctime&amp;amp;nbsp;,
> 'yyyy-MM-dd
> &gt; HH:mm:00'))&amp;amp;nbsp;OVER w
> &gt; &amp;gt; AS time_str,&amp;amp;nbsp;
> &gt; &amp;gt; &amp;amp;nbsp; COUNT(DISTINCT user_id) OVER w AS uv
> &gt; &amp;gt; FROM user_behavior
> &gt; &amp;gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED
> PRECEDING AND
> &gt; &amp;gt; CURRENT ROW);
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt; 想请教一下,应该如何处理?
> &gt; &amp;gt; PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd')
> 这样可以吗,另外状态应该如何清理?
> &gt; &amp;gt; PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧
> &gt; &amp;gt; 多谢
Re: 求助:FLINKSQL1.10实时统计累计UV
Posted by Jark Wu <im...@gmail.com>.
是的,我觉得这样子是能绕过的。
On Thu, 18 Jun 2020 at 10:34, x <35...@qq.com> wrote:
> 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗?
> val resTmpTab: Table = tabEnv.sqlQuery(
> """
> SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00'))
> time_str,COUNT(DISTINCT userkey) uv
> FROM user_behavior GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd') """)
>
> val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
> .filter(line=>line._1==true).map(line=>line._2)
>
> val res= tabEnv.fromDataStream(resTmpStream)
> tabEnv.sqlUpdate(
> s"""
> INSERT INTO rt_totaluv
> SELECT _1,MAX(_2)
> FROM $res
> GROUP BY _1
> """)
>
>
> ------------------ 原始邮件 ------------------
> 发件人: "Jark Wu"<imjark@gmail.com>;
> 发送时间: 2020年6月17日(星期三) 中午1:55
> 收件人: "user-zh"<user-zh@flink.apache.org>;
>
> 主题: Re: 求助:FLINKSQL1.10实时统计累计UV
>
>
>
> 在 Flink 1.11 中,你可以尝试这样:
>
> CREATE TABLE mysql (
> time_str STRING,
> uv BIGINT,
> PRIMARY KEY (ts) NOT ENFORCED
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://localhost:3306/mydatabase',
> 'table-name' = 'myuv'
> );
>
> INSERT INTO mysql
> SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), COUNT(DISTINCT
> user_id)
> FROM user_behavior;
>
> On Wed, 17 Jun 2020 at 13:49, x <35907418@qq.com> wrote:
>
> > 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV,
> > sink表这个样式
> > tm uv
> > 2020/06/17 13:46:00 10000
> > 2020/06/17 13:47:00 20000
> > 2020/06/17 13:48:00 30000
> >
> >
> > group by 日期的话,分钟如何获取
> >
> >
> > ------------------&nbsp;原始邮件&nbsp;------------------
> > 发件人:&nbsp;"Benchao Li"<libenchao@apache.org&gt;;
> > 发送时间:&nbsp;2020年6月17日(星期三) 中午11:46
> > 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
> >
> > 主题:&nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
> >
> >
> >
> > Hi,
> > 我感觉这种场景可以有两种方式,
> > 1. 可以直接用group by + mini batch
> > 2. window聚合 + fast emit
> >
> > 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, 'yyyy-MM-dd')。
> > 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要
> > 用参数[2] 来打开。
> >
> > 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
> > fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
> > table.exec.emit.early-fire.enabled = true
> > table.exec.emit.early-fire.delay = 60 s
> >
> > [1]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> > [2]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
> >
> > x <35907418@qq.com&gt; 于2020年6月17日周三 上午11:14写道:
> >
> > &gt; 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
> > &gt; CREATE VIEW uv_per_10min AS
> > &gt; SELECT&amp;nbsp;
> > &gt; &amp;nbsp; MAX(DATE_FORMAT(proctime&amp;nbsp;,
> 'yyyy-MM-dd
> > HH:mm:00'))&amp;nbsp;OVER w
> > &gt; AS time_str,&amp;nbsp;
> > &gt; &amp;nbsp; COUNT(DISTINCT user_id) OVER w AS uv
> > &gt; FROM user_behavior
> > &gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED
> PRECEDING AND
> > &gt; CURRENT ROW);
> > &gt;
> > &gt;
> > &gt; 想请教一下,应该如何处理?
> > &gt; PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd')
> 这样可以吗,另外状态应该如何清理?
> > &gt; PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧
> > &gt; 多谢
回复: 求助:FLINKSQL1.10实时统计累计UV
Posted by x <35...@qq.com>.
如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗?
val resTmpTab: Table = tabEnv.sqlQuery(
"""
SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) time_str,COUNT(DISTINCT userkey) uv
FROM user_behavior GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd') """)
val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
.filter(line=>line._1==true).map(line=>line._2)
val res= tabEnv.fromDataStream(resTmpStream)
tabEnv.sqlUpdate(
s"""
INSERT INTO rt_totaluv
SELECT _1,MAX(_2)
FROM $res
GROUP BY _1
""")
------------------ 原始邮件 ------------------
发件人: "Jark Wu"<imjark@gmail.com>;
发送时间: 2020年6月17日(星期三) 中午1:55
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: Re: 求助:FLINKSQL1.10实时统计累计UV
在 Flink 1.11 中,你可以尝试这样:
CREATE TABLE mysql (
time_str STRING,
uv BIGINT,
PRIMARY KEY (ts) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'myuv'
);
INSERT INTO mysql
SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), COUNT(DISTINCT user_id)
FROM user_behavior;
On Wed, 17 Jun 2020 at 13:49, x <35907418@qq.com> wrote:
> 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV,
> sink表这个样式
> tm uv
> 2020/06/17 13:46:00 10000
> 2020/06/17 13:47:00 20000
> 2020/06/17 13:48:00 30000
>
>
> group by 日期的话,分钟如何获取
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Benchao Li"<libenchao@apache.org&gt;;
> 发送时间:&nbsp;2020年6月17日(星期三) 中午11:46
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
>
>
>
> Hi,
> 我感觉这种场景可以有两种方式,
> 1. 可以直接用group by + mini batch
> 2. window聚合 + fast emit
>
> 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, 'yyyy-MM-dd')。
> 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要
> 用参数[2] 来打开。
>
> 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
> fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
> table.exec.emit.early-fire.enabled = true
> table.exec.emit.early-fire.delay = 60 s
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
>
> x <35907418@qq.com&gt; 于2020年6月17日周三 上午11:14写道:
>
> &gt; 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
> &gt; CREATE VIEW uv_per_10min AS
> &gt; SELECT&amp;nbsp;
> &gt; &amp;nbsp; MAX(DATE_FORMAT(proctime&amp;nbsp;, 'yyyy-MM-dd
> HH:mm:00'))&amp;nbsp;OVER w
> &gt; AS time_str,&amp;nbsp;
> &gt; &amp;nbsp; COUNT(DISTINCT user_id) OVER w AS uv
> &gt; FROM user_behavior
> &gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND
> &gt; CURRENT ROW);
> &gt;
> &gt;
> &gt; 想请教一下,应该如何处理?
> &gt; PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') 这样可以吗,另外状态应该如何清理?
> &gt; PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧
> &gt; 多谢
回复: 求助:FLINKSQL1.10实时统计累计UV
Posted by x <35...@qq.com>.
明白了,十分感谢
------------------ 原始邮件 ------------------
发件人: "Jark Wu"<imjark@gmail.com>;
发送时间: 2020年6月17日(星期三) 中午1:55
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: Re: 求助:FLINKSQL1.10实时统计累计UV
在 Flink 1.11 中,你可以尝试这样:
CREATE TABLE mysql (
time_str STRING,
uv BIGINT,
PRIMARY KEY (ts) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'myuv'
);
INSERT INTO mysql
SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), COUNT(DISTINCT user_id)
FROM user_behavior;
On Wed, 17 Jun 2020 at 13:49, x <35907418@qq.com> wrote:
> 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV,
> sink表这个样式
> tm uv
> 2020/06/17 13:46:00 10000
> 2020/06/17 13:47:00 20000
> 2020/06/17 13:48:00 30000
>
>
> group by 日期的话,分钟如何获取
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Benchao Li"<libenchao@apache.org&gt;;
> 发送时间:&nbsp;2020年6月17日(星期三) 中午11:46
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
>
>
>
> Hi,
> 我感觉这种场景可以有两种方式,
> 1. 可以直接用group by + mini batch
> 2. window聚合 + fast emit
>
> 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, 'yyyy-MM-dd')。
> 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要
> 用参数[2] 来打开。
>
> 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
> fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
> table.exec.emit.early-fire.enabled = true
> table.exec.emit.early-fire.delay = 60 s
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
>
> x <35907418@qq.com&gt; 于2020年6月17日周三 上午11:14写道:
>
> &gt; 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
> &gt; CREATE VIEW uv_per_10min AS
> &gt; SELECT&amp;nbsp;
> &gt; &amp;nbsp; MAX(DATE_FORMAT(proctime&amp;nbsp;, 'yyyy-MM-dd
> HH:mm:00'))&amp;nbsp;OVER w
> &gt; AS time_str,&amp;nbsp;
> &gt; &amp;nbsp; COUNT(DISTINCT user_id) OVER w AS uv
> &gt; FROM user_behavior
> &gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND
> &gt; CURRENT ROW);
> &gt;
> &gt;
> &gt; 想请教一下,应该如何处理?
> &gt; PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') 这样可以吗,另外状态应该如何清理?
> &gt; PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧
> &gt; 多谢
Re: 求助:FLINKSQL1.10实时统计累计UV
Posted by Jark Wu <im...@gmail.com>.
在 Flink 1.11 中,你可以尝试这样:
CREATE TABLE mysql (
time_str STRING,
uv BIGINT,
PRIMARY KEY (ts) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'myuv'
);
INSERT INTO mysql
SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), COUNT(DISTINCT user_id)
FROM user_behavior;
On Wed, 17 Jun 2020 at 13:49, x <35...@qq.com> wrote:
> 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV,
> sink表这个样式
> tm uv
> 2020/06/17 13:46:00 10000
> 2020/06/17 13:47:00 20000
> 2020/06/17 13:48:00 30000
>
>
> group by 日期的话,分钟如何获取
>
>
> ------------------ 原始邮件 ------------------
> 发件人: "Benchao Li"<libenchao@apache.org>;
> 发送时间: 2020年6月17日(星期三) 中午11:46
> 收件人: "user-zh"<user-zh@flink.apache.org>;
>
> 主题: Re: 求助:FLINKSQL1.10实时统计累计UV
>
>
>
> Hi,
> 我感觉这种场景可以有两种方式,
> 1. 可以直接用group by + mini batch
> 2. window聚合 + fast emit
>
> 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, 'yyyy-MM-dd')。
> 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要
> 用参数[2] 来打开。
>
> 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
> fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
> table.exec.emit.early-fire.enabled = true
> table.exec.emit.early-fire.delay = 60 s
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
>
> x <35907418@qq.com> 于2020年6月17日周三 上午11:14写道:
>
> > 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
> > CREATE VIEW uv_per_10min AS
> > SELECT&nbsp;
> > &nbsp; MAX(DATE_FORMAT(proctime&nbsp;, 'yyyy-MM-dd
> HH:mm:00'))&nbsp;OVER w
> > AS time_str,&nbsp;
> > &nbsp; COUNT(DISTINCT user_id) OVER w AS uv
> > FROM user_behavior
> > WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND
> > CURRENT ROW);
> >
> >
> > 想请教一下,应该如何处理?
> > PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') 这样可以吗,另外状态应该如何清理?
> > PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧
> > 多谢
回复: 求助:FLINKSQL1.10实时统计累计UV
Posted by x <35...@qq.com>.
感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV,
sink表这个样式
tm uv
2020/06/17 13:46:00 10000
2020/06/17 13:47:00 20000
2020/06/17 13:48:00 30000
group by 日期的话,分钟如何获取
------------------ 原始邮件 ------------------
发件人: "Benchao Li"<libenchao@apache.org>;
发送时间: 2020年6月17日(星期三) 中午11:46
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: Re: 求助:FLINKSQL1.10实时统计累计UV
Hi,
我感觉这种场景可以有两种方式,
1. 可以直接用group by + mini batch
2. window聚合 + fast emit
对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, 'yyyy-MM-dd')。
这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要
用参数[2] 来打开。
对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
table.exec.emit.early-fire.enabled = true
table.exec.emit.early-fire.delay = 60 s
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
x <35907418@qq.com> 于2020年6月17日周三 上午11:14写道:
> 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
> CREATE VIEW uv_per_10min AS
> SELECT&nbsp;
> &nbsp; MAX(DATE_FORMAT(proctime&nbsp;, 'yyyy-MM-dd HH:mm:00'))&nbsp;OVER w
> AS time_str,&nbsp;
> &nbsp; COUNT(DISTINCT user_id) OVER w AS uv
> FROM user_behavior
> WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND
> CURRENT ROW);
>
>
> 想请教一下,应该如何处理?
> PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') 这样可以吗,另外状态应该如何清理?
> PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧
> 多谢
Re: 求助:FLINKSQL1.10实时统计累计UV
Posted by Jark Wu <im...@gmail.com>.
本超提的两个方案也是阿里内部解决这个问题最常用的方式,但是 1.10 会有 primary key 的限制,要等到 1.11 才行。
另外这两个方案在追数据时,都可能会有毛刺现象(有几分钟没有值,因为数据追太快,跳过了)。
On Wed, 17 Jun 2020 at 11:46, Benchao Li <li...@apache.org> wrote:
> Hi,
> 我感觉这种场景可以有两种方式,
> 1. 可以直接用group by + mini batch
> 2. window聚合 + fast emit
>
> 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, 'yyyy-MM-dd')。
> 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要
> 用参数[2] 来打开。
>
> 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
> fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
> table.exec.emit.early-fire.enabled = true
> table.exec.emit.early-fire.delay = 60 s
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
>
> x <35...@qq.com> 于2020年6月17日周三 上午11:14写道:
>
> > 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
> > CREATE VIEW uv_per_10min AS
> > SELECT
> > MAX(DATE_FORMAT(proctime , 'yyyy-MM-dd HH:mm:00')) OVER
> w
> > AS time_str,
> > COUNT(DISTINCT user_id) OVER w AS uv
> > FROM user_behavior
> > WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND
> > CURRENT ROW);
> >
> >
> > 想请教一下,应该如何处理?
> > PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') 这样可以吗,另外状态应该如何清理?
> > PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧
> > 多谢
>
Re: 求助:FLINKSQL1.10实时统计累计UV
Posted by Benchao Li <li...@apache.org>.
Hi,
我感觉这种场景可以有两种方式,
1. 可以直接用group by + mini batch
2. window聚合 + fast emit
对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, 'yyyy-MM-dd')。
这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要
用参数[2] 来打开。
对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
table.exec.emit.early-fire.enabled = true
table.exec.emit.early-fire.delay = 60 s
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
x <35...@qq.com> 于2020年6月17日周三 上午11:14写道:
> 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
> CREATE VIEW uv_per_10min AS
> SELECT
> MAX(DATE_FORMAT(proctime , 'yyyy-MM-dd HH:mm:00')) OVER w
> AS time_str,
> COUNT(DISTINCT user_id) OVER w AS uv
> FROM user_behavior
> WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND
> CURRENT ROW);
>
>
> 想请教一下,应该如何处理?
> PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') 这样可以吗,另外状态应该如何清理?
> PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧
> 多谢