Posted to commits@spark.apache.org by hu...@apache.org on 2021/11/02 21:41:29 UTC

[spark] branch master updated: [SPARK-36895][SQL][FOLLOWUP] CREATE INDEX command should rely on the analyzer framework to resolve columns

This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 293c085  [SPARK-36895][SQL][FOLLOWUP] CREATE INDEX command should rely on the analyzer framework to resolve columns
293c085 is described below

commit 293c085d677220e71966d98c25cde8a06ae78468
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Tue Nov 2 14:39:42 2021 -0700

    [SPARK-36895][SQL][FOLLOWUP] CREATE INDEX command should rely on the analyzer framework to resolve columns
    
    ### What changes were proposed in this pull request?
    
    This PR leverages the existing analyzer framework to resolve columns in
    the CREATE INDEX command, instead of building unvalidated FieldReferences
    directly in the parser.
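    
    Concretely (a sketch based on the diff below): the parser now emits an
    UnresolvedFieldName for each index column, and the renamed analyzer rule
    ResolveFieldNameAndPosition resolves those names against the resolved
    table's schema:
    
        // Sketch only; CatalystSqlParser is Spark's catalyst parser entry point.
        import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
    
        // Parsed (unresolved) plan, matching the DDLParserSuite case below:
        //   CreateIndex(UnresolvedTable(Seq("a","b","c"), ...), "i1", "BTREE", false,
        //     Seq(UnresolvedFieldName(Seq("col1")) -> Map.empty), Map.empty)
        val parsed = CatalystSqlParser.parsePlan(
          "CREATE INDEX i1 ON a.b.c USING BTREE (col1)")
    
        // During analysis, ResolveFieldNameAndPosition rewrites each
        // UnresolvedFieldName via resolveFieldNames, or fails analysis if the
        // field is missing from the table schema.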
    
    ### Why are the changes needed?
    
    To fail earlier, at analysis time, instead of passing invalid column
    names down to v2 sources.
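    
    For example (ScalaTest style, matching the updated DataSourceV2SQLSuite
    below; "testcat" is the suite's in-memory v2 catalog):
    
        import org.apache.spark.sql.AnalysisException
    
        sql("CREATE TABLE testcat.tbl (id bigint, data string) USING foo")
        val e = intercept[AnalysisException] {
          sql("CREATE INDEX i1 ON testcat.tbl(non_exist)")
        }
        // Fails during analysis now, before ever reaching the v2 source:
        assert(e.getMessage.contains("Missing field non_exist in table testcat.tbl"))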
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New test in DataSourceV2SQLSuite, plus updated parser tests.
    
    Closes #34467 from cloud-fan/col.
    
    Lead-authored-by: Wenchen Fan <we...@databricks.com>
    Co-authored-by: Wenchen Fan <cl...@gmail.com>
    Signed-off-by: Huaxin Gao <hu...@apple.com>
---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 15 +++++++++++----
 .../org/apache/spark/sql/catalyst/parser/AstBuilder.scala |  4 ++--
 .../spark/sql/catalyst/plans/logical/v2Commands.scala     | 12 +++++++-----
 .../apache/spark/sql/catalyst/parser/DDLParserSuite.scala |  6 +++---
 .../execution/datasources/v2/DataSourceV2Strategy.scala   |  5 ++++-
 .../apache/spark/sql/connector/DataSourceV2SQLSuite.scala | 11 ++++++++---
 6 files changed, 35 insertions(+), 18 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f0a1c8c..068886e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -272,7 +272,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       ResolveInsertInto ::
       ResolveRelations ::
       ResolvePartitionSpec ::
-      ResolveAlterTableCommands ::
+      ResolveFieldNameAndPosition ::
       AddMetadataColumns ::
       DeduplicateRelations ::
       ResolveReferences ::
@@ -3529,11 +3529,18 @@ class Analyzer(override val catalogManager: CatalogManager)
   }
 
   /**
-   * Rule to mostly resolve, normalize and rewrite column names based on case sensitivity
-   * for alter table column commands.
+   * Rule to resolve, normalize and rewrite field names based on case sensitivity for commands.
    */
-  object ResolveAlterTableCommands extends Rule[LogicalPlan] {
+  object ResolveFieldNameAndPosition extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+      case cmd: CreateIndex if cmd.table.resolved &&
+          cmd.columns.exists(_._1.isInstanceOf[UnresolvedFieldName]) =>
+        val table = cmd.table.asInstanceOf[ResolvedTable]
+        cmd.copy(columns = cmd.columns.map {
+          case (u: UnresolvedFieldName, prop) => resolveFieldNames(table, u.name, u) -> prop
+          case other => other
+        })
+
       case a: AlterTableCommand if a.table.resolved && hasUnresolvedFieldName(a) =>
         val table = a.table.asInstanceOf[ResolvedTable]
         a.transformExpressions {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 722a055..a16674f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -4429,7 +4429,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
     }
 
     val columns = ctx.columns.multipartIdentifierProperty.asScala
-      .map(_.multipartIdentifier.getText).toSeq
+      .map(_.multipartIdentifier).map(typedVisit[Seq[String]]).toSeq
     val columnsProperties = ctx.columns.multipartIdentifierProperty.asScala
       .map(x => (Option(x.options).map(visitPropertyKeyValues).getOrElse(Map.empty))).toSeq
     val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
@@ -4439,7 +4439,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
       indexName,
       indexType,
       ctx.EXISTS != null,
-      columns.map(FieldReference(_).asInstanceOf[FieldReference]).zip(columnsProperties),
+      columns.map(UnresolvedFieldName(_)).zip(columnsProperties),
       options)
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index e349822..f5ebae8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.analysis.{NamedRelation, PartitionSpec, UnresolvedException}
+import org.apache.spark.sql.catalyst.analysis.{FieldName, NamedRelation, PartitionSpec, UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.FunctionResource
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
 import org.apache.spark.sql.catalyst.trees.BinaryLike
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog._
-import org.apache.spark.sql.connector.expressions.{NamedReference, Transform}
+import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.write.Write
 import org.apache.spark.sql.types.{BooleanType, DataType, MetadataBuilder, StringType, StructType}
 
@@ -1061,12 +1061,14 @@ case class UncacheTable(
  * The logical plan of the CREATE INDEX command.
  */
 case class CreateIndex(
-    child: LogicalPlan,
+    table: LogicalPlan,
     indexName: String,
     indexType: String,
     ignoreIfExists: Boolean,
-    columns: Seq[(NamedReference, Map[String, String])],
+    columns: Seq[(FieldName, Map[String, String])],
     properties: Map[String, String]) extends UnaryCommand {
+  override def child: LogicalPlan = table
+  override lazy val resolved: Boolean = table.resolved && columns.forall(_._1.resolved)
   override protected def withNewChildInternal(newChild: LogicalPlan): CreateIndex =
-    copy(child = newChild)
+    copy(table = newChild)
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 13588e9..bdb6a15 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -2276,18 +2276,18 @@ class DDLParserSuite extends AnalysisTest {
   test("CREATE INDEX") {
     parseCompare("CREATE index i1 ON a.b.c USING BTREE (col1)",
       CreateIndex(UnresolvedTable(Seq("a", "b", "c"), "CREATE INDEX", None), "i1", "BTREE", false,
-        Array(FieldReference("col1")).toSeq.zip(Seq(Map.empty[String, String])), Map.empty))
+        Seq(UnresolvedFieldName(Seq("col1"))).zip(Seq(Map.empty[String, String])), Map.empty))
 
     parseCompare("CREATE index IF NOT EXISTS i1 ON TABLE a.b.c USING BTREE" +
       " (col1 OPTIONS ('k1'='v1'), col2 OPTIONS ('k2'='v2')) ",
       CreateIndex(UnresolvedTable(Seq("a", "b", "c"), "CREATE INDEX", None), "i1", "BTREE", true,
-        Array(FieldReference("col1"), FieldReference("col2")).toSeq
+        Seq(UnresolvedFieldName(Seq("col1")), UnresolvedFieldName(Seq("col2")))
           .zip(Seq(Map("k1" -> "v1"), Map("k2" -> "v2"))), Map.empty))
 
     parseCompare("CREATE index i1 ON a.b.c" +
       " (col1 OPTIONS ('k1'='v1'), col2 OPTIONS ('k2'='v2')) OPTIONS ('k3'='v3', 'k4'='v4')",
       CreateIndex(UnresolvedTable(Seq("a", "b", "c"), "CREATE INDEX", None), "i1", "", false,
-        Array(FieldReference("col1"), FieldReference("col2")).toSeq
+        Seq(UnresolvedFieldName(Seq("col1")), UnresolvedFieldName(Seq("col2")))
           .zip(Seq(Map("k1" -> "v1"), Map("k2" -> "v2"))), Map("k3" -> "v3", "k4" -> "v4")))
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index b688c32..f1513a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -436,7 +436,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         indexName, indexType, ifNotExists, columns, properties) =>
       table match {
         case s: SupportsIndex =>
-          CreateIndexExec(s, indexName, indexType, ifNotExists, columns, properties):: Nil
+          val namedRefs = columns.map { case (field, prop) =>
+            FieldReference(field.name) -> prop
+          }
+          CreateIndexExec(s, indexName, indexType, ifNotExists, namedRefs, properties) :: Nil
         case _ => throw QueryCompilationErrors.tableIndexNotSupportedError(
           s"CreateIndex is not supported in this table ${table.name}.")
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index ee3a156..d2d46d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2937,10 +2937,15 @@ class DataSourceV2SQLSuite
     val t = "testcat.tbl"
     withTable(t) {
       sql(s"CREATE TABLE $t (id bigint, data string COMMENT 'hello') USING foo")
-      val ex = intercept[AnalysisException] {
-        sql(s"CREATE index i1 ON $t(col1)")
+      val e1 = intercept[AnalysisException] {
+        sql(s"CREATE index i1 ON $t(non_exist)")
+      }
+      assert(e1.getMessage.contains(s"Missing field non_exist in table $t"))
+
+      val e2 = intercept[AnalysisException] {
+        sql(s"CREATE index i1 ON $t(id)")
       }
-      assert(ex.getMessage.contains(s"CreateIndex is not supported in this table $t."))
+      assert(e2.getMessage.contains(s"CreateIndex is not supported in this table $t."))
     }
   }
 
