You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by vi...@apache.org on 2022/01/12 19:46:31 UTC

[spark] branch branch-3.2 updated: [SPARK-37855][SQL][3.2] IllegalStateException when transforming an array inside a nested struct

This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new a58b8a8  [SPARK-37855][SQL][3.2] IllegalStateException when transforming an array inside a nested struct
a58b8a8 is described below

commit a58b8a864bb03bcdfe69b157bc7aec39b68556fa
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Wed Jan 12 11:44:04 2022 -0800

    [SPARK-37855][SQL][3.2] IllegalStateException when transforming an array inside a nested struct
    
    This is a backport of #35170 for branch-3.2.
    
    ### What changes were proposed in this pull request?
    
    Skip aliasing any `ExtractValue` whose children contain a `NamedLambdaVariable`.
    
    ### Why are the changes needed?
    
    Since #32773, a `NamedLambdaVariable` can produce references. However, this causes the rule `NestedColumnAliasing` to alias an `ExtractValue` that contains a `NamedLambdaVariable`. That fails because a `NamedLambdaVariable` cannot be matched to an actual attribute.
    
    More details:
    During `NestedColumnAliasing#replaceWithAliases`, the references of the nested field are used to match the output attributes of the grandchildren. However, a `NamedLambdaVariable` is created by the analyzer as a virtual attribute and is not resolved from the output of the children. As a result, no attribute is found when the references of a `NamedLambdaVariable` are used to match the grandchildren's output.
    
    ### Does this PR introduce _any_ user-facing change?
    
    yes, bug fix
    
    ### How was this patch tested?
    
    Add new test
    
    Closes #35175 from ulysses-you/SPARK-37855-branch-3.2.
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: Liang-Chi Hsieh <vi...@gmail.com>
---
 .../catalyst/optimizer/NestedColumnAliasing.scala  |  4 +-
 .../org/apache/spark/sql/DataFrameSuite.scala      | 54 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
index 77a25ec..9d63f4e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
@@ -245,11 +245,13 @@ object NestedColumnAliasing {
     val otherRootReferences = new mutable.ArrayBuffer[AttributeReference]()
     exprList.foreach { e =>
       collectRootReferenceAndExtractValue(e).foreach {
-        case ev: ExtractValue =>
+        // we can not alias the attr from lambda variable whose expr id is not available
+        case ev: ExtractValue if ev.find(_.isInstanceOf[NamedLambdaVariable]).isEmpty =>
           if (ev.references.size == 1) {
             nestedFieldReferences.append(ev)
           }
         case ar: AttributeReference => otherRootReferences.append(ar)
+        case _ => // ignore
       }
     }
     val exclusiveAttrSet = AttributeSet(exclusiveAttrs ++ otherRootReferences)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 2fd5993..70ec052 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2943,6 +2943,60 @@ class DataFrameSuite extends QueryTest
       .withSequenceColumn("default_index").collect().map(_.getLong(0))
     assert(ids.toSet === Range(0, 10).toSet)
   }
+
+  test("SPARK-37855: IllegalStateException when transforming an array inside a nested struct") {
+    def makeInput(): DataFrame = {
+      val innerElement1 = Row(3, 3.12)
+      val innerElement2 = Row(4, 2.1)
+      val innerElement3 = Row(1, 985.2)
+      val innerElement4 = Row(10, 757548.0)
+      val innerElement5 = Row(1223, 0.665)
+
+      val outerElement1 = Row(1, Row(List(innerElement1, innerElement2)))
+      val outerElement2 = Row(2, Row(List(innerElement3)))
+      val outerElement3 = Row(3, Row(List(innerElement4, innerElement5)))
+
+      val data = Seq(
+        Row("row1", List(outerElement1)),
+        Row("row2", List(outerElement2, outerElement3))
+      )
+
+      val schema = new StructType()
+        .add("name", StringType)
+        .add("outer_array", ArrayType(new StructType()
+          .add("id", IntegerType)
+          .add("inner_array_struct", new StructType()
+            .add("inner_array", ArrayType(new StructType()
+              .add("id", IntegerType)
+              .add("value", DoubleType)
+            ))
+          )
+        ))
+
+      spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+    }
+
+    val df = makeInput().limit(2)
+
+    val res = df.withColumn("extracted", transform(
+      col("outer_array"),
+      c1 => {
+        struct(
+          c1.getField("id").alias("outer_id"),
+          transform(
+            c1.getField("inner_array_struct").getField("inner_array"),
+            c2 => {
+              struct(
+                c2.getField("value").alias("inner_value")
+              )
+            }
+          )
+        )
+      }
+    ))
+
+    assert(res.collect.length == 2)
+  }
 }
 
 case class GroupByKey(a: Int, b: Int)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org