You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/04/06 08:31:04 UTC
[spark] branch branch-3.1 updated: [SPARK-34923][SQL] Metadata output should be empty for more plans
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 5dc6b10 [SPARK-34923][SQL] Metadata output should be empty for more plans
5dc6b10 is described below
commit 5dc6b10252c4831f7e19710fe26f9f65b5421427
Author: Karen Feng <ka...@databricks.com>
AuthorDate: Tue Apr 6 16:04:30 2021 +0800
[SPARK-34923][SQL] Metadata output should be empty for more plans
Changes the metadata propagation framework.
Previously, most `LogicalPlan`s propagated the `metadataOutput` of their `children`. This did not make sense for a `LogicalPlan` that does not propagate its `children`'s `output` in the first place.
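I set the metadata output to `Nil` for plans that do not propagate their `children`'s `output`, and for the set operations `Intersect`, `Except`, and `Union`. Notably, `Project` and `View` no longer have metadata output. `Join` exposes only the left side's metadata output for semi, anti, and existence joins.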
Previously, `SELECT m from (SELECT a from tb)` would return `m` if `m` were a metadata column of `tb`, even though the subquery does not output it.
This is a user-facing change: `SELECT m from (SELECT a from tb)` now fails with an `AnalysisException`.
Added unit tests. They do not cover every affected plan, since there are many, but they cover the major cases; an existing test already covers `Join`.
Closes #32017 from karenfeng/spark-34923.
Authored-by: Karen Feng <ka...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 3b634f66c3e4a942178a1e322ae65ce82779625d)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/catalyst/plans/logical/LogicalPlan.scala | 5 +-
.../plans/logical/basicLogicalOperators.scala | 25 +++++
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 103 +++++++++++++++++++++
3 files changed, 132 insertions(+), 1 deletion(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index bdf37d0..3ea79b3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -33,7 +33,10 @@ abstract class LogicalPlan
with QueryPlanConstraints
with Logging {
- /** Metadata fields that can be projected from this node */
+ /**
+ * Metadata fields that can be projected from this node; by default, those of all children.
+ * Should be overridden (e.g. to `Nil`) if the plan does not propagate its children's output.
+ */
def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)
/** Returns true if this subtree has data from a streaming data source. */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 338c1db..224e7bc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -59,6 +59,7 @@ object Subquery {
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+ override def metadataOutput: Seq[Attribute] = Seq.empty // a projection hides child metadata
override def maxRows: Option[Long] = child.maxRows
override lazy val resolved: Boolean = {
@@ -185,6 +186,8 @@ case class Intersect(
leftAttr.withNullability(leftAttr.nullable && rightAttr.nullable)
}
+ override def metadataOutput: Seq[Attribute] = Seq.empty // set ops expose no metadata columns
+
override protected lazy val validConstraints: ExpressionSet =
leftConstraints.union(rightConstraints)
@@ -205,6 +208,8 @@ case class Except(
/** We don't use right.output because those rows get excluded from the set. */
override def output: Seq[Attribute] = left.output
+ override def metadataOutput: Seq[Attribute] = Seq.empty // not forwarded, unlike left.output
+
override protected lazy val validConstraints: ExpressionSet = leftConstraints
}
@@ -268,6 +273,8 @@ case class Union(
}
}
+ override def metadataOutput: Seq[Attribute] = Seq.empty // set ops expose no metadata columns
+
override lazy val resolved: Boolean = {
// allChildrenCompatible needs to be evaluated after childrenResolved
def allChildrenCompatible: Boolean =
@@ -343,6 +350,17 @@ case class Join(
}
}
+ override def metadataOutput: Seq[Attribute] = {
+ // Existence joins emit only left rows; a side an outer join may null-fill becomes nullable.
+ joinType match {
+ case LeftExistence(_) => left.metadataOutput // LeftExistence also matches ExistenceJoin
+ case LeftOuter => left.metadataOutput ++ right.metadataOutput.map(_.withNullability(true))
+ case RightOuter => left.metadataOutput.map(_.withNullability(true)) ++ right.metadataOutput
+ case FullOuter => children.flatMap(_.metadataOutput).map(_.withNullability(true))
+ case _ => children.flatMap(_.metadataOutput)
+ }
+ }
+
override protected lazy val validConstraints: ExpressionSet = {
joinType match {
case _: InnerLike if condition.isDefined =>
@@ -419,6 +437,7 @@ case class InsertIntoDir(
extends UnaryNode {
override def output: Seq[Attribute] = Seq.empty
+ override def metadataOutput: Seq[Attribute] = Seq.empty // like output, exposes nothing
override lazy val resolved: Boolean = false
}
@@ -449,6 +468,8 @@ case class View(
override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
+ override def metadataOutput: Seq[Attribute] = Seq.empty // a view hides child metadata
+
override def simpleString(maxFields: Int): String = {
s"View (${desc.identifier}, ${output.mkString("[", ",", "]")})"
}
@@ -616,6 +637,7 @@ case class Aggregate(
}
override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
+ override def metadataOutput: Seq[Attribute] = Seq.empty // only aggregate expressions exposed
override def maxRows: Option[Long] = {
if (groupingExpressions.isEmpty) {
Some(1L)
@@ -751,6 +773,8 @@ case class Expand(
override lazy val references: AttributeSet =
AttributeSet(projections.flatten.flatMap(_.references))
+ override def metadataOutput: Seq[Attribute] = Seq.empty // Expand hides child metadata
+
override def producedAttributes: AttributeSet = AttributeSet(output diff child.output)
// This operator can reuse attributes (for example making them null when doing a roll up) so
@@ -813,6 +837,7 @@ case class Pivot(
}
groupByExprsOpt.getOrElse(Seq.empty).map(_.toAttribute) ++ pivotAgg
}
+ override def metadataOutput: Seq[Attribute] = Seq.empty // only group-by and pivot columns
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 3c49186..eccd6c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2794,6 +2794,109 @@ class DataSourceV2SQLSuite
}.getMessage
assert(errMsg.contains(expectedError))
}
+
+ test("SPARK-34923: do not propagate metadata columns through Project") {
+ val t1 = s"${catalogAndNamespace}table"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+ "PARTITIONED BY (bucket(4, id), id)")
+ sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+ // ParseException is an AnalysisException too, so pin the resolution failure.
+ Seq(
+ () => sql(s"SELECT index, _partition from (SELECT id, data FROM $t1)"),
+ () => spark.table(t1).select("id", "data").select("index", "_partition")
+ ).foreach { query =>
+ assert(intercept[AnalysisException](query()).getMessage.contains("cannot resolve"))
+ }
+ }
+ }
+
+ test("SPARK-34923: do not propagate metadata columns through View") {
+ val t1 = s"${catalogAndNamespace}table"
+ val view = "view"
+
+ withTable(t1) {
+ withTempView(view) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+ "PARTITIONED BY (bucket(4, id), id)")
+ sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+ sql(s"CACHE TABLE $view AS SELECT * FROM $t1")
+ // ParseException is an AnalysisException too, so pin the resolution failure.
+ val e = intercept[AnalysisException](sql(s"SELECT index, _partition FROM $view"))
+ assert(e.getMessage.contains("cannot resolve"))
+ }
+ }
+ }
+
+ test("SPARK-34923: propagate metadata columns through Filter") {
+ val t1 = s"${catalogAndNamespace}table"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+ "PARTITIONED BY (bucket(4, id), id)")
+ sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+ // Metadata columns stay resolvable above a Filter, via both SQL and the DataFrame API.
+ val sqlQuery = sql(s"SELECT id, data, index, _partition FROM $t1 WHERE id > 1")
+ val dfQuery = spark.table(t1).where("id > 1").select("id", "data", "index", "_partition")
+
+ Seq(sqlQuery, dfQuery).foreach { query =>
+ checkAnswer(query, Seq(Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
+ }
+ }
+ }
+
+ test("SPARK-34923: propagate metadata columns through Sort") {
+ val t1 = s"${catalogAndNamespace}table"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+ "PARTITIONED BY (bucket(4, id), id)")
+ sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+ // Metadata columns stay resolvable above a Sort, via both SQL and the DataFrame API.
+ val sqlQuery = sql(s"SELECT id, data, index, _partition FROM $t1 ORDER BY id")
+ val dfQuery = spark.table(t1).orderBy("id").select("id", "data", "index", "_partition")
+
+ Seq(sqlQuery, dfQuery).foreach { query =>
+ checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
+ }
+ }
+ }
+
+ test("SPARK-34923: propagate metadata columns through RepartitionBy") {
+ val t1 = s"${catalogAndNamespace}table"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+ "PARTITIONED BY (bucket(4, id), id)")
+ sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+ // Metadata columns stay resolvable above a range repartition (SQL hint and DataFrame API).
+ val sqlQuery = sql(
+ s"SELECT /*+ REPARTITION_BY_RANGE(3, id) */ id, data, index, _partition FROM $t1")
+ val tbl = spark.table(t1)
+ val dfQuery = tbl.repartitionByRange(3, tbl.col("id"))
+ .select("id", "data", "index", "_partition")
+
+ Seq(sqlQuery, dfQuery).foreach { query =>
+ checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
+ }
+ }
+ }
+
+ test("SPARK-34923: propagate metadata columns through SubqueryAlias") {
+ val t1 = s"${catalogAndNamespace}table"
+ val sbq = "sbq"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+ "PARTITIONED BY (bucket(4, id), id)")
+ sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+ // Metadata columns can be referenced through a table alias, qualified by that alias.
+ val sqlQuery = sql(
+ s"SELECT $sbq.id, $sbq.data, $sbq.index, $sbq._partition FROM $t1 as $sbq")
+ val dfQuery = spark.table(t1).as(sbq).select(
+ s"$sbq.id", s"$sbq.data", s"$sbq.index", s"$sbq._partition")
+
+ Seq(sqlQuery, dfQuery).foreach { query =>
+ checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org