Posted to commits@spark.apache.org by gu...@apache.org on 2020/02/28 06:16:56 UTC

[spark] branch branch-3.0 updated: [SPARK-30902][SQL] Default table provider should be decided by catalog implementations

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new dd6a91b  [SPARK-30902][SQL] Default table provider should be decided by catalog implementations
dd6a91b is described below

commit dd6a91b5a7c1c260f821372995789f7b25bbb30e
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Fri Feb 28 15:14:23 2020 +0900

    [SPARK-30902][SQL] Default table provider should be decided by catalog implementations
    
    ### What changes were proposed in this pull request?
    
    When a `CREATE TABLE` SQL statement does not specify a provider, leave it to the catalog implementation to decide.
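    
    In other words (a simplified, self-contained sketch of the updated
    `CatalogV2Util.convertTableProperties` from the diff below, with the
    `TableCatalog` property constants inlined as string literals):
    
    ```scala
    // provider is now Option[String]; the "provider" table property is
    // only added when the CREATE TABLE statement actually supplied one.
    def convertTableProperties(
        properties: Map[String, String],
        options: Map[String, String],
        location: Option[String],
        comment: Option[String],
        provider: Option[String]): Map[String, String] = {
      properties ++ options ++
        provider.map("provider" -> _) ++   // TableCatalog.PROP_PROVIDER
        comment.map("comment" -> _) ++     // TableCatalog.PROP_COMMENT
        location.map("location" -> _)      // TableCatalog.PROP_LOCATION
    }
    ```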
    
    ### Why are the changes needed?
    
    Defaulting the provider to Parquet makes little sense when creating a table in, for example, a JDBC catalog.
    
    ### Does this PR introduce any user-facing change?
    
    Yes. A v2 catalog will no longer see a "provider" property in the table properties if one is not specified in the `CREATE TABLE` SQL statement. V2 catalogs are new in 3.0.
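    
    Behavior sketch, adapted from the tests added in `DataSourceV2SQLSuite`
    below (`testcat` is a test-only v2 catalog registered by that suite; the
    session-catalog default comes from `spark.sql.sources.default`, which is
    `parquet` out of the box):
    
    ```scala
    // v2 catalog: no USING clause, so no "provider" property is recorded
    // and the catalog implementation decides the format.
    spark.sql("CREATE TABLE testcat.t1 (id int)")
    
    // session catalog: still falls back to the configured default source.
    spark.sql("CREATE TABLE t2 (id int)")
    ```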
    
    ### How was this patch tested?
    
    New tests in `DDLParserSuite` and `DataSourceV2SQLSuite`.
    
    Closes #27650 from cloud-fan/create_table.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit f21894e5faacb13d8019df1e8fcca4253f68d0ab)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../sql/catalyst/analysis/ResolveCatalogs.scala    |  4 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  4 +-
 .../sql/catalyst/plans/logical/statements.scala    |  4 +-
 .../sql/connector/catalog/CatalogV2Util.scala      |  7 +--
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 64 ++++++++++------------
 .../catalyst/analysis/ResolveSessionCatalog.scala  | 18 +++---
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 34 ++++++++++++
 .../sql/execution/command/DDLParserSuite.scala     |  4 +-
 8 files changed, 84 insertions(+), 55 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 88a3c0a..78a3171 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -153,7 +153,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
         c.tableSchema,
         // convert the bucket spec and add it as a transform
         c.partitioning ++ c.bucketSpec.map(_.asTransform),
-        convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider),
+        convertTableProperties(c.properties, c.options, c.location, c.comment, Some(c.provider)),
         orCreate = c.orCreate)
 
     case c @ ReplaceTableAsSelectStatement(
@@ -164,7 +164,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
         // convert the bucket spec and add it as a transform
         c.partitioning ++ c.bucketSpec.map(_.asTransform),
         c.asSelect,
-        convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider),
+        convertTableProperties(c.properties, c.options, c.location, c.comment, Some(c.provider)),
         writeOptions = c.options,
         orCreate = c.orCreate)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index fe72446..ce3383d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2763,9 +2763,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
       operationNotAllowed("CREATE EXTERNAL TABLE ...", ctx)
     }
     val schema = Option(ctx.colTypeList()).map(createSchema)
-    val defaultProvider = conf.defaultDataSourceName
-    val provider =
-      Option(ctx.tableProvider).map(_.multipartIdentifier.getText).getOrElse(defaultProvider)
+    val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
     val (partitioning, bucketSpec, properties, options, location, comment) =
       visitCreateTableClauses(ctx.createTableClauses())
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 1e6b67b..89e1539 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -64,7 +64,7 @@ case class CreateTableStatement(
     partitioning: Seq[Transform],
     bucketSpec: Option[BucketSpec],
     properties: Map[String, String],
-    provider: String,
+    provider: Option[String],
     options: Map[String, String],
     location: Option[String],
     comment: Option[String],
@@ -79,7 +79,7 @@ case class CreateTableAsSelectStatement(
     partitioning: Seq[Transform],
     bucketSpec: Option[BucketSpec],
     properties: Map[String, String],
-    provider: String,
+    provider: Option[String],
     options: Map[String, String],
     location: Option[String],
     comment: Option[String],
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 0fabe4d..ff63201 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -298,10 +298,9 @@ private[sql] object CatalogV2Util {
       options: Map[String, String],
       location: Option[String],
       comment: Option[String],
-      provider: String): Map[String, String] = {
-    properties ++
-      options ++
-      Map(TableCatalog.PROP_PROVIDER -> provider) ++
+      provider: Option[String]): Map[String, String] = {
+    properties ++ options ++
+      provider.map(TableCatalog.PROP_PROVIDER -> _) ++
       comment.map(TableCatalog.PROP_COMMENT -> _) ++
       location.map(TableCatalog.PROP_LOCATION -> _)
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 4a636bd..967d0bd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -49,26 +49,6 @@ class DDLParserSuite extends AnalysisTest {
     comparePlans(parsePlan(sql), expected, checkAnalysis = false)
   }
 
-  test("SPARK-30098: create table without provider should " +
-    "use default data source under non-legacy mode") {
-    val createSql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING)"
-    val defaultProvider = conf.defaultDataSourceName
-    val expectedPlan = CreateTableStatement(
-      Seq("my_tab"),
-      new StructType()
-        .add("a", IntegerType, nullable = true, "test")
-        .add("b", StringType),
-      Seq.empty[Transform],
-      None,
-      Map.empty[String, String],
-      defaultProvider,
-      Map.empty[String, String],
-      None,
-      None,
-      false)
-    parseCompare(createSql, expectedPlan)
-  }
-
   test("create/replace table using - schema") {
     val createSql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING NOT NULL) USING parquet"
     val replaceSql = "REPLACE TABLE my_tab(a INT COMMENT 'test', b STRING NOT NULL) USING parquet"
@@ -80,7 +60,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq.empty[Transform],
       None,
       Map.empty[String, String],
-      "parquet",
+      Some("parquet"),
       Map.empty[String, String],
       None,
       None)
@@ -103,7 +83,7 @@ class DDLParserSuite extends AnalysisTest {
         Seq.empty[Transform],
         None,
         Map.empty[String, String],
-        "parquet",
+        Some("parquet"),
         Map.empty[String, String],
         None,
         None),
@@ -123,7 +103,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq(IdentityTransform(FieldReference("a"))),
       None,
       Map.empty[String, String],
-      "parquet",
+      Some("parquet"),
       Map.empty[String, String],
       None,
       None)
@@ -177,7 +157,7 @@ class DDLParserSuite extends AnalysisTest {
           LiteralValue(34, IntegerType)))),
       None,
       Map.empty[String, String],
-      "parquet",
+      Some("parquet"),
       Map.empty[String, String],
       None,
       None)
@@ -199,7 +179,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq.empty[Transform],
       Some(BucketSpec(5, Seq("a"), Seq("b"))),
       Map.empty[String, String],
-      "parquet",
+      Some("parquet"),
       Map.empty[String, String],
       None,
       None)
@@ -217,7 +197,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq.empty[Transform],
       None,
       Map.empty[String, String],
-      "parquet",
+      Some("parquet"),
       Map.empty[String, String],
       None,
       Some("abc"))
@@ -237,7 +217,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq.empty[Transform],
       None,
       Map("test" -> "test"),
-      "parquet",
+      Some("parquet"),
       Map.empty[String, String],
       None,
       None)
@@ -255,7 +235,7 @@ class DDLParserSuite extends AnalysisTest {
         Seq.empty[Transform],
         None,
         Map.empty[String, String],
-        "parquet",
+        Some("parquet"),
         Map.empty[String, String],
         Some("/tmp/file"),
         None)
@@ -273,7 +253,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq.empty[Transform],
       None,
       Map.empty[String, String],
-      "parquet",
+      Some("parquet"),
       Map.empty[String, String],
       None,
       None)
@@ -334,7 +314,7 @@ class DDLParserSuite extends AnalysisTest {
           Seq.empty[Transform],
           Option.empty[BucketSpec],
           Map.empty[String, String],
-          "json",
+          Some("json"),
           Map("a" -> "1", "b" -> "0.1", "c" -> "true"),
           None,
           None),
@@ -389,7 +369,7 @@ class DDLParserSuite extends AnalysisTest {
         Seq.empty[Transform],
         None,
         Map("p1" -> "v1", "p2" -> "v2"),
-        "parquet",
+        Some("parquet"),
         Map.empty[String, String],
         Some("/user/external/page_view"),
         Some("This is the staging page view table"))
@@ -2059,7 +2039,7 @@ class DDLParserSuite extends AnalysisTest {
       partitioning: Seq[Transform],
       bucketSpec: Option[BucketSpec],
       properties: Map[String, String],
-      provider: String,
+      provider: Option[String],
       options: Map[String, String],
       location: Option[String],
       comment: Option[String])
@@ -2085,7 +2065,7 @@ class DDLParserSuite extends AnalysisTest {
             replace.partitioning,
             replace.bucketSpec,
             replace.properties,
-            replace.provider,
+            Some(replace.provider),
             replace.options,
             replace.location,
             replace.comment)
@@ -2107,7 +2087,7 @@ class DDLParserSuite extends AnalysisTest {
             rtas.partitioning,
             rtas.bucketSpec,
             rtas.properties,
-            rtas.provider,
+            Some(rtas.provider),
             rtas.options,
             rtas.location,
             rtas.comment)
@@ -2135,4 +2115,20 @@ class DDLParserSuite extends AnalysisTest {
       parsePlan("COMMENT ON TABLE a.b.c IS 'xYz'"),
       CommentOnTable(UnresolvedTable(Seq("a", "b", "c")), "xYz"))
   }
+
+  test("create table - without using") {
+    val sql = "CREATE TABLE 1m.2g(a INT)"
+    val expectedTableSpec = TableSpec(
+      Seq("1m", "2g"),
+      Some(new StructType().add("a", IntegerType)),
+      Seq.empty[Transform],
+      None,
+      Map.empty[String, String],
+      None,
+      Map.empty[String, String],
+      None,
+      None)
+
+    testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 9941f56..baaa24f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -235,9 +235,10 @@ class ResolveSessionCatalog(
     // session catalog and the table provider is not v2.
     case c @ CreateTableStatement(
          SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
-      if (!isV2Provider(c.provider)) {
+      val provider = c.provider.getOrElse(conf.defaultDataSourceName)
+      if (!isV2Provider(provider)) {
         val tableDesc = buildCatalogTable(tbl.asTableIdentifier, c.tableSchema,
-          c.partitioning, c.bucketSpec, c.properties, c.provider, c.options, c.location,
+          c.partitioning, c.bucketSpec, c.properties, provider, c.options, c.location,
           c.comment, c.ifNotExists)
         val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
         CreateTable(tableDesc, mode, None)
@@ -248,15 +249,16 @@ class ResolveSessionCatalog(
           c.tableSchema,
           // convert the bucket spec and add it as a transform
           c.partitioning ++ c.bucketSpec.map(_.asTransform),
-          convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider),
+          convertTableProperties(c.properties, c.options, c.location, c.comment, Some(provider)),
           ignoreIfExists = c.ifNotExists)
       }
 
     case c @ CreateTableAsSelectStatement(
          SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
-      if (!isV2Provider(c.provider)) {
+      val provider = c.provider.getOrElse(conf.defaultDataSourceName)
+      if (!isV2Provider(provider)) {
         val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
-          c.partitioning, c.bucketSpec, c.properties, c.provider, c.options, c.location,
+          c.partitioning, c.bucketSpec, c.properties, provider, c.options, c.location,
           c.comment, c.ifNotExists)
         val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
         CreateTable(tableDesc, mode, Some(c.asSelect))
@@ -267,7 +269,7 @@ class ResolveSessionCatalog(
           // convert the bucket spec and add it as a transform
           c.partitioning ++ c.bucketSpec.map(_.asTransform),
           c.asSelect,
-          convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider),
+          convertTableProperties(c.properties, c.options, c.location, c.comment, Some(provider)),
           writeOptions = c.options,
           ignoreIfExists = c.ifNotExists)
       }
@@ -289,7 +291,7 @@ class ResolveSessionCatalog(
           c.tableSchema,
           // convert the bucket spec and add it as a transform
           c.partitioning ++ c.bucketSpec.map(_.asTransform),
-          convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider),
+          convertTableProperties(c.properties, c.options, c.location, c.comment, Some(c.provider)),
           orCreate = c.orCreate)
       }
 
@@ -304,7 +306,7 @@ class ResolveSessionCatalog(
           // convert the bucket spec and add it as a transform
           c.partitioning ++ c.bucketSpec.map(_.asTransform),
           c.asSelect,
-          convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider),
+          convertTableProperties(c.properties, c.options, c.location, c.comment, Some(c.provider)),
           writeOptions = c.options,
           orCreate = c.orCreate)
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 4ff2093..c074b335 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -256,6 +256,23 @@ class DataSourceV2SQLSuite
     checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty)
   }
 
+  test("CreateTable: without USING clause") {
+    // unset this config to use the default v2 session catalog.
+    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+    val testCatalog = catalog("testcat").asTableCatalog
+
+    sql("CREATE TABLE testcat.t1 (id int)")
+    val t1 = testCatalog.loadTable(Identifier.of(Array(), "t1"))
+    // Spark shouldn't set the default provider for catalog plugins.
+    assert(!t1.properties.containsKey(TableCatalog.PROP_PROVIDER))
+
+    sql("CREATE TABLE t2 (id int)")
+    val t2 = spark.sessionState.catalogManager.v2SessionCatalog.asTableCatalog
+      .loadTable(Identifier.of(Array("default"), "t2")).asInstanceOf[V1Table]
+    // Spark should set the default provider as DEFAULT_DATA_SOURCE_NAME for the session catalog.
+    assert(t2.v1Table.provider == Some(conf.defaultDataSourceName))
+  }
+
   test("CreateTable/RepalceTable: invalid schema if has interval type") {
     Seq("CREATE", "REPLACE").foreach { action =>
       val e1 = intercept[AnalysisException](
@@ -595,6 +612,23 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("CreateTableAsSelect: without USING clause") {
+    // unset this config to use the default v2 session catalog.
+    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+    val testCatalog = catalog("testcat").asTableCatalog
+
+    sql("CREATE TABLE testcat.t1 AS SELECT 1 i")
+    val t1 = testCatalog.loadTable(Identifier.of(Array(), "t1"))
+    // Spark shouldn't set the default provider for catalog plugins.
+    assert(!t1.properties.containsKey(TableCatalog.PROP_PROVIDER))
+
+    sql("CREATE TABLE t2 AS SELECT 1 i")
+    val t2 = spark.sessionState.catalogManager.v2SessionCatalog.asTableCatalog
+      .loadTable(Identifier.of(Array("default"), "t2")).asInstanceOf[V1Table]
+    // Spark should set the default provider as DEFAULT_DATA_SOURCE_NAME for the session catalog.
+    assert(t2.v1Table.provider == Some(conf.defaultDataSourceName))
+  }
+
   test("DropTable: basic") {
     val tableName = "testcat.ns1.ns2.tbl"
     val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index 81965e4..bacd64e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -492,7 +492,7 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
     assert(statement.partitioning.isEmpty)
     assert(statement.bucketSpec.isEmpty)
     assert(statement.properties.isEmpty)
-    assert(statement.provider == conf.defaultDataSourceName)
+    assert(statement.provider.isEmpty)
     assert(statement.options.isEmpty)
     assert(statement.location.isEmpty)
     assert(statement.comment.isEmpty)
@@ -662,7 +662,7 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
       assert(state.partitioning.isEmpty)
       assert(state.bucketSpec.isEmpty)
       assert(state.properties.isEmpty)
-      assert(state.provider == conf.defaultDataSourceName)
+      assert(state.provider.isEmpty)
       assert(state.options.isEmpty)
       assert(state.location.isEmpty)
       assert(state.comment.isEmpty)

