Posted to issues@spark.apache.org by "Bruce Robbins (Jira)" <ji...@apache.org> on 2022/01/24 21:03:00 UTC
[jira] [Commented] (SPARK-38000) Sort node incorrectly removed from the optimized logical plan
[ https://issues.apache.org/jira/browse/SPARK-38000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17481392#comment-17481392 ]
Bruce Robbins commented on SPARK-38000:
---------------------------------------
I can reproduce this on 3.2.0, but it seems to be fixed in 3.2.1-RC2:
{noformat}
+---+----+
| id|rank|
+---+----+
| 10| 1|
| 11| 1|
| 12| 1|
| 13| 1|
| 14| 1|
| 15| 1|
| 16| 1|
| 17| 1|
| 18| 1|
| 19| 1|
| 20| 1|
+---+----+

== Parsed Logical Plan ==
'Sort ['id ASC NULLS FIRST], true
+- Project [id#1, rank#10]
   +- Project [id#1, rank#10, rank#10]
      +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
         +- Project [id#1]
            +- Project [id#1]
               +- Join LeftOuter, (id#1 = id#5)
                  :- LocalRelation [id#1]
                  +- Deduplicate [id#5]
                     +- LocalRelation <empty>, [id#5]

== Analyzed Logical Plan ==
id: string, rank: int
Sort [id#1 ASC NULLS FIRST], true
+- Project [id#1, rank#10]
   +- Project [id#1, rank#10, rank#10]
      +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
         +- Project [id#1]
            +- Project [id#1]
               +- Join LeftOuter, (id#1 = id#5)
                  :- LocalRelation [id#1]
                  +- Deduplicate [id#5]
                     +- LocalRelation <empty>, [id#5]

== Optimized Logical Plan ==
InMemoryRelation [id#1, rank#10], StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *(2) Sort [id#1 ASC NULLS FIRST], true, 0
      +- Exchange rangepartitioning(id#1 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#21]
         +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
            +- *(1) Sort [id#1 ASC NULLS FIRST, id#1 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(id#1, 200), ENSURE_REQUIREMENTS, [id=#11]
                  +- LocalTableScan [id#1]

== Physical Plan ==
InMemoryTableScan [id#1, rank#10]
   +- InMemoryRelation [id#1, rank#10], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(2) Sort [id#1 ASC NULLS FIRST], true, 0
            +- Exchange rangepartitioning(id#1 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#21]
               +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
                  +- *(1) Sort [id#1 ASC NULLS FIRST, id#1 ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(id#1, 200), ENSURE_REQUIREMENTS, [id=#11]
                        +- LocalTableScan [id#1]
{noformat}
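You could also check programmatically whether a given build is affected instead of reading the {{show()}} output by eye. The helper below is one way to do that. It is only a sketch and is not part of either report. The names {{isSortedAsc}} and {{idsAreSorted}} are hypothetical. The sketch assumes that {{id}} is a string column, as the plans above show, and that the data is small enough to collect to the driver.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: true if each value is <= the next one (lexicographic order).
// For the two-digit ids "10".."20" used here, lexicographic order equals numeric order.
def isSortedAsc(values: Seq[String]): Boolean =
  values.zip(values.drop(1)).forall { case (a, b) => a.compareTo(b) <= 0 }

// Hypothetical helper: collects every row to the driver (only sensible for small
// repro data) and checks that the "id" values arrive in ascending order.
def idsAreSorted(df: DataFrame): Boolean =
  isSortedAsc(df.collect().map(_.getAs[String]("id")).toSeq)
```

Consider the reporter's {{result}}. Going by the outputs shown, {{idsAreSorted(result)}} should return false on 3.2.0 and true on 3.2.1-RC2.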
> Sort node incorrectly removed from the optimized logical plan
> -------------------------------------------------------------
>
> Key: SPARK-38000
> URL: https://issues.apache.org/jira/browse/SPARK-38000
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.2.0
> Environment: Tested on:
> Ubuntu 18.04.2 LTS
> OpenJDK 1.8.0_312 64-Bit Server VM (build 25.312-b07, mixed mode)
> Reporter: Antoine Wendlinger
> Priority: Major
> Labels: correctness
>
> When using a fairly involved combination of joins, windows, cache and orderBy, the sorting phase disappears from the optimized logical plan and the resulting dataframe is not sorted.
> You can find a reproduction of the bug at [https://github.com/antoinewdg/spark-bug-report].
> Use {{sbt run}} to get the results.
> The bug is very niche. I chose to report it because it looks like a correctness issue and may be a symptom of a larger one.
> The bug affects only 3.2.0; tests on 3.1.2 show the result correctly sorted.
> As far as I could test, every one of the following steps is necessary for the bug to occur:
> * the join with an empty dataframe
> * the {{distinct()}} call on the empty dataframe
> * the window function
> * the {{cache()}} call after the {{orderBy}}
> h2. Code
>
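> {code:scala}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions.row_number
>
> // Single-column case classes matching the id: string column in the plans (top-level in a compiled app).
> case class Player(id: String)
> case class BlacklistEntry(id: String)
>
> // Assumed local session; the linked repository configures its own.
> val sparkSession = SparkSession.builder().master("local[*]").getOrCreate()
> import sparkSession.implicits._
>
> val players = (10 to 20).map(x => Player(id = x.toString)).toDS
> val blacklist = sparkSession
>   .emptyDataset[BlacklistEntry]
>   .distinct()
> val result = players
>   .join(blacklist, Seq("id"), "left_outer")
>   .withColumn("rank", row_number().over(Window.partitionBy("id").orderBy("id")))
>   .orderBy("id")
>   .cache()
> result.show()
> result.explain(true)
> {code}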
>
> h2. Output
>
> {code:java}
> +---+----+
> | id|rank|
> +---+----+
> | 15| 1|
> | 11| 1|
> | 16| 1|
> | 18| 1|
> | 17| 1|
> | 19| 1|
> | 20| 1|
> | 10| 1|
> | 12| 1|
> | 13| 1|
> | 14| 1|
> +---+----+
>
> == Parsed Logical Plan ==
> 'Sort ['id ASC NULLS FIRST], true
> +- Project [id#1, rank#10]
>    +- Project [id#1, rank#10, rank#10]
>       +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
>          +- Project [id#1]
>             +- Project [id#1]
>                +- Join LeftOuter, (id#1 = id#5)
>                   :- LocalRelation [id#1]
>                   +- Deduplicate [id#5]
>                      +- LocalRelation <empty>, [id#5]
>
> == Analyzed Logical Plan ==
> id: string, rank: int
> Sort [id#1 ASC NULLS FIRST], true
> +- Project [id#1, rank#10]
>    +- Project [id#1, rank#10, rank#10]
>       +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
>          +- Project [id#1]
>             +- Project [id#1]
>                +- Join LeftOuter, (id#1 = id#5)
>                   :- LocalRelation [id#1]
>                   +- Deduplicate [id#5]
>                      +- LocalRelation <empty>, [id#5]
>
> == Optimized Logical Plan ==
> InMemoryRelation [id#1, rank#10], StorageLevel(disk, memory, deserialized, 1 replicas)
>    +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
>       +- *(1) Sort [id#1 ASC NULLS FIRST, id#1 ASC NULLS FIRST], false, 0
>          +- Exchange hashpartitioning(id#1, 200), ENSURE_REQUIREMENTS, [id=#7]
>             +- LocalTableScan [id#1]
>
> == Physical Plan ==
> InMemoryTableScan [id#1, rank#10]
>    +- InMemoryRelation [id#1, rank#10], StorageLevel(disk, memory, deserialized, 1 replicas)
>          +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
>             +- *(1) Sort [id#1 ASC NULLS FIRST, id#1 ASC NULLS FIRST], false, 0
>                +- Exchange hashpartitioning(id#1, 200), ENSURE_REQUIREMENTS, [id=#7]
>                   +- LocalTableScan [id#1]
> {code}
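The symptom is that the global {{Sort [...], true, 0}} node is missing from the cached plan. You could therefore inspect the plan directly instead of checking the data. The sketch below does this, but it depends on Spark internals as they appear in the 3.2 line: {{InMemoryRelation.cacheBuilder.cachedPlan}} and {{SortExec.global}}. These are not public API and may change between versions. The sketch also assumes that the cached plan is not wrapped in adaptive query execution, which is the case in the plans shown here. The function name {{cachedPlanHasGlobalSort}} is hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.SortExec
import org.apache.spark.sql.execution.columnar.InMemoryRelation

// Hypothetical helper built on Spark internals (3.2-era class layout assumed).
// Returns true if any cached plan under df's optimized plan contains a SortExec
// with global = true, i.e. the "Sort [...], true, 0" node absent in the 3.2.0 output.
def cachedPlanHasGlobalSort(df: DataFrame): Boolean =
  df.queryExecution.optimizedPlan
    .collect { case r: InMemoryRelation => r.cacheBuilder.cachedPlan }
    .exists { plan =>
      plan.find {
        case s: SortExec => s.global
        case _           => false
      }.isDefined
    }
```

Based on the printed plans, this should return false for the reporter's {{result}} on 3.2.0 and true on 3.2.1-RC2.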
--
This message was sent by Atlassian Jira
(v8.20.1#820001)