You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/02/06 08:03:42 UTC

[jira] [Commented] (FLINK-3475) DISTINCT aggregate function support for SQL queries

    [ https://issues.apache.org/jira/browse/FLINK-3475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853647#comment-15853647 ] 

ASF GitHub Bot commented on FLINK-3475:
---------------------------------------

Github user docete commented on the issue:

    https://github.com/apache/flink/pull/3111
  
    @fhueske Yes, I have checked the execution plan. It's very similar to your description:
    
    Take example for SQL "select sum(distinct a), sum(distinct b), sum(c) from expr", where expr is a table, and it has 3 fields: a, b, c.
    
    The explaination for the query is:
    ```
    == Abstract Syntax Tree ==
    LogicalAggregate(group=[{}], EXPR$0=[SUM(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[SUM($2)])
      LogicalTableScan(table=[[expr]])
    
    == Optimized Logical Plan ==
    DataSetCalc(select=[EXPR$0, EXPR$1, EXPR$2])
      DataSetSingleRowJoin(where=[true], join=[EXPR$2, EXPR$0, EXPR$1], joinType=[NestedLoopJoin])
        DataSetSingleRowJoin(where=[true], join=[EXPR$2, EXPR$0], joinType=[NestedLoopJoin])
          DataSetAggregate(select=[SUM(c) AS EXPR$2])
            DataSetUnion(union=[a, b, c])
              DataSetValues(tuples=[[{ null, null, null }]], values=[a, b, c])
              DataSetScan(table=[[_DataSetTable_0]])
          DataSetAggregate(select=[SUM(a) AS EXPR$0])
            DataSetUnion(union=[a])
              DataSetValues(tuples=[[{ null }]], values=[a])
              DataSetAggregate(groupBy=[a], select=[a])
                DataSetCalc(select=[a])
                  DataSetScan(table=[[_DataSetTable_0]])
        DataSetAggregate(select=[SUM(b) AS EXPR$1])
          DataSetUnion(union=[b])
            DataSetValues(tuples=[[{ null }]], values=[b])
            DataSetAggregate(groupBy=[b], select=[b])
              DataSetCalc(select=[b])
                DataSetScan(table=[[_DataSetTable_0]])
    
    == Physical Execution Plan ==
    Stage 8 : Data Source
    	content : collect elements with CollectionInputFormat
    	Partitioning : RANDOM_PARTITIONED
    
    Stage 14 : Data Source
    	content : collect elements with CollectionInputFormat
    	Partitioning : RANDOM_PARTITIONED
    
    	Stage 13 : Map
    		content : from: (a, b, c)
    		ship_strategy : Forward
    		exchange_mode : BATCH
    		driver_strategy : Map
    		Partitioning : RANDOM_PARTITIONED
    
    		Stage 12 : FlatMap
    			content : select: (a)
    			ship_strategy : Forward
    			exchange_mode : PIPELINED
    			driver_strategy : FlatMap
    			Partitioning : RANDOM_PARTITIONED
    
    			Stage 11 : Map
    				content : prepare select: (a)
    				ship_strategy : Forward
    				exchange_mode : PIPELINED
    				driver_strategy : Map
    				Partitioning : RANDOM_PARTITIONED
    
    				Stage 10 : GroupCombine
    					content : groupBy: (a), select: (a)
    					ship_strategy : Forward
    					exchange_mode : PIPELINED
    					driver_strategy : Sorted Combine
    					Partitioning : RANDOM_PARTITIONED
    
    					Stage 9 : GroupReduce
    						content : groupBy: (a), select: (a)
    						ship_strategy : Hash Partition on [0]
    						exchange_mode : PIPELINED
    						driver_strategy : Sorted Group Reduce
    						Partitioning : RANDOM_PARTITIONED
    
    						Stage 7 : Union
    							content : 
    							ship_strategy : Redistribute
    							exchange_mode : PIPELINED
    							Partitioning : RANDOM_PARTITIONED
    
    							Stage 6 : Map
    								content : prepare select: (SUM(a) AS EXPR$0)
    								ship_strategy : Forward
    								exchange_mode : PIPELINED
    								driver_strategy : Map
    								Partitioning : RANDOM_PARTITIONED
    
    								Stage 5 : GroupCombine
    									content : select:(SUM(a) AS EXPR$0)
    									ship_strategy : Forward
    									exchange_mode : PIPELINED
    									driver_strategy : Group Reduce All
    									Partitioning : RANDOM_PARTITIONED
    
    									Stage 4 : GroupReduce
    										content : select:(SUM(a) AS EXPR$0)
    										ship_strategy : Redistribute
    										exchange_mode : PIPELINED
    										driver_strategy : Group Reduce All
    										Partitioning : RANDOM_PARTITIONED
    
    Stage 19 : Data Source
    	content : collect elements with CollectionInputFormat
    	Partitioning : RANDOM_PARTITIONED
    
    	Stage 20 : Map
    		content : from: (a, b, c)
    		ship_strategy : Forward
    		exchange_mode : BATCH
    		driver_strategy : Map
    		Partitioning : RANDOM_PARTITIONED
    
    		Stage 18 : Union
    			content : 
    			ship_strategy : Redistribute
    			exchange_mode : PIPELINED
    			Partitioning : RANDOM_PARTITIONED
    
    			Stage 17 : Map
    				content : prepare select: (SUM(c) AS EXPR$2)
    				ship_strategy : Forward
    				exchange_mode : PIPELINED
    				driver_strategy : Map
    				Partitioning : RANDOM_PARTITIONED
    
    				Stage 16 : GroupCombine
    					content : select:(SUM(c) AS EXPR$2)
    					ship_strategy : Forward
    					exchange_mode : PIPELINED
    					driver_strategy : Group Reduce All
    					Partitioning : RANDOM_PARTITIONED
    
    					Stage 15 : GroupReduce
    						content : select:(SUM(c) AS EXPR$2)
    						ship_strategy : Redistribute
    						exchange_mode : PIPELINED
    						driver_strategy : Group Reduce All
    						Partitioning : RANDOM_PARTITIONED
    
    						Stage 3 : FlatMap
    							content : where: (true), join: (EXPR$2, EXPR$0)
    							ship_strategy : Forward
    							exchange_mode : PIPELINED
    							driver_strategy : FlatMap
    							Partitioning : RANDOM_PARTITIONED
    
    Stage 25 : Data Source
    	content : collect elements with CollectionInputFormat
    	Partitioning : RANDOM_PARTITIONED
    
    	Stage 30 : Map
    		content : from: (a, b, c)
    		ship_strategy : Forward
    		exchange_mode : BATCH
    		driver_strategy : Map
    		Partitioning : RANDOM_PARTITIONED
    
    		Stage 29 : FlatMap
    			content : select: (b)
    			ship_strategy : Forward
    			exchange_mode : PIPELINED
    			driver_strategy : FlatMap
    			Partitioning : RANDOM_PARTITIONED
    
    			Stage 28 : Map
    				content : prepare select: (b)
    				ship_strategy : Forward
    				exchange_mode : PIPELINED
    				driver_strategy : Map
    				Partitioning : RANDOM_PARTITIONED
    
    				Stage 27 : GroupCombine
    					content : groupBy: (b), select: (b)
    					ship_strategy : Forward
    					exchange_mode : PIPELINED
    					driver_strategy : Sorted Combine
    					Partitioning : RANDOM_PARTITIONED
    
    					Stage 26 : GroupReduce
    						content : groupBy: (b), select: (b)
    						ship_strategy : Hash Partition on [0]
    						exchange_mode : PIPELINED
    						driver_strategy : Sorted Group Reduce
    						Partitioning : RANDOM_PARTITIONED
    
    						Stage 24 : Union
    							content : 
    							ship_strategy : Redistribute
    							exchange_mode : PIPELINED
    							Partitioning : RANDOM_PARTITIONED
    
    							Stage 23 : Map
    								content : prepare select: (SUM(b) AS EXPR$1)
    								ship_strategy : Forward
    								exchange_mode : PIPELINED
    								driver_strategy : Map
    								Partitioning : RANDOM_PARTITIONED
    
    								Stage 22 : GroupCombine
    									content : select:(SUM(b) AS EXPR$1)
    									ship_strategy : Forward
    									exchange_mode : PIPELINED
    									driver_strategy : Group Reduce All
    									Partitioning : RANDOM_PARTITIONED
    
    									Stage 21 : GroupReduce
    										content : select:(SUM(b) AS EXPR$1)
    										ship_strategy : Redistribute
    										exchange_mode : PIPELINED
    										driver_strategy : Group Reduce All
    										Partitioning : RANDOM_PARTITIONED
    
    										Stage 2 : FlatMap
    											content : where: (true), join: (EXPR$2, EXPR$0, EXPR$1)
    											ship_strategy : Forward
    											exchange_mode : PIPELINED
    											driver_strategy : FlatMap
    											Partitioning : RANDOM_PARTITIONED
    
    											Stage 1 : FlatMap
    												content : select: (EXPR$0, EXPR$1, EXPR$2)
    												ship_strategy : Forward
    												exchange_mode : PIPELINED
    												driver_strategy : FlatMap
    												Partitioning : RANDOM_PARTITIONED
    
    												Stage 0 : Data Sink
    													content : org.apache.flink.api.java.io.DiscardingOutputFormat
    													ship_strategy : Forward
    													exchange_mode : PIPELINED
    													Partitioning : RANDOM_PARTITIONED
    
    ```


> DISTINCT aggregate function support for SQL queries
> ---------------------------------------------------
>
>                 Key: FLINK-3475
>                 URL: https://issues.apache.org/jira/browse/FLINK-3475
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Chengxiang Li
>            Assignee: Zhenghua Gao
>
> DISTINCT aggregate function may be able to reuse the aggregate function instead of separate implementation, and let Flink runtime take care of duplicate records.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)