Posted to issues@spark.apache.org by "Enrico Minack (Jira)" <ji...@apache.org> on 2023/01/05 21:06:00 UTC
[jira] [Created] (SPARK-41914) Sorting issue with partitioned-writing and planned write optimization disabled
Enrico Minack created SPARK-41914:
-------------------------------------
Summary: Sorting issue with partitioned-writing and planned write optimization disabled
Key: SPARK-41914
URL: https://issues.apache.org/jira/browse/SPARK-41914
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.4.0
Reporter: Enrico Minack
Spark 3.4.0 introduced the option {{spark.sql.optimizer.plannedWrite.enabled}}, which is enabled by default. When it is disabled, partitioned writes lose the in-partition order if spilling occurs.
This is related to SPARK-40885, where setting {{spark.sql.optimizer.plannedWrite.enabled}} to {{true}} removes the existing sort (by {{day}} and {{id}}) entirely.
Run this with 512m memory and one executor, e.g.:
{code}
spark-shell --driver-memory 512m --master "local[1]"
{code}
{code:scala}
import org.apache.spark.sql.SaveMode
spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", false)
val ids = 2000000
val days = 2
val parts = 2
val ds = spark.range(0, days, 1, parts).withColumnRenamed("id", "day").join(spark.range(0, ids, 1, parts))
ds.repartition($"day")
  .sortWithinPartitions($"day", $"id")
  .write
  .partitionBy("day")
  .mode(SaveMode.Overwrite)
  .csv("interleaved.csv")
{code}
Check that the written files are sorted (prints OK for each file that is sorted):
{code:bash}
for file in interleaved.csv/day\=*/part-*
do
  # md5sum -c expects "<hash>  <file>" (two spaces): compare the file with its sorted content
  echo "$(sort -n "$file" | md5sum | cut -d " " -f 1)  $file"
done | md5sum -c
{code}
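As an alternative to the shell check, the first out-of-order pair of lines can be located in Scala. This is only a sketch: it assumes each line of a part file holds a single integer {{id}} (the {{day}} column lives in the directory name), and the names {{SortCheckSketch}}, {{firstOutOfOrder}} and {{checkFile}} are hypothetical helpers, not part of Spark.

```scala
import scala.io.Source

object SortCheckSketch {
  // Returns the first adjacent pair (previous, current) with previous > current, if any.
  def firstOutOfOrder(ids: Iterator[Long]): Option[(Long, Long)] =
    ids.sliding(2).collectFirst { case Seq(prev, cur) if prev > cur => (prev, cur) }

  // Hypothetical helper: reads one id per line from a written part file.
  def checkFile(path: String): Option[(Long, Long)] = {
    val src = Source.fromFile(path)
    try firstOutOfOrder(src.getLines().map(_.trim.toLong))
    finally src.close()
  }
}
```

A result of {{None}} means the file is sorted; {{Some((a, b))}} names the first place where a larger id precedes a smaller one.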
The files should look like this:
{code}
0
1
2
...
1048576
1048577
1048578
...
{code}
But they look like this:
{code}
0
1048576
1
1048577
2
1048578
...
{code}
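The root cause is the same as in SPARK-40588: a sort (by {{day}}) is added on top of the existing sort (by {{day}} and {{id}}). When the outer sort spills, merging the spill files compares rows by {{day}} only, so rows from different spill files are interleaved and the {{id}} order is lost.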
{code}
Sort [input[0, bigint, false] ASC NULLS FIRST], false, 0
+- AdaptiveSparkPlan isFinalPlan=false
   +- Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [plan_id=30]
         +- BroadcastNestedLoopJoin BuildLeft, Inner
            :- BroadcastExchange IdentityBroadcastMode, [plan_id=28]
            :  +- Project [id#0L AS day#2L]
            :     +- Range (0, 2, step=1, splits=2)
            +- Range (0, 2000000, step=1, splits=2)
{code}
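The interleaving can be illustrated without Spark. The following is a simplified model, not Spark's actual sorter: each "spill file" is a run already sorted by ({{day}}, {{id}}), and the runs are merged by always taking the head row with the smallest key. The tie-breaking rule (a run that just produced a row moves to the back) is an assumption chosen to mimic the observed output; a real merge makes no promise about the order of equal keys. All names ({{SpillInterleaveSketch}}, {{Row}}, {{mergeBy}}) are hypothetical.

```scala
object SpillInterleaveSketch {
  final case class Row(day: Long, id: Long)

  // Merges pre-sorted runs by repeatedly emitting the head row with the smallest key.
  // Assumption: among equal keys, the run that emitted most recently goes last.
  def mergeBy[K](runs: Seq[Seq[Row]])(key: Row => K)(implicit ord: Ordering[K]): Vector[Row] = {
    var queue: Vector[List[Row]] = runs.map(_.toList).filter(_.nonEmpty).toVector
    val out = Vector.newBuilder[Row]
    while (queue.nonEmpty) {
      val minKey = queue.map(run => key(run.head)).min
      // First run in the queue whose head carries the smallest key.
      val i = queue.indexWhere(run => ord.equiv(key(run.head), minKey))
      val run = queue(i)
      out += run.head
      val rest = queue.patch(i, Nil, 1)
      // Re-queue the remainder of the run at the back, or drop it when exhausted.
      queue = if (run.tail.nonEmpty) rest :+ run.tail else rest
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    // Two spill runs for day 0, each sorted by (day, id).
    val runs = Seq(
      Seq(0L, 1L, 2L).map(id => Row(0L, id)),
      Seq(1048576L, 1048577L, 1048578L).map(id => Row(0L, id))
    )
    // Outer sort on day only: every comparison is a tie, so the runs interleave.
    println(mergeBy(runs)(_.day).map(_.id).mkString(", "))
    // prints: 0, 1048576, 1, 1048577, 2, 1048578
    // Merging on (day, id) keeps the order.
    println(mergeBy(runs)(r => (r.day, r.id)).map(_.id).mkString(", "))
    // prints: 0, 1, 2, 1048576, 1048577, 1048578
  }
}
```

In this model the merge on {{day}} alone reproduces the interleaved pattern shown in the written files, while a merge that also compares {{id}} keeps the expected order.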