Posted to issues@spark.apache.org by "Enrico Minack (Jira)" <ji...@apache.org> on 2023/01/05 21:06:00 UTC

[jira] [Created] (SPARK-41914) Sorting issue with partitioned-writing and planned write optimization disabled

Enrico Minack created SPARK-41914:
-------------------------------------

             Summary: Sorting issue with partitioned-writing and planned write optimization disabled
                 Key: SPARK-41914
                 URL: https://issues.apache.org/jira/browse/SPARK-41914
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.4.0
            Reporter: Enrico Minack


Spark 3.4.0 introduced the option {{spark.sql.optimizer.plannedWrite.enabled}}, which is enabled by default. When it is disabled, partitioned writing loses the in-partition order whenever spilling occurs.

This is related to SPARK-40885, where setting option {{spark.sql.optimizer.plannedWrite.enabled}} to {{true}} removes the existing sort (for {{day}} and {{id}}) entirely.
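
For reference, the option can be inspected and toggled from the spark-shell; a minimal sketch, assuming Spark 3.4.0, where the option defaults to {{true}} as described above:
{code:scala}
// Inspect and disable the option in spark-shell (sketch, assuming Spark 3.4.0).
spark.conf.get("spark.sql.optimizer.plannedWrite.enabled")   // expected: "true" (the default)
spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", false)
{code}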

Run this with 512m of driver memory and a single local thread, e.g.:
{code}
spark-shell --driver-memory 512m --master "local[1]"
{code}
{code:scala}
import org.apache.spark.sql.SaveMode

spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", false)

val ids = 2000000
val days = 2
val parts = 2

val ds = spark.range(0, days, 1, parts).withColumnRenamed("id", "day").join(spark.range(0, ids, 1, parts))

ds.repartition($"day")
  .sortWithinPartitions($"day", $"id")
  .write
  .partitionBy("day")
  .mode(SaveMode.Overwrite)
  .csv("interleaved.csv")
{code}
Check that the written files are sorted (prints OK for each file that is sorted):
{code:bash}
for file in interleaved.csv/day\=*/part-*
do
  echo "$(sort -n "$file" | md5sum | cut -d " " -f 1)  $file"
done | md5sum -c
{code}
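
Alternatively, the same check can be done in Scala; a rough sketch, not part of the original report, assuming the layout {{interleaved.csv/day=*/part-*.csv}} produced above and that each part file contains only the {{id}} column:
{code:scala}
import java.io.File
import scala.io.Source

// List all part files under the day=* partition directories.
val partFiles = new File("interleaved.csv")
  .listFiles()
  .filter(_.isDirectory)                                  // day=0, day=1, ...
  .flatMap(_.listFiles())
  .filter(f => f.getName.startsWith("part-") && f.getName.endsWith(".csv"))

// A file counts as sorted if every id is <= its successor.
partFiles.foreach { f =>
  val source = Source.fromFile(f)
  try {
    val ids = source.getLines().map(_.toLong).toVector
    val sorted = ids.zip(ids.drop(1)).forall { case (a, b) => a <= b }
    println(s"${if (sorted) "OK" else "NOT SORTED"}  ${f.getPath}")
  } finally source.close()
}
{code}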
The files should look like this:
{code}
0
1
2
...
1048576
1048577
1048578
...
{code}
But they actually look like this:
{code}
0
1048576
1
1048577
2
1048578
...
{code}
The cause is the same as in SPARK-40588: a sort (for {{day}}) is added on top of the existing sort (for {{day}} and {{id}}), and spilling interleaves the sorted spill files.

{code}
Sort [input[0, bigint, false] ASC NULLS FIRST], false, 0
+- AdaptiveSparkPlan isFinalPlan=false
   +- Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [plan_id=30]
         +- BroadcastNestedLoopJoin BuildLeft, Inner
            :- BroadcastExchange IdentityBroadcastMode, [plan_id=28]
            :  +- Project [id#0L AS day#2L]
            :     +- Range (0, 2, step=1, splits=2)
            +- Range (0, 2000000, step=1, splits=2)
{code}
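
For comparison, the plan of the query alone (without the write) can be printed with {{explain()}}; a minimal sketch: the extra sort for {{day}} described above is added during the write, so only the {{sortWithinPartitions}} sort is expected to appear here:
{code:scala}
// Print the physical plan of the query being written (sketch). The extra
// Sort for day alone described above is added during the write, so only the
// sortWithinPartitions sort for day and id is expected to show up here.
ds.repartition($"day")
  .sortWithinPartitions($"day", $"id")
  .explain()
{code}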



