Posted to user@flink.apache.org by Jing <aj...@gmail.com> on 2021/12/23 08:11:46 UTC

Window Top N for Flink 1.12

Hi, Flink community,

Is there any existing code I can use to get the window Top N with Flink
1.12? One possibility I saw is to create a table, insert the aggregated
data into it, and then do Top N as in [1]. However, I cannot make this
approach work because I need to specify a connector for this table, and I
may also need to create another Kafka topic for it. Is there an existing
way to do Window Top N with Flink 1.12?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n


Thanks,
Jing

Re: Window Top N for Flink 1.12

Posted by Jing Zhang <be...@gmail.com>.
Hi Jing,
Please try it this way:
Create only one sink, for the final output. Write the window aggregate and
the TopN in one query, and write the result of the TopN into the final sink.

Best,
Jing Zhang
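
One possible reading of this suggestion on Flink 1.12 is sketched below. It
is an untested sketch, not the poster's code: it assumes the `tableEnv` and
the source table `mytable` (with its `proctime` attribute) from the thread,
and the sink name `TopItems` with the 'print' connector is a hypothetical
stand-in for the real final sink. The intermediate result is registered as
a temporary view, so it needs no connector, and `window_end` is added to
the partition keys so the ranking is computed per window.

```scala
// Assumes an existing `tableEnv` and source table `mytable`, as in the thread.
// `TopItems` and its 'print' connector are hypothetical placeholders.

// 1. The only table that needs a connector: the final sink.
tableEnv.executeSql(
  """
    |CREATE TABLE TopItems (
    |  item_id STRING,
    |  channel_id STRING,
    |  window_end TIMESTAMP(3),
    |  num_select BIGINT,
    |  row_num BIGINT
    |) WITH ('connector' = 'print')
    |""".stripMargin)

// 2. The window aggregate becomes a temporary view, so the intermediate
//    result needs neither a connector nor an extra Kafka topic.
val itemDesc = tableEnv.sqlQuery(
  """
    |SELECT
    |  item_id,
    |  channel_id,
    |  HOP_END(proctime, INTERVAL '15' SECOND, INTERVAL '60' SECOND) AS window_end,
    |  COUNT(*) AS num_select
    |FROM mytable
    |GROUP BY item_id, channel_id, HOP(proctime, INTERVAL '15' SECOND, INTERVAL '60' SECOND)
    |""".stripMargin)
tableEnv.createTemporaryView("ItemDesc", itemDesc)

// 3. Top 20 items per channel and window, written straight to the final sink.
tableEnv.executeSql(
  """
    |INSERT INTO TopItems
    |SELECT item_id, channel_id, window_end, num_select, row_num
    |FROM (
    |  SELECT *,
    |    ROW_NUMBER() OVER (
    |      PARTITION BY channel_id, window_end
    |      ORDER BY num_select DESC) AS row_num
    |  FROM ItemDesc)
    |WHERE row_num <= 20
    |""".stripMargin)
```

Because this is a regular TopN rather than a Window TopN, the sink has to
accept updates; a Kafka sink in place of 'print' would need a matching
changelog-capable setup.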


In reply to Jing <aj...@gmail.com>, Fri, Dec 24, 2021 at 03:13.

Re: Window Top N for Flink 1.12

Posted by Jing <aj...@gmail.com>.
Hi Jing Zhang,

Thanks for the reply! My current implementation is like this:


tableEnv.executeSql(
  """CREATE TABLE ItemDesc (item_id STRING, channel_id STRING, window_end BIGINT, num_select BIGINT)
    |WITH ('connector' = 'kafka', 'scan.startup.mode' = 'latest-offset')""".stripMargin
)

tableEnv.executeSql("""
        |INSERT INTO ItemDesc
        |SELECT
        |   item_id,
        |   channel_id,
        |   CAST(HOP_END(proctime, INTERVAL '15' SECOND, INTERVAL '60' SECOND) AS BIGINT) AS window_end,
        |   COUNT(*) AS num_select
        |FROM mytable
        |GROUP BY item_id, channel_id, HOP(proctime, INTERVAL '15' SECOND, INTERVAL '60' SECOND)
        |""".stripMargin)

val result = tableEnv.sqlQuery("""
    |SELECT item_id, window_end, channel_id, num_select, row_num
    |FROM (
    |   SELECT *,
    |      ROW_NUMBER() OVER (PARTITION BY channel_id ORDER BY num_select DESC) AS row_num
    |   FROM ItemDesc)
    |WHERE row_num <= 20
    |""".stripMargin)

But I got the error:

org.apache.flink.table.api.ValidationException: Unable to create a sink for
writing table 'default_catalog.default_database.ItemDesc'.

The table ItemDesc is an intermediate table. If I put everything in a
single query, it doesn't work. If I create the table without a connector,
like this:

tableEnv.executeSql(
  "CREATE TABLE ItemDesc (item_id STRING, channel_id STRING, window_end BIGINT, num_select BIGINT)"
)

it doesn't work either.


Thanks,
Jing




In reply to Jing Zhang <be...@gmail.com>, Thu, Dec 23, 2021 at 1:20 AM.

Re: Window Top N for Flink 1.12

Posted by Jing Zhang <be...@gmail.com>.
Hi Jing,
In fact, I agree with your idea of using TopN [2] instead of Window TopN [1]:
normalize the time into 5-minute units and add it as one of the partition
keys.
Please note two points when using TopN:
1. The result is an update stream rather than an append stream, which means
a result that has already been sent might be retracted later.
2. You should take care of state cleanup.

However, you said you ran into a problem when using TopN. I didn't
understand your question here. Could you please explain a little more?
> One possibility I saw is to create a table, insert the aggregated data
> into it, and then do Top N as in [1]. However, I cannot make this approach
> work because I need to specify a connector for this table, and I may also
> need to create another Kafka topic for it.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-topn/
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/topn/
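
The two points above can be illustrated with a toy model in plain Scala
collections. This is not Flink code and not how Flink implements TopN; the
names `Count`, `bucketOf`, and `changelog` are made up for the illustration.
It normalizes timestamps into 5-minute buckets, uses (channel, bucket) as
the partition key, and prints "+" for a row entering the Top N and "-" for
a previously emitted row being retracted.

```scala
// Toy model only: plain Scala, no Flink dependency. All names are hypothetical.
object TopNChangelogSketch {
  final case class Count(itemId: String, channelId: String, tsMillis: Long, numSelect: Long)

  // 5-minute unit, as suggested in the thread.
  val BucketMillis: Long = 5 * 60 * 1000L

  // Normalize a timestamp to the start of its 5-minute bucket.
  def bucketOf(tsMillis: Long): Long = tsMillis - (tsMillis % BucketMillis)

  // Rank the items of one partition by count (descending), ties by item id.
  private def top(items: Map[String, Long], n: Int): Seq[(String, Long)] =
    items.toSeq.sortBy { case (id, cnt) => (-cnt, id) }.take(n)

  // Feed counts one by one and return the changelog of the Top-N lists:
  // "+row" when a row enters the Top N, "-row" when an emitted row is retracted.
  def changelog(input: Seq[Count], n: Int): Seq[String] = {
    // partition key (channel, bucket) -> item -> latest count.
    // This map only ever grows, which is why state cleanup matters.
    var state = Map.empty[(String, Long), Map[String, Long]]
    val out = Seq.newBuilder[String]
    for (c <- input) {
      val key = (c.channelId, bucketOf(c.tsMillis))
      val current = state.getOrElse(key, Map.empty[String, Long])
      val before = top(current, n)
      val updated = current + (c.itemId -> c.numSelect)
      state = state.updated(key, updated)
      val after = top(updated, n)
      before.diff(after).foreach(r => out += s"-$r")
      after.diff(before).foreach(r => out += s"+$r")
    }
    out.result()
  }
}
```

For example, with n = 1, feeding item "a" with count 5 and then item "b"
with count 7 into the same channel and bucket yields "+(a,5)", then
"-(a,5)" and "+(b,7)": the first result is retracted once a better one
arrives, which is what a downstream sink has to be able to handle.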

In reply to Jing Zhang <be...@gmail.com>, Thu, Dec 23, 2021 at 17:04.

Re: Window Top N for Flink 1.12

Posted by Jing Zhang <be...@gmail.com>.
Hi Jing,
I'm afraid Window TopN is not possible in SQL on version 1.12, because
Window TopN was only introduced in 1.13.

> One possibility I saw is to create a table, insert the aggregated data
> into it, and then do Top N as in [1]. However, I cannot make this approach
> work because I need to specify a connector for this table, and I may also
> need to create another Kafka topic for it.
I didn't understand you here.
Do you mean you need a sink to store the output data of TopN? You would
still need a sink to store the output even if you used Window TopN.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-topn/

Best,
Jing Zhang


In reply to Jing <aj...@gmail.com>, Thu, Dec 23, 2021 at 16:12.