Posted to user-zh@flink.apache.org by 111 <xi...@163.com> on 2020/04/07 05:07:19 UTC

Using ROW_NUMBER in Flink SQL 1.10

Hi,
I want to run a Top-N query after a sliding (HOP) window, so I wrote the following SQL:
-- Window aggregation
create view window_v as
select
        member_id,
        category_id,
        HOP_END(ts, INTERVAL '10' SECOND, INTERVAL '30' MINUTE) as `time`,
        count(1) as c
from stream_t
group by member_id, category_id,
        HOP(ts, INTERVAL '10' SECOND, INTERVAL '30' MINUTE);


-- Rank and take the top 20
SELECT
        member_id,
        category_id,
        c,
        row_number() over (PARTITION BY member_id ORDER BY c) AS rn
FROM window_v;


But execution fails with:
Caused by: org.apache.flink.table.api.TableException: OVER windows' ordering in stream mode must be defined on a time attribute.
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:195)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:56)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
In the Top-N example in the official documentation, the ORDER BY column can also be a long value. Can ROW_NUMBER not be applied to the result of a window aggregation?
Best,
Xinghalo
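For reference: in Flink SQL, HOP(time_attr, slide, size) takes the slide first and the window size second, so the clause above defines 30-minute windows that advance every 10 seconds, and each input row is counted in size / slide = 180 overlapping windows. The Python sketch below enumerates the windows that contain one timestamp. It is an illustration only: `hop_windows` is a hypothetical helper, not a Flink API, and it assumes windows aligned to the Unix epoch with an inclusive start and an exclusive end.

```python
from datetime import datetime, timedelta


def hop_windows(ts, slide, size):
    """Return (start, end) pairs of every hopping window that contains ts.

    Hypothetical helper. Assumes window starts are aligned to the Unix epoch
    (start is a multiple of slide), start is inclusive and end is exclusive.
    """
    epoch = datetime(1970, 1, 1)
    offset = (ts - epoch) % slide      # timedelta % timedelta -> timedelta
    start = ts - offset                # latest aligned window start <= ts
    windows = []
    # A window [start, start + size) contains ts while start > ts - size.
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return windows


if __name__ == "__main__":
    ts = datetime(2020, 4, 7, 5, 7, 19)
    windows = hop_windows(ts, timedelta(seconds=10), timedelta(minutes=30))
    print(len(windows))  # 180 = 30 min / 10 s
    print(windows[0])    # most recent window, ending at 05:37:10
```

The end value of each pair corresponds to what HOP_END reports for that window, which is why window_v holds one row per (member_id, category_id, window end).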

Re: Using ROW_NUMBER in Flink SQL 1.10

Posted by 111 <xi...@163.com>.
Hi,
I've filed it in JIRA: https://issues.apache.org/jira/browse/FLINK-17022
Best,
Xinghalo

Re: Using ROW_NUMBER in Flink SQL 1.10

Posted by 111 <xi...@163.com>.
Hi,
OK, no problem.


Best,
xinghalo


Re: Using ROW_NUMBER in Flink SQL 1.10

Posted by Jark Wu <im...@gmail.com>.
Hi Xinghalo,

This looks like a codegen bug. Could you open an issue in JIRA? Ideally, attach your example.

Best,
Jark


Re: Using ROW_NUMBER in Flink SQL 1.10

Posted by 111 <xi...@163.com>.
Hi,
I see, so the ranking and the filter have to be used together.


However, after adding the condition, I get an error saying the Calc could not be pruned:
Caused by: org.apache.flink.table.api.TableException: This calc has no useful projection and no filter. It should be removed by CalcRemoveRule.
    at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:176)
    at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
I have to force in a `where 1=1` to get it to work:
select * from (
    SELECT member_id, category_id, c,
        row_number() over (PARTITION BY member_id ORDER BY c) AS rn
    FROM window_v
    where 1=1
) where rn <= 5
Best,
Xinghalo

Re: Using ROW_NUMBER in Flink SQL 1.10

Posted by Jark Wu <im...@gmail.com>.
Hi,

Your Top-N only does the ranking; it doesn't filter down to the first N rows. Add a WHERE condition and try again.

SELECT * FROM (
    SELECT
        member_id,
        category_id,
        c,
        row_number() over (PARTITION BY member_id ORDER BY c) AS rn
    FROM window_v
) WHERE rn <= 20;

Best,
Jark


Re: Using ROW_NUMBER in Flink SQL 1.10

Posted by 111 <xi...@163.com>.
Hi,


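I don't quite follow. What I want is each user's top-N categories within each window. In batch mode the usual syntax is:
row_number() over (PARTITION BY member_id ORDER BY c) AS rn
which gives, for each user, the categories ranked by their counts.


If I use HOP_PROCTIME here to get a time attribute,
row_number() over (PARTITION BY member_id ORDER BY time) AS rn
the result seems to rank each user's rows by sliding-window time, rather than ranking by c within each sliding window.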
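One possible way to express "each user's top-N categories within each window" is to add the window end to the partition key, so that every (member_id, window) pair gets its own ranking ordered by c, and then keep rows with rn <= N. The sketch below shows those batch semantics with Python's built-in sqlite3 module (SQLite supports ROW_NUMBER from version 3.25). It is not Flink and does not exercise Flink's streaming planner. Assumptions: the sample rows are hypothetical stand-ins for the output of window_v, `time` holds the HOP_END value as text, ORDER BY c DESC assumes "top" means the highest count, and the category_id tie-break is added only to make the output deterministic.

```python
import sqlite3

TOP_N_SQL = """
SELECT member_id, category_id, `time`, c, rn FROM (
    SELECT member_id, category_id, `time`, c,
           ROW_NUMBER() OVER (
               PARTITION BY member_id, `time`  -- one ranking per user per window
               ORDER BY c DESC, category_id    -- highest count first, ties by category
           ) AS rn
    FROM window_v
) WHERE rn <= ?                                -- keep only the first N per partition
ORDER BY member_id, `time`, rn
"""


def top_n_per_window(rows, n):
    """Return the top-n categories by count for each (member_id, window end).

    rows: (member_id, category_id, window_end, c) tuples, hypothetical
    stand-ins for the output of the window_v view.
    """
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE window_v "
            "(member_id TEXT, category_id TEXT, `time` TEXT, c INTEGER)"
        )
        conn.executemany("INSERT INTO window_v VALUES (?, ?, ?, ?)", rows)
        return conn.execute(TOP_N_SQL, (n,)).fetchall()
    finally:
        conn.close()


if __name__ == "__main__":
    sample = [  # hypothetical window_v rows
        ("m1", "catA", "05:30:00", 5),
        ("m1", "catB", "05:30:00", 3),
        ("m1", "catC", "05:30:00", 8),
        ("m1", "catA", "05:30:10", 1),
        ("m1", "catB", "05:30:10", 4),
        ("m2", "catA", "05:30:00", 2),
    ]
    for row in top_n_per_window(sample, 2):
        print(row)
```

Partitioning only by member_id (as in the queries in this thread) would rank a user's rows across all windows together; putting `time` in PARTITION BY restarts the ranking for every window. In Flink, the ROW_NUMBER-plus-`rn <= N` shape is the pattern the Top-N documentation describes, and on a stream its result is updating rather than final, which this batch run does not show.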


Best,
Xinghalo

Re: Using ROW_NUMBER in Flink SQL 1.10

Posted by Benchao Li <li...@gmail.com>.
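This is because the ORDER BY field of your OVER window is not a time attribute.
The two queries fail when chained because the time attribute is lost after the first HOP window.
For how to preserve the time attribute, see the documentation [1]. In your case, that means using HOP_ROWTIME or HOP_PROCTIME;
which one depends on the type of your ts (event time or processing time).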

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#time-attributes

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University