You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink ("here") that was lost in the plain-text conversion.
Posted to commits@spark.apache.org by hu...@apache.org on 2021/11/02 21:41:29 UTC
[spark] branch master updated: [SPARK-36895][SQL][FOLLOWUP] CREATE INDEX command should rely on the analyzer framework to resolve columns
This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 293c085 [SPARK-36895][SQL][FOLLOWUP] CREATE INDEX command should rely on the analyzer framework to resolve columns
293c085 is described below
commit 293c085d677220e71966d98c25cde8a06ae78468
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Tue Nov 2 14:39:42 2021 -0700
[SPARK-36895][SQL][FOLLOWUP] CREATE INDEX command should rely on the analyzer framework to resolve columns
### What changes were proposed in this pull request?
This PR leverages the existing framework to resolve columns in the CREATE INDEX command.
### Why are the changes needed?
So that invalid column names fail during analysis, instead of being passed through to v2 sources.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
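Added a new test in DataSourceV2SQLSuite: CREATE INDEX on a non-existent column now fails with a "Missing field" analysis error. The DDLParserSuite expectations were updated for the new plan shape.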
Closes #34467 from cloud-fan/col.
Lead-authored-by: Wenchen Fan <we...@databricks.com>
Co-authored-by: Wenchen Fan <cl...@gmail.com>
Signed-off-by: Huaxin Gao <hu...@apple.com>
---
.../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 15 +++++++++++----
.../org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 4 ++--
.../spark/sql/catalyst/plans/logical/v2Commands.scala | 12 +++++++-----
.../apache/spark/sql/catalyst/parser/DDLParserSuite.scala | 6 +++---
.../execution/datasources/v2/DataSourceV2Strategy.scala | 5 ++++-
.../apache/spark/sql/connector/DataSourceV2SQLSuite.scala | 11 ++++++++---
6 files changed, 35 insertions(+), 18 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f0a1c8c..068886e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -272,7 +272,7 @@ class Analyzer(override val catalogManager: CatalogManager)
ResolveInsertInto ::
ResolveRelations ::
ResolvePartitionSpec ::
- ResolveAlterTableCommands ::
+ ResolveFieldNameAndPosition ::
AddMetadataColumns ::
DeduplicateRelations ::
ResolveReferences ::
@@ -3529,11 +3529,18 @@ class Analyzer(override val catalogManager: CatalogManager)
}
/**
- * Rule to mostly resolve, normalize and rewrite column names based on case sensitivity
- * for alter table column commands.
+ * Rule to resolve, normalize and rewrite field names based on case sensitivity for commands.
*/
- object ResolveAlterTableCommands extends Rule[LogicalPlan] {
+ object ResolveFieldNameAndPosition extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+ case cmd: CreateIndex if cmd.table.resolved &&
+ cmd.columns.exists(_._1.isInstanceOf[UnresolvedFieldName]) =>
+ val table = cmd.table.asInstanceOf[ResolvedTable]
+ cmd.copy(columns = cmd.columns.map {
+ case (u: UnresolvedFieldName, prop) => resolveFieldNames(table, u.name, u) -> prop
+ case other => other
+ })
+
case a: AlterTableCommand if a.table.resolved && hasUnresolvedFieldName(a) =>
val table = a.table.asInstanceOf[ResolvedTable]
a.transformExpressions {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 722a055..a16674f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -4429,7 +4429,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}
val columns = ctx.columns.multipartIdentifierProperty.asScala
- .map(_.multipartIdentifier.getText).toSeq
+ .map(_.multipartIdentifier).map(typedVisit[Seq[String]]).toSeq
val columnsProperties = ctx.columns.multipartIdentifierProperty.asScala
.map(x => (Option(x.options).map(visitPropertyKeyValues).getOrElse(Map.empty))).toSeq
val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
@@ -4439,7 +4439,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
indexName,
indexType,
ctx.EXISTS != null,
- columns.map(FieldReference(_).asInstanceOf[FieldReference]).zip(columnsProperties),
+ columns.map(UnresolvedFieldName(_)).zip(columnsProperties),
options)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index e349822..f5ebae8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.analysis.{NamedRelation, PartitionSpec, UnresolvedException}
+import org.apache.spark.sql.catalyst.analysis.{FieldName, NamedRelation, PartitionSpec, UnresolvedException}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.FunctionResource
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
import org.apache.spark.sql.catalyst.trees.BinaryLike
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog._
-import org.apache.spark.sql.connector.expressions.{NamedReference, Transform}
+import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.write.Write
import org.apache.spark.sql.types.{BooleanType, DataType, MetadataBuilder, StringType, StructType}
@@ -1061,12 +1061,14 @@ case class UncacheTable(
* The logical plan of the CREATE INDEX command.
*/
case class CreateIndex(
- child: LogicalPlan,
+ table: LogicalPlan,
indexName: String,
indexType: String,
ignoreIfExists: Boolean,
- columns: Seq[(NamedReference, Map[String, String])],
+ columns: Seq[(FieldName, Map[String, String])],
properties: Map[String, String]) extends UnaryCommand {
+ override def child: LogicalPlan = table
+ override lazy val resolved: Boolean = table.resolved && columns.forall(_._1.resolved)
override protected def withNewChildInternal(newChild: LogicalPlan): CreateIndex =
- copy(child = newChild)
+ copy(table = newChild)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 13588e9..bdb6a15 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -2276,18 +2276,18 @@ class DDLParserSuite extends AnalysisTest {
test("CREATE INDEX") {
parseCompare("CREATE index i1 ON a.b.c USING BTREE (col1)",
CreateIndex(UnresolvedTable(Seq("a", "b", "c"), "CREATE INDEX", None), "i1", "BTREE", false,
- Array(FieldReference("col1")).toSeq.zip(Seq(Map.empty[String, String])), Map.empty))
+ Seq(UnresolvedFieldName(Seq("col1"))).zip(Seq(Map.empty[String, String])), Map.empty))
parseCompare("CREATE index IF NOT EXISTS i1 ON TABLE a.b.c USING BTREE" +
" (col1 OPTIONS ('k1'='v1'), col2 OPTIONS ('k2'='v2')) ",
CreateIndex(UnresolvedTable(Seq("a", "b", "c"), "CREATE INDEX", None), "i1", "BTREE", true,
- Array(FieldReference("col1"), FieldReference("col2")).toSeq
+ Seq(UnresolvedFieldName(Seq("col1")), UnresolvedFieldName(Seq("col2")))
.zip(Seq(Map("k1" -> "v1"), Map("k2" -> "v2"))), Map.empty))
parseCompare("CREATE index i1 ON a.b.c" +
" (col1 OPTIONS ('k1'='v1'), col2 OPTIONS ('k2'='v2')) OPTIONS ('k3'='v3', 'k4'='v4')",
CreateIndex(UnresolvedTable(Seq("a", "b", "c"), "CREATE INDEX", None), "i1", "", false,
- Array(FieldReference("col1"), FieldReference("col2")).toSeq
+ Seq(UnresolvedFieldName(Seq("col1")), UnresolvedFieldName(Seq("col2")))
.zip(Seq(Map("k1" -> "v1"), Map("k2" -> "v2"))), Map("k3" -> "v3", "k4" -> "v4")))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index b688c32..f1513a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -436,7 +436,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
indexName, indexType, ifNotExists, columns, properties) =>
table match {
case s: SupportsIndex =>
- CreateIndexExec(s, indexName, indexType, ifNotExists, columns, properties):: Nil
+ val namedRefs = columns.map { case (field, prop) =>
+ FieldReference(field.name) -> prop
+ }
+ CreateIndexExec(s, indexName, indexType, ifNotExists, namedRefs, properties) :: Nil
case _ => throw QueryCompilationErrors.tableIndexNotSupportedError(
s"CreateIndex is not supported in this table ${table.name}.")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index ee3a156..d2d46d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2937,10 +2937,15 @@ class DataSourceV2SQLSuite
val t = "testcat.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string COMMENT 'hello') USING foo")
- val ex = intercept[AnalysisException] {
- sql(s"CREATE index i1 ON $t(col1)")
+ val e1 = intercept[AnalysisException] {
+ sql(s"CREATE index i1 ON $t(non_exist)")
+ }
+ assert(e1.getMessage.contains(s"Missing field non_exist in table $t"))
+
+ val e2 = intercept[AnalysisException] {
+ sql(s"CREATE index i1 ON $t(id)")
}
- assert(ex.getMessage.contains(s"CreateIndex is not supported in this table $t."))
+ assert(e2.getMessage.contains(s"CreateIndex is not supported in this table $t."))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org