Posted to commits@spark.apache.org by gu...@apache.org on 2020/02/28 06:16:56 UTC
[spark] branch branch-3.0 updated: [SPARK-30902][SQL] Default table provider should be decided by catalog implementations
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new dd6a91b [SPARK-30902][SQL] Default table provider should be decided by catalog implementations
dd6a91b is described below
commit dd6a91b5a7c1c260f821372995789f7b25bbb30e
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Fri Feb 28 15:14:23 2020 +0900
[SPARK-30902][SQL] Default table provider should be decided by catalog implementations
### What changes were proposed in this pull request?
When a `CREATE TABLE` SQL statement does not specify the provider, leave it to the catalog implementation to decide.
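For illustration, a minimal sketch of what this enables on the catalog side. `TableCatalog.PROP_PROVIDER` is the real property key (see the `CatalogV2Util` change below); the `"jdbc"` fallback is a hypothetical catalog-specific default:

```scala
import org.apache.spark.sql.connector.catalog.TableCatalog

// Sketch of logic inside a custom v2 catalog's createTable: with this
// change, the "provider" key is absent from the table properties when
// the user omitted the USING clause, so the catalog can apply its own
// default instead of always seeing Spark's session default.
def resolveProvider(properties: java.util.Map[String, String]): String =
  Option(properties.get(TableCatalog.PROP_PROVIDER))
    .getOrElse("jdbc") // hypothetical: a JDBC catalog's native default
```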
### Why are the changes needed?
It makes little sense to force the default provider to parquet when creating a table in, for example, a JDBC catalog; the catalog implementation knows its own sensible default.
### Does this PR introduce any user-facing change?
Yes. A v2 catalog will no longer see a "provider" property in the table properties when it is not specified in the `CREATE TABLE` SQL statement. The v2 catalog API is new in 3.0.
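A sketch of the observable difference, assuming a v2 catalog plugin registered as `testcat` (as the test suite changes below configure; `org.example.MyCatalog` is a placeholder class name):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .config("spark.sql.catalog.testcat", "org.example.MyCatalog") // placeholder plugin
  .getOrCreate()

// No USING clause: testcat's createTable receives table properties
// without a "provider" entry, rather than provider -> parquet.
spark.sql("CREATE TABLE testcat.t1 (id INT)")

// Explicit USING clause: the "provider" entry is passed as before.
spark.sql("CREATE TABLE testcat.t2 (id INT) USING parquet")
```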
### How was this patch tested?
New tests (see the `DDLParserSuite` and `DataSourceV2SQLSuite` changes below).
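The v1 path can also be spot-checked interactively. Continuing the sketch above, the session catalog still falls back to `spark.sql.sources.default`:

```scala
// For the session catalog, Spark still fills in the default itself
// (see the ResolveSessionCatalog change below), so v1 behavior is
// unchanged.
spark.sql("CREATE TABLE t3 (id INT)")
spark.sql("DESCRIBE TABLE EXTENDED t3")
  .filter("col_name = 'Provider'")
  .show(truncate = false) // "parquet" unless the default was changed
```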
Closes #27650 from cloud-fan/create_table.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
(cherry picked from commit f21894e5faacb13d8019df1e8fcca4253f68d0ab)
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../sql/catalyst/analysis/ResolveCatalogs.scala | 4 +-
.../spark/sql/catalyst/parser/AstBuilder.scala | 4 +-
.../sql/catalyst/plans/logical/statements.scala | 4 +-
.../sql/connector/catalog/CatalogV2Util.scala | 7 +--
.../spark/sql/catalyst/parser/DDLParserSuite.scala | 64 ++++++++++------------
.../catalyst/analysis/ResolveSessionCatalog.scala | 18 +++---
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 34 ++++++++++++
.../sql/execution/command/DDLParserSuite.scala | 4 +-
8 files changed, 84 insertions(+), 55 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 88a3c0a..78a3171 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -153,7 +153,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
c.tableSchema,
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
- convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider),
+ convertTableProperties(c.properties, c.options, c.location, c.comment, Some(c.provider)),
orCreate = c.orCreate)
case c @ ReplaceTableAsSelectStatement(
@@ -164,7 +164,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
c.asSelect,
- convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider),
+ convertTableProperties(c.properties, c.options, c.location, c.comment, Some(c.provider)),
writeOptions = c.options,
orCreate = c.orCreate)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index fe72446..ce3383d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2763,9 +2763,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
operationNotAllowed("CREATE EXTERNAL TABLE ...", ctx)
}
val schema = Option(ctx.colTypeList()).map(createSchema)
- val defaultProvider = conf.defaultDataSourceName
- val provider =
- Option(ctx.tableProvider).map(_.multipartIdentifier.getText).getOrElse(defaultProvider)
+ val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
val (partitioning, bucketSpec, properties, options, location, comment) =
visitCreateTableClauses(ctx.createTableClauses())
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 1e6b67b..89e1539 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -64,7 +64,7 @@ case class CreateTableStatement(
partitioning: Seq[Transform],
bucketSpec: Option[BucketSpec],
properties: Map[String, String],
- provider: String,
+ provider: Option[String],
options: Map[String, String],
location: Option[String],
comment: Option[String],
@@ -79,7 +79,7 @@ case class CreateTableAsSelectStatement(
partitioning: Seq[Transform],
bucketSpec: Option[BucketSpec],
properties: Map[String, String],
- provider: String,
+ provider: Option[String],
options: Map[String, String],
location: Option[String],
comment: Option[String],
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 0fabe4d..ff63201 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -298,10 +298,9 @@ private[sql] object CatalogV2Util {
options: Map[String, String],
location: Option[String],
comment: Option[String],
- provider: String): Map[String, String] = {
- properties ++
- options ++
- Map(TableCatalog.PROP_PROVIDER -> provider) ++
+ provider: Option[String]): Map[String, String] = {
+ properties ++ options ++
+ provider.map(TableCatalog.PROP_PROVIDER -> _) ++
comment.map(TableCatalog.PROP_COMMENT -> _) ++
location.map(TableCatalog.PROP_LOCATION -> _)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 4a636bd..967d0bd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -49,26 +49,6 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(parsePlan(sql), expected, checkAnalysis = false)
}
- test("SPARK-30098: create table without provider should " +
- "use default data source under non-legacy mode") {
- val createSql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING)"
- val defaultProvider = conf.defaultDataSourceName
- val expectedPlan = CreateTableStatement(
- Seq("my_tab"),
- new StructType()
- .add("a", IntegerType, nullable = true, "test")
- .add("b", StringType),
- Seq.empty[Transform],
- None,
- Map.empty[String, String],
- defaultProvider,
- Map.empty[String, String],
- None,
- None,
- false)
- parseCompare(createSql, expectedPlan)
- }
-
test("create/replace table using - schema") {
val createSql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING NOT NULL) USING parquet"
val replaceSql = "REPLACE TABLE my_tab(a INT COMMENT 'test', b STRING NOT NULL) USING parquet"
@@ -80,7 +60,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
None,
Map.empty[String, String],
- "parquet",
+ Some("parquet"),
Map.empty[String, String],
None,
None)
@@ -103,7 +83,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
None,
Map.empty[String, String],
- "parquet",
+ Some("parquet"),
Map.empty[String, String],
None,
None),
@@ -123,7 +103,7 @@ class DDLParserSuite extends AnalysisTest {
Seq(IdentityTransform(FieldReference("a"))),
None,
Map.empty[String, String],
- "parquet",
+ Some("parquet"),
Map.empty[String, String],
None,
None)
@@ -177,7 +157,7 @@ class DDLParserSuite extends AnalysisTest {
LiteralValue(34, IntegerType)))),
None,
Map.empty[String, String],
- "parquet",
+ Some("parquet"),
Map.empty[String, String],
None,
None)
@@ -199,7 +179,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
Some(BucketSpec(5, Seq("a"), Seq("b"))),
Map.empty[String, String],
- "parquet",
+ Some("parquet"),
Map.empty[String, String],
None,
None)
@@ -217,7 +197,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
None,
Map.empty[String, String],
- "parquet",
+ Some("parquet"),
Map.empty[String, String],
None,
Some("abc"))
@@ -237,7 +217,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
None,
Map("test" -> "test"),
- "parquet",
+ Some("parquet"),
Map.empty[String, String],
None,
None)
@@ -255,7 +235,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
None,
Map.empty[String, String],
- "parquet",
+ Some("parquet"),
Map.empty[String, String],
Some("/tmp/file"),
None)
@@ -273,7 +253,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
None,
Map.empty[String, String],
- "parquet",
+ Some("parquet"),
Map.empty[String, String],
None,
None)
@@ -334,7 +314,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
Option.empty[BucketSpec],
Map.empty[String, String],
- "json",
+ Some("json"),
Map("a" -> "1", "b" -> "0.1", "c" -> "true"),
None,
None),
@@ -389,7 +369,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
None,
Map("p1" -> "v1", "p2" -> "v2"),
- "parquet",
+ Some("parquet"),
Map.empty[String, String],
Some("/user/external/page_view"),
Some("This is the staging page view table"))
@@ -2059,7 +2039,7 @@ class DDLParserSuite extends AnalysisTest {
partitioning: Seq[Transform],
bucketSpec: Option[BucketSpec],
properties: Map[String, String],
- provider: String,
+ provider: Option[String],
options: Map[String, String],
location: Option[String],
comment: Option[String])
@@ -2085,7 +2065,7 @@ class DDLParserSuite extends AnalysisTest {
replace.partitioning,
replace.bucketSpec,
replace.properties,
- replace.provider,
+ Some(replace.provider),
replace.options,
replace.location,
replace.comment)
@@ -2107,7 +2087,7 @@ class DDLParserSuite extends AnalysisTest {
rtas.partitioning,
rtas.bucketSpec,
rtas.properties,
- rtas.provider,
+ Some(rtas.provider),
rtas.options,
rtas.location,
rtas.comment)
@@ -2135,4 +2115,20 @@ class DDLParserSuite extends AnalysisTest {
parsePlan("COMMENT ON TABLE a.b.c IS 'xYz'"),
CommentOnTable(UnresolvedTable(Seq("a", "b", "c")), "xYz"))
}
+
+ test("create table - without using") {
+ val sql = "CREATE TABLE 1m.2g(a INT)"
+ val expectedTableSpec = TableSpec(
+ Seq("1m", "2g"),
+ Some(new StructType().add("a", IntegerType)),
+ Seq.empty[Transform],
+ None,
+ Map.empty[String, String],
+ None,
+ Map.empty[String, String],
+ None,
+ None)
+
+ testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 9941f56..baaa24f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -235,9 +235,10 @@ class ResolveSessionCatalog(
// session catalog and the table provider is not v2.
case c @ CreateTableStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
- if (!isV2Provider(c.provider)) {
+ val provider = c.provider.getOrElse(conf.defaultDataSourceName)
+ if (!isV2Provider(provider)) {
val tableDesc = buildCatalogTable(tbl.asTableIdentifier, c.tableSchema,
- c.partitioning, c.bucketSpec, c.properties, c.provider, c.options, c.location,
+ c.partitioning, c.bucketSpec, c.properties, provider, c.options, c.location,
c.comment, c.ifNotExists)
val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTable(tableDesc, mode, None)
@@ -248,15 +249,16 @@ class ResolveSessionCatalog(
c.tableSchema,
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
- convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider),
+ convertTableProperties(c.properties, c.options, c.location, c.comment, Some(provider)),
ignoreIfExists = c.ifNotExists)
}
case c @ CreateTableAsSelectStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
- if (!isV2Provider(c.provider)) {
+ val provider = c.provider.getOrElse(conf.defaultDataSourceName)
+ if (!isV2Provider(provider)) {
val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
- c.partitioning, c.bucketSpec, c.properties, c.provider, c.options, c.location,
+ c.partitioning, c.bucketSpec, c.properties, provider, c.options, c.location,
c.comment, c.ifNotExists)
val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTable(tableDesc, mode, Some(c.asSelect))
@@ -267,7 +269,7 @@ class ResolveSessionCatalog(
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
c.asSelect,
- convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider),
+ convertTableProperties(c.properties, c.options, c.location, c.comment, Some(provider)),
writeOptions = c.options,
ignoreIfExists = c.ifNotExists)
}
@@ -289,7 +291,7 @@ class ResolveSessionCatalog(
c.tableSchema,
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
- convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider),
+ convertTableProperties(c.properties, c.options, c.location, c.comment, Some(c.provider)),
orCreate = c.orCreate)
}
@@ -304,7 +306,7 @@ class ResolveSessionCatalog(
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
c.asSelect,
- convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider),
+ convertTableProperties(c.properties, c.options, c.location, c.comment, Some(c.provider)),
writeOptions = c.options,
orCreate = c.orCreate)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 4ff2093..c074b335 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -256,6 +256,23 @@ class DataSourceV2SQLSuite
checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty)
}
+ test("CreateTable: without USING clause") {
+ // unset this config to use the default v2 session catalog.
+ spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+ val testCatalog = catalog("testcat").asTableCatalog
+
+ sql("CREATE TABLE testcat.t1 (id int)")
+ val t1 = testCatalog.loadTable(Identifier.of(Array(), "t1"))
+ // Spark shouldn't set the default provider for catalog plugins.
+ assert(!t1.properties.containsKey(TableCatalog.PROP_PROVIDER))
+
+ sql("CREATE TABLE t2 (id int)")
+ val t2 = spark.sessionState.catalogManager.v2SessionCatalog.asTableCatalog
+ .loadTable(Identifier.of(Array("default"), "t2")).asInstanceOf[V1Table]
+ // Spark should set the default provider as DEFAULT_DATA_SOURCE_NAME for the session catalog.
+ assert(t2.v1Table.provider == Some(conf.defaultDataSourceName))
+ }
+
test("CreateTable/RepalceTable: invalid schema if has interval type") {
Seq("CREATE", "REPLACE").foreach { action =>
val e1 = intercept[AnalysisException](
@@ -595,6 +612,23 @@ class DataSourceV2SQLSuite
}
}
+ test("CreateTableAsSelect: without USING clause") {
+ // unset this config to use the default v2 session catalog.
+ spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+ val testCatalog = catalog("testcat").asTableCatalog
+
+ sql("CREATE TABLE testcat.t1 AS SELECT 1 i")
+ val t1 = testCatalog.loadTable(Identifier.of(Array(), "t1"))
+ // Spark shouldn't set the default provider for catalog plugins.
+ assert(!t1.properties.containsKey(TableCatalog.PROP_PROVIDER))
+
+ sql("CREATE TABLE t2 AS SELECT 1 i")
+ val t2 = spark.sessionState.catalogManager.v2SessionCatalog.asTableCatalog
+ .loadTable(Identifier.of(Array("default"), "t2")).asInstanceOf[V1Table]
+ // Spark should set the default provider as DEFAULT_DATA_SOURCE_NAME for the session catalog.
+ assert(t2.v1Table.provider == Some(conf.defaultDataSourceName))
+ }
+
test("DropTable: basic") {
val tableName = "testcat.ns1.ns2.tbl"
val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index 81965e4..bacd64e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -492,7 +492,7 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
assert(statement.partitioning.isEmpty)
assert(statement.bucketSpec.isEmpty)
assert(statement.properties.isEmpty)
- assert(statement.provider == conf.defaultDataSourceName)
+ assert(statement.provider.isEmpty)
assert(statement.options.isEmpty)
assert(statement.location.isEmpty)
assert(statement.comment.isEmpty)
@@ -662,7 +662,7 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
assert(state.partitioning.isEmpty)
assert(state.bucketSpec.isEmpty)
assert(state.properties.isEmpty)
- assert(state.provider == conf.defaultDataSourceName)
+ assert(state.provider.isEmpty)
assert(state.options.isEmpty)
assert(state.location.isEmpty)
assert(state.comment.isEmpty)