Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/25 13:08:27 UTC

[GitHub] [flink] wuchong commented on a change in pull request #14748: [FLINK-20894][Table SQL / API] Introduce SupportsAggregatePushDown interface

wuchong commented on a change in pull request #14748:
URL: https://github.com/apache/flink/pull/14748#discussion_r563673211



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/AggregateExpression.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Expression for aggregate function which is corresponding to the AggregateCall in Calcite. */
+public class AggregateExpression implements ResolvedExpression {
+
+    private final FunctionDefinition functionDefinition;
+
+    private final List<FieldReferenceExpression> args;
+
+    private final @Nullable CallExpression filterExpression;
+
+    private final DataType resultType;
+
+    private final boolean distinct;
+
+    private final boolean approximate;
+
+    private final boolean ignoreNulls;
+
+    public AggregateExpression(
+            FunctionDefinition functionDefinition,
+            List<FieldReferenceExpression> args,
+            @Nullable CallExpression filterExpression,
+            DataType resultType,
+            boolean distinct,
+            boolean approximate,
+            boolean ignoreNulls) {
+        this.functionDefinition =
+                Preconditions.checkNotNull(
+                        functionDefinition, "Function definition must not be null.");
+        this.args = args;
+        this.filterExpression = filterExpression;
+        this.resultType = resultType;
+        this.distinct = distinct;
+        this.approximate = approximate;
+        this.ignoreNulls = ignoreNulls;
+    }
+
+    public FunctionDefinition getFunctionDefinition() {
+        return functionDefinition;
+    }
+
+    public boolean isDistinct() {
+        return distinct;
+    }
+
+    public boolean isApproximate() {
+        return approximate;
+    }
+
+    public boolean isIgnoreNulls() {
+        return ignoreNulls;
+    }
+
+    public List<FieldReferenceExpression> getArgs() {
+        return args;
+    }
+
+    @Nullable
+    public CallExpression getFilterExpression() {

Review comment:
       It would be better to return `Optional<CallExpression>` here instead of a `@Nullable` value.
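
   A minimal sketch of what that could look like. `FilterHolder` is a hypothetical stand-in for `AggregateExpression`, and the filter is a plain `String` only to keep the sketch free of Flink dependencies:

```java
import java.util.Optional;

// Hypothetical stand-in for AggregateExpression (not a Flink class).
final class FilterHolder {

    // May be null when the aggregate call has no filter.
    private final String filterExpression;

    FilterHolder(String filterExpression) {
        this.filterExpression = filterExpression;
    }

    // Wraps the nullable field so that callers must handle the absent case explicitly.
    Optional<String> getFilterExpression() {
        return Optional.ofNullable(filterExpression);
    }
}
```

   The field itself can stay nullable; only the getter changes, so callers would no longer need a null check.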

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/AggregateExpression.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Expression for aggregate function which is corresponding to the AggregateCall in Calcite. */
+public class AggregateExpression implements ResolvedExpression {
+
+    private final FunctionDefinition functionDefinition;
+
+    private final List<FieldReferenceExpression> args;
+
+    private final @Nullable CallExpression filterExpression;
+
+    private final DataType resultType;
+
+    private final boolean distinct;
+
+    private final boolean approximate;
+
+    private final boolean ignoreNulls;
+
+    public AggregateExpression(
+            FunctionDefinition functionDefinition,
+            List<FieldReferenceExpression> args,
+            @Nullable CallExpression filterExpression,
+            DataType resultType,
+            boolean distinct,
+            boolean approximate,
+            boolean ignoreNulls) {
+        this.functionDefinition =
+                Preconditions.checkNotNull(
+                        functionDefinition, "Function definition must not be null.");
+        this.args = args;
+        this.filterExpression = filterExpression;
+        this.resultType = resultType;
+        this.distinct = distinct;
+        this.approximate = approximate;
+        this.ignoreNulls = ignoreNulls;
+    }
+
+    public FunctionDefinition getFunctionDefinition() {
+        return functionDefinition;
+    }
+
+    public boolean isDistinct() {
+        return distinct;
+    }
+
+    public boolean isApproximate() {
+        return approximate;
+    }
+
+    public boolean isIgnoreNulls() {
+        return ignoreNulls;
+    }
+
+    public List<FieldReferenceExpression> getArgs() {
+        return args;
+    }
+
+    @Nullable
+    public CallExpression getFilterExpression() {
+        return filterExpression;
+    }
+
+    /**
+     * Returns a string representation of the aggregate function for logging or printing to a
+     * console.
+     */
+    public String getFunctionName() {

Review comment:
       Is this necessary?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/AggregateExpression.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Expression for aggregate function which is corresponding to the AggregateCall in Calcite. */
+public class AggregateExpression implements ResolvedExpression {

Review comment:
       Please add the `@PublicEvolving` annotation.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsAggregatePushDown.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.expressions.AggregateExpression;
+import org.apache.flink.table.types.DataType;
+
+import java.util.List;
+
+/**
+ * Enables to push down local aggregates into a {@link ScanTableSource}.
+ *
+ * <p>Given the following SQL:
+ *
+ * <pre>{@code
+ * SELECT sum(a), max(a), b FROM t GROUP BY b;
+ * }</pre>
+ *
+ * <p>In the example above, {@CODE sum(a), max(a)} and {@CODE group by b} are aggregate functions
+ * and grouping sets. The optimized plan will be: TableSourceScan -> LocalHashAggregate -> Exchange
+ * -> HashAggregate -> Calc
+ *
+ * <p>By default, if this interface is not implemented, local aggregates are applied in a subsequent
+ * operation after the source.
+ *
+ * <p>For efficiency, a source can push local aggregates further down in order to reduce the network
+ * and computing overhead. The passed aggregate functions and grouping sets are in the original
+ * order. The downstream storage which has aggregation capability can directly return the aggregated
+ * values to the exchange operator.
+ *
+ * <p>Note: The local aggregate push down strategy is all or nothing, it can only be pushed down if
+ * all aggregate functions are supported.
+ *
+ * <p>A {@link ScanTableSource} extending this interface is able to aggregate records before

Review comment:
       I think we don't need this paragraph, because the beginning of the Javadoc already states what this interface is used for.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsAggregatePushDown.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.expressions.AggregateExpression;
+import org.apache.flink.table.types.DataType;
+
+import java.util.List;
+
+/**
+ * Enables to push down local aggregates into a {@link ScanTableSource}.
+ *
+ * <p>Given the following SQL:
+ *
+ * <pre>{@code
+ * SELECT sum(a), max(a), b FROM t GROUP BY b;
+ * }</pre>
+ *
+ * <p>In the example above, {@CODE sum(a), max(a)} and {@CODE group by b} are aggregate functions
+ * and grouping sets. The optimized plan will be: TableSourceScan -> LocalHashAggregate -> Exchange
+ * -> HashAggregate -> Calc
+ *
+ * <p>By default, if this interface is not implemented, local aggregates are applied in a subsequent
+ * operation after the source.
+ *
+ * <p>For efficiency, a source can push local aggregates further down in order to reduce the network
+ * and computing overhead. The passed aggregate functions and grouping sets are in the original
+ * order. The downstream storage which has aggregation capability can directly return the aggregated
+ * values to the exchange operator.
+ *
+ * <p>Note: The local aggregate push down strategy is all or nothing, it can only be pushed down if
+ * all aggregate functions are supported.
+ *
+ * <p>A {@link ScanTableSource} extending this interface is able to aggregate records before
+ * returning.

Review comment:
       Add a note at the end:
   
   ```
   <p>Note: currently, {@link SupportsAggregatePushDown} is not supported by the planner.
   ```
   
   In the future, we can update the message to `only supported in batch mode, not supported in streaming mode`.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/AggregateExpression.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Expression for aggregate function which is corresponding to the AggregateCall in Calcite. */
+public class AggregateExpression implements ResolvedExpression {
+
+    private final FunctionDefinition functionDefinition;
+
+    private final List<FieldReferenceExpression> args;
+
+    private final @Nullable CallExpression filterExpression;
+
+    private final DataType resultType;
+
+    private final boolean distinct;
+
+    private final boolean approximate;
+
+    private final boolean ignoreNulls;
+
+    public AggregateExpression(
+            FunctionDefinition functionDefinition,
+            List<FieldReferenceExpression> args,
+            @Nullable CallExpression filterExpression,
+            DataType resultType,
+            boolean distinct,
+            boolean approximate,
+            boolean ignoreNulls) {
+        this.functionDefinition =
+                Preconditions.checkNotNull(
+                        functionDefinition, "Function definition must not be null.");
+        this.args = args;
+        this.filterExpression = filterExpression;
+        this.resultType = resultType;
+        this.distinct = distinct;
+        this.approximate = approximate;
+        this.ignoreNulls = ignoreNulls;
+    }
+
+    public FunctionDefinition getFunctionDefinition() {
+        return functionDefinition;
+    }
+
+    public boolean isDistinct() {
+        return distinct;
+    }
+
+    public boolean isApproximate() {
+        return approximate;
+    }
+
+    public boolean isIgnoreNulls() {
+        return ignoreNulls;
+    }
+
+    public List<FieldReferenceExpression> getArgs() {
+        return args;
+    }
+
+    @Nullable
+    public CallExpression getFilterExpression() {
+        return filterExpression;
+    }
+
+    /**
+     * Returns a string representation of the aggregate function for logging or printing to a
+     * console.
+     */
+    public String getFunctionName() {
+        return functionDefinition.toString();
+    }
+
+    @Override
+    public DataType getOutputDataType() {
+        return resultType;
+    }
+
+    @Override
+    public List<ResolvedExpression> getResolvedChildren() {
+        return Collections.singletonList(this.filterExpression);

Review comment:
       The children should also include `args`.
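
   A rough sketch of one way to build the child list. `ChildrenSketch` is a hypothetical helper, and putting the arguments before the filter is an assumption, not something decided in this PR. Note that `Collections.singletonList(null)` yields a list containing a single `null` element, so the sketch also skips an absent filter:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Flink class) illustrating the suggested child list.
final class ChildrenSketch {

    // Returns the arguments followed by the filter; a null filter is left out.
    static <T> List<T> children(List<? extends T> args, T filterOrNull) {
        List<T> children = new ArrayList<>(args);
        if (filterOrNull != null) {
            children.add(filterOrNull);
        }
        return children;
    }
}
```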

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/AggregateExpression.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Expression for aggregate function which is corresponding to the AggregateCall in Calcite. */
+public class AggregateExpression implements ResolvedExpression {
+
+    private final FunctionDefinition functionDefinition;
+
+    private final List<FieldReferenceExpression> args;
+
+    private final @Nullable CallExpression filterExpression;
+
+    private final DataType resultType;
+
+    private final boolean distinct;
+
+    private final boolean approximate;
+
+    private final boolean ignoreNulls;
+
+    public AggregateExpression(
+            FunctionDefinition functionDefinition,
+            List<FieldReferenceExpression> args,
+            @Nullable CallExpression filterExpression,
+            DataType resultType,
+            boolean distinct,
+            boolean approximate,
+            boolean ignoreNulls) {
+        this.functionDefinition =
+                Preconditions.checkNotNull(
+                        functionDefinition, "Function definition must not be null.");
+        this.args = args;
+        this.filterExpression = filterExpression;
+        this.resultType = resultType;
+        this.distinct = distinct;
+        this.approximate = approximate;
+        this.ignoreNulls = ignoreNulls;
+    }
+
+    public FunctionDefinition getFunctionDefinition() {
+        return functionDefinition;
+    }
+
+    public boolean isDistinct() {
+        return distinct;
+    }
+
+    public boolean isApproximate() {
+        return approximate;
+    }
+
+    public boolean isIgnoreNulls() {
+        return ignoreNulls;
+    }
+
+    public List<FieldReferenceExpression> getArgs() {
+        return args;
+    }
+
+    @Nullable
+    public CallExpression getFilterExpression() {
+        return filterExpression;
+    }
+
+    /**
+     * Returns a string representation of the aggregate function for logging or printing to a
+     * console.
+     */
+    public String getFunctionName() {
+        return functionDefinition.toString();
+    }
+
+    @Override
+    public DataType getOutputDataType() {
+        return resultType;
+    }
+
+    @Override
+    public List<ResolvedExpression> getResolvedChildren() {
+        return Collections.singletonList(this.filterExpression);
+    }
+
+    @Override
+    public String asSummaryString() {
+        final String argList =
+                args.stream()
+                        .map(Expression::asSummaryString)
+                        .collect(Collectors.joining(", ", "(", ")"));
+        return getFunctionName() + argList;
+    }
+
+    @Override
+    public List<Expression> getChildren() {
+        return Collections.singletonList(this.filterExpression);
+    }
+
+    @Override
+    public <R> R accept(ExpressionVisitor<R> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        AggregateExpression that = (AggregateExpression) o;
+        return Objects.equals(functionDefinition, that.functionDefinition)
+                && args.equals(that.args)
+                && filterExpression.equals(that.filterExpression)

Review comment:
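       This may throw an NPE, because `filterExpression` is `@Nullable`. A null-safe comparison such as `Objects.equals(...)` would avoid it.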
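
   A small sketch of a null-safe `equals`. `NullableFilter` is a hypothetical class with `String` fields standing in for the real expression types:

```java
import java.util.Objects;

// Hypothetical stand-in (not a Flink class) with one required and one nullable field.
final class NullableFilter {

    private final String name;

    // May be null.
    private final String filter;

    NullableFilter(String name, String filter) {
        this.name = Objects.requireNonNull(name);
        this.filter = filter;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        NullableFilter that = (NullableFilter) o;
        // Objects.equals handles the case where either side is null.
        return name.equals(that.name) && Objects.equals(filter, that.filter);
    }

    @Override
    public int hashCode() {
        // Objects.hash also tolerates null elements.
        return Objects.hash(name, filter);
    }
}
```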

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/AggregateExpression.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Expression for aggregate function which is corresponding to the AggregateCall in Calcite. */

Review comment:
       We can improve the javadoc:
   
   ```
   Resolved and validated expression for calling an aggregate function.
   
   <p>An aggregate call contains:
   
   <ul>
     <li>an output type
     <li>a {@link FunctionDefinition} that identifies the function to be called
     <li>...
   </ul>
   
   <p>Note: currently, the {@link AggregateExpression} is only used in {@link SupportsAggregatePushDown}.
   ```
   
   We shouldn't mention Calcite's AggregateCall: Calcite is an internal implementation detail that we shouldn't expose to users, and most users don't know about Calcite.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsAggregatePushDown.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.expressions.AggregateExpression;
+import org.apache.flink.table.types.DataType;
+
+import java.util.List;
+
+/**
+ * Enables to push down local aggregates into a {@link ScanTableSource}.
+ *
+ * <p>Given the following SQL:
+ *
+ * <pre>{@code
+ * SELECT sum(a), max(a), b FROM t GROUP BY b;

Review comment:
       It would be better to give a more complex example, e.g. one that includes `COUNT(DISTINCT ..)` and groups by multiple fields.
   
   Please also give the original CREATE TABLE statement to show how the indexes in `groupingSets` map to the fields. Most users don't know what `groupingSets` is, so we should describe it in more detail.
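
   To illustrate the index mapping, here is a sketch under two assumptions: the table declares fields `a, b, c, d` in that order, and `groupingSets` is passed as a list of `int[]` holding 0-based field indexes (the method signature is not shown in this thread). `GroupingSetSketch` is a hypothetical helper. Under those assumptions, `GROUP BY b, d` would arrive as a single grouping set `[1, 3]`:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Flink class): resolves grouping-set indexes to field names.
final class GroupingSetSketch {

    // Each int[] is one grouping set; each int is a 0-based index into fieldNames.
    static List<List<String>> resolve(List<String> fieldNames, List<int[]> groupingSets) {
        List<List<String>> resolved = new ArrayList<>();
        for (int[] groupingSet : groupingSets) {
            List<String> names = new ArrayList<>();
            for (int index : groupingSet) {
                names.add(fieldNames.get(index));
            }
            resolved.add(names);
        }
        return resolved;
    }
}
```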

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsAggregatePushDown.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.expressions.AggregateExpression;
+import org.apache.flink.table.types.DataType;
+
+import java.util.List;
+
+/**
+ * Enables to push down local aggregates into a {@link ScanTableSource}.
+ *
+ * <p>Given the following SQL:
+ *
+ * <pre>{@code
+ * SELECT sum(a), max(a), b FROM t GROUP BY b;
+ * }</pre>
+ *
+ * <p>In the example above, {@CODE sum(a), max(a)} and {@CODE group by b} are aggregate functions
+ * and grouping sets. The optimized plan will be: TableSourceScan -> LocalHashAggregate -> Exchange
+ * -> HashAggregate -> Calc
+ *
+ * <p>By default, if this interface is not implemented, local aggregates are applied in a subsequent
+ * operation after the source.
+ *
+ * <p>For efficiency, a source can push local aggregates further down in order to reduce the network
+ * and computing overhead. The passed aggregate functions and grouping sets are in the original
+ * order. The downstream storage which has aggregation capability can directly return the aggregated
+ * values to the exchange operator.
+ *
+ * <p>Note: The local aggregate push down strategy is all or nothing, it can only be pushed down if
+ * all aggregate functions are supported.
+ *

Review comment:
       Add the following note:
   
   ```
   <p>Regardless of whether this interface is implemented, a final aggregation is still applied in a subsequent
    * operation after the source.
   ```

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/AggregateExpression.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Expression for aggregate function which is corresponding to the AggregateCall in Calcite. */
+public class AggregateExpression implements ResolvedExpression {
+
+    private final FunctionDefinition functionDefinition;
+
+    private final List<FieldReferenceExpression> args;
+
+    private final @Nullable CallExpression filterExpression;
+
+    private final DataType resultType;
+
+    private final boolean distinct;
+
+    private final boolean approximate;
+
+    private final boolean ignoreNulls;
+
+    public AggregateExpression(
+            FunctionDefinition functionDefinition,
+            List<FieldReferenceExpression> args,
+            @Nullable CallExpression filterExpression,
+            DataType resultType,
+            boolean distinct,
+            boolean approximate,
+            boolean ignoreNulls) {
+        this.functionDefinition =
+                Preconditions.checkNotNull(
+                        functionDefinition, "Function definition must not be null.");
+        this.args = args;
+        this.filterExpression = filterExpression;
+        this.resultType = resultType;
+        this.distinct = distinct;
+        this.approximate = approximate;
+        this.ignoreNulls = ignoreNulls;
+    }
+
+    public FunctionDefinition getFunctionDefinition() {
+        return functionDefinition;
+    }
+
+    public boolean isDistinct() {
+        return distinct;
+    }
+
+    public boolean isApproximate() {
+        return approximate;
+    }
+
+    public boolean isIgnoreNulls() {
+        return ignoreNulls;
+    }
+
+    public List<FieldReferenceExpression> getArgs() {
+        return args;
+    }
+
+    @Nullable
+    public CallExpression getFilterExpression() {
+        return filterExpression;
+    }
+
+    /**
+     * Returns a string representation of the aggregate function for logging or printing to a
+     * console.
+     */
+    public String getFunctionName() {
+        return functionDefinition.toString();
+    }
+
+    @Override
+    public DataType getOutputDataType() {
+        return resultType;
+    }
+
+    @Override
+    public List<ResolvedExpression> getResolvedChildren() {
+        return Collections.singletonList(this.filterExpression);
+    }
+
+    @Override
+    public String asSummaryString() {
+        final String argList =
+                args.stream()
+                        .map(Expression::asSummaryString)
+                        .collect(Collectors.joining(", ", "(", ")"));
+        return getFunctionName() + argList;
+    }
+
+    @Override
+    public List<Expression> getChildren() {
+        return Collections.singletonList(this.filterExpression);

Review comment:
       The children should also include `args`.
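
       A minimal sketch of what I mean, using simplified stand-in types (`Expr`, `NamedExpr`, `AggregateSketch` are hypothetical and are not the real Flink classes). It returns the arguments first and appends the filter only when it is present. Skipping a null filter is my own assumption here; the current code would put a `null` element into the singleton list when there is no filter.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Stand-in for an expression node; not the real Flink interface. */
interface Expr {
    String asSummaryString();
}

/** Stand-in for a field reference or a filter call, identified only by a name. */
final class NamedExpr implements Expr {
    private final String name;

    NamedExpr(String name) {
        this.name = name;
    }

    @Override
    public String asSummaryString() {
        return name;
    }
}

/** Simplified aggregate node whose children cover the args and the optional filter. */
final class AggregateSketch {
    private final List<Expr> args;
    private final Expr filter; // may be null, like the @Nullable filterExpression

    AggregateSketch(List<Expr> args, Expr filter) {
        this.args = args;
        this.filter = filter;
    }

    /** Returns the arguments first, followed by the filter if one is present. */
    List<Expr> getChildren() {
        List<Expr> children = new ArrayList<>(args);
        if (filter != null) {
            children.add(filter);
        }
        return Collections.unmodifiableList(children);
    }
}
```

       The same list could back `getResolvedChildren()` as well, so that both methods agree.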

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsAggregatePushDown.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.expressions.AggregateExpression;
+import org.apache.flink.table.types.DataType;
+
+import java.util.List;
+
+/**
+ * Enables to push down local aggregates into a {@link ScanTableSource}.
+ *
+ * <p>Given the following SQL:
+ *
+ * <pre>{@code
+ * SELECT sum(a), max(a), b FROM t GROUP BY b;
+ * }</pre>
+ *
+ * <p>In the example above, {@CODE sum(a), max(a)} and {@CODE group by b} are aggregate functions
+ * and grouping sets. The optimized plan will be: TableSourceScan -> LocalHashAggregate -> Exchange
+ * -> HashAggregate -> Calc
+ *
+ * <p>By default, if this interface is not implemented, local aggregates are applied in a subsequent
+ * operation after the source.
+ *
+ * <p>For efficiency, a source can push local aggregates further down in order to reduce the network
+ * and computing overhead. The passed aggregate functions and grouping sets are in the original
+ * order. The downstream storage which has aggregation capability can directly return the aggregated
+ * values to the exchange operator.
+ *
+ * <p>Note: The local aggregate push down strategy is all or nothing, it can only be pushed down if
+ * all aggregate functions are supported.
+ *
+ * <p>A {@link ScanTableSource} extending this interface is able to aggregate records before
+ * returning.
+ */
+@PublicEvolving
+public interface SupportsAggregatePushDown {
+
+    /**
+     * Check and pick all aggregate expressions this table source can support. The passed in
+     * aggregates and grouping sets have been keep in original order.

Review comment:
       ```suggestion
        * Provides a list of aggregate expressions and the grouping keys. The source should pick all the aggregates or nothing and return whether all the aggregates have been pushed down into the source.
   ```
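
       To illustrate the all-or-nothing contract, here is a rough sketch of how a source might implement it. Everything in it is an assumption for illustration: the method name `applyAggregates`, the `List<int[]>` type for the grouping sets, the use of plain function names instead of `AggregateExpression`, and the set of supported functions.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical source whose storage layer can evaluate only SUM, MAX, and MIN. */
final class PushDownSketch {
    private static final Set<String> SUPPORTED =
            new HashSet<>(Arrays.asList("SUM", "MAX", "MIN"));

    // Both fields stay null until a push-down has been accepted as a whole.
    private List<int[]> pushedGroupingSets;
    private List<String> pushedAggregates;

    /**
     * Accepts either every aggregate or none of them. The function names stand in for the
     * aggregate expressions that the planner would pass.
     */
    boolean applyAggregates(List<int[]> groupingSets, List<String> aggregateFunctionNames) {
        for (String name : aggregateFunctionNames) {
            if (!SUPPORTED.contains(name)) {
                // One unsupported aggregate rejects the whole push-down; no state is kept.
                return false;
            }
        }
        this.pushedGroupingSets = groupingSets;
        this.pushedAggregates = aggregateFunctionNames;
        return true;
    }

    /** Tells whether a push-down was accepted, so the runtime can emit aggregated rows. */
    boolean hasPushedAggregates() {
        return pushedAggregates != null && pushedGroupingSets != null;
    }
}
```

       Returning `false` without remembering anything means the planner can keep the local aggregate operator unchanged.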

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsAggregatePushDown.java
##########
@@ -0,0 +1,77 @@
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.expressions.AggregateExpression;
+import org.apache.flink.table.types.DataType;
+
+import java.util.List;
+
+/**
+ * Enables to push down local aggregates into a {@link ScanTableSource}.
+ *
+ * <p>Given the following SQL:
+ *
+ * <pre>{@code
+ * SELECT sum(a), max(a), b FROM t GROUP BY b;
+ * }</pre>
+ *
+ * <p>In the example above, {@CODE sum(a), max(a)} and {@CODE group by b} are aggregate functions
+ * and grouping sets. The optimized plan will be: TableSourceScan -> LocalHashAggregate -> Exchange
+ * -> HashAggregate -> Calc
+ *
+ * <p>By default, if this interface is not implemented, local aggregates are applied in a subsequent
+ * operation after the source.
+ *
+ * <p>For efficiency, a source can push local aggregates further down in order to reduce the network
+ * and computing overhead. The passed aggregate functions and grouping sets are in the original
+ * order. The downstream storage which has aggregation capability can directly return the aggregated
+ * values to the exchange operator.

Review comment:
       What is the exchange operator? I think you mean: "The source function can return the aggregated values if the underlying database or storage system has aggregation capability."

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsAggregatePushDown.java
##########
@@ -0,0 +1,77 @@
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.expressions.AggregateExpression;
+import org.apache.flink.table.types.DataType;
+
+import java.util.List;
+
+/**
+ * Enables to push down local aggregates into a {@link ScanTableSource}.
+ *
+ * <p>Given the following SQL:
+ *
+ * <pre>{@code
+ * SELECT sum(a), max(a), b FROM t GROUP BY b;
+ * }</pre>
+ *
+ * <p>In the example above, {@CODE sum(a), max(a)} and {@CODE group by b} are aggregate functions
+ * and grouping sets. The optimized plan will be: TableSourceScan -> LocalHashAggregate -> Exchange
+ * -> HashAggregate -> Calc
+ *
+ * <p>By default, if this interface is not implemented, local aggregates are applied in a subsequent
+ * operation after the source.
+ *
+ * <p>For efficiency, a source can push local aggregates further down in order to reduce the network
+ * and computing overhead. The passed aggregate functions and grouping sets are in the original
+ * order. The downstream storage which has aggregation capability can directly return the aggregated
+ * values to the exchange operator.
+ *
+ * <p>Note: The local aggregate push down strategy is all or nothing, it can only be pushed down if
+ * all aggregate functions are supported.
+ *
+ * <p>A {@link ScanTableSource} extending this interface is able to aggregate records before
+ * returning.
+ */
+@PublicEvolving
+public interface SupportsAggregatePushDown {
+
+    /**
+     * Check and pick all aggregate expressions this table source can support. The passed in
+     * aggregates and grouping sets have been keep in original order.
+     *
+     * <p>Note: The final output data type emitted by the source changes from the original produced
+     * data type to the emitted data type of local aggregate. The passed {@code DataType
+     * producedDataType} is the updated data type for convenience.
+     *
+     * @param groupingSets a array list of the grouping sets.
+     * @param aggregateExpressions a list contains all of aggregates, you should check if all of
+     *     aggregate functions can be processed by downstream system. The applying strategy is all
+     *     or nothing.
+     * @param producedDataType the final output type of the source.
+     * @return Whether all of the aggregates push down succeeds

Review comment:
       ```suggestion
        * @return true if all the aggregates have been pushed down into source, false otherwise.
   ```

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsAggregatePushDown.java
##########
@@ -0,0 +1,77 @@
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.expressions.AggregateExpression;
+import org.apache.flink.table.types.DataType;
+
+import java.util.List;
+
+/**
+ * Enables to push down local aggregates into a {@link ScanTableSource}.
+ *
+ * <p>Given the following SQL:
+ *
+ * <pre>{@code
+ * SELECT sum(a), max(a), b FROM t GROUP BY b;
+ * }</pre>
+ *
+ * <p>In the example above, {@CODE sum(a), max(a)} and {@CODE group by b} are aggregate functions
+ * and grouping sets. The optimized plan will be: TableSourceScan -> LocalHashAggregate -> Exchange
+ * -> HashAggregate -> Calc
+ *
+ * <p>By default, if this interface is not implemented, local aggregates are applied in a subsequent
+ * operation after the source.
+ *
+ * <p>For efficiency, a source can push local aggregates further down in order to reduce the network
+ * and computing overhead. The passed aggregate functions and grouping sets are in the original
+ * order. The downstream storage which has aggregation capability can directly return the aggregated
+ * values to the exchange operator.
+ *
+ * <p>Note: The local aggregate push down strategy is all or nothing, it can only be pushed down if
+ * all aggregate functions are supported.
+ *
+ * <p>A {@link ScanTableSource} extending this interface is able to aggregate records before
+ * returning.
+ */
+@PublicEvolving
+public interface SupportsAggregatePushDown {
+
+    /**
+     * Check and pick all aggregate expressions this table source can support. The passed in
+     * aggregates and grouping sets have been keep in original order.
+     *

Review comment:
       Mention how to construct the output row: the grouping keys should come first, followed by the aggregate results. Please also give an example for better understanding.
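
       As an illustration of the layout I have in mind, take the query from the class Javadoc, `SELECT sum(a), max(a), b FROM t GROUP BY b`. A source that aggregates locally would emit rows shaped as `[b, sum(a), max(a)]`. The sketch below is hypothetical: it uses plain `long[]` rows with input layout `{a, b}`, ignores nulls, and is not tied to any Flink row type.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical local aggregation for: SELECT sum(a), max(a), b FROM t GROUP BY b. */
final class LocalAggregateRows {

    /** Input rows are {a, b}; output rows are {b, sum(a), max(a)}. */
    static List<long[]> aggregate(List<long[]> rows) {
        // One accumulator per grouping key b, laid out as [sum(a), max(a)].
        Map<Long, long[]> accByKey = new LinkedHashMap<>();
        for (long[] row : rows) {
            long a = row[0];
            long b = row[1];
            long[] acc = accByKey.get(b);
            if (acc == null) {
                // First row of this group initializes both accumulators with a.
                accByKey.put(b, new long[] {a, a});
            } else {
                acc[0] += a;
                acc[1] = Math.max(acc[1], a);
            }
        }
        List<long[]> out = new ArrayList<>();
        for (Map.Entry<Long, long[]> entry : accByKey.entrySet()) {
            long[] acc = entry.getValue();
            // Grouping key first, then the aggregates in the order they were passed.
            out.add(new long[] {entry.getKey(), acc[0], acc[1]});
        }
        return out;
    }
}
```

       For the input rows `{1, 10}`, `{5, 10}`, `{2, 20}` this yields `{10, 6, 5}` and `{20, 2, 2}`.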

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsAggregatePushDown.java
##########
@@ -0,0 +1,77 @@
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.expressions.AggregateExpression;
+import org.apache.flink.table.types.DataType;
+
+import java.util.List;
+
+/**
+ * Enables to push down local aggregates into a {@link ScanTableSource}.
+ *
+ * <p>Given the following SQL:
+ *
+ * <pre>{@code
+ * SELECT sum(a), max(a), b FROM t GROUP BY b;
+ * }</pre>
+ *
+ * <p>In the example above, {@CODE sum(a), max(a)} and {@CODE group by b} are aggregate functions
+ * and grouping sets. The optimized plan will be: TableSourceScan -> LocalHashAggregate -> Exchange
+ * -> HashAggregate -> Calc
+ *
+ * <p>By default, if this interface is not implemented, local aggregates are applied in a subsequent
+ * operation after the source.
+ *
+ * <p>For efficiency, a source can push local aggregates further down in order to reduce the network
+ * and computing overhead. The passed aggregate functions and grouping sets are in the original
+ * order. The downstream storage which has aggregation capability can directly return the aggregated
+ * values to the exchange operator.
+ *
+ * <p>Note: The local aggregate push down strategy is all or nothing, it can only be pushed down if
+ * all aggregate functions are supported.
+ *
+ * <p>A {@link ScanTableSource} extending this interface is able to aggregate records before
+ * returning.
+ */
+@PublicEvolving
+public interface SupportsAggregatePushDown {
+
+    /**
+     * Check and pick all aggregate expressions this table source can support. The passed in
+     * aggregates and grouping sets have been keep in original order.
+     *
+     * <p>Note: The final output data type emitted by the source changes from the original produced
+     * data type to the emitted data type of local aggregate. The passed {@code DataType
+     * producedDataType} is the updated data type for convenience.

Review comment:
       ```suggestion
        * <p>Note: Use the passed data type instead of {@link TableSchema#toPhysicalRowDataType()} for
        * describing the final output data type when creating {@link TypeInformation}. The projection of grouping keys and aggregate values is already considered in the
        * given output data type.
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org