Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2019/10/08 05:43:19 UTC

[jira] [Resolved] (SPARK-23560) Group by on struct field can add extra shuffle

     [ https://issues.apache.org/jira/browse/SPARK-23560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-23560.
----------------------------------
    Resolution: Incomplete

> Group by on struct field can add extra shuffle
> ----------------------------------------------
>
>                 Key: SPARK-23560
>                 URL: https://issues.apache.org/jira/browse/SPARK-23560
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0
>         Environment: debian 8.9, macos x high sierra
>            Reporter: Bruce Robbins
>            Priority: Major
>              Labels: bulk-closed
>
> Depending on the size of the input, a joinWith followed by a groupBy requires more shuffles than a join followed by a groupBy.
> For example, here's a joinWith on two CSV files, followed by a groupBy:
> {noformat}
> import org.apache.spark.sql.types._
> val schema = StructType(StructField("id1", LongType) :: StructField("id2", LongType) :: Nil)
> val df1 = spark.read.schema(schema).csv("ds1.csv")
> val df2 = spark.read.schema(schema).csv("ds2.csv")
> val result1 = df1.joinWith(df2, df1.col("id1") === df2.col("id2")).groupBy("_1.id1").count
> result1.explain
> == Physical Plan ==
> *(6) HashAggregate(keys=[_1#8.id1#19L], functions=[count(1)])
> +- Exchange hashpartitioning(_1#8.id1#19L, 200)
>    +- *(5) HashAggregate(keys=[_1#8.id1 AS _1#8.id1#19L], functions=[partial_count(1)])
>       +- *(5) Project [_1#8]
>          +- *(5) SortMergeJoin [_1#8.id1], [_2#9.id2], Inner
>             :- *(2) Sort [_1#8.id1 ASC NULLS FIRST], false, 0
>             :  +- Exchange hashpartitioning(_1#8.id1, 200)
>             :     +- *(1) Project [named_struct(id1, id1#0L, id2, id2#1L) AS _1#8]
>             :        +- *(1) FileScan csv [id1#0L,id2#1L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint>
>             +- *(4) Sort [_2#9.id2 ASC NULLS FIRST], false, 0
>                +- Exchange hashpartitioning(_2#9.id2, 200)
>                   +- *(3) Project [named_struct(id1, id1#4L, id2, id2#5L) AS _2#9]
>                      +- *(3) FileScan csv [id1#4L,id2#5L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint>
> {noformat}
> Using join, there is one fewer shuffle; no Exchange appears between the partial and final HashAggregate:
> {noformat}
> val result2 = df1.join(df2, df1.col("id1") === df2.col("id2")).groupBy(df1("id1")).count
> result2.explain
> == Physical Plan ==
> *(5) HashAggregate(keys=[id1#0L], functions=[count(1)])
> +- *(5) HashAggregate(keys=[id1#0L], functions=[partial_count(1)])
>    +- *(5) Project [id1#0L]
>       +- *(5) SortMergeJoin [id1#0L], [id2#5L], Inner
>          :- *(2) Sort [id1#0L ASC NULLS FIRST], false, 0
>          :  +- Exchange hashpartitioning(id1#0L, 200)
>          :     +- *(1) Project [id1#0L]
>          :        +- *(1) Filter isnotnull(id1#0L)
>          :           +- *(1) FileScan csv [id1#0L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id1)], ReadSchema: struct<id1:bigint>
>          +- *(4) Sort [id2#5L ASC NULLS FIRST], false, 0
>             +- Exchange hashpartitioning(id2#5L, 200)
>                +- *(3) Project [id2#5L]
>                   +- *(3) Filter isnotnull(id2#5L)
>                      +- *(3) FileScan csv [id2#5L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id2)], ReadSchema: struct<id2:bigint>
> {noformat}
> -The extra exchange is reflected in the run time of the query.- Actually, I recant this bit. In my particular tests, the extra exchange has negligible impact on run time. All the difference is in stage 2.
> My tests were on inputs with more than 2 million records.
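
The shuffle counts in the two plans above can be compared mechanically rather than by eye. The following is a minimal sketch, not part of the original report: the object name PlanShuffles and the helper countExchanges are hypothetical, and it only inspects plan text as printed by explain. The sample strings are abbreviated from the plans quoted above.

```scala
object PlanShuffles {
  // Hypothetical helper: counts shuffle Exchange operators in the text of a
  // physical plan. Only operators whose name is exactly "Exchange" are counted,
  // so names such as BroadcastExchange or ReusedExchange are ignored.
  def countExchanges(planText: String): Int =
    planText.split("\n").count { line =>
      // Drop the tree-drawing prefix (spaces, ':', '+', '-') before the operator name.
      val op = line.dropWhile(c => c == ' ' || c == ':' || c == '+' || c == '-')
      op.startsWith("Exchange ")
    }

  def main(args: Array[String]): Unit = {
    // Abbreviated from the joinWith plan in the report.
    val joinWithPlan =
      """*(6) HashAggregate(keys=[_1#8.id1#19L], functions=[count(1)])
        |+- Exchange hashpartitioning(_1#8.id1#19L, 200)
        |   +- *(5) SortMergeJoin [_1#8.id1], [_2#9.id2], Inner
        |      :- *(2) Sort [_1#8.id1 ASC NULLS FIRST], false, 0
        |      :  +- Exchange hashpartitioning(_1#8.id1, 200)
        |      +- *(4) Sort [_2#9.id2 ASC NULLS FIRST], false, 0
        |         +- Exchange hashpartitioning(_2#9.id2, 200)""".stripMargin

    // Abbreviated from the join plan in the report.
    val joinPlan =
      """*(5) HashAggregate(keys=[id1#0L], functions=[count(1)])
        |+- *(5) SortMergeJoin [id1#0L], [id2#5L], Inner
        |   :- *(2) Sort [id1#0L ASC NULLS FIRST], false, 0
        |   :  +- Exchange hashpartitioning(id1#0L, 200)
        |   +- *(4) Sort [id2#5L ASC NULLS FIRST], false, 0
        |      +- Exchange hashpartitioning(id2#5L, 200)""".stripMargin

    println(countExchanges(joinWithPlan)) // prints 3
    println(countExchanges(joinPlan))     // prints 2
  }
}
```

In a Spark shell, the same helper could presumably be applied to result1.queryExecution.executedPlan.toString, although the exact plan text varies between Spark versions.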



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
