You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/08/03 07:11:49 UTC
[spark] branch branch-3.3 updated: [SPARK-39962][PYTHON][SQL] Apply projection when group attributes are empty
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new bd3f36f6626 [SPARK-39962][PYTHON][SQL] Apply projection when group attributes are empty
bd3f36f6626 is described below
commit bd3f36f6626f0fb71ab0ceb9bbe7fa4d05c628f5
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Wed Aug 3 16:11:20 2022 +0900
[SPARK-39962][PYTHON][SQL] Apply projection when group attributes are empty
### What changes were proposed in this pull request?
This PR proposes to apply the projection to respect the reordered columns in its child when group attributes are empty.
### Why are the changes needed?
To respect the column order in the child.
### Does this PR introduce _any_ user-facing change?
Yes, it fixes a bug as below:
```python
import pandas as pd
from pyspark.sql import functions as f
@f.pandas_udf("double")
def AVG(x: pd.Series) -> float:
    return x.mean()
abc = spark.createDataFrame([(1.0, 5.0, 17.0)], schema=["a", "b", "c"])
abc.agg(AVG("a"), AVG("c")).show()
abc.select("c", "a").agg(AVG("a"), AVG("c")).show()
```
**Before**
```
+------+------+
|AVG(a)|AVG(c)|
+------+------+
|  17.0|   1.0|
+------+------+
```
**After**
```
+------+------+
|AVG(a)|AVG(c)|
+------+------+
|   1.0|  17.0|
+------+------+
```
### How was this patch tested?
Manually tested, and added a unit test.
Closes #37390 from HyukjinKwon/SPARK-39962.
Authored-by: Hyukjin Kwon <gu...@apache.org>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
(cherry picked from commit 5335c784ae76c9cc0aaa7a4b57b3cd6b3891ad9a)
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../spark/sql/execution/python/AggregateInPandasExec.scala | 5 +++--
.../apache/spark/sql/execution/python/PythonUDFSuite.scala | 13 +++++++++++++
2 files changed, 16 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
index a7f63aafc9f..2f85149ee8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
@@ -131,12 +131,13 @@ case class AggregateInPandasExec(
val newIter: Iterator[InternalRow] = mayAppendUpdatingSessionIterator(iter)
val prunedProj = UnsafeProjection.create(allInputs.toSeq, child.output)
- val grouped = if (groupingExpressions.isEmpty) {
+ val groupedItr = if (groupingExpressions.isEmpty) {
// Use an empty unsafe row as a place holder for the grouping key
Iterator((new UnsafeRow(), newIter))
} else {
GroupedIterator(newIter, groupingExpressions, child.output)
- }.map { case (key, rows) =>
+ }
+ val grouped = groupedItr.map { case (key, rows) =>
(key, rows.map(prunedProj))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
index 45b57207c57..4ad7f901053 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
@@ -71,4 +71,17 @@ class PythonUDFSuite extends QueryTest with SharedSparkSession {
pythonTestUDF(count(pythonTestUDF(base("a") + 1))))
checkAnswer(df1, df2)
}
+
+ test("SPARK-39962: Global aggregation of Pandas UDF should respect the column order") {
+ assume(shouldTestGroupedAggPandasUDFs)
+ val df = Seq[(java.lang.Integer, java.lang.Integer)]((1, null)).toDF("a", "b")
+
+ val pandasTestUDF = TestGroupedAggPandasUDF(name = "pandas_udf")
+ val reorderedDf = df.select("b", "a")
+ val actual = reorderedDf.agg(
+ pandasTestUDF(reorderedDf("a")), pandasTestUDF(reorderedDf("b")))
+ val expected = df.agg(pandasTestUDF(df("a")), pandasTestUDF(df("b")))
+
+ checkAnswer(actual, expected)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org