You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 董 加强 <fl...@outlook.com> on 2019/09/18 08:58:09 UTC
关于使用Flink计算TopN的问题
大家好:
最近需要使用Flink计算TopN碰到一些问题 不知道大家有没有遇到过 计算TopN的所使用的SQL语句是如下形式
create stream input table raw_log (
country STRING,
domain STRING,
flux LONG,
request LONG,
rowtime AS ROWTIME(request, "2 SECOND")
) USING kafka (
kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}",
startingOffsets = earliest, subscribe = "input"
) ROW FORMAT JSON; create stream output table top_n_result USING kafka (
kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}",
topic = "output"
) ROW FORMAT JSON TBLPROPERTIES("update-mode" = upsert); create view window_log as
select
TUMBLE_START(rowtime, INTERVAL '2' SECOND) as wStart,
country,
domain,
sum(flux) as flux
from
raw_log
group by
TUMBLE(rowtime, INTERVAL '2' SECOND),
country,
domain; insert into top_n_result
SELECT
*
FROM
(
SELECT
*,
ROW_NUMBER() OVER (
PARTITION BY wStart
ORDER BY
flux desc
) AS row_num
FROM
window_log
)
WHERE
row_num <= 10;
就是前面是一个基于事件时间的窗口计算逻辑后面跟着一个TopN的计算逻辑 跑在Flink 1.9的blink上的 在TopN计算上先按窗口开始时间做分区然后排序输出Top结果 这里就产生了
一个状态管理的问题 因为窗口计算是不断向前的 也就是将窗口开始时间作为分区键会导致状态不断增大 后续在测试过程中发现其底层是实现为RetractableTopNFunction 然后在这个实现中没有发现状态清理的逻辑 而在
AppendOnlyTopNFunction和UpdatableTopNFunction中存在状态清理的逻辑 为什么要这么实现? 能否在RetractableTopNFunction中实现状态清理? 并且保证状态安全被删除?
Re: 关于使用Flink计算TopN的问题
Posted by Jark Wu <im...@gmail.com>.
Hi,
多谢反馈。 这应该是一个 mistake。 我创建了一个 issue 去跟踪这个问题。 https://issues.apache.org/jira/browse/FLINK-14119 <https://issues.apache.org/jira/browse/FLINK-14119>
> 在 2019年9月18日,16:58,董 加强 <fl...@outlook.com> 写道:
>
> 大家好:
>
> 最近需要使用Flink计算TopN碰到一些问题 不知道大家有没有遇到过 计算TopN的所使用的SQL语句是如下形式
>
> create stream input table raw_log (
> country STRING,
> domain STRING,
> flux LONG,
> request LONG,
> rowtime AS ROWTIME(request, "2 SECOND")
> ) USING kafka (
> kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}",
> startingOffsets = earliest, subscribe = "input"
> ) ROW FORMAT JSON; create stream output table top_n_result USING kafka (
> kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}",
> topic = "output"
> ) ROW FORMAT JSON TBLPROPERTIES("update-mode" = upsert); create view window_log as
> select
> TUMBLE_START(rowtime, INTERVAL '2' SECOND) as wStart,
> country,
> domain,
> sum(flux) as flux
> from
> raw_log
> group by
> TUMBLE(rowtime, INTERVAL '2' SECOND),
> country,
> domain; insert into top_n_result
> SELECT
> *
> FROM
> (
> SELECT
> *,
> ROW_NUMBER() OVER (
> PARTITION BY wStart
> ORDER BY
> flux desc
> ) AS row_num
> FROM
> window_log
> )
> WHERE
> row_num <= 10;
>
> 就是前面是一个基于事件时间的窗口计算逻辑后面跟着一个TopN的计算逻辑 跑在Flink 1.9的blink上的 在TopN计算上先按窗口开始时间做分区然后排序输出Top结果 这里就产生了
>
> 一个状态管理的问题 因为窗口计算是不断向前的 也就是将窗口开始时间作为分区键会导致状态不断增大 后续在测试过程中发现其底层是实现为RetractableTopNFunction 然后在这个实现中没有发现状态清理的逻辑 而在
>
> AppendOnlyTopNFunction和UpdatableTopNFunction中存在状态清理的逻辑 为什么要这么实现? 能否在RetractableTopNFunction中实现状态清理? 并且保证状态安全被删除?