You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by vi...@apache.org on 2021/07/16 07:42:08 UTC
[spark] branch branch-3.1 updated: [SPARK-35972][SQL][3.1] When
replace ExtractValue in NestedColumnAliasing we should use semanticEquals
This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 9741c76 [SPARK-35972][SQL][3.1] When replace ExtractValue in NestedColumnAliasing we should use semanticEquals
9741c76 is described below
commit 9741c769da8a8d33475fb7295059632ccd5725bd
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Fri Jul 16 00:41:13 2021 -0700
[SPARK-35972][SQL][3.1] When replace ExtractValue in NestedColumnAliasing we should use semanticEquals
### What changes were proposed in this pull request?
Ideally, in a SQL query, nested columns should result in a GetStructField with a non-None name. But there are places that can create a GetStructField with a None name, such as UnresolvedStar.expand, Dataset encoder machinery, etc.
The current `nestedFieldToAlias` map cannot match such expressions (exact equality fails even though they are semantically equal), which causes the job to fail.
### Why are the changes needed?
Fix bug
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added UT.
Closes #33227 from AngersZhuuuu/SPARK-35972-3.0.
Authored-by: Angerszhuuuu <an...@gmail.com>
Signed-off-by: Liang-Chi Hsieh <vi...@gmail.com>
---
.../catalyst/optimizer/NestedColumnAliasing.scala | 20 ++++++++--------
.../optimizer/NestedColumnAliasingSuite.scala | 27 ++++++++++++++++++++++
2 files changed, 38 insertions(+), 9 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
index b053bf6..028e112 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
@@ -68,7 +68,7 @@ object NestedColumnAliasing {
*/
private def replaceToAliases(
plan: LogicalPlan,
- nestedFieldToAlias: Map[ExtractValue, Alias],
+ nestedFieldToAlias: Map[Expression, Alias],
attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
case Project(projectList, child) =>
Project(
@@ -85,10 +85,10 @@ object NestedColumnAliasing {
*/
def getNewProjectList(
projectList: Seq[NamedExpression],
- nestedFieldToAlias: Map[ExtractValue, Alias]): Seq[NamedExpression] = {
+ nestedFieldToAlias: Map[Expression, Alias]): Seq[NamedExpression] = {
projectList.map(_.transform {
- case f: ExtractValue if nestedFieldToAlias.contains(f) =>
- nestedFieldToAlias(f).toAttribute
+ case f: ExtractValue if nestedFieldToAlias.contains(f.canonicalized) =>
+ nestedFieldToAlias(f.canonicalized).toAttribute
}.asInstanceOf[NamedExpression])
}
@@ -98,13 +98,13 @@ object NestedColumnAliasing {
*/
def replaceWithAliases(
plan: LogicalPlan,
- nestedFieldToAlias: Map[ExtractValue, Alias],
+ nestedFieldToAlias: Map[Expression, Alias],
attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
plan.withNewChildren(plan.children.map { plan =>
Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan)
}).transformExpressions {
- case f: ExtractValue if nestedFieldToAlias.contains(f) =>
- nestedFieldToAlias(f).toAttribute
+ case f: ExtractValue if nestedFieldToAlias.contains(f.canonicalized) =>
+ nestedFieldToAlias(f.canonicalized).toAttribute
}
}
@@ -158,7 +158,7 @@ object NestedColumnAliasing {
* 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases pointing it.
*/
def getAliasSubMap(exprList: Seq[Expression], exclusiveAttrs: Seq[Attribute] = Seq.empty)
- : Option[(Map[ExtractValue, Alias], Map[ExprId, Seq[Alias]])] = {
+ : Option[(Map[Expression, Alias], Map[ExprId, Seq[Alias]])] = {
val (nestedFieldReferences, otherRootReferences) =
exprList.flatMap(collectRootReferenceAndExtractValue).partition {
case _: ExtractValue => true
@@ -208,7 +208,9 @@ object NestedColumnAliasing {
if (aliasSub.isEmpty) {
None
} else {
- Some((aliasSub.values.flatten.toMap, aliasSub.map(x => (x._1, x._2.map(_._2)))))
+ Some((aliasSub.values.flatten.map {
+ case (field, alias) => field.canonicalized -> alias
+ }.toMap, aliasSub.map(x => (x._1, x._2.map(_._2)))))
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
index c83ab37..7a667cc 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
@@ -684,6 +684,33 @@ class NestedColumnAliasingSuite extends SchemaPruningTest {
).analyze
comparePlans(optimized2, expected2)
}
+
+ test("SPARK-35972: NestedColumnAliasing should consider semantic equality") {
+ val dataType = new StructType()
+ .add(StructField("itemid", StringType))
+ .add(StructField("search_params", StructType(Seq(
+ StructField("col1", StringType),
+ StructField("col2", StringType)
+ ))))
+ val relation = LocalRelation('struct_data.struct(dataType))
+ val plan = relation
+ .repartition(100)
+ .select(
+ GetStructField('struct_data, 1, None).as("value"),
+ $"struct_data.search_params.col1".as("col1"),
+ $"struct_data.search_params.col2".as("col2")).analyze
+ val query = Optimize.execute(plan)
+ val alias = collectGeneratedAliases(query)
+
+ val optimized = relation
+ .select(GetStructField('struct_data, 1, None).as(alias(0)))
+ .repartition(100)
+ .select(
+ $"${alias(0)}".as("value"),
+ $"${alias(0)}.col1".as("col1"),
+ $"${alias(0)}.col2".as("col2")).analyze
+ comparePlans(optimized, query)
+ }
}
object NestedColumnAliasingSuite {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org