Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/02/06 08:03:42 UTC
[jira] [Commented] (FLINK-3475) DISTINCT aggregate function support for SQL queries
[ https://issues.apache.org/jira/browse/FLINK-3475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853647#comment-15853647 ]
ASF GitHub Bot commented on FLINK-3475:
---------------------------------------
Github user docete commented on the issue:
https://github.com/apache/flink/pull/3111
@fhueske Yes, I have checked the execution plan. It is very similar to your description.
Take the SQL query "select sum(distinct a), sum(distinct b), sum(c) from expr" as an example, where expr is a table with three fields: a, b, and c.
The explanation for the query is:
```
== Abstract Syntax Tree ==
LogicalAggregate(group=[{}], EXPR$0=[SUM(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[SUM($2)])
  LogicalTableScan(table=[[expr]])

== Optimized Logical Plan ==
DataSetCalc(select=[EXPR$0, EXPR$1, EXPR$2])
  DataSetSingleRowJoin(where=[true], join=[EXPR$2, EXPR$0, EXPR$1], joinType=[NestedLoopJoin])
    DataSetSingleRowJoin(where=[true], join=[EXPR$2, EXPR$0], joinType=[NestedLoopJoin])
      DataSetAggregate(select=[SUM(c) AS EXPR$2])
        DataSetUnion(union=[a, b, c])
          DataSetValues(tuples=[[{ null, null, null }]], values=[a, b, c])
          DataSetScan(table=[[_DataSetTable_0]])
      DataSetAggregate(select=[SUM(a) AS EXPR$0])
        DataSetUnion(union=[a])
          DataSetValues(tuples=[[{ null }]], values=[a])
          DataSetAggregate(groupBy=[a], select=[a])
            DataSetCalc(select=[a])
              DataSetScan(table=[[_DataSetTable_0]])
    DataSetAggregate(select=[SUM(b) AS EXPR$1])
      DataSetUnion(union=[b])
        DataSetValues(tuples=[[{ null }]], values=[b])
        DataSetAggregate(groupBy=[b], select=[b])
          DataSetCalc(select=[b])
            DataSetScan(table=[[_DataSetTable_0]])
== Physical Execution Plan ==
Stage 8 : Data Source
    content : collect elements with CollectionInputFormat
    Partitioning : RANDOM_PARTITIONED

Stage 14 : Data Source
    content : collect elements with CollectionInputFormat
    Partitioning : RANDOM_PARTITIONED

Stage 13 : Map
    content : from: (a, b, c)
    ship_strategy : Forward
    exchange_mode : BATCH
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 12 : FlatMap
    content : select: (a)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : FlatMap
    Partitioning : RANDOM_PARTITIONED

Stage 11 : Map
    content : prepare select: (a)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 10 : GroupCombine
    content : groupBy: (a), select: (a)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Sorted Combine
    Partitioning : RANDOM_PARTITIONED

Stage 9 : GroupReduce
    content : groupBy: (a), select: (a)
    ship_strategy : Hash Partition on [0]
    exchange_mode : PIPELINED
    driver_strategy : Sorted Group Reduce
    Partitioning : RANDOM_PARTITIONED

Stage 7 : Union
    content :
    ship_strategy : Redistribute
    exchange_mode : PIPELINED
    Partitioning : RANDOM_PARTITIONED

Stage 6 : Map
    content : prepare select: (SUM(a) AS EXPR$0)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 5 : GroupCombine
    content : select:(SUM(a) AS EXPR$0)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Group Reduce All
    Partitioning : RANDOM_PARTITIONED

Stage 4 : GroupReduce
    content : select:(SUM(a) AS EXPR$0)
    ship_strategy : Redistribute
    exchange_mode : PIPELINED
    driver_strategy : Group Reduce All
    Partitioning : RANDOM_PARTITIONED

Stage 19 : Data Source
    content : collect elements with CollectionInputFormat
    Partitioning : RANDOM_PARTITIONED

Stage 20 : Map
    content : from: (a, b, c)
    ship_strategy : Forward
    exchange_mode : BATCH
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 18 : Union
    content :
    ship_strategy : Redistribute
    exchange_mode : PIPELINED
    Partitioning : RANDOM_PARTITIONED

Stage 17 : Map
    content : prepare select: (SUM(c) AS EXPR$2)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 16 : GroupCombine
    content : select:(SUM(c) AS EXPR$2)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Group Reduce All
    Partitioning : RANDOM_PARTITIONED

Stage 15 : GroupReduce
    content : select:(SUM(c) AS EXPR$2)
    ship_strategy : Redistribute
    exchange_mode : PIPELINED
    driver_strategy : Group Reduce All
    Partitioning : RANDOM_PARTITIONED

Stage 3 : FlatMap
    content : where: (true), join: (EXPR$2, EXPR$0)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : FlatMap
    Partitioning : RANDOM_PARTITIONED

Stage 25 : Data Source
    content : collect elements with CollectionInputFormat
    Partitioning : RANDOM_PARTITIONED

Stage 30 : Map
    content : from: (a, b, c)
    ship_strategy : Forward
    exchange_mode : BATCH
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 29 : FlatMap
    content : select: (b)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : FlatMap
    Partitioning : RANDOM_PARTITIONED

Stage 28 : Map
    content : prepare select: (b)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 27 : GroupCombine
    content : groupBy: (b), select: (b)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Sorted Combine
    Partitioning : RANDOM_PARTITIONED

Stage 26 : GroupReduce
    content : groupBy: (b), select: (b)
    ship_strategy : Hash Partition on [0]
    exchange_mode : PIPELINED
    driver_strategy : Sorted Group Reduce
    Partitioning : RANDOM_PARTITIONED

Stage 24 : Union
    content :
    ship_strategy : Redistribute
    exchange_mode : PIPELINED
    Partitioning : RANDOM_PARTITIONED

Stage 23 : Map
    content : prepare select: (SUM(b) AS EXPR$1)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 22 : GroupCombine
    content : select:(SUM(b) AS EXPR$1)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Group Reduce All
    Partitioning : RANDOM_PARTITIONED

Stage 21 : GroupReduce
    content : select:(SUM(b) AS EXPR$1)
    ship_strategy : Redistribute
    exchange_mode : PIPELINED
    driver_strategy : Group Reduce All
    Partitioning : RANDOM_PARTITIONED

Stage 2 : FlatMap
    content : where: (true), join: (EXPR$2, EXPR$0, EXPR$1)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : FlatMap
    Partitioning : RANDOM_PARTITIONED

Stage 1 : FlatMap
    content : select: (EXPR$0, EXPR$1, EXPR$2)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : FlatMap
    Partitioning : RANDOM_PARTITIONED

Stage 0 : Data Sink
    content : org.apache.flink.api.java.io.DiscardingOutputFormat
    ship_strategy : Forward
    exchange_mode : PIPELINED
    Partitioning : RANDOM_PARTITIONED
```
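The optimized plan above evaluates the query as three independent aggregation branches: SUM(c) runs over all rows, while SUM(DISTINCT a) and SUM(DISTINCT b) each deduplicate their column with a groupBy before summing; the two DataSetSingleRowJoins then cross-join the three single-row results. A minimal Python sketch of that evaluation strategy (not Flink code; the sample rows are hypothetical):

```python
# Hypothetical (a, b, c) rows standing in for the table `expr`.
rows = [(1, 10, 100), (1, 20, 100), (2, 20, 300)]

# Branch for SUM(c): a plain aggregate over all rows (Stages 15-18).
expr2 = sum(c for _, _, c in rows)

# Branches for SUM(DISTINCT a) and SUM(DISTINCT b): deduplicate the
# column first (the groupBy-on-the-column aggregate), then sum the
# distinct values (Stages 9-12 and 26-29).
expr0 = sum(set(a for a, _, _ in rows))
expr1 = sum(set(b for _, b, _ in rows))

# The two single-row joins glue the three scalar results into one row.
result = (expr0, expr1, expr2)
print(result)  # (3, 30, 500)
```

Note that each DISTINCT column forces its own scan-dedup-aggregate branch, which is why the plan reads the source three times.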
> DISTINCT aggregate function support for SQL queries
> ---------------------------------------------------
>
> Key: FLINK-3475
> URL: https://issues.apache.org/jira/browse/FLINK-3475
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Chengxiang Li
> Assignee: Zhenghua Gao
>
> DISTINCT aggregate functions may be able to reuse the existing aggregate function implementations instead of requiring a separate implementation, and let the Flink runtime take care of duplicate records.
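
The idea in the issue description can be sketched as a wrapper that tracks already-seen values and forwards only new ones to an unmodified aggregate. This is a hypothetical illustration, not Flink's actual API; the class and method names are invented:

```python
class SumAggregate:
    """A plain aggregate function, unaware of DISTINCT."""
    def __init__(self):
        self._sum = 0

    def accumulate(self, value):
        self._sum += value

    def get_value(self):
        return self._sum


class DistinctWrapper:
    """Reuses any aggregate and drops duplicate input records."""
    def __init__(self, agg):
        self._agg = agg
        self._seen = set()

    def accumulate(self, value):
        # Forward a value to the wrapped aggregate only the first time.
        if value not in self._seen:
            self._seen.add(value)
            self._agg.accumulate(value)

    def get_value(self):
        return self._agg.get_value()


distinct_sum = DistinctWrapper(SumAggregate())
for v in [1, 1, 2, 2, 3]:
    distinct_sum.accumulate(v)
print(distinct_sum.get_value())  # 3 distinct values: 1 + 2 + 3 = 6
```

The trade-off versus the plan above is state: the wrapper keeps every distinct value per group, whereas the groupBy-based rewrite pushes deduplication into the shuffle.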
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)