Posted to issues@spark.apache.org by "Wenchen Fan (Jira)" <ji...@apache.org> on 2019/11/15 07:53:00 UTC

[jira] [Assigned] (SPARK-29655) Enable adaptive execution should not add more ShuffleExchange

     [ https://issues.apache.org/jira/browse/SPARK-29655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan reassigned SPARK-29655:
-----------------------------------

    Assignee: Yuming Wang

> Enable adaptive execution should not add more ShuffleExchange
> -------------------------------------------------------------
>
>                 Key: SPARK-29655
>                 URL: https://issues.apache.org/jira/browse/SPARK-29655
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Yuming Wang
>            Assignee: Yuming Wang
>            Priority: Major
>
> Enabling adaptive execution should not add more ShuffleExchange nodes. How to reproduce:
> {code:scala}
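> import org.apache.spark.sql.SaveMode
> // Disable broadcast joins so the join is planned as a sort-merge join.
> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
> // Use the same number of shuffle partitions as the table has buckets (4).
> spark.conf.set("spark.sql.shuffle.partitions", 4)
> val bucketedTableName = "bucketed_table"
> spark.range(10).write.bucketBy(4, "id").sortBy("id").mode(SaveMode.Overwrite).saveAsTable(bucketedTableName)
> val bucketedTable = spark.table(bucketedTableName)
> val df = spark.range(4)
> // Plan before adaptive execution is enabled.
> df.join(bucketedTable, "id").explain()
> // Enable adaptive execution with a maximum post-shuffle partition count (5)
> // that differs from the bucket count (4), then plan the same join again.
> spark.conf.set("spark.sql.adaptive.enabled", true)
> spark.conf.set("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions", 5)
> df.join(bucketedTable, "id").explain()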
> {code}
> Output of the second explain() call, with adaptive execution enabled:
> {noformat}
> == Physical Plan ==
> AdaptiveSparkPlan(isFinalPlan=false)
> +- Project [id#5L]
>    +- SortMergeJoin [id#5L], [id#3L], Inner
>       :- Sort [id#5L ASC NULLS FIRST], false, 0
>       :  +- Exchange hashpartitioning(id#5L, 5), true, [id=#92]
>       :     +- Range (0, 4, step=1, splits=16)
>       +- Sort [id#3L ASC NULLS FIRST], false, 0
>          +- Exchange hashpartitioning(id#3L, 5), true, [id=#93]
>             +- Project [id#3L]
>                +- Filter isnotnull(id#3L)
>                   +- FileScan parquet default.bucketed_table[id#3L] Batched: true, DataFilters: [isnotnull(id#3L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/spark-3.0.0-preview-bin-hadoop3.2/spark-warehouse/bucketed_table], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 4 out of 4
> {noformat}
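
One way to compare the plans from the reproduction is to count the Exchange operators in the plan text. The following is a minimal sketch, not part of Spark: countExchanges is a hypothetical helper that only does string matching on plan text, and the sample text is an excerpt copied from the output above.
{code:scala}
object CountExchanges {
  // Hypothetical helper (not a Spark API): counts the lines of a
  // textual physical plan that contain a hash-partitioned Exchange.
  def countExchanges(planText: String): Int =
    planText.split("\n").count(_.contains("Exchange hashpartitioning"))

  def main(args: Array[String]): Unit = {
    // Excerpt of the adaptive plan shown in the issue description.
    val adaptivePlan =
      """AdaptiveSparkPlan(isFinalPlan=false)
        |+- Project [id#5L]
        |   +- SortMergeJoin [id#5L], [id#3L], Inner
        |      :- Sort [id#5L ASC NULLS FIRST], false, 0
        |      :  +- Exchange hashpartitioning(id#5L, 5), true, [id=#92]
        |      :     +- Range (0, 4, step=1, splits=16)
        |      +- Sort [id#3L ASC NULLS FIRST], false, 0
        |         +- Exchange hashpartitioning(id#3L, 5), true, [id=#93]
        |            +- Project [id#3L]""".stripMargin

    // Both join inputs are shuffled in this plan, including the bucketed side.
    println(countExchanges(adaptivePlan)) // prints 2
  }
}
{code}
In a Spark session the plan text would have to come from the query itself, for example from df.join(bucketedTable, "id").queryExecution.executedPlan.toString; whether that string includes the full tree under AdaptiveSparkPlan in a given Spark version is an assumption to verify.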


