Posted to issues@flink.apache.org by "Shaoxuan Wang (JIRA)" <ji...@apache.org> on 2017/02/06 15:02:41 UTC

[jira] [Comment Edited] (FLINK-5564) User Defined Aggregates

    [ https://issues.apache.org/jira/browse/FLINK-5564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15854168#comment-15854168 ] 

Shaoxuan Wang edited comment on FLINK-5564 at 2/6/17 3:02 PM:
--------------------------------------------------------------

Hi all, as also discussed in the dev email thread on the FLIP-11 over window design: while refactoring the streaming query plan, I came to feel that we do not need to keep the non-incremental query plan for streaming aggregation, as all streaming aggregations should be suitable for incremental aggregation (even max, min and median). One can choose to accumulate all records at once when the window completes, but the accumulate method is still executed to update the accumulator state for each record. Because accumulate is invoked once per record, the aggregation is already inherently incremental. Whether records are accumulated one at a time as they arrive (incremental) or all together when the window completes (non-incremental) makes no difference in terms of correctness or complexity. On the other hand, the non-incremental approach introduces CPU jitter and latency overhead, because all of the accumulate work is deferred to the moment the window completes. I would therefore like to propose always applying incremental mode to all streaming aggregations.
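To make the argument concrete, here is a minimal Java sketch (no Flink dependency) that drives the same accumulate-style MAX aggregate in two ways: incrementally, folding each record into the accumulator on arrival, and non-incrementally, buffering raw records and running the same accumulate calls when the window completes. The Aggregate interface and the Max, IncrementalWindow and BufferedWindow classes are hypothetical names for illustration only, not Flink classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IncrementalVsBuffered {

    // Hypothetical accumulator-style aggregate (not a Flink interface).
    interface Aggregate<T, ACC, R> {
        ACC createAccumulator();
        ACC accumulate(ACC acc, T value);
        R getValue(ACC acc);
    }

    // MAX: the accumulator holds only the largest value seen so far
    // (null until the first record arrives).
    static final class Max implements Aggregate<Long, Long, Long> {
        public Long createAccumulator() { return null; }
        public Long accumulate(Long acc, Long value) {
            return acc == null ? value : Math.max(acc, value);
        }
        public Long getValue(Long acc) { return acc; }
    }

    // Incremental mode: each arriving record is folded into the accumulator
    // immediately, so only the accumulator is kept as state.
    static final class IncrementalWindow<T, ACC, R> {
        private final Aggregate<T, ACC, R> agg;
        private ACC acc;

        IncrementalWindow(Aggregate<T, ACC, R> agg) {
            this.agg = agg;
            this.acc = agg.createAccumulator();
        }

        void onRecord(T record) { acc = agg.accumulate(acc, record); }

        R onWindowEnd() { return agg.getValue(acc); }
    }

    // Non-incremental mode: raw records are buffered, and the same accumulate
    // calls run in one burst when the window completes.
    static final class BufferedWindow<T, ACC, R> {
        private final Aggregate<T, ACC, R> agg;
        private final List<T> buffer = new ArrayList<>();

        BufferedWindow(Aggregate<T, ACC, R> agg) { this.agg = agg; }

        void onRecord(T record) { buffer.add(record); }

        R onWindowEnd() {
            ACC acc = agg.createAccumulator();
            for (T record : buffer) {
                acc = agg.accumulate(acc, record);
            }
            return agg.getValue(acc);
        }
    }

    public static void main(String[] args) {
        List<Long> records = Arrays.asList(3L, 9L, 4L, 7L);
        IncrementalWindow<Long, Long, Long> inc = new IncrementalWindow<>(new Max());
        BufferedWindow<Long, Long, Long> buf = new BufferedWindow<>(new Max());
        for (Long r : records) {
            inc.onRecord(r);
            buf.onRecord(r);
        }
        // Both modes produce the same result.
        System.out.println(inc.onWindowEnd() + " " + buf.onWindowEnd());
    }
}
```

Running main prints "9 9": the results are identical, and the only difference is that the buffered variant keeps every raw record as state and performs all of its accumulate calls in a single burst at window end, which is where the jitter and latency overhead comes from.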





> User Defined Aggregates
> -----------------------
>
>                 Key: FLINK-5564
>                 URL: https://issues.apache.org/jira/browse/FLINK-5564
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Shaoxuan Wang
>
> User-defined aggregates would be a great addition to the Table API / SQL.
> The current aggregate interface is not well suited to external users. This issue proposes redesigning the aggregate so that we can expose a better external UDAGG interface to users. The detailed design proposal can be found here: https://docs.google.com/document/d/19JXK8jLIi8IqV9yf7hOs_Oz67yXOypY7Uh5gIOK2r-U/edit
> Motivation:
> 1. The current aggregate interface is not very concise for users. One needs to know the design details of the intermediate Row buffer before implementing an Aggregate. Seven functions are needed even for a simple Count aggregate.
> 2. Another limitation of the current aggregate function is that it can only be applied to a single column. Many scenarios require an aggregate function that takes multiple columns as input.
> 3. “Retraction” is neither considered nor covered by the current Aggregate.
> 4. A local/global aggregate query plan optimization would be valuable, as it is very promising for improving UDAGG performance in some scenarios.
> Proposed Changes:
> 1. Implement an aggregate DataStream API (Done by [FLINK-5582|https://issues.apache.org/jira/browse/FLINK-5582])
> 2. Update all the existing aggregates to use the new aggregate DataStream API
> 3. Provide a better User-Defined Aggregate interface
> 4. Add retraction support
> 5. Add local/global aggregate
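The redesign goals listed in the issue (a user-defined accumulator instead of the intermediate Row buffer, multi-column input, retraction, and local/global aggregation) can be illustrated with a minimal Java sketch of a weighted-average aggregate. The class and method names (WeightedAvg, WeightedAvgAcc, createAccumulator, accumulate, retract, merge, getValue) are assumptions chosen for illustration; they do not reproduce the interface in the linked design document or in Flink itself.

```java
import java.util.Arrays;
import java.util.List;

public class WeightedAvgSketch {

    // Hypothetical accumulator: a plain object defined by the user,
    // instead of a fixed intermediate Row layout.
    static final class WeightedAvgAcc {
        long weightedSum = 0;
        long weightSum = 0;
    }

    // Hypothetical user-defined aggregate (NOT Flink's actual API).
    static final class WeightedAvg {
        WeightedAvgAcc createAccumulator() { return new WeightedAvgAcc(); }

        // Multi-column input: each row contributes (value, weight).
        void accumulate(WeightedAvgAcc acc, long value, long weight) {
            acc.weightedSum += value * weight;
            acc.weightSum += weight;
        }

        // Retraction: undo the effect of a previously accumulated row.
        void retract(WeightedAvgAcc acc, long value, long weight) {
            acc.weightedSum -= value * weight;
            acc.weightSum -= weight;
        }

        // Merge partial accumulators, as a global aggregation would do
        // with the results of several local (pre-)aggregations.
        void merge(WeightedAvgAcc acc, List<WeightedAvgAcc> others) {
            for (WeightedAvgAcc o : others) {
                acc.weightedSum += o.weightedSum;
                acc.weightSum += o.weightSum;
            }
        }

        // Returns null for an empty group.
        Double getValue(WeightedAvgAcc acc) {
            return acc.weightSum == 0 ? null : (double) acc.weightedSum / acc.weightSum;
        }
    }

    public static void main(String[] args) {
        WeightedAvg agg = new WeightedAvg();

        // Local aggregation on two hypothetical partitions.
        WeightedAvgAcc local1 = agg.createAccumulator();
        agg.accumulate(local1, 10, 1);
        agg.accumulate(local1, 20, 3);
        WeightedAvgAcc local2 = agg.createAccumulator();
        agg.accumulate(local2, 40, 1);

        // Global aggregation merges the partial accumulators.
        WeightedAvgAcc global = agg.createAccumulator();
        agg.merge(global, Arrays.asList(local1, local2));
        System.out.println(agg.getValue(global));

        // Retract the row (20, 3), e.g. after an upstream update.
        agg.retract(global, 20, 3);
        System.out.println(agg.getValue(global));
    }
}
```

Running main prints 22.0 after the merge ((10*1 + 20*3 + 40*1) / 5) and 25.0 after retracting the row (20, 3) ((10*1 + 40*1) / 2). Keeping the accumulator as a user-owned object is what lets retract and merge be expressed without exposing any intermediate buffer layout.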



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)