Posted to commits@spark.apache.org by gu...@apache.org on 2021/02/02 07:26:24 UTC

[spark] branch branch-3.1 updated: [SPARK-34319][SQL] Resolve duplicate attributes for FlatMapCoGroupsInPandas/MapInPandas

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new a12e29b  [SPARK-34319][SQL] Resolve duplicate attributes for FlatMapCoGroupsInPandas/MapInPandas
a12e29b is described below

commit a12e29bf96c5b7533d418446102cb2037c623c1d
Author: yi.wu <yi...@databricks.com>
AuthorDate: Tue Feb 2 16:25:32 2021 +0900

    [SPARK-34319][SQL] Resolve duplicate attributes for FlatMapCoGroupsInPandas/MapInPandas
    
    ### What changes were proposed in this pull request?
    
    Resolve duplicate attributes for `FlatMapCoGroupsInPandas` and `MapInPandas`.
    
    ### Why are the changes needed?
    
    When performing a self-join on top of `FlatMapCoGroupsInPandas`, analysis can fail because of conflicting attributes. For example,
    
    ```python
    df = spark.createDataFrame([(1, 1)], ("column", "value"))
    row = df.groupby("ColUmn").cogroup(
        df.groupby("COLUMN")
    ).applyInPandas(lambda r, l: r + l, "column long, value long")
    row.join(row).show()
    ```
    This fails with the following analysis error:
    
    ```
    ...
    Conflicting attributes: column#163321L,value#163322L
    ;;
    'Join Inner
    :- FlatMapCoGroupsInPandas [ColUmn#163312L], [COLUMN#163312L], <lambda>(column#163312L, value#163313L, column#163312L, value#163313L), [column#163321L, value#163322L]
    :  :- Project [ColUmn#163312L, column#163312L, value#163313L]
    :  :  +- LogicalRDD [column#163312L, value#163313L], false
    :  +- Project [COLUMN#163312L, column#163312L, value#163313L]
    :     +- LogicalRDD [column#163312L, value#163313L], false
    +- FlatMapCoGroupsInPandas [ColUmn#163312L], [COLUMN#163312L], <lambda>(column#163312L, value#163313L, column#163312L, value#163313L), [column#163321L, value#163322L]
       :- Project [ColUmn#163312L, column#163312L, value#163313L]
       :  +- LogicalRDD [column#163312L, value#163313L], false
       +- Project [COLUMN#163312L, column#163312L, value#163313L]
          +- LogicalRDD [column#163312L, value#163313L], false
    ...
    ```
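
    The same kind of attribute conflict also hits self-joins over `MapInPandas`, which this patch deduplicates as well. A minimal reproduction along the lines of the Python test added in this patch (the exact error text is omitted here):

    ```python
    # Self-join over MapInPandas: without this fix, analysis should hit the
    # same "Conflicting attributes" failure, because both sides of the join
    # reuse the output attribute ids of the single MapInPandas node.
    df1 = spark.range(10)
    df2 = df1.mapInPandas(lambda it: it, "id long")
    df2.join(df2).show()
    ```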
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Queries like the example above no longer fail.
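
    Reusing `df` and `row` from the example above, the post-fix behavior (as asserted by the cogrouped-map test added in this patch) looks like:

    ```python
    # With deduplicated output attributes the self-join now analyzes and
    # returns a single row; asDict() collapses the duplicated column names.
    joined = row.join(row).first()
    assert joined.asDict() == {"column": 2, "value": 2}
    ```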
    
    ### How was this patch tested?
    
    Added unit tests.
    
    Closes #31429 from Ngone51/fix-conflcting-attrs-of-FlatMapCoGroupsInPandas.
    
    Lead-authored-by: yi.wu <yi...@databricks.com>
    Co-authored-by: wuyi <yi...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit e9362c2571f4a329218ff466fce79eef45e8f992)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../pyspark/sql/tests/test_pandas_cogrouped_map.py | 12 +++++++
 python/pyspark/sql/tests/test_pandas_map.py        |  8 +++++
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  8 +++++
 .../sql/catalyst/analysis/AnalysisSuite.scala      | 42 ++++++++++++++++++++++
 4 files changed, 70 insertions(+)

diff --git a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
index 3c016e0..94a12bf 100644
--- a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
@@ -203,6 +203,18 @@ class CogroupedMapInPandasTests(ReusedSQLTestCase):
         ).applyInPandas(lambda r, l: r + l, "column long, value long").first()
         self.assertEqual(row.asDict(), Row(column=2, value=2).asDict())
 
+    def test_self_join(self):
+        # SPARK-34319: self-join with FlatMapCoGroupsInPandas
+        df = self.spark.createDataFrame([(1, 1)], ("column", "value"))
+
+        row = df.groupby("ColUmn").cogroup(
+            df.groupby("COLUMN")
+        ).applyInPandas(lambda r, l: r + l, "column long, value long")
+
+        row = row.join(row).first()
+
+        self.assertEqual(row.asDict(), Row(column=2, value=2).asDict())
+
     @staticmethod
     def _test_with_key(left, right, isLeft):
 
diff --git a/python/pyspark/sql/tests/test_pandas_map.py b/python/pyspark/sql/tests/test_pandas_map.py
index d53face..e8f92de 100644
--- a/python/pyspark/sql/tests/test_pandas_map.py
+++ b/python/pyspark/sql/tests/test_pandas_map.py
@@ -112,6 +112,14 @@ class MapInPandasTests(ReusedSQLTestCase):
         expected = df.collect()
         self.assertEqual(actual, expected)
 
+    def test_self_join(self):
+        # SPARK-34319: self-join with MapInPandas
+        df1 = self.spark.range(10)
+        df2 = df1.mapInPandas(lambda iter: iter, 'id long')
+        actual = df2.join(df2).collect()
+        expected = df1.join(df1).collect()
+        self.assertEqual(sorted(actual), sorted(expected))
+
 
 if __name__ == "__main__":
     from pyspark.sql.tests.test_pandas_map import *  # noqa: F401
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 6fd6901..388b2f0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1365,6 +1365,14 @@ class Analyzer(override val catalogManager: CatalogManager)
             if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
           Seq((oldVersion, oldVersion.copy(output = output.map(_.newInstance()))))
 
+        case oldVersion @ FlatMapCoGroupsInPandas(_, _, _, output, _, _)
+            if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
+          Seq((oldVersion, oldVersion.copy(output = output.map(_.newInstance()))))
+
+        case oldVersion @ MapInPandas(_, output, _)
+            if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
+          Seq((oldVersion, oldVersion.copy(output = output.map(_.newInstance()))))
+
         case oldVersion: Generate
             if oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty =>
           val newOutput = oldVersion.generatorOutput.map(_.newInstance())
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 468b8c0..7a6103e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -631,6 +631,48 @@ class AnalysisSuite extends AnalysisTest with Matchers {
       Project(Seq(UnresolvedAttribute("temp0.a"), UnresolvedAttribute("temp1.a")), join))
   }
 
+  test("SPARK-34319: analysis fails on self-join with FlatMapCoGroupsInPandas") {
+    val pythonUdf = PythonUDF("pyUDF", null,
+      StructType(Seq(StructField("a", LongType))),
+      Seq.empty,
+      PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF,
+      true)
+    val output = pythonUdf.dataType.asInstanceOf[StructType].toAttributes
+    val project1 = Project(Seq(UnresolvedAttribute("a")), testRelation)
+    val project2 = Project(Seq(UnresolvedAttribute("a")), testRelation2)
+    val flatMapGroupsInPandas = FlatMapCoGroupsInPandas(
+      Seq(UnresolvedAttribute("a")),
+      Seq(UnresolvedAttribute("a")),
+      pythonUdf,
+      output,
+      project1,
+      project2)
+    val left = SubqueryAlias("temp0", flatMapGroupsInPandas)
+    val right = SubqueryAlias("temp1", flatMapGroupsInPandas)
+    val join = Join(left, right, Inner, None, JoinHint.NONE)
+    assertAnalysisSuccess(
+      Project(Seq(UnresolvedAttribute("temp0.a"), UnresolvedAttribute("temp1.a")), join))
+  }
+
+  test("SPARK-34319: analysis fails on self-join with MapInPandas") {
+    val pythonUdf = PythonUDF("pyUDF", null,
+      StructType(Seq(StructField("a", LongType))),
+      Seq.empty,
+      PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
+      true)
+    val output = pythonUdf.dataType.asInstanceOf[StructType].toAttributes
+    val project = Project(Seq(UnresolvedAttribute("a")), testRelation)
+    val mapInPandas = MapInPandas(
+      pythonUdf,
+      output,
+      project)
+    val left = SubqueryAlias("temp0", mapInPandas)
+    val right = SubqueryAlias("temp1", mapInPandas)
+    val join = Join(left, right, Inner, None, JoinHint.NONE)
+    assertAnalysisSuccess(
+      Project(Seq(UnresolvedAttribute("temp0.a"), UnresolvedAttribute("temp1.a")), join))
+  }
+
   test("SPARK-24488 Generator with multiple aliases") {
     assertAnalysisSuccess(
       listRelation.select(Explode($"list").as("first_alias").as("second_alias")))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org