Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/20 16:10:47 UTC

[GitHub] [flink] godfreyhe commented on a change in pull request #17344: [FLINK-20895] [flink-table-planner] support local aggregate push down in table planner

godfreyhe commented on a change in pull request #17344:
URL: https://github.com/apache/flink/pull/17344#discussion_r732638497



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalHashAggIntoScanRule.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalLocalHashAggregate;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+
+/**
+ * Planner rule that tries to push a local hash or sort aggregate which without sort into a {@link

Review comment:
       the comment should be updated: remove "or sort aggregate"
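
   For reference, a sketch of how the corrected opening sentence of the class Javadoc could read (suggested wording only, not committed text):

   ```java
   /**
    * Planner rule that tries to push a local hash aggregate into a {@link
    * BatchPhysicalTableSourceScan} whose table is a {@link TableSourceTable} with a source
    * supporting {@link SupportsAggregatePushDown}.
    */
   ```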

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalSortAggWithoutSortIntoScanRule.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalLocalSortAggregate;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+
+/**
+ * Planner rule that tries to push a local hash or sort aggregate which without sort into a {@link
+ * BatchPhysicalTableSourceScan} whose table is a {@link TableSourceTable} with a source supporting
+ * {@link SupportsAggregatePushDown}. The {@link
+ * OptimizerConfigOptions#TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED} need to be true.
+ *
+ * <p>Suppose we have the original physical plan:
+ *
+ * <pre>{@code
+ * BatchPhysicalHashAggregate (global)

Review comment:
       should be `BatchPhysicalSortAggregate`

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoScanRuleBase.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.core.AggregateCall;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Planner rule that tries to push a local aggregator into an {@link BatchPhysicalTableSourceScan}
+ * whose table is a {@link TableSourceTable} with a source supporting {@link
+ * SupportsAggregatePushDown}.
+ *
+ * <p>The aggregate push down does not support a number of more complex statements at present:
+ *
+ * <ul>
+ *   <li>complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS.
+ *   <li>expressions inside the aggregation function call: such as sum(a * b).
+ *   <li>aggregations with ordering.
+ *   <li>aggregations with filter.
+ * </ul>
+ */
+public abstract class PushLocalAggIntoScanRuleBase extends RelOptRule {
+
+    public PushLocalAggIntoScanRuleBase(RelOptRuleOperand operand, String description) {
+        super(operand, description);
+    }
+
+    protected boolean isMatch(
+            RelOptRuleCall call,
+            BatchPhysicalGroupAggregateBase aggregate,
+            BatchPhysicalTableSourceScan tableSourceScan) {
+        TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig();
+        if (!tableConfig
+                .getConfiguration()
+                .getBoolean(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)) {
+            return false;
+        }
+
+        if (aggregate.isFinal() || aggregate.getAggCallList().isEmpty()) {
+            return false;
+        }
+        List<AggregateCall> aggCallList =
+                JavaScalaConversionUtil.toJava(aggregate.getAggCallList());
+        for (AggregateCall aggCall : aggCallList) {
+            if (aggCall.isDistinct()
+                    || aggCall.isApproximate()
+                    || aggCall.getArgList().size() > 1
+                    || aggCall.hasFilter()
+                    || !aggCall.getCollation().getFieldCollations().isEmpty()) {
+                return false;
+            }
+        }
+        TableSourceTable tableSourceTable = tableSourceScan.tableSourceTable();
+        // we can not push aggregates twice
+        return tableSourceTable != null
+                && tableSourceTable.tableSource() instanceof SupportsAggregatePushDown
+                && Arrays.stream(tableSourceTable.abilitySpecs())
+                        .noneMatch(spec -> spec instanceof AggregatePushDownSpec);
+    }
+
+    protected void pushLocalAggregateIntoScan(
+            RelOptRuleCall call,
+            BatchPhysicalGroupAggregateBase localAgg,
+            BatchPhysicalTableSourceScan oldScan) {
+        RowType inputType = FlinkTypeFactory.toLogicalRowType(oldScan.getRowType());
+        List<int[]> groupingSets = Arrays.asList(localAgg.grouping(), localAgg.auxGrouping());
+        List<AggregateCall> aggCallList = JavaScalaConversionUtil.toJava(localAgg.getAggCallList());
+        RowType producedType = FlinkTypeFactory.toLogicalRowType(localAgg.getRowType());
+
+        TableSourceTable oldTableSourceTable = oldScan.tableSourceTable();
+        DynamicTableSource newTableSource = oldScan.tableSource().copy();
+
+        boolean isPushDownSuccess =
+                AggregatePushDownSpec.apply(
+                        inputType, groupingSets, aggCallList, producedType, newTableSource);
+
+        if (!isPushDownSuccess) {
+            // aggregate push down failed, just return without changing any nodes.
+            return;
+        }
+
+        FlinkStatistic newFlinkStatistic = getNewFlinkStatistic(oldTableSourceTable);
+        AggregatePushDownSpec aggregatePushDownSpec =
+                new AggregatePushDownSpec(inputType, groupingSets, aggCallList, producedType);
+
+        TableSourceTable newTableSourceTable =
+                oldTableSourceTable
+                        .copy(
+                                newTableSource,
+                                newFlinkStatistic,
+                                new SourceAbilitySpec[] {aggregatePushDownSpec})
+                        .copy(localAgg.getRowType());
+        BatchPhysicalTableSourceScan newScan =
+                oldScan.copy(oldScan.getTraitSet(), newTableSourceTable);
+        BatchPhysicalExchange oldExchange = call.rel(0);
+        BatchPhysicalExchange newExchange =
+                oldExchange.copy(oldExchange.getTraitSet(), newScan, oldExchange.getDistribution());
+        call.transformTo(newExchange);
+    }
+
+    private FlinkStatistic getNewFlinkStatistic(TableSourceTable tableSourceTable) {
+        FlinkStatistic oldStatistic = tableSourceTable.getStatistic();
+        if (oldStatistic == FlinkStatistic.UNKNOWN()) {
+            return oldStatistic;
+        } else {
+            // Remove tableStats after all aggregates have been pushed down
+            return FlinkStatistic.builder().statistic(oldStatistic).tableStats(null).build();

Review comment:
       the unique key info in `FlinkStatistic` may also need to be updated
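
   A minimal sketch of what that could look like, assuming `FlinkStatistic.Builder` exposes a `uniqueKeys(...)` setter (an assumption here; check the actual builder API):

   ```java
   private FlinkStatistic getNewFlinkStatistic(TableSourceTable tableSourceTable) {
       FlinkStatistic oldStatistic = tableSourceTable.getStatistic();
       if (oldStatistic == FlinkStatistic.UNKNOWN()) {
           return oldStatistic;
       }
       // After the aggregate is pushed down, the scan no longer produces the original
       // table rows, so unique key info derived from the original columns is stale and
       // should be dropped together with the table stats.
       return FlinkStatistic.builder()
               .statistic(oldStatistic)
               .tableStats(null)
               .uniqueKeys(null) // assumed setter; drops the stale unique key info
               .build();
   }
   ```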

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
##########
@@ -279,6 +281,21 @@ object AggregateUtil extends Enumeration {
       isBounded = false)
   }
 
+  def deriveSumAndCountFromAvg(avgAggFunction: UserDefinedFunction

Review comment:
       use the specific type `AvgAggFunction` instead of `UserDefinedFunction` here

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.scala
##########
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql.agg
+
+import org.apache.flink.table.api.DataTypes
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.functions.aggfunctions.CollectAggFunction
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
+import org.junit.{Before, Test}
+
+class LocalAggregatePushDownITCase extends BatchTestBase {

Review comment:
       port this class to Java

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.scala
##########
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql.agg
+
+import org.apache.flink.table.api.DataTypes
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.functions.aggfunctions.CollectAggFunction
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
+import org.junit.{Before, Test}
+
+class LocalAggregatePushDownITCase extends BatchTestBase {
+
+  @Before
+  override def before(): Unit = {
+    super.before()
+    env.setParallelism(1) // set sink parallelism to 1
+    conf.getConfiguration.setBoolean(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED, true)
+    val testDataId = TestValuesTableFactory.registerData(TestData.personData)
+    val ddl =
+      s"""
+         |CREATE TABLE AggregatableTable (
+         |  id int,
+         |  age int,
+         |  name string,
+         |  height int,
+         |  gender string,
+         |  deposit bigint,
+         |  points bigint
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$testDataId',
+         |  'filterable-fields' = 'id',
+         |  'bounded' = 'true'
+         |)
+       """.stripMargin
+    tEnv.executeSql(ddl)
+
+  }
+
+  @Test
+  def testPushDownLocalHashAggWithGroup(): Unit = {
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit) as avg_dep,
+        |  sum(deposit),
+        |  count(1),
+        |  gender
+        |FROM
+        |  AggregatableTable
+        |GROUP BY gender
+        |ORDER BY avg_dep
+        |""".stripMargin,
+      Seq(
+        row(126, 630, 5, "f"),
+        row(220, 1320, 6, "m"))
+    )
+  }
+
+  @Test
+  def testDisablePushDownLocalAgg(): Unit = {
+    // disable push down local agg
+    tEnv.getConfig.getConfiguration.setBoolean(
+        OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
+        false)
+
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit) as avg_dep,
+        |  sum(deposit),
+        |  count(1),
+        |  gender
+        |FROM
+        |  AggregatableTable
+        |GROUP BY gender
+        |ORDER BY avg_dep
+        |""".stripMargin,
+      Seq(
+        row(126, 630, 5, "f"),
+        row(220, 1320, 6, "m"))
+    )
+
+    // reset config
+    tEnv.getConfig.getConfiguration.setBoolean(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
+      true)
+  }
+
+  @Test
+  def testPushDownLocalHashAggWithOutGroup(): Unit = {
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit),
+        |  sum(deposit),
+        |  count(*)
+        |FROM
+        |  AggregatableTable
+        |""".stripMargin,
+      Seq(
+        row(177, 1950, 11))
+    )
+  }
+
+  @Test
+  def testPushDownLocalSortAggWithoutSort(): Unit = {
+    // enable sort agg
+    tEnv.getConfig.getConfiguration.setString(
+      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
+
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit),
+        |  sum(deposit),
+        |  count(*)
+        |FROM
+        |  AggregatableTable
+        |""".stripMargin,
+      Seq(
+        row(177, 1950, 11))
+    )
+
+    // reset config
+    tEnv.getConfig.getConfiguration.setString(
+      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "")
+  }
+
+  @Test
+  def testPushDownLocalSortAggWithSort(): Unit = {
+    // enable sort agg
+    tEnv.getConfig.getConfiguration.setString(
+      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
+
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit),
+        |  sum(deposit),
+        |  count(1),
+        |  gender,
+        |  age
+        |FROM
+        |  AggregatableTable
+        |GROUP BY gender, age
+        |""".stripMargin,
+      Seq(
+        row(50, 50, 1, "f", 19),
+        row(200, 200, 1, "f", 20),
+        row(250, 750, 3, "m", 23),
+        row(126, 380, 3, "f", 25),
+        row(300, 300, 1, "m", 27),
+        row(170, 170, 1, "m", 28),
+        row(100, 100, 1, "m", 34))
+    )
+
+    // reset config
+    tEnv.getConfig.getConfiguration.setString(
+      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "")
+  }
+
+  @Test
+  def testAggWithAuxGrouping(): Unit = {
+    checkResult(
+      """
+        |SELECT
+        |  name,
+        |  d,
+        |  p,
+        |  count(*)
+        |FROM (
+        |  SELECT
+        |    name,
+        |    sum(deposit) as d,
+        |    max(points) as p
+        |  FROM AggregatableTable
+        |  GROUP BY name
+        |) t
+        |GROUP BY name, d, p
+        |""".stripMargin,
+      Seq(
+        row("tom", 200, 1000, 1),
+        row("mary", 100, 1000, 1),
+        row("jack", 150, 1300, 1),
+        row("rose", 100, 500, 1),
+        row("danny", 300, 300, 1),
+        row("tommas", 400, 4000, 1),
+        row("olivia", 50, 9000, 1),
+        row("stef", 100, 1900, 1),
+        row("emma", 180, 800, 1),
+        row("benji", 170, 11000, 1),
+        row("eva", 200, 1000, 1))
+    )
+  }
+
+  @Test
+  def testPushDownLocalAggAfterFilterPushDown(): Unit = {
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit),
+        |  sum(deposit),
+        |  count(1),
+        |  gender,
+        |  age
+        |FROM
+        |  AggregatableTable
+        |WHERE age <= 20
+        |GROUP BY gender, age
+        |""".stripMargin,
+      Seq(
+        row(50, 50, 1, "f", 19),
+        row(200, 200, 1, "f", 20))
+    )
+  }
+
+  @Test
+  def testLocalAggWithLimit(): Unit = {
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit) as avg_dep,
+        |  sum(deposit),
+        |  count(1),
+        |  gender
+        |FROM
+        |  (
+        |    SELECT * FROM AggregatableTable
+        |    LIMIT 10
+        |  ) t
+        |GROUP BY gender
+        |ORDER BY avg_dep
+        |""".stripMargin,
+      Seq(
+        row(107, 430, 4, "f"),
+        row(220, 1320, 6, "m"))
+    )
+  }
+
+  @Test
+  def testLocalAggWithUDAF(): Unit = {
+    // add UDAF
+    tEnv.createTemporarySystemFunction(
+      "udaf_collect",
+      new CollectAggFunction(DataTypes.BIGINT().getLogicalType))
+
+    checkResult(
+      """
+        |SELECT
+        |  udaf_collect(deposit),
+        |  count(1),
+        |  gender,
+        |  age
+        |FROM
+        |  AggregatableTable
+        |GROUP BY gender, age
+        |""".stripMargin,
+      Seq(
+        row("{100=1}", 1, "m", 34),
+        row("{100=2, 180=1}", 3, "f", 25),
+        row("{170=1}", 1, "m", 28),
+        row("{200=1}", 1, "f", 20),
+        row("{300=1}", 1, "m", 27),
+        row("{400=1, 150=1, 200=1}", 3, "m", 23),
+        row("{50=1}", 1, "f", 19))
+    )
+  }
+
+  @Test
+  def testLocalAggWithUnsupportedDataTypes(): Unit = {
+    // only agg on Long columns and count are supported to be pushed down
+    // in {@link TestValuesTableFactory}
+
+    checkResult(
+      """
+        |SELECT
+        |  min(age),
+        |  max(height),
+        |  avg(deposit),
+        |  sum(deposit),
+        |  count(1),
+        |  gender,
+        |  age
+        |FROM
+        |  AggregatableTable
+        |GROUP BY gender, age
+        |""".stripMargin,
+      Seq(
+        row(19, 172, 50, 50, 1, "f", 19),
+        row(20, 180, 200, 200, 1, "f", 20),
+        row(23, 182, 250, 750, 3, "m", 23),
+        row(25, 171, 126, 380, 3, "f", 25),
+        row(27, 175, 300, 300, 1, "m", 27),
+        row(28, 165, 170, 170, 1, "m", 28),
+        row(34, 170, 100, 100, 1, "m", 34))
+    )
+  }
+
+  @Test
+  def testLocalAggWithColumnExpression1(): Unit = {
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit),
+        |  sum(deposit + points),
+        |  count(1),
+        |  gender,
+        |  age
+        |FROM
+        |  AggregatableTable
+        |GROUP BY gender, age
+        |""".stripMargin,
+      Seq(
+        row(250, 7050, 3, "m", 23),
+        row(126, 2680, 3, "f", 25),
+        row(300, 600, 1, "m", 27),
+        row(50, 9050, 1, "f", 19),
+        row(100, 2000, 1, "m", 34),
+        row(170, 11170, 1, "m", 28),
+        row(200, 1200, 1, "f", 20))
+    )
+  }
+
+  @Test
+  def testLocalAggWithColumnExpression2(): Unit = {

Review comment:
       testLocalAggWithColumnExpression2 -> testLocalAggWithDistinct

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.scala
##########
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql.agg
+
+import org.apache.flink.table.api.DataTypes
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.functions.aggfunctions.CollectAggFunction
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
+import org.junit.{Before, Test}
+
+class LocalAggregatePushDownITCase extends BatchTestBase {
+
+  @Before
+  override def before(): Unit = {
+    super.before()
+    env.setParallelism(1) // set sink parallelism to 1
+    conf.getConfiguration.setBoolean(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED, true)
+    val testDataId = TestValuesTableFactory.registerData(TestData.personData)
+    val ddl =
+      s"""
+         |CREATE TABLE AggregatableTable (
+         |  id int,
+         |  age int,
+         |  name string,
+         |  height int,
+         |  gender string,
+         |  deposit bigint,
+         |  points bigint
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$testDataId',
+         |  'filterable-fields' = 'id',
+         |  'bounded' = 'true'
+         |)
+       """.stripMargin
+    tEnv.executeSql(ddl)
+
+  }
+
+  @Test
+  def testPushDownLocalHashAggWithGroup(): Unit = {
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit) as avg_dep,
+        |  sum(deposit),
+        |  count(1),
+        |  gender
+        |FROM
+        |  AggregatableTable
+        |GROUP BY gender
+        |ORDER BY avg_dep
+        |""".stripMargin,
+      Seq(
+        row(126, 630, 5, "f"),
+        row(220, 1320, 6, "m"))
+    )
+  }
+
+  @Test
+  def testDisablePushDownLocalAgg(): Unit = {
+    // disable push down local agg
+    tEnv.getConfig.getConfiguration.setBoolean(
+        OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
+        false)
+
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit) as avg_dep,
+        |  sum(deposit),
+        |  count(1),
+        |  gender
+        |FROM
+        |  AggregatableTable
+        |GROUP BY gender
+        |ORDER BY avg_dep
+        |""".stripMargin,
+      Seq(
+        row(126, 630, 5, "f"),
+        row(220, 1320, 6, "m"))
+    )
+
+    // reset config
+    tEnv.getConfig.getConfiguration.setBoolean(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
+      true)
+  }
+
+  @Test
+  def testPushDownLocalHashAggWithOutGroup(): Unit = {
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit),
+        |  sum(deposit),
+        |  count(*)
+        |FROM
+        |  AggregatableTable
+        |""".stripMargin,
+      Seq(
+        row(177, 1950, 11))
+    )
+  }
+
+  @Test
+  def testPushDownLocalSortAggWithoutSort(): Unit = {
+    // enable sort agg
+    tEnv.getConfig.getConfiguration.setString(
+      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
+
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit),
+        |  sum(deposit),
+        |  count(*)
+        |FROM
+        |  AggregatableTable
+        |""".stripMargin,
+      Seq(
+        row(177, 1950, 11))
+    )
+
+    // reset config
+    tEnv.getConfig.getConfiguration.setString(
+      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "")
+  }
+
+  @Test
+  def testPushDownLocalSortAggWithSort(): Unit = {
+    // enable sort agg
+    tEnv.getConfig.getConfiguration.setString(
+      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
+
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit),
+        |  sum(deposit),
+        |  count(1),
+        |  gender,
+        |  age
+        |FROM
+        |  AggregatableTable
+        |GROUP BY gender, age
+        |""".stripMargin,
+      Seq(
+        row(50, 50, 1, "f", 19),
+        row(200, 200, 1, "f", 20),
+        row(250, 750, 3, "m", 23),
+        row(126, 380, 3, "f", 25),
+        row(300, 300, 1, "m", 27),
+        row(170, 170, 1, "m", 28),
+        row(100, 100, 1, "m", 34))
+    )
+
+    // reset config
+    tEnv.getConfig.getConfiguration.setString(
+      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "")
+  }
+
+  @Test
+  def testAggWithAuxGrouping(): Unit = {
+    checkResult(
+      """
+        |SELECT
+        |  name,
+        |  d,
+        |  p,
+        |  count(*)
+        |FROM (
+        |  SELECT
+        |    name,
+        |    sum(deposit) as d,
+        |    max(points) as p
+        |  FROM AggregatableTable
+        |  GROUP BY name
+        |) t
+        |GROUP BY name, d, p
+        |""".stripMargin,
+      Seq(
+        row("tom", 200, 1000, 1),
+        row("mary", 100, 1000, 1),
+        row("jack", 150, 1300, 1),
+        row("rose", 100, 500, 1),
+        row("danny", 300, 300, 1),
+        row("tommas", 400, 4000, 1),
+        row("olivia", 50, 9000, 1),
+        row("stef", 100, 1900, 1),
+        row("emma", 180, 800, 1),
+        row("benji", 170, 11000, 1),
+        row("eva", 200, 1000, 1))
+    )
+  }
+
+  @Test
+  def testPushDownLocalAggAfterFilterPushDown(): Unit = {
+    checkResult(

Review comment:
       the local aggregate can not be pushed down to the source here, since the filter can not be pushed down into the source.
   
   see the plan: 
   ```
   Sink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1], fields=[EXPR$0, EXPR$1, EXPR$2, gender, age])
   +- Calc(select=[EXPR$0, EXPR$1, EXPR$2, gender, age])
      +- HashAggregate(isMerge=[true], groupBy=[gender, age], select=[gender, age, Final_AVG(sum$0, count$1) AS EXPR$0, Final_SUM(sum$2) AS EXPR$1, Final_COUNT(count1$3) AS EXPR$2])
         +- Exchange(distribution=[hash[gender, age]])
            +- LocalHashAggregate(groupBy=[gender, age], select=[gender, age, Partial_AVG(deposit) AS (sum$0, count$1), Partial_SUM(deposit) AS sum$2, Partial_COUNT(*) AS count1$3])
               +- Calc(select=[gender, age, deposit], where=[<=(age, 20)])
                  +- TableSourceScan(table=[[default_catalog, default_database, AggregatableTable, filter=[], project=[age, gender, deposit], metadata=[]]], fields=[age, gender, deposit])
   ```
   
   please add `age` to the `filterable-fields` option in the `WITH` clause, then the filter can be pushed down successfully. See PushFilterInCalcIntoTableSourceRuleTest#L79

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.scala
##########
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql.agg
+
+import org.apache.flink.table.api.DataTypes
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.functions.aggfunctions.CollectAggFunction
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
+import org.junit.{Before, Test}
+
+class LocalAggregatePushDownITCase extends BatchTestBase {
+
+  @Before
+  override def before(): Unit = {
+    super.before()
+    env.setParallelism(1) // set sink parallelism to 1
+    conf.getConfiguration.setBoolean(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED, true)
+    val testDataId = TestValuesTableFactory.registerData(TestData.personData)
+    val ddl =
+      s"""
+         |CREATE TABLE AggregatableTable (
+         |  id int,
+         |  age int,
+         |  name string,
+         |  height int,
+         |  gender string,
+         |  deposit bigint,
+         |  points bigint
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$testDataId',
+         |  'filterable-fields' = 'id',
+         |  'bounded' = 'true'
+         |)
+       """.stripMargin
+    tEnv.executeSql(ddl)
+
+  }
+
+  @Test
+  def testPushDownLocalHashAggWithGroup(): Unit = {
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit) as avg_dep,
+        |  sum(deposit),
+        |  count(1),
+        |  gender
+        |FROM
+        |  AggregatableTable
+        |GROUP BY gender
+        |ORDER BY avg_dep
+        |""".stripMargin,
+      Seq(
+        row(126, 630, 5, "f"),
+        row(220, 1320, 6, "m"))
+    )
+  }
+
+  @Test
+  def testDisablePushDownLocalAgg(): Unit = {
+    // disable push down local agg
+    tEnv.getConfig.getConfiguration.setBoolean(
+        OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
+        false)
+
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit) as avg_dep,
+        |  sum(deposit),
+        |  count(1),
+        |  gender
+        |FROM
+        |  AggregatableTable
+        |GROUP BY gender
+        |ORDER BY avg_dep
+        |""".stripMargin,
+      Seq(
+        row(126, 630, 5, "f"),
+        row(220, 1320, 6, "m"))
+    )
+
+    // reset config
+    tEnv.getConfig.getConfiguration.setBoolean(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
+      true)
+  }
+
+  @Test
+  def testPushDownLocalHashAggWithOutGroup(): Unit = {
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit),
+        |  sum(deposit),
+        |  count(*)
+        |FROM
+        |  AggregatableTable
+        |""".stripMargin,
+      Seq(
+        row(177, 1950, 11))
+    )
+  }
+
+  @Test
+  def testPushDownLocalSortAggWithoutSort(): Unit = {
+    // enable sort agg
+    tEnv.getConfig.getConfiguration.setString(
+      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
+
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit),
+        |  sum(deposit),
+        |  count(*)
+        |FROM
+        |  AggregatableTable
+        |""".stripMargin,
+      Seq(
+        row(177, 1950, 11))
+    )
+
+    // reset config
+    tEnv.getConfig.getConfiguration.setString(
+      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "")

Review comment:
       ditto

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
##########
@@ -131,4 +130,23 @@ class TableSourceTable(
       flinkContext,
       abilitySpecs ++ newAbilitySpecs)
   }
+
+  /**
+   * Creates a copy of this table, changing the rowType
+   *
+   * @param newRowType new row type
+   * @return New TableSourceTable instance with new row type
+   */
+  def copy(newRowType: RelDataType): TableSourceTable = {

Review comment:
       providing a `copy` method that also accepts a `FlinkStatistic` would be more useful

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.scala
##########
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql.agg
+
+import org.apache.flink.table.api.DataTypes
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.functions.aggfunctions.CollectAggFunction
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
+import org.junit.{Before, Test}
+
+class LocalAggregatePushDownITCase extends BatchTestBase {
+
+  @Before
+  override def before(): Unit = {
+    super.before()
+    env.setParallelism(1) // set sink parallelism to 1
+    conf.getConfiguration.setBoolean(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED, true)
+    val testDataId = TestValuesTableFactory.registerData(TestData.personData)
+    val ddl =
+      s"""
+         |CREATE TABLE AggregatableTable (
+         |  id int,
+         |  age int,
+         |  name string,
+         |  height int,
+         |  gender string,
+         |  deposit bigint,
+         |  points bigint
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$testDataId',
+         |  'filterable-fields' = 'id',
+         |  'bounded' = 'true'
+         |)
+       """.stripMargin
+    tEnv.executeSql(ddl)
+
+  }
+
+  @Test
+  def testPushDownLocalHashAggWithGroup(): Unit = {
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit) as avg_dep,
+        |  sum(deposit),
+        |  count(1),
+        |  gender
+        |FROM
+        |  AggregatableTable
+        |GROUP BY gender
+        |ORDER BY avg_dep
+        |""".stripMargin,
+      Seq(
+        row(126, 630, 5, "f"),
+        row(220, 1320, 6, "m"))
+    )
+  }
+
+  @Test
+  def testDisablePushDownLocalAgg(): Unit = {
+    // disable push down local agg
+    tEnv.getConfig.getConfiguration.setBoolean(
+        OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
+        false)
+
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit) as avg_dep,
+        |  sum(deposit),
+        |  count(1),
+        |  gender
+        |FROM
+        |  AggregatableTable
+        |GROUP BY gender
+        |ORDER BY avg_dep
+        |""".stripMargin,
+      Seq(
+        row(126, 630, 5, "f"),
+        row(220, 1320, 6, "m"))
+    )
+
+    // reset config
+    tEnv.getConfig.getConfiguration.setBoolean(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
+      true)

Review comment:
       this is unnecessary

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -867,27 +885,118 @@ public String asSummaryString() {
                     allPartitions.isEmpty()
                             ? Collections.singletonList(Collections.emptyMap())
                             : allPartitions;
+
             int numRetained = 0;
             for (Map<String, String> partition : keys) {
-                for (Row row : data.get(partition)) {
+                Collection<Row> rowsInPartition = data.get(partition);
+
+                // handle predicates and projection
+                List<Row> rowsRetained =
+                        rowsInPartition.stream()
+                                .filter(
+                                        row ->
+                                                FilterUtils.isRetainedAfterApplyingFilterPredicates(
+                                                        filterPredicates, getValueGetter(row)))
+                                .map(
+                                        row -> {
+                                            Row projectedRow = projectRow(row);
+                                            projectedRow.setKind(row.getKind());
+                                            return projectedRow;
+                                        })
+                                .collect(Collectors.toList());
+
+                // handle aggregates
+                if (!aggregateExpressions.isEmpty()) {
+                    rowsRetained = applyAggregatesToRows(rowsRetained);
+                }
+
+                // handle row data
+                for (Row row : rowsRetained) {
+                    final RowData rowData = (RowData) converter.toInternal(row);
+                    if (rowData != null) {
+                        if (numRetained >= numElementToSkip) {
+                            rowData.setRowKind(row.getKind());
+                            result.add(rowData);
+                        }
+                        numRetained++;
+                    }
+
+                    // handle limit. No aggregates will be pushed down when there is a limit.
                     if (result.size() >= limit) {
                         return result;
                     }
-                    boolean isRetained =
-                            FilterUtils.isRetainedAfterApplyingFilterPredicates(
-                                    filterPredicates, getValueGetter(row));
-                    if (isRetained) {
-                        final Row projectedRow = projectRow(row);
-                        final RowData rowData = (RowData) converter.toInternal(projectedRow);
-                        if (rowData != null) {
-                            if (numRetained >= numElementToSkip) {
-                                rowData.setRowKind(row.getKind());
-                                result.add(rowData);
-                            }
-                            numRetained++;
-                        }
+                }
+            }
+
+            return result;
+        }
+
+        private List<Row> applyAggregatesToRows(List<Row> rows) {
+            if (groupingSet != null && groupingSet.length > 0) {
+                // has group by, group firstly
+                Map<Row, List<Row>> buffer = new HashMap<>();
+                for (Row row : rows) {
+                    Row bufferKey = new Row(groupingSet.length);
+                    for (int i = 0; i < groupingSet.length; i++) {
+                        bufferKey.setField(i, row.getField(groupingSet[i]));
+                    }
+                    if (buffer.containsKey(bufferKey)) {
+                        buffer.get(bufferKey).add(row);
+                    } else {
+                        buffer.put(bufferKey, new ArrayList<>(Collections.singletonList(row)));
                     }
                 }
+                List<Row> result = new ArrayList<>();
+                for (Map.Entry<Row, List<Row>> entry : buffer.entrySet()) {
+                    result.add(Row.join(entry.getKey(), accumulateRows(entry.getValue())));
+                }
+                return result;
+            } else {
+                return Collections.singletonList(accumulateRows(rows));
+            }
+        }
+
+        // can only apply sum/sum0/avg function for long type fields for testing
+        private Row accumulateRows(List<Row> rows) {
+            Row result = new Row(aggregateExpressions.size());
+            for (int i = 0; i < aggregateExpressions.size(); i++) {
+                FunctionDefinition aggFunction =
+                        aggregateExpressions.get(i).getFunctionDefinition();
+                List<FieldReferenceExpression> arguments = aggregateExpressions.get(i).getArgs();
+                if (aggFunction instanceof MinAggFunction) {
+                    int argIndex = arguments.get(0).getFieldIndex();
+                    Row minRow =
+                            rows.stream()
+                                    .min(Comparator.comparing(row -> row.getFieldAs(argIndex)))
+                                    .get();
+                    result.setField(i, minRow.getField(argIndex));
+                } else if (aggFunction instanceof MaxAggFunction) {
+                    int argIndex = arguments.get(0).getFieldIndex();
+                    Row maxRow =
+                            rows.stream()
+                                    .max(Comparator.comparing(row -> row.getFieldAs(argIndex)))
+                                    .get();
+                    result.setField(i, maxRow.getField(argIndex));
+                } else if (aggFunction instanceof SumAggFunction) {
+                    int argIndex = arguments.get(0).getFieldIndex();
+                    Object finalSum =
+                            rows.stream()
+                                    .filter(row -> row.getField(argIndex) != null)
+                                    .mapToLong(row -> row.getFieldAs(argIndex))
+                                    .sum();
+                    result.setField(i, finalSum);
+                } else if (aggFunction instanceof Sum0AggFunction) {
+                    int argIndex = arguments.get(0).getFieldIndex();
+                    Object finalSum0 =
+                            rows.stream()
+                                    .filter(row -> row.getField(argIndex) != null)
+                                    .mapToLong(row -> row.getFieldAs(argIndex))
+                                    .sum();
+                    result.setField(i, finalSum0);
+                } else if (aggFunction instanceof CountAggFunction

Review comment:
       if all input values are null, `CountAggFunction` will return 0
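
   A hedged sketch of the suggested fix for the test factory, counting only non-null argument values so that all-null (or empty) input yields 0 rather than null (branch shape follows the surrounding snippet):

   ```java
   } else if (aggFunction instanceof CountAggFunction) {
       int argIndex = arguments.get(0).getFieldIndex();
       // COUNT(col) counts only non-null values; with all-null or empty input
       // the result must be 0L, never null.
       long count = rows.stream().filter(row -> row.getField(argIndex) != null).count();
       result.setField(i, count);
   }
   ```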

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalSortAggWithoutSortIntoScanRule.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalLocalSortAggregate;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+
+/**
+ * Planner rule that tries to push a local hash or sort aggregate which without sort into a {@link

Review comment:
       please update the Javadoc: remove "local hash or"
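
   For reference, the corrected opening sentence could read along these lines (suggested wording only):

   ```java
   /**
    * Planner rule that tries to push a local sort aggregate without sort into a {@link
    * BatchPhysicalTableSourceScan} whose table is a {@link TableSourceTable} with a source
    * supporting {@link SupportsAggregatePushDown}.
    */
   ```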

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.scala
##########
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql.agg
+
+import org.apache.flink.table.api.DataTypes
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.functions.aggfunctions.CollectAggFunction
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
+import org.junit.{Before, Test}
+
+class LocalAggregatePushDownITCase extends BatchTestBase {
+
+  @Before
+  override def before(): Unit = {
+    super.before()
+    env.setParallelism(1) // set sink parallelism to 1
+    conf.getConfiguration.setBoolean(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED, true)
+    val testDataId = TestValuesTableFactory.registerData(TestData.personData)
+    val ddl =
+      s"""
+         |CREATE TABLE AggregatableTable (
+         |  id int,
+         |  age int,
+         |  name string,
+         |  height int,
+         |  gender string,
+         |  deposit bigint,
+         |  points bigint
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$testDataId',
+         |  'filterable-fields' = 'id',
+         |  'bounded' = 'true'
+         |)
+       """.stripMargin
+    tEnv.executeSql(ddl)
+
+  }
+
+  @Test
+  def testPushDownLocalHashAggWithGroup(): Unit = {
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit) as avg_dep,
+        |  sum(deposit),
+        |  count(1),
+        |  gender
+        |FROM
+        |  AggregatableTable
+        |GROUP BY gender
+        |ORDER BY avg_dep
+        |""".stripMargin,
+      Seq(
+        row(126, 630, 5, "f"),
+        row(220, 1320, 6, "m"))
+    )
+  }
+
+  @Test
+  def testDisablePushDownLocalAgg(): Unit = {
+    // disable push down local agg
+    tEnv.getConfig.getConfiguration.setBoolean(
+        OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
+        false)
+
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit) as avg_dep,
+        |  sum(deposit),
+        |  count(1),
+        |  gender
+        |FROM
+        |  AggregatableTable
+        |GROUP BY gender
+        |ORDER BY avg_dep
+        |""".stripMargin,
+      Seq(
+        row(126, 630, 5, "f"),
+        row(220, 1320, 6, "m"))
+    )
+
+    // reset config
+    tEnv.getConfig.getConfiguration.setBoolean(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
+      true)
+  }
+
+  @Test
+  def testPushDownLocalHashAggWithOutGroup(): Unit = {

Review comment:
       nit: WithOut -> Without

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalSortAggWithoutSortIntoScanRule.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalLocalSortAggregate;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+
+/**
+ * Planner rule that tries to push a local hash or sort aggregate which without sort into a {@link
+ * BatchPhysicalTableSourceScan} whose table is a {@link TableSourceTable} with a source supporting
+ * {@link SupportsAggregatePushDown}. The {@link
+ * OptimizerConfigOptions#TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED} need to be true.
+ *
+ * <p>Suppose we have the original physical plan:
+ *
+ * <pre>{@code
+ * BatchPhysicalHashAggregate (global)
+ * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton)
+ *    +- BatchPhysicalLocalSortAggregate (local)
+ *       +- BatchPhysicalTableSourceScan
+ * }</pre>
+ *
+ * <p>This physical plan will be rewritten to:
+ *
+ * <pre>{@code
+ * BatchPhysicalHashAggregate (global)

Review comment:
       should be `BatchPhysicalSortAggregate`
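
   With that fix (and the same fix for the original plan above), the Javadoc's plan trees would presumably read as follows (a sketch of the corrected doc):

   ```java
   /**
    * <p>Suppose we have the original physical plan:
    *
    * <pre>{@code
    * BatchPhysicalSortAggregate (global)
    * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton)
    *    +- BatchPhysicalLocalSortAggregate (local)
    *       +- BatchPhysicalTableSourceScan
    * }</pre>
    *
    * <p>This physical plan will be rewritten to:
    *
    * <pre>{@code
    * BatchPhysicalSortAggregate (global)
    * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton)
    *    +- BatchPhysicalTableSourceScan (with the local aggregate pushed into the scan)
    * }</pre>
    */
   ```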

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalTableSourceScan.scala
##########
@@ -18,6 +18,12 @@
 
 package org.apache.flink.table.planner.plan.nodes.physical.batch
 
+import java.util
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.hint.RelHint
+import org.apache.calcite.rel.metadata.RelMetadataQuery

Review comment:
       nit: reorder the imports

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/AggregatePushDownSpec.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.abilities.source;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.expressions.AggregateExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction;
+import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction;
+import org.apache.flink.table.planner.functions.aggfunctions.Sum0AggFunction;
+import org.apache.flink.table.planner.plan.utils.AggregateInfo;
+import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A sub-class of {@link SourceAbilitySpec} that can not only serialize/deserialize the aggregation
+ * to/from JSON, but can also push the local aggregate into a {@link SupportsAggregatePushDown}.
+ */
+@JsonTypeName("AggregatePushDown")
+public class AggregatePushDownSpec extends SourceAbilitySpecBase {
+
+    public static final String FIELD_NAME_INPUT_TYPE = "inputType";
+
+    public static final String FIELD_NAME_GROUPING_SETS = "groupingSets";
+
+    public static final String FIELD_NAME_AGGREGATE_CALLS = "aggregateCalls";
+
+    @JsonProperty(FIELD_NAME_INPUT_TYPE)
+    private final RowType inputType;
+
+    @JsonProperty(FIELD_NAME_GROUPING_SETS)
+    private final List<int[]> groupingSets;
+
+    @JsonProperty(FIELD_NAME_AGGREGATE_CALLS)
+    private final List<AggregateCall> aggregateCalls;
+
+    @JsonCreator
+    public AggregatePushDownSpec(
+            @JsonProperty(FIELD_NAME_INPUT_TYPE) RowType inputType,
+            @JsonProperty(FIELD_NAME_GROUPING_SETS) List<int[]> groupingSets,
+            @JsonProperty(FIELD_NAME_AGGREGATE_CALLS) List<AggregateCall> aggregateCalls,
+            @JsonProperty(FIELD_NAME_PRODUCED_TYPE) RowType producedType) {
+        super(producedType);
+
+        this.inputType = inputType;
+        this.groupingSets = new ArrayList<>(checkNotNull(groupingSets));
+        this.aggregateCalls = aggregateCalls;
+    }
+
+    @Override
+    public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
+        checkArgument(getProducedType().isPresent());
+        apply(inputType, groupingSets, aggregateCalls, getProducedType().get(), tableSource);
+    }
+
+    @Override
+    public String getDigests(SourceAbilityContext context) {
+        int[] grouping = ArrayUtils.addAll(groupingSets.get(0), groupingSets.get(1));
+        String groupingStr =
+                Arrays.stream(grouping)
+                        .mapToObj(index -> inputType.getFieldNames().get(index))
+                        .collect(Collectors.joining(","));
+
+        List<AggregateExpression> aggregateExpressions =
+                buildAggregateExpressions(inputType, aggregateCalls);
+        String aggFunctionsStr =
+                aggregateExpressions.stream()
+                        .map(AggregateExpression::asSummaryString)
+                        .collect(Collectors.joining(","));
+
+        return "aggregates=[grouping=["
+                + groupingStr
+                + "], aggFunctions=["
+                + aggFunctionsStr
+                + "]]";
+    }
+
+    public static boolean apply(
+            RowType inputType,
+            List<int[]> groupingSets,
+            List<AggregateCall> aggregateCalls,
+            RowType producedType,
+            DynamicTableSource tableSource) {
+        List<AggregateExpression> aggregateExpressions =
+                buildAggregateExpressions(inputType, aggregateCalls);
+
+        if (tableSource instanceof SupportsAggregatePushDown) {
+            DataType producedDataType = TypeConversions.fromLogicalToDataType(producedType);
+            return ((SupportsAggregatePushDown) tableSource)
+                    .applyAggregates(groupingSets, aggregateExpressions, producedDataType);
+        } else {
+            throw new TableException(
+                    String.format(
+                            "%s does not support SupportsAggregatePushDown.",
+                            tableSource.getClass().getName()));
+        }
+    }
+
+    private static List<AggregateExpression> buildAggregateExpressions(
+            RowType inputType, List<AggregateCall> aggregateCalls) {
+        AggregateInfoList aggInfoList =
+                AggregateUtil.transformToBatchAggregateInfoList(

Review comment:
       we should improve the Javadoc or add some validation to make sure `AggregatePushDownSpec` can only be used for batch for now
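       For example, a minimal validation sketch (hedged: `isBatchRuntimeMode()` is a hypothetical accessor on `SourceAbilityContext`, named here only for illustration):
       ```java
       @Override
       public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
           // AggregatePushDownSpec is only supported for batch for now; fail fast otherwise.
           // isBatchRuntimeMode() is a hypothetical accessor used only for illustration.
           checkArgument(
                   context.isBatchRuntimeMode(),
                   "AggregatePushDownSpec is only supported for batch for now.");
           checkArgument(getProducedType().isPresent());
           apply(inputType, groupingSets, aggregateCalls, getProducedType().get(), tableSource);
       }
       ```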

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoScanRuleBase.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.core.AggregateCall;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Planner rule that tries to push a local aggregate into a {@link BatchPhysicalTableSourceScan}
+ * whose table is a {@link TableSourceTable} with a source supporting {@link
+ * SupportsAggregatePushDown}.
+ *
+ * <p>The aggregate push down does not support a number of more complex statements at present:
+ *
+ * <ul>
+ *   <li>complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS.
+ *   <li>expressions inside the aggregation function call, such as sum(a * b).
+ *   <li>aggregations with ordering.
+ *   <li>aggregations with filter.
+ * </ul>
+ */
+public abstract class PushLocalAggIntoScanRuleBase extends RelOptRule {
+
+    public PushLocalAggIntoScanRuleBase(RelOptRuleOperand operand, String description) {
+        super(operand, description);
+    }
+
+    protected boolean isMatch(
+            RelOptRuleCall call,
+            BatchPhysicalGroupAggregateBase aggregate,
+            BatchPhysicalTableSourceScan tableSourceScan) {
+        TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig();
+        if (!tableConfig
+                .getConfiguration()
+                .getBoolean(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)) {
+            return false;
+        }
+
+        if (aggregate.isFinal() || aggregate.getAggCallList().isEmpty()) {
+            return false;
+        }
+        List<AggregateCall> aggCallList =
+                JavaScalaConversionUtil.toJava(aggregate.getAggCallList());
+        for (AggregateCall aggCall : aggCallList) {
+            if (aggCall.isDistinct()
+                    || aggCall.isApproximate()
+                    || aggCall.getArgList().size() > 1
+                    || aggCall.hasFilter()
+                    || !aggCall.getCollation().getFieldCollations().isEmpty()) {
+                return false;
+            }
+        }
+        TableSourceTable tableSourceTable = tableSourceScan.tableSourceTable();
+        // we can not push aggregates twice
+        return tableSourceTable != null
+                && tableSourceTable.tableSource() instanceof SupportsAggregatePushDown
+                && Arrays.stream(tableSourceTable.abilitySpecs())
+                        .noneMatch(spec -> spec instanceof AggregatePushDownSpec);
+    }
+
+    protected void pushLocalAggregateIntoScan(
+            RelOptRuleCall call,
+            BatchPhysicalGroupAggregateBase localAgg,
+            BatchPhysicalTableSourceScan oldScan) {
+        RowType inputType = FlinkTypeFactory.toLogicalRowType(oldScan.getRowType());
+        List<int[]> groupingSets = Arrays.asList(localAgg.grouping(), localAgg.auxGrouping());

Review comment:
       `grouping` and `auxGrouping` should be combined into ONE array. Actually, we only support 'single' group aggregates, so `groupingSets` here should always contain exactly one element.
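       e.g. something along these lines (a sketch; `ArrayUtils` is the commons-lang3 helper already used in `AggregatePushDownSpec`, and `java.util.Collections` would need to be imported):
       ```java
       // combine grouping and auxGrouping into one grouping key array; since only
       // 'single' group aggregates are supported, groupingSets has exactly one element
       int[] grouping = ArrayUtils.addAll(localAgg.grouping(), localAgg.auxGrouping());
       List<int[]> groupingSets = Collections.singletonList(grouping);
       ```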

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
##########
@@ -446,6 +446,9 @@ object FlinkBatchRuleSets {
     */
   val PHYSICAL_REWRITE: RuleSet = RuleSets.ofList(
     EnforceLocalHashAggRule.INSTANCE,
-    EnforceLocalSortAggRule.INSTANCE
+    EnforceLocalSortAggRule.INSTANCE,
+    PushLocalHashAggIntoScanRule.INSTANCE,
+    PushLocalSortAggWithSortIntoScanRule.INSTANCE,
+    PushLocalSortAggWithoutSortIntoScanRule.INSTANCE

Review comment:
       Some table sources may not implement `SupportsProjectionPushDown`, so the pattern `Scan -> Calc (projection only) -> local agg` also exists; see the sketch below.
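       A second rule variant could match the extra `Calc`, e.g. (a sketch of the constructor using the usual Calcite operand pattern; the rule name is illustrative):
       ```java
       // sketch: also match LocalAgg <- Calc (projection only) <- Scan;
       // onMatch must additionally verify that the Calc contains only field projections
       super(
               operand(
                       BatchPhysicalGroupAggregateBase.class,
                       operand(
                               BatchPhysicalCalc.class,
                               operand(BatchPhysicalTableSourceScan.class, none()))),
               "PushLocalAggWithCalcIntoScanRule");
       ```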

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
##########
@@ -18,19 +18,18 @@
 
 package org.apache.flink.table.planner.plan.schema
 
+import java.util
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.RelOptSchema
+import org.apache.calcite.rel.`type`.RelDataType

Review comment:
       reorder the imports

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
##########
@@ -279,6 +281,21 @@ object AggregateUtil extends Enumeration {
       isBounded = false)
   }
 
+  def deriveSumAndCountFromAvg(avgAggFunction: UserDefinedFunction
+                              ): (Sum0AggFunction, CountAggFunction) = {

Review comment:
       nit: the indentation doesn't look correct

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/AggregatePushDownSpec.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.abilities.source;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.expressions.AggregateExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction;
+import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction;
+import org.apache.flink.table.planner.functions.aggfunctions.Sum0AggFunction;
+import org.apache.flink.table.planner.plan.utils.AggregateInfo;
+import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A sub-class of {@link SourceAbilitySpec} that can not only serialize/deserialize the aggregation
+ * to/from JSON, but can also push the local aggregate into a {@link SupportsAggregatePushDown}.
+ */
+@JsonTypeName("AggregatePushDown")
+public class AggregatePushDownSpec extends SourceAbilitySpecBase {
+
+    public static final String FIELD_NAME_INPUT_TYPE = "inputType";
+
+    public static final String FIELD_NAME_GROUPING_SETS = "groupingSets";
+
+    public static final String FIELD_NAME_AGGREGATE_CALLS = "aggregateCalls";
+
+    @JsonProperty(FIELD_NAME_INPUT_TYPE)
+    private final RowType inputType;
+
+    @JsonProperty(FIELD_NAME_GROUPING_SETS)
+    private final List<int[]> groupingSets;
+
+    @JsonProperty(FIELD_NAME_AGGREGATE_CALLS)
+    private final List<AggregateCall> aggregateCalls;
+
+    @JsonCreator
+    public AggregatePushDownSpec(
+            @JsonProperty(FIELD_NAME_INPUT_TYPE) RowType inputType,
+            @JsonProperty(FIELD_NAME_GROUPING_SETS) List<int[]> groupingSets,
+            @JsonProperty(FIELD_NAME_AGGREGATE_CALLS) List<AggregateCall> aggregateCalls,
+            @JsonProperty(FIELD_NAME_PRODUCED_TYPE) RowType producedType) {
+        super(producedType);
+
+        this.inputType = inputType;
+        this.groupingSets = new ArrayList<>(checkNotNull(groupingSets));
+        this.aggregateCalls = aggregateCalls;
+    }
+
+    @Override
+    public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
+        checkArgument(getProducedType().isPresent());
+        apply(inputType, groupingSets, aggregateCalls, getProducedType().get(), tableSource);
+    }
+
+    @Override
+    public String getDigests(SourceAbilityContext context) {
+        int[] grouping = ArrayUtils.addAll(groupingSets.get(0), groupingSets.get(1));

Review comment:
       this should only use the first element of `groupingSets`
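       i.e. (a sketch):
       ```java
       // once grouping and auxGrouping are merged in the rule, groupingSets is
       // guaranteed to contain a single element, so the digest only needs the first one
       int[] grouping = groupingSets.get(0);
       ```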

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.scala
##########
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql.agg
+
+import org.apache.flink.table.api.DataTypes
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.functions.aggfunctions.CollectAggFunction
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
+import org.junit.{Before, Test}
+
+class LocalAggregatePushDownITCase extends BatchTestBase {
+
+  @Before
+  override def before(): Unit = {
+    super.before()
+    env.setParallelism(1) // set sink parallelism to 1
+    conf.getConfiguration.setBoolean(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED, true)
+    val testDataId = TestValuesTableFactory.registerData(TestData.personData)
+    val ddl =
+      s"""
+         |CREATE TABLE AggregatableTable (
+         |  id int,
+         |  age int,
+         |  name string,
+         |  height int,
+         |  gender string,
+         |  deposit bigint,
+         |  points bigint
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$testDataId',
+         |  'filterable-fields' = 'id',
+         |  'bounded' = 'true'
+         |)
+       """.stripMargin
+    tEnv.executeSql(ddl)
+
+  }
+
+  @Test
+  def testPushDownLocalHashAggWithGroup(): Unit = {
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit) as avg_dep,
+        |  sum(deposit),
+        |  count(1),
+        |  gender
+        |FROM
+        |  AggregatableTable
+        |GROUP BY gender
+        |ORDER BY avg_dep
+        |""".stripMargin,
+      Seq(
+        row(126, 630, 5, "f"),
+        row(220, 1320, 6, "m"))
+    )
+  }
+
+  @Test
+  def testDisablePushDownLocalAgg(): Unit = {
+    // disable push down local agg
+    tEnv.getConfig.getConfiguration.setBoolean(
+        OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
+        false)
+
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit) as avg_dep,
+        |  sum(deposit),
+        |  count(1),
+        |  gender
+        |FROM
+        |  AggregatableTable
+        |GROUP BY gender
+        |ORDER BY avg_dep
+        |""".stripMargin,
+      Seq(
+        row(126, 630, 5, "f"),
+        row(220, 1320, 6, "m"))
+    )
+
+    // reset config
+    tEnv.getConfig.getConfiguration.setBoolean(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
+      true)
+  }
+
+  @Test
+  def testPushDownLocalHashAggWithOutGroup(): Unit = {
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit),
+        |  sum(deposit),
+        |  count(*)
+        |FROM
+        |  AggregatableTable
+        |""".stripMargin,
+      Seq(
+        row(177, 1950, 11))
+    )
+  }
+
+  @Test
+  def testPushDownLocalSortAggWithoutSort(): Unit = {
+    // enable sort agg
+    tEnv.getConfig.getConfiguration.setString(
+      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
+
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit),
+        |  sum(deposit),
+        |  count(*)
+        |FROM
+        |  AggregatableTable
+        |""".stripMargin,
+      Seq(
+        row(177, 1950, 11))
+    )
+
+    // reset config
+    tEnv.getConfig.getConfiguration.setString(
+      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "")
+  }
+
+  @Test
+  def testPushDownLocalSortAggWithSort(): Unit = {
+    // enable sort agg
+    tEnv.getConfig.getConfiguration.setString(
+      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
+
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit),
+        |  sum(deposit),
+        |  count(1),
+        |  gender,
+        |  age
+        |FROM
+        |  AggregatableTable
+        |GROUP BY gender, age
+        |""".stripMargin,
+      Seq(
+        row(50, 50, 1, "f", 19),
+        row(200, 200, 1, "f", 20),
+        row(250, 750, 3, "m", 23),
+        row(126, 380, 3, "f", 25),
+        row(300, 300, 1, "m", 27),
+        row(170, 170, 1, "m", 28),
+        row(100, 100, 1, "m", 34))
+    )
+
+    // reset config
+    tEnv.getConfig.getConfiguration.setString(
+      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "")
+  }
+
+  @Test
+  def testAggWithAuxGrouping(): Unit = {
+    checkResult(
+      """
+        |SELECT
+        |  name,
+        |  d,
+        |  p,
+        |  count(*)
+        |FROM (
+        |  SELECT
+        |    name,
+        |    sum(deposit) as d,
+        |    max(points) as p
+        |  FROM AggregatableTable
+        |  GROUP BY name
+        |) t
+        |GROUP BY name, d, p
+        |""".stripMargin,
+      Seq(
+        row("tom", 200, 1000, 1),
+        row("mary", 100, 1000, 1),
+        row("jack", 150, 1300, 1),
+        row("rose", 100, 500, 1),
+        row("danny", 300, 300, 1),
+        row("tommas", 400, 4000, 1),
+        row("olivia", 50, 9000, 1),
+        row("stef", 100, 1900, 1),
+        row("emma", 180, 800, 1),
+        row("benji", 170, 11000, 1),
+        row("eva", 200, 1000, 1))
+    )
+  }
+
+  @Test
+  def testPushDownLocalAggAfterFilterPushDown(): Unit = {
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit),
+        |  sum(deposit),
+        |  count(1),
+        |  gender,
+        |  age
+        |FROM
+        |  AggregatableTable
+        |WHERE age <= 20
+        |GROUP BY gender, age
+        |""".stripMargin,
+      Seq(
+        row(50, 50, 1, "f", 19),
+        row(200, 200, 1, "f", 20))
+    )
+  }
+
+  @Test
+  def testLocalAggWithLimit(): Unit = {

Review comment:
       the unsupported cases can be removed from the IT case, because the plan tests already cover them

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -867,27 +885,118 @@ public String asSummaryString() {
                     allPartitions.isEmpty()
                             ? Collections.singletonList(Collections.emptyMap())
                             : allPartitions;
+
             int numRetained = 0;
             for (Map<String, String> partition : keys) {
-                for (Row row : data.get(partition)) {
+                Collection<Row> rowsInPartition = data.get(partition);
+
+                // handle predicates and projection
+                List<Row> rowsRetained =
+                        rowsInPartition.stream()
+                                .filter(
+                                        row ->
+                                                FilterUtils.isRetainedAfterApplyingFilterPredicates(
+                                                        filterPredicates, getValueGetter(row)))
+                                .map(
+                                        row -> {
+                                            Row projectedRow = projectRow(row);
+                                            projectedRow.setKind(row.getKind());
+                                            return projectedRow;
+                                        })
+                                .collect(Collectors.toList());
+
+                // handle aggregates
+                if (!aggregateExpressions.isEmpty()) {
+                    rowsRetained = applyAggregatesToRows(rowsRetained);
+                }
+
+                // handle row data
+                for (Row row : rowsRetained) {
+                    final RowData rowData = (RowData) converter.toInternal(row);
+                    if (rowData != null) {
+                        if (numRetained >= numElementToSkip) {
+                            rowData.setRowKind(row.getKind());
+                            result.add(rowData);
+                        }
+                        numRetained++;
+                    }
+
+                    // handle limit. No aggregates will be pushed down when there is a limit.
                     if (result.size() >= limit) {
                         return result;
                     }
-                    boolean isRetained =
-                            FilterUtils.isRetainedAfterApplyingFilterPredicates(
-                                    filterPredicates, getValueGetter(row));
-                    if (isRetained) {
-                        final Row projectedRow = projectRow(row);
-                        final RowData rowData = (RowData) converter.toInternal(projectedRow);
-                        if (rowData != null) {
-                            if (numRetained >= numElementToSkip) {
-                                rowData.setRowKind(row.getKind());
-                                result.add(rowData);
-                            }
-                            numRetained++;
-                        }
+                }
+            }
+
+            return result;
+        }
+
+        private List<Row> applyAggregatesToRows(List<Row> rows) {
+            if (groupingSet != null && groupingSet.length > 0) {
+                // has group by: group the rows first
+                Map<Row, List<Row>> buffer = new HashMap<>();
+                for (Row row : rows) {
+                    Row bufferKey = new Row(groupingSet.length);
+                    for (int i = 0; i < groupingSet.length; i++) {
+                        bufferKey.setField(i, row.getField(groupingSet[i]));
+                    }
+                    if (buffer.containsKey(bufferKey)) {
+                        buffer.get(bufferKey).add(row);
+                    } else {
+                        buffer.put(bufferKey, new ArrayList<>(Collections.singletonList(row)));
                     }
                 }
+                List<Row> result = new ArrayList<>();
+                for (Map.Entry<Row, List<Row>> entry : buffer.entrySet()) {
+                    result.add(Row.join(entry.getKey(), accumulateRows(entry.getValue())));
+                }
+                return result;
+            } else {
+                return Collections.singletonList(accumulateRows(rows));
+            }
+        }
+
+        // for testing, sum/sum0/avg functions can only be applied to long-typed fields
+        private Row accumulateRows(List<Row> rows) {
+            Row result = new Row(aggregateExpressions.size());
+            for (int i = 0; i < aggregateExpressions.size(); i++) {
+                FunctionDefinition aggFunction =
+                        aggregateExpressions.get(i).getFunctionDefinition();
+                List<FieldReferenceExpression> arguments = aggregateExpressions.get(i).getArgs();
+                if (aggFunction instanceof MinAggFunction) {
+                    int argIndex = arguments.get(0).getFieldIndex();
+                    Row minRow =
+                            rows.stream()
+                                    .min(Comparator.comparing(row -> row.getFieldAs(argIndex)))
+                                    .get();
+                    result.setField(i, minRow.getField(argIndex));
+                } else if (aggFunction instanceof MaxAggFunction) {
+                    int argIndex = arguments.get(0).getFieldIndex();
+                    Row maxRow =
+                            rows.stream()
+                                    .max(Comparator.comparing(row -> row.getFieldAs(argIndex)))
+                                    .get();
+                    result.setField(i, maxRow.getField(argIndex));
+                } else if (aggFunction instanceof SumAggFunction) {
+                    int argIndex = arguments.get(0).getFieldIndex();
+                    Object finalSum =
+                            rows.stream()
+                                    .filter(row -> row.getField(argIndex) != null)
+                                    .mapToLong(row -> row.getFieldAs(argIndex))
+                                    .sum();

Review comment:
       if all inputs are null, `SumAggFunction` outputs null, but this implementation returns 0
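       a null-aware variant of the sum branch could look like this (a sketch; `java.util.Objects` would need to be imported):
       ```java
       // return null when every input value is null, matching SumAggFunction semantics
       List<Long> values =
               rows.stream()
                       .map(row -> (Long) row.getField(argIndex))
                       .filter(Objects::nonNull)
                       .collect(Collectors.toList());
       result.setField(
               i, values.isEmpty() ? null : values.stream().mapToLong(Long::longValue).sum());
       ```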

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.java
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.functions.aggfunctions.CollectAggFunction;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for {@link PushLocalHashAggIntoScanRule}, {@link PushLocalSortAggWithSortIntoScanRule} and
+ * {@link PushLocalSortAggWithoutSortIntoScanRule}.
+ */
+public class PushLocalAggIntoTableSourceScanRuleTest extends TableTestBase {

Review comment:
       Please add some test cases to cover partition push-down combined with local aggregate, metadata column push-down combined with local aggregate, and local aggregate push-down without projection push-down. The same applies to the IT cases.
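       e.g. a partition push-down case could look like this (a sketch; it assumes the `'partition-list'` option of the test values connector, and the table name is illustrative):
       ```java
       @Test
       public void testCanPushDownLocalAggWithPartitionPushDown() {
           // declare a partitioned values table; 'partition-list' lists the partitions
           util.tableEnv()
                   .executeSql(
                           "CREATE TABLE partitioned_inventory (\n"
                                   + "  id BIGINT,\n"
                                   + "  amount BIGINT,\n"
                                   + "  type STRING\n"
                                   + ") PARTITIONED BY (type) WITH (\n"
                                   + " 'connector' = 'values',\n"
                                   + " 'partition-list' = 'type:a;type:b',\n"
                                   + " 'bounded' = 'true'\n"
                                   + ")");
           util.verifyRelPlan(
                   "SELECT sum(amount), type FROM partitioned_inventory\n"
                           + "  WHERE type = 'a' GROUP BY type");
       }
       ```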

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.java
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.functions.aggfunctions.CollectAggFunction;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for {@link PushLocalHashAggIntoScanRule}, {@link PushLocalSortAggWithSortIntoScanRule} and
+ * {@link PushLocalSortAggWithoutSortIntoScanRule}.
+ */
+public class PushLocalAggIntoTableSourceScanRuleTest extends TableTestBase {
+    protected BatchTableTestUtil util = batchTestUtil(new TableConfig());
+
+    @Before
+    public void setup() {
+        TableConfig tableConfig = util.tableEnv().getConfig();
+        tableConfig
+                .getConfiguration()
+                .setBoolean(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
+                        true);
+        String ddl =
+                "CREATE TABLE inventory (\n"
+                        + "  id BIGINT,\n"
+                        + "  name STRING,\n"
+                        + "  amount BIGINT,\n"
+                        + "  price BIGINT,\n"
+                        + "  type STRING\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values',\n"
+                        + " 'filterable-fields' = 'id',\n"
+                        + " 'bounded' = 'true'\n"
+                        + ")";
+        util.tableEnv().executeSql(ddl);
+    }
+
+    @Test
+    public void testCanPushDownLocalHashAggWithGroup() {
+        util.verifyRelPlan(
+                "SELECT\n"
+                        + "  sum(amount),\n"
+                        + "  name,\n"
+                        + "  type\n"
+                        + "FROM inventory\n"
+                        + "  group by name, type");
+    }
+
+    @Test
+    public void testDisablePushDownLocalAgg() {
+        // disable push down local agg
+        util.getTableEnv()
+                .getConfig()
+                .getConfiguration()
+                .setBoolean(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
+                        false);
+
+        util.verifyRelPlan(
+                "SELECT\n"
+                        + "  sum(amount),\n"
+                        + "  name,\n"
+                        + "  type\n"
+                        + "FROM inventory\n"
+                        + "  group by name, type");
+
+        // reset config
+        util.getTableEnv()
+                .getConfig()
+                .getConfiguration()
+                .setBoolean(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
+                        true);
+    }
+
+    @Test
+    public void testCanPushDownLocalHashAggWithoutGroup() {
+        util.verifyRelPlan(
+                "SELECT\n"
+                        + "  min(id),\n"
+                        + "  max(amount),\n"
+                        + "  sum(price),\n"
+                        + "  avg(price),\n"
+                        + "  count(id)\n"
+                        + "FROM inventory");
+    }
+
+    @Test
+    public void testCanPushDownLocalSortAggWithoutSort() {
+        // enable sort agg
+        util.getTableEnv()
+                .getConfig()
+                .getConfiguration()
+                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
+
+        util.verifyRelPlan(
+                "SELECT\n"
+                        + "  min(id),\n"
+                        + "  max(amount),\n"
+                        + "  sum(price),\n"
+                        + "  avg(price),\n"
+                        + "  count(id)\n"
+                        + "FROM inventory");
+
+        // reset config
+        util.getTableEnv()
+                .getConfig()
+                .getConfiguration()
+                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "");
+    }
+
+    @Test
+    public void testCanPushDownLocalSortAggWithSort() {
+        // enable sort agg
+        util.getTableEnv()
+                .getConfig()
+                .getConfiguration()
+                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
+
+        util.verifyRelPlan(
+                "SELECT\n"
+                        + "  sum(amount),\n"
+                        + "  name,\n"
+                        + "  type\n"
+                        + "FROM inventory\n"
+                        + "  group by name, type");
+
+        // reset config
+        util.getTableEnv()
+                .getConfig()
+                .getConfiguration()
+                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "");
+    }
+
+    @Test
+    public void testCanPushDownLocalAggWithAuxGrouping() {

Review comment:
       the test is not correct: the pushed-down local aggregate has no auxGrouping

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -867,27 +885,118 @@ public String asSummaryString() {
                     allPartitions.isEmpty()
                             ? Collections.singletonList(Collections.emptyMap())
                             : allPartitions;
+
             int numRetained = 0;
             for (Map<String, String> partition : keys) {
-                for (Row row : data.get(partition)) {
+                Collection<Row> rowsInPartition = data.get(partition);
+
+                // handle predicates and projection
+                List<Row> rowsRetained =
+                        rowsInPartition.stream()
+                                .filter(
+                                        row ->
+                                                FilterUtils.isRetainedAfterApplyingFilterPredicates(
+                                                        filterPredicates, getValueGetter(row)))
+                                .map(
+                                        row -> {
+                                            Row projectedRow = projectRow(row);
+                                            projectedRow.setKind(row.getKind());
+                                            return projectedRow;
+                                        })
+                                .collect(Collectors.toList());
+
+                // handle aggregates
+                if (!aggregateExpressions.isEmpty()) {
+                    rowsRetained = applyAggregatesToRows(rowsRetained);
+                }
+
+                // handle row data
+                for (Row row : rowsRetained) {
+                    final RowData rowData = (RowData) converter.toInternal(row);
+                    if (rowData != null) {
+                        if (numRetained >= numElementToSkip) {

Review comment:
       it would be more reasonable to skip the original input rows instead of the result rows
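       e.g. (a sketch of one possible reading, where numRetained counts source rows rather than emitted rows):
       ```java
       // skip numElementToSkip original rows (after filtering, before aggregation)
       List<Row> rowsRetained = new ArrayList<>();
       for (Row row : rowsInPartition) {
           boolean retained =
                   FilterUtils.isRetainedAfterApplyingFilterPredicates(
                           filterPredicates, getValueGetter(row));
           if (retained && numRetained++ >= numElementToSkip) {
               Row projectedRow = projectRow(row);
               projectedRow.setKind(row.getKind());
               rowsRetained.add(projectedRow);
           }
       }
       ```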

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.java
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.functions.aggfunctions.CollectAggFunction;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for {@link PushLocalHashAggIntoScanRule}, {@link PushLocalSortAggWithSortIntoScanRule} and
+ * {@link PushLocalSortAggWithoutSortIntoScanRule}.
+ */
+public class PushLocalAggIntoTableSourceScanRuleTest extends TableTestBase {
+    protected BatchTableTestUtil util = batchTestUtil(new TableConfig());
+
+    @Before
+    public void setup() {
+        TableConfig tableConfig = util.tableEnv().getConfig();
+        tableConfig
+                .getConfiguration()
+                .setBoolean(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
+                        true);
+        String ddl =
+                "CREATE TABLE inventory (\n"
+                        + "  id BIGINT,\n"
+                        + "  name STRING,\n"
+                        + "  amount BIGINT,\n"
+                        + "  price BIGINT,\n"
+                        + "  type STRING\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values',\n"
+                        + " 'filterable-fields' = 'id',\n"
+                        + " 'bounded' = 'true'\n"
+                        + ")";
+        util.tableEnv().executeSql(ddl);
+    }
+
+    @Test
+    public void testCanPushDownLocalHashAggWithGroup() {
+        util.verifyRelPlan(
+                "SELECT\n"
+                        + "  sum(amount),\n"
+                        + "  name,\n"
+                        + "  type\n"
+                        + "FROM inventory\n"
+                        + "  group by name, type");
+    }
+
+    @Test
+    public void testDisablePushDownLocalAgg() {
+        // disable push down local agg
+        util.getTableEnv()
+                .getConfig()
+                .getConfiguration()
+                .setBoolean(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
+                        false);
+
+        util.verifyRelPlan(
+                "SELECT\n"
+                        + "  sum(amount),\n"
+                        + "  name,\n"
+                        + "  type\n"
+                        + "FROM inventory\n"
+                        + "  group by name, type");
+
+        // reset config
+        util.getTableEnv()
+                .getConfig()
+                .getConfiguration()
+                .setBoolean(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
+                        true);
+    }
+
+    @Test
+    public void testCanPushDownLocalHashAggWithoutGroup() {
+        util.verifyRelPlan(
+                "SELECT\n"
+                        + "  min(id),\n"
+                        + "  max(amount),\n"
+                        + "  sum(price),\n"
+                        + "  avg(price),\n"
+                        + "  count(id)\n"
+                        + "FROM inventory");
+    }
+
+    @Test
+    public void testCanPushDownLocalSortAggWithoutSort() {
+        // enable sort agg
+        util.getTableEnv()
+                .getConfig()
+                .getConfiguration()
+                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
+
+        util.verifyRelPlan(
+                "SELECT\n"
+                        + "  min(id),\n"
+                        + "  max(amount),\n"
+                        + "  sum(price),\n"
+                        + "  avg(price),\n"
+                        + "  count(id)\n"
+                        + "FROM inventory");
+
+        // reset config
+        util.getTableEnv()
+                .getConfig()
+                .getConfiguration()
+                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "");
+    }
+
+    @Test
+    public void testCanPushDownLocalSortAggWithSort() {
+        // enable sort agg
+        util.getTableEnv()
+                .getConfig()
+                .getConfiguration()
+                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
+
+        util.verifyRelPlan(
+                "SELECT\n"
+                        + "  sum(amount),\n"
+                        + "  name,\n"
+                        + "  type\n"
+                        + "FROM inventory\n"
+                        + "  group by name, type");
+
+        // reset config
+        util.getTableEnv()
+                .getConfig()
+                .getConfiguration()
+                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "");
+    }
+
+    @Test
+    public void testCanPushDownLocalAggWithAuxGrouping() {
+        util.verifyRelPlan(
+                "SELECT\n"
+                        + "  name,\n"
+                        + "  a,\n"
+                        + "  p,\n"
+                        + "  count(*)\n"
+                        + "FROM (\n"
+                        + "  SELECT\n"
+                        + "    name,\n"
+                        + "    sum(amount) as a,\n"
+                        + "    max(price) as p\n"
+                        + "  FROM inventory\n"
+                        + "    group by name\n"
+                        + ") t\n"
+                        + "  group by name, a, p");
+    }
+
+    @Test
+    public void testCanPushDownLocalAggAfterFilterPushDown() {
+
+        util.verifyRelPlan(
+                "SELECT\n"
+                        + "  sum(amount),\n"
+                        + "  name,\n"
+                        + "  type\n"
+                        + "FROM inventory\n"
+                        + "  where id = 123\n"
+                        + "  group by name, type");
+    }
+
+    @Test
+    public void testCannotPushDownLocalAggAfterLimitPushDown() {
+
+        util.verifyRelPlan(
+                "SELECT\n"
+                        + "  sum(amount),\n"
+                        + "  name,\n"
+                        + "  type\n"
+                        + "FROM (\n"
+                        + "  SELECT\n"
+                        + "    *\n"
+                        + "  FROM inventory\n"
+                        + "  LIMIT 100\n"
+                        + ") t\n"
+                        + "  group by name, type");
+    }
+
+    @Test
+    public void testCannotPushDownLocalAggWithUDAF() {
+        // add udf
+        util.addTemporarySystemFunction(
+                "udaf_collect", new CollectAggFunction<>(DataTypes.BIGINT().getLogicalType()));
+
+        util.verifyRelPlan(
+                "SELECT\n"
+                        + "  udaf_collect(amount),\n"
+                        + "  name,\n"
+                        + "  type\n"
+                        + "FROM inventory\n"
+                        + "  group by name, type");
+    }
+
+    @Test
+    public void testCannotPushDownLocalAggWithUnsupportedDataTypes() {
+        util.verifyRelPlan(
+                "SELECT\n"
+                        + "  max(name),\n"
+                        + "  type\n"
+                        + "FROM inventory\n"
+                        + "  group by type");
+    }
+
+    @Test
+    public void testCannotPushDownWithColumnExpression() {
+        util.verifyRelPlan(
+                "SELECT\n"
+                        + "  min(amount + price),\n"
+                        + "  max(amount),\n"
+                        + "  sum(price),\n"
+                        + "  count(id),\n"
+                        + "  name\n"
+                        + "FROM inventory\n"
+                        + "  group by name");
+    }
+
+    @Test
+    public void testCannotPushDownWithUnsupportedAggFunction() {
+        util.verifyRelPlan(
+                "SELECT\n"
+                        + "  min(id),\n"
+                        + "  max(amount),\n"
+                        + "  sum(price),\n"
+                        + "  count(distinct id),\n"
+                        + "  name\n"
+                        + "FROM inventory\n"
+                        + "  group by name");
+    }
+
+    @Test
+    public void testCannotPushDownWithWindowAggFunction() {
+        util.verifyRelPlan(
+                "SELECT\n"
+                        + "  id,\n"
+                        + "  amount,\n"
+                        + "  sum(price) over (partition by name),\n"
+                        + "  name\n"
+                        + "FROM inventory");
+    }
+
+    @Test
+    public void testCannotPushDownWithFilter() {

Review comment:
       testCannotPushDownWithFilter -> testCannotPushDownWithArgFilter

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.scala
##########
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql.agg
+
+import org.apache.flink.table.api.DataTypes
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.functions.aggfunctions.CollectAggFunction
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
+import org.junit.{Before, Test}
+
+class LocalAggregatePushDownITCase extends BatchTestBase {
+
+  @Before
+  override def before(): Unit = {
+    super.before()
+    env.setParallelism(1) // set sink parallelism to 1
+    conf.getConfiguration.setBoolean(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED, true)
+    val testDataId = TestValuesTableFactory.registerData(TestData.personData)
+    val ddl =
+      s"""
+         |CREATE TABLE AggregatableTable (
+         |  id int,
+         |  age int,
+         |  name string,
+         |  height int,
+         |  gender string,
+         |  deposit bigint,
+         |  points bigint
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$testDataId',
+         |  'filterable-fields' = 'id',
+         |  'bounded' = 'true'
+         |)
+       """.stripMargin
+    tEnv.executeSql(ddl)
+
+  }
+
+  @Test
+  def testPushDownLocalHashAggWithGroup(): Unit = {
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit) as avg_dep,
+        |  sum(deposit),
+        |  count(1),
+        |  gender
+        |FROM
+        |  AggregatableTable
+        |GROUP BY gender
+        |ORDER BY avg_dep
+        |""".stripMargin,
+      Seq(
+        row(126, 630, 5, "f"),
+        row(220, 1320, 6, "m"))
+    )
+  }
+
+  @Test
+  def testDisablePushDownLocalAgg(): Unit = {
+    // disable push down local agg
+    tEnv.getConfig.getConfiguration.setBoolean(
+        OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
+        false)
+
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit) as avg_dep,
+        |  sum(deposit),
+        |  count(1),
+        |  gender
+        |FROM
+        |  AggregatableTable
+        |GROUP BY gender
+        |ORDER BY avg_dep
+        |""".stripMargin,
+      Seq(
+        row(126, 630, 5, "f"),
+        row(220, 1320, 6, "m"))
+    )
+
+    // reset config
+    tEnv.getConfig.getConfiguration.setBoolean(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
+      true)
+  }
+
+  @Test
+  def testPushDownLocalHashAggWithOutGroup(): Unit = {
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit),
+        |  sum(deposit),
+        |  count(*)
+        |FROM
+        |  AggregatableTable
+        |""".stripMargin,
+      Seq(
+        row(177, 1950, 11))
+    )
+  }
+
+  @Test
+  def testPushDownLocalSortAggWithoutSort(): Unit = {
+    // enable sort agg
+    tEnv.getConfig.getConfiguration.setString(
+      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
+
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit),
+        |  sum(deposit),
+        |  count(*)
+        |FROM
+        |  AggregatableTable
+        |""".stripMargin,
+      Seq(
+        row(177, 1950, 11))
+    )
+
+    // reset config
+    tEnv.getConfig.getConfiguration.setString(
+      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "")
+  }
+
+  @Test
+  def testPushDownLocalSortAggWithSort(): Unit = {
+    // enable sort agg
+    tEnv.getConfig.getConfiguration.setString(
+      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
+
+    checkResult(
+      """
+        |SELECT
+        |  avg(deposit),
+        |  sum(deposit),
+        |  count(1),
+        |  gender,
+        |  age
+        |FROM
+        |  AggregatableTable
+        |GROUP BY gender, age
+        |""".stripMargin,
+      Seq(
+        row(50, 50, 1, "f", 19),
+        row(200, 200, 1, "f", 20),
+        row(250, 750, 3, "m", 23),
+        row(126, 380, 3, "f", 25),
+        row(300, 300, 1, "m", 27),
+        row(170, 170, 1, "m", 28),
+        row(100, 100, 1, "m", 34))
+    )
+
+    // reset config
+    tEnv.getConfig.getConfiguration.setString(
+      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "")
+  }
+
+  @Test
+  def testAggWithAuxGrouping(): Unit = {

Review comment:
       the pushed-down local aggregate has no auxGrouping




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org