Posted to commits@spark.apache.org by we...@apache.org on 2021/04/06 08:31:04 UTC

[spark] branch branch-3.1 updated: [SPARK-34923][SQL] Metadata output should be empty for more plans

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 5dc6b10  [SPARK-34923][SQL] Metadata output should be empty for more plans
5dc6b10 is described below

commit 5dc6b10252c4831f7e19710fe26f9f65b5421427
Author: Karen Feng <ka...@databricks.com>
AuthorDate: Tue Apr 6 16:04:30 2021 +0800

    [SPARK-34923][SQL] Metadata output should be empty for more plans
    
    Changes the metadata propagation framework.
    
    Previously, most `LogicalPlan`s propagated their children's `metadataOutput`. This did not make sense for plans that do not even propagate their children's `output`.
    
    This change sets `metadataOutput` to `Nil` for plans that do not propagate their children's `output`. Notably, `Project` and `View` no longer have metadata output.
    
    Previously, `SELECT m FROM (SELECT a FROM tb)` would resolve and output `m` if it was a metadata column of `tb`, even though the subquery did not select it. This did not make sense.
    
    This is a user-facing change: `SELECT m FROM (SELECT a FROM tb)` now fails with an `AnalysisException`.
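
    For illustration, a minimal sketch of the new behaviour (assumes a DataSource V2 table whose catalog exposes the `index` and `_partition` metadata columns, as the in-memory test catalog used in the new suite does; the table name is hypothetical):

        // Metadata columns can still be selected directly from the relation:
        spark.sql("SELECT id, data, _partition FROM testcat.ns1.ns2.tbl").show()

        // ...but they no longer leak through a Project that did not select them;
        // this now fails analysis instead of silently resolving `_partition`:
        spark.sql("SELECT _partition FROM (SELECT id, data FROM testcat.ns1.ns2.tbl)")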
    
    Added unit tests. Not every affected plan is covered, as the list is fairly extensive, but the new tests cover the major cases (and an existing test already covers `Join`).
    
    Closes #32017 from karenfeng/spark-34923.
    
    Authored-by: Karen Feng <ka...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 3b634f66c3e4a942178a1e322ae65ce82779625d)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/catalyst/plans/logical/LogicalPlan.scala   |   5 +-
 .../plans/logical/basicLogicalOperators.scala      |  25 +++++
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 103 +++++++++++++++++++++
 3 files changed, 132 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index bdf37d0..3ea79b3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -33,7 +33,10 @@ abstract class LogicalPlan
   with QueryPlanConstraints
   with Logging {
 
-  /** Metadata fields that can be projected from this node */
+  /**
+   * Metadata fields that can be projected from this node.
+   * Should be overridden if the plan does not propagate its children's output.
+   */
   def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)
 
   /** Returns true if this subtree has data from a streaming data source. */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 338c1db..224e7bc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -59,6 +59,7 @@ object Subquery {
 case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
     extends OrderPreservingUnaryNode {
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+  override def metadataOutput: Seq[Attribute] = Nil
   override def maxRows: Option[Long] = child.maxRows
 
   override lazy val resolved: Boolean = {
@@ -185,6 +186,8 @@ case class Intersect(
       leftAttr.withNullability(leftAttr.nullable && rightAttr.nullable)
     }
 
+  override def metadataOutput: Seq[Attribute] = Nil
+
   override protected lazy val validConstraints: ExpressionSet =
     leftConstraints.union(rightConstraints)
 
@@ -205,6 +208,8 @@ case class Except(
   /** We don't use right.output because those rows get excluded from the set. */
   override def output: Seq[Attribute] = left.output
 
+  override def metadataOutput: Seq[Attribute] = Nil
+
   override protected lazy val validConstraints: ExpressionSet = leftConstraints
 }
 
@@ -268,6 +273,8 @@ case class Union(
     }
   }
 
+  override def metadataOutput: Seq[Attribute] = Nil
+
   override lazy val resolved: Boolean = {
     // allChildrenCompatible needs to be evaluated after childrenResolved
     def allChildrenCompatible: Boolean =
@@ -343,6 +350,17 @@ case class Join(
     }
   }
 
+  override def metadataOutput: Seq[Attribute] = {
+    joinType match {
+      case ExistenceJoin(_) =>
+        left.metadataOutput
+      case LeftExistence(_) =>
+        left.metadataOutput
+      case _ =>
+        children.flatMap(_.metadataOutput)
+    }
+  }
+
   override protected lazy val validConstraints: ExpressionSet = {
     joinType match {
       case _: InnerLike if condition.isDefined =>
@@ -419,6 +437,7 @@ case class InsertIntoDir(
   extends UnaryNode {
 
   override def output: Seq[Attribute] = Seq.empty
+  override def metadataOutput: Seq[Attribute] = Nil
   override lazy val resolved: Boolean = false
 }
 
@@ -449,6 +468,8 @@ case class View(
 
   override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
 
+  override def metadataOutput: Seq[Attribute] = Nil
+
   override def simpleString(maxFields: Int): String = {
     s"View (${desc.identifier}, ${output.mkString("[", ",", "]")})"
   }
@@ -616,6 +637,7 @@ case class Aggregate(
   }
 
   override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
+  override def metadataOutput: Seq[Attribute] = Nil
   override def maxRows: Option[Long] = {
     if (groupingExpressions.isEmpty) {
       Some(1L)
@@ -751,6 +773,8 @@ case class Expand(
   override lazy val references: AttributeSet =
     AttributeSet(projections.flatten.flatMap(_.references))
 
+  override def metadataOutput: Seq[Attribute] = Nil
+
   override def producedAttributes: AttributeSet = AttributeSet(output diff child.output)
 
   // This operator can reuse attributes (for example making them null when doing a roll up) so
@@ -813,6 +837,7 @@ case class Pivot(
     }
     groupByExprsOpt.getOrElse(Seq.empty).map(_.toAttribute) ++ pivotAgg
   }
+  override def metadataOutput: Seq[Attribute] = Nil
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 3c49186..eccd6c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2794,6 +2794,109 @@ class DataSourceV2SQLSuite
     }.getMessage
     assert(errMsg.contains(expectedError))
   }
+
+  test("SPARK-34923: do not propagate metadata columns through Project") {
+    val t1 = s"${catalogAndNamespace}table"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+        "PARTITIONED BY (bucket(4, id), id)")
+      sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+      assertThrows[AnalysisException] {
+        sql(s"SELECT index, _partition from (SELECT id, data FROM $t1)")
+      }
+      assertThrows[AnalysisException] {
+        spark.table(t1).select("id", "data").select("index", "_partition")
+      }
+    }
+  }
+
+  test("SPARK-34923: do not propagate metadata columns through View") {
+    val t1 = s"${catalogAndNamespace}table"
+    val view = "view"
+
+    withTable(t1) {
+      withTempView(view) {
+        sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+          "PARTITIONED BY (bucket(4, id), id)")
+        sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+        sql(s"CACHE TABLE $view AS SELECT * FROM $t1")
+        assertThrows[AnalysisException] {
+          sql(s"SELECT index, _partition FROM $view")
+        }
+      }
+    }
+  }
+
+  test("SPARK-34923: propagate metadata columns through Filter") {
+    val t1 = s"${catalogAndNamespace}table"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+        "PARTITIONED BY (bucket(4, id), id)")
+      sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+      val sqlQuery = spark.sql(s"SELECT id, data, index, _partition FROM $t1 WHERE id > 1")
+      val dfQuery = spark.table(t1).where("id > 1").select("id", "data", "index", "_partition")
+
+      Seq(sqlQuery, dfQuery).foreach { query =>
+        checkAnswer(query, Seq(Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
+      }
+    }
+  }
+
+  test("SPARK-34923: propagate metadata columns through Sort") {
+    val t1 = s"${catalogAndNamespace}table"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+        "PARTITIONED BY (bucket(4, id), id)")
+      sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+      val sqlQuery = spark.sql(s"SELECT id, data, index, _partition FROM $t1 ORDER BY id")
+      val dfQuery = spark.table(t1).orderBy("id").select("id", "data", "index", "_partition")
+
+      Seq(sqlQuery, dfQuery).foreach { query =>
+        checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
+      }
+    }
+  }
+
+  test("SPARK-34923: propagate metadata columns through RepartitionBy") {
+    val t1 = s"${catalogAndNamespace}table"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+        "PARTITIONED BY (bucket(4, id), id)")
+      sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+      val sqlQuery = spark.sql(
+        s"SELECT /*+ REPARTITION_BY_RANGE(3, id) */ id, data, index, _partition FROM $t1")
+      val tbl = spark.table(t1)
+      val dfQuery = tbl.repartitionByRange(3, tbl.col("id"))
+        .select("id", "data", "index", "_partition")
+
+      Seq(sqlQuery, dfQuery).foreach { query =>
+        checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
+      }
+    }
+  }
+
+  test("SPARK-34923: propagate metadata columns through SubqueryAlias") {
+    val t1 = s"${catalogAndNamespace}table"
+    val sbq = "sbq"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+        "PARTITIONED BY (bucket(4, id), id)")
+      sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+      val sqlQuery = spark.sql(
+        s"SELECT $sbq.id, $sbq.data, $sbq.index, $sbq._partition FROM $t1 as $sbq")
+      val dfQuery = spark.table(t1).as(sbq).select(
+        s"$sbq.id", s"$sbq.data", s"$sbq.index", s"$sbq._partition")
+
+      Seq(sqlQuery, dfQuery).foreach { query =>
+        checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
+      }
+    }
+  }
 }
 
 

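The overrides above all follow one pattern: a plan node that defines its own `output` rather than forwarding its child's should also stop inheriting the default `metadataOutput`. A minimal sketch of that convention for a hypothetical custom unary node (not part of this patch; the class name and fields are made up for illustration):

    import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression}
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}

    // Hypothetical operator that reshapes its child's output, similar to Project.
    case class MyReshape(exprs: Seq[NamedExpression], child: LogicalPlan)
        extends UnaryNode {
      // Defines its own output...
      override def output: Seq[Attribute] = exprs.map(_.toAttribute)
      // ...so it must not expose the child's metadata columns either.
      override def metadataOutput: Seq[Attribute] = Nil
    }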