Posted to issues@spark.apache.org by "Liang-Chi Hsieh (JIRA)" <ji...@apache.org> on 2018/01/02 01:42:00 UTC
[jira] [Commented] (SPARK-22898) collect_set aggregation on bucketed table causes an exchange stage
[ https://issues.apache.org/jira/browse/SPARK-22898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307591#comment-16307591 ]
Liang-Chi Hsieh commented on SPARK-22898:
-----------------------------------------
I think this should already be fixed by SPARK-22223.
I ran a test against the current master branch:
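{code}
// Assumes spark.implicits._ (for toDF) and sql(...) are in scope, as in spark-shell
val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")

// Write a Parquet table bucketed and sorted by j into 8 buckets
df.write
  .format("parquet")
  .bucketBy(8, "j")
  .sortBy("j")
  .saveAsTable("bucketed_table")

// Group by the bucket column, once with max and once with collect_set
sql("select j, max(k) from bucketed_table group by j").explain
sql("select j, collect_set(k) from bucketed_table group by j").explain
{code}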
{code}
== Physical Plan ==
SortAggregate(key=[j#4851], functions=[max(k#4852)])
+- SortAggregate(key=[j#4851], functions=[partial_max(k#4852)])
   +- *Sort [j#4851 ASC NULLS FIRST], false, 0
      +- *FileScan parquet default.bucketed_table[j#4851,k#4852] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/root/repos/spark-1/sql/core/spark-warehouse/bucketed_table], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<j:int,k:string>

== Physical Plan ==
ObjectHashAggregate(keys=[j#4851], functions=[collect_set(k#4852, 0, 0)])
+- ObjectHashAggregate(keys=[j#4851], functions=[partial_collect_set(k#4852, 0, 0)])
   +- *FileScan parquet default.bucketed_table[j#4851,k#4852] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/root/repos/spark-1/sql/core/spark-warehouse/bucketed_table], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<j:int,k:string>
{code}
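Neither plan above has an Exchange between the partial and final aggregation. A minimal plain-Scala sketch of why that is valid when the table is bucketed on the grouping key (no Spark involved): it simulates bucketing with a stand-in hash (the key's hashCode modulo the bucket count; Spark's own bucketing uses a Murmur3-based hash, so real bucket ids differ), aggregates each bucket on its own, and checks the result against a global group-by. BucketedAggSketch, bucketOf, collectSet and run are hypothetical names, not Spark APIs.

{code:scala}
// Hypothetical plain-Scala simulation (no Spark): rows are "bucketed" by the
// grouping key, then each bucket is aggregated independently, with no data
// moved between buckets.
object BucketedAggSketch {
  type Row = (Int, String) // (j, k), mirroring the test table's columns

  // Stand-in bucket function: non-negative modulo of the key's hashCode.
  // Spark's bucketing uses a Murmur3-based hash, so its bucket ids differ.
  def bucketOf(key: Int, numBuckets: Int): Int = {
    val m = key.hashCode % numBuckets
    if (m < 0) m + numBuckets else m
  }

  // collect_set(k) grouped by j over one collection of rows
  def collectSet(rows: Seq[Row]): Map[Int, Set[String]] =
    rows.groupBy(_._1).map { case (j, rs) => j -> rs.map(_._2).toSet }

  // Returns (keys are disjoint across buckets, per-bucket result == global result)
  def run(numBuckets: Int = 8): (Boolean, Boolean) = {
    // Same shape as the test data: j = i % 13, k = i.toString
    val rows: Seq[Row] = (0 until 50).map(i => (i % 13, i.toString))

    // "Write" the table: place each row in the bucket chosen by its key
    val buckets = rows.groupBy { case (j, _) => bucketOf(j, numBuckets) }

    // Aggregate every bucket on its own
    val perBucket = buckets.values.toSeq.map(collectSet)

    // Every key should appear in exactly one bucket's result
    val allKeys = perBucket.flatMap(_.keys)
    val disjoint = allKeys.size == allKeys.distinct.size

    // Concatenating the per-bucket maps should reproduce the global aggregate
    val merged = perBucket.reduce(_ ++ _)
    (disjoint, merged == collectSet(rows))
  }
}
{code}

Calling BucketedAggSketch.run() returns (true, true): because all rows with the same j land in the same bucket, each bucket's aggregate is already final for its keys, so merging the per-bucket results needs no shuffle.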
> collect_set aggregation on bucketed table causes an exchange stage
> ------------------------------------------------------------------
>
> Key: SPARK-22898
> URL: https://issues.apache.org/jira/browse/SPARK-22898
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.3.0
> Reporter: Modi Tamam
> Labels: bucketing
>
> I'm using Spark 2.2 and running a proof of concept (POC) of Spark's bucketing. I've created a bucketed table; here's the output of desc formatted my_bucketed_tbl:
> +--------------------+--------------------+-------+
> |            col_name|           data_type|comment|
> +--------------------+--------------------+-------+
> |              bundle|              string|   null|
> |                 ifa|              string|   null|
> |               date_|                date|   null|
> |                hour|                 int|   null|
> |                    |                    |       |
> |# Detailed Table ...|                    |       |
> |            Database|             default|       |
> |               Table|     my_bucketed_tbl|       |
> |               Owner|            zeppelin|       |
> |             Created|Thu Dec 21 13:43:...|       |
> |         Last Access|Thu Jan 01 00:00:...|       |
> |                Type|            EXTERNAL|       |
> |            Provider|                 orc|       |
> |         Num Buckets|                  16|       |
> |      Bucket Columns|             [`ifa`]|       |
> |        Sort Columns|             [`ifa`]|       |
> |    Table Properties|[transient_lastDd...|       |
> |            Location|hdfs:/user/hive/w...|       |
> |       Serde Library|org.apache.hadoop...|       |
> |         InputFormat|org.apache.hadoop...|       |
> |        OutputFormat|org.apache.hadoop...|       |
> |  Storage Properties|[serialization.fo...|       |
> +--------------------+--------------------+-------+
> When I run explain on a group-by query, I can see that the exchange stage is skipped:
> {code:java}
> sql("select ifa,max(bundle) from my_bucketed_tbl group by ifa").explain
> == Physical Plan ==
> SortAggregate(key=[ifa#932], functions=[max(bundle#920)])
> +- SortAggregate(key=[ifa#932], functions=[partial_max(bundle#920)])
>    +- *Sort [ifa#932 ASC NULLS FIRST], false, 0
>       +- *FileScan orc default.level_1[bundle#920,ifa#932] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<bundle:string,ifa:string>
> {code}
> But when I replace Spark's max function with collect_set, the execution plan is the same as for a non-bucketed table; that is, the exchange stage is not skipped:
> {code:java}
> sql("select ifa,collect_set(bundle) from my_bucketed_tbl group by ifa").explain
> == Physical Plan ==
> ObjectHashAggregate(keys=[ifa#1010], functions=[collect_set(bundle#998, 0, 0)])
> +- Exchange hashpartitioning(ifa#1010, 200)
>    +- ObjectHashAggregate(keys=[ifa#1010], functions=[partial_collect_set(bundle#998, 0, 0)])
>       +- *FileScan orc default.level_1[bundle#998,ifa#1010] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<bundle:string,ifa:string>
> {code}
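Both collect_set plans split the aggregation into partial_collect_set followed by collect_set. A minimal plain-Scala sketch of that two-phase idea, assuming Scala collections stand in for Spark partitions: the partial phase builds one set per key within each partition, and the final phase unions the partial sets that share a key. TwoPhaseCollectSet, partial and merge are hypothetical names, not Spark internals.

{code:scala}
// Hypothetical plain-Scala sketch (no Spark) of two-phase collect_set.
// Each inner Seq stands in for one input partition.
object TwoPhaseCollectSet {
  type Row = (String, String) // (ifa, bundle), mirroring the reporter's columns

  // Partial phase (cf. partial_collect_set): one set per key, per partition
  def partial(partition: Seq[Row]): Map[String, Set[String]] =
    partition.groupBy(_._1).map { case (ifa, rows) => ifa -> rows.map(_._2).toSet }

  // Final phase (cf. collect_set): union the partial sets that share a key
  def merge(partials: Seq[Map[String, Set[String]]]): Map[String, Set[String]] =
    partials.foldLeft(Map.empty[String, Set[String]]) { (acc, p) =>
      p.foldLeft(acc) { case (a, (ifa, s)) =>
        a.updated(ifa, a.getOrElse(ifa, Set.empty[String]) ++ s)
      }
    }
}
{code}

If a key such as "a" appears in two input partitions, the final phase has to see both partial sets. On unbucketed input, that is what the Exchange on the grouping key arranges; a table bucketed on ifa already keeps all rows for a given ifa in one bucket, which is why the max plan can skip the Exchange.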
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)