You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ChangjiGuo (Jira)" <ji...@apache.org> on 2022/01/06 09:30:00 UTC
[jira] [Updated] (FLINK-25475) When windowAgg and groupAgg are included at the same time, there is no assigner generated but MiniBatch optimization is still used.
[ https://issues.apache.org/jira/browse/FLINK-25475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ChangjiGuo updated FLINK-25475:
-------------------------------
Affects Version/s: 1.14.2
> When windowAgg and groupAgg are included at the same time, there is no assigner generated but MiniBatch optimization is still used.
> -----------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-25475
> URL: https://issues.apache.org/jira/browse/FLINK-25475
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.14.2
> Reporter: ChangjiGuo
> Priority: Major
> Attachments: image-2021-12-29-16-04-50-211.png, image-2021-12-29-16-05-15-519.png
>
>
> If the relNode has both windowAgg and groupAgg, MiniBatchIntervalInferRule will not add StreamExecMiniBatchAssigner node, but MiniBatchGroupAggFunction or MiniBatchLocalGroupAggFunction or MiniBatchGlobalGroupAggFunction will still be generated when translated into transformation.
> It will only judge whether to enable minibacth.
> {code:java}
> val operator = if (isMiniBatchEnabled) {
> val aggFunction = new MiniBatchGroupAggFunction(
> aggsHandler,
> recordEqualiser,
> accTypes,
> inputRowType,
> inputCountIndex,
> generateUpdateBefore)
> new KeyedMapBundleOperator(
> aggFunction,
> AggregateUtil.createMiniBatchTrigger(tableConfig))
> } else {
> val aggFunction = new GroupAggFunction(
> tableConfig.getMinIdleStateRetentionTime,
> tableConfig.getMaxIdleStateRetentionTime,
> aggsHandler,
> recordEqualiser,
> accTypes,
> inputCountIndex,
> generateUpdateBefore)
> val operator = new KeyedProcessOperator[RowData, RowData, RowData](aggFunction)
> operator
> } {code}
> for example:
> before:
> !image-2021-12-29-16-04-50-211.png!
> after:
> !image-2021-12-29-16-05-15-519.png!
> The WatermarkAssigner will send watermark to downstream, and the finishBundle method will be called frequently, which does not match the expected result.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)