Posted to issues@spark.apache.org by "Kazuaki Ishizaki (JIRA)" <ji...@apache.org> on 2017/03/02 05:30:45 UTC

[jira] [Commented] (SPARK-19468) Dataset slow because of unnecessary shuffles

    [ https://issues.apache.org/jira/browse/SPARK-19468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15891658#comment-15891658 ] 

Kazuaki Ishizaki commented on SPARK-19468:
------------------------------------------

Interesting.

For {{val joined1 = ds1.joinWith(ds2, ds1("_1") === ds2("_1"))}}, I thought the shuffle occurs because the optimized logical plan has a {{Project}} (although I know that a {{Project}} by itself should not cause a shuffle).

{code}
== Optimized Logical Plan ==
Join Inner, (_1#105._1 = _2#106._1)
:- Project [named_struct(_1, _1#83, _2, _2#84) AS _1#105]
:  +- InMemoryRelation [_1#83, _2#84], true, 10000, StorageLevel(disk, 1 replicas)
:        +- *Sort [_1#83 ASC NULLS FIRST], false, 0
:           +- Exchange hashpartitioning(_1#83, 5)
:              +- LocalTableScan [_1#83, _2#84]
+- Project [named_struct(_1, _1#100, _2, _2#101) AS _2#106]
   +- InMemoryRelation [_1#100, _2#101], true, 10000, StorageLevel(disk, 1 replicas)
         +- *Sort [_1#83 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(_1#83, 5)
               +- LocalTableScan [_1#83, _2#84]

== Physical Plan ==
*SortMergeJoin [_1#105._1], [_2#106._1], Inner
:- *Sort [_1#105._1 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(_1#105._1, 5)
:     +- *Project [named_struct(_1, _1#83, _2, _2#84) AS _1#105]
:        +- InMemoryTableScan [_1#83, _2#84]
:              +- InMemoryRelation [_1#83, _2#84], true, 10000, StorageLevel(disk, 1 replicas)
:                    +- *Sort [_1#83 ASC NULLS FIRST], false, 0
:                       +- Exchange hashpartitioning(_1#83, 5)
:                          +- LocalTableScan [_1#83, _2#84]
+- *Sort [_2#106._1 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(_2#106._1, 5)
      +- *Project [named_struct(_1, _1#100, _2, _2#101) AS _2#106]
         +- InMemoryTableScan [_1#100, _2#101]
               +- InMemoryRelation [_1#100, _2#101], true, 10000, StorageLevel(disk, 1 replicas)
                     +- *Sort [_1#83 ASC NULLS FIRST], false, 0
                        +- Exchange hashpartitioning(_1#83, 5)
                           +- LocalTableScan [_1#83, _2#84]
{code}

For {{val joined1 = ds1.join(ds2, ds1("_1") === ds2("_1"))}}, a shuffle still occurs on one side even though there is no {{Project}} in the optimized logical plan.

{code}
== Optimized Logical Plan ==
Join Inner, (_1#83 = _1#100)
:- InMemoryRelation [_1#83, _2#84], true, 10000, StorageLevel(disk, 1 replicas)
:     +- *Sort [_1#83 ASC NULLS FIRST], false, 0
:        +- Exchange hashpartitioning(_1#83, 5)
:           +- LocalTableScan [_1#83, _2#84]
+- InMemoryRelation [_1#100, _2#101], true, 10000, StorageLevel(disk, 1 replicas)
      +- *Sort [_1#83 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(_1#83, 5)
            +- LocalTableScan [_1#83, _2#84]

== Physical Plan ==
*SortMergeJoin [_1#83], [_1#100], Inner
:- InMemoryTableScan [_1#83, _2#84]
:     +- InMemoryRelation [_1#83, _2#84], true, 10000, StorageLevel(disk, 1 replicas)
:           +- *Sort [_1#83 ASC NULLS FIRST], false, 0
:              +- Exchange hashpartitioning(_1#83, 5)
:                 +- LocalTableScan [_1#83, _2#84]
+- *Sort [_1#100 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(_1#100, 5)
      +- InMemoryTableScan [_1#100, _2#101]
            +- InMemoryRelation [_1#100, _2#101], true, 10000, StorageLevel(disk, 1 replicas)
                  +- *Sort [_1#83 ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(_1#83, 5)
                        +- LocalTableScan [_1#83, _2#84]
{code}

In summary, I do not know why this unnecessary shuffle is inserted.
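
To compare the plans above without reading them line by line, here is a minimal sketch that counts the {{Exchange}} operators sitting outside any cached {{InMemoryRelation}} subtree, i.e. the shuffles that run again at join time. It only parses the text printed by {{explain}}; the object name {{ExtraShuffles}}, the method {{count}}, and the indentation-based parsing are my own assumptions for illustration, not part of Spark.

```scala
object ExtraShuffles {
  // Characters used to draw the tree prefix in the textual plan (assumed from the output above).
  private val treeChars = " :+-"

  /** Counts Exchange operators that are not inside an InMemoryRelation subtree. */
  def count(plan: String): Int = {
    // State: (indent of the enclosing InMemoryRelation, if any; running count).
    val (_, total) = plan.split("\n").filter(_.trim.nonEmpty)
      .foldLeft((Option.empty[Int], 0)) { case ((cacheIndent, n), line) =>
        // Column where the operator name starts, after the tree-drawing prefix.
        val indent = line.indexWhere(c => treeChars.indexOf(c) < 0)
        // Drop the whole-stage-codegen marker so "*Sort" and "Sort" look the same.
        val op = line.drop(indent).stripPrefix("*")
        // We are still inside the cached subtree only while lines are indented deeper than it.
        val active = cacheIndent.filter(indent > _)
        if (active.isDefined) (active, n)
        else if (op.startsWith("InMemoryRelation")) (Some(indent), n)
        else if (op.startsWith("Exchange")) (None, n + 1)
        else (None, n)
      }
    total
  }

  // The physical plan of the `join` case, copied from this comment.
  val samplePlan: String =
    """|*SortMergeJoin [_1#83], [_1#100], Inner
       |:- InMemoryTableScan [_1#83, _2#84]
       |:     +- InMemoryRelation [_1#83, _2#84], true, 10000, StorageLevel(disk, 1 replicas)
       |:           +- *Sort [_1#83 ASC NULLS FIRST], false, 0
       |:              +- Exchange hashpartitioning(_1#83, 5)
       |:                 +- LocalTableScan [_1#83, _2#84]
       |+- *Sort [_1#100 ASC NULLS FIRST], false, 0
       |   +- Exchange hashpartitioning(_1#100, 5)
       |      +- InMemoryTableScan [_1#100, _2#101]
       |            +- InMemoryRelation [_1#100, _2#101], true, 10000, StorageLevel(disk, 1 replicas)
       |                  +- *Sort [_1#83 ASC NULLS FIRST], false, 0
       |                     +- Exchange hashpartitioning(_1#83, 5)
       |                        +- LocalTableScan [_1#83, _2#84]
       |""".stripMargin
}
```

{{ExtraShuffles.count(ExtraShuffles.samplePlan)}} gives 1 for the {{join}} plan. In a Spark shell the input could presumably be taken from {{joined1.queryExecution.executedPlan.toString}}, on the assumption that it prints the same tree as {{explain}}.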


> Dataset slow because of unnecessary shuffles
> --------------------------------------------
>
>                 Key: SPARK-19468
>                 URL: https://issues.apache.org/jira/browse/SPARK-19468
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: koert kuipers
>
> we noticed that some algos we ported from rdd to dataset are significantly slower, and the main reason seems to be more shuffles that we successfully avoid for rdds by careful partitioning. this seems to be dataset specific as it works ok for dataframe.
> see also here:
> http://blog.hydronitrogen.com/2016/05/13/shuffle-free-joins-in-spark-sql/
> it kind of boils down to this... if i partition and sort dataframes that get used for joins repeatedly i can avoid shuffles:
> {noformat}
> System.setProperty("spark.sql.autoBroadcastJoinThreshold", "-1")
> val df1 = Seq((0, 0), (1, 1)).toDF("key", "value")
>   .repartition(col("key")).sortWithinPartitions(col("key")).persist(StorageLevel.DISK_ONLY)
> val df2 = Seq((0, 0), (1, 1)).toDF("key2", "value2")
>   .repartition(col("key2")).sortWithinPartitions(col("key2")).persist(StorageLevel.DISK_ONLY)
> val joined = df1.join(df2, col("key") === col("key2"))
> joined.explain
> == Physical Plan ==
> *SortMergeJoin [key#5], [key2#27], Inner
> :- InMemoryTableScan [key#5, value#6]
> :     +- InMemoryRelation [key#5, value#6], true, 10000, StorageLevel(disk, 1 replicas)
> :           +- *Sort [key#5 ASC NULLS FIRST], false, 0
> :              +- Exchange hashpartitioning(key#5, 4)
> :                 +- LocalTableScan [key#5, value#6]
> +- InMemoryTableScan [key2#27, value2#28]
>       +- InMemoryRelation [key2#27, value2#28], true, 10000, StorageLevel(disk, 1 replicas)
>             +- *Sort [key2#27 ASC NULLS FIRST], false, 0
>                +- Exchange hashpartitioning(key2#27, 4)
>                   +- LocalTableScan [key2#27, value2#28]
> {noformat}
> notice how the persisted dataframes are not shuffled or sorted anymore before being used in the join. however if i try to do the same with dataset i have no luck:
> {noformat}
> val ds1 = Seq((0, 0), (1, 1)).toDS
>   .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val ds2 = Seq((0, 0), (1, 1)).toDS
>   .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val joined1 = ds1.joinWith(ds2, ds1("_1") === ds2("_1"))
> joined1.explain
> == Physical Plan ==
> *SortMergeJoin [_1#105._1], [_2#106._1], Inner
> :- *Sort [_1#105._1 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(_1#105._1, 4)
> :     +- *Project [named_struct(_1, _1#83, _2, _2#84) AS _1#105]
> :        +- InMemoryTableScan [_1#83, _2#84]
> :              +- InMemoryRelation [_1#83, _2#84], true, 10000, StorageLevel(disk, 1 replicas)
> :                    +- *Sort [_1#83 ASC NULLS FIRST], false, 0
> :                       +- Exchange hashpartitioning(_1#83, 4)
> :                          +- LocalTableScan [_1#83, _2#84]
> +- *Sort [_2#106._1 ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(_2#106._1, 4)
>       +- *Project [named_struct(_1, _1#100, _2, _2#101) AS _2#106]
>          +- InMemoryTableScan [_1#100, _2#101]
>                +- InMemoryRelation [_1#100, _2#101], true, 10000, StorageLevel(disk, 1 replicas)
>                      +- *Sort [_1#83 ASC NULLS FIRST], false, 0
>                         +- Exchange hashpartitioning(_1#83, 4)
>                            +- LocalTableScan [_1#83, _2#84]
> {noformat}
> notice how my persisted Datasets are shuffled and sorted again. part of the issue seems to be in joinWith, which does some preprocessing that seems to confuse the planner. if i change the joinWith to join (which returns a dataframe) it looks a little better in that only one side gets shuffled again, but still not optimal:
> {noformat}
> val ds1 = Seq((0, 0), (1, 1)).toDS
>   .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val ds2 = Seq((0, 0), (1, 1)).toDS
>   .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val joined1 = ds1.join(ds2, ds1("_1") === ds2("_1"))
> joined1.explain
> == Physical Plan ==
> *SortMergeJoin [_1#83], [_1#100], Inner
> :- InMemoryTableScan [_1#83, _2#84]
> :     +- InMemoryRelation [_1#83, _2#84], true, 10000, StorageLevel(disk, 1 replicas)
> :           +- *Sort [_1#83 ASC NULLS FIRST], false, 0
> :              +- Exchange hashpartitioning(_1#83, 4)
> :                 +- LocalTableScan [_1#83, _2#84]
> +- *Sort [_1#100 ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(_1#100, 4)
>       +- InMemoryTableScan [_1#100, _2#101]
>             +- InMemoryRelation [_1#100, _2#101], true, 10000, StorageLevel(disk, 1 replicas)
>                   +- *Sort [_1#83 ASC NULLS FIRST], false, 0
>                      +- Exchange hashpartitioning(_1#83, 4)
>                         +- LocalTableScan [_1#83, _2#84]
> {noformat}


