Posted to reviews@spark.apache.org by "allisonwang-db (via GitHub)" <gi...@apache.org> on 2023/11/22 03:42:16 UTC

[PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

allisonwang-db opened a new pull request, #43949:
URL: https://github.com/apache/spark/pull/43949

   
   ### What changes were proposed in this pull request?
   This PR supports `CREATE TABLE ... USING source` for DSv2 sources.
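
   A minimal usage sketch, mirroring the test sources exercised later in this PR (the class name is one of those test sources; any DSv2 `TableProvider` is resolved the same way):
   ```
   spark.sql("CREATE TABLE t USING org.apache.spark.sql.connector.SimpleDataSourceV2")
   spark.sql("SELECT * FROM t WHERE i < 3").show()  // before this PR, this SELECT failed with the error below
   ```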
   
   ### Why are the changes needed?
   To support creating DSv2 tables in SQL. Currently, the `CREATE TABLE` statement can succeed, but selecting from a DSv2 table created in SQL fails with this error:
   ```
   org.apache.spark.sql.AnalysisException: org.apache.spark.sql.connector.SimpleDataSourceV2 is not a valid Spark SQL Data Source.
   ```
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   New unit tests
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No




Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan closed pull request #43949: [SPARK-46043][SQL] Support create table using DSv2 sources
URL: https://github.com/apache/spark/pull/43949




Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on PR #43949:
URL: https://github.com/apache/spark/pull/43949#issuecomment-1835444963

   The test failure seems unrelated.




Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on PR #43949:
URL: https://github.com/apache/spark/pull/43949#issuecomment-1822044270

   cc @cloud-fan 




Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1411879434


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala:
##########
@@ -633,6 +634,95 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
       }
     }
   }
+
+  test("SPARK-46043: create table in SQL using a DSv2 source") {
+    Seq(classOf[SimpleDataSourceV2], classOf[JavaSimpleDataSourceV2]).foreach { cls =>
+      withClue(cls.getName) {
+        // Create a table with empty schema.
+        withTable("test") {
+          sql(s"CREATE TABLE test USING ${cls.getName}")
+          checkAnswer(
+            sql(s"SELECT * FROM test WHERE i < 3"),
+            Seq(Row(0, 0), Row(1, -1), Row(2, -2)))
+        }
+        // Creating a table with a non-empty schema is not allowed.
+        checkError(
+          exception = intercept[SparkUnsupportedOperationException] {
+            sql(s"CREATE TABLE test(a INT, b INT) USING ${cls.getName}")
+          },
+          errorClass = "CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
+          parameters = Map("tableName" -> "default.test", "provider" -> cls.getName)

Review Comment:
   it should be ``"`default`.`test`"``





Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1410627269


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala:
##########
@@ -45,7 +45,6 @@ trait SimpleTableProvider extends TableProvider {
       schema: StructType,
       partitioning: Array[Transform],
       properties: util.Map[String, String]): Table = {
-    assert(partitioning.isEmpty)

Review Comment:
   Do you know the reason for this assert? If not, please revert this change.





Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1407415324


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala:
##########
@@ -151,6 +153,27 @@ private[sql] object DataSourceV2Utils extends Logging {
     }
   }
 
+  /**
+   * Returns the table provider for the given format, or None if it cannot be found.
+   */
+  def getTableProvider(provider: String, conf: SQLConf): Option[TableProvider] = {
+    // Return early since `lookupDataSourceV2` may fail to resolve provider "hive" to
+    // `HiveFileFormat` when running tests in sql/core.
+    if (DDLUtils.isHiveTable(Some(provider))) return None
+    DataSource.lookupDataSourceV2(provider, conf) match {
+      // TODO(SPARK-28396): Currently file source v2 can't work with tables.
+      case Some(_: FileDataSourceV2) => None
+      case o => o
+    }
+  }
+
+  /**
+   * Check if the provider is a v2 provider.
+   */
+  def isV2Provider(provider: String, conf: SQLConf): Boolean = {

Review Comment:
   Since `isV2Provider` is only used by `ResolveSessionCatalog`, move it back into `ResolveSessionCatalog`.
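
   For reference, a minimal sketch of how such a helper can be written on top of `getTableProvider` (an assumption; the actual body is cut off in the hunk above):
   ```
   def isV2Provider(provider: String, conf: SQLConf): Boolean = {
     // A provider counts as v2 if lookup succeeds and it is not filtered out above.
     getTableProvider(provider, conf).isDefined
   }
   ```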



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala:
##########
@@ -151,6 +153,27 @@ private[sql] object DataSourceV2Utils extends Logging {
     }
   }
 
+  /**
+   * Returns the table provider for the given format, or None if it cannot be found.
+   */
+  def getTableProvider(provider: String, conf: SQLConf): Option[TableProvider] = {
+    // Return early since `lookupDataSourceV2` may fail to resolve provider "hive" to
+    // `HiveFileFormat` when running tests in sql/core.
+    if (DDLUtils.isHiveTable(Some(provider))) return None
+    DataSource.lookupDataSourceV2(provider, conf) match {
+      // TODO(SPARK-28396): Currently file source v2 can't work with tables.
+      case Some(_: FileDataSourceV2) => None
+      case o => o

Review Comment:
   ```
   case p @ Some(v) if !v.isInstanceOf[FileDataSourceV2] => p
   case _ => None
   ```





Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1411885999


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala:
##########
@@ -633,6 +634,95 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
       }
     }
   }
+
+  test("SPARK-46043: create table in SQL using a DSv2 source") {
+    Seq(classOf[SimpleDataSourceV2], classOf[JavaSimpleDataSourceV2]).foreach { cls =>
+      withClue(cls.getName) {
+        // Create a table with an empty schema.
+        withTable("test") {
+          sql(s"CREATE TABLE test USING ${cls.getName}")
+          checkAnswer(
+            sql(s"SELECT * FROM test WHERE i < 3"),
+            Seq(Row(0, 0), Row(1, -1), Row(2, -2)))
+        }
+        // Creating a table with a non-empty schema is not allowed.
+        checkError(
+          exception = intercept[SparkUnsupportedOperationException] {
+            sql(s"CREATE TABLE test(a INT, b INT) USING ${cls.getName}")
+          },
+          errorClass = "CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
+          parameters = Map("tableName" -> "default.test", "provider" -> cls.getName)
+        )
+      }
+    }
+  }
+
+  test("SPARK-46043: create table in SQL with schema required data source") {
+    val cls = classOf[SchemaRequiredDataSource]
+    val e = intercept[IllegalArgumentException] {

Review Comment:
   oh, it doesn't have an error class?





Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1411198692


##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -149,6 +149,19 @@
     ],
     "sqlState" : "42846"
   },
+  "CANNOT_CREATE_DATA_SOURCE_V2_TABLE" : {
+    "message" : [
+      "Failed to create data source V2 table:"

Review Comment:
   shall we include the table name?





Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1413451050


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala:
##########
@@ -61,7 +61,19 @@ case class DataSourceV2Relation(
       Nil
   }
 
-  override def name: String = table.name()
+  override def name: String = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+    (catalog, identifier) match {
+      case (Some(cat), Some(ident)) => s"${quoteIfNeeded(cat.name())}.${ident.quoted}"
+      case (None, None) => table.name()
+      case _ =>
+        throw new IllegalArgumentException(

Review Comment:
   this should be `SparkException.internalError`
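
   For illustration, a hedged sketch of the suggested fix (`SparkException.internalError` is an existing Spark helper; the message text here is illustrative):
   ```
   case _ =>
     throw SparkException.internalError(
       s"Invalid combination of catalog and identifier: catalog=$catalog, identifier=$identifier")
   ```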





Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1408150483


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala:
##########
@@ -115,8 +163,40 @@ class V2SessionCatalog(catalog: SessionCatalog)
       partitions: Array[Transform],
       properties: util.Map[String, String]): Table = {
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper
-    val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) = partitions.toSeq.convertTransforms
     val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName)
+
+    val (newSchema, newPartitions) = DataSourceV2Utils.getTableProvider(provider, conf) match {
+      case Some(_: SupportsCatalogOptions) =>
+        throw new SparkUnsupportedOperationException(
+          errorClass = "CANNOT_CREATE_DATA_SOURCE_V2_TABLE.CATALOG_OPTIONS_UNSUPPORTED",
+          messageParameters = Map("provider" -> provider))
+
+      case Some(p) if !p.supportsExternalMetadata() =>
+        // Partitions cannot be specified when schema is empty.

Review Comment:
   Add an assert here.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala:
##########
@@ -115,8 +163,40 @@ class V2SessionCatalog(catalog: SessionCatalog)
       partitions: Array[Transform],
       properties: util.Map[String, String]): Table = {
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper
-    val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) = partitions.toSeq.convertTransforms
     val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName)
+
+    val (newSchema, newPartitions) = DataSourceV2Utils.getTableProvider(provider, conf) match {
+      case Some(_: SupportsCatalogOptions) =>
+        throw new SparkUnsupportedOperationException(
+          errorClass = "CANNOT_CREATE_DATA_SOURCE_V2_TABLE.CATALOG_OPTIONS_UNSUPPORTED",
+          messageParameters = Map("provider" -> provider))

Review Comment:
   We can pass in the catalogManager and skip this check. Also, the user cannot provide a schema here.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala:
##########
@@ -115,8 +163,40 @@ class V2SessionCatalog(catalog: SessionCatalog)
       partitions: Array[Transform],
       properties: util.Map[String, String]): Table = {
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper
-    val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) = partitions.toSeq.convertTransforms
     val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName)
+
+    val (newSchema, newPartitions) = DataSourceV2Utils.getTableProvider(provider, conf) match {
+      case Some(_: SupportsCatalogOptions) =>
+        throw new SparkUnsupportedOperationException(
+          errorClass = "CANNOT_CREATE_DATA_SOURCE_V2_TABLE.CATALOG_OPTIONS_UNSUPPORTED",
+          messageParameters = Map("provider" -> provider))
+
+      case Some(p) if !p.supportsExternalMetadata() =>
+        // Partitions cannot be specified when schema is empty.
+        if (schema.nonEmpty) {
+          throw new SparkUnsupportedOperationException(
+            errorClass = "CANNOT_CREATE_DATA_SOURCE_V2_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
+            messageParameters = Map("provider" -> provider))
+        }
+        (schema, partitions)
+
+      case Some(tableProvider) =>
+        assert(tableProvider.supportsExternalMetadata())
+        if (schema.isEmpty) {
+          // Infer the schema and partitions and store them in the catalog.
+          val dsOptions = new CaseInsensitiveStringMap(properties)
+          (tableProvider.inferSchema(dsOptions), tableProvider.inferPartitioning(dsOptions))
+        } else {
+          // TODO: when schema is defined but partitioning is empty, should we infer it?

Review Comment:
   We should infer partitions here





Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1411884756


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala:
##########
@@ -114,10 +154,46 @@ class V2SessionCatalog(catalog: SessionCatalog)
       schema: StructType,
       partitions: Array[Transform],
       properties: util.Map[String, String]): Table = {
-    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper
-    val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) =
-      partitions.toImmutableArraySeq.convertTransforms
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
     val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName)
+
+    val (newSchema, newPartitions) = DataSourceV2Utils.getTableProvider(provider, conf) match {
+      // If the provider does not support external metadata, users should not be allowed to
+      // specify custom schema when creating the data source table, since the schema will not
+      // be used when loading the table.
+      case Some(p) if !p.supportsExternalMetadata() =>
+        if (schema.nonEmpty) {
+          throw new SparkUnsupportedOperationException(
+            errorClass = "CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
+            messageParameters = Map("tableName" -> ident.quoted, "provider" -> provider))

Review Comment:
   `ident.quoted` only quotes when necessary, but in error messages we require fully quoted identifiers.
   
   You can call `toSQLId(ident.asMultipartIdentifier)`, but maybe it's better to add a `def fullyQuoted` to the `implicit class IdentifierHelper` and use it here.
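
   A hedged standalone sketch of that helper (the method name follows the suggestion above; the quoting rule assumes Spark's convention of doubling embedded backticks):
   ```
   implicit class IdentifierHelper(ident: Identifier) {
     // Quote every name part unconditionally, escaping embedded backticks.
     def fullyQuoted: String =
       (ident.namespace :+ ident.name)
         .map(part => "`" + part.replace("`", "``") + "`")
         .mkString(".")
   }
   ```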



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala:
##########
@@ -633,6 +634,95 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
       }
     }
   }
+
+  test("SPARK-46043: create table in SQL using a DSv2 source") {
+    Seq(classOf[SimpleDataSourceV2], classOf[JavaSimpleDataSourceV2]).foreach { cls =>
+      withClue(cls.getName) {
+        // Create a table with an empty schema.
+        withTable("test") {
+          sql(s"CREATE TABLE test USING ${cls.getName}")
+          checkAnswer(
+            sql(s"SELECT * FROM test WHERE i < 3"),
+            Seq(Row(0, 0), Row(1, -1), Row(2, -2)))
+        }
+        // Creating a table with a non-empty schema is not allowed.
+        checkError(
+          exception = intercept[SparkUnsupportedOperationException] {
+            sql(s"CREATE TABLE test(a INT, b INT) USING ${cls.getName}")
+          },
+          errorClass = "CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
+          parameters = Map("tableName" -> "default.test", "provider" -> cls.getName)

Review Comment:
   it should be ``"`default`.`test`"``





Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1411876914


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala:
##########
@@ -114,10 +154,46 @@ class V2SessionCatalog(catalog: SessionCatalog)
       schema: StructType,
       partitions: Array[Transform],
       properties: util.Map[String, String]): Table = {
-    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper
-    val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) =
-      partitions.toImmutableArraySeq.convertTransforms
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
     val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName)
+
+    val (newSchema, newPartitions) = DataSourceV2Utils.getTableProvider(provider, conf) match {
+      // If the provider does not support external metadata, users should not be allowed to
+      // specify custom schema when creating the data source table, since the schema will not
+      // be used when loading the table.
+      case Some(p) if !p.supportsExternalMetadata() =>
+        if (schema.nonEmpty) {
+          throw new SparkUnsupportedOperationException(
+            errorClass = "CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
+            messageParameters = Map("tableName" -> ident.quoted, "provider" -> provider))
+        }
+        // V2CreateTablePlan does not allow non-empty partitions when schema is empty. This
+        // is checked in `PreProcessTableCreation` rule.
+        assert(partitions.isEmpty,
+          s"Partitions should be empty when the schema is empty: ${partitions.mkString(", ")}")
+        (schema, partitions)
+
+      case Some(tableProvider) =>
+        assert(tableProvider.supportsExternalMetadata())
+        lazy val dsOptions = new CaseInsensitiveStringMap(properties)

Review Comment:
   do we need to put in the path option?
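
   If it is needed, a hedged sketch of folding the location into the options (`TableCatalog.PROP_LOCATION` is the standard property key; whether to do this at all is the open question above):
   ```
   val withPath = new java.util.HashMap[String, String](properties)
   // Expose the table location to the source as the conventional "path" option.
   Option(properties.get(TableCatalog.PROP_LOCATION)).foreach(withPath.put("path", _))
   lazy val dsOptions = new CaseInsensitiveStringMap(withPath)
   ```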





Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1401567348


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala:
##########
@@ -633,6 +633,38 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
       }
     }
   }
+
+  test("SPARK-46043: Support create table using DSv2 sources") {
+    Seq(classOf[SimpleDataSourceV2], classOf[JavaSimpleDataSourceV2]).foreach { cls =>
+      withClue(cls.getName) {
+        withTable("test") {
+          sql(s"CREATE TABLE test USING ${cls.getName}")
+          checkAnswer(
+            sql(s"SELECT * FROM test WHERE i < 3"),
+            Seq(Row(0, 0), Row(1, -1), Row(2, -2)))
+        }
+      }
+    }
+    withTable("test") {
+      val cls = classOf[SchemaRequiredDataSource]
+      withClue(cls.getName) {
+        sql(s"CREATE TABLE test USING ${cls.getName}")
+        checkAnswer(sql(s"SELECT * FROM test"), Nil)
+      }
+    }
+    withTable("test") {
+      val cls = classOf[SupportsExternalMetadataWritableDataSource]
+      withClue(cls.getName) {
+        withTempDir { dir =>
+          sql(s"CREATE TABLE test USING ${cls.getName} OPTIONS (path '${dir.getCanonicalPath}')")
+          checkAnswer(sql(s"SELECT * FROM test"), Nil)

Review Comment:
   we should at least test the table schema if there is no data.
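
   For example, a hedged sketch of such a check (the column names and types are assumptions, not taken from the test source's actual definition):
   ```
   val expected = new StructType().add("i", "long").add("j", "long")
   assert(spark.table("test").schema == expected)
   ```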





Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1401565526


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala:
##########
@@ -633,6 +633,38 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
       }
     }
   }
+
+  test("SPARK-46043: Support create table using DSv2 sources") {
+    Seq(classOf[SimpleDataSourceV2], classOf[JavaSimpleDataSourceV2]).foreach { cls =>
+      withClue(cls.getName) {
+        withTable("test") {
+          sql(s"CREATE TABLE test USING ${cls.getName}")
+          checkAnswer(
+            sql(s"SELECT * FROM test WHERE i < 3"),
+            Seq(Row(0, 0), Row(1, -1), Row(2, -2)))
+        }
+      }
+    }
+    withTable("test") {
+      val cls = classOf[SchemaRequiredDataSource]
+      withClue(cls.getName) {
+        sql(s"CREATE TABLE test USING ${cls.getName}")

Review Comment:
   how does this work? empty table schema?





Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1411891058


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala:
##########
@@ -126,7 +126,11 @@ abstract class InsertIntoTests(
     val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
 
     verifyTable(t1, Seq.empty[(Long, String, String)].toDF("id", "data", "missing"))
-    val tableName = if (catalogAndNamespace.isEmpty) toSQLId(s"default.$t1") else toSQLId(t1)
+    val tableName = if (catalogAndNamespace.isEmpty) {
+      toSQLId(s"spark_catalog.default.$t1")

Review Comment:
   Not related to your PR, but this seems to indicate a bug: the error message points to table ``"`spark_catalog.default.t1`"``? cc @MaxGekk





Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1411887364


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala:
##########
@@ -633,6 +634,95 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
       }
     }
   }
+
+  test("SPARK-46043: create table in SQL using a DSv2 source") {
+    Seq(classOf[SimpleDataSourceV2], classOf[JavaSimpleDataSourceV2]).foreach { cls =>
+      withClue(cls.getName) {
+        // Create a table with an empty schema.
+        withTable("test") {
+          sql(s"CREATE TABLE test USING ${cls.getName}")
+          checkAnswer(
+            sql(s"SELECT * FROM test WHERE i < 3"),
+            Seq(Row(0, 0), Row(1, -1), Row(2, -2)))
+        }
+        // Creating a table with a non-empty schema is not allowed.
+        checkError(
+          exception = intercept[SparkUnsupportedOperationException] {
+            sql(s"CREATE TABLE test(a INT, b INT) USING ${cls.getName}")
+          },
+          errorClass = "CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
+          parameters = Map("tableName" -> "default.test", "provider" -> cls.getName)
+        )
+      }
+    }
+  }
+
+  test("SPARK-46043: create table in SQL with schema required data source") {
+    val cls = classOf[SchemaRequiredDataSource]
+    val e = intercept[IllegalArgumentException] {
+      sql(s"CREATE TABLE test USING ${cls.getName}")
+    }
+    assert(e.getMessage.contains("requires a user-supplied schema"))
+    withTable("test") {
+      sql(s"CREATE TABLE test(i INT, j INT) USING ${cls.getName}")
+      checkAnswer(sql(s"SELECT * FROM test"), Seq(Row(0, 0), Row(1, -1)))
+    }
+    withTable("test") {
+      sql(s"CREATE TABLE test(i INT) USING ${cls.getName}")
+      checkAnswer(sql(s"SELECT * FROM test"), Seq(Row(0), Row(1)))
+    }
+    withTable("test") {
+      // Test the behavior when there is a mismatch between the schema defined in the
+      // CREATE TABLE command and the actual schema produced by the data source. The
+      // resulting behavior is not guaranteed and may vary based on the data source's
+      // implementation.
+      sql(s"CREATE TABLE test(i INT, j INT, k INT) USING ${cls.getName}")
+      val e = intercept[Exception] {
+        sql("SELECT * FROM test").collect()
+      }
+      assert(e.getMessage.contains(
+        "java.lang.ArrayIndexOutOfBoundsException: Index 2 out of bounds for length 2"))
+    }
+  }
+
+  test("SPARK-46043: create table in SQL with partitioning required data source") {
+    val cls = classOf[PartitionsRequiredDataSource]
+    val e = intercept[IllegalArgumentException](

Review Comment:
   oh it's thrown directly from the data source?
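
   For context, a hedged sketch of how a schema-required source surfaces this error (an assumed shape; the message string matches the assertion in the test above):
   ```
   class SchemaRequiredLikeSource extends TableProvider {
     override def supportsExternalMetadata(): Boolean = true
     // With no user-supplied schema, CREATE TABLE triggers inference, which throws here.
     override def inferSchema(options: CaseInsensitiveStringMap): StructType =
       throw new IllegalArgumentException("requires a user-supplied schema")
     override def getTable(
         schema: StructType,
         partitioning: Array[Transform],
         properties: java.util.Map[String, String]): Table = ???
   }
   ```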





Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1411889175


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala:
##########
@@ -633,6 +634,95 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
       }
     }
   }
+
+  test("SPARK-46043: create table in SQL using a DSv2 source") {
+    Seq(classOf[SimpleDataSourceV2], classOf[JavaSimpleDataSourceV2]).foreach { cls =>
+      withClue(cls.getName) {
+        // Create a table with an empty schema.
+        withTable("test") {
+          sql(s"CREATE TABLE test USING ${cls.getName}")
+          checkAnswer(
+            sql(s"SELECT * FROM test WHERE i < 3"),
+            Seq(Row(0, 0), Row(1, -1), Row(2, -2)))
+        }
+        // Creating a table with a non-empty schema is not allowed.
+        checkError(
+          exception = intercept[SparkUnsupportedOperationException] {
+            sql(s"CREATE TABLE test(a INT, b INT) USING ${cls.getName}")
+          },
+          errorClass = "CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
+          parameters = Map("tableName" -> "default.test", "provider" -> cls.getName)
+        )
+      }
+    }
+  }
+
+  test("SPARK-46043: create table in SQL with schema required data source") {
+    val cls = classOf[SchemaRequiredDataSource]
+    val e = intercept[IllegalArgumentException] {

Review Comment:
   nvm, https://github.com/apache/spark/pull/43949/files#r1411887364





Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1401566798


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala:
##########
@@ -633,6 +633,38 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
       }
     }
   }
+
+  test("SPARK-46043: Support create table using DSv2 sources") {
+    Seq(classOf[SimpleDataSourceV2], classOf[JavaSimpleDataSourceV2]).foreach { cls =>
+      withClue(cls.getName) {
+        withTable("test") {
+          sql(s"CREATE TABLE test USING ${cls.getName}")

Review Comment:
   can we also test what happens if we create a table with a schema that does not match the data source?





Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1411206881


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala:
##########
@@ -25,7 +25,7 @@ import scala.util.Try
 
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkUnsupportedOperationException}

Review Comment:
   unnecessary change?





Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1411886823


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala:
##########
@@ -633,6 +634,95 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
       }
     }
   }
+
+  test("SPARK-46043: create table in SQL using a DSv2 source") {
+    Seq(classOf[SimpleDataSourceV2], classOf[JavaSimpleDataSourceV2]).foreach { cls =>
+      withClue(cls.getName) {
+        // Create a table with an empty schema.
+        withTable("test") {
+          sql(s"CREATE TABLE test USING ${cls.getName}")
+          checkAnswer(
+            sql(s"SELECT * FROM test WHERE i < 3"),
+            Seq(Row(0, 0), Row(1, -1), Row(2, -2)))
+        }
+        // Creating a table with a non-empty schema is not allowed.
+        checkError(
+          exception = intercept[SparkUnsupportedOperationException] {
+            sql(s"CREATE TABLE test(a INT, b INT) USING ${cls.getName}")
+          },
+          errorClass = "CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
+          parameters = Map("tableName" -> "default.test", "provider" -> cls.getName)
+        )
+      }
+    }
+  }
+
+  test("SPARK-46043: create table in SQL with schema required data source") {
+    val cls = classOf[SchemaRequiredDataSource]
+    val e = intercept[IllegalArgumentException] {
+      sql(s"CREATE TABLE test USING ${cls.getName}")
+    }
+    assert(e.getMessage.contains("requires a user-supplied schema"))
+    withTable("test") {
+      sql(s"CREATE TABLE test(i INT, j INT) USING ${cls.getName}")
+      checkAnswer(sql(s"SELECT * FROM test"), Seq(Row(0, 0), Row(1, -1)))
+    }
+    withTable("test") {
+      sql(s"CREATE TABLE test(i INT) USING ${cls.getName}")
+      checkAnswer(sql(s"SELECT * FROM test"), Seq(Row(0), Row(1)))
+    }
+    withTable("test") {
+      // Test the behavior when there is a mismatch between the schema defined in the
+      // CREATE TABLE command and the actual schema produced by the data source. The
+      // resulting behavior is not guaranteed and may vary based on the data source's
+      // implementation.
+      sql(s"CREATE TABLE test(i INT, j INT, k INT) USING ${cls.getName}")
+      val e = intercept[Exception] {
+        sql("SELECT * FROM test").collect()
+      }
+      assert(e.getMessage.contains(
+        "java.lang.ArrayIndexOutOfBoundsException: Index 2 out of bounds for length 2"))
+    }
+  }
+
+  test("SPARK-46043: create table in SQL with partitioning required data source") {
+    val cls = classOf[PartitionsRequiredDataSource]
+    val e = intercept[IllegalArgumentException](

Review Comment:
   ditto





Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1411205694


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala:
##########
@@ -115,9 +164,44 @@ class V2SessionCatalog(catalog: SessionCatalog)
       partitions: Array[Transform],
       properties: util.Map[String, String]): Table = {
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper
-    val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) =
-      partitions.toImmutableArraySeq.convertTransforms
     val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName)
+
+    val (newSchema, newPartitions) = DataSourceV2Utils.getTableProvider(provider, conf) match {
+      // If the provider does not support external metadata, users should not be allowed to
+      // specify custom schema when creating the data source table, since the schema will not
+      // be used when loading the table.
+      case Some(p) if !p.supportsExternalMetadata() =>
+        if (schema.nonEmpty) {
+          throw new SparkUnsupportedOperationException(
+            errorClass = "CANNOT_CREATE_DATA_SOURCE_V2_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
+            messageParameters = Map("provider" -> provider))
+        }
+        // V2CreateTablePlan does not allow non-empty partitions when schema is empty. This
+        // is checked in `PreProcessTableCreation` rule.
+        assert(partitions.isEmpty,
+          s"Partitions should be empty when the schema is empty: ${partitions.mkString(", ")}")
+        (schema, partitions)
+
+      case Some(tableProvider) =>
+        assert(tableProvider.supportsExternalMetadata())
+        lazy val dsOptions = new CaseInsensitiveStringMap(properties)
+        if (schema.isEmpty) {
+          assert(partitions.isEmpty,
+            s"Partitions should be empty when the schema is empty: ${partitions.mkString(", ")}")
+          // Infer the schema and partitions and store them in the catalog.
+          (tableProvider.inferSchema(dsOptions), tableProvider.inferPartitioning(dsOptions))
+        } else if (partitions.isEmpty) {
+          (schema, tableProvider.inferPartitioning(dsOptions))
+        } else {
+          (schema, partitions)
+        }
+
+      case _ =>
+        (schema, partitions)

Review Comment:
   shall we fail here if it's not a valid data source?





Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1411203083


##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -149,6 +149,19 @@
     ],
     "sqlState" : "42846"
   },
+  "CANNOT_CREATE_DATA_SOURCE_V2_TABLE" : {

Review Comment:
   I can't find other errors that mention data source v2. I think it's a developer-facing detail and we should not expose it to end users via error messages. How about just `CANNOT_CREATE_TABLE`?





Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1411877904


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala:
##########
@@ -115,9 +164,44 @@ class V2SessionCatalog(catalog: SessionCatalog)
       partitions: Array[Transform],
       properties: util.Map[String, String]): Table = {
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper
-    val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) =
-      partitions.toImmutableArraySeq.convertTransforms
     val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName)
+
+    val (newSchema, newPartitions) = DataSourceV2Utils.getTableProvider(provider, conf) match {
+      // If the provider does not support external metadata, users should not be allowed to
+      // specify custom schema when creating the data source table, since the schema will not
+      // be used when loading the table.
+      case Some(p) if !p.supportsExternalMetadata() =>
+        if (schema.nonEmpty) {
+          throw new SparkUnsupportedOperationException(
+            errorClass = "CANNOT_CREATE_DATA_SOURCE_V2_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
+            messageParameters = Map("provider" -> provider))
+        }
+        // V2CreateTablePlan does not allow non-empty partitions when schema is empty. This
+        // is checked in `PreProcessTableCreation` rule.
+        assert(partitions.isEmpty,
+          s"Partitions should be empty when the schema is empty: ${partitions.mkString(", ")}")
+        (schema, partitions)
+
+      case Some(tableProvider) =>
+        assert(tableProvider.supportsExternalMetadata())
+        lazy val dsOptions = new CaseInsensitiveStringMap(properties)
+        if (schema.isEmpty) {
+          assert(partitions.isEmpty,
+            s"Partitions should be empty when the schema is empty: ${partitions.mkString(", ")}")
+          // Infer the schema and partitions and store them in the catalog.
+          (tableProvider.inferSchema(dsOptions), tableProvider.inferPartitioning(dsOptions))
+        } else if (partitions.isEmpty) {
+          (schema, tableProvider.inferPartitioning(dsOptions))
+        } else {
+          (schema, partitions)
+        }
+
+      case _ =>
+        (schema, partitions)

Review Comment:
   Maybe we can do it later. The current behavior allows any table provider.





Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1411877401


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala:
##########
@@ -114,10 +154,46 @@ class V2SessionCatalog(catalog: SessionCatalog)
       schema: StructType,
       partitions: Array[Transform],
       properties: util.Map[String, String]): Table = {
-    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper
-    val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) =
-      partitions.toImmutableArraySeq.convertTransforms
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
     val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName)
+
+    val (newSchema, newPartitions) = DataSourceV2Utils.getTableProvider(provider, conf) match {
+      // If the provider does not support external metadata, users should not be allowed to
+      // specify custom schema when creating the data source table, since the schema will not
+      // be used when loading the table.
+      case Some(p) if !p.supportsExternalMetadata() =>
+        if (schema.nonEmpty) {
+          throw new SparkUnsupportedOperationException(
+            errorClass = "CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
+            messageParameters = Map("tableName" -> ident.quoted, "provider" -> provider))
+        }
+        // V2CreateTablePlan does not allow non-empty partitions when schema is empty. This
+        // is checked in `PreProcessTableCreation` rule.
+        assert(partitions.isEmpty,
+          s"Partitions should be empty when the schema is empty: ${partitions.mkString(", ")}")
+        (schema, partitions)
+
+      case Some(tableProvider) =>
+        assert(tableProvider.supportsExternalMetadata())
+        lazy val dsOptions = new CaseInsensitiveStringMap(properties)

Review Comment:
   I think we can add a new method to create ds options from a `CatalogTable`, to avoid duplicating code.
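
   A hedged sketch of such a helper (the method name is hypothetical; the `CatalogTable` storage fields and `CatalogUtils.URIToString` are existing Spark APIs):
   ```
   def toDsOptions(table: CatalogTable): CaseInsensitiveStringMap = {
     val options = new java.util.HashMap[String, String]()
     table.storage.properties.foreach { case (k, v) => options.put(k, v) }
     // Surface the table location as the conventional "path" option.
     table.storage.locationUri.foreach(uri => options.put("path", CatalogUtils.URIToString(uri)))
     new CaseInsensitiveStringMap(options)
   }
   ```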





Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1408398980


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala:
##########
@@ -151,6 +153,27 @@ private[sql] object DataSourceV2Utils extends Logging {
     }
   }
 
+  /**
+   * Returns the table provider for the given format, or None if it cannot be found.
+   */
+  def getTableProvider(provider: String, conf: SQLConf): Option[TableProvider] = {
+    // Return early since `lookupDataSourceV2` may fail to resolve provider "hive" to
+    // `HiveFileFormat` when running tests in sql/core.
+    if (DDLUtils.isHiveTable(Some(provider))) return None
+    DataSource.lookupDataSourceV2(provider, conf) match {
+      // TODO(SPARK-28396): Currently file source v2 can't work with tables.
+      case Some(_: FileDataSourceV2) => None
+      case o => o
+    }
+  }
+
+  /**
+   * Check if the provider is a v2 provider.
+   */
+  def isV2Provider(provider: String, conf: SQLConf): Boolean = {

Review Comment:
   Good catch!





Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1408439193


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala:
##########
@@ -115,8 +163,40 @@ class V2SessionCatalog(catalog: SessionCatalog)
       partitions: Array[Transform],
       properties: util.Map[String, String]): Table = {
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper
-    val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) = partitions.toSeq.convertTransforms
     val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName)
+
+    val (newSchema, newPartitions) = DataSourceV2Utils.getTableProvider(provider, conf) match {
+      case Some(_: SupportsCatalogOptions) =>
+        throw new SparkUnsupportedOperationException(
+          errorClass = "CANNOT_CREATE_DATA_SOURCE_V2_TABLE.CATALOG_OPTIONS_UNSUPPORTED",
+          messageParameters = Map("provider" -> provider))

Review Comment:
   @cloud-fan Actually no. CatalogManager constructor takes in a v2SessionCatalog, and here we can't pass in the catalog manager to the constructor of v2 session catalog (circular dependency):
   https://github.com/apache/spark/blob/7a0d0411aa02e7e1b6beb393966ada2c54c09870/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala#L174-L176



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1409265752


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala:
##########
@@ -31,6 +31,7 @@ trait SimpleTableProvider extends TableProvider {
   def getTable(options: CaseInsensitiveStringMap): Table
 
   private[this] var loadedTable: Table = _
+

Review Comment:
   Please revert this line.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala:
##########
@@ -73,7 +74,44 @@ class V2SessionCatalog(catalog: SessionCatalog)
 
   override def loadTable(ident: Identifier): Table = {
     try {
-      V1Table(catalog.getTableMetadata(ident.asTableIdentifier))
+      val table = catalog.getTableMetadata(ident.asTableIdentifier)
+      if (table.provider.isDefined) {
+        DataSourceV2Utils.getTableProvider(table.provider.get, conf) match {
+          case Some(provider) =>
+            // Get the table properties during creation and append the path option
+            // to the properties.
+            val tableProperties = table.properties
+            val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
+            val properties = tableProperties ++ pathOption

Review Comment:
   ```suggestion
               val properties = table.properties ++
                 table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #43949:
URL: https://github.com/apache/spark/pull/43949#issuecomment-1839849681

   thanks, merging to master!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1411203786


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala:
##########
@@ -61,7 +61,14 @@ case class DataSourceV2Relation(
       Nil
   }
 
-  override def name: String = table.name()
+  override def name: String = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+    (catalog, identifier) match {
+      case (Some(cat), Some(ident)) => s"${quoteIdentifier(cat.name())}.${ident.quoted}"
+      case (None, Some(ident)) => ident.quoted

Review Comment:
   I don't think this can happen. We can add an assert.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1411878649


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala:
##########
@@ -3366,8 +3367,21 @@ class DataSourceV2SQLSuiteV2Filter extends DataSourceV2SQLSuite {
 
 /** Used as a V2 DataSource for V2SessionCatalog DDL */
 class FakeV2Provider extends SimpleTableProvider {

Review Comment:
   Can we avoid extending `SimpleTableProvider` here? I think it's not meant to support external metadata.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46043][SQL] Support create table using DSv2 sources [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1411204043


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala:
##########
@@ -45,7 +45,6 @@ trait SimpleTableProvider extends TableProvider {
       schema: StructType,
       partitioning: Array[Transform],
       properties: util.Map[String, String]): Table = {
-    assert(partitioning.isEmpty)

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org