You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/11/08 12:59:42 UTC

[spark] branch master updated: [SPARK-36895][SQL][FOLLOWUP] Use property to specify index type

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4011dd6  [SPARK-36895][SQL][FOLLOWUP] Use property to specify index type
4011dd6 is described below

commit 4011dd6995c61737ca67288224afb548eeecb3a0
Author: Huaxin Gao <hu...@apple.com>
AuthorDate: Mon Nov 8 20:58:38 2021 +0800

    [SPARK-36895][SQL][FOLLOWUP] Use property to specify index type
    
    ### What changes were proposed in this pull request?
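    Use a reserved property (`SupportsIndex.PROP_TYPE`) to specify the index type, instead of a dedicated `indexType` parameter on `createIndex`.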
    
    ### Why are the changes needed?
    To address this review comment: https://github.com/apache/spark/pull/34148#discussion_r731500964
    
    ### Does this PR introduce _any_ user-facing change?
    Yes
    ```
      void createIndex(String indexName,
          String indexType,
          NamedReference[] columns,
          Map<NamedReference, Map<String, String>> columnsProperties,
          Map<String, String> properties)
    ```
    changed to
    ```
      void createIndex(String indexName,
          NamedReference[] columns,
          Map<NamedReference, Map<String, String>> columnsProperties,
          Map<String, String> properties)
    ```
    
    ### How was this patch tested?
    new test
    
    Closes #34486 from huaxingao/deleteIndexType.
    
    Lead-authored-by: Huaxin Gao <hu...@apple.com>
    Co-authored-by: Wenchen Fan <cl...@gmail.com>
    Co-authored-by: Huaxin Gao <hu...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala  | 67 ------------------
 .../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala  | 82 +++++-----------------
 .../sql/connector/catalog/index/SupportsIndex.java |  8 ++-
 .../sql/execution/datasources/jdbc/JdbcUtils.scala |  3 +-
 .../execution/datasources/v2/CreateIndexExec.scala |  9 ++-
 .../execution/datasources/v2/jdbc/JDBCTable.scala  |  3 +-
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   |  1 -
 .../org/apache/spark/sql/jdbc/MySQLDialect.scala   | 27 ++++---
 8 files changed, 45 insertions(+), 155 deletions(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index d77dcb4..592f7d6 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -18,16 +18,11 @@
 package org.apache.spark.sql.jdbc.v2
 
 import java.sql.{Connection, SQLFeatureNotSupportedException}
-import java.util
 
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
-import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
-import org.apache.spark.sql.connector.catalog.index.SupportsIndex
-import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
 import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite}
 import org.apache.spark.sql.types._
@@ -122,66 +117,4 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
   }
 
   override def supportsIndex: Boolean = true
-
-  override def testIndexProperties(jdbcTable: SupportsIndex): Unit = {
-    val properties = new util.HashMap[String, String]();
-    properties.put("KEY_BLOCK_SIZE", "10")
-    properties.put("COMMENT", "'this is a comment'")
-    // MySQL doesn't allow property set on individual column, so use empty Array for
-    // column properties
-    jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
-      new util.HashMap[NamedReference, util.Map[String, String]](), properties)
-
-    var index = jdbcTable.listIndexes()
-    // The index property size is actually 1. Even though the index is created
-    // with properties "KEY_BLOCK_SIZE", "10" and "COMMENT", "'this is a comment'", when
-    // retrieving index using `SHOW INDEXES`, MySQL only returns `COMMENT`.
-    assert(index(0).properties.size == 1)
-    assert(index(0).properties.get("COMMENT").equals("this is a comment"))
-  }
-
-  override def testIndexUsingSQL(tbl: String): Unit = {
-    val loaded = Catalogs.load("mysql", conf)
-    val jdbcTable = loaded.asInstanceOf[TableCatalog]
-      .loadTable(Identifier.of(Array.empty[String], "new_table"))
-      .asInstanceOf[SupportsIndex]
-    assert(jdbcTable.indexExists("i1") == false)
-    assert(jdbcTable.indexExists("i2") == false)
-
-    val indexType = "DUMMY"
-    var m = intercept[UnsupportedOperationException] {
-      sql(s"CREATE index i1 ON $catalogName.new_table USING DUMMY (col1)")
-    }.getMessage
-    assert(m.contains(s"Index Type $indexType is not supported." +
-      s" The supported Index Types are: BTREE and HASH"))
-
-    sql(s"CREATE index i1 ON $catalogName.new_table USING BTREE (col1)")
-    sql(s"CREATE index i2 ON $catalogName.new_table (col2, col3, col5)" +
-      s" OPTIONS (KEY_BLOCK_SIZE=10)")
-
-    assert(jdbcTable.indexExists("i1") == true)
-    assert(jdbcTable.indexExists("i2") == true)
-
-    // This should pass without exception
-    sql(s"CREATE index IF NOT EXISTS i1 ON $catalogName.new_table (col1)")
-
-    m = intercept[IndexAlreadyExistsException] {
-      sql(s"CREATE index i1 ON $catalogName.new_table (col1)")
-    }.getMessage
-    assert(m.contains("Failed to create index i1 in new_table"))
-
-    sql(s"DROP index i1 ON $catalogName.new_table")
-    sql(s"DROP index i2 ON $catalogName.new_table")
-
-    assert(jdbcTable.indexExists("i1") == false)
-    assert(jdbcTable.indexExists("i2") == false)
-
-    // This should pass without exception
-    sql(s"DROP index IF EXISTS i1 ON $catalogName.new_table")
-
-    m = intercept[NoSuchIndexException] {
-      sql(s"DROP index i1 ON $catalogName.new_table")
-    }.getMessage
-    assert(m.contains("Failed to drop index i1 in new_table"))
-  }
 }
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index 717624b..d292051 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.jdbc.v2
 
-import java.util
-
 import org.apache.log4j.Level
 
 import org.apache.spark.sql.AnalysisException
@@ -27,7 +25,6 @@ import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSu
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, Sample}
 import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
-import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper}
 import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite
 import org.apache.spark.sql.test.SharedSparkSession
@@ -193,14 +190,12 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
   }
 
   def supportsIndex: Boolean = false
-  def testIndexProperties(jdbcTable: SupportsIndex): Unit = {}
-  def testIndexUsingSQL(tbl: String): Unit = {}
 
-  test("SPARK-36913: Test INDEX") {
+  test("SPARK-36895: Test INDEX Using SQL") {
     if (supportsIndex) {
       withTable(s"$catalogName.new_table") {
         sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT," +
-          s" col4 INT, col5 INT)")
+          " col4 INT, col5 INT)")
         val loaded = Catalogs.load(catalogName, conf)
         val jdbcTable = loaded.asInstanceOf[TableCatalog]
           .loadTable(Identifier.of(Array.empty[String], "new_table"))
@@ -208,88 +203,45 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
         assert(jdbcTable.indexExists("i1") == false)
         assert(jdbcTable.indexExists("i2") == false)
 
-        val properties = new util.HashMap[String, String]();
         val indexType = "DUMMY"
         var m = intercept[UnsupportedOperationException] {
-          jdbcTable.createIndex("i1", indexType, Array(FieldReference("col1")),
-            new util.HashMap[NamedReference, util.Map[String, String]](), properties)
+          sql(s"CREATE index i1 ON $catalogName.new_table USING $indexType (col1)")
         }.getMessage
         assert(m.contains(s"Index Type $indexType is not supported." +
           s" The supported Index Types are: BTREE and HASH"))
 
-        jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
-          new util.HashMap[NamedReference, util.Map[String, String]](), properties)
-
-        jdbcTable.createIndex("i2", "",
-          Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")),
-          new util.HashMap[NamedReference, util.Map[String, String]](), properties)
+        sql(s"CREATE index i1 ON $catalogName.new_table USING BTREE (col1)")
+        sql(s"CREATE index i2 ON $catalogName.new_table (col2, col3, col5)" +
+          s" OPTIONS (KEY_BLOCK_SIZE=10)")
 
         assert(jdbcTable.indexExists("i1") == true)
         assert(jdbcTable.indexExists("i2") == true)
 
+        // This should pass without exception
+        sql(s"CREATE index IF NOT EXISTS i1 ON $catalogName.new_table (col1)")
+
         m = intercept[IndexAlreadyExistsException] {
-          jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
-            new util.HashMap[NamedReference, util.Map[String, String]](), properties)
+          sql(s"CREATE index i1 ON $catalogName.new_table (col1)")
         }.getMessage
         assert(m.contains("Failed to create index i1 in new_table"))
 
-        var index = jdbcTable.listIndexes()
-        assert(index.length == 2)
-
-        assert(index(0).indexName.equals("i1"))
-        assert(index(0).indexType.equals("BTREE"))
-        var cols = index(0).columns
-        assert(cols.length == 1)
-        assert(cols(0).describe().equals("col1"))
-        assert(index(0).properties.size == 0)
-
-        assert(index(1).indexName.equals("i2"))
-        assert(index(1).indexType.equals("BTREE"))
-        cols = index(1).columns
-        assert(cols.length == 3)
-        assert(cols(0).describe().equals("col2"))
-        assert(cols(1).describe().equals("col3"))
-        assert(cols(2).describe().equals("col5"))
-        assert(index(1).properties.size == 0)
-
-        jdbcTable.dropIndex("i1")
-        assert(jdbcTable.indexExists("i1") == false)
-        assert(jdbcTable.indexExists("i2") == true)
-
-        index = jdbcTable.listIndexes()
-        assert(index.length == 1)
-
-        assert(index(0).indexName.equals("i2"))
-        assert(index(0).indexType.equals("BTREE"))
-        cols = index(0).columns
-        assert(cols.length == 3)
-        assert(cols(0).describe().equals("col2"))
-        assert(cols(1).describe().equals("col3"))
-        assert(cols(2).describe().equals("col5"))
+        sql(s"DROP index i1 ON $catalogName.new_table")
+        sql(s"DROP index i2 ON $catalogName.new_table")
 
-        jdbcTable.dropIndex("i2")
         assert(jdbcTable.indexExists("i1") == false)
         assert(jdbcTable.indexExists("i2") == false)
-        index = jdbcTable.listIndexes()
-        assert(index.length == 0)
+
+        // This should pass without exception
+        sql(s"DROP index IF EXISTS i1 ON $catalogName.new_table")
 
         m = intercept[NoSuchIndexException] {
-          jdbcTable.dropIndex("i2")
+          sql(s"DROP index i1 ON $catalogName.new_table")
         }.getMessage
-        assert(m.contains("Failed to drop index i2 in new_table"))
-
-        testIndexProperties(jdbcTable)
+        assert(m.contains("Failed to drop index i1 in new_table"))
       }
     }
   }
 
-  test("SPARK-36895: Test INDEX Using SQL") {
-    withTable(s"$catalogName.new_table") {
-      sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT, col4 INT, col5 INT)")
-      testIndexUsingSQL(s"$catalogName.new_table")
-    }
-  }
-
   def supportsTableSample: Boolean = false
 
   private def samplePushed(df: DataFrame): Boolean = {
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
index 9cf39eb..734b290 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
@@ -34,18 +34,20 @@ import org.apache.spark.sql.connector.expressions.NamedReference;
 public interface SupportsIndex extends Table {
 
   /**
+   * A reserved property to specify the index type.
+   */
+  String PROP_TYPE = "type";
+
+  /**
    * Creates an index.
    *
    * @param indexName the name of the index to be created
-   * @param indexType the type of the index to be created. If this is not specified, Spark
-   *                  will use empty String.
    * @param columns the columns on which index to be created
    * @param columnsProperties the properties of the columns on which index to be created
    * @param properties the properties of the index to be created
    * @throws IndexAlreadyExistsException If the index already exists.
    */
   void createIndex(String indexName,
-      String indexType,
       NamedReference[] columns,
       Map<NamedReference, Map<String, String>> columnsProperties,
       Map<String, String> properties)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 4402f27..526f9d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -1018,7 +1018,6 @@ object JdbcUtils extends Logging with SQLConfHelper {
   def createIndex(
       conn: Connection,
       indexName: String,
-      indexType: String,
       tableName: String,
       columns: Array[NamedReference],
       columnsProperties: util.Map[NamedReference, util.Map[String, String]],
@@ -1026,7 +1025,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
       options: JDBCOptions): Unit = {
     val dialect = JdbcDialects.get(options.url)
     executeStatement(conn, options,
-      dialect.createIndex(indexName, indexType, tableName, columns, columnsProperties, properties))
+      dialect.createIndex(indexName, tableName, columns, columnsProperties, properties))
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala
index 78bdf64..20ccf99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala
@@ -39,13 +39,20 @@ case class CreateIndexExec(
     properties: Map[String, String])
   extends LeafV2CommandExec {
   override protected def run(): Seq[InternalRow] = {
+
+    val propertiesWithIndexType: Map[String, String] = if (indexType.nonEmpty) {
+      properties + (SupportsIndex.PROP_TYPE -> indexType)
+    } else {
+      properties
+    }
+
     val colProperties = new util.HashMap[NamedReference, util.Map[String, String]]
     columns.foreach {
       case (column, map) => colProperties.put(column, map.asJava)
     }
     try {
       table.createIndex(
-        indexName, indexType, columns.unzip._1.toArray, colProperties, properties.asJava)
+        indexName, columns.unzip._1.toArray, colProperties, propertiesWithIndexType.asJava)
     } catch {
       case _: IndexAlreadyExistsException if ignoreIfExists =>
         logWarning(s"Index $indexName already exists in table ${table.name}. Ignoring.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
index 304431d..31c0167 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
@@ -54,7 +54,6 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
 
   override def createIndex(
       indexName: String,
-      indexType: String,
       columns: Array[NamedReference],
       columnsProperties: util.Map[NamedReference, util.Map[String, String]],
       properties: util.Map[String, String]): Unit = {
@@ -62,7 +61,7 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
       JdbcUtils.classifyException(s"Failed to create index $indexName in $name",
         JdbcDialects.get(jdbcOptions.url)) {
         JdbcUtils.createIndex(
-          conn, indexName, indexType, name, columns, columnsProperties, properties, jdbcOptions)
+          conn, indexName, name, columns, columnsProperties, properties, jdbcOptions)
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 568318c..54273dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -304,7 +304,6 @@ abstract class JdbcDialect extends Serializable with Logging{
    */
   def createIndex(
       indexName: String,
-      indexType: String,
       tableName: String,
       columns: Array[NamedReference],
       columnsProperties: util.Map[NamedReference, util.Map[String, String]],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 73b36f1..28e15b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
-import org.apache.spark.sql.connector.catalog.index.TableIndex
+import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
 import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
@@ -115,30 +115,29 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
   // https://dev.mysql.com/doc/refman/8.0/en/create-index.html
   override def createIndex(
       indexName: String,
-      indexType: String,
       tableName: String,
       columns: Array[NamedReference],
       columnsProperties: util.Map[NamedReference, util.Map[String, String]],
       properties: util.Map[String, String]): String = {
     val columnList = columns.map(col => quoteIdentifier(col.fieldNames.head))
     var indexProperties: String = ""
+    var indexType = ""
     if (!properties.isEmpty) {
       properties.asScala.foreach { case (k, v) =>
-        indexProperties = indexProperties + " " + s"$k $v"
-      }
-    }
-    val iType = if (indexType.isEmpty) {
-      ""
-    } else {
-      if (indexType.length > 1 && !indexType.equalsIgnoreCase("BTREE") &&
-        !indexType.equalsIgnoreCase("HASH")) {
-        throw new UnsupportedOperationException(s"Index Type $indexType is not supported." +
-          " The supported Index Types are: BTREE and HASH")
+        if (k.equals(SupportsIndex.PROP_TYPE)) {
+          if (v.equalsIgnoreCase("BTREE") || v.equalsIgnoreCase("HASH")) {
+            indexType = s"USING $v"
+          } else {
+            throw new UnsupportedOperationException(s"Index Type $v is not supported." +
+              " The supported Index Types are: BTREE and HASH")
+          }
+        } else {
+          indexProperties = indexProperties + " " + s"$k $v"
+        }
       }
-      s"USING $indexType"
     }
     // columnsProperties doesn't apply to MySQL so it is ignored
-    s"CREATE INDEX ${quoteIdentifier(indexName)} $iType ON" +
+    s"CREATE INDEX ${quoteIdentifier(indexName)} $indexType ON" +
       s" ${quoteIdentifier(tableName)} (${columnList.mkString(", ")}) $indexProperties"
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org