Posted to issues@flink.apache.org by docete <gi...@git.apache.org> on 2017/02/06 08:03:42 UTC

[GitHub] flink issue #3111: [FLINK-3475] [Table] DISTINCT aggregate function support ...

Github user docete commented on the issue:

    https://github.com/apache/flink/pull/3111
  
    @fhueske Yes, I have checked the execution plan. It's very similar to your description:
    
    Take, for example, the SQL query "select sum(distinct a), sum(distinct b), sum(c) from expr", where expr is a table with three fields: a, b, and c.
    
    The explanation for the query is:
    ```
    == Abstract Syntax Tree ==
    LogicalAggregate(group=[{}], EXPR$0=[SUM(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[SUM($2)])
      LogicalTableScan(table=[[expr]])
    
    == Optimized Logical Plan ==
    DataSetCalc(select=[EXPR$0, EXPR$1, EXPR$2])
      DataSetSingleRowJoin(where=[true], join=[EXPR$2, EXPR$0, EXPR$1], joinType=[NestedLoopJoin])
        DataSetSingleRowJoin(where=[true], join=[EXPR$2, EXPR$0], joinType=[NestedLoopJoin])
          DataSetAggregate(select=[SUM(c) AS EXPR$2])
            DataSetUnion(union=[a, b, c])
              DataSetValues(tuples=[[{ null, null, null }]], values=[a, b, c])
              DataSetScan(table=[[_DataSetTable_0]])
          DataSetAggregate(select=[SUM(a) AS EXPR$0])
            DataSetUnion(union=[a])
              DataSetValues(tuples=[[{ null }]], values=[a])
              DataSetAggregate(groupBy=[a], select=[a])
                DataSetCalc(select=[a])
                  DataSetScan(table=[[_DataSetTable_0]])
        DataSetAggregate(select=[SUM(b) AS EXPR$1])
          DataSetUnion(union=[b])
            DataSetValues(tuples=[[{ null }]], values=[b])
            DataSetAggregate(groupBy=[b], select=[b])
              DataSetCalc(select=[b])
                DataSetScan(table=[[_DataSetTable_0]])
    
    == Physical Execution Plan ==
    Stage 8 : Data Source
    	content : collect elements with CollectionInputFormat
    	Partitioning : RANDOM_PARTITIONED
    
    Stage 14 : Data Source
    	content : collect elements with CollectionInputFormat
    	Partitioning : RANDOM_PARTITIONED
    
    	Stage 13 : Map
    		content : from: (a, b, c)
    		ship_strategy : Forward
    		exchange_mode : BATCH
    		driver_strategy : Map
    		Partitioning : RANDOM_PARTITIONED
    
    		Stage 12 : FlatMap
    			content : select: (a)
    			ship_strategy : Forward
    			exchange_mode : PIPELINED
    			driver_strategy : FlatMap
    			Partitioning : RANDOM_PARTITIONED
    
    			Stage 11 : Map
    				content : prepare select: (a)
    				ship_strategy : Forward
    				exchange_mode : PIPELINED
    				driver_strategy : Map
    				Partitioning : RANDOM_PARTITIONED
    
    				Stage 10 : GroupCombine
    					content : groupBy: (a), select: (a)
    					ship_strategy : Forward
    					exchange_mode : PIPELINED
    					driver_strategy : Sorted Combine
    					Partitioning : RANDOM_PARTITIONED
    
    					Stage 9 : GroupReduce
    						content : groupBy: (a), select: (a)
    						ship_strategy : Hash Partition on [0]
    						exchange_mode : PIPELINED
    						driver_strategy : Sorted Group Reduce
    						Partitioning : RANDOM_PARTITIONED
    
    						Stage 7 : Union
    							content : 
    							ship_strategy : Redistribute
    							exchange_mode : PIPELINED
    							Partitioning : RANDOM_PARTITIONED
    
    							Stage 6 : Map
    								content : prepare select: (SUM(a) AS EXPR$0)
    								ship_strategy : Forward
    								exchange_mode : PIPELINED
    								driver_strategy : Map
    								Partitioning : RANDOM_PARTITIONED
    
    								Stage 5 : GroupCombine
    									content : select:(SUM(a) AS EXPR$0)
    									ship_strategy : Forward
    									exchange_mode : PIPELINED
    									driver_strategy : Group Reduce All
    									Partitioning : RANDOM_PARTITIONED
    
    									Stage 4 : GroupReduce
    										content : select:(SUM(a) AS EXPR$0)
    										ship_strategy : Redistribute
    										exchange_mode : PIPELINED
    										driver_strategy : Group Reduce All
    										Partitioning : RANDOM_PARTITIONED
    
    Stage 19 : Data Source
    	content : collect elements with CollectionInputFormat
    	Partitioning : RANDOM_PARTITIONED
    
    	Stage 20 : Map
    		content : from: (a, b, c)
    		ship_strategy : Forward
    		exchange_mode : BATCH
    		driver_strategy : Map
    		Partitioning : RANDOM_PARTITIONED
    
    		Stage 18 : Union
    			content : 
    			ship_strategy : Redistribute
    			exchange_mode : PIPELINED
    			Partitioning : RANDOM_PARTITIONED
    
    			Stage 17 : Map
    				content : prepare select: (SUM(c) AS EXPR$2)
    				ship_strategy : Forward
    				exchange_mode : PIPELINED
    				driver_strategy : Map
    				Partitioning : RANDOM_PARTITIONED
    
    				Stage 16 : GroupCombine
    					content : select:(SUM(c) AS EXPR$2)
    					ship_strategy : Forward
    					exchange_mode : PIPELINED
    					driver_strategy : Group Reduce All
    					Partitioning : RANDOM_PARTITIONED
    
    					Stage 15 : GroupReduce
    						content : select:(SUM(c) AS EXPR$2)
    						ship_strategy : Redistribute
    						exchange_mode : PIPELINED
    						driver_strategy : Group Reduce All
    						Partitioning : RANDOM_PARTITIONED
    
    						Stage 3 : FlatMap
    							content : where: (true), join: (EXPR$2, EXPR$0)
    							ship_strategy : Forward
    							exchange_mode : PIPELINED
    							driver_strategy : FlatMap
    							Partitioning : RANDOM_PARTITIONED
    
    Stage 25 : Data Source
    	content : collect elements with CollectionInputFormat
    	Partitioning : RANDOM_PARTITIONED
    
    	Stage 30 : Map
    		content : from: (a, b, c)
    		ship_strategy : Forward
    		exchange_mode : BATCH
    		driver_strategy : Map
    		Partitioning : RANDOM_PARTITIONED
    
    		Stage 29 : FlatMap
    			content : select: (b)
    			ship_strategy : Forward
    			exchange_mode : PIPELINED
    			driver_strategy : FlatMap
    			Partitioning : RANDOM_PARTITIONED
    
    			Stage 28 : Map
    				content : prepare select: (b)
    				ship_strategy : Forward
    				exchange_mode : PIPELINED
    				driver_strategy : Map
    				Partitioning : RANDOM_PARTITIONED
    
    				Stage 27 : GroupCombine
    					content : groupBy: (b), select: (b)
    					ship_strategy : Forward
    					exchange_mode : PIPELINED
    					driver_strategy : Sorted Combine
    					Partitioning : RANDOM_PARTITIONED
    
    					Stage 26 : GroupReduce
    						content : groupBy: (b), select: (b)
    						ship_strategy : Hash Partition on [0]
    						exchange_mode : PIPELINED
    						driver_strategy : Sorted Group Reduce
    						Partitioning : RANDOM_PARTITIONED
    
    						Stage 24 : Union
    							content : 
    							ship_strategy : Redistribute
    							exchange_mode : PIPELINED
    							Partitioning : RANDOM_PARTITIONED
    
    							Stage 23 : Map
    								content : prepare select: (SUM(b) AS EXPR$1)
    								ship_strategy : Forward
    								exchange_mode : PIPELINED
    								driver_strategy : Map
    								Partitioning : RANDOM_PARTITIONED
    
    								Stage 22 : GroupCombine
    									content : select:(SUM(b) AS EXPR$1)
    									ship_strategy : Forward
    									exchange_mode : PIPELINED
    									driver_strategy : Group Reduce All
    									Partitioning : RANDOM_PARTITIONED
    
    									Stage 21 : GroupReduce
    										content : select:(SUM(b) AS EXPR$1)
    										ship_strategy : Redistribute
    										exchange_mode : PIPELINED
    										driver_strategy : Group Reduce All
    										Partitioning : RANDOM_PARTITIONED
    
    										Stage 2 : FlatMap
    											content : where: (true), join: (EXPR$2, EXPR$0, EXPR$1)
    											ship_strategy : Forward
    											exchange_mode : PIPELINED
    											driver_strategy : FlatMap
    											Partitioning : RANDOM_PARTITIONED
    
    											Stage 1 : FlatMap
    												content : select: (EXPR$0, EXPR$1, EXPR$2)
    												ship_strategy : Forward
    												exchange_mode : PIPELINED
    												driver_strategy : FlatMap
    												Partitioning : RANDOM_PARTITIONED
    
    												Stage 0 : Data Sink
    													content : org.apache.flink.api.java.io.DiscardingOutputFormat
    													ship_strategy : Forward
    													exchange_mode : PIPELINED
    													Partitioning : RANDOM_PARTITIONED
    
    ```
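
The optimized logical plan above can be read as three independent branches that are stitched back together: SUM(c) runs over all rows, each DISTINCT aggregate first deduplicates its column with a groupBy and then applies a plain SUM, and the two single-row joins combine the three one-row results before the final Calc reorders the fields. The following is a minimal Python sketch of that data flow, not Flink code. The names `sql_sum`, `distinct_sum`, and `evaluate` are hypothetical, and the reading of the `DataSetValues` null row (it appears to guarantee that a global aggregate still emits one row on empty input) is an assumption based on the plan shape.

```python
from typing import Iterable, List, Optional, Tuple

# One input row of the table "expr": fields (a, b, c). None stands for SQL NULL.
Row = Tuple[Optional[int], Optional[int], Optional[int]]


def sql_sum(values: Iterable[Optional[int]]) -> Optional[int]:
    """SQL-style SUM: skips NULLs and returns NULL if no value remains."""
    non_null = [v for v in values if v is not None]
    return sum(non_null) if non_null else None


def distinct_sum(values: List[Optional[int]]) -> Optional[int]:
    """SUM(DISTINCT x) expressed as 'deduplicate, then plain SUM'."""
    # Mirrors DataSetAggregate(groupBy=[x], select=[x]): one row per distinct value.
    deduplicated = list(set(values))
    # Mirrors DataSetUnion with the single NULL row from DataSetValues
    # (assumed purpose: keep the global aggregate non-empty).
    with_null_row = [None] + deduplicated
    return sql_sum(with_null_row)


def evaluate(rows: List[Row]) -> Tuple[Optional[int], Optional[int], Optional[int]]:
    """Emulates: select sum(distinct a), sum(distinct b), sum(c) from expr."""
    sum_c = sql_sum([None] + [r[2] for r in rows])   # branch for EXPR$2
    sum_a = distinct_sum([r[0] for r in rows])       # branch for EXPR$0
    sum_b = distinct_sum([r[1] for r in rows])       # branch for EXPR$1
    # Two single-row joins build (EXPR$2, EXPR$0) and then (EXPR$2, EXPR$0, EXPR$1).
    joined = (sum_c, sum_a, sum_b)
    # The final Calc reorders the fields to (EXPR$0, EXPR$1, EXPR$2).
    return (joined[1], joined[2], joined[0])


rows = [(1, 10, 100), (1, 20, 100), (2, 20, 100)]
print(evaluate(rows))  # (3, 30, 300)
print(evaluate([]))    # (None, None, None)
```

Because every branch yields exactly one row, the joins in the plan need no join condition (`where=[true]`), which is why the sketch can simply build a tuple from the three results.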

