Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2019/10/08 05:43:19 UTC
[jira] [Resolved] (SPARK-23560) Group by on struct field can add extra shuffle
[ https://issues.apache.org/jira/browse/SPARK-23560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-23560.
----------------------------------
Resolution: Incomplete
> Group by on struct field can add extra shuffle
> ----------------------------------------------
>
> Key: SPARK-23560
> URL: https://issues.apache.org/jira/browse/SPARK-23560
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.3.0
> Environment: debian 8.9, macos x high sierra
> Reporter: Bruce Robbins
> Priority: Major
> Labels: bulk-closed
>
> Depending on the size of the input, a joinWith followed by a groupBy requires more shuffles than a join followed by a groupBy.
> For example, here's a joinWith on two CSV files, followed by a groupBy:
> {noformat}
> import org.apache.spark.sql.types._
> val schema = StructType(StructField("id1", LongType) :: StructField("id2", LongType) :: Nil)
> val df1 = spark.read.schema(schema).csv("ds1.csv")
> val df2 = spark.read.schema(schema).csv("ds2.csv")
> val result1 = df1.joinWith(df2, df1.col("id1") === df2.col("id2")).groupBy("_1.id1").count
> result1.explain
> == Physical Plan ==
> *(6) HashAggregate(keys=[_1#8.id1#19L], functions=[count(1)])
> +- Exchange hashpartitioning(_1#8.id1#19L, 200)
>    +- *(5) HashAggregate(keys=[_1#8.id1 AS _1#8.id1#19L], functions=[partial_count(1)])
>       +- *(5) Project [_1#8]
>          +- *(5) SortMergeJoin [_1#8.id1], [_2#9.id2], Inner
>             :- *(2) Sort [_1#8.id1 ASC NULLS FIRST], false, 0
>             :  +- Exchange hashpartitioning(_1#8.id1, 200)
>             :     +- *(1) Project [named_struct(id1, id1#0L, id2, id2#1L) AS _1#8]
>             :        +- *(1) FileScan csv [id1#0L,id2#1L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint>
>             +- *(4) Sort [_2#9.id2 ASC NULLS FIRST], false, 0
>                +- Exchange hashpartitioning(_2#9.id2, 200)
>                   +- *(3) Project [named_struct(id1, id1#4L, id2, id2#5L) AS _2#9]
>                      +- *(3) FileScan csv [id1#4L,id2#5L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint>
> {noformat}
> Using join, there is one fewer shuffle (no Exchange between the partial and final HashAggregate):
> {noformat}
> val result2 = df1.join(df2, df1.col("id1") === df2.col("id2")).groupBy(df1("id1")).count
> result2.explain
> == Physical Plan ==
> *(5) HashAggregate(keys=[id1#0L], functions=[count(1)])
> +- *(5) HashAggregate(keys=[id1#0L], functions=[partial_count(1)])
>    +- *(5) Project [id1#0L]
>       +- *(5) SortMergeJoin [id1#0L], [id2#5L], Inner
>          :- *(2) Sort [id1#0L ASC NULLS FIRST], false, 0
>          :  +- Exchange hashpartitioning(id1#0L, 200)
>          :     +- *(1) Project [id1#0L]
>          :        +- *(1) Filter isnotnull(id1#0L)
>          :           +- *(1) FileScan csv [id1#0L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id1)], ReadSchema: struct<id1:bigint>
>          +- *(4) Sort [id2#5L ASC NULLS FIRST], false, 0
>             +- Exchange hashpartitioning(id2#5L, 200)
>                +- *(3) Project [id2#5L]
>                   +- *(3) Filter isnotnull(id2#5L)
>                      +- *(3) FileScan csv [id2#5L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id2)], ReadSchema: struct<id2:bigint>
> {noformat}
> -The extra exchange is reflected in the run time of the query.- Actually, I recant this bit. In my particular tests, the extra exchange has negligible impact on run time. All the difference is in stage 2.
> My tests were on inputs with more than 2 million records.
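
The shuffle counts in the two plans can be checked mechanically. The following is a minimal sketch, not part of the original report: it counts Exchange operators in the text printed by explain. The object ExchangeCount and the helper countExchanges are hypothetical names, and the plan strings are abbreviated copies of the plans quoted in the description (the Project, Filter and FileScan lines are omitted).

```scala
// Hypothetical helper, not part of Spark: counts shuffles in explain() output.
object ExchangeCount {
  // Characters used by Spark to draw the plan tree (plus the email quote marker).
  private val treeMarkers = Set(' ', ':', '+', '-', '>')

  /** Counts plan lines whose operator is an Exchange, ignoring tree markers. */
  def countExchanges(planText: String): Int =
    planText.split("\n")
      .map(_.dropWhile(treeMarkers.contains))
      .count(_.startsWith("Exchange "))

  // Abbreviated joinWith + groupBy plan from the description.
  val joinWithPlan: String =
    """|*(6) HashAggregate(keys=[_1#8.id1#19L], functions=[count(1)])
       |+- Exchange hashpartitioning(_1#8.id1#19L, 200)
       |   +- *(5) HashAggregate(keys=[_1#8.id1 AS _1#8.id1#19L], functions=[partial_count(1)])
       |      +- *(5) SortMergeJoin [_1#8.id1], [_2#9.id2], Inner
       |         :- *(2) Sort [_1#8.id1 ASC NULLS FIRST], false, 0
       |         :  +- Exchange hashpartitioning(_1#8.id1, 200)
       |         +- *(4) Sort [_2#9.id2 ASC NULLS FIRST], false, 0
       |            +- Exchange hashpartitioning(_2#9.id2, 200)""".stripMargin

  // Abbreviated join + groupBy plan from the description.
  val joinPlan: String =
    """|*(5) HashAggregate(keys=[id1#0L], functions=[count(1)])
       |+- *(5) HashAggregate(keys=[id1#0L], functions=[partial_count(1)])
       |   +- *(5) SortMergeJoin [id1#0L], [id2#5L], Inner
       |      :- *(2) Sort [id1#0L ASC NULLS FIRST], false, 0
       |      :  +- Exchange hashpartitioning(id1#0L, 200)
       |      +- *(4) Sort [id2#5L ASC NULLS FIRST], false, 0
       |         +- Exchange hashpartitioning(id2#5L, 200)""".stripMargin

  def main(args: Array[String]): Unit = {
    println(s"joinWith plan exchanges: ${countExchanges(joinWithPlan)}") // 3
    println(s"join plan exchanges: ${countExchanges(joinPlan)}")         // 2
  }
}
```

Counting on the plan text rather than on Spark's plan classes keeps the sketch independent of the Spark version; it only assumes that each shuffle is printed on a line beginning with "Exchange ", as in the plans above.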