You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by docete <gi...@git.apache.org> on 2017/01/13 08:28:33 UTC

[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...

GitHub user docete opened a pull request:

    https://github.com/apache/flink/pull/3111

    [FLINK-3475] [Table] DISTINCT aggregate function support for SQL queries

    Copy calcite's AggregateExpandDistinctAggregatesRule to Flink project, and do a quick fix to avoid some bad case mentioned in CALCITE-1558. This rule let Flink support distinct aggregate functions.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/docete/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3111.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3111
    
----
commit dc417f27d3d2f3644de7b8873577c5ed6d93230e
Author: Zhenghua Gao <do...@gmail.com>
Date:   2017-01-12T02:33:27Z

    [FLINK-3475] DISTINCT aggregate function support for SQL queries

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3111#discussion_r101732097
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala ---
    @@ -45,7 +45,7 @@ class DataSetAggregateRule
         // check if we have distinct aggregates
         val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
         if (distinctAggs) {
    --- End diff --
    
    the condition can be removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3111#discussion_r101733413
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala ---
    @@ -213,34 +213,45 @@ class AggregationsITCase(
         TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
     
    -  @Test(expected = classOf[TableException])
    +  @Test
       def testDistinctAggregate(): Unit = {
     
         val env = ExecutionEnvironment.getExecutionEnvironment
         val tEnv = TableEnvironment.getTableEnvironment(env, config)
     
         val sqlQuery = "SELECT sum(_1) as a, count(distinct _3) as b FROM MyTable"
     
    -    val ds = CollectionDataSets.get3TupleDataSet(env)
    -    tEnv.registerDataSet("MyTable", ds)
    +    val ds = env.fromElements(
    --- End diff --
    
    I think it would be good to use a bit more test data here, like on of the `CollectionDataSets`.
    ITCases are rather expensive to run, so we should to get the most out of them.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...

Posted by docete <gi...@git.apache.org>.
Github user docete commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3111#discussion_r96568781
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/FlinkAggregateExpandDistinctAggregatesRule.java ---
    @@ -0,0 +1,1144 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to you under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.dataSet;
    +
    +import org.apache.calcite.plan.Contexts;
    +import org.apache.calcite.plan.RelOptCluster;
    +import org.apache.calcite.plan.RelOptRule;
    +import org.apache.calcite.plan.RelOptRuleCall;
    +import org.apache.calcite.rel.RelNode;
    +import org.apache.calcite.rel.core.Aggregate;
    +import org.apache.calcite.rel.core.AggregateCall;
    +import org.apache.calcite.rel.core.JoinRelType;
    +import org.apache.calcite.rel.core.RelFactories;
    +import org.apache.calcite.rel.logical.LogicalAggregate;
    +import org.apache.calcite.rel.type.RelDataType;
    +import org.apache.calcite.rel.type.RelDataTypeFactory;
    +import org.apache.calcite.rel.type.RelDataTypeField;
    +import org.apache.calcite.rex.RexBuilder;
    +import org.apache.calcite.rex.RexInputRef;
    +import org.apache.calcite.rex.RexNode;
    +import org.apache.calcite.sql.SqlAggFunction;
    +import org.apache.calcite.sql.fun.SqlCountAggFunction;
    +import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
    +import org.apache.calcite.sql.fun.SqlStdOperatorTable;
    +import org.apache.calcite.sql.fun.SqlSumAggFunction;
    +import org.apache.calcite.sql.fun.SqlSumEmptyIsZeroAggFunction;
    +import org.apache.calcite.sql.type.SqlTypeName;
    +import org.apache.calcite.tools.RelBuilder;
    +import org.apache.calcite.tools.RelBuilderFactory;
    +import org.apache.calcite.util.ImmutableBitSet;
    +import org.apache.calcite.util.ImmutableIntList;
    +import org.apache.calcite.util.Pair;
    +import org.apache.calcite.util.Util;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.Iterables;
    +import com.google.common.collect.Lists;
    +
    +import java.math.BigDecimal;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.LinkedHashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedSet;
    +import java.util.TreeSet;
    +
    +/**
    + * Planner rule that expands distinct aggregates
    + * (such as {@code COUNT(DISTINCT x)}) from a
    + * {@link org.apache.calcite.rel.logical.LogicalAggregate}.
    + *
    + * <p>How this is done depends upon the arguments to the function. If all
    + * functions have the same argument
    + * (e.g. {@code COUNT(DISTINCT x), SUM(DISTINCT x)} both have the argument
    + * {@code x}) then one extra {@link org.apache.calcite.rel.core.Aggregate} is
    + * sufficient.
    + *
    + * <p>If there are multiple arguments
    + * (e.g. {@code COUNT(DISTINCT x), COUNT(DISTINCT y)})
    + * the rule creates separate {@code Aggregate}s and combines using a
    + * {@link org.apache.calcite.rel.core.Join}.
    + */
    +public final class FlinkAggregateExpandDistinctAggregatesRule extends RelOptRule {
    --- End diff --
    
    classes in org.apache.flink.table.calcite pacage are inspired by calcite and stay in flink, and FlinkAggregateExpandDistinctAggregatesRule is copied and would be droped. So I don't think it's a good idea to move this class to it.
    I will move the comment to FlinkAggregateExpandDistinctAggregatesRule, and create a jira to track to remove it(recommended by @KurtYoung ) 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3111#discussion_r101733431
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala ---
    @@ -213,34 +213,45 @@ class AggregationsITCase(
         TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
     
    -  @Test(expected = classOf[TableException])
    +  @Test
       def testDistinctAggregate(): Unit = {
     
         val env = ExecutionEnvironment.getExecutionEnvironment
         val tEnv = TableEnvironment.getTableEnvironment(env, config)
     
         val sqlQuery = "SELECT sum(_1) as a, count(distinct _3) as b FROM MyTable"
     
    -    val ds = CollectionDataSets.get3TupleDataSet(env)
    -    tEnv.registerDataSet("MyTable", ds)
    +    val ds = env.fromElements(
    +      (1, 1L, 1.0f, "Hello"),
    +      (2, 2L, 1.0f, "Ciao")).toTable(tEnv)
    +    tEnv.registerTable("MyTable", ds)
     
    -    // must fail. distinct aggregates are not supported
    -    tEnv.sql(sqlQuery).toDataSet[Row]
    +    val result = tEnv.sql(sqlQuery)
    +
    +    val expected = "3,1"
    +    val results = result.toDataSet[Row].collect()
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
     
    -  @Test(expected = classOf[TableException])
    +  @Test
       def testGroupedDistinctAggregate(): Unit = {
     
         val env = ExecutionEnvironment.getExecutionEnvironment
         val tEnv = TableEnvironment.getTableEnvironment(env, config)
     
         val sqlQuery = "SELECT _2, avg(distinct _1) as a, count(_3) as b FROM MyTable GROUP BY _2"
     
    -    val ds = CollectionDataSets.get3TupleDataSet(env)
    -    tEnv.registerDataSet("MyTable", ds)
    +    val ds = env.fromElements(
    --- End diff --
    
    I think it would be good to use a bit more test data here, like on of the `CollectionDataSets`.
    ITCases are rather expensive to run, so we should to get the most out of them.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3111: [FLINK-3475] [Table] DISTINCT aggregate function support ...

Posted by docete <gi...@git.apache.org>.
Github user docete commented on the issue:

    https://github.com/apache/flink/pull/3111
  
    @fhueske Fixed according to your comments


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3111: [FLINK-3475] [Table] DISTINCT aggregate function support ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3111
  
    @docete, did you check if the execution plan is similar to what I outlined in the related JIRA issue [FLINK-3475](https://issues.apache.org/jira/browse/FLINK-3475)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3111: [FLINK-3475] [Table] DISTINCT aggregate function support ...

Posted by docete <gi...@git.apache.org>.
Github user docete commented on the issue:

    https://github.com/apache/flink/pull/3111
  
    @fhueske Yes, I have checked the execution plan. It's very similar to your description:
    
    Take example for SQL "select sum(distinct a), sum(distinct b), sum(c) from expr", where expr is a table, and it has 3 fields: a, b, c.
    
    The explaination for the query is:
    ```
    == Abstract Syntax Tree ==
    LogicalAggregate(group=[{}], EXPR$0=[SUM(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[SUM($2)])
      LogicalTableScan(table=[[expr]])
    
    == Optimized Logical Plan ==
    DataSetCalc(select=[EXPR$0, EXPR$1, EXPR$2])
      DataSetSingleRowJoin(where=[true], join=[EXPR$2, EXPR$0, EXPR$1], joinType=[NestedLoopJoin])
        DataSetSingleRowJoin(where=[true], join=[EXPR$2, EXPR$0], joinType=[NestedLoopJoin])
          DataSetAggregate(select=[SUM(c) AS EXPR$2])
            DataSetUnion(union=[a, b, c])
              DataSetValues(tuples=[[{ null, null, null }]], values=[a, b, c])
              DataSetScan(table=[[_DataSetTable_0]])
          DataSetAggregate(select=[SUM(a) AS EXPR$0])
            DataSetUnion(union=[a])
              DataSetValues(tuples=[[{ null }]], values=[a])
              DataSetAggregate(groupBy=[a], select=[a])
                DataSetCalc(select=[a])
                  DataSetScan(table=[[_DataSetTable_0]])
        DataSetAggregate(select=[SUM(b) AS EXPR$1])
          DataSetUnion(union=[b])
            DataSetValues(tuples=[[{ null }]], values=[b])
            DataSetAggregate(groupBy=[b], select=[b])
              DataSetCalc(select=[b])
                DataSetScan(table=[[_DataSetTable_0]])
    
    == Physical Execution Plan ==
    Stage 8 : Data Source
    	content : collect elements with CollectionInputFormat
    	Partitioning : RANDOM_PARTITIONED
    
    Stage 14 : Data Source
    	content : collect elements with CollectionInputFormat
    	Partitioning : RANDOM_PARTITIONED
    
    	Stage 13 : Map
    		content : from: (a, b, c)
    		ship_strategy : Forward
    		exchange_mode : BATCH
    		driver_strategy : Map
    		Partitioning : RANDOM_PARTITIONED
    
    		Stage 12 : FlatMap
    			content : select: (a)
    			ship_strategy : Forward
    			exchange_mode : PIPELINED
    			driver_strategy : FlatMap
    			Partitioning : RANDOM_PARTITIONED
    
    			Stage 11 : Map
    				content : prepare select: (a)
    				ship_strategy : Forward
    				exchange_mode : PIPELINED
    				driver_strategy : Map
    				Partitioning : RANDOM_PARTITIONED
    
    				Stage 10 : GroupCombine
    					content : groupBy: (a), select: (a)
    					ship_strategy : Forward
    					exchange_mode : PIPELINED
    					driver_strategy : Sorted Combine
    					Partitioning : RANDOM_PARTITIONED
    
    					Stage 9 : GroupReduce
    						content : groupBy: (a), select: (a)
    						ship_strategy : Hash Partition on [0]
    						exchange_mode : PIPELINED
    						driver_strategy : Sorted Group Reduce
    						Partitioning : RANDOM_PARTITIONED
    
    						Stage 7 : Union
    							content : 
    							ship_strategy : Redistribute
    							exchange_mode : PIPELINED
    							Partitioning : RANDOM_PARTITIONED
    
    							Stage 6 : Map
    								content : prepare select: (SUM(a) AS EXPR$0)
    								ship_strategy : Forward
    								exchange_mode : PIPELINED
    								driver_strategy : Map
    								Partitioning : RANDOM_PARTITIONED
    
    								Stage 5 : GroupCombine
    									content : select:(SUM(a) AS EXPR$0)
    									ship_strategy : Forward
    									exchange_mode : PIPELINED
    									driver_strategy : Group Reduce All
    									Partitioning : RANDOM_PARTITIONED
    
    									Stage 4 : GroupReduce
    										content : select:(SUM(a) AS EXPR$0)
    										ship_strategy : Redistribute
    										exchange_mode : PIPELINED
    										driver_strategy : Group Reduce All
    										Partitioning : RANDOM_PARTITIONED
    
    Stage 19 : Data Source
    	content : collect elements with CollectionInputFormat
    	Partitioning : RANDOM_PARTITIONED
    
    	Stage 20 : Map
    		content : from: (a, b, c)
    		ship_strategy : Forward
    		exchange_mode : BATCH
    		driver_strategy : Map
    		Partitioning : RANDOM_PARTITIONED
    
    		Stage 18 : Union
    			content : 
    			ship_strategy : Redistribute
    			exchange_mode : PIPELINED
    			Partitioning : RANDOM_PARTITIONED
    
    			Stage 17 : Map
    				content : prepare select: (SUM(c) AS EXPR$2)
    				ship_strategy : Forward
    				exchange_mode : PIPELINED
    				driver_strategy : Map
    				Partitioning : RANDOM_PARTITIONED
    
    				Stage 16 : GroupCombine
    					content : select:(SUM(c) AS EXPR$2)
    					ship_strategy : Forward
    					exchange_mode : PIPELINED
    					driver_strategy : Group Reduce All
    					Partitioning : RANDOM_PARTITIONED
    
    					Stage 15 : GroupReduce
    						content : select:(SUM(c) AS EXPR$2)
    						ship_strategy : Redistribute
    						exchange_mode : PIPELINED
    						driver_strategy : Group Reduce All
    						Partitioning : RANDOM_PARTITIONED
    
    						Stage 3 : FlatMap
    							content : where: (true), join: (EXPR$2, EXPR$0)
    							ship_strategy : Forward
    							exchange_mode : PIPELINED
    							driver_strategy : FlatMap
    							Partitioning : RANDOM_PARTITIONED
    
    Stage 25 : Data Source
    	content : collect elements with CollectionInputFormat
    	Partitioning : RANDOM_PARTITIONED
    
    	Stage 30 : Map
    		content : from: (a, b, c)
    		ship_strategy : Forward
    		exchange_mode : BATCH
    		driver_strategy : Map
    		Partitioning : RANDOM_PARTITIONED
    
    		Stage 29 : FlatMap
    			content : select: (b)
    			ship_strategy : Forward
    			exchange_mode : PIPELINED
    			driver_strategy : FlatMap
    			Partitioning : RANDOM_PARTITIONED
    
    			Stage 28 : Map
    				content : prepare select: (b)
    				ship_strategy : Forward
    				exchange_mode : PIPELINED
    				driver_strategy : Map
    				Partitioning : RANDOM_PARTITIONED
    
    				Stage 27 : GroupCombine
    					content : groupBy: (b), select: (b)
    					ship_strategy : Forward
    					exchange_mode : PIPELINED
    					driver_strategy : Sorted Combine
    					Partitioning : RANDOM_PARTITIONED
    
    					Stage 26 : GroupReduce
    						content : groupBy: (b), select: (b)
    						ship_strategy : Hash Partition on [0]
    						exchange_mode : PIPELINED
    						driver_strategy : Sorted Group Reduce
    						Partitioning : RANDOM_PARTITIONED
    
    						Stage 24 : Union
    							content : 
    							ship_strategy : Redistribute
    							exchange_mode : PIPELINED
    							Partitioning : RANDOM_PARTITIONED
    
    							Stage 23 : Map
    								content : prepare select: (SUM(b) AS EXPR$1)
    								ship_strategy : Forward
    								exchange_mode : PIPELINED
    								driver_strategy : Map
    								Partitioning : RANDOM_PARTITIONED
    
    								Stage 22 : GroupCombine
    									content : select:(SUM(b) AS EXPR$1)
    									ship_strategy : Forward
    									exchange_mode : PIPELINED
    									driver_strategy : Group Reduce All
    									Partitioning : RANDOM_PARTITIONED
    
    									Stage 21 : GroupReduce
    										content : select:(SUM(b) AS EXPR$1)
    										ship_strategy : Redistribute
    										exchange_mode : PIPELINED
    										driver_strategy : Group Reduce All
    										Partitioning : RANDOM_PARTITIONED
    
    										Stage 2 : FlatMap
    											content : where: (true), join: (EXPR$2, EXPR$0, EXPR$1)
    											ship_strategy : Forward
    											exchange_mode : PIPELINED
    											driver_strategy : FlatMap
    											Partitioning : RANDOM_PARTITIONED
    
    											Stage 1 : FlatMap
    												content : select: (EXPR$0, EXPR$1, EXPR$2)
    												ship_strategy : Forward
    												exchange_mode : PIPELINED
    												driver_strategy : FlatMap
    												Partitioning : RANDOM_PARTITIONED
    
    												Stage 0 : Data Sink
    													content : org.apache.flink.api.java.io.DiscardingOutputFormat
    													ship_strategy : Forward
    													exchange_mode : PIPELINED
    													Partitioning : RANDOM_PARTITIONED
    
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3111#discussion_r103165057
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/rules/dataSet/FlinkAggregateExpandDistinctAggregatesRule.java ---
    @@ -0,0 +1,1152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to you under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.dataSet;
    --- End diff --
    
    Move to `org.apache.flink.table.calcite.rules` next to `FlinkAggregateJoinTransposeRule`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3111: [FLINK-3475] [Table] DISTINCT aggregate function support ...

Posted by docete <gi...@git.apache.org>.
Github user docete commented on the issue:

    https://github.com/apache/flink/pull/3111
  
    Agree. If we have more than one distinct agg with groupings, do the partition first and reuse the subsets would improve the performance. 
    Could we merge this PR first and create another JIRA to track the grouping cases? 
    We need a workaround to support distinct aggs ASAP.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...

Posted by docete <gi...@git.apache.org>.
Github user docete commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3111#discussion_r96628698
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala ---
    @@ -96,6 +96,13 @@ object FlinkRuleSets {
         ProjectToCalcRule.INSTANCE,
         CalcMergeRule.INSTANCE,
     
    +    // distinct aggregate rule for FLINK-3475
    --- End diff --
    
    Done. The tracking jira is https://issues.apache.org/jira/browse/FLINK-5545


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3111#discussion_r101735785
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/FlinkAggregateExpandDistinctAggregatesRule.java ---
    @@ -0,0 +1,1144 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to you under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.dataSet;
    +
    +import org.apache.calcite.plan.Contexts;
    +import org.apache.calcite.plan.RelOptCluster;
    +import org.apache.calcite.plan.RelOptRule;
    +import org.apache.calcite.plan.RelOptRuleCall;
    +import org.apache.calcite.rel.RelNode;
    +import org.apache.calcite.rel.core.Aggregate;
    +import org.apache.calcite.rel.core.AggregateCall;
    +import org.apache.calcite.rel.core.JoinRelType;
    +import org.apache.calcite.rel.core.RelFactories;
    +import org.apache.calcite.rel.logical.LogicalAggregate;
    +import org.apache.calcite.rel.type.RelDataType;
    +import org.apache.calcite.rel.type.RelDataTypeFactory;
    +import org.apache.calcite.rel.type.RelDataTypeField;
    +import org.apache.calcite.rex.RexBuilder;
    +import org.apache.calcite.rex.RexInputRef;
    +import org.apache.calcite.rex.RexNode;
    +import org.apache.calcite.sql.SqlAggFunction;
    +import org.apache.calcite.sql.fun.SqlCountAggFunction;
    +import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
    +import org.apache.calcite.sql.fun.SqlStdOperatorTable;
    +import org.apache.calcite.sql.fun.SqlSumAggFunction;
    +import org.apache.calcite.sql.fun.SqlSumEmptyIsZeroAggFunction;
    +import org.apache.calcite.sql.type.SqlTypeName;
    +import org.apache.calcite.tools.RelBuilder;
    +import org.apache.calcite.tools.RelBuilderFactory;
    +import org.apache.calcite.util.ImmutableBitSet;
    +import org.apache.calcite.util.ImmutableIntList;
    +import org.apache.calcite.util.Pair;
    +import org.apache.calcite.util.Util;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.Iterables;
    +import com.google.common.collect.Lists;
    +
    +import java.math.BigDecimal;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.LinkedHashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedSet;
    +import java.util.TreeSet;
    +
    +/**
    + * Planner rule that expands distinct aggregates
    + * (such as {@code COUNT(DISTINCT x)}) from a
    + * {@link org.apache.calcite.rel.logical.LogicalAggregate}.
    + *
    + * <p>How this is done depends upon the arguments to the function. If all
    + * functions have the same argument
    + * (e.g. {@code COUNT(DISTINCT x), SUM(DISTINCT x)} both have the argument
    + * {@code x}) then one extra {@link org.apache.calcite.rel.core.Aggregate} is
    + * sufficient.
    + *
    + * <p>If there are multiple arguments
    + * (e.g. {@code COUNT(DISTINCT x), COUNT(DISTINCT y)})
    + * the rule creates separate {@code Aggregate}s and combines using a
    + * {@link org.apache.calcite.rel.core.Join}.
    + */
    +public final class FlinkAggregateExpandDistinctAggregatesRule extends RelOptRule {
    --- End diff --
    
    This is a Java class and should be in the Java source folder `./src/main/java`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3111: [FLINK-3475] [Table] DISTINCT aggregate function support ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/3111
  
    Looks good to me. I would remove the ITCases. Logical tests should be sufficient.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3111#discussion_r96160329
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/FlinkAggregateExpandDistinctAggregatesRule.java ---
    @@ -0,0 +1,1144 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to you under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.dataSet;
    +
    +import org.apache.calcite.plan.Contexts;
    +import org.apache.calcite.plan.RelOptCluster;
    +import org.apache.calcite.plan.RelOptRule;
    +import org.apache.calcite.plan.RelOptRuleCall;
    +import org.apache.calcite.rel.RelNode;
    +import org.apache.calcite.rel.core.Aggregate;
    +import org.apache.calcite.rel.core.AggregateCall;
    +import org.apache.calcite.rel.core.JoinRelType;
    +import org.apache.calcite.rel.core.RelFactories;
    +import org.apache.calcite.rel.logical.LogicalAggregate;
    +import org.apache.calcite.rel.type.RelDataType;
    +import org.apache.calcite.rel.type.RelDataTypeFactory;
    +import org.apache.calcite.rel.type.RelDataTypeField;
    +import org.apache.calcite.rex.RexBuilder;
    +import org.apache.calcite.rex.RexInputRef;
    +import org.apache.calcite.rex.RexNode;
    +import org.apache.calcite.sql.SqlAggFunction;
    +import org.apache.calcite.sql.fun.SqlCountAggFunction;
    +import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
    +import org.apache.calcite.sql.fun.SqlStdOperatorTable;
    +import org.apache.calcite.sql.fun.SqlSumAggFunction;
    +import org.apache.calcite.sql.fun.SqlSumEmptyIsZeroAggFunction;
    +import org.apache.calcite.sql.type.SqlTypeName;
    +import org.apache.calcite.tools.RelBuilder;
    +import org.apache.calcite.tools.RelBuilderFactory;
    +import org.apache.calcite.util.ImmutableBitSet;
    +import org.apache.calcite.util.ImmutableIntList;
    +import org.apache.calcite.util.Pair;
    +import org.apache.calcite.util.Util;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.Iterables;
    +import com.google.common.collect.Lists;
    +
    +import java.math.BigDecimal;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.LinkedHashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedSet;
    +import java.util.TreeSet;
    +
    +/**
    + * Planner rule that expands distinct aggregates
    + * (such as {@code COUNT(DISTINCT x)}) from a
    + * {@link org.apache.calcite.rel.logical.LogicalAggregate}.
    + *
    + * <p>How this is done depends upon the arguments to the function. If all
    + * functions have the same argument
    + * (e.g. {@code COUNT(DISTINCT x), SUM(DISTINCT x)} both have the argument
    + * {@code x}) then one extra {@link org.apache.calcite.rel.core.Aggregate} is
    + * sufficient.
    + *
    + * <p>If there are multiple arguments
    + * (e.g. {@code COUNT(DISTINCT x), COUNT(DISTINCT y)})
    + * the rule creates separate {@code Aggregate}s and combines using a
    + * {@link org.apache.calcite.rel.core.Join}.
    + */
    +public final class FlinkAggregateExpandDistinctAggregatesRule extends RelOptRule {
    --- End diff --
    
    I would like to move this class to `org.apache.flink.table.calcite` package, and add a comment to the top of the class to annotate this is a temporary solution and should be removed later, such as 
    
    >This is a copy of Calcite's [[AggregateExpandDistinctAggregatesRule]] with a quick fix to avoid some bad case mentioned in CALCITE-1558. Should drop it and use calcite's [[AggregateExpandDistinctAggregatesRule]] when upgrade to calcite 1.12 (above).



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3111: [FLINK-3475] [Table] DISTINCT aggregate function support ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3111
  
    Merging


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...

Posted by KurtYoung <gi...@git.apache.org>.
Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3111#discussion_r96329552
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala ---
    @@ -96,6 +96,13 @@ object FlinkRuleSets {
         ProjectToCalcRule.INSTANCE,
         CalcMergeRule.INSTANCE,
     
    +    // distinct aggregate rule for FLINK-3475
    --- End diff --
    
    I think this comment can move to `FlinkAggregateExpandDistinctAggregatesRule`, and open a jira to track to remove this class after we upgrade Calcite.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3111#discussion_r103169886
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala ---
    @@ -0,0 +1,275 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.batch.sql
    +
    +import org.apache.flink.table.utils.TableTestBase
    +import org.apache.flink.api.scala._
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.utils.TableTestUtil._
    +import org.junit.Test
    +
    +class DistinctAggregateTest extends TableTestBase {
    +
    +  @Test
    +  def testSingleDistinctAggregate(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    +
    +    val sqlQuery = "SELECT COUNT(DISTINCT a) FROM MyTable"
    +
    +    val expected = unaryNode(
    +      "DataSetAggregate",
    +      unaryNode(
    +        "DataSetUnion",
    +        unaryNode(
    +          "DataSetValues",
    +          unaryNode(
    +            "DataSetAggregate",
    +            unaryNode(
    +              "DataSetCalc",
    +              batchTableNode(0),
    +              term("select", "a")
    +            ),
    +            term("groupBy", "a"),
    +            term("select", "a")
    +          ),
    +          tuples(List(null)),
    +          term("values", "a")
    +        ),
    +        term("union", "a")
    +      ),
    +      term("select", "COUNT(a) AS EXPR$0")
    +    )
    +
    +    util.verifySql(sqlQuery, expected)
    +  }
    +
    +  @Test
    +  def testMultiDistinctAggregateOnSameColumn(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    +
    +    val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM MyTable"
    +
    +    val expected = unaryNode(
    +      "DataSetAggregate",
    +      unaryNode(
    +        "DataSetUnion",
    +        unaryNode(
    +          "DataSetValues",
    +          unaryNode(
    +            "DataSetAggregate",
    +            unaryNode(
    +              "DataSetCalc",
    +              batchTableNode(0),
    +              term("select", "a")
    +            ),
    +            term("groupBy", "a"),
    +            term("select", "a")
    +          ),
    +          tuples(List(null)),
    +          term("values", "a")
    +        ),
    +        term("union", "a")
    +      ),
    +      term("select", "COUNT(a) AS EXPR$0", "SUM(a) AS EXPR$1", "MAX(a) AS EXPR$2")
    +    )
    +
    +    util.verifySql(sqlQuery, expected)
    +  }
    +
    +  @Test
    +  def testSingleDistinctAggregateAndOneOrMultiNonDistinctAggregate(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    +
    +    // case 0x00: DISTINCT on COUNT and Non-DISTINCT on others
    +    val sqlQuery0 = "SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable"
    +
    +    val expected0 = unaryNode(
    +      "DataSetAggregate",
    +      unaryNode(
    +        "DataSetUnion",
    +        unaryNode(
    +          "DataSetValues",
    +          unaryNode(
    +            "DataSetAggregate",
    +            unaryNode(
    +              "DataSetCalc",
    +              batchTableNode(0),
    +              term("select", "a", "b")
    +            ),
    +            term("groupBy", "a"),
    +            term("select", "a", "SUM(b) AS EXPR$1")
    +          ),
    +          tuples(List(null, null)),
    +          term("values", "a", "EXPR$1")
    +        ),
    +        term("union", "a", "EXPR$1")
    +      ),
    +      term("select", "COUNT(a) AS EXPR$0", "SUM(EXPR$1) AS EXPR$1")
    +    )
    +
    +    util.verifySql(sqlQuery0, expected0)
    +
    +    // case 0x01: Non-DISTINCT on COUNT and DISTINCT on others
    +    val sqlQuery1 = "SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable"
    +
    +    val expected1 = unaryNode(
    +      "DataSetAggregate",
    +      unaryNode(
    +        "DataSetUnion",
    +        unaryNode(
    +          "DataSetValues",
    +          unaryNode(
    +            "DataSetAggregate",
    +            unaryNode(
    +              "DataSetCalc",
    +              batchTableNode(0),
    +              term("select", "a", "b")
    +            ),
    +            term("groupBy", "b"),
    +            term("select", "b", "COUNT(a) AS EXPR$0")
    +          ),
    +          tuples(List(null, null)),
    +          term("values", "b", "EXPR$0")
    +        ),
    +        term("union", "b", "EXPR$0")
    +      ),
    +      term("select", "$SUM0(EXPR$0) AS EXPR$0", "SUM(b) AS EXPR$1")
    +    )
    +
    +    util.verifySql(sqlQuery1, expected1)
    +  }
    +
    +  @Test
    +  def testMultiDistinctAggregateOnDifferentColumn(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    +
    +    val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable"
    --- End diff --
    
    Add another test with two distinct aggregates on different attributes and a non-distinct aggregate.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3111#discussion_r97295864
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala ---
    @@ -0,0 +1,275 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.batch.sql
    +
    +import org.apache.flink.table.utils.TableTestBase
    +import org.apache.flink.api.scala._
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.utils.TableTestUtil._
    +import org.junit.Test
    +
    +class DistinctAggregateTest extends TableTestBase {
    +
    +  @Test
    +  def testSingleDistinctAggregate(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    +
    +    val sqlQuery = "SELECT COUNT(DISTINCT a) FROM MyTable"
    +
    +    val expected = unaryNode(
    +      "DataSetAggregate",
    +      unaryNode(
    +        "DataSetUnion",
    +        unaryNode(
    +          "DataSetValues",
    +          unaryNode(
    +            "DataSetAggregate",
    +            unaryNode(
    +              "DataSetCalc",
    +              batchTableNode(0),
    +              term("select", "a")
    +            ),
    +            term("groupBy", "a"),
    +            term("select", "a")
    +          ),
    +          tuples(List(null)),
    +          term("values", "a")
    +        ),
    +        term("union", "a")
    +      ),
    +      term("select", "COUNT(a) AS EXPR$0")
    +    )
    +
    +    util.verifySql(sqlQuery, expected)
    +  }
    +
    +  @Test
    +  def testMultiDistinctAggregateOnSameColumn(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    +
    +    val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM MyTable"
    +
    +    val expected = unaryNode(
    +      "DataSetAggregate",
    +      unaryNode(
    +        "DataSetUnion",
    +        unaryNode(
    +          "DataSetValues",
    +          unaryNode(
    +            "DataSetAggregate",
    +            unaryNode(
    +              "DataSetCalc",
    +              batchTableNode(0),
    +              term("select", "a")
    +            ),
    +            term("groupBy", "a"),
    +            term("select", "a")
    +          ),
    +          tuples(List(null)),
    +          term("values", "a")
    +        ),
    +        term("union", "a")
    +      ),
    +      term("select", "COUNT(a) AS EXPR$0", "SUM(a) AS EXPR$1", "MAX(a) AS EXPR$2")
    +    )
    +
    +    util.verifySql(sqlQuery, expected)
    +  }
    +
    +  @Test
    +  def testSingleDistinctAggregateAndOneOrMultiNonDistinctAggregate(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    +
    +    // case 0x00: DISTINCT on COUNT and Non-DISTINCT on others
    +    val sqlQuery0 = "SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable"
    +
    +    val expected0 = unaryNode(
    +      "DataSetAggregate",
    +      unaryNode(
    +        "DataSetUnion",
    +        unaryNode(
    +          "DataSetValues",
    +          unaryNode(
    +            "DataSetAggregate",
    +            unaryNode(
    +              "DataSetCalc",
    +              batchTableNode(0),
    +              term("select", "a", "b")
    +            ),
    +            term("groupBy", "a"),
    +            term("select", "a", "SUM(b) AS EXPR$1")
    +          ),
    +          tuples(List(null, null)),
    +          term("values", "a", "EXPR$1")
    +        ),
    +        term("union", "a", "EXPR$1")
    +      ),
    +      term("select", "COUNT(a) AS EXPR$0", "SUM(EXPR$1) AS EXPR$1")
    +    )
    +
    +    util.verifySql(sqlQuery0, expected0)
    +
    +    // case 0x01: Non-DISTINCT on COUNT and DISTINCT on others
    +    val sqlQuery1 = "SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable"
    +
    +    val expected1 = unaryNode(
    +      "DataSetAggregate",
    +      unaryNode(
    +        "DataSetUnion",
    +        unaryNode(
    +          "DataSetValues",
    +          unaryNode(
    +            "DataSetAggregate",
    +            unaryNode(
    +              "DataSetCalc",
    +              batchTableNode(0),
    +              term("select", "a", "b")
    +            ),
    +            term("groupBy", "b"),
    +            term("select", "b", "COUNT(a) AS EXPR$0")
    +          ),
    +          tuples(List(null, null)),
    +          term("values", "b", "EXPR$0")
    +        ),
    +        term("union", "b", "EXPR$0")
    +      ),
    +      term("select", "$SUM0(EXPR$0) AS EXPR$0", "SUM(b) AS EXPR$1")
    +    )
    +
    +    util.verifySql(sqlQuery1, expected1)
    +  }
    +
    +  @Test
    +  def testMultiDistinctAggregateOnDifferentColumn(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    +
    +    val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable"
    +
    +    val expected = binaryNode(
    +      "DataSetSingleRowJoin",
    +      unaryNode(
    +        "DataSetAggregate",
    +        unaryNode(
    +          "DataSetUnion",
    +          unaryNode(
    +            "DataSetValues",
    +            unaryNode(
    +              "DataSetAggregate",
    +              unaryNode(
    +                "DataSetCalc",
    +                batchTableNode(0),
    +                term("select", "a")
    +              ),
    +              term("groupBy", "a"),
    +              term("select", "a")
    +            ),
    +            tuples(List(null)),
    +            term("values", "a")
    +          ),
    +          term("union", "a")
    +        ),
    +        term("select", "COUNT(a) AS EXPR$0")
    +      ),
    +      unaryNode(
    +        "DataSetAggregate",
    +        unaryNode(
    +          "DataSetUnion",
    +          unaryNode(
    +            "DataSetValues",
    +            unaryNode(
    +              "DataSetAggregate",
    +              unaryNode(
    +                "DataSetCalc",
    +                batchTableNode(0),
    +                term("select", "b")
    +              ),
    +              term("groupBy", "b"),
    +              term("select", "b")
    +            ),
    +            tuples(List(null)),
    +            term("values", "b")
    +          ),
    +          term("union", "b")
    +        ),
    +        term("select", "SUM(b) AS EXPR$1")
    +      ),
    +      term("where", "true"),
    +      term("join", "EXPR$0", "EXPR$1"),
    +      term("joinType", "NestedLoopJoin")
    --- End diff --
    
    This join type is not supported. It would fail during execution.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3111#discussion_r101732154
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala ---
    @@ -51,7 +51,7 @@ class DataSetAggregateWithNullValuesRule
         // check if we have distinct aggregates
         val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
         if (distinctAggs) {
    --- End diff --
    
    The condition can be removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3111#discussion_r96160082
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala ---
    @@ -165,9 +165,13 @@ class AggregationTest extends TableTestBase {
         val util = batchTestUtil()
         val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
     
    -    val resultTable = sourceTable.groupBy('a)
    +    // Move "where" before "groupBy" for the former query would generate
    +    // nondeterministic plans with same cost. If we change FlinkRuleSets.DATASET_OPT_RULES,
    +    // the importance of relNode may change, and the test may fail. This issue is mentioned
    +    // in FLINK-5394, we could move "where" to the end when FLINK-5394 is fixed.
    +    val resultTable = sourceTable.where('a === 1).groupBy('a)
    --- End diff --
    
    It might make sense to wait with this until #3058 is in. It is almost done I think.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3111#discussion_r103170129
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala ---
    @@ -0,0 +1,275 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.batch.sql
    +
    +import org.apache.flink.table.utils.TableTestBase
    +import org.apache.flink.api.scala._
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.utils.TableTestUtil._
    +import org.junit.Test
    +
    +class DistinctAggregateTest extends TableTestBase {
    +
    +  @Test
    +  def testSingleDistinctAggregate(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    +
    +    val sqlQuery = "SELECT COUNT(DISTINCT a) FROM MyTable"
    +
    +    val expected = unaryNode(
    +      "DataSetAggregate",
    +      unaryNode(
    +        "DataSetUnion",
    +        unaryNode(
    +          "DataSetValues",
    +          unaryNode(
    +            "DataSetAggregate",
    +            unaryNode(
    +              "DataSetCalc",
    +              batchTableNode(0),
    +              term("select", "a")
    +            ),
    +            term("groupBy", "a"),
    +            term("select", "a")
    +          ),
    +          tuples(List(null)),
    +          term("values", "a")
    +        ),
    +        term("union", "a")
    +      ),
    +      term("select", "COUNT(a) AS EXPR$0")
    +    )
    +
    +    util.verifySql(sqlQuery, expected)
    +  }
    +
    +  @Test
    +  def testMultiDistinctAggregateOnSameColumn(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    +
    +    val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM MyTable"
    +
    +    val expected = unaryNode(
    +      "DataSetAggregate",
    +      unaryNode(
    +        "DataSetUnion",
    +        unaryNode(
    +          "DataSetValues",
    +          unaryNode(
    +            "DataSetAggregate",
    +            unaryNode(
    +              "DataSetCalc",
    +              batchTableNode(0),
    +              term("select", "a")
    +            ),
    +            term("groupBy", "a"),
    +            term("select", "a")
    +          ),
    +          tuples(List(null)),
    +          term("values", "a")
    +        ),
    +        term("union", "a")
    +      ),
    +      term("select", "COUNT(a) AS EXPR$0", "SUM(a) AS EXPR$1", "MAX(a) AS EXPR$2")
    +    )
    +
    +    util.verifySql(sqlQuery, expected)
    +  }
    +
    +  @Test
    +  def testSingleDistinctAggregateAndOneOrMultiNonDistinctAggregate(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    +
    +    // case 0x00: DISTINCT on COUNT and Non-DISTINCT on others
    +    val sqlQuery0 = "SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable"
    +
    +    val expected0 = unaryNode(
    +      "DataSetAggregate",
    +      unaryNode(
    +        "DataSetUnion",
    +        unaryNode(
    +          "DataSetValues",
    +          unaryNode(
    +            "DataSetAggregate",
    +            unaryNode(
    +              "DataSetCalc",
    +              batchTableNode(0),
    +              term("select", "a", "b")
    +            ),
    +            term("groupBy", "a"),
    +            term("select", "a", "SUM(b) AS EXPR$1")
    +          ),
    +          tuples(List(null, null)),
    +          term("values", "a", "EXPR$1")
    +        ),
    +        term("union", "a", "EXPR$1")
    +      ),
    +      term("select", "COUNT(a) AS EXPR$0", "SUM(EXPR$1) AS EXPR$1")
    +    )
    +
    +    util.verifySql(sqlQuery0, expected0)
    +
    +    // case 0x01: Non-DISTINCT on COUNT and DISTINCT on others
    +    val sqlQuery1 = "SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable"
    +
    +    val expected1 = unaryNode(
    +      "DataSetAggregate",
    +      unaryNode(
    +        "DataSetUnion",
    +        unaryNode(
    +          "DataSetValues",
    +          unaryNode(
    +            "DataSetAggregate",
    +            unaryNode(
    +              "DataSetCalc",
    +              batchTableNode(0),
    +              term("select", "a", "b")
    +            ),
    +            term("groupBy", "b"),
    +            term("select", "b", "COUNT(a) AS EXPR$0")
    +          ),
    +          tuples(List(null, null)),
    +          term("values", "b", "EXPR$0")
    +        ),
    +        term("union", "b", "EXPR$0")
    +      ),
    +      term("select", "$SUM0(EXPR$0) AS EXPR$0", "SUM(b) AS EXPR$1")
    +    )
    +
    +    util.verifySql(sqlQuery1, expected1)
    +  }
    +
    +  @Test
    +  def testMultiDistinctAggregateOnDifferentColumn(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    +
    +    val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable"
    +
    +    val expected = binaryNode(
    +      "DataSetSingleRowJoin",
    +      unaryNode(
    +        "DataSetAggregate",
    +        unaryNode(
    +          "DataSetUnion",
    +          unaryNode(
    +            "DataSetValues",
    +            unaryNode(
    +              "DataSetAggregate",
    +              unaryNode(
    +                "DataSetCalc",
    +                batchTableNode(0),
    +                term("select", "a")
    +              ),
    +              term("groupBy", "a"),
    +              term("select", "a")
    +            ),
    +            tuples(List(null)),
    +            term("values", "a")
    +          ),
    +          term("union", "a")
    +        ),
    +        term("select", "COUNT(a) AS EXPR$0")
    +      ),
    +      unaryNode(
    +        "DataSetAggregate",
    +        unaryNode(
    +          "DataSetUnion",
    +          unaryNode(
    +            "DataSetValues",
    +            unaryNode(
    +              "DataSetAggregate",
    +              unaryNode(
    +                "DataSetCalc",
    +                batchTableNode(0),
    +                term("select", "b")
    +              ),
    +              term("groupBy", "b"),
    +              term("select", "b")
    +            ),
    +            tuples(List(null)),
    +            term("values", "b")
    +          ),
    +          term("union", "b")
    +        ),
    +        term("select", "SUM(b) AS EXPR$1")
    +      ),
    +      term("where", "true"),
    +      term("join", "EXPR$0", "EXPR$1"),
    +      term("joinType", "NestedLoopJoin")
    +    )
    +
    +    util.verifySql(sqlQuery, expected)
    +  }
    +
    +  @Test
    +  def testSingleDistinctAggregateWithGrouping(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    +
    +    val sqlQuery = "SELECT a, COUNT(a), SUM(DISTINCT b) FROM MyTable GROUP BY a"
    +
    +    val expected = unaryNode(
    +      "DataSetAggregate",
    +      unaryNode(
    +        "DataSetAggregate",
    +        unaryNode(
    +          "DataSetCalc",
    +          batchTableNode(0),
    +          term("select", "a", "b")
    +        ),
    +        term("groupBy", "a", "b"),
    +        term("select", "a", "b", "COUNT(a) AS EXPR$1")
    +      ),
    +      term("groupBy", "a"),
    +      term("select", "a", "SUM(EXPR$1) AS EXPR$1", "SUM(b) AS EXPR$2")
    +    )
    +
    +    util.verifySql(sqlQuery, expected)
    +  }
    +
    +  @Test
    +  def testSingleDistinctAggregateWithGroupingAndCountStar(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    +
    +    val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT b) FROM MyTable GROUP BY a"
    --- End diff --
    
    Add two tests for `GROUP BY` with two distinct aggregates
    1. on same column
    2. on different column


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3111: [FLINK-3475] [Table] DISTINCT aggregate function support ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3111
  
    Hi @docete,
    
    thanks for posting the plan. It looks OK, but this is because the query computes non-grouped aggregates. 
    
    In case of grouped aggregates, the resulting plan will be less efficient than the plan I proposed because the it will not reuse existing partitioning and sorting properties of the data. This will result in at least one shuffle and one full sort for each distinct aggregate.
    
    The trick is to explicitly partition the data for the distinct operation on a subset (i.e., the grouping keys) of the attributes that usually would be used. The following aggregations and joins can be performed in a streaming fashion without partitioning or sorting the data again. This is not possible with your plan. 
    
    We could try to implement that with some tweaking as an optimization rule (which would be custom and based on the rule you copied from Calcite) or implement it as a dedicated `DataSetRelNode` for distinct aggregates. I'm more in favor of the latter option. 
    Once, the optimizer tracks physical data properties, it is easier to implement distinct aggregates using optimizer rules.
    
    What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3111


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3111#discussion_r96572366
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/FlinkAggregateExpandDistinctAggregatesRule.java ---
    @@ -0,0 +1,1144 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to you under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.dataSet;
    +
    +import org.apache.calcite.plan.Contexts;
    +import org.apache.calcite.plan.RelOptCluster;
    +import org.apache.calcite.plan.RelOptRule;
    +import org.apache.calcite.plan.RelOptRuleCall;
    +import org.apache.calcite.rel.RelNode;
    +import org.apache.calcite.rel.core.Aggregate;
    +import org.apache.calcite.rel.core.AggregateCall;
    +import org.apache.calcite.rel.core.JoinRelType;
    +import org.apache.calcite.rel.core.RelFactories;
    +import org.apache.calcite.rel.logical.LogicalAggregate;
    +import org.apache.calcite.rel.type.RelDataType;
    +import org.apache.calcite.rel.type.RelDataTypeFactory;
    +import org.apache.calcite.rel.type.RelDataTypeField;
    +import org.apache.calcite.rex.RexBuilder;
    +import org.apache.calcite.rex.RexInputRef;
    +import org.apache.calcite.rex.RexNode;
    +import org.apache.calcite.sql.SqlAggFunction;
    +import org.apache.calcite.sql.fun.SqlCountAggFunction;
    +import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
    +import org.apache.calcite.sql.fun.SqlStdOperatorTable;
    +import org.apache.calcite.sql.fun.SqlSumAggFunction;
    +import org.apache.calcite.sql.fun.SqlSumEmptyIsZeroAggFunction;
    +import org.apache.calcite.sql.type.SqlTypeName;
    +import org.apache.calcite.tools.RelBuilder;
    +import org.apache.calcite.tools.RelBuilderFactory;
    +import org.apache.calcite.util.ImmutableBitSet;
    +import org.apache.calcite.util.ImmutableIntList;
    +import org.apache.calcite.util.Pair;
    +import org.apache.calcite.util.Util;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.Iterables;
    +import com.google.common.collect.Lists;
    +
    +import java.math.BigDecimal;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.LinkedHashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedSet;
    +import java.util.TreeSet;
    +
    +/**
    + * Planner rule that expands distinct aggregates
    + * (such as {@code COUNT(DISTINCT x)}) from a
    + * {@link org.apache.calcite.rel.logical.LogicalAggregate}.
    + *
    + * <p>How this is done depends upon the arguments to the function. If all
    + * functions have the same argument
    + * (e.g. {@code COUNT(DISTINCT x), SUM(DISTINCT x)} both have the argument
    + * {@code x}) then one extra {@link org.apache.calcite.rel.core.Aggregate} is
    + * sufficient.
    + *
    + * <p>If there are multiple arguments
    + * (e.g. {@code COUNT(DISTINCT x), COUNT(DISTINCT y)})
    + * the rule creates separate {@code Aggregate}s and combines using a
    + * {@link org.apache.calcite.rel.core.Join}.
    + */
    +public final class FlinkAggregateExpandDistinctAggregatesRule extends RelOptRule {
    --- End diff --
    
    Thank you. That's fine to me.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3111: [FLINK-3475] [Table] DISTINCT aggregate function support ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3111
  
    Thanks for the PR @docete.
    I'll have a look at it soon.
    
    Best, Fabian


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...

Posted by docete <gi...@git.apache.org>.
Github user docete commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3111#discussion_r96568191
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala ---
    @@ -165,9 +165,13 @@ class AggregationTest extends TableTestBase {
         val util = batchTestUtil()
         val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
     
    -    val resultTable = sourceTable.groupBy('a)
    +    // Move "where" before "groupBy" for the former query would generate
    +    // nondeterministic plans with same cost. If we change FlinkRuleSets.DATASET_OPT_RULES,
    +    // the importance of relNode may change, and the test may fail. This issue is mentioned
    +    // in FLINK-5394, we could move "where" to the end when FLINK-5394 is fixed.
    +    val resultTable = sourceTable.where('a === 1).groupBy('a)
    --- End diff --
    
    I will check the finish time for #3058 with @beyond1920 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---