You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/02/02 07:26:24 UTC
[spark] branch branch-3.1 updated: [SPARK-34319][SQL] Resolve
duplicate attributes for FlatMapCoGroupsInPandas/MapInPandas
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new a12e29b [SPARK-34319][SQL] Resolve duplicate attributes for FlatMapCoGroupsInPandas/MapInPandas
a12e29b is described below
commit a12e29bf96c5b7533d418446102cb2037c623c1d
Author: yi.wu <yi...@databricks.com>
AuthorDate: Tue Feb 2 16:25:32 2021 +0900
[SPARK-34319][SQL] Resolve duplicate attributes for FlatMapCoGroupsInPandas/MapInPandas
### What changes were proposed in this pull request?
Resolve duplicate attributes for `FlatMapCoGroupsInPandas` and `MapInPandas`.
### Why are the changes needed?
When performing a self-join on top of `FlatMapCoGroupsInPandas`, analysis can fail because of conflicting attributes. For example:
```python
df = spark.createDataFrame([(1, 1)], ("column", "value"))
row = df.groupby("ColUmn").cogroup(
df.groupby("COLUMN")
).applyInPandas(lambda r, l: r + l, "column long, value long")
row.join(row).show()
```
error:
```text
...
Conflicting attributes: column#163321L,value#163322L
;;
'Join Inner
:- FlatMapCoGroupsInPandas [ColUmn#163312L], [COLUMN#163312L], <lambda>(column#163312L, value#163313L, column#163312L, value#163313L), [column#163321L, value#163322L]
:  :- Project [ColUmn#163312L, column#163312L, value#163313L]
:  :  +- LogicalRDD [column#163312L, value#163313L], false
:  +- Project [COLUMN#163312L, column#163312L, value#163313L]
:     +- LogicalRDD [column#163312L, value#163313L], false
+- FlatMapCoGroupsInPandas [ColUmn#163312L], [COLUMN#163312L], <lambda>(column#163312L, value#163313L, column#163312L, value#163313L), [column#163321L, value#163322L]
   :- Project [ColUmn#163312L, column#163312L, value#163313L]
   :  +- LogicalRDD [column#163312L, value#163313L], false
   +- Project [COLUMN#163312L, column#163312L, value#163313L]
      +- LogicalRDD [column#163312L, value#163313L], false
...
```
### Does this PR introduce _any_ user-facing change?
Yes. Queries like the example above no longer fail during analysis.
### How was this patch tested?
Added unit tests.
Closes #31429 from Ngone51/fix-conflcting-attrs-of-FlatMapCoGroupsInPandas.
Lead-authored-by: yi.wu <yi...@databricks.com>
Co-authored-by: wuyi <yi...@databricks.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
(cherry picked from commit e9362c2571f4a329218ff466fce79eef45e8f992)
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../pyspark/sql/tests/test_pandas_cogrouped_map.py | 12 +++++++
python/pyspark/sql/tests/test_pandas_map.py | 8 +++++
.../spark/sql/catalyst/analysis/Analyzer.scala | 8 +++++
.../sql/catalyst/analysis/AnalysisSuite.scala | 42 ++++++++++++++++++++++
4 files changed, 70 insertions(+)
diff --git a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
index 3c016e0..94a12bf 100644
--- a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
@@ -203,6 +203,18 @@ class CogroupedMapInPandasTests(ReusedSQLTestCase):
).applyInPandas(lambda r, l: r + l, "column long, value long").first()
self.assertEqual(row.asDict(), Row(column=2, value=2).asDict())
+ def test_self_join(self):
+ # SPARK-34319: self-join with FlatMapCoGroupsInPandas
+ df = self.spark.createDataFrame([(1, 1)], ("column", "value"))
+
+ row = df.groupby("ColUmn").cogroup(
+ df.groupby("COLUMN")
+ ).applyInPandas(lambda r, l: r + l, "column long, value long")
+
+ row = row.join(row).first()
+
+ self.assertEqual(row.asDict(), Row(column=2, value=2).asDict())
+
@staticmethod
def _test_with_key(left, right, isLeft):
diff --git a/python/pyspark/sql/tests/test_pandas_map.py b/python/pyspark/sql/tests/test_pandas_map.py
index d53face..e8f92de 100644
--- a/python/pyspark/sql/tests/test_pandas_map.py
+++ b/python/pyspark/sql/tests/test_pandas_map.py
@@ -112,6 +112,14 @@ class MapInPandasTests(ReusedSQLTestCase):
expected = df.collect()
self.assertEqual(actual, expected)
+ def test_self_join(self):
+ # SPARK-34319: self-join with MapInPandas
+ df1 = self.spark.range(10)
+ df2 = df1.mapInPandas(lambda iter: iter, 'id long')
+ actual = df2.join(df2).collect()
+ expected = df1.join(df1).collect()
+ self.assertEqual(sorted(actual), sorted(expected))
+
if __name__ == "__main__":
from pyspark.sql.tests.test_pandas_map import * # noqa: F401
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 6fd6901..388b2f0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1365,6 +1365,14 @@ class Analyzer(override val catalogManager: CatalogManager)
if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
Seq((oldVersion, oldVersion.copy(output = output.map(_.newInstance()))))
+ case oldVersion @ FlatMapCoGroupsInPandas(_, _, _, output, _, _)
+ if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
+ Seq((oldVersion, oldVersion.copy(output = output.map(_.newInstance()))))
+
+ case oldVersion @ MapInPandas(_, output, _)
+ if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
+ Seq((oldVersion, oldVersion.copy(output = output.map(_.newInstance()))))
+
case oldVersion: Generate
if oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty =>
val newOutput = oldVersion.generatorOutput.map(_.newInstance())
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 468b8c0..7a6103e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -631,6 +631,48 @@ class AnalysisSuite extends AnalysisTest with Matchers {
Project(Seq(UnresolvedAttribute("temp0.a"), UnresolvedAttribute("temp1.a")), join))
}
+ test("SPARK-34319: analysis fails on self-join with FlatMapCoGroupsInPandas") {
+ val pythonUdf = PythonUDF("pyUDF", null,
+ StructType(Seq(StructField("a", LongType))),
+ Seq.empty,
+ PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF,
+ true)
+ val output = pythonUdf.dataType.asInstanceOf[StructType].toAttributes
+ val project1 = Project(Seq(UnresolvedAttribute("a")), testRelation)
+ val project2 = Project(Seq(UnresolvedAttribute("a")), testRelation2)
+ val flatMapGroupsInPandas = FlatMapCoGroupsInPandas(
+ Seq(UnresolvedAttribute("a")),
+ Seq(UnresolvedAttribute("a")),
+ pythonUdf,
+ output,
+ project1,
+ project2)
+ val left = SubqueryAlias("temp0", flatMapGroupsInPandas)
+ val right = SubqueryAlias("temp1", flatMapGroupsInPandas)
+ val join = Join(left, right, Inner, None, JoinHint.NONE)
+ assertAnalysisSuccess(
+ Project(Seq(UnresolvedAttribute("temp0.a"), UnresolvedAttribute("temp1.a")), join))
+ }
+
+ test("SPARK-34319: analysis fails on self-join with MapInPandas") {
+ val pythonUdf = PythonUDF("pyUDF", null,
+ StructType(Seq(StructField("a", LongType))),
+ Seq.empty,
+ PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
+ true)
+ val output = pythonUdf.dataType.asInstanceOf[StructType].toAttributes
+ val project = Project(Seq(UnresolvedAttribute("a")), testRelation)
+ val mapInPandas = MapInPandas(
+ pythonUdf,
+ output,
+ project)
+ val left = SubqueryAlias("temp0", mapInPandas)
+ val right = SubqueryAlias("temp1", mapInPandas)
+ val join = Join(left, right, Inner, None, JoinHint.NONE)
+ assertAnalysisSuccess(
+ Project(Seq(UnresolvedAttribute("temp0.a"), UnresolvedAttribute("temp1.a")), join))
+ }
+
test("SPARK-24488 Generator with multiple aliases") {
assertAnalysisSuccess(
listRelation.select(Explode($"list").as("first_alias").as("second_alias")))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org