Posted to issues@spark.apache.org by "Yuming Wang (Jira)" <ji...@apache.org> on 2022/10/15 02:23:00 UTC

[jira] [Resolved] (SPARK-40703) Performance regression for joins in Spark 3.3 vs Spark 3.2

     [ https://issues.apache.org/jira/browse/SPARK-40703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuming Wang resolved SPARK-40703.
---------------------------------
    Fix Version/s: 3.3.1
                   3.4.0
       Resolution: Fixed

Issue resolved by pull request 38196
[https://github.com/apache/spark/pull/38196]

> Performance regression for joins in Spark 3.3 vs Spark 3.2
> ----------------------------------------------------------
>
>                 Key: SPARK-40703
>                 URL: https://issues.apache.org/jira/browse/SPARK-40703
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.3.0
>            Reporter: Bryan Keller
>            Assignee: Chao Sun
>            Priority: Major
>             Fix For: 3.3.1, 3.4.0
>
>         Attachments: spark32-plan.txt, spark33-plan.txt, test.py
>
>
> When running the TPC-DS benchmarks with a DSv2 datasource in Spark 3.3, a performance regression vs Spark 3.2 was discovered. More specifically, it appears that {_}EnsureRequirements.ensureDistributionAndOrdering{_}() no longer enforces a minimum number of partitions for a join distribution in some cases. This impacts DSv2 datasources because, if a scan has only a single read partition, {_}DataSourceV2ScanExecBase.outputPartitioning{_}() returns a _SinglePartition_ instance. The _SinglePartition_ creates a {_}SinglePartitionShuffleSpec{_}, and {_}SinglePartitionShuffleSpec.canCreatePartitioning{_}() returns true.
> Because {_}canCreatePartitioning{_}() returns true in this case, {_}EnsureRequirements.ensureDistributionAndOrdering{_}() won't enforce minimum parallelism and also will favor the single partition when considering the best distribution candidate. Ultimately this results in a single partition being selected for the join distribution, even if the other side of the join is a large table with many partitions. This can seriously impact performance of the join.
> Spark 3.2 enforces minimum parallelism differently in {_}ensureDistributionAndOrdering{_}() and thus does not suffer from this issue. It will shuffle both sides of the join to enforce parallelism.
> In the TPC-DS benchmark, affected queries include 14a and 14b. The issue can also be demonstrated with a simple query, for example:
> {{select ics.i_item_sk from catalog_sales cs join item ics on cs.cs_item_sk = ics.i_item_sk}}
> ...where _item_ is a small table that is read into one partition, and _catalog_sales_ is a large table. These tables are part of the TPC-DS schema, but you can create your own. To demonstrate the issue with this query you may need to turn off broadcast joins; disabling broadcast is not required for the regression to occur, since it also happens when running the TPC-DS with the broadcast setting at its default.
> Attached are the plans for this query in Spark 3.2 and Spark 3.3. They show that in Spark 3.2 a join parallelism of 200 is reached by inserting an exchange after the _item_ table scan, while in Spark 3.3 no such exchange is inserted and the join parallelism is 1.
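
For reference, here is a minimal PySpark repro sketch (not the attached test.py): it assumes the TPC-DS tables catalog_sales and item from the query above are already registered through a DSv2 catalog, and it uses only standard Spark SQL configuration keys.

{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-40703-repro").getOrCreate()

# Disable broadcast joins so the planner must choose a shuffle-based join,
# which is where the single-partition distribution becomes visible.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

df = spark.sql("""
    select ics.i_item_sk
    from catalog_sales cs
    join item ics on cs.cs_item_sk = ics.i_item_sk
""")

# Compare the physical plans across versions: in Spark 3.2 an exchange is
# inserted after the single-partition item scan; in Spark 3.3.0 it is not.
df.explain()

# Observed join parallelism per the description above: 200 on Spark 3.2
# (the spark.sql.shuffle.partitions default), 1 on an affected 3.3.0 build.
print("join output partitions:", df.rdd.getNumPartitions())
{code}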



