You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by cu...@apache.org on 2020/06/10 22:55:34 UTC

[spark] branch master updated: [SPARK-31915][SQL][PYTHON] Resolve the grouping column properly per the case sensitivity in grouped and cogrouped pandas UDFs

This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 00d06ca  [SPARK-31915][SQL][PYTHON] Resolve the grouping column properly per the case sensitivity in grouped and cogrouped pandas UDFs
00d06ca is described below

commit 00d06cad564d5e3e5f78a687776d02fe0695a861
Author: HyukjinKwon <gu...@apache.org>
AuthorDate: Wed Jun 10 15:54:07 2020 -0700

    [SPARK-31915][SQL][PYTHON] Resolve the grouping column properly per the case sensitivity in grouped and cogrouped pandas UDFs
    
    ### What changes were proposed in this pull request?
    
    This is another approach to fix the issue. See the previous try https://github.com/apache/spark/pull/28745. It was too invasive so I took more conservative approach.
    
    This PR proposes to resolve grouping attributes separately first so it can be properly referred when `FlatMapGroupsInPandas` and `FlatMapCoGroupsInPandas` are resolved without ambiguity.
    
    Previously,
    
    ```python
    from pyspark.sql.functions import *
    df = spark.createDataFrame([[1, 1]], ["column", "Score"])
    @pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP)
    def my_pandas_udf(pdf):
        return pdf.assign(Score=0.5)
    
    df.groupby('COLUMN').apply(my_pandas_udf).show()
    ```
    
    failed as below:
    
    ```
    pyspark.sql.utils.AnalysisException: "Reference 'COLUMN' is ambiguous, could be: COLUMN, COLUMN.;"
    ```
    because the unresolved `COLUMN` in `FlatMapGroupsInPandas` doesn't know which reference to take from the child projection.
    
    After this fix, the child projection is resolved first together with the grouping keys, and the grouping attribute passed to `FlatMapGroupsInPandas` is selected positionally from the resolved child projection, so there is no ambiguity.
    
    ### Why are the changes needed?
    
    To resolve grouping keys correctly.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes,
    
    ```python
    from pyspark.sql.functions import *
    df = spark.createDataFrame([[1, 1]], ["column", "Score"])
    @pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP)
    def my_pandas_udf(pdf):
        return pdf.assign(Score=0.5)
    
    df.groupby('COLUMN').apply(my_pandas_udf).show()
    ```
    
    ```python
    df1 = spark.createDataFrame([(1, 1)], ("column", "value"))
    df2 = spark.createDataFrame([(1, 1)], ("column", "value"))
    
    df1.groupby("COLUMN").cogroup(
        df2.groupby("COLUMN")
    ).applyInPandas(lambda r, l: r + l, df1.schema).show()
    ```
    
    Before:
    
    ```
    pyspark.sql.utils.AnalysisException: Reference 'COLUMN' is ambiguous, could be: COLUMN, COLUMN.;
    ```
    
    ```
    pyspark.sql.utils.AnalysisException: cannot resolve '`COLUMN`' given input columns: [COLUMN, COLUMN, value, value];;
    'FlatMapCoGroupsInPandas ['COLUMN], ['COLUMN], <lambda>(column#9L, value#10L, column#13L, value#14L), [column#22L, value#23L]
    :- Project [COLUMN#9L, column#9L, value#10L]
    :  +- LogicalRDD [column#9L, value#10L], false
    +- Project [COLUMN#13L, column#13L, value#14L]
       +- LogicalRDD [column#13L, value#14L], false
    ```
    
    After:
    
    ```
    +------+-----+
    |column|Score|
    +------+-----+
    |     1|  0.5|
    +------+-----+
    ```
    
    ```
    +------+-----+
    |column|value|
    +------+-----+
    |     2|    2|
    +------+-----+
    ```
    
    ### How was this patch tested?
    
    Unittests were added and manually tested.
    
    Closes #28777 from HyukjinKwon/SPARK-31915-another.
    
    Authored-by: HyukjinKwon <gu...@apache.org>
    Signed-off-by: Bryan Cutler <cu...@gmail.com>
---
 python/pyspark/sql/tests/test_pandas_cogrouped_map.py  | 18 +++++++++++++++++-
 python/pyspark/sql/tests/test_pandas_grouped_map.py    | 10 ++++++++++
 .../apache/spark/sql/RelationalGroupedDataset.scala    | 17 ++++++++++-------
 3 files changed, 37 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
index 3ed9d2a..c1cb30c 100644
--- a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
@@ -19,7 +19,7 @@ import unittest
 import sys
 
 from pyspark.sql.functions import array, explode, col, lit, udf, sum, pandas_udf, PandasUDFType
