Posted to issues@flink.apache.org by docete <gi...@git.apache.org> on 2017/01/13 08:28:33 UTC
[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...
GitHub user docete opened a pull request:
https://github.com/apache/flink/pull/3111
[FLINK-3475] [Table] DISTINCT aggregate function support for SQL queries
Copy Calcite's AggregateExpandDistinctAggregatesRule to the Flink project and apply a quick fix to avoid some bad cases mentioned in CALCITE-1558. This rule lets Flink support distinct aggregate functions.
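The core idea of the copied rule is to turn a distinct aggregate into an ordinary aggregate over de-duplicated input. The plain-Java sketch below illustrates that equivalence for a single column. It uses no Calcite or Flink APIs, and `DistinctExpansionSketch` and its methods are hypothetical names chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

/**
 * Illustrative sketch only (hypothetical class, no Calcite/Flink APIs):
 * evaluates COUNT(DISTINCT x) and SUM(DISTINCT x) the way an expanded plan
 * would, i.e. an extra GROUP BY x that de-duplicates the argument, followed
 * by ordinary (non-distinct) aggregates over the de-duplicated rows.
 */
final class DistinctExpansionSketch {

    /** The extra aggregate: SELECT x FROM t GROUP BY x (NULL forms its own group). */
    static List<Integer> groupBy(List<Integer> xs) {
        return new ArrayList<>(new LinkedHashSet<>(xs));
    }

    /** Plain COUNT(x): NULL values are not counted. */
    static long count(List<Integer> xs) {
        long n = 0;
        for (Integer x : xs) {
            if (x != null) {
                n++;
            }
        }
        return n;
    }

    /** Plain SUM(x): NULLs are skipped; the result is NULL if no non-NULL value exists. */
    static Long sum(List<Integer> xs) {
        Long total = null;
        for (Integer x : xs) {
            if (x != null) {
                total = (total == null ? 0L : total) + x;
            }
        }
        return total;
    }

    static long countDistinct(List<Integer> xs) {
        return count(groupBy(xs));
    }

    static Long sumDistinct(List<Integer> xs) {
        return sum(groupBy(xs));
    }
}
```

Because COUNT(DISTINCT x) and SUM(DISTINCT x) share the argument x, both read the same de-duplicated input; this corresponds to the "one extra Aggregate is sufficient" case described in the rule's Javadoc.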
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/docete/flink master
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3111.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3111
----
commit dc417f27d3d2f3644de7b8873577c5ed6d93230e
Author: Zhenghua Gao <do...@gmail.com>
Date: 2017-01-12T02:33:27Z
[FLINK-3475] DISTINCT aggregate function support for SQL queries
----
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3111#discussion_r101732097
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala ---
@@ -45,7 +45,7 @@ class DataSetAggregateRule
// check if we have distinct aggregates
val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
if (distinctAggs) {
--- End diff --
the condition can be removed.
---
[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3111#discussion_r101733413
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala ---
@@ -213,34 +213,45 @@ class AggregationsITCase(
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
- @Test(expected = classOf[TableException])
+ @Test
def testDistinctAggregate(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT sum(_1) as a, count(distinct _3) as b FROM MyTable"
- val ds = CollectionDataSets.get3TupleDataSet(env)
- tEnv.registerDataSet("MyTable", ds)
+ val ds = env.fromElements(
--- End diff --
I think it would be good to use a bit more test data here, like one of the `CollectionDataSets`.
ITCases are rather expensive to run, so we should get the most out of them.
---
[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...
Posted by docete <gi...@git.apache.org>.
Github user docete commented on a diff in the pull request:
https://github.com/apache/flink/pull/3111#discussion_r96568781
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/FlinkAggregateExpandDistinctAggregatesRule.java ---
@@ -0,0 +1,1144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet;
+
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.fun.SqlCountAggFunction;
+import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.fun.SqlSumAggFunction;
+import org.apache.calcite.sql.fun.SqlSumEmptyIsZeroAggFunction;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Planner rule that expands distinct aggregates
+ * (such as {@code COUNT(DISTINCT x)}) from a
+ * {@link org.apache.calcite.rel.logical.LogicalAggregate}.
+ *
+ * <p>How this is done depends upon the arguments to the function. If all
+ * functions have the same argument
+ * (e.g. {@code COUNT(DISTINCT x), SUM(DISTINCT x)} both have the argument
+ * {@code x}) then one extra {@link org.apache.calcite.rel.core.Aggregate} is
+ * sufficient.
+ *
+ * <p>If there are multiple arguments
+ * (e.g. {@code COUNT(DISTINCT x), COUNT(DISTINCT y)})
+ * the rule creates separate {@code Aggregate}s and combines using a
+ * {@link org.apache.calcite.rel.core.Join}.
+ */
+public final class FlinkAggregateExpandDistinctAggregatesRule extends RelOptRule {
--- End diff --
Classes in the org.apache.flink.table.calcite package are inspired by Calcite and will stay in Flink, whereas FlinkAggregateExpandDistinctAggregatesRule is a copy that will be dropped. So I don't think it's a good idea to move this class there.
I will move the comment to FlinkAggregateExpandDistinctAggregatesRule and create a JIRA to track its removal (as recommended by @KurtYoung).
---
[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3111#discussion_r101733431
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala ---
@@ -213,34 +213,45 @@ class AggregationsITCase(
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
- @Test(expected = classOf[TableException])
+ @Test
def testDistinctAggregate(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT sum(_1) as a, count(distinct _3) as b FROM MyTable"
- val ds = CollectionDataSets.get3TupleDataSet(env)
- tEnv.registerDataSet("MyTable", ds)
+ val ds = env.fromElements(
+ (1, 1L, 1.0f, "Hello"),
+ (2, 2L, 1.0f, "Ciao")).toTable(tEnv)
+ tEnv.registerTable("MyTable", ds)
- // must fail. distinct aggregates are not supported
- tEnv.sql(sqlQuery).toDataSet[Row]
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "3,1"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
- @Test(expected = classOf[TableException])
+ @Test
def testGroupedDistinctAggregate(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT _2, avg(distinct _1) as a, count(_3) as b FROM MyTable GROUP BY _2"
- val ds = CollectionDataSets.get3TupleDataSet(env)
- tEnv.registerDataSet("MyTable", ds)
+ val ds = env.fromElements(
--- End diff --
I think it would be good to use a bit more test data here, like one of the `CollectionDataSets`.
ITCases are rather expensive to run, so we should get the most out of them.
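The expected string "3,1" in the revised testDistinctAggregate can be checked by hand: sum(_1) = 1 + 2 = 3, and both rows carry _3 = 1.0f, so count(distinct _3) = 1 (a non-distinct count would give 2). With only two rows and a single distinct value of _3, the test exercises little of the de-duplication logic. The plain-Java sketch below recomputes the expectation; `TestRow` and `DistinctAggregateExpectation` are hypothetical names and not part of the Flink test code.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for the Scala tuple (Int, Long, Float, String) used by the test. */
final class TestRow {
    final int f1;
    final long f2;
    final float f3;
    final String f4;

    TestRow(int f1, long f2, float f3, String f4) {
        this.f1 = f1;
        this.f2 = f2;
        this.f3 = f3;
        this.f4 = f4;
    }
}

/** Recomputes "sum(_1),count(distinct _3)" in the comma-separated form the test compares against. */
final class DistinctAggregateExpectation {
    static String expected(List<TestRow> rows) {
        long sum = 0;
        Set<Float> distinctThird = new HashSet<>();
        for (TestRow r : rows) {
            sum += r.f1;
            distinctThird.add(r.f3); // duplicates collapse, as with count(distinct _3)
        }
        return sum + "," + distinctThird.size();
    }

    public static void main(String[] args) {
        List<TestRow> rows = Arrays.asList(
                new TestRow(1, 1L, 1.0f, "Hello"),
                new TestRow(2, 2L, 1.0f, "Ciao"));
        System.out.println(expected(rows)); // prints 3,1
    }
}
```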
---
[GitHub] flink issue #3111: [FLINK-3475] [Table] DISTINCT aggregate function support ...
Posted by docete <gi...@git.apache.org>.
Github user docete commented on the issue:
https://github.com/apache/flink/pull/3111
@fhueske Fixed according to your comments
---
[GitHub] flink issue #3111: [FLINK-3475] [Table] DISTINCT aggregate function support ...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/3111
@docete, did you check if the execution plan is similar to what I outlined in the related JIRA issue [FLINK-3475](https://issues.apache.org/jira/browse/FLINK-3475)?
---
[GitHub] flink issue #3111: [FLINK-3475] [Table] DISTINCT aggregate function support ...
Posted by docete <gi...@git.apache.org>.
Github user docete commented on the issue:
https://github.com/apache/flink/pull/3111
@fhueske Yes, I have checked the execution plan. It's very similar to your description.
Take for example the SQL query "select sum(distinct a), sum(distinct b), sum(c) from expr", where expr is a table with 3 fields: a, b, c.
The explanation for the query is:
```
== Abstract Syntax Tree ==
LogicalAggregate(group=[{}], EXPR$0=[SUM(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[SUM($2)])
LogicalTableScan(table=[[expr]])
== Optimized Logical Plan ==
DataSetCalc(select=[EXPR$0, EXPR$1, EXPR$2])
DataSetSingleRowJoin(where=[true], join=[EXPR$2, EXPR$0, EXPR$1], joinType=[NestedLoopJoin])
DataSetSingleRowJoin(where=[true], join=[EXPR$2, EXPR$0], joinType=[NestedLoopJoin])
DataSetAggregate(select=[SUM(c) AS EXPR$2])
DataSetUnion(union=[a, b, c])
DataSetValues(tuples=[[{ null, null, null }]], values=[a, b, c])
DataSetScan(table=[[_DataSetTable_0]])
DataSetAggregate(select=[SUM(a) AS EXPR$0])
DataSetUnion(union=[a])
DataSetValues(tuples=[[{ null }]], values=[a])
DataSetAggregate(groupBy=[a], select=[a])
DataSetCalc(select=[a])
DataSetScan(table=[[_DataSetTable_0]])
DataSetAggregate(select=[SUM(b) AS EXPR$1])
DataSetUnion(union=[b])
DataSetValues(tuples=[[{ null }]], values=[b])
DataSetAggregate(groupBy=[b], select=[b])
DataSetCalc(select=[b])
DataSetScan(table=[[_DataSetTable_0]])
== Physical Execution Plan ==
Stage 8 : Data Source
content : collect elements with CollectionInputFormat
Partitioning : RANDOM_PARTITIONED
Stage 14 : Data Source
content : collect elements with CollectionInputFormat
Partitioning : RANDOM_PARTITIONED
Stage 13 : Map
content : from: (a, b, c)
ship_strategy : Forward
exchange_mode : BATCH
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 12 : FlatMap
content : select: (a)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : FlatMap
Partitioning : RANDOM_PARTITIONED
Stage 11 : Map
content : prepare select: (a)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 10 : GroupCombine
content : groupBy: (a), select: (a)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Sorted Combine
Partitioning : RANDOM_PARTITIONED
Stage 9 : GroupReduce
content : groupBy: (a), select: (a)
ship_strategy : Hash Partition on [0]
exchange_mode : PIPELINED
driver_strategy : Sorted Group Reduce
Partitioning : RANDOM_PARTITIONED
Stage 7 : Union
content :
ship_strategy : Redistribute
exchange_mode : PIPELINED
Partitioning : RANDOM_PARTITIONED
Stage 6 : Map
content : prepare select: (SUM(a) AS EXPR$0)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 5 : GroupCombine
content : select:(SUM(a) AS EXPR$0)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Group Reduce All
Partitioning : RANDOM_PARTITIONED
Stage 4 : GroupReduce
content : select:(SUM(a) AS EXPR$0)
ship_strategy : Redistribute
exchange_mode : PIPELINED
driver_strategy : Group Reduce All
Partitioning : RANDOM_PARTITIONED
Stage 19 : Data Source
content : collect elements with CollectionInputFormat
Partitioning : RANDOM_PARTITIONED
Stage 20 : Map
content : from: (a, b, c)
ship_strategy : Forward
exchange_mode : BATCH
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 18 : Union
content :
ship_strategy : Redistribute
exchange_mode : PIPELINED
Partitioning : RANDOM_PARTITIONED
Stage 17 : Map
content : prepare select: (SUM(c) AS EXPR$2)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 16 : GroupCombine
content : select:(SUM(c) AS EXPR$2)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Group Reduce All
Partitioning : RANDOM_PARTITIONED
Stage 15 : GroupReduce
content : select:(SUM(c) AS EXPR$2)
ship_strategy : Redistribute
exchange_mode : PIPELINED
driver_strategy : Group Reduce All
Partitioning : RANDOM_PARTITIONED
Stage 3 : FlatMap
content : where: (true), join: (EXPR$2, EXPR$0)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : FlatMap
Partitioning : RANDOM_PARTITIONED
Stage 25 : Data Source
content : collect elements with CollectionInputFormat
Partitioning : RANDOM_PARTITIONED
Stage 30 : Map
content : from: (a, b, c)
ship_strategy : Forward
exchange_mode : BATCH
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 29 : FlatMap
content : select: (b)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : FlatMap
Partitioning : RANDOM_PARTITIONED
Stage 28 : Map
content : prepare select: (b)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 27 : GroupCombine
content : groupBy: (b), select: (b)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Sorted Combine
Partitioning : RANDOM_PARTITIONED
Stage 26 : GroupReduce
content : groupBy: (b), select: (b)
ship_strategy : Hash Partition on [0]
exchange_mode : PIPELINED
driver_strategy : Sorted Group Reduce
Partitioning : RANDOM_PARTITIONED
Stage 24 : Union
content :
ship_strategy : Redistribute
exchange_mode : PIPELINED
Partitioning : RANDOM_PARTITIONED
Stage 23 : Map
content : prepare select: (SUM(b) AS EXPR$1)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 22 : GroupCombine
content : select:(SUM(b) AS EXPR$1)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Group Reduce All
Partitioning : RANDOM_PARTITIONED
Stage 21 : GroupReduce
content : select:(SUM(b) AS EXPR$1)
ship_strategy : Redistribute
exchange_mode : PIPELINED
driver_strategy : Group Reduce All
Partitioning : RANDOM_PARTITIONED
Stage 2 : FlatMap
content : where: (true), join: (EXPR$2, EXPR$0, EXPR$1)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : FlatMap
Partitioning : RANDOM_PARTITIONED
Stage 1 : FlatMap
content : select: (EXPR$0, EXPR$1, EXPR$2)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : FlatMap
Partitioning : RANDOM_PARTITIONED
Stage 0 : Data Sink
content : org.apache.flink.api.java.io.DiscardingOutputFormat
ship_strategy : Forward
exchange_mode : PIPELINED
Partitioning : RANDOM_PARTITIONED
```
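The optimized logical plan above can be read as three independent branches: a plain SUM(c) over the scan and, for each distinct argument, a DataSetAggregate(groupBy=[a]) (resp. [b]) that de-duplicates the column, followed by a global SUM. Each branch produces a single row, so DataSetSingleRowJoin combines them with nested-loop joins. The DataSetUnion with a one-row DataSetValues of NULLs appears to be Flink's way of ensuring that a non-grouped aggregate emits a row even on empty input; since SUM ignores NULLs, it does not change the result. The plain-Java sketch below mimics only the semantics of that shape, not Flink's operators or parallel execution; `ExpandedPlanSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.function.ToIntFunction;

/**
 * Illustrative sketch (hypothetical names, no Flink APIs) that evaluates
 *   SELECT SUM(DISTINCT a), SUM(DISTINCT b), SUM(c) FROM expr
 * with the same shape as the optimized plan: three independent branches,
 * each producing exactly one row, combined by single-row joins.
 */
final class ExpandedPlanSketch {

    /** Hypothetical row type for table expr(a, b, c). */
    static final class Row {
        final int a;
        final int b;
        final int c;

        Row(int a, int b, int c) {
            this.a = a;
            this.b = b;
            this.c = c;
        }
    }

    /** Projection, like DataSetCalc(select=[a]). */
    static List<Integer> column(List<Row> rows, ToIntFunction<Row> f) {
        List<Integer> out = new ArrayList<>();
        for (Row r : rows) {
            out.add(f.applyAsInt(r));
        }
        return out;
    }

    /** De-duplication, like DataSetAggregate(groupBy=[a], select=[a]). */
    static List<Integer> groupBy(List<Integer> values) {
        return new ArrayList<>(new LinkedHashSet<>(values));
    }

    /** Global SUM; returns null for empty input, matching SQL semantics. */
    static Long sum(List<Integer> values) {
        Long total = null;
        for (int v : values) {
            total = (total == null ? 0L : total) + v;
        }
        return total;
    }

    /** Returns {EXPR$0, EXPR$1, EXPR$2}. */
    static Long[] evaluate(List<Row> rows) {
        Long expr2 = sum(column(rows, r -> r.c));          // SUM(c)
        Long expr0 = sum(groupBy(column(rows, r -> r.a))); // SUM(DISTINCT a)
        Long expr1 = sum(groupBy(column(rows, r -> r.b))); // SUM(DISTINCT b)
        // Each branch yields exactly one row, so the two single-row joins simply
        // concatenate the values; the final DataSetCalc restores the SELECT order.
        return new Long[] {expr0, expr1, expr2};
    }
}
```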
---
[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3111#discussion_r103165057
--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/rules/dataSet/FlinkAggregateExpandDistinctAggregatesRule.java ---
@@ -0,0 +1,1152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet;
--- End diff --
Move to `org.apache.flink.table.calcite.rules` next to `FlinkAggregateJoinTransposeRule`
---
[GitHub] flink issue #3111: [FLINK-3475] [Table] DISTINCT aggregate function support ...
Posted by docete <gi...@git.apache.org>.
Github user docete commented on the issue:
https://github.com/apache/flink/pull/3111
Agreed. If we have more than one distinct aggregate with groupings, partitioning first and reusing the subsets would improve performance.
Could we merge this PR first and create another JIRA to track the grouping cases?
We need a workaround to support distinct aggregates ASAP.
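The proposal for grouped queries with several distinct aggregates (partition once, then reuse the subsets) can be illustrated with a small single-machine sketch. It shows the idea under discussion, not what this PR implements; `SharedPartitionSketch` and its members are hypothetical, and in a distributed plan the partitioning step would presumably be a hash partition on the grouping key.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/**
 * Illustrative sketch (hypothetical names, not the merged implementation) of
 *   SELECT k, COUNT(DISTINCT x), SUM(DISTINCT y) FROM t GROUP BY k
 * where the input is partitioned by k once and every distinct aggregate is
 * evaluated on the same partition, instead of one grouped branch per
 * distinct argument that must later be joined back on k.
 */
final class SharedPartitionSketch {

    static final class Row {
        final String k;
        final int x;
        final int y;

        Row(String k, int x, int y) {
            this.k = k;
            this.x = x;
            this.y = y;
        }
    }

    /** Maps each group key to {COUNT(DISTINCT x), SUM(DISTINCT y)}. */
    static Map<String, long[]> aggregate(List<Row> rows) {
        // Partition once by the grouping key.
        Map<String, List<Row>> partitions = new TreeMap<>();
        for (Row r : rows) {
            partitions.computeIfAbsent(r.k, key -> new ArrayList<>()).add(r);
        }

        // Reuse each partition for all distinct aggregates.
        Map<String, long[]> result = new TreeMap<>();
        for (Map.Entry<String, List<Row>> e : partitions.entrySet()) {
            Set<Integer> distinctX = new HashSet<>();
            Set<Integer> distinctY = new HashSet<>();
            for (Row r : e.getValue()) {
                distinctX.add(r.x);
                distinctY.add(r.y);
            }
            long sumY = 0;
            for (int y : distinctY) {
                sumY += y;
            }
            result.put(e.getKey(), new long[] {distinctX.size(), sumY});
        }
        return result;
    }
}
```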
---
[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...
Posted by docete <gi...@git.apache.org>.
Github user docete commented on a diff in the pull request:
https://github.com/apache/flink/pull/3111#discussion_r96628698
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala ---
@@ -96,6 +96,13 @@ object FlinkRuleSets {
ProjectToCalcRule.INSTANCE,
CalcMergeRule.INSTANCE,
+ // distinct aggregate rule for FLINK-3475
--- End diff --
Done. The tracking jira is https://issues.apache.org/jira/browse/FLINK-5545
---
[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3111#discussion_r101735785
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/FlinkAggregateExpandDistinctAggregatesRule.java ---
@@ -0,0 +1,1144 @@
+public final class FlinkAggregateExpandDistinctAggregatesRule extends RelOptRule {
--- End diff --
This is a Java class and should be in the Java source folder `./src/main/java`.
---
[GitHub] flink issue #3111: [FLINK-3475] [Table] DISTINCT aggregate function support ...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/3111
Looks good to me. I would remove the ITCases. Logical tests should be sufficient.
---
[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...
Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:
https://github.com/apache/flink/pull/3111#discussion_r96160329
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/FlinkAggregateExpandDistinctAggregatesRule.java ---
@@ -0,0 +1,1144 @@
+public final class FlinkAggregateExpandDistinctAggregatesRule extends RelOptRule {
--- End diff --
I would like to move this class to the `org.apache.flink.table.calcite` package, and add a comment at the top of the class noting that this is a temporary solution that should be removed later, such as:
>This is a copy of Calcite's [[AggregateExpandDistinctAggregatesRule]] with a quick fix to avoid some bad cases mentioned in CALCITE-1558. It should be dropped in favor of Calcite's [[AggregateExpandDistinctAggregatesRule]] when upgrading to Calcite 1.12 (or above).
---
[GitHub] flink issue #3111: [FLINK-3475] [Table] DISTINCT aggregate function support ...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/3111
Merging
---
[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...
Posted by KurtYoung <gi...@git.apache.org>.
Github user KurtYoung commented on a diff in the pull request:
https://github.com/apache/flink/pull/3111#discussion_r96329552
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala ---
@@ -96,6 +96,13 @@ object FlinkRuleSets {
ProjectToCalcRule.INSTANCE,
CalcMergeRule.INSTANCE,
+ // distinct aggregate rule for FLINK-3475
--- End diff --
I think this comment can be moved to `FlinkAggregateExpandDistinctAggregatesRule`, and we should open a JIRA issue to track removing this class after we upgrade Calcite.
---
[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3111#discussion_r103169886
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala ---
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.sql
+
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class DistinctAggregateTest extends TableTestBase {
+
+ @Test
+ def testSingleDistinctAggregate(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT COUNT(DISTINCT a) FROM MyTable"
+
+ val expected = unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a")
+ ),
+ term("groupBy", "a"),
+ term("select", "a")
+ ),
+ tuples(List(null)),
+ term("values", "a")
+ ),
+ term("union", "a")
+ ),
+ term("select", "COUNT(a) AS EXPR$0")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testMultiDistinctAggregateOnSameColumn(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM MyTable"
+
+ val expected = unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a")
+ ),
+ term("groupBy", "a"),
+ term("select", "a")
+ ),
+ tuples(List(null)),
+ term("values", "a")
+ ),
+ term("union", "a")
+ ),
+ term("select", "COUNT(a) AS EXPR$0", "SUM(a) AS EXPR$1", "MAX(a) AS EXPR$2")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testSingleDistinctAggregateAndOneOrMultiNonDistinctAggregate(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ // case 0x00: DISTINCT on COUNT and Non-DISTINCT on others
+ val sqlQuery0 = "SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable"
+
+ val expected0 = unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b")
+ ),
+ term("groupBy", "a"),
+ term("select", "a", "SUM(b) AS EXPR$1")
+ ),
+ tuples(List(null, null)),
+ term("values", "a", "EXPR$1")
+ ),
+ term("union", "a", "EXPR$1")
+ ),
+ term("select", "COUNT(a) AS EXPR$0", "SUM(EXPR$1) AS EXPR$1")
+ )
+
+ util.verifySql(sqlQuery0, expected0)
+
+ // case 0x01: Non-DISTINCT on COUNT and DISTINCT on others
+ val sqlQuery1 = "SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable"
+
+ val expected1 = unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b")
+ ),
+ term("groupBy", "b"),
+ term("select", "b", "COUNT(a) AS EXPR$0")
+ ),
+ tuples(List(null, null)),
+ term("values", "b", "EXPR$0")
+ ),
+ term("union", "b", "EXPR$0")
+ ),
+ term("select", "$SUM0(EXPR$0) AS EXPR$0", "SUM(b) AS EXPR$1")
+ )
+
+ util.verifySql(sqlQuery1, expected1)
+ }
+
+ @Test
+ def testMultiDistinctAggregateOnDifferentColumn(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable"
--- End diff --
Add another test with two distinct aggregates on different attributes and a non-distinct aggregate.
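For reference, the case 0x00 and case 0x01 plans quoted above evaluate one distinct aggregate plus non-distinct aggregates in two phases: an inner aggregate grouped on the distinct argument pre-aggregates the non-distinct values, and an outer aggregate combines the partial results. In case 0x01 the partial counts are combined with `$SUM0`, which yields 0 rather than NULL on empty input. The following is a minimal sketch of that rewrite in plain Java, assuming non-NULL integer columns; the class and method names are hypothetical and this is not the rule's actual code.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the two-phase rewrite; rows are {a, b}, all values non-NULL. */
final class SingleDistinctRewriteSketch {

    /** Case 0x00: SELECT COUNT(DISTINCT a), SUM(b). Returns {count, sum}. */
    static Long[] countDistinctAWithSumB(int[][] rows) {
        // Inner aggregate: GROUP BY a, computing a partial SUM(b) per distinct a.
        Map<Integer, Long> partialSums = new LinkedHashMap<>();
        for (int[] row : rows) {
            partialSums.merge(row[0], (long) row[1], Long::sum);
        }
        // Outer aggregate: COUNT(a) over the groups and SUM over the partial sums.
        Long sumB = null; // SUM is NULL on empty input
        for (long partial : partialSums.values()) {
            sumB = (sumB == null) ? partial : sumB + partial;
        }
        return new Long[] {(long) partialSums.size(), sumB};
    }

    /** Case 0x01: SELECT COUNT(a), SUM(DISTINCT b). Returns {count, sum}. */
    static Long[] countAWithSumDistinctB(int[][] rows) {
        // Inner aggregate: GROUP BY b, computing a partial COUNT(a) per distinct b.
        Map<Integer, Long> partialCounts = new LinkedHashMap<>();
        for (int[] row : rows) {
            partialCounts.merge(row[1], 1L, Long::sum);
        }
        // Outer aggregate: $SUM0 over the partial counts (starts at 0, never NULL)
        // and SUM(b) over the distinct values of b.
        long count = 0;
        Long sumDistinctB = null;
        for (Map.Entry<Integer, Long> group : partialCounts.entrySet()) {
            count += group.getValue();
            long b = group.getKey();
            sumDistinctB = (sumDistinctB == null) ? b : sumDistinctB + b;
        }
        return new Long[] {count, sumDistinctB};
    }

    /** Direct evaluation of case 0x00 without the rewrite, for comparison. */
    static Long[] countDistinctAWithSumBDirect(int[][] rows) {
        Set<Integer> distinctA = new HashSet<>();
        Long sumB = null;
        for (int[] row : rows) {
            distinctA.add(row[0]);
            sumB = (sumB == null) ? (long) row[1] : sumB + row[1];
        }
        return new Long[] {(long) distinctA.size(), sumB};
    }

    public static void main(String[] args) {
        int[][] rows = {{1, 10}, {1, 20}, {2, 5}, {3, 5}};
        System.out.println(Arrays.toString(countDistinctAWithSumB(rows)));  // [3, 40]
        System.out.println(Arrays.toString(countAWithSumDistinctB(rows)));  // [4, 35]
    }
}
```

The key property is that the non-distinct aggregate must be decomposable (a partial SUM can be summed again, a partial COUNT can be summed with `$SUM0`), otherwise the inner grouping would change its result.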
---
[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/3111#discussion_r97295864
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala ---
@@ -0,0 +1,275 @@
+ @Test
+ def testMultiDistinctAggregateOnDifferentColumn(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable"
+
+ val expected = binaryNode(
+ "DataSetSingleRowJoin",
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a")
+ ),
+ term("groupBy", "a"),
+ term("select", "a")
+ ),
+ tuples(List(null)),
+ term("values", "a")
+ ),
+ term("union", "a")
+ ),
+ term("select", "COUNT(a) AS EXPR$0")
+ ),
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "b")
+ ),
+ term("groupBy", "b"),
+ term("select", "b")
+ ),
+ tuples(List(null)),
+ term("values", "b")
+ ),
+ term("union", "b")
+ ),
+ term("select", "SUM(b) AS EXPR$1")
+ ),
+ term("where", "true"),
+ term("join", "EXPR$0", "EXPR$1"),
+ term("joinType", "NestedLoopJoin")
--- End diff --
This join type is not supported. It would fail during execution.
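The plan quoted above computes `COUNT(DISTINCT a)` and `SUM(DISTINCT b)` in two independent branches, each deduplicating its own column and producing exactly one row, and then joins the two branches. Since both inputs have a single row, their cross product is also a single row, so combining them only requires concatenating the two rows. A minimal plain-Java sketch of that shape, assuming non-NULL integer columns (hypothetical names; this does not model Flink's join operators):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: SELECT COUNT(DISTINCT a), SUM(DISTINCT b) as two single-row branches. */
final class MultiColumnDistinctSketch {

    /** Deduplicates one column, like the inner GROUP BY of each branch. */
    static Set<Integer> distinctValues(int[][] rows, int column) {
        Set<Integer> distinct = new HashSet<>();
        for (int[] row : rows) {
            distinct.add(row[column]);
        }
        return distinct;
    }

    /** Branch 1: COUNT over the distinct values of column a (index 0). */
    static long countDistinctA(int[][] rows) {
        return distinctValues(rows, 0).size();
    }

    /** Branch 2: SUM over the distinct values of column b (index 1); NULL on empty input. */
    static Long sumDistinctB(int[][] rows) {
        Long sum = null;
        for (int v : distinctValues(rows, 1)) {
            sum = (sum == null) ? (long) v : sum + v;
        }
        return sum;
    }

    /** Combines two one-row results; the cross product of two single rows is one row. */
    static List<Object> joinSingleRows(List<Object> left, List<Object> right) {
        List<Object> joined = new ArrayList<>(left);
        joined.addAll(right);
        return joined;
    }

    public static void main(String[] args) {
        int[][] rows = {{1, 10}, {1, 20}, {2, 10}};
        List<Object> result = joinSingleRows(
            Arrays.<Object>asList(countDistinctA(rows)),
            Arrays.<Object>asList(sumDistinctB(rows)));
        System.out.println(result); // [2, 30]
    }
}
```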
---
[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3111#discussion_r101732154
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala ---
@@ -51,7 +51,7 @@ class DataSetAggregateWithNullValuesRule
// check if we have distinct aggregates
val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
if (distinctAggs) {
--- End diff --
The condition can be removed.
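Some context for this rule, based on our reading of the quoted plans rather than on anything stated in this thread: in the non-grouped plans, the inner result is unioned with a single all-NULL `DataSetValues` row before the final `DataSetAggregate`. That extra row would make the final aggregate emit a result even for empty input, while leaving aggregates such as `COUNT(a)` and `SUM(a)` unchanged, since they ignore NULL inputs. The hypothetical sketch below models a reduce-style operator that emits no output for empty input; it is not Flink code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch: effect of unioning one all-NULL row under a non-grouped aggregate. */
final class NullRowUnionSketch {

    /** Reduce-style COUNT(a), SUM(a): returns null (no output row) when the input is empty. */
    static Long[] reduceCountSum(List<Integer> values) {
        if (values.isEmpty()) {
            return null; // no input records, so no output record
        }
        long count = 0;
        Long sum = null;
        for (Integer v : values) {
            if (v == null) {
                continue; // COUNT(a) and SUM(a) ignore NULL inputs
            }
            count++;
            sum = (sum == null) ? (long) v : sum + v;
        }
        return new Long[] {count, sum};
    }

    /** Same evaluation after a UNION ALL with a single NULL row. */
    static Long[] reduceWithNullRow(List<Integer> values) {
        List<Integer> unioned = new ArrayList<>(values);
        unioned.add(null);
        return reduceCountSum(unioned);
    }

    public static void main(String[] args) {
        List<Integer> empty = Collections.emptyList();
        System.out.println(Arrays.toString(reduceCountSum(empty)));    // null
        System.out.println(Arrays.toString(reduceWithNullRow(empty))); // [0, null]
    }
}
```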
---
[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...
Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:
https://github.com/apache/flink/pull/3111#discussion_r96160082
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala ---
@@ -165,9 +165,13 @@ class AggregationTest extends TableTestBase {
val util = batchTestUtil()
val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
- val resultTable = sourceTable.groupBy('a)
+ // Move "where" before "groupBy" because the former query would generate
+ // nondeterministic plans with the same cost. If we change FlinkRuleSets.DATASET_OPT_RULES,
+ // the importance of RelNodes may change and the test may fail. This issue is described
+ // in FLINK-5394; we can move "where" back to the end once FLINK-5394 is fixed.
+ val resultTable = sourceTable.where('a === 1).groupBy('a)
--- End diff --
It might make sense to hold off on this until #3058 is merged. I think it is almost done.
---
[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3111#discussion_r103170129
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala ---
@@ -0,0 +1,275 @@
+ @Test
+ def testSingleDistinctAggregateWithGrouping(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT a, COUNT(a), SUM(DISTINCT b) FROM MyTable GROUP BY a"
+
+ val expected = unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b")
+ ),
+ term("groupBy", "a", "b"),
+ term("select", "a", "b", "COUNT(a) AS EXPR$1")
+ ),
+ term("groupBy", "a"),
+ term("select", "a", "SUM(EXPR$1) AS EXPR$1", "SUM(b) AS EXPR$2")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testSingleDistinctAggregateWithGroupingAndCountStar(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT b) FROM MyTable GROUP BY a"
--- End diff --
Add two tests for `GROUP BY` with two distinct aggregates:
1. on the same column
2. on different columns
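As a reference point for these tests, the quoted `testSingleDistinctAggregateWithGrouping` plan evaluates `SELECT a, COUNT(a), SUM(DISTINCT b) FROM MyTable GROUP BY a` with two aggregates: an inner one grouped on `(a, b)` that pre-computes `COUNT(a)`, and an outer one grouped on `a` that sums the partial counts and sums the now-distinct `b` values. A plain-Java sketch of that two-level grouping, assuming non-NULL integer columns (hypothetical names, not the generated Flink operators):

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: SELECT a, COUNT(a), SUM(DISTINCT b) FROM t GROUP BY a. */
final class GroupedDistinctSketch {

    /** Returns a -> {COUNT(a), SUM(DISTINCT b)}; rows are {a, b}, all values non-NULL. */
    static Map<Integer, long[]> evaluate(int[][] rows) {
        // Inner aggregate: GROUP BY (a, b) with a partial COUNT(a) per pair.
        Map<Integer, Map<Integer, Long>> partial = new TreeMap<>();
        for (int[] row : rows) {
            partial.computeIfAbsent(row[0], k -> new TreeMap<>()).merge(row[1], 1L, Long::sum);
        }
        // Outer aggregate: GROUP BY a, summing the partial counts and the distinct b values.
        Map<Integer, long[]> result = new TreeMap<>();
        for (Map.Entry<Integer, Map<Integer, Long>> group : partial.entrySet()) {
            long count = 0;
            long sumDistinctB = 0; // every group has at least one row, so SUM(b) is not NULL
            for (Map.Entry<Integer, Long> pair : group.getValue().entrySet()) {
                count += pair.getValue();
                sumDistinctB += pair.getKey();
            }
            result.put(group.getKey(), new long[] {count, sumDistinctB});
        }
        return result;
    }

    public static void main(String[] args) {
        int[][] rows = {{1, 10}, {1, 10}, {1, 20}, {2, 5}};
        for (Map.Entry<Integer, long[]> e : evaluate(rows).entrySet()) {
            System.out.println(e.getKey() + " " + Arrays.toString(e.getValue()));
        }
        // 1 [3, 30]
        // 2 [1, 5]
    }
}
```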
---
[GitHub] flink issue #3111: [FLINK-3475] [Table] DISTINCT aggregate function support ...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/3111
Hi @docete,
thanks for posting the plan. It looks OK, but only because the query computes non-grouped aggregates.
For grouped aggregates, the resulting plan will be less efficient than the plan I proposed because it will not reuse the existing partitioning and sorting properties of the data. This will result in at least one shuffle and one full sort for each distinct aggregate.
The trick is to explicitly partition the data for the distinct operation on a subset of the attributes that would usually be used (i.e., the grouping keys). The subsequent aggregations and joins can then be performed in a streaming fashion without partitioning or sorting the data again. This is not possible with your plan.
We could try to implement that with some tweaking as an optimization rule (a custom rule based on the one you copied from Calcite) or implement it as a dedicated `DataSetRelNode` for distinct aggregates. I'm more in favor of the latter option.
Once the optimizer tracks physical data properties, it will be easier to implement distinct aggregates using optimizer rules.
What do you think?
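The partition-once idea above can be illustrated with a small, hypothetical in-memory model (plain Java, not Flink's DataSet API): rows are shuffled once by the grouping key, and because every row of a group then lives in a single partition, each partition can compute several distinct aggregates per group and combine them by key without any further repartitioning. Note that this sketch uses hash maps inside each partition rather than the sort-based streaming evaluation described in the comment, and it models `SELECT k, COUNT(DISTINCT x), COUNT(DISTINCT y) FROM t GROUP BY k` with non-NULL integer columns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical sketch: SELECT k, COUNT(DISTINCT x), COUNT(DISTINCT y) FROM t GROUP BY k. */
final class PartitionOnceSketch {

    /** The only shuffle: route each row {k, x, y} to a partition chosen by its key k. */
    static List<List<int[]>> partitionByKey(int[][] rows, int numPartitions) {
        List<List<int[]>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int[] row : rows) {
            partitions.get(Math.floorMod(Integer.hashCode(row[0]), numPartitions)).add(row);
        }
        return partitions;
    }

    /** Local work inside one partition: both distinct aggregates, combined by key, no reshuffle. */
    static Map<Integer, long[]> aggregatePartition(List<int[]> partition) {
        Map<Integer, Set<Integer>> distinctX = new TreeMap<>();
        Map<Integer, Set<Integer>> distinctY = new TreeMap<>();
        for (int[] row : partition) {
            distinctX.computeIfAbsent(row[0], k -> new HashSet<>()).add(row[1]);
            distinctY.computeIfAbsent(row[0], k -> new HashSet<>()).add(row[2]);
        }
        Map<Integer, long[]> result = new TreeMap<>();
        for (Integer key : distinctX.keySet()) {
            // The "join" of the two distinct results on k is a local lookup by key.
            result.put(key, new long[] {distinctX.get(key).size(), distinctY.get(key).size()});
        }
        return result;
    }

    /** Runs every partition independently; a key never spans two partitions. */
    static Map<Integer, long[]> evaluate(int[][] rows, int numPartitions) {
        Map<Integer, long[]> all = new TreeMap<>();
        for (List<int[]> partition : partitionByKey(rows, numPartitions)) {
            all.putAll(aggregatePartition(partition));
        }
        return all;
    }

    public static void main(String[] args) {
        int[][] rows = {{1, 10, 100}, {1, 10, 200}, {2, 5, 100}, {1, 20, 100}};
        for (Map.Entry<Integer, long[]> e : evaluate(rows, 2).entrySet()) {
            System.out.println(e.getKey() + " " + Arrays.toString(e.getValue()));
        }
        // 1 [2, 2]
        // 2 [1, 1]
    }
}
```

The result is independent of the number of partitions, which is the property that lets the distinct computations piggyback on the partitioning already required by the `GROUP BY`.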
---
[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/3111
---
[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...
Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:
https://github.com/apache/flink/pull/3111#discussion_r96572366
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/FlinkAggregateExpandDistinctAggregatesRule.java ---
@@ -0,0 +1,1144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet;
+
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.fun.SqlCountAggFunction;
+import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.fun.SqlSumAggFunction;
+import org.apache.calcite.sql.fun.SqlSumEmptyIsZeroAggFunction;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Planner rule that expands distinct aggregates
+ * (such as {@code COUNT(DISTINCT x)}) from a
+ * {@link org.apache.calcite.rel.logical.LogicalAggregate}.
+ *
+ * <p>How this is done depends upon the arguments to the function. If all
+ * functions have the same argument
+ * (e.g. {@code COUNT(DISTINCT x), SUM(DISTINCT x)} both have the argument
+ * {@code x}) then one extra {@link org.apache.calcite.rel.core.Aggregate} is
+ * sufficient.
+ *
+ * <p>If there are multiple arguments
+ * (e.g. {@code COUNT(DISTINCT x), COUNT(DISTINCT y)})
+ * the rule creates separate {@code Aggregate}s and combines using a
+ * {@link org.apache.calcite.rel.core.Join}.
+ */
+public final class FlinkAggregateExpandDistinctAggregatesRule extends RelOptRule {
--- End diff --
Thank you. That's fine to me.
---
[GitHub] flink issue #3111: [FLINK-3475] [Table] DISTINCT aggregate function support ...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/3111
Thanks for the PR @docete.
I'll have a look at it soon.
Best, Fabian
---
[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...
Posted by docete <gi...@git.apache.org>.
Github user docete commented on a diff in the pull request:
https://github.com/apache/flink/pull/3111#discussion_r96568191
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala ---
@@ -165,9 +165,13 @@ class AggregationTest extends TableTestBase {
val util = batchTestUtil()
val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
- val resultTable = sourceTable.groupBy('a)
+ // Move "where" before "groupBy" because the former query would generate
+ // nondeterministic plans with the same cost. If we change FlinkRuleSets.DATASET_OPT_RULES,
+ // the importance of RelNodes may change and the test may fail. This issue is described
+ // in FLINK-5394; we can move "where" back to the end once FLINK-5394 is fixed.
+ val resultTable = sourceTable.where('a === 1).groupBy('a)
--- End diff --
I will check with @beyond1920 when #3058 will be finished.
---