You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/11/08 12:59:42 UTC
[spark] branch master updated: [SPARK-36895][SQL][FOLLOWUP] Use property to specify index type
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4011dd6 [SPARK-36895][SQL][FOLLOWUP] Use property to specify index type
4011dd6 is described below
commit 4011dd6995c61737ca67288224afb548eeecb3a0
Author: Huaxin Gao <hu...@apple.com>
AuthorDate: Mon Nov 8 20:58:38 2021 +0800
[SPARK-36895][SQL][FOLLOWUP] Use property to specify index type
### What changes were proposed in this pull request?
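Use a reserved index property (`SupportsIndex.PROP_TYPE`, i.e. `"type"`) to specify the index type, instead of passing it as a separate `indexType` parameter to `createIndex`.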
### Why are the changes needed?
To address this review comment: https://github.com/apache/spark/pull/34148#discussion_r731500964
### Does this PR introduce _any_ user-facing change?
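Yes. `SupportsIndex.createIndex` no longer takes an `indexType` parameter. The index type is now passed in `properties` under the reserved key `SupportsIndex.PROP_TYPE` (`"type"`). The method signature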
```
void createIndex(String indexName,
String indexType,
NamedReference[] columns,
Map<NamedReference, Map<String, String>> columnsProperties,
Map<String, String> properties)
```
changed to
```
void createIndex(String indexName,
String indexType,
NamedReference[] columns,
Map<NamedReference, Map<String, String>> columnsProperties,
Map<String, String> properties)
```
### How was this patch tested?
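Updated tests. The index tests in `V2JDBCTest` are consolidated into `SPARK-36895: Test INDEX Using SQL`, which now creates and drops indexes through SQL (`CREATE INDEX ... USING ...`, `DROP INDEX`).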
Closes #34486 from huaxingao/deleteIndexType.
Lead-authored-by: Huaxin Gao <hu...@apple.com>
Co-authored-by: Wenchen Fan <cl...@gmail.com>
Co-authored-by: Huaxin Gao <hu...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala | 67 ------------------
.../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 82 +++++-----------------
.../sql/connector/catalog/index/SupportsIndex.java | 8 ++-
.../sql/execution/datasources/jdbc/JdbcUtils.scala | 3 +-
.../execution/datasources/v2/CreateIndexExec.scala | 9 ++-
.../execution/datasources/v2/jdbc/JDBCTable.scala | 3 +-
.../org/apache/spark/sql/jdbc/JdbcDialects.scala | 1 -
.../org/apache/spark/sql/jdbc/MySQLDialect.scala | 27 ++++---
8 files changed, 45 insertions(+), 155 deletions(-)
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index d77dcb4..592f7d6 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -18,16 +18,11 @@
package org.apache.spark.sql.jdbc.v2
import java.sql.{Connection, SQLFeatureNotSupportedException}
-import java.util
import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkConf
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
-import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
-import org.apache.spark.sql.connector.catalog.index.SupportsIndex
-import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite}
import org.apache.spark.sql.types._
@@ -122,66 +117,4 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
}
override def supportsIndex: Boolean = true
-
- override def testIndexProperties(jdbcTable: SupportsIndex): Unit = {
- val properties = new util.HashMap[String, String]();
- properties.put("KEY_BLOCK_SIZE", "10")
- properties.put("COMMENT", "'this is a comment'")
- // MySQL doesn't allow property set on individual column, so use empty Array for
- // column properties
- jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
- new util.HashMap[NamedReference, util.Map[String, String]](), properties)
-
- var index = jdbcTable.listIndexes()
- // The index property size is actually 1. Even though the index is created
- // with properties "KEY_BLOCK_SIZE", "10" and "COMMENT", "'this is a comment'", when
- // retrieving index using `SHOW INDEXES`, MySQL only returns `COMMENT`.
- assert(index(0).properties.size == 1)
- assert(index(0).properties.get("COMMENT").equals("this is a comment"))
- }
-
- override def testIndexUsingSQL(tbl: String): Unit = {
- val loaded = Catalogs.load("mysql", conf)
- val jdbcTable = loaded.asInstanceOf[TableCatalog]
- .loadTable(Identifier.of(Array.empty[String], "new_table"))
- .asInstanceOf[SupportsIndex]
- assert(jdbcTable.indexExists("i1") == false)
- assert(jdbcTable.indexExists("i2") == false)
-
- val indexType = "DUMMY"
- var m = intercept[UnsupportedOperationException] {
- sql(s"CREATE index i1 ON $catalogName.new_table USING DUMMY (col1)")
- }.getMessage
- assert(m.contains(s"Index Type $indexType is not supported." +
- s" The supported Index Types are: BTREE and HASH"))
-
- sql(s"CREATE index i1 ON $catalogName.new_table USING BTREE (col1)")
- sql(s"CREATE index i2 ON $catalogName.new_table (col2, col3, col5)" +
- s" OPTIONS (KEY_BLOCK_SIZE=10)")
-
- assert(jdbcTable.indexExists("i1") == true)
- assert(jdbcTable.indexExists("i2") == true)
-
- // This should pass without exception
- sql(s"CREATE index IF NOT EXISTS i1 ON $catalogName.new_table (col1)")
-
- m = intercept[IndexAlreadyExistsException] {
- sql(s"CREATE index i1 ON $catalogName.new_table (col1)")
- }.getMessage
- assert(m.contains("Failed to create index i1 in new_table"))
-
- sql(s"DROP index i1 ON $catalogName.new_table")
- sql(s"DROP index i2 ON $catalogName.new_table")
-
- assert(jdbcTable.indexExists("i1") == false)
- assert(jdbcTable.indexExists("i2") == false)
-
- // This should pass without exception
- sql(s"DROP index IF EXISTS i1 ON $catalogName.new_table")
-
- m = intercept[NoSuchIndexException] {
- sql(s"DROP index i1 ON $catalogName.new_table")
- }.getMessage
- assert(m.contains("Failed to drop index i1 in new_table"))
- }
}
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index 717624b..d292051 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.jdbc.v2
-import java.util
-
import org.apache.log4j.Level
import org.apache.spark.sql.AnalysisException
@@ -27,7 +25,6 @@ import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSu
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Sample}
import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
-import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper}
import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite
import org.apache.spark.sql.test.SharedSparkSession
@@ -193,14 +190,12 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
}
def supportsIndex: Boolean = false
- def testIndexProperties(jdbcTable: SupportsIndex): Unit = {}
- def testIndexUsingSQL(tbl: String): Unit = {}
- test("SPARK-36913: Test INDEX") {
+ test("SPARK-36895: Test INDEX Using SQL") {
if (supportsIndex) {
withTable(s"$catalogName.new_table") {
sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT," +
- s" col4 INT, col5 INT)")
+ " col4 INT, col5 INT)")
val loaded = Catalogs.load(catalogName, conf)
val jdbcTable = loaded.asInstanceOf[TableCatalog]
.loadTable(Identifier.of(Array.empty[String], "new_table"))
@@ -208,88 +203,45 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
assert(jdbcTable.indexExists("i1") == false)
assert(jdbcTable.indexExists("i2") == false)
- val properties = new util.HashMap[String, String]();
val indexType = "DUMMY"
var m = intercept[UnsupportedOperationException] {
- jdbcTable.createIndex("i1", indexType, Array(FieldReference("col1")),
- new util.HashMap[NamedReference, util.Map[String, String]](), properties)
+ sql(s"CREATE index i1 ON $catalogName.new_table USING $indexType (col1)")
}.getMessage
assert(m.contains(s"Index Type $indexType is not supported." +
s" The supported Index Types are: BTREE and HASH"))
- jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
- new util.HashMap[NamedReference, util.Map[String, String]](), properties)
-
- jdbcTable.createIndex("i2", "",
- Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")),
- new util.HashMap[NamedReference, util.Map[String, String]](), properties)
+ sql(s"CREATE index i1 ON $catalogName.new_table USING BTREE (col1)")
+ sql(s"CREATE index i2 ON $catalogName.new_table (col2, col3, col5)" +
+ s" OPTIONS (KEY_BLOCK_SIZE=10)")
assert(jdbcTable.indexExists("i1") == true)
assert(jdbcTable.indexExists("i2") == true)
+ // This should pass without exception
+ sql(s"CREATE index IF NOT EXISTS i1 ON $catalogName.new_table (col1)")
+
m = intercept[IndexAlreadyExistsException] {
- jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
- new util.HashMap[NamedReference, util.Map[String, String]](), properties)
+ sql(s"CREATE index i1 ON $catalogName.new_table (col1)")
}.getMessage
assert(m.contains("Failed to create index i1 in new_table"))
- var index = jdbcTable.listIndexes()
- assert(index.length == 2)
-
- assert(index(0).indexName.equals("i1"))
- assert(index(0).indexType.equals("BTREE"))
- var cols = index(0).columns
- assert(cols.length == 1)
- assert(cols(0).describe().equals("col1"))
- assert(index(0).properties.size == 0)
-
- assert(index(1).indexName.equals("i2"))
- assert(index(1).indexType.equals("BTREE"))
- cols = index(1).columns
- assert(cols.length == 3)
- assert(cols(0).describe().equals("col2"))
- assert(cols(1).describe().equals("col3"))
- assert(cols(2).describe().equals("col5"))
- assert(index(1).properties.size == 0)
-
- jdbcTable.dropIndex("i1")
- assert(jdbcTable.indexExists("i1") == false)
- assert(jdbcTable.indexExists("i2") == true)
-
- index = jdbcTable.listIndexes()
- assert(index.length == 1)
-
- assert(index(0).indexName.equals("i2"))
- assert(index(0).indexType.equals("BTREE"))
- cols = index(0).columns
- assert(cols.length == 3)
- assert(cols(0).describe().equals("col2"))
- assert(cols(1).describe().equals("col3"))
- assert(cols(2).describe().equals("col5"))
+ sql(s"DROP index i1 ON $catalogName.new_table")
+ sql(s"DROP index i2 ON $catalogName.new_table")
- jdbcTable.dropIndex("i2")
assert(jdbcTable.indexExists("i1") == false)
assert(jdbcTable.indexExists("i2") == false)
- index = jdbcTable.listIndexes()
- assert(index.length == 0)
+
+ // This should pass without exception
+ sql(s"DROP index IF EXISTS i1 ON $catalogName.new_table")
m = intercept[NoSuchIndexException] {
- jdbcTable.dropIndex("i2")
+ sql(s"DROP index i1 ON $catalogName.new_table")
}.getMessage
- assert(m.contains("Failed to drop index i2 in new_table"))
-
- testIndexProperties(jdbcTable)
+ assert(m.contains("Failed to drop index i1 in new_table"))
}
}
}
- test("SPARK-36895: Test INDEX Using SQL") {
- withTable(s"$catalogName.new_table") {
- sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT, col4 INT, col5 INT)")
- testIndexUsingSQL(s"$catalogName.new_table")
- }
- }
-
def supportsTableSample: Boolean = false
private def samplePushed(df: DataFrame): Boolean = {
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
index 9cf39eb..734b290 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
@@ -34,18 +34,20 @@ import org.apache.spark.sql.connector.expressions.NamedReference;
public interface SupportsIndex extends Table {
/**
+ * A reserved property to specify the index type.
+ */
+ String PROP_TYPE = "type";
+
+ /**
* Creates an index.
*
* @param indexName the name of the index to be created
- * @param indexType the type of the index to be created. If this is not specified, Spark
- * will use empty String.
* @param columns the columns on which index to be created
* @param columnsProperties the properties of the columns on which index to be created
* @param properties the properties of the index to be created
* @throws IndexAlreadyExistsException If the index already exists.
*/
void createIndex(String indexName,
- String indexType,
NamedReference[] columns,
Map<NamedReference, Map<String, String>> columnsProperties,
Map<String, String> properties)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 4402f27..526f9d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -1018,7 +1018,6 @@ object JdbcUtils extends Logging with SQLConfHelper {
def createIndex(
conn: Connection,
indexName: String,
- indexType: String,
tableName: String,
columns: Array[NamedReference],
columnsProperties: util.Map[NamedReference, util.Map[String, String]],
@@ -1026,7 +1025,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
options: JDBCOptions): Unit = {
val dialect = JdbcDialects.get(options.url)
executeStatement(conn, options,
- dialect.createIndex(indexName, indexType, tableName, columns, columnsProperties, properties))
+ dialect.createIndex(indexName, tableName, columns, columnsProperties, properties))
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala
index 78bdf64..20ccf99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala
@@ -39,13 +39,20 @@ case class CreateIndexExec(
properties: Map[String, String])
extends LeafV2CommandExec {
override protected def run(): Seq[InternalRow] = {
+
+ val propertiesWithIndexType: Map[String, String] = if (indexType.nonEmpty) {
+ properties + (SupportsIndex.PROP_TYPE -> indexType)
+ } else {
+ properties
+ }
+
val colProperties = new util.HashMap[NamedReference, util.Map[String, String]]
columns.foreach {
case (column, map) => colProperties.put(column, map.asJava)
}
try {
table.createIndex(
- indexName, indexType, columns.unzip._1.toArray, colProperties, properties.asJava)
+ indexName, columns.unzip._1.toArray, colProperties, propertiesWithIndexType.asJava)
} catch {
case _: IndexAlreadyExistsException if ignoreIfExists =>
logWarning(s"Index $indexName already exists in table ${table.name}. Ignoring.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
index 304431d..31c0167 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
@@ -54,7 +54,6 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
override def createIndex(
indexName: String,
- indexType: String,
columns: Array[NamedReference],
columnsProperties: util.Map[NamedReference, util.Map[String, String]],
properties: util.Map[String, String]): Unit = {
@@ -62,7 +61,7 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
JdbcUtils.classifyException(s"Failed to create index $indexName in $name",
JdbcDialects.get(jdbcOptions.url)) {
JdbcUtils.createIndex(
- conn, indexName, indexType, name, columns, columnsProperties, properties, jdbcOptions)
+ conn, indexName, name, columns, columnsProperties, properties, jdbcOptions)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 568318c..54273dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -304,7 +304,6 @@ abstract class JdbcDialect extends Serializable with Logging{
*/
def createIndex(
indexName: String,
- indexType: String,
tableName: String,
columns: Array[NamedReference],
columnsProperties: util.Map[NamedReference, util.Map[String, String]],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 73b36f1..28e15b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
-import org.apache.spark.sql.connector.catalog.index.TableIndex
+import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
@@ -115,30 +115,29 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
// https://dev.mysql.com/doc/refman/8.0/en/create-index.html
override def createIndex(
indexName: String,
- indexType: String,
tableName: String,
columns: Array[NamedReference],
columnsProperties: util.Map[NamedReference, util.Map[String, String]],
properties: util.Map[String, String]): String = {
val columnList = columns.map(col => quoteIdentifier(col.fieldNames.head))
var indexProperties: String = ""
+ var indexType = ""
if (!properties.isEmpty) {
properties.asScala.foreach { case (k, v) =>
- indexProperties = indexProperties + " " + s"$k $v"
- }
- }
- val iType = if (indexType.isEmpty) {
- ""
- } else {
- if (indexType.length > 1 && !indexType.equalsIgnoreCase("BTREE") &&
- !indexType.equalsIgnoreCase("HASH")) {
- throw new UnsupportedOperationException(s"Index Type $indexType is not supported." +
- " The supported Index Types are: BTREE and HASH")
+ if (k.equals(SupportsIndex.PROP_TYPE)) {
+ if (v.equalsIgnoreCase("BTREE") || v.equalsIgnoreCase("HASH")) {
+ indexType = s"USING $v"
+ } else {
+ throw new UnsupportedOperationException(s"Index Type $v is not supported." +
+ " The supported Index Types are: BTREE and HASH")
+ }
+ } else {
+ indexProperties = indexProperties + " " + s"$k $v"
+ }
}
- s"USING $indexType"
}
// columnsProperties doesn't apply to MySQL so it is ignored
- s"CREATE INDEX ${quoteIdentifier(indexName)} $iType ON" +
+ s"CREATE INDEX ${quoteIdentifier(indexName)} $indexType ON" +
s" ${quoteIdentifier(tableName)} (${columnList.mkString(", ")}) $indexProperties"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org