Posted to issues@spark.apache.org by "Liang-Chi Hsieh (JIRA)" <ji...@apache.org> on 2018/01/02 01:46:02 UTC

[jira] [Comment Edited] (SPARK-22898) collect_set aggregation on bucketed table causes an exchange stage

    [ https://issues.apache.org/jira/browse/SPARK-22898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307591#comment-16307591 ] 

Liang-Chi Hsieh edited comment on SPARK-22898 at 1/2/18 1:46 AM:
-----------------------------------------------------------------

I think this should already be fixed by SPARK-22223.

I did a test with the current master branch:

{code}
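    // Assumes a session (e.g. spark-shell built from master) where
    // spark.implicits._ and the sql() helper are already in scope.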
    val df = {
      (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
    }
    df.write
      .format("parquet")
      .bucketBy(8, "j")
      .sortBy("j")
      .saveAsTable("bucketed_table")
    sql("select j, max(k) from bucketed_table group by j").explain
    sql("select j, collect_set(k) from bucketed_table group by j").explain  
{code}

{code}
== Physical Plan ==
SortAggregate(key=[j#4851], functions=[max(k#4852)])
+- SortAggregate(key=[j#4851], functions=[partial_max(k#4852)])
   +- *Sort [j#4851 ASC NULLS FIRST], false, 0
      +- *FileScan parquet default.bucketed_table[j#4851,k#4852] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/root/repos/spark-1/sql/core/spark-warehouse/bucketed_table], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<j:int,k:string>

== Physical Plan ==
ObjectHashAggregate(keys=[j#4851], functions=[collect_set(k#4852, 0, 0)])
+- ObjectHashAggregate(keys=[j#4851], functions=[partial_collect_set(k#4852, 0, 0)])
   +- *FileScan parquet default.bucketed_table[j#4851,k#4852] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/root/repos/spark-1/sql/core/spark-warehouse/bucketed_table], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<j:int,k:string>
{code}
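
To double-check beyond the explain output, the absence of a shuffle can also be asserted programmatically. A minimal sketch, assuming the same session as above and that ShuffleExchangeExec is the shuffle node class on current master:

{code}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Grab the executed plans for both aggregations on the bucketed table.
val maxPlan = sql("select j, max(k) from bucketed_table group by j")
  .queryExecution.executedPlan
val setPlan = sql("select j, collect_set(k) from bucketed_table group by j")
  .queryExecution.executedPlan

// Neither plan should contain a shuffle exchange, since the table is bucketed on j.
assert(maxPlan.collect { case e: ShuffleExchangeExec => e }.isEmpty)
assert(setPlan.collect { case e: ShuffleExchangeExec => e }.isEmpty)
{code}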




> collect_set aggregation on bucketed table causes an exchange stage
> ------------------------------------------------------------------
>
>                 Key: SPARK-22898
>                 URL: https://issues.apache.org/jira/browse/SPARK-22898
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: Modi Tamam
>              Labels: bucketing
>
> I'm using Spark 2.2 and doing a proof of concept of Spark's bucketing. I've created a bucketed table; here is the output of desc formatted my_bucketed_tbl (a sketch of one possible way to create such a table follows the output):
>     +--------------------+--------------------+-------+
>     |            col_name|           data_type|comment|
>     +--------------------+--------------------+-------+
>     |              bundle|              string|   null|
>     |                 ifa|              string|   null|
>     |               date_|                date|   null|
>     |                hour|                 int|   null|
>     |                    |                    |       |
>     |# Detailed Table ...|                    |       |
>     |            Database|             default|       |
>     |               Table|     my_bucketed_tbl|       |
>     |               Owner|            zeppelin|       |
>     |             Created|Thu Dec 21 13:43:...|       |
>     |         Last Access|Thu Jan 01 00:00:...|       |
>     |                Type|            EXTERNAL|       |
>     |            Provider|                 orc|       |
>     |         Num Buckets|                  16|       |
>     |      Bucket Columns|             [`ifa`]|       |
>     |        Sort Columns|             [`ifa`]|       |
>     |    Table Properties|[transient_lastDd...|       |
>     |            Location|hdfs:/user/hive/w...|       |
>     |       Serde Library|org.apache.hadoop...|       |
>     |         InputFormat|org.apache.hadoop...|       |
>     |        OutputFormat|org.apache.hadoop...|       |
>     |  Storage Properties|[serialization.fo...|       |
>     +--------------------+--------------------+-------+
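> For reference, here's a minimal sketch (with hypothetical sample data) of one way a bucketed and sorted ORC table with this layout could be created through the DataFrame writer; the actual table above is an external Hive table, so its creation path likely differed:
> {code:java}
> // Hypothetical sample data; only the bucketing/sorting layout matters here.
> spark.range(0, 1000)
>   .selectExpr(
>     "cast(id as string) as bundle",
>     "cast(id % 16 as string) as ifa",
>     "current_date() as date_",
>     "cast(id % 24 as int) as hour")
>   .write
>   .format("orc")
>   .bucketBy(16, "ifa")   // 16 buckets on ifa, matching Num Buckets / Bucket Columns above
>   .sortBy("ifa")         // sorted within each bucket by ifa, matching Sort Columns above
>   .saveAsTable("my_bucketed_tbl")
> {code}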
> When I run an explain on a group-by query, I can see that the exchange stage is avoided:
> {code:java}
> sql("select ifa,max(bundle) from my_bucketed_tbl group by ifa").explain
> == Physical Plan ==
> SortAggregate(key=[ifa#932], functions=[max(bundle#920)])
> +- SortAggregate(key=[ifa#932], functions=[partial_max(bundle#920)])
>    +- *Sort [ifa#932 ASC NULLS FIRST], false, 0
>       +- *FileScan orc default.level_1[bundle#920,ifa#932] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<bundle:string,ifa:string>
> {code}
> But when I replace Spark's max function with collect_set, the execution plan is the same as for a non-bucketed table, meaning the exchange stage is not avoided:
> {code:java}
> sql("select ifa,collect_set(bundle) from my_bucketed_tbl group by ifa").explain
> == Physical Plan ==
> ObjectHashAggregate(keys=[ifa#1010], functions=[collect_set(bundle#998, 0, 0)])
> +- Exchange hashpartitioning(ifa#1010, 200)
>    +- ObjectHashAggregate(keys=[ifa#1010], functions=[partial_collect_set(bundle#998, 0, 0)])
>       +- *FileScan orc default.level_1[bundle#998,ifa#1010] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<bundle:string,ifa:string
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org