You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by vi...@apache.org on 2022/01/12 19:46:31 UTC
[spark] branch branch-3.2 updated: [SPARK-37855][SQL][3.2] IllegalStateException when transforming an array inside a nested struct
This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new a58b8a8 [SPARK-37855][SQL][3.2] IllegalStateException when transforming an array inside a nested struct
a58b8a8 is described below
commit a58b8a864bb03bcdfe69b157bc7aec39b68556fa
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Wed Jan 12 11:44:04 2022 -0800
[SPARK-37855][SQL][3.2] IllegalStateException when transforming an array inside a nested struct
This is a backport of #35170 for branch-3.2.
### What changes were proposed in this pull request?
Skip aliasing any `ExtractValue` whose children contain a `NamedLambdaVariable`.
### Why are the changes needed?
Since #32773, a `NamedLambdaVariable` can produce references. However, this causes the rule `NestedColumnAliasing` to alias an `ExtractValue` that contains a `NamedLambdaVariable`. That fails because we cannot match a `NamedLambdaVariable` to an actual attribute.
In more detail:
During `NestedColumnAliasing#replaceWithAliases`, the references of each nested field are used to match the output attributes of the grandchildren. However, a `NamedLambdaVariable` is created by the analyzer as a virtual attribute; it is not resolved from the output of the children. So matching the references of a `NamedLambdaVariable` against the grandchildren's output yields no attribute.
### Does this PR introduce _any_ user-facing change?
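Yes. This is a bug fix: transforming an array inside a nested struct no longer throws an `IllegalStateException`.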
### How was this patch tested?
Added a new test to `DataFrameSuite`.
Closes #35175 from ulysses-you/SPARK-37855-branch-3.2.
Authored-by: ulysses-you <ul...@gmail.com>
Signed-off-by: Liang-Chi Hsieh <vi...@gmail.com>
---
.../catalyst/optimizer/NestedColumnAliasing.scala | 4 +-
.../org/apache/spark/sql/DataFrameSuite.scala | 54 ++++++++++++++++++++++
2 files changed, 57 insertions(+), 1 deletion(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
index 77a25ec..9d63f4e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
@@ -245,11 +245,13 @@ object NestedColumnAliasing {
val otherRootReferences = new mutable.ArrayBuffer[AttributeReference]()
exprList.foreach { e =>
collectRootReferenceAndExtractValue(e).foreach {
- case ev: ExtractValue =>
+ // we can not alias the attr from lambda variable whose expr id is not available
+ case ev: ExtractValue if ev.find(_.isInstanceOf[NamedLambdaVariable]).isEmpty =>
if (ev.references.size == 1) {
nestedFieldReferences.append(ev)
}
case ar: AttributeReference => otherRootReferences.append(ar)
+ case _ => // ignore
}
}
val exclusiveAttrSet = AttributeSet(exclusiveAttrs ++ otherRootReferences)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 2fd5993..70ec052 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2943,6 +2943,60 @@ class DataFrameSuite extends QueryTest
.withSequenceColumn("default_index").collect().map(_.getLong(0))
assert(ids.toSet === Range(0, 10).toSet)
}
+
+ test("SPARK-37855: IllegalStateException when transforming an array inside a nested struct") {
+ def makeInput(): DataFrame = {
+ val innerElement1 = Row(3, 3.12)
+ val innerElement2 = Row(4, 2.1)
+ val innerElement3 = Row(1, 985.2)
+ val innerElement4 = Row(10, 757548.0)
+ val innerElement5 = Row(1223, 0.665)
+
+ val outerElement1 = Row(1, Row(List(innerElement1, innerElement2)))
+ val outerElement2 = Row(2, Row(List(innerElement3)))
+ val outerElement3 = Row(3, Row(List(innerElement4, innerElement5)))
+
+ val data = Seq(
+ Row("row1", List(outerElement1)),
+ Row("row2", List(outerElement2, outerElement3))
+ )
+
+ val schema = new StructType()
+ .add("name", StringType)
+ .add("outer_array", ArrayType(new StructType()
+ .add("id", IntegerType)
+ .add("inner_array_struct", new StructType()
+ .add("inner_array", ArrayType(new StructType()
+ .add("id", IntegerType)
+ .add("value", DoubleType)
+ ))
+ )
+ ))
+
+ spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+ }
+
+ val df = makeInput().limit(2)
+
+ val res = df.withColumn("extracted", transform(
+ col("outer_array"),
+ c1 => {
+ struct(
+ c1.getField("id").alias("outer_id"),
+ transform(
+ c1.getField("inner_array_struct").getField("inner_array"),
+ c2 => {
+ struct(
+ c2.getField("value").alias("inner_value")
+ )
+ }
+ )
+ )
+ }
+ ))
+
+ assert(res.collect.length == 2)
+ }
}
case class GroupByKey(a: Int, b: Int)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org