You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/08/03 07:13:27 UTC

[spark] branch branch-3.1 updated: [SPARK-39962][PYTHON][SQL] Apply projection when group attributes are empty

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 7e0a5ef903c [SPARK-39962][PYTHON][SQL] Apply projection when group attributes are empty
7e0a5ef903c is described below

commit 7e0a5ef903c41eea3b0d1220bfabda2c8b8a5ac4
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Wed Aug 3 16:11:20 2022 +0900

    [SPARK-39962][PYTHON][SQL] Apply projection when group attributes are empty
    
    This PR proposes to apply the projection to respect the reordered columns in its child when group attributes are empty.
    
    This change is needed so that the aggregation respects the column order of its child.
    
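    This is a user-facing change: it fixes the bug shown below, where a global Pandas UDF aggregation reads the wrong columns after the child's columns have been reordered: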
    
    ```python
    import pandas as pd
    from pyspark.sql import functions as f
    
    @f.pandas_udf("double")
    def AVG(x: pd.Series) -> float:
        return x.mean()
    
    abc = spark.createDataFrame([(1.0, 5.0, 17.0)], schema=["a", "b", "c"])
    abc.agg(AVG("a"), AVG("c")).show()
    abc.select("c", "a").agg(AVG("a"), AVG("c")).show()
    ```
    
    **Before**
    
    ```
    +------+------+
    |AVG(a)|AVG(c)|
    +------+------+
    |  17.0|   1.0|
    +------+------+
    ```
    
    **After**
    
    ```
    +------+------+
    |AVG(a)|AVG(c)|
    +------+------+
    |   1.0|  17.0|
    +------+------+
    ```
    
    Manually tested, and added a unit test.
    
    Closes #37390 from HyukjinKwon/SPARK-39962.
    
    Authored-by: Hyukjin Kwon <gu...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit 5335c784ae76c9cc0aaa7a4b57b3cd6b3891ad9a)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../spark/sql/execution/python/AggregateInPandasExec.scala  |  5 +++--
 .../apache/spark/sql/execution/python/PythonUDFSuite.scala  | 13 +++++++++++++
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
index dadf1129c34..791af2a6aee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
@@ -109,12 +109,13 @@ case class AggregateInPandasExec(
     inputRDD.mapPartitionsInternal { iter => if (iter.isEmpty) iter else {
       val prunedProj = UnsafeProjection.create(allInputs.toSeq, child.output)
 
-      val grouped = if (groupingExpressions.isEmpty) {
+      val groupedItr = if (groupingExpressions.isEmpty) {
         // Use an empty unsafe row as a place holder for the grouping key
         Iterator((new UnsafeRow(), iter))
       } else {
         GroupedIterator(iter, groupingExpressions, child.output)
-      }.map { case (key, rows) =>
+      }
+      val grouped = groupedItr.map { case (key, rows) =>
         (key, rows.map(prunedProj))
       }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
index 45b57207c57..4ad7f901053 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
@@ -71,4 +71,17 @@ class PythonUDFSuite extends QueryTest with SharedSparkSession {
         pythonTestUDF(count(pythonTestUDF(base("a") + 1))))
     checkAnswer(df1, df2)
   }
+
+  test("SPARK-39962: Global aggregation of Pandas UDF should respect the column order") {
+    assume(shouldTestGroupedAggPandasUDFs)
+    val df = Seq[(java.lang.Integer, java.lang.Integer)]((1, null)).toDF("a", "b")
+
+    val pandasTestUDF = TestGroupedAggPandasUDF(name = "pandas_udf")
+    val reorderedDf = df.select("b", "a")
+    val actual = reorderedDf.agg(
+      pandasTestUDF(reorderedDf("a")), pandasTestUDF(reorderedDf("b")))
+    val expected = df.agg(pandasTestUDF(df("a")), pandasTestUDF(df("b")))
+
+    checkAnswer(actual, expected)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org