You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by db...@apache.org on 2021/10/26 18:17:41 UTC
[spark] branch master updated: [SPARK-36895][SQL] Add Create Index
syntax support
This is an automated email from the ASF dual-hosted git repository.
dbtsai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 677aba2 [SPARK-36895][SQL] Add Create Index syntax support
677aba2 is described below
commit 677aba26bf7dd6d57c5400c8e9ab03b2bb69085f
Author: Huaxin Gao <hu...@apple.com>
AuthorDate: Tue Oct 26 18:16:19 2021 +0000
[SPARK-36895][SQL] Add Create Index syntax support
### What changes were proposed in this pull request?
This is the 2nd PR for DSv2 index support.
This PR adds the following:
- create index syntax support in parser and analyzer
- `CreateIndex` logical plan node
- `CreateIndexExec` physical node
`CreateIndex` is not implemented yet in this PR. Calling `CreateIndex` will throw `SQLFeatureNotSupportedException`, and the parsed index information (`indexName`, `indexType`, `columns`, and the index properties) will be included in the error message for now, for testing purposes.
### Why are the changes needed?
To support indexes in DSv2.
### Does this PR introduce _any_ user-facing change?
Yes, this adds the CREATE INDEX syntax, as follows:
```
CREATE INDEX [IF NOT EXISTS] index_name ON [TABLE] table_name [USING index_type] (column_index_property_list) [OPTIONS indexPropertyList]
column_index_property_list: column_name [OPTIONS(indexPropertyList)] [ , . . . ]
indexPropertyList: index_property_name [= index_property_value] [ , . . . ]
```
### How was this patch tested?
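Added a parser unit test for CREATE INDEX and a JDBC (MySQL) integration test that creates indexes through SQL.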
Closes #34148 from huaxingao/index_syntax.
Lead-authored-by: Huaxin Gao <hu...@apple.com>
Co-authored-by: Huaxin Gao <hu...@gmail.com>
Signed-off-by: DB Tsai <d_...@apple.com>
---
.../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala | 34 ++++++++-
.../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 19 +++--
.../apache/spark/sql/catalyst/parser/SqlBase.g4 | 56 +++++++++------
.../sql/connector/catalog/index/SupportsIndex.java | 8 +--
.../spark/sql/catalyst/parser/AstBuilder.scala | 80 +++++++++++++++-------
.../sql/catalyst/plans/logical/v2Commands.scala | 16 ++++-
.../spark/sql/errors/QueryCompilationErrors.scala | 4 ++
.../spark/sql/catalyst/parser/DDLParserSuite.scala | 18 +++++
.../spark/sql/execution/SparkSqlParser.scala | 4 +-
.../sql/execution/datasources/jdbc/JdbcUtils.scala | 4 +-
.../execution/datasources/v2/CreateIndexExec.scala | 57 +++++++++++++++
.../datasources/v2/DataSourceV2Strategy.scala | 10 +++
.../execution/datasources/v2/jdbc/JDBCTable.scala | 4 +-
.../org/apache/spark/sql/jdbc/JdbcDialects.scala | 4 +-
.../org/apache/spark/sql/jdbc/MySQLDialect.scala | 7 +-
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 11 +++
16 files changed, 265 insertions(+), 71 deletions(-)
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index 6781d45..8d1bb08 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -24,6 +24,8 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkConf
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
+import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
@@ -122,13 +124,13 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
override def supportsIndex: Boolean = true
override def testIndexProperties(jdbcTable: SupportsIndex): Unit = {
- val properties = new util.Properties();
+ val properties = new util.HashMap[String, String]();
properties.put("KEY_BLOCK_SIZE", "10")
properties.put("COMMENT", "'this is a comment'")
// MySQL doesn't allow property set on individual column, so use empty Array for
// column properties
jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
- Array.empty[util.Map[NamedReference, util.Properties]], properties)
+ new util.HashMap[NamedReference, util.Map[String, String]](), properties)
var index = jdbcTable.listIndexes()
// The index property size is actually 1. Even though the index is created
@@ -137,4 +139,32 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
assert(index(0).properties.size == 1)
assert(index(0).properties.get("COMMENT").equals("this is a comment"))
}
+
+ override def testIndexUsingSQL(tbl: String): Unit = {
+ val loaded = Catalogs.load("mysql", conf)
+ val jdbcTable = loaded.asInstanceOf[TableCatalog]
+ .loadTable(Identifier.of(Array.empty[String], "new_table"))
+ .asInstanceOf[SupportsIndex]
+ assert(jdbcTable.indexExists("i1") == false)
+ assert(jdbcTable.indexExists("i2") == false)
+
+ val indexType = "DUMMY"
+ var m = intercept[UnsupportedOperationException] {
+ sql(s"CREATE index i1 ON $catalogName.new_table USING DUMMY (col1)")
+ }.getMessage
+ assert(m.contains(s"Index Type $indexType is not supported." +
+ s" The supported Index Types are: BTREE and HASH"))
+
+ sql(s"CREATE index i1 ON $catalogName.new_table USING BTREE (col1)")
+ sql(s"CREATE index i2 ON $catalogName.new_table (col2, col3, col5)" +
+ s" OPTIONS (KEY_BLOCK_SIZE=10)")
+
+ assert(jdbcTable.indexExists("i1") == true)
+ assert(jdbcTable.indexExists("i2") == true)
+
+ m = intercept[IndexAlreadyExistsException] {
+ sql(s"CREATE index i1 ON $catalogName.new_table (col1)")
+ }.getMessage
+ assert(m.contains("Failed to create index: i1 in new_table"))
+ }
}
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index f3e3b34..a97adf9 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -189,6 +189,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
def supportsIndex: Boolean = false
def testIndexProperties(jdbcTable: SupportsIndex): Unit = {}
+ def testIndexUsingSQL(tbl: String): Unit = {}
test("SPARK-36913: Test INDEX") {
if (supportsIndex) {
@@ -202,28 +203,28 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
assert(jdbcTable.indexExists("i1") == false)
assert(jdbcTable.indexExists("i2") == false)
- val properties = new util.Properties();
+ val properties = new util.HashMap[String, String]();
val indexType = "DUMMY"
var m = intercept[UnsupportedOperationException] {
jdbcTable.createIndex("i1", indexType, Array(FieldReference("col1")),
- Array.empty[util.Map[NamedReference, util.Properties]], properties)
+ new util.HashMap[NamedReference, util.Map[String, String]](), properties)
}.getMessage
assert(m.contains(s"Index Type $indexType is not supported." +
s" The supported Index Types are: BTREE and HASH"))
jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
- Array.empty[util.Map[NamedReference, util.Properties]], properties)
+ new util.HashMap[NamedReference, util.Map[String, String]](), properties)
jdbcTable.createIndex("i2", "",
Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")),
- Array.empty[util.Map[NamedReference, util.Properties]], properties)
+ new util.HashMap[NamedReference, util.Map[String, String]](), properties)
assert(jdbcTable.indexExists("i1") == true)
assert(jdbcTable.indexExists("i2") == true)
m = intercept[IndexAlreadyExistsException] {
jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
- Array.empty[util.Map[NamedReference, util.Properties]], properties)
+ new util.HashMap[NamedReference, util.Map[String, String]](), properties)
}.getMessage
assert(m.contains("Failed to create index: i1 in new_table"))
@@ -276,5 +277,11 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
}
}
}
-}
+ test("SPARK-36895: Test INDEX Using SQL") {
+ withTable(s"$catalogName.new_table") {
+ sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT, col4 INT, col5 INT)")
+ testIndexUsingSQL(s"$catalogName.new_table")
+ }
+ }
+}
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 8aca8ed..32b080c 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -112,9 +112,9 @@ statement
| CREATE namespace (IF NOT EXISTS)? multipartIdentifier
(commentSpec |
locationSpec |
- (WITH (DBPROPERTIES | PROPERTIES) tablePropertyList))* #createNamespace
+ (WITH (DBPROPERTIES | PROPERTIES) propertyList))* #createNamespace
| ALTER namespace multipartIdentifier
- SET (DBPROPERTIES | PROPERTIES) tablePropertyList #setNamespaceProperties
+ SET (DBPROPERTIES | PROPERTIES) propertyList #setNamespaceProperties
| ALTER namespace multipartIdentifier
SET locationSpec #setNamespaceLocation
| DROP namespace (IF EXISTS)? multipartIdentifier
@@ -130,7 +130,7 @@ statement
rowFormat |
createFileFormat |
locationSpec |
- (TBLPROPERTIES tableProps=tablePropertyList))* #createTableLike
+ (TBLPROPERTIES tableProps=propertyList))* #createTableLike
| replaceTableHeader ('(' colTypeList ')')? tableProvider?
createTableClauses
(AS? query)? #replaceTable
@@ -155,9 +155,9 @@ statement
| ALTER (TABLE | VIEW) from=multipartIdentifier
RENAME TO to=multipartIdentifier #renameTable
| ALTER (TABLE | VIEW) multipartIdentifier
- SET TBLPROPERTIES tablePropertyList #setTableProperties
+ SET TBLPROPERTIES propertyList #setTableProperties
| ALTER (TABLE | VIEW) multipartIdentifier
- UNSET TBLPROPERTIES (IF EXISTS)? tablePropertyList #unsetTableProperties
+ UNSET TBLPROPERTIES (IF EXISTS)? propertyList #unsetTableProperties
| ALTER TABLE table=multipartIdentifier
(ALTER | CHANGE) COLUMN? column=multipartIdentifier
alterColumnAction? #alterTableAlterColumn
@@ -168,9 +168,9 @@ statement
REPLACE COLUMNS
'(' columns=qualifiedColTypeWithPositionList ')' #hiveReplaceColumns
| ALTER TABLE multipartIdentifier (partitionSpec)?
- SET SERDE STRING (WITH SERDEPROPERTIES tablePropertyList)? #setTableSerDe
+ SET SERDE STRING (WITH SERDEPROPERTIES propertyList)? #setTableSerDe
| ALTER TABLE multipartIdentifier (partitionSpec)?
- SET SERDEPROPERTIES tablePropertyList #setTableSerDe
+ SET SERDEPROPERTIES propertyList #setTableSerDe
| ALTER (TABLE | VIEW) multipartIdentifier ADD (IF NOT EXISTS)?
partitionSpecLocation+ #addTablePartition
| ALTER TABLE multipartIdentifier
@@ -187,11 +187,11 @@ statement
identifierCommentList?
(commentSpec |
(PARTITIONED ON identifierList) |
- (TBLPROPERTIES tablePropertyList))*
+ (TBLPROPERTIES propertyList))*
AS query #createView
| CREATE (OR REPLACE)? GLOBAL? TEMPORARY VIEW
tableIdentifier ('(' colTypeList ')')? tableProvider
- (OPTIONS tablePropertyList)? #createTempViewUsing
+ (OPTIONS propertyList)? #createTempViewUsing
| ALTER VIEW multipartIdentifier AS? query #alterViewQuery
| CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)?
multipartIdentifier AS className=STRING
@@ -204,7 +204,7 @@ statement
| SHOW TABLE EXTENDED ((FROM | IN) ns=multipartIdentifier)?
LIKE pattern=STRING partitionSpec? #showTableExtended
| SHOW TBLPROPERTIES table=multipartIdentifier
- ('(' key=tablePropertyKey ')')? #showTblProperties
+ ('(' key=propertyKey ')')? #showTblProperties
| SHOW COLUMNS (FROM | IN) table=multipartIdentifier
((FROM | IN) ns=multipartIdentifier)? #showColumns
| SHOW VIEWS ((FROM | IN) multipartIdentifier)?
@@ -228,7 +228,7 @@ statement
| REFRESH FUNCTION multipartIdentifier #refreshFunction
| REFRESH (STRING | .*?) #refreshResource
| CACHE LAZY? TABLE multipartIdentifier
- (OPTIONS options=tablePropertyList)? (AS? query)? #cacheTable
+ (OPTIONS options=propertyList)? (AS? query)? #cacheTable
| UNCACHE TABLE (IF EXISTS)? multipartIdentifier #uncacheTable
| CLEAR CACHE #clearCache
| LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
@@ -247,6 +247,10 @@ statement
| SET .*? #setConfiguration
| RESET configKey #resetQuotedConfiguration
| RESET .*? #resetConfiguration
+ | CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE?
+ multipartIdentifier (USING indexType=identifier)?
+ '(' columns=multipartIdentifierPropertyList ')'
+ (OPTIONS options=propertyList)? #createIndex
| unsupportedHiveNativeCommands .*? #failNativeCommand
;
@@ -341,7 +345,7 @@ insertInto
: INSERT OVERWRITE TABLE? multipartIdentifier (partitionSpec (IF NOT EXISTS)?)? identifierList? #insertOverwriteTable
| INSERT INTO TABLE? multipartIdentifier partitionSpec? (IF NOT EXISTS)? identifierList? #insertIntoTable
| INSERT OVERWRITE LOCAL? DIRECTORY path=STRING rowFormat? createFileFormat? #insertOverwriteHiveDir
- | INSERT OVERWRITE LOCAL? DIRECTORY (path=STRING)? tableProvider (OPTIONS options=tablePropertyList)? #insertOverwriteDir
+ | INSERT OVERWRITE LOCAL? DIRECTORY (path=STRING)? tableProvider (OPTIONS options=propertyList)? #insertOverwriteDir
;
partitionSpecLocation
@@ -387,7 +391,7 @@ tableProvider
;
createTableClauses
- :((OPTIONS options=tablePropertyList) |
+ :((OPTIONS options=propertyList) |
(PARTITIONED BY partitioning=partitionFieldList) |
skewSpec |
bucketSpec |
@@ -395,23 +399,23 @@ createTableClauses
createFileFormat |
locationSpec |
commentSpec |
- (TBLPROPERTIES tableProps=tablePropertyList))*
+ (TBLPROPERTIES tableProps=propertyList))*
;
-tablePropertyList
- : '(' tableProperty (',' tableProperty)* ')'
+propertyList
+ : '(' property (',' property)* ')'
;
-tableProperty
- : key=tablePropertyKey (EQ? value=tablePropertyValue)?
+property
+ : key=propertyKey (EQ? value=propertyValue)?
;
-tablePropertyKey
+propertyKey
: identifier ('.' identifier)*
| STRING
;
-tablePropertyValue
+propertyValue
: INTEGER_VALUE
| DECIMAL_VALUE
| booleanValue
@@ -437,7 +441,7 @@ fileFormat
;
storageHandler
- : STRING (WITH SERDEPROPERTIES tablePropertyList)?
+ : STRING (WITH SERDEPROPERTIES propertyList)?
;
resource
@@ -726,7 +730,7 @@ tableAlias
;
rowFormat
- : ROW FORMAT SERDE name=STRING (WITH SERDEPROPERTIES props=tablePropertyList)? #rowFormatSerde
+ : ROW FORMAT SERDE name=STRING (WITH SERDEPROPERTIES props=propertyList)? #rowFormatSerde
| ROW FORMAT DELIMITED
(FIELDS TERMINATED BY fieldsTerminatedBy=STRING (ESCAPED BY escapedBy=STRING)?)?
(COLLECTION ITEMS TERMINATED BY collectionItemsTerminatedBy=STRING)?
@@ -743,6 +747,14 @@ multipartIdentifier
: parts+=errorCapturingIdentifier ('.' parts+=errorCapturingIdentifier)*
;
+multipartIdentifierPropertyList
+ : multipartIdentifierProperty (',' multipartIdentifierProperty)*
+ ;
+
+multipartIdentifierProperty
+ : multipartIdentifier (OPTIONS options=propertyList)?
+ ;
+
tableIdentifier
: (db=errorCapturingIdentifier '.')? table=errorCapturingIdentifier
;
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
index 4181cf5..9cf39eb 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
@@ -18,7 +18,6 @@
package org.apache.spark.sql.connector.catalog.index;
import java.util.Map;
-import java.util.Properties;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException;
@@ -38,7 +37,8 @@ public interface SupportsIndex extends Table {
* Creates an index.
*
* @param indexName the name of the index to be created
- * @param indexType the IndexType of the index to be created
+ * @param indexType the type of the index to be created. If this is not specified, Spark
+ * will use empty String.
* @param columns the columns on which index to be created
* @param columnsProperties the properties of the columns on which index to be created
* @param properties the properties of the index to be created
@@ -47,8 +47,8 @@ public interface SupportsIndex extends Table {
void createIndex(String indexName,
String indexType,
NamedReference[] columns,
- Map<NamedReference, Properties>[] columnsProperties,
- Properties properties)
+ Map<NamedReference, Map<String, String>> columnsProperties,
+ Map<String, String> properties)
throws IndexAlreadyExistsException;
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 82c8891..d36c7ac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2780,14 +2780,14 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}
/**
- * Convert a table property list into a key-value map.
+ * Convert a property list into a key-value map.
* This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]].
*/
- override def visitTablePropertyList(
- ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) {
- val properties = ctx.tableProperty.asScala.map { property =>
- val key = visitTablePropertyKey(property.key)
- val value = visitTablePropertyValue(property.value)
+ override def visitPropertyList(
+ ctx: PropertyListContext): Map[String, String] = withOrigin(ctx) {
+ val properties = ctx.property.asScala.map { property =>
+ val key = visitPropertyKey(property.key)
+ val value = visitPropertyValue(property.value)
key -> value
}
// Check for duplicate property names.
@@ -2796,10 +2796,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}
/**
- * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified.
+ * Parse a key-value map from a [[PropertyListContext]], assuming all values are specified.
*/
- def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = {
- val props = visitTablePropertyList(ctx)
+ def visitPropertyKeyValues(ctx: PropertyListContext): Map[String, String] = {
+ val props = visitPropertyList(ctx)
val badKeys = props.collect { case (key, null) => key }
if (badKeys.nonEmpty) {
operationNotAllowed(
@@ -2809,10 +2809,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}
/**
- * Parse a list of keys from a [[TablePropertyListContext]], assuming no values are specified.
+ * Parse a list of keys from a [[PropertyListContext]], assuming no values are specified.
*/
- def visitPropertyKeys(ctx: TablePropertyListContext): Seq[String] = {
- val props = visitTablePropertyList(ctx)
+ def visitPropertyKeys(ctx: PropertyListContext): Seq[String] = {
+ val props = visitPropertyList(ctx)
val badKeys = props.filter { case (_, v) => v != null }.keys
if (badKeys.nonEmpty) {
operationNotAllowed(
@@ -2822,11 +2822,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}
/**
- * A table property key can either be String or a collection of dot separated elements. This
- * function extracts the property key based on whether its a string literal or a table property
+ * A property key can either be String or a collection of dot separated elements. This
+ * function extracts the property key based on whether its a string literal or a property
* identifier.
*/
- override def visitTablePropertyKey(key: TablePropertyKeyContext): String = {
+ override def visitPropertyKey(key: PropertyKeyContext): String = {
if (key.STRING != null) {
string(key.STRING)
} else {
@@ -2835,10 +2835,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}
/**
- * A table property value can be String, Integer, Boolean or Decimal. This function extracts
+ * A property value can be String, Integer, Boolean or Decimal. This function extracts
* the property value based on whether its a string, integer, boolean or decimal literal.
*/
- override def visitTablePropertyValue(value: TablePropertyValueContext): String = {
+ override def visitPropertyValue(value: PropertyValueContext): String = {
if (value == null) {
null
} else if (value.STRING != null) {
@@ -3048,7 +3048,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
throw QueryParsingErrors.propertiesAndDbPropertiesBothSpecifiedError(ctx)
}
- var properties = ctx.tablePropertyList.asScala.headOption
+ var properties = ctx.propertyList.asScala.headOption
.map(visitPropertyKeyValues)
.getOrElse(Map.empty)
@@ -3096,7 +3096,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
*/
override def visitSetNamespaceProperties(ctx: SetNamespacePropertiesContext): LogicalPlan = {
withOrigin(ctx) {
- val properties = cleanNamespaceProperties(visitPropertyKeyValues(ctx.tablePropertyList), ctx)
+ val properties = cleanNamespaceProperties(visitPropertyKeyValues(ctx.propertyList), ctx)
SetNamespaceProperties(
UnresolvedNamespace(visitMultipartIdentifier(ctx.multipartIdentifier)),
properties)
@@ -3237,7 +3237,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
import ctx._
SerdeInfo(
serde = Some(string(name)),
- serdeProperties = Option(tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
+ serdeProperties = Option(propertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
}
/**
@@ -3809,7 +3809,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
*/
override def visitSetTableProperties(
ctx: SetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
- val properties = visitPropertyKeyValues(ctx.tablePropertyList)
+ val properties = visitPropertyKeyValues(ctx.propertyList)
val cleanedTableProperties = cleanTableProperties(ctx, properties)
if (ctx.VIEW != null) {
SetViewProperties(
@@ -3840,7 +3840,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
*/
override def visitUnsetTableProperties(
ctx: UnsetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
- val properties = visitPropertyKeys(ctx.tablePropertyList)
+ val properties = visitPropertyKeys(ctx.propertyList)
val cleanedProperties = cleanTableProperties(ctx, properties.map(_ -> "").toMap).keys.toSeq
val ifExists = ctx.EXISTS != null
@@ -4278,7 +4278,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
"ALTER TABLE ... SET [SERDE|SERDEPROPERTIES]",
alterTableTypeMismatchHint),
Option(ctx.STRING).map(string),
- Option(ctx.tablePropertyList).map(visitPropertyKeyValues),
+ Option(ctx.propertyList).map(visitPropertyKeyValues),
// TODO a partition spec is allowed to have optional values. This is currently violated.
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
}
@@ -4329,7 +4329,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
ctx: ShowTblPropertiesContext): LogicalPlan = withOrigin(ctx) {
ShowTableProperties(
createUnresolvedTableOrView(ctx.table, "SHOW TBLPROPERTIES"),
- Option(ctx.key).map(visitTablePropertyKey))
+ Option(ctx.key).map(visitPropertyKey))
}
/**
@@ -4404,6 +4404,38 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
CommentOnTable(createUnresolvedTable(ctx.multipartIdentifier, "COMMENT ON TABLE"), comment)
}
+ /**
+ * Create an index, returning a [[CreateIndex]] logical plan.
+ * For example:
+ * {{{
+ * CREATE INDEX index_name ON [TABLE] table_name [USING index_type] (column_index_property_list)
+ * [OPTIONS indexPropertyList]
+ * column_index_property_list: column_name [OPTIONS(indexPropertyList)] [ , . . . ]
+ * indexPropertyList: index_property_name [= index_property_value] [ , . . . ]
+ * }}}
+ */
+ override def visitCreateIndex(ctx: CreateIndexContext): LogicalPlan = withOrigin(ctx) {
+ val (indexName, indexType) = if (ctx.identifier.size() == 1) {
+ (ctx.identifier(0).getText, "")
+ } else {
+ (ctx.identifier(0).getText, ctx.identifier(1).getText)
+ }
+
+ val columns = ctx.columns.multipartIdentifierProperty.asScala
+ .map(_.multipartIdentifier.getText).toSeq
+ val columnsProperties = ctx.columns.multipartIdentifierProperty.asScala
+ .map(x => (Option(x.options).map(visitPropertyKeyValues).getOrElse(Map.empty))).toSeq
+ val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
+
+ CreateIndex(
+ createUnresolvedTable(ctx.multipartIdentifier(), "CREATE INDEX"),
+ indexName,
+ indexType,
+ ctx.EXISTS != null,
+ columns.map(FieldReference(_).asInstanceOf[FieldReference]).zip(columnsProperties),
+ options)
+ }
+
private def alterViewTypeMismatchHint: Option[String] = Some("Please use ALTER TABLE instead.")
private def alterTableTypeMismatchHint: Option[String] = Some("Please use ALTER VIEW instead.")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 31fdb6c..e349822 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
import org.apache.spark.sql.catalyst.trees.BinaryLike
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog._
-import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.expressions.{NamedReference, Transform}
import org.apache.spark.sql.connector.write.Write
import org.apache.spark.sql.types.{BooleanType, DataType, MetadataBuilder, StringType, StructType}
@@ -1056,3 +1056,17 @@ case class UncacheTable(
override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
}
+
+/**
+ * The logical plan of the CREATE INDEX command.
+ */
+case class CreateIndex(
+ child: LogicalPlan,
+ indexName: String,
+ indexType: String,
+ ignoreIfExists: Boolean,
+ columns: Seq[(NamedReference, Map[String, String])],
+ properties: Map[String, String]) extends UnaryCommand {
+ override protected def withNewChildInternal(newChild: LogicalPlan): CreateIndex =
+ copy(child = newChild)
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index eb8985d..dcaa622 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2366,4 +2366,8 @@ object QueryCompilationErrors {
errorClass = "INVALID_JSON_SCHEMA_MAPTYPE",
messageParameters = Array(schema.toString))
}
+
+ def tableIndexNotSupportedError(errorMessage: String): Throwable = {
+ new AnalysisException(errorMessage)
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 7bcc2b7..798c130 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -2282,6 +2282,24 @@ class DDLParserSuite extends AnalysisTest {
RefreshFunction(UnresolvedFunc(Seq("a", "b", "c"))))
}
+ test("CREATE INDEX") {
+ parseCompare("CREATE index i1 ON a.b.c USING BTREE (col1)",
+ CreateIndex(UnresolvedTable(Seq("a", "b", "c"), "CREATE INDEX", None), "i1", "BTREE", false,
+ Array(FieldReference("col1")).toSeq.zip(Seq(Map.empty[String, String])), Map.empty))
+
+ parseCompare("CREATE index IF NOT EXISTS i1 ON TABLE a.b.c USING BTREE" +
+ " (col1 OPTIONS ('k1'='v1'), col2 OPTIONS ('k2'='v2')) ",
+ CreateIndex(UnresolvedTable(Seq("a", "b", "c"), "CREATE INDEX", None), "i1", "BTREE", true,
+ Array(FieldReference("col1"), FieldReference("col2")).toSeq
+ .zip(Seq(Map("k1" -> "v1"), Map("k2" -> "v2"))), Map.empty))
+
+ parseCompare("CREATE index i1 ON a.b.c" +
+ " (col1 OPTIONS ('k1'='v1'), col2 OPTIONS ('k2'='v2')) OPTIONS ('k3'='v3', 'k4'='v4')",
+ CreateIndex(UnresolvedTable(Seq("a", "b", "c"), "CREATE INDEX", None), "i1", "", false,
+ Array(FieldReference("col1"), FieldReference("col2")).toSeq
+ .zip(Seq(Map("k1" -> "v1"), Map("k2" -> "v2"))), Map("k3" -> "v3", "k4" -> "v4")))
+ }
+
private case class TableSpec(
name: Seq[String],
schema: Option[StructType],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index d26b261..b63306b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -338,7 +338,7 @@ class SparkSqlAstBuilder extends AstBuilder {
replace = ctx.REPLACE != null,
global = ctx.GLOBAL != null,
provider = ctx.tableProvider.multipartIdentifier.getText,
- options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
+ options = Option(ctx.propertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
}
/**
@@ -464,7 +464,7 @@ class SparkSqlAstBuilder extends AstBuilder {
throw QueryParsingErrors.createViewWithBothIfNotExistsAndReplaceError(ctx)
}
- val properties = ctx.tablePropertyList.asScala.headOption.map(visitPropertyKeyValues)
+ val properties = ctx.propertyList.asScala.headOption.map(visitPropertyKeyValues)
.getOrElse(Map.empty)
if (ctx.TEMPORARY != null && !properties.isEmpty) {
operationNotAllowed("TBLPROPERTIES can't coexist with CREATE TEMPORARY VIEW", ctx)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index d482245..4402f27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -1021,8 +1021,8 @@ object JdbcUtils extends Logging with SQLConfHelper {
indexType: String,
tableName: String,
columns: Array[NamedReference],
- columnsProperties: Array[util.Map[NamedReference, util.Properties]],
- properties: util.Properties,
+ columnsProperties: util.Map[NamedReference, util.Map[String, String]],
+ properties: util.Map[String, String],
options: JDBCOptions): Unit = {
val dialect = JdbcDialects.get(options.url)
executeStatement(conn, options,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala
new file mode 100644
index 0000000..78bdf64
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.index.SupportsIndex
+import org.apache.spark.sql.connector.expressions.NamedReference
+
+/**
+ * Physical plan node for `CREATE INDEX`: asks a [[SupportsIndex]] table to create an index.
+ *
+ * @param table          the table on which the index is created
+ * @param indexName      name of the index to create
+ * @param indexType      index type, passed through to the table as-is
+ * @param ignoreIfExists when true, an [[IndexAlreadyExistsException]] thrown by the table is
+ *                       logged as a warning and ignored instead of failing the command
+ * @param columns        the indexed columns, in order, each paired with its column-level
+ *                       index properties
+ * @param properties     index-level properties
+ */
+case class CreateIndexExec(
+    table: SupportsIndex,
+    indexName: String,
+    indexType: String,
+    ignoreIfExists: Boolean,
+    columns: Seq[(NamedReference, Map[String, String])],
+    properties: Map[String, String])
+  extends LeafV2CommandExec {
+
+  override protected def run(): Seq[InternalRow] = {
+    // The column array keeps the user's declared order; per-column properties are keyed by
+    // reference in a Java map, as the SupportsIndex.createIndex signature expects.
+    val indexedColumns = columns.map(_._1).toArray
+    val colProperties = new util.HashMap[NamedReference, util.Map[String, String]]
+    columns.foreach { case (column, props) => colProperties.put(column, props.asJava) }
+    try {
+      table.createIndex(indexName, indexType, indexedColumns, colProperties, properties.asJava)
+    } catch {
+      // Only tolerate an existing index when IF NOT EXISTS was requested; otherwise propagate.
+      case _: IndexAlreadyExistsException if ignoreIfExists =>
+        logWarning(s"Index $indexName already exists in table ${table.name}. Ignoring.")
+    }
+    Seq.empty
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 56e7abc..66ee431 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.toPrettySQL
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog}
+import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.{FieldReference, Literal => V2Literal, LiteralValue}
import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse => V2AlwaysFalse, AlwaysTrue => V2AlwaysTrue, And => V2And, EqualNullSafe => V2EqualNullSafe, EqualTo => V2EqualTo, Filter => V2Filter, GreaterThan => V2GreaterThan, GreaterThanOrEqual => V2GreaterThanOrEqual, In => V2In, IsNotNull => V2IsNotNull, IsNull => V2IsNull, LessThan => V2LessThan, LessThanOrEqual => V2LessThanOrEqual, Not => V2Not, Or => V2Or, StringContains => V2StringContains, StringEndsWith => V2StringEnds [...]
import org.apache.spark.sql.connector.read.LocalScan
@@ -429,6 +430,15 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
val table = a.table.asInstanceOf[ResolvedTable]
AlterTableExec(table.catalog, table.identifier, a.changes) :: Nil
+ case CreateIndex(ResolvedTable(_, _, table, _),
+ indexName, indexType, ifNotExists, columns, properties) =>
+ table match {
+ case s: SupportsIndex =>
+ CreateIndexExec(s, indexName, indexType, ifNotExists, columns, properties):: Nil
+ case _ => throw QueryCompilationErrors.tableIndexNotSupportedError(
+ s"CreateIndex is not supported in this table ${table.name}.")
+ }
+
case _ => Nil
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
index 23ff503..c61d890 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
@@ -56,8 +56,8 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
indexName: String,
indexType: String,
columns: Array[NamedReference],
- columnsProperties: Array[util.Map[NamedReference, util.Properties]],
- properties: util.Properties): Unit = {
+ columnsProperties: util.Map[NamedReference, util.Map[String, String]],
+ properties: util.Map[String, String]): Unit = {
JdbcUtils.withConnection(jdbcOptions) { conn =>
JdbcUtils.classifyException(s"Failed to create index: $indexName in $name",
JdbcDialects.get(jdbcOptions.url)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index eb3986c..9e54ba7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -306,8 +306,8 @@ abstract class JdbcDialect extends Serializable with Logging{
indexType: String,
tableName: String,
columns: Array[NamedReference],
- columnsProperties: Array[util.Map[NamedReference, util.Properties]],
- properties: util.Properties): String = {
+ columnsProperties: util.Map[NamedReference, util.Map[String, String]],
+ properties: util.Map[String, String]): String = {
throw new UnsupportedOperationException("createIndex is not supported")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 7e85b3b..73b36f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -118,13 +118,12 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
indexType: String,
tableName: String,
columns: Array[NamedReference],
- columnsProperties: Array[util.Map[NamedReference, util.Properties]],
- properties: util.Properties): String = {
+ columnsProperties: util.Map[NamedReference, util.Map[String, String]],
+ properties: util.Map[String, String]): String = {
val columnList = columns.map(col => quoteIdentifier(col.fieldNames.head))
var indexProperties: String = ""
- val scalaProps = properties.asScala
if (!properties.isEmpty) {
- scalaProps.foreach { case (k, v) =>
+ properties.asScala.foreach { case (k, v) =>
indexProperties = indexProperties + " " + s"$k $v"
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 360c8be..10a70e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2969,6 +2969,17 @@ class DataSourceV2SQLSuite
Row("testcat"), Row("testcat2")))
}
+  test("CREATE INDEX should fail") {
+    val t = "testcat.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, data string COMMENT 'hello') USING foo")
+      // Index a column that actually exists (`id`), so the expected failure comes only from
+      // the table not supporting indexes, not from an unknown column once columns are validated.
+      val ex = intercept[AnalysisException] {
+        sql(s"CREATE index i1 ON $t(id)")
+      }
+      assert(ex.getMessage.contains(s"CreateIndex is not supported in this table $t."))
+    }
+  }
+
private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
val e = intercept[AnalysisException] {
sql(s"$sqlCommand $sqlParams")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org