You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2023/01/06 12:34:00 UTC

[jira] [Assigned] (SPARK-41914) Sorting issue with partitioned-writing and planned write optimization disabled

     [ https://issues.apache.org/jira/browse/SPARK-41914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-41914:
------------------------------------

    Assignee:     (was: Apache Spark)

> Sorting issue with partitioned-writing and planned write optimization disabled
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-41914
>                 URL: https://issues.apache.org/jira/browse/SPARK-41914
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Enrico Minack
>            Priority: Major
>
> Spark 3.4.0 introduced option {{{}spark.sql.optimizer.plannedWrite.enabled{}}}, which is enabled by default. When disabled, partitioned writing loses in-partition order when spilling occurs.
> This is related to SPARK-40885 where setting option {{spark.sql.optimizer.plannedWrite.enabled}} to {{true}} will remove the existing sort (for {{day}} and {{{}id{}}}) entirely.
> Run this with 512m memory and one executor, e.g.:
> {code}
> spark-shell --driver-memory 512m --master "local[1]"
> {code}
> {code:scala}
> import org.apache.spark.sql.SaveMode
> spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", false)
> val ids = 2000000
> val days = 2
> val parts = 2
> val ds = spark.range(0, days, 1, parts).withColumnRenamed("id", "day").join(spark.range(0, ids, 1, parts))
> ds.repartition($"day")
>   .sortWithinPartitions($"day", $"id")
>   .write
>   .partitionBy("day")
>   .mode(SaveMode.Overwrite)
>   .csv("interleaved.csv")
> {code}
> Check the written files are sorted (states OK when file is sorted):
> {code:bash}
> for file in interleaved.csv/day\=*/part-*
> do
>   echo "$(sort -n "$file" | md5sum | cut -d " " -f 1)  $file"
> done | md5sum -c
> {code}
> Files should look like this
> {code}
> 0
> 1
> 2
> ...
> 1048576
> 1048577
> 1048578
> ...
> {code}
> But they look like
> {code}
> 0
> 1048576
> 1
> 1048577
> 2
> 1048578
> ...
> {code}
> The cause issue is the same as in SPARK-40588. A sort (for {{{}day{}}}) is added on top of the existing sort (for {{day}} and {{{}id{}}}). Spilling interleaves the sorted spill files.
> {code}
> Sort [input[0, bigint, false] ASC NULLS FIRST], false, 0
> +- AdaptiveSparkPlan isFinalPlan=false
>    +- Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [plan_id=30]
>          +- BroadcastNestedLoopJoin BuildLeft, Inner
>             :- BroadcastExchange IdentityBroadcastMode, [plan_id=28]
>             :  +- Project [id#0L AS day#2L]
>             :     +- Range (0, 2, step=1, splits=2)
>             +- Range (0, 2000000, step=1, splits=2)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org