You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/02/28 03:32:19 UTC

[spark] branch branch-3.0 updated: [SPARK-30955][SQL] Exclude Generate output when aliasing in nested column pruning

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fc13b27  [SPARK-30955][SQL] Exclude Generate output when aliasing in nested column pruning
fc13b27 is described below

commit fc13b27374c2ebe2c13a1def2368933578d13801
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Fri Feb 28 12:29:46 2020 +0900

    [SPARK-30955][SQL] Exclude Generate output when aliasing in nested column pruning
    
    ### What changes were proposed in this pull request?
    
    When aliasing in nested column pruning in Project on top of Generate, we should exclude Generate outputs.
    
    ### Why are the changes needed?
    
    Right now we would prune nested columns in Project on top of Generate. It is possible that referred nested columns are from Generate's outputs, not from its child. To address that case, we should exclude Generate outputs when aliasing in nested column pruning.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit test.
    
    Closes #27702 from viirya/fix-nested-pruning.
    
    Lead-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Co-authored-by: Liang-Chi Hsieh <li...@uber.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit ba032acf9534ea7da75429ebcd4c4912d90cd26c)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../catalyst/optimizer/NestedColumnAliasing.scala  |  8 +++-
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  3 +-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 43 ++++++++++++++++++----
 3 files changed, 44 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
index 46e07f3..9814bc2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
@@ -104,10 +104,13 @@ object NestedColumnAliasing {
   /**
    * Return two maps in order to replace nested fields to aliases.
    *
+   * If `exclusiveAttrs` is given, any nested field accessors of these attributes
+   * won't be considered in nested fields aliasing.
+   *
    * 1. ExtractValue -> Alias: A new alias is created for each nested field.
    * 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases pointing it.
    */
-  def getAliasSubMap(exprList: Seq[Expression])
+  def getAliasSubMap(exprList: Seq[Expression], exclusiveAttrs: Seq[Attribute] = Seq.empty)
     : Option[(Map[ExtractValue, Alias], Map[ExprId, Seq[Alias]])] = {
     val (nestedFieldReferences, otherRootReferences) =
       exprList.flatMap(collectRootReferenceAndExtractValue).partition {
@@ -115,8 +118,9 @@ object NestedColumnAliasing {
         case _ => false
       }
 
+    val exclusiveAttrSet = AttributeSet(exclusiveAttrs ++ otherRootReferences)
     val aliasSub = nestedFieldReferences.asInstanceOf[Seq[ExtractValue]]
-      .filter(!_.references.subsetOf(AttributeSet(otherRootReferences)))
+      .filter(!_.references.subsetOf(exclusiveAttrSet))
       .groupBy(_.references.head)
       .flatMap { case (attr, nestedFields: Seq[ExtractValue]) =>
         // Each expression can contain multiple nested fields.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 08acac1..30ad6bfe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -612,7 +612,8 @@ object ColumnPruning extends Rule[LogicalPlan] {
     // prune unrequired nested fields
     case p @ Project(projectList, g: Generate) if SQLConf.get.nestedPruningOnExpressions &&
         NestedColumnAliasing.canPruneGenerator(g.generator) =>
-      NestedColumnAliasing.getAliasSubMap(projectList ++ g.generator.children).map {
+      val exprsToPrune = projectList ++ g.generator.children
+      NestedColumnAliasing.getAliasSubMap(exprsToPrune, g.qualifiedGeneratorOutput).map {
         case (nestedFieldToAlias, attrToAliases) =>
           val newGenerator = g.generator.transform {
             case f: ExtractValue if nestedFieldToAlias.contains(f) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 78ceeaa..383861a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -28,7 +28,8 @@ import org.apache.spark.{AccumulatorSuite, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, Partial}
-import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
+import org.apache.spark.sql.catalyst.optimizer.{ConvertToLocalRelation, NestedColumnAliasingSuite}
+import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.HiveResult.hiveResultString
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -3413,15 +3414,43 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     }
   }
 
-  test("SPARK-30870: Column pruning shouldn't alias a nested column if it means the whole " +
-    "structure") {
-    val df = sql(
+  test("SPARK-30870: Column pruning shouldn't alias a nested column for the whole structure") {
+    withTable("t") {
+      val df = sql(
+        """
+          |SELECT value
+          |FROM VALUES array(named_struct('field', named_struct('a', 1, 'b', 2))) AS (value)
+        """.stripMargin)
+      df.write.format("parquet").saveAsTable("t")
+
+      val df2 = spark.table("t")
+        .limit(100)
+        .select(size(col("value.field")))
+      val projects = df2.queryExecution.optimizedPlan.collect {
+        case p: Project => p
+      }
+      assert(projects.length == 1)
+      val aliases = NestedColumnAliasingSuite.collectGeneratedAliases(projects(0))
+      assert(aliases.length == 0)
+    }
+  }
+
+  test("SPARK-30955: Exclude Generate output when aliasing in nested column pruning") {
+    val df1 = sql(
+      """
+        |SELECT explodedvalue.*
+        |FROM VALUES array(named_struct('nested', named_struct('a', 1, 'b', 2))) AS (value)
+        |LATERAL VIEW explode(value) AS explodedvalue
+      """.stripMargin)
+    checkAnswer(df1, Row(Row(1, 2)) :: Nil)
+
+    val df2 = sql(
       """
-        |SELECT explodedvalue.field
-        |FROM VALUES array(named_struct('field', named_struct('a', 1, 'b', 2))) AS (value)
+        |SELECT explodedvalue.nested.a
+        |FROM VALUES array(named_struct('nested', named_struct('a', 1, 'b', 2))) AS (value)
         |LATERAL VIEW explode(value) AS explodedvalue
       """.stripMargin)
-    checkAnswer(df, Row(Row(1, 2)) :: Nil)
+    checkAnswer(df2, Row(1) :: Nil)
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org