Posted to commits@spark.apache.org by db...@apache.org on 2021/10/26 18:17:41 UTC

[spark] branch master updated: [SPARK-36895][SQL] Add Create Index syntax support

This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 677aba2  [SPARK-36895][SQL] Add Create Index syntax support
677aba2 is described below

commit 677aba26bf7dd6d57c5400c8e9ab03b2bb69085f
Author: Huaxin Gao <hu...@apple.com>
AuthorDate: Tue Oct 26 18:16:19 2021 +0000

    [SPARK-36895][SQL] Add Create Index syntax support
    
    ### What changes were proposed in this pull request?
    This is the second PR for DSv2 index support.
    
    This PR adds the following:
    
    - CREATE INDEX syntax support in the parser and analyzer
    - the `CreateIndex` logical plan node
    - the `CreateIndexExec` physical plan node
    
    `CreateIndex` is not implemented yet in this PR. Calling `createIndex` will throw `SQLFeatureNotSupportedException`, and the parsed index information, such as the `indexName`, `indexType`, `columns`, and index properties, will be included in the error message for now, for testing purposes.
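    
    On the connector side, this command is backed by the updated `SupportsIndex` interface (see the `SupportsIndex.java` change below, which replaces `java.util.Properties` with `java.util.Map`). A minimal, hypothetical sketch of a connector trait that surfaces the parsed metadata instead of creating an index (the trait name is illustrative, not part of this patch):
    
    ```
    import java.sql.SQLFeatureNotSupportedException
    import java.util
    
    import org.apache.spark.sql.connector.catalog.index.SupportsIndex
    import org.apache.spark.sql.connector.expressions.NamedReference
    
    trait NotYetIndexed extends SupportsIndex {
      override def createIndex(
          indexName: String,
          indexType: String,
          columns: Array[NamedReference],
          columnsProperties: util.Map[NamedReference, util.Map[String, String]],
          properties: util.Map[String, String]): Unit = {
        // Echo the parsed index metadata back until real index creation lands.
        throw new SQLFeatureNotSupportedException(
          s"createIndex is not implemented yet: name=$indexName, type=$indexType, " +
            s"columns=${columns.mkString(", ")}, properties=$properties")
      }
    }
    ```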
    
    ### Why are the changes needed?
    To support indexes in DSv2.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. This PR adds CREATE INDEX syntax as follows:
    
    ```
    CREATE INDEX index_name ON [TABLE] table_name [USING index_type] (column_index_property_list) [OPTIONS indexPropertyList]

        column_index_property_list: column_name [OPTIONS (indexPropertyList)] [, ...]
        indexPropertyList: index_property_name [= index_property_value] [, ...]
    ```
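    
    For example, mirroring the SQL exercised by the new tests (the catalog and table names here are illustrative; the target table must come from a catalog that returns a `SupportsIndex` table):
    
    ```
    // A single-column index with an explicit index type.
    spark.sql("CREATE INDEX i1 ON mysql.new_table USING BTREE (col1)")
    
    // A multi-column index with per-column OPTIONS and index-level OPTIONS.
    spark.sql(
      "CREATE INDEX IF NOT EXISTS i2 ON TABLE mysql.new_table " +
        "(col2 OPTIONS ('k2'='v2'), col3) OPTIONS (KEY_BLOCK_SIZE=10)")
    ```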
    
    ### How was this patch tested?
    Added a unit test.
    
    Closes #34148 from huaxingao/index_syntax.
    
    Lead-authored-by: Huaxin Gao <hu...@apple.com>
    Co-authored-by: Huaxin Gao <hu...@gmail.com>
    Signed-off-by: DB Tsai <d_...@apple.com>
---
 .../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala  | 34 ++++++++-
 .../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala  | 19 +++--
 .../apache/spark/sql/catalyst/parser/SqlBase.g4    | 56 +++++++++------
 .../sql/connector/catalog/index/SupportsIndex.java |  8 +--
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 80 +++++++++++++++-------
 .../sql/catalyst/plans/logical/v2Commands.scala    | 16 ++++-
 .../spark/sql/errors/QueryCompilationErrors.scala  |  4 ++
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 18 +++++
 .../spark/sql/execution/SparkSqlParser.scala       |  4 +-
 .../sql/execution/datasources/jdbc/JdbcUtils.scala |  4 +-
 .../execution/datasources/v2/CreateIndexExec.scala | 57 +++++++++++++++
 .../datasources/v2/DataSourceV2Strategy.scala      | 10 +++
 .../execution/datasources/v2/jdbc/JDBCTable.scala  |  4 +-
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   |  4 +-
 .../org/apache/spark/sql/jdbc/MySQLDialect.scala   |  7 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 11 +++
 16 files changed, 265 insertions(+), 71 deletions(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index 6781d45..8d1bb08 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -24,6 +24,8 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
+import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
@@ -122,13 +124,13 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
   override def supportsIndex: Boolean = true
 
   override def testIndexProperties(jdbcTable: SupportsIndex): Unit = {
-    val properties = new util.Properties();
+    val properties = new util.HashMap[String, String]();
     properties.put("KEY_BLOCK_SIZE", "10")
     properties.put("COMMENT", "'this is a comment'")
     // MySQL doesn't allow property set on individual column, so use empty Array for
     // column properties
     jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
-      Array.empty[util.Map[NamedReference, util.Properties]], properties)
+      new util.HashMap[NamedReference, util.Map[String, String]](), properties)
 
     var index = jdbcTable.listIndexes()
     // The index property size is actually 1. Even though the index is created
@@ -137,4 +139,32 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
     assert(index(0).properties.size == 1)
     assert(index(0).properties.get("COMMENT").equals("this is a comment"))
   }
+
+  override def testIndexUsingSQL(tbl: String): Unit = {
+    val loaded = Catalogs.load("mysql", conf)
+    val jdbcTable = loaded.asInstanceOf[TableCatalog]
+      .loadTable(Identifier.of(Array.empty[String], "new_table"))
+      .asInstanceOf[SupportsIndex]
+    assert(jdbcTable.indexExists("i1") == false)
+    assert(jdbcTable.indexExists("i2") == false)
+
+    val indexType = "DUMMY"
+    var m = intercept[UnsupportedOperationException] {
+      sql(s"CREATE index i1 ON $catalogName.new_table USING DUMMY (col1)")
+    }.getMessage
+    assert(m.contains(s"Index Type $indexType is not supported." +
+      s" The supported Index Types are: BTREE and HASH"))
+
+    sql(s"CREATE index i1 ON $catalogName.new_table USING BTREE (col1)")
+    sql(s"CREATE index i2 ON $catalogName.new_table (col2, col3, col5)" +
+      s" OPTIONS (KEY_BLOCK_SIZE=10)")
+
+    assert(jdbcTable.indexExists("i1") == true)
+    assert(jdbcTable.indexExists("i2") == true)
+
+    m = intercept[IndexAlreadyExistsException] {
+      sql(s"CREATE index i1 ON $catalogName.new_table (col1)")
+    }.getMessage
+    assert(m.contains("Failed to create index: i1 in new_table"))
+  }
 }
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index f3e3b34..a97adf9 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -189,6 +189,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
 
   def supportsIndex: Boolean = false
   def testIndexProperties(jdbcTable: SupportsIndex): Unit = {}
+  def testIndexUsingSQL(tbl: String): Unit = {}
 
   test("SPARK-36913: Test INDEX") {
     if (supportsIndex) {
@@ -202,28 +203,28 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
         assert(jdbcTable.indexExists("i1") == false)
         assert(jdbcTable.indexExists("i2") == false)
 
-        val properties = new util.Properties();
+        val properties = new util.HashMap[String, String]();
         val indexType = "DUMMY"
         var m = intercept[UnsupportedOperationException] {
           jdbcTable.createIndex("i1", indexType, Array(FieldReference("col1")),
-            Array.empty[util.Map[NamedReference, util.Properties]], properties)
+            new util.HashMap[NamedReference, util.Map[String, String]](), properties)
         }.getMessage
         assert(m.contains(s"Index Type $indexType is not supported." +
           s" The supported Index Types are: BTREE and HASH"))
 
         jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
-          Array.empty[util.Map[NamedReference, util.Properties]], properties)
+          new util.HashMap[NamedReference, util.Map[String, String]](), properties)
 
         jdbcTable.createIndex("i2", "",
           Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")),
-          Array.empty[util.Map[NamedReference, util.Properties]], properties)
+          new util.HashMap[NamedReference, util.Map[String, String]](), properties)
 
         assert(jdbcTable.indexExists("i1") == true)
         assert(jdbcTable.indexExists("i2") == true)
 
         m = intercept[IndexAlreadyExistsException] {
           jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
-            Array.empty[util.Map[NamedReference, util.Properties]], properties)
+            new util.HashMap[NamedReference, util.Map[String, String]](), properties)
         }.getMessage
         assert(m.contains("Failed to create index: i1 in new_table"))
 
@@ -276,5 +277,11 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
       }
     }
   }
-}
 
+  test("SPARK-36895: Test INDEX Using SQL") {
+    withTable(s"$catalogName.new_table") {
+      sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT, col4 INT, col5 INT)")
+      testIndexUsingSQL(s"$catalogName.new_table")
+    }
+  }
+}
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 8aca8ed..32b080c 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -112,9 +112,9 @@ statement
     | CREATE namespace (IF NOT EXISTS)? multipartIdentifier
         (commentSpec |
          locationSpec |
-         (WITH (DBPROPERTIES | PROPERTIES) tablePropertyList))*        #createNamespace
+         (WITH (DBPROPERTIES | PROPERTIES) propertyList))*             #createNamespace
     | ALTER namespace multipartIdentifier
-        SET (DBPROPERTIES | PROPERTIES) tablePropertyList              #setNamespaceProperties
+        SET (DBPROPERTIES | PROPERTIES) propertyList                   #setNamespaceProperties
     | ALTER namespace multipartIdentifier
         SET locationSpec                                               #setNamespaceLocation
     | DROP namespace (IF EXISTS)? multipartIdentifier
@@ -130,7 +130,7 @@ statement
         rowFormat |
         createFileFormat |
         locationSpec |
-        (TBLPROPERTIES tableProps=tablePropertyList))*                 #createTableLike
+        (TBLPROPERTIES tableProps=propertyList))*                      #createTableLike
     | replaceTableHeader ('(' colTypeList ')')? tableProvider?
         createTableClauses
         (AS? query)?                                                   #replaceTable
@@ -155,9 +155,9 @@ statement
     | ALTER (TABLE | VIEW) from=multipartIdentifier
         RENAME TO to=multipartIdentifier                               #renameTable
     | ALTER (TABLE | VIEW) multipartIdentifier
-        SET TBLPROPERTIES tablePropertyList                            #setTableProperties
+        SET TBLPROPERTIES propertyList                                 #setTableProperties
     | ALTER (TABLE | VIEW) multipartIdentifier
-        UNSET TBLPROPERTIES (IF EXISTS)? tablePropertyList             #unsetTableProperties
+        UNSET TBLPROPERTIES (IF EXISTS)? propertyList                  #unsetTableProperties
     | ALTER TABLE table=multipartIdentifier
         (ALTER | CHANGE) COLUMN? column=multipartIdentifier
         alterColumnAction?                                             #alterTableAlterColumn
@@ -168,9 +168,9 @@ statement
         REPLACE COLUMNS
         '(' columns=qualifiedColTypeWithPositionList ')'               #hiveReplaceColumns
     | ALTER TABLE multipartIdentifier (partitionSpec)?
-        SET SERDE STRING (WITH SERDEPROPERTIES tablePropertyList)?     #setTableSerDe
+        SET SERDE STRING (WITH SERDEPROPERTIES propertyList)?          #setTableSerDe
     | ALTER TABLE multipartIdentifier (partitionSpec)?
-        SET SERDEPROPERTIES tablePropertyList                          #setTableSerDe
+        SET SERDEPROPERTIES propertyList                               #setTableSerDe
     | ALTER (TABLE | VIEW) multipartIdentifier ADD (IF NOT EXISTS)?
         partitionSpecLocation+                                         #addTablePartition
     | ALTER TABLE multipartIdentifier
@@ -187,11 +187,11 @@ statement
         identifierCommentList?
         (commentSpec |
          (PARTITIONED ON identifierList) |
-         (TBLPROPERTIES tablePropertyList))*
+         (TBLPROPERTIES propertyList))*
         AS query                                                       #createView
     | CREATE (OR REPLACE)? GLOBAL? TEMPORARY VIEW
         tableIdentifier ('(' colTypeList ')')? tableProvider
-        (OPTIONS tablePropertyList)?                                   #createTempViewUsing
+        (OPTIONS propertyList)?                                        #createTempViewUsing
     | ALTER VIEW multipartIdentifier AS? query                         #alterViewQuery
     | CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)?
         multipartIdentifier AS className=STRING
@@ -204,7 +204,7 @@ statement
     | SHOW TABLE EXTENDED ((FROM | IN) ns=multipartIdentifier)?
         LIKE pattern=STRING partitionSpec?                             #showTableExtended
     | SHOW TBLPROPERTIES table=multipartIdentifier
-        ('(' key=tablePropertyKey ')')?                                #showTblProperties
+        ('(' key=propertyKey ')')?                                     #showTblProperties
     | SHOW COLUMNS (FROM | IN) table=multipartIdentifier
         ((FROM | IN) ns=multipartIdentifier)?                          #showColumns
     | SHOW VIEWS ((FROM | IN) multipartIdentifier)?
@@ -228,7 +228,7 @@ statement
     | REFRESH FUNCTION multipartIdentifier                             #refreshFunction
     | REFRESH (STRING | .*?)                                           #refreshResource
     | CACHE LAZY? TABLE multipartIdentifier
-        (OPTIONS options=tablePropertyList)? (AS? query)?              #cacheTable
+        (OPTIONS options=propertyList)? (AS? query)?                   #cacheTable
     | UNCACHE TABLE (IF EXISTS)? multipartIdentifier                   #uncacheTable
     | CLEAR CACHE                                                      #clearCache
     | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
@@ -247,6 +247,10 @@ statement
     | SET .*?                                                          #setConfiguration
     | RESET configKey                                                  #resetQuotedConfiguration
     | RESET .*?                                                        #resetConfiguration
+    | CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE?
+        multipartIdentifier (USING indexType=identifier)?
+        '(' columns=multipartIdentifierPropertyList ')'
+        (OPTIONS options=propertyList)?                                #createIndex
     | unsupportedHiveNativeCommands .*?                                #failNativeCommand
     ;
 
@@ -341,7 +345,7 @@ insertInto
     : INSERT OVERWRITE TABLE? multipartIdentifier (partitionSpec (IF NOT EXISTS)?)?  identifierList?        #insertOverwriteTable
     | INSERT INTO TABLE? multipartIdentifier partitionSpec? (IF NOT EXISTS)? identifierList?                #insertIntoTable
     | INSERT OVERWRITE LOCAL? DIRECTORY path=STRING rowFormat? createFileFormat?                            #insertOverwriteHiveDir
-    | INSERT OVERWRITE LOCAL? DIRECTORY (path=STRING)? tableProvider (OPTIONS options=tablePropertyList)?   #insertOverwriteDir
+    | INSERT OVERWRITE LOCAL? DIRECTORY (path=STRING)? tableProvider (OPTIONS options=propertyList)?        #insertOverwriteDir
     ;
 
 partitionSpecLocation
@@ -387,7 +391,7 @@ tableProvider
     ;
 
 createTableClauses
-    :((OPTIONS options=tablePropertyList) |
+    :((OPTIONS options=propertyList) |
      (PARTITIONED BY partitioning=partitionFieldList) |
      skewSpec |
      bucketSpec |
@@ -395,23 +399,23 @@ createTableClauses
      createFileFormat |
      locationSpec |
      commentSpec |
-     (TBLPROPERTIES tableProps=tablePropertyList))*
+     (TBLPROPERTIES tableProps=propertyList))*
     ;
 
-tablePropertyList
-    : '(' tableProperty (',' tableProperty)* ')'
+propertyList
+    : '(' property (',' property)* ')'
     ;
 
-tableProperty
-    : key=tablePropertyKey (EQ? value=tablePropertyValue)?
+property
+    : key=propertyKey (EQ? value=propertyValue)?
     ;
 
-tablePropertyKey
+propertyKey
     : identifier ('.' identifier)*
     | STRING
     ;
 
-tablePropertyValue
+propertyValue
     : INTEGER_VALUE
     | DECIMAL_VALUE
     | booleanValue
@@ -437,7 +441,7 @@ fileFormat
     ;
 
 storageHandler
-    : STRING (WITH SERDEPROPERTIES tablePropertyList)?
+    : STRING (WITH SERDEPROPERTIES propertyList)?
     ;
 
 resource
@@ -726,7 +730,7 @@ tableAlias
     ;
 
 rowFormat
-    : ROW FORMAT SERDE name=STRING (WITH SERDEPROPERTIES props=tablePropertyList)?  #rowFormatSerde
+    : ROW FORMAT SERDE name=STRING (WITH SERDEPROPERTIES props=propertyList)?       #rowFormatSerde
     | ROW FORMAT DELIMITED
       (FIELDS TERMINATED BY fieldsTerminatedBy=STRING (ESCAPED BY escapedBy=STRING)?)?
       (COLLECTION ITEMS TERMINATED BY collectionItemsTerminatedBy=STRING)?
@@ -743,6 +747,14 @@ multipartIdentifier
     : parts+=errorCapturingIdentifier ('.' parts+=errorCapturingIdentifier)*
     ;
 
+multipartIdentifierPropertyList
+    : multipartIdentifierProperty (',' multipartIdentifierProperty)*
+    ;
+
+multipartIdentifierProperty
+    : multipartIdentifier (OPTIONS options=propertyList)?
+    ;
+
 tableIdentifier
     : (db=errorCapturingIdentifier '.')? table=errorCapturingIdentifier
     ;
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
index 4181cf5..9cf39eb 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.connector.catalog.index;
 
 import java.util.Map;
-import java.util.Properties;
 
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException;
@@ -38,7 +37,8 @@ public interface SupportsIndex extends Table {
    * Creates an index.
    *
    * @param indexName the name of the index to be created
-   * @param indexType the IndexType of the index to be created
+   * @param indexType the type of the index to be created. If this is not specified, Spark
+   *                  will use empty String.
    * @param columns the columns on which index to be created
    * @param columnsProperties the properties of the columns on which index to be created
    * @param properties the properties of the index to be created
@@ -47,8 +47,8 @@ public interface SupportsIndex extends Table {
   void createIndex(String indexName,
       String indexType,
       NamedReference[] columns,
-      Map<NamedReference, Properties>[] columnsProperties,
-      Properties properties)
+      Map<NamedReference, Map<String, String>> columnsProperties,
+      Map<String, String> properties)
       throws IndexAlreadyExistsException;
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 82c8891..d36c7ac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2780,14 +2780,14 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   }
 
   /**
-   * Convert a table property list into a key-value map.
+   * Convert a property list into a key-value map.
    * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]].
    */
-  override def visitTablePropertyList(
-      ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) {
-    val properties = ctx.tableProperty.asScala.map { property =>
-      val key = visitTablePropertyKey(property.key)
-      val value = visitTablePropertyValue(property.value)
+  override def visitPropertyList(
+      ctx: PropertyListContext): Map[String, String] = withOrigin(ctx) {
+    val properties = ctx.property.asScala.map { property =>
+      val key = visitPropertyKey(property.key)
+      val value = visitPropertyValue(property.value)
       key -> value
     }
     // Check for duplicate property names.
@@ -2796,10 +2796,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   }
 
   /**
-   * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified.
+   * Parse a key-value map from a [[PropertyListContext]], assuming all values are specified.
    */
-  def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = {
-    val props = visitTablePropertyList(ctx)
+  def visitPropertyKeyValues(ctx: PropertyListContext): Map[String, String] = {
+    val props = visitPropertyList(ctx)
     val badKeys = props.collect { case (key, null) => key }
     if (badKeys.nonEmpty) {
       operationNotAllowed(
@@ -2809,10 +2809,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   }
 
   /**
-   * Parse a list of keys from a [[TablePropertyListContext]], assuming no values are specified.
+   * Parse a list of keys from a [[PropertyListContext]], assuming no values are specified.
    */
-  def visitPropertyKeys(ctx: TablePropertyListContext): Seq[String] = {
-    val props = visitTablePropertyList(ctx)
+  def visitPropertyKeys(ctx: PropertyListContext): Seq[String] = {
+    val props = visitPropertyList(ctx)
     val badKeys = props.filter { case (_, v) => v != null }.keys
     if (badKeys.nonEmpty) {
       operationNotAllowed(
@@ -2822,11 +2822,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   }
 
   /**
-   * A table property key can either be String or a collection of dot separated elements. This
-   * function extracts the property key based on whether its a string literal or a table property
+   * A property key can either be String or a collection of dot separated elements. This
+   * function extracts the property key based on whether its a string literal or a property
    * identifier.
    */
-  override def visitTablePropertyKey(key: TablePropertyKeyContext): String = {
+  override def visitPropertyKey(key: PropertyKeyContext): String = {
     if (key.STRING != null) {
       string(key.STRING)
     } else {
@@ -2835,10 +2835,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   }
 
   /**
-   * A table property value can be String, Integer, Boolean or Decimal. This function extracts
+   * A property value can be String, Integer, Boolean or Decimal. This function extracts
    * the property value based on whether its a string, integer, boolean or decimal literal.
    */
-  override def visitTablePropertyValue(value: TablePropertyValueContext): String = {
+  override def visitPropertyValue(value: PropertyValueContext): String = {
     if (value == null) {
       null
     } else if (value.STRING != null) {
@@ -3048,7 +3048,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
       throw QueryParsingErrors.propertiesAndDbPropertiesBothSpecifiedError(ctx)
     }
 
-    var properties = ctx.tablePropertyList.asScala.headOption
+    var properties = ctx.propertyList.asScala.headOption
       .map(visitPropertyKeyValues)
       .getOrElse(Map.empty)
 
@@ -3096,7 +3096,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
    */
   override def visitSetNamespaceProperties(ctx: SetNamespacePropertiesContext): LogicalPlan = {
     withOrigin(ctx) {
-      val properties = cleanNamespaceProperties(visitPropertyKeyValues(ctx.tablePropertyList), ctx)
+      val properties = cleanNamespaceProperties(visitPropertyKeyValues(ctx.propertyList), ctx)
       SetNamespaceProperties(
         UnresolvedNamespace(visitMultipartIdentifier(ctx.multipartIdentifier)),
         properties)
@@ -3237,7 +3237,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
     import ctx._
     SerdeInfo(
       serde = Some(string(name)),
-      serdeProperties = Option(tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
+      serdeProperties = Option(propertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
   }
 
   /**
@@ -3809,7 +3809,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
    */
   override def visitSetTableProperties(
       ctx: SetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
-    val properties = visitPropertyKeyValues(ctx.tablePropertyList)
+    val properties = visitPropertyKeyValues(ctx.propertyList)
     val cleanedTableProperties = cleanTableProperties(ctx, properties)
     if (ctx.VIEW != null) {
       SetViewProperties(
@@ -3840,7 +3840,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
    */
   override def visitUnsetTableProperties(
       ctx: UnsetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
-    val properties = visitPropertyKeys(ctx.tablePropertyList)
+    val properties = visitPropertyKeys(ctx.propertyList)
     val cleanedProperties = cleanTableProperties(ctx, properties.map(_ -> "").toMap).keys.toSeq
 
     val ifExists = ctx.EXISTS != null
@@ -4278,7 +4278,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
         "ALTER TABLE ... SET [SERDE|SERDEPROPERTIES]",
         alterTableTypeMismatchHint),
       Option(ctx.STRING).map(string),
-      Option(ctx.tablePropertyList).map(visitPropertyKeyValues),
+      Option(ctx.propertyList).map(visitPropertyKeyValues),
       // TODO a partition spec is allowed to have optional values. This is currently violated.
       Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
   }
@@ -4329,7 +4329,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
       ctx: ShowTblPropertiesContext): LogicalPlan = withOrigin(ctx) {
     ShowTableProperties(
       createUnresolvedTableOrView(ctx.table, "SHOW TBLPROPERTIES"),
-      Option(ctx.key).map(visitTablePropertyKey))
+      Option(ctx.key).map(visitPropertyKey))
   }
 
   /**
@@ -4404,6 +4404,38 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
     CommentOnTable(createUnresolvedTable(ctx.multipartIdentifier, "COMMENT ON TABLE"), comment)
   }
 
+  /**
+   * Create an index, returning a [[CreateIndex]] logical plan.
+   * For example:
+   * {{{
+   * CREATE INDEX index_name ON [TABLE] table_name [USING index_type] (column_index_property_list)
+   *   [OPTIONS indexPropertyList]
+   *   column_index_property_list: column_name [OPTIONS(indexPropertyList)]  [ ,  . . . ]
+   *   indexPropertyList: index_property_name [= index_property_value] [ ,  . . . ]
+   * }}}
+   */
+  override def visitCreateIndex(ctx: CreateIndexContext): LogicalPlan = withOrigin(ctx) {
+    val (indexName, indexType) = if (ctx.identifier.size() == 1) {
+      (ctx.identifier(0).getText, "")
+    } else {
+      (ctx.identifier(0).getText, ctx.identifier(1).getText)
+    }
+
+    val columns = ctx.columns.multipartIdentifierProperty.asScala
+      .map(_.multipartIdentifier.getText).toSeq
+    val columnsProperties = ctx.columns.multipartIdentifierProperty.asScala
+      .map(x => (Option(x.options).map(visitPropertyKeyValues).getOrElse(Map.empty))).toSeq
+    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
+
+    CreateIndex(
+      createUnresolvedTable(ctx.multipartIdentifier(), "CREATE INDEX"),
+      indexName,
+      indexType,
+      ctx.EXISTS != null,
+      columns.map(FieldReference(_).asInstanceOf[FieldReference]).zip(columnsProperties),
+      options)
+  }
+
   private def alterViewTypeMismatchHint: Option[String] = Some("Please use ALTER TABLE instead.")
 
   private def alterTableTypeMismatchHint: Option[String] = Some("Please use ALTER VIEW instead.")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 31fdb6c..e349822 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
 import org.apache.spark.sql.catalyst.trees.BinaryLike
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog._
-import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.expressions.{NamedReference, Transform}
 import org.apache.spark.sql.connector.write.Write
 import org.apache.spark.sql.types.{BooleanType, DataType, MetadataBuilder, StringType, StructType}
 
@@ -1056,3 +1056,17 @@ case class UncacheTable(
 
   override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
 }
+
+/**
+ * The logical plan of the CREATE INDEX command.
+ */
+case class CreateIndex(
+    child: LogicalPlan,
+    indexName: String,
+    indexType: String,
+    ignoreIfExists: Boolean,
+    columns: Seq[(NamedReference, Map[String, String])],
+    properties: Map[String, String]) extends UnaryCommand {
+  override protected def withNewChildInternal(newChild: LogicalPlan): CreateIndex =
+    copy(child = newChild)
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index eb8985d..dcaa622 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2366,4 +2366,8 @@ object QueryCompilationErrors {
       errorClass = "INVALID_JSON_SCHEMA_MAPTYPE",
       messageParameters = Array(schema.toString))
   }
+
+  def tableIndexNotSupportedError(errorMessage: String): Throwable = {
+    new AnalysisException(errorMessage)
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 7bcc2b7..798c130 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -2282,6 +2282,24 @@ class DDLParserSuite extends AnalysisTest {
       RefreshFunction(UnresolvedFunc(Seq("a", "b", "c"))))
   }
 
+  test("CREATE INDEX") {
+    parseCompare("CREATE index i1 ON a.b.c USING BTREE (col1)",
+      CreateIndex(UnresolvedTable(Seq("a", "b", "c"), "CREATE INDEX", None), "i1", "BTREE", false,
+        Array(FieldReference("col1")).toSeq.zip(Seq(Map.empty[String, String])), Map.empty))
+
+    parseCompare("CREATE index IF NOT EXISTS i1 ON TABLE a.b.c USING BTREE" +
+      " (col1 OPTIONS ('k1'='v1'), col2 OPTIONS ('k2'='v2')) ",
+      CreateIndex(UnresolvedTable(Seq("a", "b", "c"), "CREATE INDEX", None), "i1", "BTREE", true,
+        Array(FieldReference("col1"), FieldReference("col2")).toSeq
+          .zip(Seq(Map("k1" -> "v1"), Map("k2" -> "v2"))), Map.empty))
+
+    parseCompare("CREATE index i1 ON a.b.c" +
+      " (col1 OPTIONS ('k1'='v1'), col2 OPTIONS ('k2'='v2')) OPTIONS ('k3'='v3', 'k4'='v4')",
+      CreateIndex(UnresolvedTable(Seq("a", "b", "c"), "CREATE INDEX", None), "i1", "", false,
+        Array(FieldReference("col1"), FieldReference("col2")).toSeq
+          .zip(Seq(Map("k1" -> "v1"), Map("k2" -> "v2"))), Map("k3" -> "v3", "k4" -> "v4")))
+  }
+
   private case class TableSpec(
       name: Seq[String],
       schema: Option[StructType],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index d26b261..b63306b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -338,7 +338,7 @@ class SparkSqlAstBuilder extends AstBuilder {
       replace = ctx.REPLACE != null,
       global = ctx.GLOBAL != null,
       provider = ctx.tableProvider.multipartIdentifier.getText,
-      options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
+      options = Option(ctx.propertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
   }
 
   /**
@@ -464,7 +464,7 @@ class SparkSqlAstBuilder extends AstBuilder {
       throw QueryParsingErrors.createViewWithBothIfNotExistsAndReplaceError(ctx)
     }
 
-    val properties = ctx.tablePropertyList.asScala.headOption.map(visitPropertyKeyValues)
+    val properties = ctx.propertyList.asScala.headOption.map(visitPropertyKeyValues)
       .getOrElse(Map.empty)
     if (ctx.TEMPORARY != null && !properties.isEmpty) {
       operationNotAllowed("TBLPROPERTIES can't coexist with CREATE TEMPORARY VIEW", ctx)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index d482245..4402f27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -1021,8 +1021,8 @@ object JdbcUtils extends Logging with SQLConfHelper {
       indexType: String,
       tableName: String,
       columns: Array[NamedReference],
-      columnsProperties: Array[util.Map[NamedReference, util.Properties]],
-      properties: util.Properties,
+      columnsProperties: util.Map[NamedReference, util.Map[String, String]],
+      properties: util.Map[String, String],
       options: JDBCOptions): Unit = {
     val dialect = JdbcDialects.get(options.url)
     executeStatement(conn, options,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala
new file mode 100644
index 0000000..78bdf64
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.index.SupportsIndex
+import org.apache.spark.sql.connector.expressions.NamedReference
+
+/**
+ * Physical plan node for creating an index.
+ */
+case class CreateIndexExec(
+    table: SupportsIndex,
+    indexName: String,
+    indexType: String,
+    ignoreIfExists: Boolean,
+    columns: Seq[(NamedReference, Map[String, String])],
+    properties: Map[String, String])
+  extends LeafV2CommandExec {
+  override protected def run(): Seq[InternalRow] = {
+    val colProperties = new util.HashMap[NamedReference, util.Map[String, String]]
+    columns.foreach {
+      case (column, map) => colProperties.put(column, map.asJava)
+    }
+    try {
+      table.createIndex(
+        indexName, indexType, columns.unzip._1.toArray, colProperties, properties.asJava)
+    } catch {
+      case _: IndexAlreadyExistsException if ignoreIfExists =>
+        logWarning(s"Index $indexName already exists in table ${table.name}. Ignoring.")
+    }
+    Seq.empty
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 56e7abc..66ee431 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.toPrettySQL
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog}
+import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.{FieldReference, Literal => V2Literal, LiteralValue}
 import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse => V2AlwaysFalse, AlwaysTrue => V2AlwaysTrue, And => V2And, EqualNullSafe => V2EqualNullSafe, EqualTo => V2EqualTo, Filter => V2Filter, GreaterThan => V2GreaterThan, GreaterThanOrEqual => V2GreaterThanOrEqual, In => V2In, IsNotNull => V2IsNotNull, IsNull => V2IsNull, LessThan => V2LessThan, LessThanOrEqual => V2LessThanOrEqual, Not => V2Not, Or => V2Or, StringContains => V2StringContains, StringEndsWith => V2StringEnds [...]
 import org.apache.spark.sql.connector.read.LocalScan
@@ -429,6 +430,15 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       val table = a.table.asInstanceOf[ResolvedTable]
       AlterTableExec(table.catalog, table.identifier, a.changes) :: Nil
 
+    case CreateIndex(ResolvedTable(_, _, table, _),
+        indexName, indexType, ifNotExists, columns, properties) =>
+      table match {
+        case s: SupportsIndex =>
+          CreateIndexExec(s, indexName, indexType, ifNotExists, columns, properties):: Nil
+        case _ => throw QueryCompilationErrors.tableIndexNotSupportedError(
+          s"CreateIndex is not supported in this table ${table.name}.")
+      }
+
     case _ => Nil
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
index 23ff503..c61d890 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
@@ -56,8 +56,8 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
       indexName: String,
       indexType: String,
       columns: Array[NamedReference],
-      columnsProperties: Array[util.Map[NamedReference, util.Properties]],
-      properties: util.Properties): Unit = {
+      columnsProperties: util.Map[NamedReference, util.Map[String, String]],
+      properties: util.Map[String, String]): Unit = {
     JdbcUtils.withConnection(jdbcOptions) { conn =>
       JdbcUtils.classifyException(s"Failed to create index: $indexName in $name",
         JdbcDialects.get(jdbcOptions.url)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index eb3986c..9e54ba7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -306,8 +306,8 @@ abstract class JdbcDialect extends Serializable with Logging{
       indexType: String,
       tableName: String,
       columns: Array[NamedReference],
-      columnsProperties: Array[util.Map[NamedReference, util.Properties]],
-      properties: util.Properties): String = {
+      columnsProperties: util.Map[NamedReference, util.Map[String, String]],
+      properties: util.Map[String, String]): String = {
     throw new UnsupportedOperationException("createIndex is not supported")
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 7e85b3b..73b36f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -118,13 +118,12 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
       indexType: String,
       tableName: String,
       columns: Array[NamedReference],
-      columnsProperties: Array[util.Map[NamedReference, util.Properties]],
-      properties: util.Properties): String = {
+      columnsProperties: util.Map[NamedReference, util.Map[String, String]],
+      properties: util.Map[String, String]): String = {
     val columnList = columns.map(col => quoteIdentifier(col.fieldNames.head))
     var indexProperties: String = ""
-    val scalaProps = properties.asScala
     if (!properties.isEmpty) {
-      scalaProps.foreach { case (k, v) =>
+      properties.asScala.foreach { case (k, v) =>
         indexProperties = indexProperties + " " + s"$k $v"
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 360c8be..10a70e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2969,6 +2969,17 @@ class DataSourceV2SQLSuite
       Row("testcat"), Row("testcat2")))
   }
 
+  test("CREATE INDEX should fail") {
+    val t = "testcat.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, data string COMMENT 'hello') USING foo")
+      val ex = intercept[AnalysisException] {
+        sql(s"CREATE index i1 ON $t(col1)")
+      }
+      assert(ex.getMessage.contains(s"CreateIndex is not supported in this table $t."))
+    }
+  }
+
   private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
     val e = intercept[AnalysisException] {
       sql(s"$sqlCommand $sqlParams")
