You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by cu...@apache.org on 2020/06/10 22:55:34 UTC
[spark] branch master updated: [SPARK-31915][SQL][PYTHON] Resolve
the grouping column properly per the case sensitivity in grouped and
cogrouped pandas UDFs
This is an automated email from the ASF dual-hosted git repository.
cutlerb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 00d06ca [SPARK-31915][SQL][PYTHON] Resolve the grouping column properly per the case sensitivity in grouped and cogrouped pandas UDFs
00d06ca is described below
commit 00d06cad564d5e3e5f78a687776d02fe0695a861
Author: HyukjinKwon <gu...@apache.org>
AuthorDate: Wed Jun 10 15:54:07 2020 -0700
[SPARK-31915][SQL][PYTHON] Resolve the grouping column properly per the case sensitivity in grouped and cogrouped pandas UDFs
### What changes were proposed in this pull request?
This is another approach to fix the issue. See the previous try https://github.com/apache/spark/pull/28745. It was too invasive so I took more conservative approach.
This PR proposes to resolve grouping attributes separately first so it can be properly referred when `FlatMapGroupsInPandas` and `FlatMapCoGroupsInPandas` are resolved without ambiguity.
Previously,
```python
from pyspark.sql.functions import *
df = spark.createDataFrame([[1, 1]], ["column", "Score"])
pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP)
def my_pandas_udf(pdf):
return pdf.assign(Score=0.5)
df.groupby('COLUMN').apply(my_pandas_udf).show()
```
failed as below:
```
pyspark.sql.utils.AnalysisException: "Reference 'COLUMN' is ambiguous, could be: COLUMN, COLUMN.;"
```
because the unresolved `COLUMN` in `FlatMapGroupsInPandas` doesn't know which reference to take from the child projection.
After this fix, it first resolves the child projection together with the grouping keys, and then passes to `FlatMapGroupsInPandas` the grouping-key attributes selected positionally from the resolved child projection.
### Why are the changes needed?
To resolve grouping keys correctly.
### Does this PR introduce _any_ user-facing change?
Yes,
```python
from pyspark.sql.functions import *
df = spark.createDataFrame([[1, 1]], ["column", "Score"])
pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP)
def my_pandas_udf(pdf):
return pdf.assign(Score=0.5)
df.groupby('COLUMN').apply(my_pandas_udf).show()
```
```python
df1 = spark.createDataFrame([(1, 1)], ("column", "value"))
df2 = spark.createDataFrame([(1, 1)], ("column", "value"))
df1.groupby("COLUMN").cogroup(
df2.groupby("COLUMN")
).applyInPandas(lambda r, l: r + l, df1.schema).show()
```
Before:
```
pyspark.sql.utils.AnalysisException: Reference 'COLUMN' is ambiguous, could be: COLUMN, COLUMN.;
```
```
pyspark.sql.utils.AnalysisException: cannot resolve '`COLUMN`' given input columns: [COLUMN, COLUMN, value, value];;
'FlatMapCoGroupsInPandas ['COLUMN], ['COLUMN], <lambda>(column#9L, value#10L, column#13L, value#14L), [column#22L, value#23L]
:- Project [COLUMN#9L, column#9L, value#10L]
: +- LogicalRDD [column#9L, value#10L], false
+- Project [COLUMN#13L, column#13L, value#14L]
+- LogicalRDD [column#13L, value#14L], false
```
After:
```
+------+-----+
|column|Score|
+------+-----+
| 1| 0.5|
+------+-----+
```
```
+------+-----+
|column|value|
+------+-----+
| 2| 2|
+------+-----+
```
### How was this patch tested?
Unit tests were added, and the change was manually tested.
Closes #28777 from HyukjinKwon/SPARK-31915-another.
Authored-by: HyukjinKwon <gu...@apache.org>
Signed-off-by: Bryan Cutler <cu...@gmail.com>
---
python/pyspark/sql/tests/test_pandas_cogrouped_map.py | 18 +++++++++++++++++-
python/pyspark/sql/tests/test_pandas_grouped_map.py | 10 ++++++++++
.../apache/spark/sql/RelationalGroupedDataset.scala | 17 ++++++++++-------
3 files changed, 37 insertions(+), 8 deletions(-)
diff --git a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
index 3ed9d2a..c1cb30c 100644
--- a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
@@ -19,7 +19,7 @@ import unittest
import sys
from pyspark.sql.functions import array, explode, col, lit, udf, sum, pandas_udf, PandasUDFType
-from pyspark.sql.types import DoubleType, StructType, StructField
+from pyspark.sql.types import DoubleType, StructType, StructField, Row
from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
pandas_requirement_message, pyarrow_requirement_message
from pyspark.testing.utils import QuietTest
@@ -193,6 +193,22 @@ class CogroupedMapInPandasTests(ReusedSQLTestCase):
left.groupby('id').cogroup(right.groupby('id')) \
.applyInPandas(lambda: 1, StructType([StructField("d", DoubleType())]))
+ def test_case_insensitive_grouping_column(self):
+ # SPARK-31915: case-insensitive grouping column should work.
+ df1 = self.spark.createDataFrame([(1, 1)], ("column", "value"))
+
+ row = df1.groupby("ColUmn").cogroup(
+ df1.groupby("COLUMN")
+ ).applyInPandas(lambda r, l: r + l, "column long, value long").first()
+ self.assertEquals(row.asDict(), Row(column=2, value=2).asDict())
+
+ df2 = self.spark.createDataFrame([(1, 1)], ("column", "value"))
+
+ row = df1.groupby("ColUmn").cogroup(
+ df2.groupby("COLUMN")
+ ).applyInPandas(lambda r, l: r + l, "column long, value long").first()
+ self.assertEquals(row.asDict(), Row(column=2, value=2).asDict())
+
@staticmethod
def _test_with_key(left, right, isLeft):
diff --git a/python/pyspark/sql/tests/test_pandas_grouped_map.py b/python/pyspark/sql/tests/test_pandas_grouped_map.py
index ff53a0c..7611943 100644
--- a/python/pyspark/sql/tests/test_pandas_grouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_grouped_map.py
@@ -587,6 +587,16 @@ class GroupedMapInPandasTests(ReusedSQLTestCase):
# Check that all group and window_range values from udf matched expected
self.assertTrue(all([r[0] for r in result]))
+ def test_case_insensitive_grouping_column(self):
+ # SPARK-31915: case-insensitive grouping column should work.
+ def my_pandas_udf(pdf):
+ return pdf.assign(score=0.5)
+
+ df = self.spark.createDataFrame([[1, 1]], ["column", "score"])
+ row = df.groupby('COLUMN').applyInPandas(
+ my_pandas_udf, schema="column integer, score float").first()
+ self.assertEquals(row.asDict(), Row(column=1, score=0.5).asDict())
+
if __name__ == "__main__":
from pyspark.sql.tests.test_pandas_grouped_map import *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index b1ba7d4..c37d8ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -546,9 +546,10 @@ class RelationalGroupedDataset protected[sql](
case ne: NamedExpression => ne
case other => Alias(other, other.toString)()
}
- val groupingAttributes = groupingNamedExpressions.map(_.toAttribute)
val child = df.logicalPlan
- val project = Project(groupingNamedExpressions ++ child.output, child)
+ val project = df.sparkSession.sessionState.executePlan(
+ Project(groupingNamedExpressions ++ child.output, child)).analyzed
+ val groupingAttributes = project.output.take(groupingNamedExpressions.length)
val output = expr.dataType.asInstanceOf[StructType].toAttributes
val plan = FlatMapGroupsInPandas(groupingAttributes, expr, output, project)
@@ -583,14 +584,16 @@ class RelationalGroupedDataset protected[sql](
case other => Alias(other, other.toString)()
}
- val leftAttributes = leftGroupingNamedExpressions.map(_.toAttribute)
- val rightAttributes = rightGroupingNamedExpressions.map(_.toAttribute)
-
val leftChild = df.logicalPlan
val rightChild = r.df.logicalPlan
- val left = Project(leftGroupingNamedExpressions ++ leftChild.output, leftChild)
- val right = Project(rightGroupingNamedExpressions ++ rightChild.output, rightChild)
+ val left = df.sparkSession.sessionState.executePlan(
+ Project(leftGroupingNamedExpressions ++ leftChild.output, leftChild)).analyzed
+ val right = r.df.sparkSession.sessionState.executePlan(
+ Project(rightGroupingNamedExpressions ++ rightChild.output, rightChild)).analyzed
+
+ val leftAttributes = left.output.take(leftGroupingNamedExpressions.length)
+ val rightAttributes = right.output.take(rightGroupingNamedExpressions.length)
val output = expr.dataType.asInstanceOf[StructType].toAttributes
val plan = FlatMapCoGroupsInPandas(leftAttributes, rightAttributes, expr, output, left, right)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org