You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by vi...@apache.org on 2021/07/16 07:42:08 UTC

[spark] branch branch-3.1 updated: [SPARK-35972][SQL][3.1] When replace ExtractValue in NestedColumnAliasing we should use semanticEquals

This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 9741c76  [SPARK-35972][SQL][3.1] When replace ExtractValue in NestedColumnAliasing we should use semanticEquals
9741c76 is described below

commit 9741c769da8a8d33475fb7295059632ccd5725bd
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Fri Jul 16 00:41:13 2021 -0700

    [SPARK-35972][SQL][3.1] When replace ExtractValue in NestedColumnAliasing we should use semanticEquals
    
    ### What changes were proposed in this pull request?
    Ideally, in SQL query, nested columns should result to GetStructField with non-None name. But there are places that can create GetStructField with None name, such as UnresolvedStar.expand, Dataset encoder stuff, etc.
    the current `nestedFieldToAlias` cannot catch it up and will cause job failed.
    
    ### Why are the changes needed?
    Fix bug
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added UT,
    
    Closes #33227 from AngersZhuuuu/SPARK-35972-3.0.
    
    Authored-by: Angerszhuuuu <an...@gmail.com>
    Signed-off-by: Liang-Chi Hsieh <vi...@gmail.com>
---
 .../catalyst/optimizer/NestedColumnAliasing.scala  | 20 ++++++++--------
 .../optimizer/NestedColumnAliasingSuite.scala      | 27 ++++++++++++++++++++++
 2 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
index b053bf6..028e112 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
@@ -68,7 +68,7 @@ object NestedColumnAliasing {
    */
   private def replaceToAliases(
       plan: LogicalPlan,
-      nestedFieldToAlias: Map[ExtractValue, Alias],
+      nestedFieldToAlias: Map[Expression, Alias],
       attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
     case Project(projectList, child) =>
       Project(
@@ -85,10 +85,10 @@ object NestedColumnAliasing {
    */
   def getNewProjectList(
       projectList: Seq[NamedExpression],
-      nestedFieldToAlias: Map[ExtractValue, Alias]): Seq[NamedExpression] = {
+      nestedFieldToAlias: Map[Expression, Alias]): Seq[NamedExpression] = {
     projectList.map(_.transform {
-      case f: ExtractValue if nestedFieldToAlias.contains(f) =>
-        nestedFieldToAlias(f).toAttribute
+      case f: ExtractValue if nestedFieldToAlias.contains(f.canonicalized) =>
+        nestedFieldToAlias(f.canonicalized).toAttribute
     }.asInstanceOf[NamedExpression])
   }
 
@@ -98,13 +98,13 @@ object NestedColumnAliasing {
    */
   def replaceWithAliases(
       plan: LogicalPlan,
-      nestedFieldToAlias: Map[ExtractValue, Alias],
+      nestedFieldToAlias: Map[Expression, Alias],
       attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
     plan.withNewChildren(plan.children.map { plan =>
       Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan)
     }).transformExpressions {
-      case f: ExtractValue if nestedFieldToAlias.contains(f) =>
-        nestedFieldToAlias(f).toAttribute
+      case f: ExtractValue if nestedFieldToAlias.contains(f.canonicalized) =>
+        nestedFieldToAlias(f.canonicalized).toAttribute
     }
   }
 
@@ -158,7 +158,7 @@ object NestedColumnAliasing {
    * 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases pointing it.
    */
   def getAliasSubMap(exprList: Seq[Expression], exclusiveAttrs: Seq[Attribute] = Seq.empty)
-    : Option[(Map[ExtractValue, Alias], Map[ExprId, Seq[Alias]])] = {
+    : Option[(Map[Expression, Alias], Map[ExprId, Seq[Alias]])] = {
     val (nestedFieldReferences, otherRootReferences) =
       exprList.flatMap(collectRootReferenceAndExtractValue).partition {
         case _: ExtractValue => true
@@ -208,7 +208,9 @@ object NestedColumnAliasing {
     if (aliasSub.isEmpty) {
       None
     } else {
-      Some((aliasSub.values.flatten.toMap, aliasSub.map(x => (x._1, x._2.map(_._2)))))
+      Some((aliasSub.values.flatten.map {
+        case (field, alias) => field.canonicalized -> alias
+      }.toMap, aliasSub.map(x => (x._1, x._2.map(_._2)))))
     }
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
index c83ab37..7a667cc 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
@@ -684,6 +684,33 @@ class NestedColumnAliasingSuite extends SchemaPruningTest {
     ).analyze
     comparePlans(optimized2, expected2)
   }
+
+  test("SPARK-35972: NestedColumnAliasing should consider semantic equality") {
+    val dataType = new StructType()
+      .add(StructField("itemid", StringType))
+      .add(StructField("search_params", StructType(Seq(
+        StructField("col1", StringType),
+        StructField("col2", StringType)
+      ))))
+    val relation = LocalRelation('struct_data.struct(dataType))
+    val plan = relation
+      .repartition(100)
+      .select(
+        GetStructField('struct_data, 1, None).as("value"),
+        $"struct_data.search_params.col1".as("col1"),
+        $"struct_data.search_params.col2".as("col2")).analyze
+    val query = Optimize.execute(plan)
+    val alias = collectGeneratedAliases(query)
+
+    val optimized = relation
+      .select(GetStructField('struct_data, 1, None).as(alias(0)))
+      .repartition(100)
+      .select(
+        $"${alias(0)}".as("value"),
+        $"${alias(0)}.col1".as("col1"),
+        $"${alias(0)}.col2".as("col2")).analyze
+    comparePlans(optimized, query)
+  }
 }
 
 object NestedColumnAliasingSuite {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org