Posted to commits@spark.apache.org by hu...@apache.org on 2022/07/13 06:15:55 UTC

[spark] branch master updated: [SPARK-39704][SQL] Implement createIndex & dropIndex & indexExists in JDBC (H2 dialect)

This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b1077caf31 [SPARK-39704][SQL] Implement createIndex & dropIndex & indexExists in JDBC (H2 dialect)
0b1077caf31 is described below

commit 0b1077caf319f86b66665175afbee88e1f14da5a
Author: panbingkun <pb...@gmail.com>
AuthorDate: Tue Jul 12 23:15:35 2022 -0700

    [SPARK-39704][SQL] Implement createIndex & dropIndex & indexExists in JDBC (H2 dialect)
    
    ### What changes were proposed in this pull request?
    Implement createIndex/dropIndex/indexExists in DS V2 JDBC for the H2 dialect.
    
    ### Why are the changes needed?
    This is a subtask of the V2 index support work (https://issues.apache.org/jira/browse/SPARK-36525).
    This PR implements createIndex, dropIndex and indexExists.
    Once this PR is reviewed, I will open a follow-up PR for listIndexes.
    **An H2 implementation makes it easier to test the index interface locally.**
    > This PR only implements createIndex, indexExists and dropIndex in the H2 dialect.
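
    With these changes the H2 dialect builds index DDL roughly as follows (a
    minimal sketch of the generated statements; the table test.people, column
    id and index people_index are illustrative, matching the new test):

        // H2Dialect.createIndex("people_index", Identifier.of(Array("test"), "people"), ...)
        //   => CREATE INDEX "people_index" ON "test"."people" ("id")
        // H2Dialect.dropIndex("people_index", Identifier.of(Array("test"), "people"))
        //   => DROP INDEX "test"."people_index"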
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, createIndex/dropIndex/indexExists are now supported in DS V2 JDBC for the H2 dialect.
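
    For example, via Spark SQL against a registered H2 catalog (the catalog
    name h2 and table test.people mirror the new test and are assumed to
    exist):

        sql("CREATE INDEX people_index ON TABLE h2.test.people (id)")
        sql("DROP INDEX people_index ON TABLE h2.test.people")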
    
    ### How was this patch tested?
    New unit test.
    
    Closes #37112 from panbingkun/h2dialect-create-drop.
    
    Authored-by: panbingkun <pb...@gmail.com>
    Signed-off-by: huaxingao <hu...@apple.com>
---
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 14 +++---
 .../execution/datasources/v2/jdbc/JDBCTable.scala  |  6 +--
 .../org/apache/spark/sql/jdbc/H2Dialect.scala      | 56 ++++++++++++++++++++--
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   | 14 +++---
 .../org/apache/spark/sql/jdbc/MySQLDialect.scala   | 14 +++---
 .../apache/spark/sql/jdbc/PostgresDialect.scala    | 11 +++--
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala    | 17 +++++++
 7 files changed, 101 insertions(+), 31 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index fa4c032fcb0..5d8838906bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp, toJavaTimestampNoRebase}
-import org.apache.spark.sql.connector.catalog.TableChange
+import org.apache.spark.sql.connector.catalog.{Identifier, TableChange}
 import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
@@ -1033,14 +1033,14 @@ object JdbcUtils extends Logging with SQLConfHelper {
   def createIndex(
       conn: Connection,
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       columns: Array[NamedReference],
       columnsProperties: util.Map[NamedReference, util.Map[String, String]],
       properties: util.Map[String, String],
       options: JDBCOptions): Unit = {
     val dialect = JdbcDialects.get(options.url)
     executeStatement(conn, options,
-      dialect.createIndex(indexName, tableName, columns, columnsProperties, properties))
+      dialect.createIndex(indexName, tableIdent, columns, columnsProperties, properties))
   }
 
   /**
@@ -1049,10 +1049,10 @@ object JdbcUtils extends Logging with SQLConfHelper {
   def indexExists(
       conn: Connection,
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Boolean = {
     val dialect = JdbcDialects.get(options.url)
-    dialect.indexExists(conn, indexName, tableName, options)
+    dialect.indexExists(conn, indexName, tableIdent, options)
   }
 
   /**
@@ -1061,10 +1061,10 @@ object JdbcUtils extends Logging with SQLConfHelper {
   def dropIndex(
       conn: Connection,
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Unit = {
     val dialect = JdbcDialects.get(options.url)
-    executeStatement(conn, options, dialect.dropIndex(indexName, tableName))
+    executeStatement(conn, options, dialect.dropIndex(indexName, tableIdent))
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
index 31c0167ab49..be8e1c68b7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
@@ -61,14 +61,14 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
       JdbcUtils.classifyException(s"Failed to create index $indexName in $name",
         JdbcDialects.get(jdbcOptions.url)) {
         JdbcUtils.createIndex(
-          conn, indexName, name, columns, columnsProperties, properties, jdbcOptions)
+          conn, indexName, ident, columns, columnsProperties, properties, jdbcOptions)
       }
     }
   }
 
   override def indexExists(indexName: String): Boolean = {
     JdbcUtils.withConnection(jdbcOptions) { conn =>
-      JdbcUtils.indexExists(conn, indexName, name, jdbcOptions)
+      JdbcUtils.indexExists(conn, indexName, ident, jdbcOptions)
     }
   }
 
@@ -76,7 +76,7 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
     JdbcUtils.withConnection(jdbcOptions) { conn =>
       JdbcUtils.classifyException(s"Failed to drop index $indexName in $name",
         JdbcDialects.get(jdbcOptions.url)) {
-        JdbcUtils.dropIndex(conn, indexName, name, jdbcOptions)
+        JdbcUtils.dropIndex(conn, indexName, ident, jdbcOptions)
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index e58473bb2b3..d41929225a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.{SQLException, Types}
+import java.sql.{Connection, SQLException, Types}
+import java.util
 import java.util.Locale
 import java.util.concurrent.ConcurrentHashMap
 
@@ -25,10 +26,12 @@ import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.expressions.Expression
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, ShortType, StringType}
 
 private[sql] object H2Dialect extends JdbcDialect {
@@ -74,6 +77,47 @@ private[sql] object H2Dialect extends JdbcDialect {
     functionMap.clear()
   }
 
+  // CREATE INDEX syntax
+  // https://www.h2database.com/html/commands.html#create_index
+  override def createIndex(
+      indexName: String,
+      tableIdent: Identifier,
+      columns: Array[NamedReference],
+      columnsProperties: util.Map[NamedReference, util.Map[String, String]],
+      properties: util.Map[String, String]): String = {
+    val columnList = columns.map(col => quoteIdentifier(col.fieldNames.head))
+    val (indexType, _) = JdbcUtils.processIndexProperties(properties, "h2")
+
+    s"CREATE INDEX ${quoteIdentifier(indexName)} $indexType ON " +
+      s"${tableNameWithSchema(tableIdent)} (${columnList.mkString(", ")})"
+  }
+
+  // DROP INDEX syntax
+  // https://www.h2database.com/html/commands.html#drop_index
+  override def dropIndex(indexName: String, tableIdent: Identifier): String = {
+    s"DROP INDEX ${indexNameWithSchema(tableIdent, indexName)}"
+  }
+
+  // See https://www.h2database.com/html/systemtables.html?#information_schema_indexes
+  override def indexExists(
+      conn: Connection,
+      indexName: String,
+      tableIdent: Identifier,
+      options: JDBCOptions): Boolean = {
+    val sql = s"SELECT * FROM INFORMATION_SCHEMA.INDEXES WHERE " +
+      s"TABLE_SCHEMA = '${tableIdent.namespace().last}' AND " +
+      s"TABLE_NAME = '${tableIdent.name()}' AND INDEX_NAME = '$indexName'"
+    JdbcUtils.checkIfIndexExists(conn, sql, options)
+  }
+
+  private def tableNameWithSchema(ident: Identifier): String = {
+    (ident.namespace() :+ ident.name()).map(quoteIdentifier).mkString(".")
+  }
+
+  private def indexNameWithSchema(ident: Identifier, indexName: String): String = {
+    (ident.namespace() :+ indexName).map(quoteIdentifier).mkString(".")
+  }
+
   override def classifyException(message: String, e: Throwable): AnalysisException = {
     e match {
       case exception: SQLException =>
@@ -88,6 +132,12 @@ private[sql] object H2Dialect extends JdbcDialect {
           // SCHEMA_NOT_FOUND_1
           case 90079 =>
             throw NoSuchNamespaceException(message, cause = Some(e))
+          // INDEX_ALREADY_EXISTS_1
+          case 42111 =>
+            throw new IndexAlreadyExistsException(message, cause = Some(e))
+          // INDEX_NOT_FOUND_1
+          case 42112 =>
+            throw new NoSuchIndexException(message, cause = Some(e))
           case _ => // do nothing
         }
       case _ => // do nothing
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index ba3a3f50eca..d77299bdc0c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -31,7 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
-import org.apache.spark.sql.connector.catalog.TableChange
+import org.apache.spark.sql.connector.catalog.{Identifier, TableChange}
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.catalog.index.TableIndex
@@ -473,7 +473,7 @@ abstract class JdbcDialect extends Serializable with Logging {
    * Build a create index SQL statement.
    *
    * @param indexName         the name of the index to be created
-   * @param tableName         the table on which index to be created
+   * @param tableIdent        the table on which index to be created
    * @param columns           the columns on which index to be created
    * @param columnsProperties the properties of the columns on which index to be created
    * @param properties        the properties of the index to be created
@@ -481,7 +481,7 @@ abstract class JdbcDialect extends Serializable with Logging {
    */
   def createIndex(
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       columns: Array[NamedReference],
       columnsProperties: util.Map[NamedReference, util.Map[String, String]],
       properties: util.Map[String, String]): String = {
@@ -492,7 +492,7 @@ abstract class JdbcDialect extends Serializable with Logging {
    * Checks whether an index exists
    *
    * @param indexName the name of the index
-   * @param tableName the table name on which index to be checked
+   * @param tableIdent the table on which index to be checked
    * @param options JDBCOptions of the table
    * @return true if the index with `indexName` exists in the table with `tableName`,
    *         false otherwise
@@ -500,7 +500,7 @@ abstract class JdbcDialect extends Serializable with Logging {
   def indexExists(
       conn: Connection,
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Boolean = {
     throw new UnsupportedOperationException("indexExists is not supported")
   }
@@ -509,10 +509,10 @@ abstract class JdbcDialect extends Serializable with Logging {
    * Build a drop index SQL statement.
    *
    * @param indexName the name of the index to be dropped.
-   * @param tableName the table name on which index to be dropped.
+   * @param tableIdent the table on which index to be dropped.
   * @return the SQL statement to use for dropping the index.
    */
-  def dropIndex(indexName: String, tableName: String): String = {
+  def dropIndex(indexName: String, tableIdent: Identifier): String = {
     throw new UnsupportedOperationException("dropIndex is not supported")
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 96b544bb03e..cc04b5c7c92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.ArrayBuilder
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
+import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -150,7 +151,7 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
   // https://dev.mysql.com/doc/refman/8.0/en/create-index.html
   override def createIndex(
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       columns: Array[NamedReference],
       columnsProperties: util.Map[NamedReference, util.Map[String, String]],
       properties: util.Map[String, String]): String = {
@@ -159,7 +160,7 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
 
     // columnsProperties doesn't apply to MySQL so it is ignored
     s"CREATE INDEX ${quoteIdentifier(indexName)} $indexType ON" +
-      s" ${quoteIdentifier(tableName)} (${columnList.mkString(", ")})" +
+      s" ${quoteIdentifier(tableIdent.name())} (${columnList.mkString(", ")})" +
       s" ${indexPropertyList.mkString(" ")}"
   }
 
@@ -168,14 +169,15 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
   override def indexExists(
       conn: Connection,
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Boolean = {
-    val sql = s"SHOW INDEXES FROM ${quoteIdentifier(tableName)} WHERE key_name = '$indexName'"
+    val sql = s"SHOW INDEXES FROM ${quoteIdentifier(tableIdent.name())} " +
+      s"WHERE key_name = '$indexName'"
     JdbcUtils.checkIfIndexExists(conn, sql, options)
   }
 
-  override def dropIndex(indexName: String, tableName: String): String = {
-    s"DROP INDEX ${quoteIdentifier(indexName)} ON $tableName"
+  override def dropIndex(indexName: String, tableIdent: Identifier): String = {
+    s"DROP INDEX ${quoteIdentifier(indexName)} ON ${tableIdent.name()}"
   }
 
   // SHOW INDEX syntax
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 551f8d62621..cb78bc806e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -24,6 +24,7 @@ import java.util.Locale
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NonEmptyNamespaceException, NoSuchIndexException}
+import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
@@ -182,7 +183,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
   // https://www.postgresql.org/docs/14/sql-createindex.html
   override def createIndex(
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       columns: Array[NamedReference],
       columnsProperties: util.Map[NamedReference, util.Map[String, String]],
       properties: util.Map[String, String]): String = {
@@ -194,7 +195,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
       indexProperties = "WITH (" + indexPropertyList.mkString(", ") + ")"
     }
 
-    s"CREATE INDEX ${quoteIdentifier(indexName)} ON ${quoteIdentifier(tableName)}" +
+    s"CREATE INDEX ${quoteIdentifier(indexName)} ON ${quoteIdentifier(tableIdent.name())}" +
       s" $indexType (${columnList.mkString(", ")}) $indexProperties"
   }
 
@@ -203,16 +204,16 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
   override def indexExists(
       conn: Connection,
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Boolean = {
-    val sql = s"SELECT * FROM pg_indexes WHERE tablename = '$tableName' AND" +
+    val sql = s"SELECT * FROM pg_indexes WHERE tablename = '${tableIdent.name()}' AND" +
       s" indexname = '$indexName'"
     JdbcUtils.checkIfIndexExists(conn, sql, options)
   }
 
   // DROP INDEX syntax
   // https://www.postgresql.org/docs/14/sql-dropindex.html
-  override def dropIndex(indexName: String, tableName: String): String = {
+  override def dropIndex(indexName: String, tableIdent: Identifier): String = {
     s"DROP INDEX ${quoteIdentifier(indexName)}"
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 2dd6280091b..7608c0b148d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -28,7 +28,9 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, GlobalLimit, LocalLimit, Offset, Sort}
 import org.apache.spark.sql.connector.{IntegralAverage, StrLen}
+import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
 import org.apache.spark.sql.connector.catalog.functions.{ScalarFunction, UnboundFunction}
+import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.Expression
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper}
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
@@ -2217,4 +2219,19 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       JdbcDialects.registerDialect(H2Dialect)
     }
   }
+
+  test("Test INDEX Using SQL") {
+    val loaded = Catalogs.load("h2", conf)
+    val jdbcTable = loaded.asInstanceOf[TableCatalog]
+      .loadTable(Identifier.of(Array("test"), "people"))
+      .asInstanceOf[SupportsIndex]
+    assert(jdbcTable != null)
+    assert(jdbcTable.indexExists("people_index") == false)
+
+    sql(s"CREATE INDEX people_index ON TABLE h2.test.people (id)")
+    assert(jdbcTable.indexExists("people_index"))
+
+    sql(s"DROP INDEX people_index ON TABLE h2.test.people")
+    assert(jdbcTable.indexExists("people_index") == false)
+  }
 }
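
For reference, here is a minimal end-to-end sketch of exercising the new H2
index support, adapted from the test above. The JDBC URL, catalog name "h2"
and the test.people table are illustrative assumptions, not part of this
commit:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
    import org.apache.spark.sql.connector.catalog.index.SupportsIndex

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("h2-index-sketch")
      // DS V2 JDBC catalog, as registered in JDBCV2Suite.
      .config("spark.sql.catalog.h2",
        "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
      .config("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1")
      .config("spark.sql.catalog.h2.driver", "org.h2.Driver")
      .getOrCreate()

    // Assumes the namespace and table do not exist yet.
    spark.sql("CREATE NAMESPACE IF NOT EXISTS h2.test")
    spark.sql("CREATE TABLE h2.test.people (id INT, name STRING)")

    // Create the index through SQL; the H2 dialect now builds the DDL.
    spark.sql("CREATE INDEX people_index ON TABLE h2.test.people (id)")

    // Verify through the SupportsIndex API, mirroring the new test.
    val jdbcTable = Catalogs.load("h2", spark.sessionState.conf)
      .asInstanceOf[TableCatalog]
      .loadTable(Identifier.of(Array("test"), "people"))
      .asInstanceOf[SupportsIndex]
    assert(jdbcTable.indexExists("people_index"))

    spark.sql("DROP INDEX people_index ON TABLE h2.test.people")
    assert(!jdbcTable.indexExists("people_index"))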

