You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/05 10:19:04 UTC

[GitHub] [flink] wenlong88 commented on a change in pull request #14478: [FLINK-20737][table-planner-blink] Separate the implementation of stream group aggregate nodes

wenlong88 commented on a change in pull request #14478:
URL: https://github.com/apache/flink/pull/14478#discussion_r551806728



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
+import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.aggregate.MiniBatchGlobalGroupAggFunction;
+import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
+import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.tools.RelBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/** Stream {@link ExecNode} for unbounded global group aggregate. */
+public class StreamExecGlobalGroupAggregate extends ExecNodeBase<RowData>
+        implements StreamExecNode<RowData> {
+    private static final Logger LOG = LoggerFactory.getLogger(StreamExecGlobalGroupAggregate.class);
+
+    private final int[] grouping;
+    private final AggregateCall[] aggCalls;
+    /** Each element indicates whether the corresponding agg call needs `retract` method. */
+    private final boolean[] aggCallNeedRetractions;
+    /** The input row type of this node's local agg. */
+    private final RowType localAggInputRowType;
+    /** Whether this node will generate UPDATE_BEFORE messages. */
+    private final boolean generateUpdateBefore;
+    /** Whether this node consumes retraction messages. */
+    private final boolean needRetraction;
+
+    public StreamExecGlobalGroupAggregate(
+            int[] grouping,
+            AggregateCall[] aggCalls,
+            boolean[] aggCallNeedRetractions,
+            RowType localAggInputRowType,
+            boolean generateUpdateBefore,
+            boolean needRetraction,
+            ExecEdge inputEdge,
+            RowType outputType,
+            String description) {
+        super(Collections.singletonList(inputEdge), outputType, description);
+        this.grouping = grouping;
+        this.aggCalls = aggCalls;
+        this.aggCallNeedRetractions = aggCallNeedRetractions;
+        this.localAggInputRowType = localAggInputRowType;
+        this.generateUpdateBefore = generateUpdateBefore;
+        this.needRetraction = needRetraction;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+        final TableConfig tableConfig = planner.getTableConfig();
+
+        if (grouping.length > 0 && tableConfig.getMinIdleStateRetentionTime() < 0) {
+            LOG.warn(
+                    "No state retention interval configured for a query which accumulates state. "
+                            + "Please provide a query configuration with valid retention interval to prevent excessive "
+                            + "state size. You may specify a retention time of 0 to not clean up the state.");
+        }
+
+        final ExecNode<RowData> inputNode = (ExecNode<RowData>) getInputNodes().get(0);
+        final Transformation<RowData> inputTransform = inputNode.translateToPlan(planner);
+        final RowType inputRowType = (RowType) inputNode.getOutputType();
+
+        final AggregateInfoList localAggInfoList =
+                AggregateUtil.transformToStreamAggregateInfoList(
+                        localAggInputRowType,
+                        JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
+                        aggCallNeedRetractions,
+                        needRetraction,
+                        false,
+                        true);
+        final AggregateInfoList globalAggInfoList =
+                AggregateUtil.transformToStreamAggregateInfoList(
+                        localAggInputRowType,
+                        JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
+                        aggCallNeedRetractions,
+                        needRetraction,
+                        true,
+                        true);
+
+        final GeneratedAggsHandleFunction localAggsHandler =
+                generateAggsHandler(
+                        "LocalGroupAggsHandler",
+                        localAggInfoList,
+                        grouping.length,
+                        localAggInfoList.getAccTypes(),
+                        tableConfig,
+                        planner.getRelBuilder());
+
+        final GeneratedAggsHandleFunction globalAggsHandler =
+                generateAggsHandler(
+                        "GlobalGroupAggsHandler",
+                        globalAggInfoList,
+                        0,
+                        localAggInfoList.getAccTypes(),
+                        tableConfig,
+                        planner.getRelBuilder());
+
+        final int indexOfCountStar = globalAggInfoList.getIndexOfCountStar();
+        final LogicalType[] globalAccTypes =
+                Arrays.stream(globalAggInfoList.getAccTypes())
+                        .map(LogicalTypeDataTypeConverter::fromDataTypeToLogicalType)
+                        .toArray(LogicalType[]::new);
+        final LogicalType[] globalAggValueTypes =
+                Arrays.stream(globalAggInfoList.getActualValueTypes())
+                        .map(LogicalTypeDataTypeConverter::fromDataTypeToLogicalType)
+                        .toArray(LogicalType[]::new);
+        final GeneratedRecordEqualiser recordEqualiser =
+                new EqualiserCodeGenerator(globalAggValueTypes)
+                        .generateRecordEqualiser("GroupAggValueEqualiser");
+
+        final OneInputStreamOperator<RowData, RowData> operator;
+        final boolean isMiniBatchEnabled =
+                tableConfig
+                        .getConfiguration()
+                        .getBoolean(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
+        if (isMiniBatchEnabled) {
+            MiniBatchGlobalGroupAggFunction aggFunction =
+                    new MiniBatchGlobalGroupAggFunction(
+                            localAggsHandler,
+                            globalAggsHandler,
+                            recordEqualiser,
+                            globalAccTypes,
+                            indexOfCountStar,
+                            generateUpdateBefore,
+                            tableConfig.getIdleStateRetention().toMillis());
+
+            operator =
+                    new KeyedMapBundleOperator<>(
+                            aggFunction, AggregateUtil.createMiniBatchTrigger(tableConfig));
+        } else {
+            throw new TableException("Local-Global optimization is only worked in miniBatch mode");
+        }
+
+        final RowDataKeySelector selector =
+                KeySelectorUtil.getRowDataSelector(grouping, InternalTypeInfo.of(inputRowType));

Review comment:
       Move this declaration down to the place where it is used below.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
+import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.aggregate.MiniBatchGlobalGroupAggFunction;
+import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
+import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.tools.RelBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/** Stream {@link ExecNode} for unbounded global group aggregate. */
+public class StreamExecGlobalGroupAggregate extends ExecNodeBase<RowData>
+        implements StreamExecNode<RowData> {
+    private static final Logger LOG = LoggerFactory.getLogger(StreamExecGlobalGroupAggregate.class);
+
+    private final int[] grouping;
+    private final AggregateCall[] aggCalls;
+    /** Each element indicates whether the corresponding agg call needs `retract` method. */
+    private final boolean[] aggCallNeedRetractions;
+    /** The input row type of this node's local agg. */
+    private final RowType localAggInputRowType;
+    /** Whether this node will generate UPDATE_BEFORE messages. */
+    private final boolean generateUpdateBefore;
+    /** Whether this node consumes retraction messages. */
+    private final boolean needRetraction;
+
+    public StreamExecGlobalGroupAggregate(
+            int[] grouping,
+            AggregateCall[] aggCalls,
+            boolean[] aggCallNeedRetractions,
+            RowType localAggInputRowType,
+            boolean generateUpdateBefore,
+            boolean needRetraction,
+            ExecEdge inputEdge,
+            RowType outputType,
+            String description) {
+        super(Collections.singletonList(inputEdge), outputType, description);
+        this.grouping = grouping;
+        this.aggCalls = aggCalls;
+        this.aggCallNeedRetractions = aggCallNeedRetractions;
+        this.localAggInputRowType = localAggInputRowType;
+        this.generateUpdateBefore = generateUpdateBefore;
+        this.needRetraction = needRetraction;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+        final TableConfig tableConfig = planner.getTableConfig();
+
+        if (grouping.length > 0 && tableConfig.getMinIdleStateRetentionTime() < 0) {
+            LOG.warn(
+                    "No state retention interval configured for a query which accumulates state. "
+                            + "Please provide a query configuration with valid retention interval to prevent excessive "
+                            + "state size. You may specify a retention time of 0 to not clean up the state.");
+        }
+
+        final ExecNode<RowData> inputNode = (ExecNode<RowData>) getInputNodes().get(0);
+        final Transformation<RowData> inputTransform = inputNode.translateToPlan(planner);
+        final RowType inputRowType = (RowType) inputNode.getOutputType();
+
+        final AggregateInfoList localAggInfoList =
+                AggregateUtil.transformToStreamAggregateInfoList(
+                        localAggInputRowType,
+                        JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
+                        aggCallNeedRetractions,
+                        needRetraction,
+                        false,
+                        true);
+        final AggregateInfoList globalAggInfoList =
+                AggregateUtil.transformToStreamAggregateInfoList(
+                        localAggInputRowType,
+                        JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
+                        aggCallNeedRetractions,
+                        needRetraction,
+                        true,
+                        true);
+
+        final GeneratedAggsHandleFunction localAggsHandler =
+                generateAggsHandler(
+                        "LocalGroupAggsHandler",
+                        localAggInfoList,
+                        grouping.length,
+                        localAggInfoList.getAccTypes(),
+                        tableConfig,
+                        planner.getRelBuilder());
+
+        final GeneratedAggsHandleFunction globalAggsHandler =
+                generateAggsHandler(
+                        "GlobalGroupAggsHandler",
+                        globalAggInfoList,
+                        0,
+                        localAggInfoList.getAccTypes(),
+                        tableConfig,
+                        planner.getRelBuilder());
+
+        final int indexOfCountStar = globalAggInfoList.getIndexOfCountStar();
+        final LogicalType[] globalAccTypes =
+                Arrays.stream(globalAggInfoList.getAccTypes())
+                        .map(LogicalTypeDataTypeConverter::fromDataTypeToLogicalType)
+                        .toArray(LogicalType[]::new);
+        final LogicalType[] globalAggValueTypes =
+                Arrays.stream(globalAggInfoList.getActualValueTypes())
+                        .map(LogicalTypeDataTypeConverter::fromDataTypeToLogicalType)
+                        .toArray(LogicalType[]::new);
+        final GeneratedRecordEqualiser recordEqualiser =
+                new EqualiserCodeGenerator(globalAggValueTypes)
+                        .generateRecordEqualiser("GroupAggValueEqualiser");
+
+        final OneInputStreamOperator<RowData, RowData> operator;
+        final boolean isMiniBatchEnabled =
+                tableConfig
+                        .getConfiguration()
+                        .getBoolean(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
+        if (isMiniBatchEnabled) {
+            MiniBatchGlobalGroupAggFunction aggFunction =
+                    new MiniBatchGlobalGroupAggFunction(
+                            localAggsHandler,
+                            globalAggsHandler,
+                            recordEqualiser,
+                            globalAccTypes,
+                            indexOfCountStar,
+                            generateUpdateBefore,
+                            tableConfig.getIdleStateRetention().toMillis());
+
+            operator =
+                    new KeyedMapBundleOperator<>(
+                            aggFunction, AggregateUtil.createMiniBatchTrigger(tableConfig));
+        } else {
+            throw new TableException("Local-Global optimization is only worked in miniBatch mode");
+        }
+
+        final RowDataKeySelector selector =
+                KeySelectorUtil.getRowDataSelector(grouping, InternalTypeInfo.of(inputRowType));
+
+        // partitioned aggregation
+        final OneInputTransformation<RowData, RowData> transform =
+                new OneInputTransformation<>(
+                        inputTransform,
+                        getDesc(),
+                        operator,
+                        InternalTypeInfo.of(getOutputType()),
+                        inputTransform.getParallelism());
+
+        if (inputsContainSingleton()) {
+            transform.setParallelism(1);
+            transform.setMaxParallelism(1);
+        }
+
+        // set KeyType and Selector for state
+        transform.setStateKeySelector(selector);
+        transform.setStateKeyType(selector.getProducedType());
+
+        return transform;
+    }
+
+    private GeneratedAggsHandleFunction generateAggsHandler(
+            String name,
+            AggregateInfoList aggInfoList,
+            int mergedAccOffset,
+            DataType[] mergedAccExternalTypes,
+            TableConfig config,
+            RelBuilder relBuilder) {
+
+        // For local aggregate, the result will be buffered, so copyInputField is true.
+        // For global aggregate, result will be put into state, then not need copy
+        // but this global aggregate result will be put into a buffered map first,
+        // then multiput to state, so copyInputField is true.
+        AggsHandlerCodeGenerator generator =
+                new AggsHandlerCodeGenerator(
+                        new CodeGeneratorContext(config),
+                        relBuilder,
+                        JavaScalaConversionUtil.toScala(localAggInputRowType.getChildren()),
+                        true);
+
+        return generator
+                .needAccumulate()

Review comment:
       I think we don't need to generate the accumulate method here, do we?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.aggregate.MiniBatchIncrementalGroupAggFunction;
+import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/** Stream {@link ExecNode} for unbounded incremental group aggregate. */
+public class StreamExecIncrementalGroupAggregate extends ExecNodeBase<RowData>
+        implements StreamExecNode<RowData> {
+
+    /** The partial agg's grouping. */
+    private final int[] partialAggGrouping;
+    /** The final agg's grouping. */
+    private final int[] finalAggGrouping;
+    /** The partial agg's original agg calls. */
+    private final AggregateCall[] partialOriginalAggCalls;
+    /** Each element indicates whether the corresponding agg call needs `retract` method. */
+    private final boolean[] partialAggCallNeedRetractions;
+    /** The input row type of this node's partial local agg. */
+    private final RowType partialLocalAggInputRowType;
+    /** Whether this node consumes retraction messages. */
+    private final boolean partialAggNeedRetraction;
+
+    public StreamExecIncrementalGroupAggregate(
+            int[] partialAggGrouping,
+            int[] finalAggGrouping,
+            AggregateCall[] partialOriginalAggCalls,
+            boolean[] partialAggCallNeedRetractions,
+            RowType partialLocalAggInputRowType,
+            boolean partialAggNeedRetraction,
+            ExecEdge inputEdge,
+            RowType outputType,
+            String description) {
+        super(Collections.singletonList(inputEdge), outputType, description);
+        this.partialAggGrouping = partialAggGrouping;
+        this.finalAggGrouping = finalAggGrouping;
+        this.partialOriginalAggCalls = partialOriginalAggCalls;
+        this.partialAggCallNeedRetractions = partialAggCallNeedRetractions;
+        this.partialLocalAggInputRowType = partialLocalAggInputRowType;
+        this.partialAggNeedRetraction = partialAggNeedRetraction;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+        final TableConfig config = planner.getTableConfig();
+        final ExecNode<RowData> inputNode = (ExecNode<RowData>) getInputNodes().get(0);
+        final Transformation<RowData> inputTransform = inputNode.translateToPlan(planner);
+
+        final AggregateInfoList partialLocalAggInfoList =
+                AggregateUtil.createPartialAggInfoList(
+                        partialLocalAggInputRowType,
+                        JavaScalaConversionUtil.toScala(Arrays.asList(partialOriginalAggCalls)),
+                        partialAggCallNeedRetractions,
+                        partialAggNeedRetraction,
+                        false);
+
+        final GeneratedAggsHandleFunction partialAggsHandler =
+                generateAggsHandler(
+                        "PartialGroupAggsHandler",
+                        partialLocalAggInfoList,
+                        partialAggGrouping.length,
+                        partialLocalAggInfoList.getAccTypes(),
+                        config,
+                        planner.getRelBuilder(),
+                        // the partial aggregate accumulators will be buffered, so need copy
+                        true);
+
+        final AggregateInfoList incrementalAggInfo =
+                AggregateUtil.createIncrementalAggInfoList(
+                        partialLocalAggInputRowType,
+                        JavaScalaConversionUtil.toScala(Arrays.asList(partialOriginalAggCalls)),
+                        partialAggCallNeedRetractions,
+                        partialAggNeedRetraction);
+
+        final GeneratedAggsHandleFunction finalAggsHandler =
+                generateAggsHandler(
+                        "FinalGroupAggsHandler",
+                        incrementalAggInfo,
+                        0,
+                        partialLocalAggInfoList.getAccTypes(),
+                        config,
+                        planner.getRelBuilder(),
+                        // the final aggregate accumulators is not buffered
+                        false);
+
+        final RowDataKeySelector partialKeySelector =
+                KeySelectorUtil.getRowDataSelector(
+                        partialAggGrouping, InternalTypeInfo.of(inputNode.getOutputType()));
+        final RowDataKeySelector finalKeySelector =
+                KeySelectorUtil.getRowDataSelector(
+                        finalAggGrouping, partialKeySelector.getProducedType());
+
+        final MiniBatchIncrementalGroupAggFunction aggFunction =
+                new MiniBatchIncrementalGroupAggFunction(
+                        partialAggsHandler,
+                        finalAggsHandler,
+                        finalKeySelector,
+                        config.getIdleStateRetention().toMillis());
+
+        final OneInputStreamOperator<RowData, RowData> operator =
+                new KeyedMapBundleOperator<>(
+                        aggFunction, AggregateUtil.createMiniBatchTrigger(config));
+
+        // partitioned aggregation
+        final OneInputTransformation<RowData, RowData> transform =
+                new OneInputTransformation<>(
+                        inputTransform,
+                        getDesc(),
+                        operator,
+                        InternalTypeInfo.of(getOutputType()),
+                        inputTransform.getParallelism());
+
+        if (inputsContainSingleton()) {
+            transform.setParallelism(1);
+            transform.setMaxParallelism(1);
+        }
+
+        // set KeyType and Selector for state
+        transform.setStateKeySelector(partialKeySelector);
+        transform.setStateKeyType(partialKeySelector.getProducedType());
+        return transform;
+    }
+
+    private GeneratedAggsHandleFunction generateAggsHandler(
+            String name,
+            AggregateInfoList aggInfoList,
+            int mergedAccOffset,
+            DataType[] mergedAccExternalTypes,
+            TableConfig config,
+            RelBuilder relBuilder,
+            boolean inputFieldCopy) {
+
+        AggsHandlerCodeGenerator generator =
+                new AggsHandlerCodeGenerator(
+                        new CodeGeneratorContext(config),
+                        relBuilder,
+                        JavaScalaConversionUtil.toScala(partialLocalAggInputRowType.getChildren()),
+                        inputFieldCopy);
+
+        return generator
+                .needAccumulate()

Review comment:
       No need to generate the accumulate method here.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
+import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction;
+import org.apache.flink.table.runtime.operators.aggregate.MiniBatchGroupAggFunction;
+import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
+import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.core.AggregateCall;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/**
+ * Stream {@link ExecNode} for unbounded group aggregate.
+ *
+ * <p>This node does support un-splittable aggregate function (e.g. STDDEV_POP).
+ */
+public class StreamExecGroupAggregate extends ExecNodeBase<RowData>
+        implements StreamExecNode<RowData> {
+    private static final Logger LOG = LoggerFactory.getLogger(StreamExecGroupAggregate.class);
+
+    private final int[] grouping;
+    private final AggregateCall[] aggCalls;
+    /** Each element indicates whether the corresponding agg call needs `retract` method. */
+    private final boolean[] aggCallNeedRetractions;
+    /** Whether this node will generate UPDATE_BEFORE messages. */
+    private final boolean generateUpdateBefore;
+    /** Whether this node consumes retraction messages. */
+    private final boolean needRetraction;
+
+    public StreamExecGroupAggregate(
+            int[] grouping,
+            AggregateCall[] aggCalls,
+            boolean[] aggCallNeedRetractions,
+            boolean generateUpdateBefore,
+            boolean needRetraction,
+            ExecEdge inputEdge,
+            RowType outputType,
+            String description) {
+        super(Collections.singletonList(inputEdge), outputType, description);
+        Preconditions.checkArgument(aggCalls.length == aggCallNeedRetractions.length);
+        this.grouping = grouping;
+        this.aggCalls = aggCalls;
+        this.aggCallNeedRetractions = aggCallNeedRetractions;
+        this.generateUpdateBefore = generateUpdateBefore;
+        this.needRetraction = needRetraction;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+        final TableConfig tableConfig = planner.getTableConfig();
+        if (grouping.length > 0 && tableConfig.getMinIdleStateRetentionTime() < 0) {
+            LOG.warn(
+                    "No state retention interval configured for a query which accumulates state. "
+                            + "Please provide a query configuration with valid retention interval to prevent excessive "
+                            + "state size. You may specify a retention time of 0 to not clean up the state.");
+        }
+
+        final ExecNode<RowData> inputNode = (ExecNode<RowData>) getInputNodes().get(0);
+        final Transformation<RowData> inputTransform = inputNode.translateToPlan(planner);
+        final RowType inputRowType = (RowType) inputNode.getOutputType();
+
+        final AggsHandlerCodeGenerator generator =
+                new AggsHandlerCodeGenerator(
+                        new CodeGeneratorContext(tableConfig),
+                        planner.getRelBuilder(),
+                        JavaScalaConversionUtil.toScala(inputRowType.getChildren()),
+                        // TODO: heap state backend do not copy key currently,
+                        //  we have to copy input field
+                        // TODO: copy is not need when state backend is rocksdb,
+                        //  improve this in future
+                        // TODO: but other operators do not copy this input field.....
+                        true);
+
+        if (needRetraction) {
+            generator.needRetract();
+        }
+
+        final AggregateInfoList aggInfoList =
+                AggregateUtil.transformToStreamAggregateInfoList(
+                        inputRowType,
+                        JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
+                        aggCallNeedRetractions,
+                        needRetraction,
+                        true,
+                        true);
+        final GeneratedAggsHandleFunction aggsHandler =
+                generator.needAccumulate().generateAggsHandler("GroupAggsHandler", aggInfoList);
+
+        final LogicalType[] accTypes =
+                Arrays.stream(aggInfoList.getAccTypes())
+                        .map(LogicalTypeDataTypeConverter::fromDataTypeToLogicalType)
+                        .toArray(LogicalType[]::new);
+        final LogicalType[] aggValueTypes =
+                Arrays.stream(aggInfoList.getActualValueTypes())
+                        .map(LogicalTypeDataTypeConverter::fromDataTypeToLogicalType)
+                        .toArray(LogicalType[]::new);
+        final GeneratedRecordEqualiser recordEqualiser =
+                new EqualiserCodeGenerator(aggValueTypes)
+                        .generateRecordEqualiser("GroupAggValueEqualiser");
+        final int inputCountIndex = aggInfoList.getIndexOfCountStar();
+        final boolean isMiniBatchEnabled =
+                tableConfig
+                        .getConfiguration()
+                        .getBoolean(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
+
+        final OneInputStreamOperator<RowData, RowData> operator;
+        if (isMiniBatchEnabled) {
+            MiniBatchGroupAggFunction aggFunction =
+                    new MiniBatchGroupAggFunction(
+                            aggsHandler,
+                            recordEqualiser,
+                            accTypes,
+                            inputRowType,
+                            inputCountIndex,
+                            generateUpdateBefore,
+                            tableConfig.getIdleStateRetention().toMillis());
+            operator =
+                    new KeyedMapBundleOperator<>(
+                            aggFunction, AggregateUtil.createMiniBatchTrigger(tableConfig));
+        } else {
+            GroupAggFunction aggFunction =
+                    new GroupAggFunction(
+                            aggsHandler,
+                            recordEqualiser,
+                            accTypes,
+                            inputCountIndex,
+                            generateUpdateBefore,
+                            tableConfig.getIdleStateRetention().toMillis());
+            operator = new KeyedProcessOperator<>(aggFunction);
+        }
+
+        final RowDataKeySelector selector =
+                KeySelectorUtil.getRowDataSelector(grouping, InternalTypeInfo.of(inputRowType));

Review comment:
       ditto

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
+import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction;
+import org.apache.flink.table.runtime.operators.aggregate.MiniBatchGroupAggFunction;
+import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
+import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.core.AggregateCall;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/**
+ * Stream {@link ExecNode} for unbounded group aggregate.
+ *
+ * <p>This node does support un-splittable aggregate function (e.g. STDDEV_POP).
+ */
+public class StreamExecGroupAggregate extends ExecNodeBase<RowData>
+        implements StreamExecNode<RowData> {
+    private static final Logger LOG = LoggerFactory.getLogger(StreamExecGroupAggregate.class);
+
+    private final int[] grouping;
+    private final AggregateCall[] aggCalls;
+    /** Each element indicates whether the corresponding agg call needs `retract` method. */
+    private final boolean[] aggCallNeedRetractions;
+    /** Whether this node will generate UPDATE_BEFORE messages. */
+    private final boolean generateUpdateBefore;
+    /** Whether this node consumes retraction messages. */
+    private final boolean needRetraction;
+
+    public StreamExecGroupAggregate(
+            int[] grouping,
+            AggregateCall[] aggCalls,
+            boolean[] aggCallNeedRetractions,
+            boolean generateUpdateBefore,
+            boolean needRetraction,
+            ExecEdge inputEdge,
+            RowType outputType,
+            String description) {
+        super(Collections.singletonList(inputEdge), outputType, description);
+        Preconditions.checkArgument(aggCalls.length == aggCallNeedRetractions.length);
+        this.grouping = grouping;
+        this.aggCalls = aggCalls;
+        this.aggCallNeedRetractions = aggCallNeedRetractions;
+        this.generateUpdateBefore = generateUpdateBefore;
+        this.needRetraction = needRetraction;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+        final TableConfig tableConfig = planner.getTableConfig();
+        if (grouping.length > 0 && tableConfig.getMinIdleStateRetentionTime() < 0) {
+            LOG.warn(
+                    "No state retention interval configured for a query which accumulates state. "
+                            + "Please provide a query configuration with valid retention interval to prevent excessive "
+                            + "state size. You may specify a retention time of 0 to not clean up the state.");
+        }
+
+        final ExecNode<RowData> inputNode = (ExecNode<RowData>) getInputNodes().get(0);
+        final Transformation<RowData> inputTransform = inputNode.translateToPlan(planner);
+        final RowType inputRowType = (RowType) inputNode.getOutputType();
+
+        final AggsHandlerCodeGenerator generator =
+                new AggsHandlerCodeGenerator(
+                        new CodeGeneratorContext(tableConfig),
+                        planner.getRelBuilder(),
+                        JavaScalaConversionUtil.toScala(inputRowType.getChildren()),
+                        // TODO: heap state backend do not copy key currently,
+                        //  we have to copy input field
+                        // TODO: copy is not need when state backend is rocksdb,
+                        //  improve this in future
+                        // TODO: but other operators do not copy this input field.....
+                        true);

Review comment:
       How about calling `needAccumulate` here, right after constructing the generator?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupTableAggregate.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.generated.GeneratedTableAggsHandleFunction;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.aggregate.GroupTableAggFunction;
+import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.core.AggregateCall;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/** Stream {@link ExecNode} for unbounded java/scala group table aggregate. */

Review comment:
       How about linking to {@link StreamExecNode} here (and, correspondingly, to {@link BatchExecNode} in the batch nodes)?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupTableAggregate.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.generated.GeneratedTableAggsHandleFunction;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.aggregate.GroupTableAggFunction;
+import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.core.AggregateCall;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/** Stream {@link ExecNode} for unbounded java/scala group table aggregate. */
+public class StreamExecGroupTableAggregate extends ExecNodeBase<RowData>
+        implements StreamExecNode<RowData> {
+    private static final Logger LOG = LoggerFactory.getLogger(StreamExecGroupTableAggregate.class);
+
+    private final int[] grouping;
+    private final AggregateCall[] aggCalls;
+    /** Each element indicates whether the corresponding agg call needs `retract` method. */
+    private final boolean[] aggCallNeedRetractions;
+    /** Whether this node will generate UPDATE_BEFORE messages. */
+    private final boolean generateUpdateBefore;
+    /** Whether this node consumes retraction messages. */
+    private final boolean needRetraction;
+
+    public StreamExecGroupTableAggregate(
+            int[] grouping,
+            AggregateCall[] aggCalls,
+            boolean[] aggCallNeedRetractions,
+            boolean generateUpdateBefore,
+            boolean needRetraction,
+            ExecEdge inputEdge,
+            RowType outputType,
+            String description) {
+        super(Collections.singletonList(inputEdge), outputType, description);
+        Preconditions.checkArgument(aggCalls.length == aggCallNeedRetractions.length);
+        this.grouping = grouping;
+        this.aggCalls = aggCalls;
+        this.aggCallNeedRetractions = aggCallNeedRetractions;
+        this.generateUpdateBefore = generateUpdateBefore;
+        this.needRetraction = needRetraction;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+        final TableConfig tableConfig = planner.getTableConfig();
+
+        if (grouping.length > 0 && tableConfig.getMinIdleStateRetentionTime() < 0) {
+            LOG.warn(
+                    "No state retention interval configured for a query which accumulates state. "
+                            + "Please provide a query configuration with valid retention interval to prevent excessive "
+                            + "state size. You may specify a retention time of 0 to not clean up the state.");
+        }
+
+        final ExecNode<RowData> inputNode = (ExecNode<RowData>) getInputNodes().get(0);
+        final Transformation<RowData> inputTransform = inputNode.translateToPlan(planner);
+        final RowType inputRowType = (RowType) inputNode.getOutputType();
+
+        final AggsHandlerCodeGenerator generator =
+                new AggsHandlerCodeGenerator(
+                        new CodeGeneratorContext(tableConfig),
+                        planner.getRelBuilder(),
+                        JavaScalaConversionUtil.toScala(inputRowType.getChildren()),
+                        // TODO: heap state backend do not copy key currently, we have to copy input
+                        // field
+                        // TODO: copy is not need when state backend is rocksdb, improve this in
+                        // future
+                        // TODO: but other operators do not copy this input field.....
+                        true);
+
+        if (needRetraction) {
+            generator.needRetract();
+        }
+
+        final AggregateInfoList aggInfoList =
+                AggregateUtil.transformToStreamAggregateInfoList(
+                        inputRowType,
+                        JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
+                        aggCallNeedRetractions,
+                        needRetraction,
+                        true,
+                        true);
+        final GeneratedTableAggsHandleFunction aggsHandler =
+                generator
+                        .needAccumulate()

Review comment:
       ditto

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonOverAggregate.scala
##########
@@ -27,13 +27,8 @@ import org.apache.flink.table.data.RowData
 import org.apache.flink.table.functions.python.PythonFunctionInfo
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.delegation.StreamPlanner
-import org.apache.flink.table.planner.plan.nodes.common.CommonPythonAggregate
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonOverAggregate.{
-  ARROW_PYTHON_OVER_WINDOW_RANGE_ROW_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME,
-  ARROW_PYTHON_OVER_WINDOW_RANGE_PROC_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME,
-  ARROW_PYTHON_OVER_WINDOW_ROWS_ROW_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME,
-  ARROW_PYTHON_OVER_WINDOW_ROWS_PROC_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME
-}
+import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonAggregate
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonOverAggregate.{ARROW_PYTHON_OVER_WINDOW_RANGE_PROC_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME, ARROW_PYTHON_OVER_WINDOW_RANGE_ROW_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME, ARROW_PYTHON_OVER_WINDOW_ROWS_PROC_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME, ARROW_PYTHON_OVER_WINDOW_ROWS_ROW_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME}

Review comment:
       Could we keep the multi-line (one import per line) format used before this change?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org