Posted to dev@flink.apache.org by "ChangjiGuo (Jira)" <ji...@apache.org> on 2021/12/29 08:10:00 UTC

[jira] [Created] (FLINK-25475) When windowAgg and groupAgg are included at the same time, there is no assigner generated but MiniBatch optimization is still used.

ChangjiGuo created FLINK-25475:
----------------------------------

             Summary: When windowAgg and groupAgg are included at the same time, there is no assigner generated but MiniBatch optimization is still used.
                 Key: FLINK-25475
                 URL: https://issues.apache.org/jira/browse/FLINK-25475
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.14.0
            Reporter: ChangjiGuo
         Attachments: image-2021-12-29-16-04-50-211.png, image-2021-12-29-16-05-15-519.png

If the relNode contains both a windowAgg and a groupAgg, MiniBatchIntervalInferRule does not add a StreamExecMiniBatchAssigner node. However, MiniBatchGroupAggFunction, MiniBatchLocalGroupAggFunction, or MiniBatchGlobalGroupAggFunction is still generated when the plan is translated into transformations.

The translation only checks whether mini-batch is enabled:
{code:java}
val operator = if (isMiniBatchEnabled) {
  val aggFunction = new MiniBatchGroupAggFunction(
    aggsHandler,
    recordEqualiser,
    accTypes,
    inputRowType,
    inputCountIndex,
    generateUpdateBefore)

  new KeyedMapBundleOperator(
    aggFunction,
    AggregateUtil.createMiniBatchTrigger(tableConfig))
} else {
  val aggFunction = new GroupAggFunction(
    tableConfig.getMinIdleStateRetentionTime,
    tableConfig.getMaxIdleStateRetentionTime,
    aggsHandler,
    recordEqualiser,
    accTypes,
    inputCountIndex,
    generateUpdateBefore)

  val operator = new KeyedProcessOperator[RowData, RowData, RowData](aggFunction)
  operator
}
{code}
For example:

Before:

!image-2021-12-29-16-04-50-211.png!

After:

!image-2021-12-29-16-05-15-519.png!

As a result, the WatermarkAssigner sends its watermarks directly downstream and the finishBundle method is called frequently, which does not match the expected mini-batch behavior.
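
The effect can be illustrated with a small, self-contained simulation. This is only a sketch and does not use Flink classes: `BundleSketch`, `countFlushes`, and the numbers below are hypothetical. It assumes that a bundle operator flushes its buffer when the size trigger fires or when a watermark arrives, and that a mini-batch assigner would normally forward watermarks only at mini-batch interval boundaries.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a bundle operator; not Flink's actual class. */
class BundleSketch {
    private final int maxBundleSize; // plays the role of the mini-batch size
    private final Map<String, Integer> bundle = new HashMap<>();
    private int buffered = 0;
    int flushes = 0; // number of non-empty bundles that were flushed

    BundleSketch(int maxBundleSize) {
        this.maxBundleSize = maxBundleSize;
    }

    void processElement(String key) {
        bundle.merge(key, 1, Integer::sum); // pre-aggregate per key
        buffered++;
        if (buffered >= maxBundleSize) {
            finishBundle(); // size trigger fired
        }
    }

    void processWatermark(long timestamp) {
        // Assumption: a watermark always flushes the current bundle.
        finishBundle();
    }

    private void finishBundle() {
        if (!bundle.isEmpty()) {
            flushes++;
            bundle.clear();
        }
        buffered = 0;
    }

    /** Feeds `records` elements and sends a watermark after every `watermarkEvery` elements. */
    static int countFlushes(int records, int bundleSize, int watermarkEvery) {
        BundleSketch op = new BundleSketch(bundleSize);
        for (int i = 1; i <= records; i++) {
            op.processElement("key-" + (i % 4));
            if (i % watermarkEvery == 0) {
                op.processWatermark(i);
            }
        }
        return op.flushes;
    }

    public static void main(String[] args) {
        // Watermarks reach the operator unthrottled (no assigner in the plan).
        System.out.println(countFlushes(1000, 500, 10));  // prints 100
        // Watermarks throttled to a coarser interval, as an assigner would do.
        System.out.println(countFlushes(1000, 500, 250)); // prints 4
    }
}
```

In this sketch the size trigger never fires in the unthrottled case, because each watermark empties the bundle long before it reaches the configured size, so the pre-aggregation benefit of mini-batch is mostly lost.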



--
This message was sent by Atlassian Jira
(v8.20.1#820001)