-from pyspark.sql.types import DoubleType, StructType, StructField
+from pyspark.sql.types import DoubleType, StructType, StructField, Row
 from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
     pandas_requirement_message, pyarrow_requirement_message
 from pyspark.testing.utils import QuietTest
@@ -193,6 +193,22 @@ class CogroupedMapInPandasTests(ReusedSQLTestCase):
             left.groupby('id').cogroup(right.groupby('id')) \
                 .applyInPandas(lambda: 1, StructType([StructField("d", DoubleType())]))
 
+    def test_case_insensitive_grouping_column(self):
+        # SPARK-31915: case-insensitive grouping column should work.
+        df1 = self.spark.createDataFrame([(1, 1)], ("column", "value"))
+
+        row = df1.groupby("ColUmn").cogroup(
+            df1.groupby("COLUMN")
+        ).applyInPandas(lambda r, l: r + l, "column long, value long").first()
+        self.assertEquals(row.asDict(), Row(column=2, value=2).asDict())
+
+        df2 = self.spark.createDataFrame([(1, 1)], ("column", "value"))
+
+        row = df1.groupby("ColUmn").cogroup(
+            df2.groupby("COLUMN")
+        ).applyInPandas(lambda r, l: r + l, "column long, value long").first()
+        self.assertEquals(row.asDict(), Row(column=2, value=2).asDict())
+
     @staticmethod
     def _test_with_key(left, right, isLeft):
 
diff --git a/python/pyspark/sql/tests/test_pandas_grouped_map.py b/python/pyspark/sql/tests/test_pandas_grouped_map.py
index ff53a0c..7611943 100644
--- a/python/pyspark/sql/tests/test_pandas_grouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_grouped_map.py
@@ -587,6 +587,16 @@ class GroupedMapInPandasTests(ReusedSQLTestCase):
         # Check that all group and window_range values from udf matched expected
         self.assertTrue(all([r[0] for r in result]))
 
+    def test_case_insensitive_grouping_column(self):
+        # SPARK-31915: case-insensitive grouping column should work.
+        def my_pandas_udf(pdf):
+            return pdf.assign(score=0.5)
+
+        df = self.spark.createDataFrame([[1, 1]], ["column", "score"])
+        row = df.groupby('COLUMN').applyInPandas(
+            my_pandas_udf, schema="column integer, score float").first()
+        self.assertEquals(row.asDict(), Row(column=1, score=0.5).asDict())
+
 
 if __name__ == "__main__":
     from pyspark.sql.tests.test_pandas_grouped_map import *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index b1ba7d4..c37d8ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -546,9 +546,10 @@ class RelationalGroupedDataset protected[sql](
       case ne: NamedExpression => ne
       case other => Alias(other, other.toString)()
     }
-    val groupingAttributes = groupingNamedExpressions.map(_.toAttribute)
     val child = df.logicalPlan
-    val project = Project(groupingNamedExpressions ++ child.output, child)
+    val project = df.sparkSession.sessionState.executePlan(
+      Project(groupingNamedExpressions ++ child.output, child)).analyzed
+    val groupingAttributes = project.output.take(groupingNamedExpressions.length)
     val output = expr.dataType.asInstanceOf[StructType].toAttributes
     val plan = FlatMapGroupsInPandas(groupingAttributes, expr, output, project)
 
@@ -583,14 +584,16 @@ class RelationalGroupedDataset protected[sql](
       case other => Alias(other, other.toString)()
     }
 
-    val leftAttributes = leftGroupingNamedExpressions.map(_.toAttribute)
-    val rightAttributes = rightGroupingNamedExpressions.map(_.toAttribute)
-
     val leftChild = df.logicalPlan
     val rightChild = r.df.logicalPlan
 
-    val left = Project(leftGroupingNamedExpressions ++ leftChild.output, leftChild)
-    val right = Project(rightGroupingNamedExpressions ++ rightChild.output, rightChild)
+    val left = df.sparkSession.sessionState.executePlan(
+      Project(leftGroupingNamedExpressions ++ leftChild.output, leftChild)).analyzed
+    val right = r.df.sparkSession.sessionState.executePlan(
+      Project(rightGroupingNamedExpressions ++ rightChild.output, rightChild)).analyzed
+
+    val leftAttributes = left.output.take(leftGroupingNamedExpressions.length)
+    val rightAttributes = right.output.take(rightGroupingNamedExpressions.length)
 
     val output = expr.dataType.asInstanceOf[StructType].toAttributes
     val plan = FlatMapCoGroupsInPandas(leftAttributes, rightAttributes, expr, output, left, right)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org