You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2022/05/26 08:39:00 UTC

[jira] [Closed] (FLINK-27776) Throws exception when udaf used in sliding window does not implement merge method in PyFlink

     [ https://issues.apache.org/jira/browse/FLINK-27776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dian Fu closed FLINK-27776.
---------------------------
    Fix Version/s: 1.16.0
                   1.14.5
                   1.15.1
       Resolution: Fixed

Merged to:
- master via 51eb02e92287eff3493d4c999390d1d525f3f9d7
- release-1.15 via 9708f5767cd3983527efc26d7752a8e3cda5465d
- release-1.14 via 07fc8c40f18ed86b6adfe420bcef83da5fd022e2

> Throws exception when udaf used in sliding window does not implement merge method in PyFlink
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-27776
>                 URL: https://issues.apache.org/jira/browse/FLINK-27776
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Python
>    Affects Versions: 1.15.0, 1.13.6, 1.14.4
>            Reporter: Huang Xingbo
>            Assignee: Huang Xingbo
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.16.0, 1.14.5, 1.15.1
>
>
> We use the pane state to optimize the result of calculating the window state, which requires udaf to implement the merge method. However, due to the lack of detection of whether the merge method of udaf is implemented, the user's output result did not meet his expectations and there is no exception. Below is an example of a UDAF that implements the merge method:
> {code:python}
> class SumAggregateFunction(AggregateFunction):
>     def get_value(self, accumulator):
>         return accumulator[0]
>     def create_accumulator(self):
>         return [0]
>     def accumulate(self, accumulator, *args):
>         accumulator[0] = accumulator[0] + args[0]
>     def retract(self, accumulator, *args):
>         accumulator[0] = accumulator[0] - args[0]
>     def merge(self, accumulator, accumulators):
>         for other_acc in accumulators:
>             accumulator[0] = accumulator[0] + other_acc[0]
>     def get_accumulator_type(self):
>         return DataTypes.ARRAY(DataTypes.BIGINT())
>     def get_result_type(self):
>         return DataTypes.BIGINT()
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)