You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/08/03 07:12:09 UTC

[spark] branch branch-3.2 updated: [SPARK-39962][PYTHON][SQL] Apply projection when group attributes are empty

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 1ac74a1b170 [SPARK-39962][PYTHON][SQL] Apply projection when group attributes are empty
1ac74a1b170 is described below

commit 1ac74a1b170f52a093246d0bc13767160ddfbe46
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Wed Aug 3 16:11:20 2022 +0900

    [SPARK-39962][PYTHON][SQL] Apply projection when group attributes are empty
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to apply the pruning projection in `AggregateInPandasExec` even when there are no grouping expressions (global aggregation), so that reordered columns in its child are respected.
    
    ### Why are the changes needed?
    
    To respect the column order in the child.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it fixes a bug as below:
    
    ```python
    import pandas as pd
    from pyspark.sql import functions as f
    
    @f.pandas_udf("double")
    def AVG(x: pd.Series) -> float:
        return x.mean()
    
    abc = spark.createDataFrame([(1.0, 5.0, 17.0)], schema=["a", "b", "c"])
    abc.agg(AVG("a"), AVG("c")).show()
    abc.select("c", "a").agg(AVG("a"), AVG("c")).show()
    ```
    
    **Before**
    
    ```
    +------+------+
    |AVG(a)|AVG(c)|
    +------+------+
    |  17.0|   1.0|
    +------+------+
    ```
    
    **After**
    
    ```
    +------+------+
    |AVG(a)|AVG(c)|
    +------+------+
    |   1.0|  17.0|
    +------+------+
    ```
    
    ### How was this patch tested?
    
    Manually tested, and added a unit test.
    
    Closes #37390 from HyukjinKwon/SPARK-39962.
    
    Authored-by: Hyukjin Kwon <gu...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit 5335c784ae76c9cc0aaa7a4b57b3cd6b3891ad9a)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../spark/sql/execution/python/AggregateInPandasExec.scala  |  5 +++--
 .../apache/spark/sql/execution/python/PythonUDFSuite.scala  | 13 +++++++++++++
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
index 69802b143c1..0b0ed44958e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
@@ -131,12 +131,13 @@ case class AggregateInPandasExec(
       val newIter: Iterator[InternalRow] = mayAppendUpdatingSessionIterator(iter)
       val prunedProj = UnsafeProjection.create(allInputs.toSeq, child.output)
 
-      val grouped = if (groupingExpressions.isEmpty) {
+      val groupedItr = if (groupingExpressions.isEmpty) {
         // Use an empty unsafe row as a place holder for the grouping key
         Iterator((new UnsafeRow(), newIter))
       } else {
         GroupedIterator(newIter, groupingExpressions, child.output)
-      }.map { case (key, rows) =>
+      }
+      val grouped = groupedItr.map { case (key, rows) =>
         (key, rows.map(prunedProj))
       }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
index 45b57207c57..4ad7f901053 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
@@ -71,4 +71,17 @@ class PythonUDFSuite extends QueryTest with SharedSparkSession {
         pythonTestUDF(count(pythonTestUDF(base("a") + 1))))
     checkAnswer(df1, df2)
   }
+
+  test("SPARK-39962: Global aggregation of Pandas UDF should respect the column order") {
+    assume(shouldTestGroupedAggPandasUDFs)
+    val df = Seq[(java.lang.Integer, java.lang.Integer)]((1, null)).toDF("a", "b")
+
+    val pandasTestUDF = TestGroupedAggPandasUDF(name = "pandas_udf")
+    val reorderedDf = df.select("b", "a")
+    val actual = reorderedDf.agg(
+      pandasTestUDF(reorderedDf("a")), pandasTestUDF(reorderedDf("b")))
+    val expected = df.agg(pandasTestUDF(df("a")), pandasTestUDF(df("b")))
+
+    checkAnswer(actual, expected)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org