You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hu...@apache.org on 2022/07/13 06:15:55 UTC
[spark] branch master updated: [SPARK-39704][SQL] Implement createIndex & dropIndex & indexExists in JDBC (H2 dialect)
This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0b1077caf31 [SPARK-39704][SQL] Implement createIndex & dropIndex & indexExists in JDBC (H2 dialect)
0b1077caf31 is described below
commit 0b1077caf319f86b66665175afbee88e1f14da5a
Author: panbingkun <pb...@gmail.com>
AuthorDate: Tue Jul 12 23:15:35 2022 -0700
[SPARK-39704][SQL] Implement createIndex & dropIndex & indexExists in JDBC (H2 dialect)
### What changes were proposed in this pull request?
Implementing createIndex/dropIndex/indexExists in DS V2 JDBC for H2 dialect.
### Why are the changes needed?
This is a subtask of the V2 Index support (https://issues.apache.org/jira/browse/SPARK-36525).
This PR implements createIndex, dropIndex and indexExists.
Once the changes in this PR have been reviewed, I will create a new PR for listIndexes.
**Supporting H2 makes it easier to test the index interface locally.**
> This PR only implements createIndex, indexExists and dropIndex in the H2 dialect.
### Does this PR introduce _any_ user-facing change?
Yes, createIndex/dropIndex/indexExists in DS V2 JDBC for H2
### How was this patch tested?
New unit test in JDBCV2Suite.
Closes #37112 from panbingkun/h2dialect-create-drop.
Authored-by: panbingkun <pb...@gmail.com>
Signed-off-by: huaxingao <hu...@apple.com>
---
.../sql/execution/datasources/jdbc/JdbcUtils.scala | 14 +++---
.../execution/datasources/v2/jdbc/JDBCTable.scala | 6 +--
.../org/apache/spark/sql/jdbc/H2Dialect.scala | 56 ++++++++++++++++++++--
.../org/apache/spark/sql/jdbc/JdbcDialects.scala | 14 +++---
.../org/apache/spark/sql/jdbc/MySQLDialect.scala | 14 +++---
.../apache/spark/sql/jdbc/PostgresDialect.scala | 11 +++--
.../org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 17 +++++++
7 files changed, 101 insertions(+), 31 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index fa4c032fcb0..5d8838906bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp, toJavaTimestampNoRebase}
-import org.apache.spark.sql.connector.catalog.TableChange
+import org.apache.spark.sql.connector.catalog.{Identifier, TableChange}
import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
@@ -1033,14 +1033,14 @@ object JdbcUtils extends Logging with SQLConfHelper {
def createIndex(
conn: Connection,
indexName: String,
- tableName: String,
+ tableIdent: Identifier,
columns: Array[NamedReference],
columnsProperties: util.Map[NamedReference, util.Map[String, String]],
properties: util.Map[String, String],
options: JDBCOptions): Unit = {
val dialect = JdbcDialects.get(options.url)
executeStatement(conn, options,
- dialect.createIndex(indexName, tableName, columns, columnsProperties, properties))
+ dialect.createIndex(indexName, tableIdent, columns, columnsProperties, properties))
}
/**
@@ -1049,10 +1049,10 @@ object JdbcUtils extends Logging with SQLConfHelper {
def indexExists(
conn: Connection,
indexName: String,
- tableName: String,
+ tableIdent: Identifier,
options: JDBCOptions): Boolean = {
val dialect = JdbcDialects.get(options.url)
- dialect.indexExists(conn, indexName, tableName, options)
+ dialect.indexExists(conn, indexName, tableIdent, options)
}
/**
@@ -1061,10 +1061,10 @@ object JdbcUtils extends Logging with SQLConfHelper {
def dropIndex(
conn: Connection,
indexName: String,
- tableName: String,
+ tableIdent: Identifier,
options: JDBCOptions): Unit = {
val dialect = JdbcDialects.get(options.url)
- executeStatement(conn, options, dialect.dropIndex(indexName, tableName))
+ executeStatement(conn, options, dialect.dropIndex(indexName, tableIdent))
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
index 31c0167ab49..be8e1c68b7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
@@ -61,14 +61,14 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
JdbcUtils.classifyException(s"Failed to create index $indexName in $name",
JdbcDialects.get(jdbcOptions.url)) {
JdbcUtils.createIndex(
- conn, indexName, name, columns, columnsProperties, properties, jdbcOptions)
+ conn, indexName, ident, columns, columnsProperties, properties, jdbcOptions)
}
}
}
override def indexExists(indexName: String): Boolean = {
JdbcUtils.withConnection(jdbcOptions) { conn =>
- JdbcUtils.indexExists(conn, indexName, name, jdbcOptions)
+ JdbcUtils.indexExists(conn, indexName, ident, jdbcOptions)
}
}
@@ -76,7 +76,7 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
JdbcUtils.withConnection(jdbcOptions) { conn =>
JdbcUtils.classifyException(s"Failed to drop index $indexName in $name",
JdbcDialects.get(jdbcOptions.url)) {
- JdbcUtils.dropIndex(conn, indexName, name, jdbcOptions)
+ JdbcUtils.dropIndex(conn, indexName, ident, jdbcOptions)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index e58473bb2b3..d41929225a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -17,7 +17,8 @@
package org.apache.spark.sql.jdbc
-import java.sql.{SQLException, Types}
+import java.sql.{Connection, SQLException, Types}
+import java.util
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap
@@ -25,10 +26,12 @@ import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.expressions.Expression
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, ShortType, StringType}
private[sql] object H2Dialect extends JdbcDialect {
@@ -74,6 +77,47 @@ private[sql] object H2Dialect extends JdbcDialect {
functionMap.clear()
}
+ // CREATE INDEX syntax
+ // https://www.h2database.com/html/commands.html#create_index
+ override def createIndex(
+ indexName: String,
+ tableIdent: Identifier,
+ columns: Array[NamedReference],
+ columnsProperties: util.Map[NamedReference, util.Map[String, String]],
+ properties: util.Map[String, String]): String = {
+ val columnList = columns.map(col => quoteIdentifier(col.fieldNames.head))
+ val (indexType, _) = JdbcUtils.processIndexProperties(properties, "h2")
+
+ s"CREATE INDEX ${quoteIdentifier(indexName)} $indexType ON " +
+ s"${tableNameWithSchema(tableIdent)} (${columnList.mkString(", ")})"
+ }
+
+ // DROP INDEX syntax
+ // https://www.h2database.com/html/commands.html#drop_index
+ override def dropIndex(indexName: String, tableIdent: Identifier): String = {
+ s"DROP INDEX ${indexNameWithSchema(tableIdent, indexName)}"
+ }
+
+ // See https://www.h2database.com/html/systemtables.html?#information_schema_indexes
+ override def indexExists(
+ conn: Connection,
+ indexName: String,
+ tableIdent: Identifier,
+ options: JDBCOptions): Boolean = {
+ val sql = s"SELECT * FROM INFORMATION_SCHEMA.INDEXES WHERE " +
+ s"TABLE_SCHEMA = '${tableIdent.namespace().last}' AND " +
+ s"TABLE_NAME = '${tableIdent.name()}' AND INDEX_NAME = '$indexName'"
+ JdbcUtils.checkIfIndexExists(conn, sql, options)
+ }
+
+ private def tableNameWithSchema(ident: Identifier): String = {
+ (ident.namespace() :+ ident.name()).map(quoteIdentifier).mkString(".")
+ }
+
+ private def indexNameWithSchema(ident: Identifier, indexName: String): String = {
+ (ident.namespace() :+ indexName).map(quoteIdentifier).mkString(".")
+ }
+
override def classifyException(message: String, e: Throwable): AnalysisException = {
e match {
case exception: SQLException =>
@@ -88,6 +132,12 @@ private[sql] object H2Dialect extends JdbcDialect {
// SCHEMA_NOT_FOUND_1
case 90079 =>
throw NoSuchNamespaceException(message, cause = Some(e))
+ // INDEX_ALREADY_EXISTS_1
+ case 42111 =>
+ throw new IndexAlreadyExistsException(message, cause = Some(e))
+ // INDEX_NOT_FOUND_1
+ case 42112 =>
+ throw new NoSuchIndexException(message, cause = Some(e))
case _ => // do nothing
}
case _ => // do nothing
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index ba3a3f50eca..d77299bdc0c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -31,7 +31,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
-import org.apache.spark.sql.connector.catalog.TableChange
+import org.apache.spark.sql.connector.catalog.{Identifier, TableChange}
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.catalog.index.TableIndex
@@ -473,7 +473,7 @@ abstract class JdbcDialect extends Serializable with Logging {
* Build a create index SQL statement.
*
* @param indexName the name of the index to be created
- * @param tableName the table on which index to be created
+ * @param tableIdent the table on which index to be created
* @param columns the columns on which index to be created
* @param columnsProperties the properties of the columns on which index to be created
* @param properties the properties of the index to be created
@@ -481,7 +481,7 @@ abstract class JdbcDialect extends Serializable with Logging {
*/
def createIndex(
indexName: String,
- tableName: String,
+ tableIdent: Identifier,
columns: Array[NamedReference],
columnsProperties: util.Map[NamedReference, util.Map[String, String]],
properties: util.Map[String, String]): String = {
@@ -492,7 +492,7 @@ abstract class JdbcDialect extends Serializable with Logging {
* Checks whether an index exists
*
* @param indexName the name of the index
- * @param tableName the table name on which index to be checked
+ * @param tableIdent the table on which index to be checked
* @param options JDBCOptions of the table
* @return true if the index with `indexName` exists in the table with `tableName`,
* false otherwise
@@ -500,7 +500,7 @@ abstract class JdbcDialect extends Serializable with Logging {
def indexExists(
conn: Connection,
indexName: String,
- tableName: String,
+ tableIdent: Identifier,
options: JDBCOptions): Boolean = {
throw new UnsupportedOperationException("indexExists is not supported")
}
@@ -509,10 +509,10 @@ abstract class JdbcDialect extends Serializable with Logging {
* Build a drop index SQL statement.
*
* @param indexName the name of the index to be dropped.
- * @param tableName the table name on which index to be dropped.
+ * @param tableIdent the table on which index to be dropped.
* @return the SQL statement to use for dropping the index.
*/
- def dropIndex(indexName: String, tableName: String): String = {
+ def dropIndex(indexName: String, tableIdent: Identifier): String = {
throw new UnsupportedOperationException("dropIndex is not supported")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 96b544bb03e..cc04b5c7c92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.ArrayBuilder
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
+import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.index.TableIndex
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -150,7 +151,7 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
// https://dev.mysql.com/doc/refman/8.0/en/create-index.html
override def createIndex(
indexName: String,
- tableName: String,
+ tableIdent: Identifier,
columns: Array[NamedReference],
columnsProperties: util.Map[NamedReference, util.Map[String, String]],
properties: util.Map[String, String]): String = {
@@ -159,7 +160,7 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
// columnsProperties doesn't apply to MySQL so it is ignored
s"CREATE INDEX ${quoteIdentifier(indexName)} $indexType ON" +
- s" ${quoteIdentifier(tableName)} (${columnList.mkString(", ")})" +
+ s" ${quoteIdentifier(tableIdent.name())} (${columnList.mkString(", ")})" +
s" ${indexPropertyList.mkString(" ")}"
}
@@ -168,14 +169,15 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
override def indexExists(
conn: Connection,
indexName: String,
- tableName: String,
+ tableIdent: Identifier,
options: JDBCOptions): Boolean = {
- val sql = s"SHOW INDEXES FROM ${quoteIdentifier(tableName)} WHERE key_name = '$indexName'"
+ val sql = s"SHOW INDEXES FROM ${quoteIdentifier(tableIdent.name())} " +
+ s"WHERE key_name = '$indexName'"
JdbcUtils.checkIfIndexExists(conn, sql, options)
}
- override def dropIndex(indexName: String, tableName: String): String = {
- s"DROP INDEX ${quoteIdentifier(indexName)} ON $tableName"
+ override def dropIndex(indexName: String, tableIdent: Identifier): String = {
+ s"DROP INDEX ${quoteIdentifier(indexName)} ON ${tableIdent.name()}"
}
// SHOW INDEX syntax
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 551f8d62621..cb78bc806e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -24,6 +24,7 @@ import java.util.Locale
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NonEmptyNamespaceException, NoSuchIndexException}
+import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
@@ -182,7 +183,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
// https://www.postgresql.org/docs/14/sql-createindex.html
override def createIndex(
indexName: String,
- tableName: String,
+ tableIdent: Identifier,
columns: Array[NamedReference],
columnsProperties: util.Map[NamedReference, util.Map[String, String]],
properties: util.Map[String, String]): String = {
@@ -194,7 +195,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
indexProperties = "WITH (" + indexPropertyList.mkString(", ") + ")"
}
- s"CREATE INDEX ${quoteIdentifier(indexName)} ON ${quoteIdentifier(tableName)}" +
+ s"CREATE INDEX ${quoteIdentifier(indexName)} ON ${quoteIdentifier(tableIdent.name())}" +
s" $indexType (${columnList.mkString(", ")}) $indexProperties"
}
@@ -203,16 +204,16 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
override def indexExists(
conn: Connection,
indexName: String,
- tableName: String,
+ tableIdent: Identifier,
options: JDBCOptions): Boolean = {
- val sql = s"SELECT * FROM pg_indexes WHERE tablename = '$tableName' AND" +
+ val sql = s"SELECT * FROM pg_indexes WHERE tablename = '${tableIdent.name()}' AND" +
s" indexname = '$indexName'"
JdbcUtils.checkIfIndexExists(conn, sql, options)
}
// DROP INDEX syntax
// https://www.postgresql.org/docs/14/sql-dropindex.html
- override def dropIndex(indexName: String, tableName: String): String = {
+ override def dropIndex(indexName: String, tableIdent: Identifier): String = {
s"DROP INDEX ${quoteIdentifier(indexName)}"
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 2dd6280091b..7608c0b148d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -28,7 +28,9 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, GlobalLimit, LocalLimit, Offset, Sort}
import org.apache.spark.sql.connector.{IntegralAverage, StrLen}
+import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
import org.apache.spark.sql.connector.catalog.functions.{ScalarFunction, UnboundFunction}
+import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.Expression
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper}
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
@@ -2217,4 +2219,19 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
JdbcDialects.registerDialect(H2Dialect)
}
}
+
+ test("Test INDEX Using SQL") {
+ val loaded = Catalogs.load("h2", conf)
+ val jdbcTable = loaded.asInstanceOf[TableCatalog]
+ .loadTable(Identifier.of(Array("test"), "people"))
+ .asInstanceOf[SupportsIndex]
+ assert(jdbcTable != null)
+ assert(jdbcTable.indexExists("people_index") == false)
+
+ sql(s"CREATE INDEX people_index ON TABLE h2.test.people (id)")
+ assert(jdbcTable.indexExists("people_index"))
+
+ sql(s"DROP INDEX people_index ON TABLE h2.test.people")
+ assert(jdbcTable.indexExists("people_index") == false)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org