Posted to issues@spark.apache.org by "Enrico Minack (Jira)" <ji...@apache.org> on 2023/01/24 12:17:00 UTC
[jira] [Updated] (SPARK-42168) CoGroup with window function returns incorrect result when partition keys differ in order
[ https://issues.apache.org/jira/browse/SPARK-42168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Enrico Minack updated SPARK-42168:
----------------------------------
Description:
The following example returns an incorrect result:
{code:java}
import pandas as pd
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, lit, sum
spark = SparkSession \
    .builder \
    .getOrCreate()
ids = 1000
days = 1000
parts = 10
id_df = spark.range(ids)
day_df = spark.range(days).withColumnRenamed("id", "day")
id_day_df = id_df.join(day_df)
left_df = id_day_df.select(col("id").alias("id"), col("day").alias("day"), lit("left").alias("side")).repartition(parts).cache()
right_df = id_day_df.select(col("id").alias("id"), col("day").alias("day"), lit("right").alias("side")).repartition(parts).cache() #.withColumnRenamed("id", "id2")
# note the column order is different to the groupBy("id", "day") column order below
window = Window.partitionBy("day", "id")
left_grouped_df = left_df.groupBy("id", "day")
right_grouped_df = right_df.withColumn("day_sum", sum(col("day")).over(window)).groupBy("id", "day")
def cogroup(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame([{
        "id": left["id"][0] if not left.empty else (right["id"][0] if not right.empty else None),
        "day": left["day"][0] if not left.empty else (right["day"][0] if not right.empty else None),
        "lefts": len(left.index),
        "rights": len(right.index)
    }])

df = left_grouped_df.cogroup(right_grouped_df) \
    .applyInPandas(cogroup, schema="id long, day long, lefts integer, rights integer")
df.explain()
df.show(5)
{code}
Output is
{code}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L), [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- ...
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
         +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
            +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                  +- ...

+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0|  3|    0|     1|
|  0|  4|    0|     1|
|  0| 13|    1|     0|
|  0| 27|    0|     1|
|  0| 31|    0|     1|
+---+---+-----+------+
only showing top 5 rows
{code}
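The first child is hash-partitioned by {{id}} and {{day}}, while the second child is hash-partitioned by {{day}} and {{id}} (as required by the window function). Because the key order differs, rows with the same {{id}} and {{day}} can end up in different partitions on the two sides and are then not cogrouped together.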
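The effect of the key order can be illustrated without Spark. The following is only a minimal sketch: {{toy_partition}} is a hypothetical, order-sensitive stand-in and not Spark's actual Murmur3-based hash partitioning, and the 20 x 20 key range is chosen for brevity.

```python
def toy_partition(keys, num_partitions=200):
    """Order-sensitive toy hash; a stand-in, NOT Spark's real hash partitioning."""
    h = 17
    for k in keys:
        h = 31 * h + k
    return h % num_partitions

# A small (id, day) grid instead of the 1000 x 1000 grid from the report.
rows = [(i, d) for i in range(20) for d in range(20)]

# Left child is partitioned by (id, day), right child by (day, id).
left_part = {(i, d): toy_partition((i, d)) for i, d in rows}
right_part = {(i, d): toy_partition((d, i)) for i, d in rows}

# Keys whose left and right rows are sent to different partitions.
mismatched = [key for key in rows if left_part[key] != right_part[key]]
print(len(mismatched), "of", len(rows), "keys land in different partitions")
```

With this toy hash, only keys where {{id}} equals {{day}} happen to agree; every other key is split across two partitions, which is the situation in which a per-partition cogroup sees only one side.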
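This has been fixed in Spark 3.3 by [#32875|https://github.com/apache/spark/pull/32875/files#diff-e938569a4ca4eba8f7e10fe473d4f9c306ea253df151405bcaba880a601f075fR75-R76]. With the fix, the plan contains an additional {{Exchange hashpartitioning(id, day)}} for the second child, and both sides are present for each key: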
{code}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L)#63, [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- ...
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#29L, day#30L, 200), ENSURE_REQUIREMENTS, [plan_id=118]
         +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
            +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
               +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                     +- ...

+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0| 13|    1|     1|
|  0| 63|    1|     1|
|  0| 89|    1|     1|
|  0| 95|    1|     1|
|  0| 96|    1|     1|
+---+---+-----+------+
only showing top 5 rows
{code}
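Why the additional {{Exchange}} changes the counts from 1/0 and 0/1 to 1/1 can be sketched in plain Python. This is an illustration under stated assumptions, not Spark's implementation: {{toy_partition}} is a hypothetical order-sensitive hash, and {{cogroup_counts}} is a hypothetical helper that cogroups partition by partition.

```python
from collections import defaultdict

def toy_partition(keys, num_partitions=200):
    """Order-sensitive toy hash; a stand-in, NOT Spark's real hash partitioning."""
    h = 17
    for k in keys:
        h = 31 * h + k
    return h % num_partitions

def cogroup_counts(left_rows, right_rows, right_key_order):
    """Cogroup partition by partition and count left/right rows per (id, day)."""
    parts = defaultdict(lambda: ([], []))
    for i, d in left_rows:
        # The left side is always partitioned by (id, day).
        parts[toy_partition((i, d))][0].append((i, d))
    for i, d in right_rows:
        # The right side is partitioned by (id, day) or by (day, id).
        keys = (i, d) if right_key_order == "id,day" else (d, i)
        parts[toy_partition(keys)][1].append((i, d))
    result = []
    for p in sorted(parts):
        lefts, rights = parts[p]
        for key in sorted(set(lefts) | set(rights)):
            result.append((key, lefts.count(key), rights.count(key)))
    return result

rows = [(0, d) for d in range(5)]
# Without the extra shuffle: the right side keeps the window's (day, id) partitioning.
broken = cogroup_counts(rows, rows, right_key_order="day,id")
# With the extra shuffle: the right side is re-partitioned by (id, day).
fixed = cogroup_counts(rows, rows, right_key_order="id,day")

for key, lefts, rights in fixed:
    print(key, lefts, rights)
```

In the {{broken}} case most keys appear twice, once with only a left row and once with only a right row, mirroring the incorrect output above; in the {{fixed}} case every key appears once with one row from each side.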
Only PySpark seems to be affected.
was:
The following example returns an incorrect result:
{code:java}
import pandas as pd
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, lit, sum
spark = SparkSession \
    .builder \
    .getOrCreate()
ids = 1000
days = 1000
parts = 10
id_df = spark.range(ids)
day_df = spark.range(days).withColumnRenamed("id", "day")
id_day_df = id_df.join(day_df)
left_df = id_day_df.select(col("id").alias("id"), col("day").alias("day"), lit("left").alias("side")).repartition(parts).cache()
right_df = id_day_df.select(col("id").alias("id"), col("day").alias("day"), lit("right").alias("side")).repartition(parts).cache() #.withColumnRenamed("id", "id2")
# note the column order is different to the groupBy("id", "day") column order below
window = Window.partitionBy("day", "id")
left_grouped_df = left_df.groupBy("id", "day")
right_grouped_df = right_df.withColumn("day_sum", sum(col("day")).over(window)).groupBy("id", "day")
def cogroup(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame([{
        "id": left["id"][0] if not left.empty else (right["id"][0] if not right.empty else None),
        "day": left["day"][0] if not left.empty else (right["day"][0] if not right.empty else None),
        "lefts": len(left.index),
        "rights": len(right.index)
    }])

df = left_grouped_df.cogroup(right_grouped_df) \
    .applyInPandas(cogroup, schema="id long, day long, lefts integer, rights integer")
df.explain()
df.show(5)
{code}
Output is
{code}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L), [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- InMemoryTableScan [id#8L, day#9L, id#8L, day#9L, side#10]
   :        +- InMemoryRelation [id#8L, day#9L, side#10], StorageLevel(disk, memory, deserialized, 1 replicas)
   :           +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=33]
   :              +- *(2) Project [id#0L, day#4L, left AS side#10]
   :                 +- *(2) BroadcastNestedLoopJoin BuildRight, Inner
   :                    :- *(2) Range (0, 1000, step=1, splits=16)
   :                    +- BroadcastExchange IdentityBroadcastMode, [plan_id=28]
   :                       +- *(1) Project [id#2L AS day#4L]
   :                          +- *(1) Range (0, 1000, step=1, splits=16)
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
         +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
            +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                  +- InMemoryTableScan [id#29L, day#30L, side#31]
                     +- InMemoryRelation [id#29L, day#30L, side#31], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=79]
                           +- *(2) Project [id#0L, day#4L, right AS side#31]
                              +- *(2) BroadcastNestedLoopJoin BuildRight, Inner
                                 :- *(2) Range (0, 1000, step=1, splits=16)
                                 +- BroadcastExchange IdentityBroadcastMode, [plan_id=74]
                                    +- *(1) Project [id#2L AS day#4L]
                                       +- *(1) Range (0, 1000, step=1, splits=16)

+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0|  3|    0|     1|
|  0|  4|    0|     1|
|  0| 13|    1|     0|
|  0| 27|    0|     1|
|  0| 31|    0|     1|
+---+---+-----+------+
only showing top 5 rows
{code}
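The first child is hash-partitioned by {{id}} and {{day}}, while the second child is hash-partitioned by {{day}} and {{id}} (as required by the window function). Because the key order differs, rows with the same {{id}} and {{day}} can end up in different partitions on the two sides and are then not cogrouped together.
This has been fixed in Spark 3.3 by [#32875|https://github.com/apache/spark/pull/32875/files#diff-e938569a4ca4eba8f7e10fe473d4f9c306ea253df151405bcaba880a601f075fR75-R76]. With the fix, the plan contains an additional {{Exchange hashpartitioning(id, day)}} for the second child, and both sides are present for each key: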
{code}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L)#63, [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- InMemoryTableScan [id#8L, day#9L, id#8L, day#9L, side#10]
   :        +- InMemoryRelation [id#8L, day#9L, side#10], StorageLevel(disk, memory, deserialized, 1 replicas)
   :           +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=33]
   :              +- *(2) Project [id#0L, day#4L, left AS side#10]
   :                 +- *(2) BroadcastNestedLoopJoin BuildRight, Inner
   :                    :- *(2) Range (0, 1000, step=1, splits=16)
   :                    +- BroadcastExchange IdentityBroadcastMode, [plan_id=28]
   :                       +- *(1) Project [id#2L AS day#4L]
   :                          +- *(1) Range (0, 1000, step=1, splits=16)
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#29L, day#30L, 200), ENSURE_REQUIREMENTS, [plan_id=118]
         +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
            +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
               +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                     +- InMemoryTableScan [id#29L, day#30L, side#31]
                        +- InMemoryRelation [id#29L, day#30L, side#31], StorageLevel(disk, memory, deserialized, 1 replicas)
                           +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=79]
                              +- *(2) Project [id#0L, day#4L, right AS side#31]
                                 +- *(2) BroadcastNestedLoopJoin BuildRight, Inner
                                    :- *(2) Range (0, 1000, step=1, splits=16)
                                    +- BroadcastExchange IdentityBroadcastMode, [plan_id=74]
                                       +- *(1) Project [id#2L AS day#4L]
                                          +- *(1) Range (0, 1000, step=1, splits=16)

+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0| 13|    1|     1|
|  0| 63|    1|     1|
|  0| 89|    1|     1|
|  0| 95|    1|     1|
|  0| 96|    1|     1|
+---+---+-----+------+
only showing top 5 rows
{code}
Only PySpark seems to be affected.
> CoGroup with window function returns incorrect result when partition keys differ in order
> -----------------------------------------------------------------------------------------
>
> Key: SPARK-42168
> URL: https://issues.apache.org/jira/browse/SPARK-42168
> Project: Spark
> Issue Type: Bug
> Components: PySpark, SQL
> Affects Versions: 3.0.3, 3.1.3, 3.2.3
> Reporter: Enrico Minack
> Priority: Major
> Labels: correctness