Posted to issues@flink.apache.org by docete <gi...@git.apache.org> on 2017/02/06 08:03:42 UTC
[GitHub] flink issue #3111: [FLINK-3475] [Table] DISTINCT aggregate function support ...
Github user docete commented on the issue:
https://github.com/apache/flink/pull/3111
@fhueske Yes, I have checked the execution plan. It's very similar to your description:
Take the SQL query "select sum(distinct a), sum(distinct b), sum(c) from expr" as an example, where expr is a table with three fields: a, b, c.
The explanation of the query is:
```
== Abstract Syntax Tree ==
LogicalAggregate(group=[{}], EXPR$0=[SUM(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[SUM($2)])
  LogicalTableScan(table=[[expr]])
== Optimized Logical Plan ==
DataSetCalc(select=[EXPR$0, EXPR$1, EXPR$2])
  DataSetSingleRowJoin(where=[true], join=[EXPR$2, EXPR$0, EXPR$1], joinType=[NestedLoopJoin])
    DataSetSingleRowJoin(where=[true], join=[EXPR$2, EXPR$0], joinType=[NestedLoopJoin])
      DataSetAggregate(select=[SUM(c) AS EXPR$2])
        DataSetUnion(union=[a, b, c])
          DataSetValues(tuples=[[{ null, null, null }]], values=[a, b, c])
          DataSetScan(table=[[_DataSetTable_0]])
      DataSetAggregate(select=[SUM(a) AS EXPR$0])
        DataSetUnion(union=[a])
          DataSetValues(tuples=[[{ null }]], values=[a])
          DataSetAggregate(groupBy=[a], select=[a])
            DataSetCalc(select=[a])
              DataSetScan(table=[[_DataSetTable_0]])
    DataSetAggregate(select=[SUM(b) AS EXPR$1])
      DataSetUnion(union=[b])
        DataSetValues(tuples=[[{ null }]], values=[b])
        DataSetAggregate(groupBy=[b], select=[b])
          DataSetCalc(select=[b])
            DataSetScan(table=[[_DataSetTable_0]])
== Physical Execution Plan ==
Stage 8 : Data Source
content : collect elements with CollectionInputFormat
Partitioning : RANDOM_PARTITIONED
Stage 14 : Data Source
content : collect elements with CollectionInputFormat
Partitioning : RANDOM_PARTITIONED
Stage 13 : Map
content : from: (a, b, c)
ship_strategy : Forward
exchange_mode : BATCH
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 12 : FlatMap
content : select: (a)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : FlatMap
Partitioning : RANDOM_PARTITIONED
Stage 11 : Map
content : prepare select: (a)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 10 : GroupCombine
content : groupBy: (a), select: (a)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Sorted Combine
Partitioning : RANDOM_PARTITIONED
Stage 9 : GroupReduce
content : groupBy: (a), select: (a)
ship_strategy : Hash Partition on [0]
exchange_mode : PIPELINED
driver_strategy : Sorted Group Reduce
Partitioning : RANDOM_PARTITIONED
Stage 7 : Union
content :
ship_strategy : Redistribute
exchange_mode : PIPELINED
Partitioning : RANDOM_PARTITIONED
Stage 6 : Map
content : prepare select: (SUM(a) AS EXPR$0)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 5 : GroupCombine
content : select:(SUM(a) AS EXPR$0)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Group Reduce All
Partitioning : RANDOM_PARTITIONED
Stage 4 : GroupReduce
content : select:(SUM(a) AS EXPR$0)
ship_strategy : Redistribute
exchange_mode : PIPELINED
driver_strategy : Group Reduce All
Partitioning : RANDOM_PARTITIONED
Stage 19 : Data Source
content : collect elements with CollectionInputFormat
Partitioning : RANDOM_PARTITIONED
Stage 20 : Map
content : from: (a, b, c)
ship_strategy : Forward
exchange_mode : BATCH
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 18 : Union
content :
ship_strategy : Redistribute
exchange_mode : PIPELINED
Partitioning : RANDOM_PARTITIONED
Stage 17 : Map
content : prepare select: (SUM(c) AS EXPR$2)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 16 : GroupCombine
content : select:(SUM(c) AS EXPR$2)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Group Reduce All
Partitioning : RANDOM_PARTITIONED
Stage 15 : GroupReduce
content : select:(SUM(c) AS EXPR$2)
ship_strategy : Redistribute
exchange_mode : PIPELINED
driver_strategy : Group Reduce All
Partitioning : RANDOM_PARTITIONED
Stage 3 : FlatMap
content : where: (true), join: (EXPR$2, EXPR$0)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : FlatMap
Partitioning : RANDOM_PARTITIONED
Stage 25 : Data Source
content : collect elements with CollectionInputFormat
Partitioning : RANDOM_PARTITIONED
Stage 30 : Map
content : from: (a, b, c)
ship_strategy : Forward
exchange_mode : BATCH
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 29 : FlatMap
content : select: (b)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : FlatMap
Partitioning : RANDOM_PARTITIONED
Stage 28 : Map
content : prepare select: (b)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 27 : GroupCombine
content : groupBy: (b), select: (b)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Sorted Combine
Partitioning : RANDOM_PARTITIONED
Stage 26 : GroupReduce
content : groupBy: (b), select: (b)
ship_strategy : Hash Partition on [0]
exchange_mode : PIPELINED
driver_strategy : Sorted Group Reduce
Partitioning : RANDOM_PARTITIONED
Stage 24 : Union
content :
ship_strategy : Redistribute
exchange_mode : PIPELINED
Partitioning : RANDOM_PARTITIONED
Stage 23 : Map
content : prepare select: (SUM(b) AS EXPR$1)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 22 : GroupCombine
content : select:(SUM(b) AS EXPR$1)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Group Reduce All
Partitioning : RANDOM_PARTITIONED
Stage 21 : GroupReduce
content : select:(SUM(b) AS EXPR$1)
ship_strategy : Redistribute
exchange_mode : PIPELINED
driver_strategy : Group Reduce All
Partitioning : RANDOM_PARTITIONED
Stage 2 : FlatMap
content : where: (true), join: (EXPR$2, EXPR$0, EXPR$1)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : FlatMap
Partitioning : RANDOM_PARTITIONED
Stage 1 : FlatMap
content : select: (EXPR$0, EXPR$1, EXPR$2)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : FlatMap
Partitioning : RANDOM_PARTITIONED
Stage 0 : Data Sink
content : org.apache.flink.api.java.io.DiscardingOutputFormat
ship_strategy : Forward
exchange_mode : PIPELINED
Partitioning : RANDOM_PARTITIONED
```
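To make the shape of the optimized plan easier to follow, here is a rough sketch in plain Python (not Flink code) of what the three branches appear to compute. It assumes the null row unioned in before each global aggregate is there so that SUM still yields a row on empty input, and that SUM ignores NULLs. The names `sum_or_none`, `distinct_column` and `simulate_plan` are made up for this illustration.

```python
def sum_or_none(values):
    """SQL-style SUM: skip NULLs (None); return None if nothing is left."""
    non_null = [v for v in values if v is not None]
    return sum(non_null) if non_null else None


def distinct_column(rows, idx):
    """Stand-in for DataSetCalc(select=[col]) followed by
    DataSetAggregate(groupBy=[col], select=[col]): one value per group."""
    return list({row[idx] for row in rows})


def simulate_plan(rows):
    """rows is a list of (a, b, c) tuples; returns (EXPR$0, EXPR$1, EXPR$2)."""
    # Branch 1: SUM(c) over the scan unioned with one all-null row.
    expr2 = sum_or_none([None] + [row[2] for row in rows])
    # Branch 2: SUM(a) over the distinct values of a, unioned with a null row.
    expr0 = sum_or_none([None] + distinct_column(rows, 0))
    # Branch 3: same as branch 2, for column b.
    expr1 = sum_or_none([None] + distinct_column(rows, 1))
    # Each branch yields exactly one row, so the two DataSetSingleRowJoin
    # steps simply concatenate fields: (EXPR$2, EXPR$0), then (..., EXPR$1).
    joined = (expr2, expr0) + (expr1,)
    # The final DataSetCalc reorders to (EXPR$0, EXPR$1, EXPR$2).
    return (joined[1], joined[2], joined[0])


if __name__ == "__main__":
    sample = [(1, 10, 100), (1, 20, 200), (2, 20, 300)]
    print(simulate_plan(sample))
    print(simulate_plan([]))
```

For the sample rows, the distinct values of a are {1, 2} and of b are {10, 20}, while c is summed over all three rows. With an empty input every branch sees only its null row, so all three results come back as None.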