Posted to user@flink.apache.org by wang <24...@163.com> on 2022/08/07 14:06:29 UTC

Does flink sql support UDTAGG

Hi dear engineers,

One small question: does Flink SQL support UDTAGG (user-defined table aggregate function)? It seems to be supported only in the Flink Table API. If it is not supported in Flink SQL, how can I define an aggregate UDF that outputs multiple rows to Kafka?

Thanks for your help!

Regards,
Hunk
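A table aggregate differs from an ordinary aggregate in that its emit step may produce several rows per group, which is what makes it attractive for writing multiple rows to Kafka. The following is a minimal plain-Java model, loosely following the Top-2 example in the Flink UDF documentation and written without any Flink dependency so it runs on its own. `Top2Model`, `Accumulator` and `aggregateGroup` are hypothetical names; `createAccumulator`, `accumulate` and `emitValue` only mirror Flink's naming conventions. In real Flink code you would extend `TableAggregateFunction` and, as far as I know for the versions discussed in this thread, invoke it through the Table API's `flatAggregate` rather than from SQL.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Plain-Java model (no Flink dependency) of a Top-2 table aggregate:
// accumulate values per group, then emit up to two (value, rank) rows.
final class Top2Model {

    // Hypothetical accumulator holding the two largest values seen so far.
    static final class Accumulator {
        Integer first;   // largest value, null until one arrives
        Integer second;  // second-largest value, null until two arrive
    }

    static Accumulator createAccumulator() {
        return new Accumulator();
    }

    // Called once per input row of the group.
    static void accumulate(Accumulator acc, int value) {
        if (acc.first == null || value > acc.first) {
            acc.second = acc.first;
            acc.first = value;
        } else if (acc.second == null || value > acc.second) {
            acc.second = value;
        }
    }

    // Unlike an ordinary aggregate's single result, this may emit several rows.
    static void emitValue(Accumulator acc, BiConsumer<Integer, Integer> out) {
        if (acc.first != null) {
            out.accept(acc.first, 1);
        }
        if (acc.second != null) {
            out.accept(acc.second, 2);
        }
    }

    // Convenience helper: run one group through the model and collect the rows.
    static List<int[]> aggregateGroup(int... values) {
        Accumulator acc = createAccumulator();
        for (int v : values) {
            accumulate(acc, v);
        }
        List<int[]> rows = new ArrayList<>();
        emitValue(acc, (value, rank) -> rows.add(new int[] {value, rank}));
        return rows;
    }
}
```

For a group with the values 3, 7 and 5, `aggregateGroup` yields the two rows (7, 1) and (5, 2): one input group, two output rows.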

Re: Re:Does flink sql support UDTAGG

Posted by Weihua Hu <hu...@gmail.com>.
Hi, wang

Maybe you can take a look at
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-function

Best,
Weihua


On Mon, Aug 8, 2022 at 3:52 PM wang <24...@163.com> wrote:

>
> Hi,
>
> Thanks for your response. I guess what I need is this one
> (UDTAGG):
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#table-aggregate-functions
> since I want multiple rows as the aggregate output. So my question is: can
> we use UDTAGG in Flink SQL? If so, is there a guide on how to use UDTAGG in
> Flink SQL? The page above only has Flink Table API instructions for
> UDTAGG.
>
> Thanks,
> Hunk
>
> At 2022-08-08 10:56:22, "Xuyang" <xy...@163.com> wrote:
> >Hi, is what you want a UDAF? Please check whether this [1] meets your
> >requirement.
> >
> >[1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#aggregate-functions
> >
> >On 2022-08-07 22:06:29, "wang" <24...@163.com> wrote:
> >
> >Hi dear engineers,
> >
> >One small question: does Flink SQL support UDTAGG (user-defined table
> >aggregate function)? It seems to be supported only in the Flink Table
> >API. If it is not supported in Flink SQL, how can I define an aggregate
> >UDF that outputs multiple rows to Kafka?
> >
> >Thanks for your help!
> >
> >Regards,
> >Hunk
>

Re:Re:Does flink sql support UDTAGG

Posted by wang <24...@163.com>.
Hi,

Thanks for your response. I guess what I need is this one (UDTAGG): https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#table-aggregate-functions
since I want multiple rows as the aggregate output. So my question is: can we use UDTAGG in Flink SQL? If so, is there a guide on how to use UDTAGG in Flink SQL? The page above only has Flink Table API instructions for UDTAGG.

Thanks,
Hunk

At 2022-08-08 10:56:22, "Xuyang" <xy...@163.com> wrote:
>Hi, is what you want a UDAF? Please check whether this [1] meets your requirement.
>
>[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#aggregate-functions
>
>On 2022-08-07 22:06:29, "wang" <24...@163.com> wrote:
>
>Hi dear engineers,
>
>One small question: does Flink SQL support UDTAGG (user-defined table aggregate function)? It seems to be supported only in the Flink Table API. If it is not supported in Flink SQL, how can I define an aggregate UDF that outputs multiple rows to Kafka?
>
>Thanks for your help!
>
>Regards,
>Hunk

Re:Does flink sql support UDTAGG

Posted by Xuyang <xy...@163.com>.
Hi, is what you want a UDAF? Please check whether this [1] meets your requirement.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#aggregate-functions

On 2022-08-07 22:06:29, "wang" <24...@163.com> wrote:

Hi dear engineers,

One small question: does Flink SQL support UDTAGG (user-defined table aggregate function)? It seems to be supported only in the Flink Table API. If it is not supported in Flink SQL, how can I define an aggregate UDF that outputs multiple rows to Kafka?

Thanks for your help!

Regards,
Hunk
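Xuyang's pointer is to the regular aggregate function (UDAF). The practical difference from the UDTAGG Hunk asks about is output cardinality: a UDAF returns exactly one value per group. The sketch below is a plain-Java model, loosely following the weighted-average example in the Flink UDF documentation, with no Flink dependency; `WeightedAvgModel` and its accumulator are hypothetical names. In Flink you would extend `AggregateFunction` instead.

```java
// Plain-Java model (no Flink dependency) of an ordinary aggregate function
// (UDAF): however many rows a group has, getValue() yields exactly one result.
final class WeightedAvgModel {

    // Hypothetical accumulator: running weighted sum and total weight.
    static final class Accumulator {
        long sum = 0;
        int count = 0;
    }

    static Accumulator createAccumulator() {
        return new Accumulator();
    }

    // Called once per input row of the group.
    static void accumulate(Accumulator acc, long value, int weight) {
        acc.sum += value * weight;
        acc.count += weight;
    }

    // One value per group (integer division); null if the group had no weight.
    static Long getValue(Accumulator acc) {
        return acc.count == 0 ? null : acc.sum / acc.count;
    }
}
```

Accumulating (10, weight 1) and (20, weight 3) produces a single result, 70 / 4 = 17 with integer division, no matter how many rows went in.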

Re: Whether Flink SQL window operations support "Allow Lateness and SideOutput"?

Posted by Weihua Hu <hu...@gmail.com>.
Hi,

Maybe you can use CURRENT_WATERMARK() [1] to handle some late data.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/

Best,
Weihua


On Tue, Feb 21, 2023 at 1:46 PM wang <24...@163.com> wrote:

> Hi dear engineers,
>
>   One question, as in the title: do Flink SQL window operations support
> "Allow Lateness and SideOutput"?
>
>   That is, what the DataStream API supports via allowedLateness
> and sideOutputLateData, like:
>
>     SingleOutputStreamOperator<> sumStream = dataStream.keyBy().timeWindow()
>             .allowedLateness(Time.minutes(1))
>             .sideOutputLateData(outputTag)
>             .sum();
>
> Thanks && Regards,
> Hunk
>
>
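Weihua's suggestion works at the row level rather than the window level: a row whose event time is not ahead of the current watermark can be treated as late. Below is a plain-Java model of that predicate, assuming (per the system-function docs linked above) that `CURRENT_WATERMARK(ts)` returns NULL while no watermark is available yet. `LateRowFilter` is a hypothetical name, `events` in the comment is a hypothetical table, and the SQL there is an untested sketch. Note that this only classifies rows; it does not make an already-fired window fire again the way allowedLateness does.

```java
// Plain-Java model (no Flink dependency) of a row-level late-data predicate
// built on CURRENT_WATERMARK. In Flink SQL the on-time branch might look like
//   SELECT * FROM events   -- "events" is a hypothetical table
//   WHERE CURRENT_WATERMARK(ts) IS NULL OR ts > CURRENT_WATERMARK(ts)
// and the negated predicate could feed a separate sink for late rows.
final class LateRowFilter {

    // currentWatermark is null when no watermark is available yet,
    // mirroring CURRENT_WATERMARK returning NULL in that case.
    static boolean isOnTime(long eventTime, Long currentWatermark) {
        return currentWatermark == null || eventTime > currentWatermark;
    }

    // Late rows are exactly those that are not on time.
    static boolean isLate(long eventTime, Long currentWatermark) {
        return !isOnTime(eventTime, currentWatermark);
    }
}
```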

Whether Flink SQL window operations support "Allow Lateness and SideOutput"?

Posted by wang <24...@163.com>.
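Hi dear engineers,

  One question, as in the title: do Flink SQL window operations support "Allow Lateness and SideOutput"?

  That is, what the DataStream API supports via allowedLateness and sideOutputLateData, like:

    // Illustrative key and window size; outputTag is an OutputTag<Tuple2<String, Long>>
    SingleOutputStreamOperator<Tuple2<String, Long>> sumStream = dataStream
            .keyBy(value -> value.f0)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .allowedLateness(Time.minutes(1))  // keep window state 1 minute after the window end
            .sideOutputLateData(outputTag)     // anything later goes to the side output
            .sum(1);

Thanks && Regards,
Hunk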
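On the DataStream side, allowedLateness and sideOutputLateData together amount to a three-way decision per element. The plain-Java model below (hypothetical `LatenessModel`, no Flink dependency) sketches that decision under stated assumptions: tumbling event-time windows with zero offset, non-negative millisecond timestamps, and the default event-time trigger. It simplifies what the window operator does and is not its actual code.

```java
// Plain-Java model (no Flink dependency) of how an element is treated under
// allowedLateness + sideOutputLateData, assuming tumbling event-time windows
// with zero offset, non-negative timestamps and the default event-time trigger.
final class LatenessModel {

    enum Outcome {
        ON_TIME,      // window has not fired yet; element is simply buffered
        LATE_REFIRE,  // window already fired but is within allowed lateness: fires again
        SIDE_OUTPUT   // window state already cleaned up: element goes to outputTag
    }

    static Outcome classify(long eventTime, long windowSizeMs,
                            long allowedLatenessMs, long currentWatermark) {
        long windowStart = eventTime - (eventTime % windowSizeMs);
        long maxTimestamp = windowStart + windowSizeMs - 1;  // last ms in the window
        long cleanupTime = maxTimestamp + allowedLatenessMs;
        if (cleanupTime <= currentWatermark) {
            return Outcome.SIDE_OUTPUT;
        }
        if (maxTimestamp <= currentWatermark) {
            return Outcome.LATE_REFIRE;
        }
        return Outcome.ON_TIME;
    }
}
```

With one-minute windows and one minute of allowed lateness, an element stamped 30,000 ms belongs to the window whose last millisecond is 59,999. It is on time while the watermark is below 59,999, makes the window fire again if it arrives while the watermark is between 59,999 and 119,998, and goes to the side output from 119,999 on.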


Re: Watermark generating mechanism in Flink SQL

Posted by Matthias Pohl <ma...@aiven.io.INVALID>.
Hi Hunk,
there is documentation about watermarking in Flink SQL [1]. There is also a
Flink SQL cookbook entry about watermarking [2]. Essentially, you define the
watermark strategy in your CREATE TABLE statement and specify the lateness
for a given event (not the period in which watermarks are automatically
generated!). You have to apply the `WATERMARK FOR` phrase on a column that
is declared as a time attribute [3]. Watermarks are based on event time,
i.e. based on an event being processed that provides the event time. Your
idea of generating them "every 5 seconds" does not work out of the box
because a watermark wouldn't be generated if the source idles for more than
5 seconds (in case of your specific example). Sending periodic dummy events
extrapolating the current event time would be a way to work around this
issue. Keep in mind that mixing processing time (what you would do if you
create a watermark based on the system's current time rather than relying
on events) and event time is usually not advised. I hope that helps.

Best,
Matthias

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/create/#watermark
[2]
https://github.com/ververica/flink-sql-cookbook/blob/main/aggregations-and-analytics/02_watermarks/02_watermarks.md
[3]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/concepts/time_attributes/#event-time

On Tue, Oct 18, 2022 at 5:32 AM wang <24...@163.com> wrote:

> Hi dear engineers,
>
> I have one question about the watermark generating mechanism in Flink SQL.
> There are two mechanisms called *Periodic Watermarks* and *Punctuated
> Watermarks*. I want to use *Periodic Watermarks* with an interval of 5 seconds
> (meaning watermarks will be generated every 5 seconds). How should I set this
> in Flink SQL? Thanks in advance!
>
> Regards,
> Hunk
>
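Matthias's point, that the WATERMARK clause fixes the delay rather than the emission period and that the watermark cannot advance without new events, can be seen in a small plain-Java model. `PeriodicWatermarkModel` is a hypothetical name, and the model ignores parallelism and idle-source handling. As far as I know, the emission period in Flink SQL is controlled separately by the `pipeline.auto-watermark-interval` option (200 ms by default), e.g. `SET 'pipeline.auto-watermark-interval' = '5s';`, but even then an idle source produces no new watermark, as Matthias describes.

```java
// Plain-Java model (no Flink dependency) of the strategy declared by
//   WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
// combined with periodic emission: events only update the candidate
// watermark; a timer (the emission interval) decides when it is sent.
final class PeriodicWatermarkModel {

    private final long delayMs;           // the INTERVAL in the WATERMARK clause
    private Long maxEventTime = null;     // largest event time seen so far
    private long lastEmitted = Long.MIN_VALUE;

    PeriodicWatermarkModel(long delayMs) {
        this.delayMs = delayMs;
    }

    // Called for every record; never emits by itself.
    void onEvent(long eventTime) {
        if (maxEventTime == null || eventTime > maxEventTime) {
            maxEventTime = eventTime;
        }
    }

    // Called once per emission interval. Returns the new watermark, or null if
    // it would not advance (e.g. the source has been idle since the last call).
    Long onPeriodicEmit() {
        if (maxEventTime == null) {
            return null;
        }
        long candidate = maxEventTime - delayMs;
        if (candidate <= lastEmitted) {
            return null;
        }
        lastEmitted = candidate;
        return candidate;
    }
}
```

After events at 10,000 and 8,000 ms, the first periodic call emits 5,000; a second call with no new events returns null, which is exactly the idle-source situation.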

Watermark generating mechanism in Flink SQL

Posted by wang <24...@163.com>.
Hi dear engineers,

I have one question about the watermark generating mechanism in Flink SQL. There are two mechanisms called Periodic Watermarks and Punctuated Watermarks. I want to use Periodic Watermarks with an interval of 5 seconds (meaning watermarks will be generated every 5 seconds). How should I set this in Flink SQL? Thanks in advance!

Regards,
Hunk
