Posted to commits@spark.apache.org by we...@apache.org on 2021/04/06 08:05:06 UTC
[spark] branch master updated: [SPARK-34923][SQL] Metadata output should be empty for more plans
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3b634f6 [SPARK-34923][SQL] Metadata output should be empty for more plans
3b634f6 is described below
commit 3b634f66c3e4a942178a1e322ae65ce82779625d
Author: Karen Feng <ka...@databricks.com>
AuthorDate: Tue Apr 6 16:04:30 2021 +0800
[SPARK-34923][SQL] Metadata output should be empty for more plans
### What changes were proposed in this pull request?
Changes the metadata propagation framework.
Previously, most `LogicalPlan`s propagated their children's `metadataOutput`. This did not make sense in cases where the `LogicalPlan` did not even propagate its children's `output`.
I set the metadata output to `Nil` for plans that do not propagate their children's `output`. Notably, `Project` and `View` no longer have metadata output.
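In effect, any operator that defines its own `output` rather than passing through a child's now also pins `metadataOutput` to `Nil`. A minimal sketch of the pattern (simplified stand-in types, not Spark's actual classes):

```scala
// Simplified sketch: attributes are modeled as plain strings.
abstract class Plan {
  def children: Seq[Plan]
  def output: Seq[String]
  // Default: metadata columns flow up from the children.
  def metadataOutput: Seq[String] = children.flatMap(_.metadataOutput)
}

// A projection defines its own output, so it must not leak its
// child's metadata columns either.
case class Proj(exprs: Seq[String], child: Plan) extends Plan {
  override def children: Seq[Plan] = Seq(child)
  override def output: Seq[String] = exprs
  override def metadataOutput: Seq[String] = Nil
}
```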
### Why are the changes needed?
Previously, `SELECT m FROM (SELECT a FROM tb)` would resolve and output `m` if `m` were a metadata column of `tb`, even though the inner query only projects `a`. This did not make sense.
### Does this PR introduce _any_ user-facing change?
Yes. Now, `SELECT m FROM (SELECT a FROM tb)` fails with an `AnalysisException`.
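For example, with a DataSourceV2 table `tbl` that exposes a metadata column `_partition` (names as in the tests below; a sketch of the behavior, not literal output):

```scala
// Selecting the metadata column directly from the scan still works:
spark.table("tbl").select("id", "_partition")

// But once a Project has dropped it, it can no longer be resolved:
spark.table("tbl").select("id").select("_partition")  // throws AnalysisException
```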
### How was this patch tested?
Added unit tests. Not every affected plan is covered, as the set is fairly extensive; however, the new tests cover the major cases (and an existing test already covers `Join`).
Closes #32017 from karenfeng/spark-34923.
Authored-by: Karen Feng <ka...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/catalyst/plans/logical/LogicalPlan.scala | 5 +-
.../plans/logical/basicLogicalOperators.scala | 25 +++++
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 103 +++++++++++++++++++++
3 files changed, 132 insertions(+), 1 deletion(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 7129c69..cef0e06 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -34,7 +34,10 @@ abstract class LogicalPlan
with QueryPlanConstraints
with Logging {
- /** Metadata fields that can be projected from this node */
+ /**
+ * Metadata fields that can be projected from this node.
+ * Should be overridden if the plan does not propagate its children's output.
+ */
def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)
/** Returns true if this subtree has data from a streaming data source. */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 9461dbf..49b09cc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -61,6 +61,7 @@ object Subquery {
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+ override def metadataOutput: Seq[Attribute] = Nil
override def maxRows: Option[Long] = child.maxRows
override lazy val resolved: Boolean = {
@@ -187,6 +188,8 @@ case class Intersect(
leftAttr.withNullability(leftAttr.nullable && rightAttr.nullable)
}
+ override def metadataOutput: Seq[Attribute] = Nil
+
override protected lazy val validConstraints: ExpressionSet =
leftConstraints.union(rightConstraints)
@@ -207,6 +210,8 @@ case class Except(
/** We don't use right.output because those rows get excluded from the set. */
override def output: Seq[Attribute] = left.output
+ override def metadataOutput: Seq[Attribute] = Nil
+
override protected lazy val validConstraints: ExpressionSet = leftConstraints
}
@@ -270,6 +275,8 @@ case class Union(
}
}
+ override def metadataOutput: Seq[Attribute] = Nil
+
override lazy val resolved: Boolean = {
// allChildrenCompatible needs to be evaluated after childrenResolved
def allChildrenCompatible: Boolean =
@@ -364,6 +371,17 @@ case class Join(
}
}
+ override def metadataOutput: Seq[Attribute] = {
+ joinType match {
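+ // These join types only output rows from the left side, so only
+ // the left side's metadata columns remain meaningful.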
+ case ExistenceJoin(_) =>
+ left.metadataOutput
+ case LeftExistence(_) =>
+ left.metadataOutput
+ case _ =>
+ children.flatMap(_.metadataOutput)
+ }
+ }
+
override protected lazy val validConstraints: ExpressionSet = {
joinType match {
case _: InnerLike if condition.isDefined =>
@@ -440,6 +458,7 @@ case class InsertIntoDir(
extends UnaryNode {
override def output: Seq[Attribute] = Seq.empty
+ override def metadataOutput: Seq[Attribute] = Nil
override lazy val resolved: Boolean = false
}
@@ -466,6 +485,8 @@ case class View(
override def output: Seq[Attribute] = child.output
+ override def metadataOutput: Seq[Attribute] = Nil
+
override def simpleString(maxFields: Int): String = {
s"View (${desc.identifier}, ${output.mkString("[", ",", "]")})"
}
@@ -647,6 +668,7 @@ case class Aggregate(
}
override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
+ override def metadataOutput: Seq[Attribute] = Nil
override def maxRows: Option[Long] = {
if (groupingExpressions.isEmpty) {
Some(1L)
@@ -782,6 +804,8 @@ case class Expand(
override lazy val references: AttributeSet =
AttributeSet(projections.flatten.flatMap(_.references))
+ override def metadataOutput: Seq[Attribute] = Nil
+
override def producedAttributes: AttributeSet = AttributeSet(output diff child.output)
// This operator can reuse attributes (for example making them null when doing a roll up) so
@@ -818,6 +842,7 @@ case class Pivot(
}
groupByExprsOpt.getOrElse(Seq.empty).map(_.toAttribute) ++ pivotAgg
}
+ override def metadataOutput: Seq[Attribute] = Nil
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index c4abed3..e0dcdc7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2690,6 +2690,109 @@ class DataSourceV2SQLSuite
}
}
+ test("SPARK-34923: do not propagate metadata columns through Project") {
+ val t1 = s"${catalogAndNamespace}table"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+ "PARTITIONED BY (bucket(4, id), id)")
+ sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+ assertThrows[AnalysisException] {
+ sql(s"SELECT index, _partition from (SELECT id, data FROM $t1)")
+ }
+ assertThrows[AnalysisException] {
+ spark.table(t1).select("id", "data").select("index", "_partition")
+ }
+ }
+ }
+
+ test("SPARK-34923: do not propagate metadata columns through View") {
+ val t1 = s"${catalogAndNamespace}table"
+ val view = "view"
+
+ withTable(t1) {
+ withTempView(view) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+ "PARTITIONED BY (bucket(4, id), id)")
+ sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+ sql(s"CACHE TABLE $view AS SELECT * FROM $t1")
+ assertThrows[AnalysisException] {
+ sql(s"SELECT index, _partition FROM $view")
+ }
+ }
+ }
+ }
+
+ test("SPARK-34923: propagate metadata columns through Filter") {
+ val t1 = s"${catalogAndNamespace}table"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+ "PARTITIONED BY (bucket(4, id), id)")
+ sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+ val sqlQuery = spark.sql(s"SELECT id, data, index, _partition FROM $t1 WHERE id > 1")
+ val dfQuery = spark.table(t1).where("id > 1").select("id", "data", "index", "_partition")
+
+ Seq(sqlQuery, dfQuery).foreach { query =>
+ checkAnswer(query, Seq(Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
+ }
+ }
+ }
+
+ test("SPARK-34923: propagate metadata columns through Sort") {
+ val t1 = s"${catalogAndNamespace}table"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+ "PARTITIONED BY (bucket(4, id), id)")
+ sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+ val sqlQuery = spark.sql(s"SELECT id, data, index, _partition FROM $t1 ORDER BY id")
+ val dfQuery = spark.table(t1).orderBy("id").select("id", "data", "index", "_partition")
+
+ Seq(sqlQuery, dfQuery).foreach { query =>
+ checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
+ }
+ }
+ }
+
+ test("SPARK-34923: propagate metadata columns through RepartitionBy") {
+ val t1 = s"${catalogAndNamespace}table"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+ "PARTITIONED BY (bucket(4, id), id)")
+ sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+ val sqlQuery = spark.sql(
+ s"SELECT /*+ REPARTITION_BY_RANGE(3, id) */ id, data, index, _partition FROM $t1")
+ val tbl = spark.table(t1)
+ val dfQuery = tbl.repartitionByRange(3, tbl.col("id"))
+ .select("id", "data", "index", "_partition")
+
+ Seq(sqlQuery, dfQuery).foreach { query =>
+ checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
+ }
+ }
+ }
+
+ test("SPARK-34923: propagate metadata columns through SubqueryAlias") {
+ val t1 = s"${catalogAndNamespace}table"
+ val sbq = "sbq"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+ "PARTITIONED BY (bucket(4, id), id)")
+ sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+ val sqlQuery = spark.sql(
+ s"SELECT $sbq.id, $sbq.data, $sbq.index, $sbq._partition FROM $t1 as $sbq")
+ val dfQuery = spark.table(t1).as(sbq).select(
+ s"$sbq.id", s"$sbq.data", s"$sbq.index", s"$sbq._partition")
+
+ Seq(sqlQuery, dfQuery).foreach { query =>
+ checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
+ }
+ }
+ }
+
private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
val e = intercept[AnalysisException] {
sql(s"$sqlCommand $sqlParams")