Posted to commits@spark.apache.org by we...@apache.org on 2020/12/30 07:57:01 UTC
[spark] branch master updated: [SPARK-33904][SQL] Recognize `spark_catalog` in `saveAsTable()` and `insertInto()`
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2afd1fb [SPARK-33904][SQL] Recognize `spark_catalog` in `saveAsTable()` and `insertInto()`
2afd1fb is described below
commit 2afd1fb49243e28152b3e581923b49d3aaab0dd7
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Wed Dec 30 07:56:34 2020 +0000
[SPARK-33904][SQL] Recognize `spark_catalog` in `saveAsTable()` and `insertInto()`
### What changes were proposed in this pull request?
In the `saveAsTable()` and `insertInto()` methods of `DataFrameWriter`, recognize `spark_catalog` as the default session catalog in table names.
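Conceptually (a hedged sketch, not the code in this patch), the change makes a multi-part table name whose first part is the default session catalog behave like the corresponding unprefixed name; `stripSessionCatalog` below is a hypothetical helper for illustration:
```scala
// Illustrative sketch only: a name part equal to the default session
// catalog ("spark_catalog") is dropped before further resolution.
def stripSessionCatalog(parts: Seq[String]): Seq[String] = parts match {
  case "spark_catalog" +: rest => rest // "spark_catalog.ns.tbl" -> "ns.tbl"
  case other => other
}

assert(stripSessionCatalog(Seq("spark_catalog", "ns", "tbl")) == Seq("ns", "tbl"))
assert(stripSessionCatalog(Seq("ns", "tbl")) == Seq("ns", "tbl"))
```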
### Why are the changes needed?
1. To simplify writing unified v1 and v2 tests.
2. To improve the Spark SQL user experience: `insertInto()` should have feature parity with the `INSERT INTO` SQL command. Currently, `insertInto()` fails on a table from a namespace in `spark_catalog`:
```scala
scala> sql("CREATE NAMESPACE spark_catalog.ns")
scala> Seq(0).toDF().write.saveAsTable("spark_catalog.ns.tbl")
org.apache.spark.sql.AnalysisException: Couldn't find a catalog to handle the identifier spark_catalog.ns.tbl.
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:629)
... 47 elided
scala> Seq(0).toDF().write.insertInto("spark_catalog.ns.tbl")
org.apache.spark.sql.AnalysisException: Couldn't find a catalog to handle the identifier spark_catalog.ns.tbl.
at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:498)
... 47 elided
```
but `INSERT INTO` succeeds:
```sql
spark-sql> create table spark_catalog.ns.tbl (c int);
spark-sql> insert into spark_catalog.ns.tbl select 0;
spark-sql> select * from spark_catalog.ns.tbl;
0
```
### Does this PR introduce _any_ user-facing change?
Yes. After the changes, the example above works:
```scala
scala> Seq(0).toDF().write.saveAsTable("spark_catalog.ns.tbl")
scala> Seq(1).toDF().write.insertInto("spark_catalog.ns.tbl")
scala> spark.table("spark_catalog.ns.tbl").show(false)
+-----+
|value|
+-----+
|0 |
|1 |
+-----+
```
### How was this patch tested?
By running the affected test suites:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *.ShowPartitionsSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *.FileFormatWriterSuite"
```
Closes #30919 from MaxGekk/insert-into-spark_catalog.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/connector/catalog/LookupCatalog.scala | 27 ++++++++++++----------
.../command/ShowPartitionsSuiteBase.scala | 12 +++++++++-
.../execution/command/v1/ShowPartitionsSuite.scala | 19 ++++-----------
.../execution/command/v2/ShowPartitionsSuite.scala | 22 ++++--------------
.../datasources/FileFormatWriterSuite.scala | 13 +++++++++++
.../execution/command/ShowPartitionsSuite.scala | 19 +++++----------
6 files changed, 55 insertions(+), 57 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
index d8cdecc..16416fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
@@ -140,19 +140,22 @@ private[sql] trait LookupCatalog extends Logging {
* For legacy support only. Please use [[CatalogAndIdentifier]] instead on DSv2 code paths.
*/
object AsTableIdentifier {
- def unapply(parts: Seq[String]): Option[TableIdentifier] = parts match {
- case CatalogAndMultipartIdentifier(None, names)
+ def unapply(parts: Seq[String]): Option[TableIdentifier] = {
+ def namesToTableIdentifier(names: Seq[String]): Option[TableIdentifier] = names match {
+ case Seq(name) => Some(TableIdentifier(name))
+ case Seq(database, name) => Some(TableIdentifier(name, Some(database)))
+ case _ => None
+ }
+ parts match {
+ case CatalogAndMultipartIdentifier(None, names)
if CatalogV2Util.isSessionCatalog(currentCatalog) =>
- names match {
- case Seq(name) =>
- Some(TableIdentifier(name))
- case Seq(database, name) =>
- Some(TableIdentifier(name, Some(database)))
- case _ =>
- None
- }
- case _ =>
- None
+ namesToTableIdentifier(names)
+ case CatalogAndMultipartIdentifier(Some(catalog), names)
+ if CatalogV2Util.isSessionCatalog(catalog) &&
+ CatalogV2Util.isSessionCatalog(currentCatalog) =>
+ namesToTableIdentifier(names)
+ case _ => None
+ }
}
}
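For context, a hedged sketch of what the updated extractor accepts (assumed to run inside a class that mixes in `LookupCatalog`, with the session catalog current; not part of the patch):
```scala
// Illustrative only; per the diff above, both matches now yield
// TableIdentifier("tbl", Some("ns")).
Seq("ns", "tbl") match {
  case AsTableIdentifier(id) => id // matched before and after this patch
}
Seq("spark_catalog", "ns", "tbl") match {
  case AsTableIdentifier(id) => id // newly matched by this patch
}
```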
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsSuiteBase.scala
index 9a942d3..29edb8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsSuiteBase.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.command
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructType}
@@ -53,6 +53,16 @@ trait ShowPartitionsSuiteBase extends QueryTest with DDLCommandTestUtils {
sql(s"ALTER TABLE $table ADD PARTITION(year = 2016, month = 3)")
}
+ protected def createNullPartTable(table: String, format: String): Unit = {
+ import testImplicits._
+ val df = Seq((0, ""), (1, null)).toDF("a", "part")
+ df.write
+ .partitionBy("part")
+ .format(format)
+ .mode(SaveMode.Overwrite)
+ .saveAsTable(table)
+ }
+
test("show partitions of non-partitioned table") {
withNamespaceAndTable("ns", "not_partitioned_table") { t =>
sql(s"CREATE TABLE $t (col1 int) $defaultUsing")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala
index 5be5e28..e85d62c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala
@@ -105,22 +105,13 @@ class ShowPartitionsSuite extends ShowPartitionsSuiteBase with CommandSuiteBase
}
}
- test("null and empty string as partition values") {
- import testImplicits._
- withTable("t") {
- val df = Seq((0, ""), (1, null)).toDF("a", "part")
- df.write
- .partitionBy("part")
- .format("parquet")
- .mode(SaveMode.Overwrite)
- .saveAsTable("t")
-
+ test("SPARK-33904: null and empty string as partition values") {
+ withNamespaceAndTable("ns", "tbl") { t =>
+ createNullPartTable(t, "parquet")
runShowPartitionsSql(
- "SHOW PARTITIONS t",
+ s"SHOW PARTITIONS $t",
Row("part=__HIVE_DEFAULT_PARTITION__") :: Nil)
- checkAnswer(spark.table("t"),
- Row(0, null) ::
- Row(1, null) :: Nil)
+ checkAnswer(spark.table(t), Row(0, null) :: Row(1, null) :: Nil)
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowPartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowPartitionsSuite.scala
index 44d8b57..42f05ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowPartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowPartitionsSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.command.v2
-import org.apache.spark.sql.{AnalysisException, Row, SaveMode}
+import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.execution.command
/**
@@ -38,23 +38,11 @@ class ShowPartitionsSuite extends command.ShowPartitionsSuiteBase with CommandSu
}
}
- test("SPARK-33889: null and empty string as partition values") {
- import testImplicits._
+ test("SPARK-33889, SPARK-33904: null and empty string as partition values") {
withNamespaceAndTable("ns", "tbl") { t =>
- val df = Seq((0, ""), (1, null)).toDF("a", "part")
- df.write
- .partitionBy("part")
- .format("parquet")
- .mode(SaveMode.Overwrite)
- .saveAsTable(t)
-
- runShowPartitionsSql(
- s"SHOW PARTITIONS $t",
- Row("part=") ::
- Row("part=null") :: Nil)
- checkAnswer(spark.table(t),
- Row(0, "") ::
- Row(1, null) :: Nil)
+ createNullPartTable(t, "parquet")
+ runShowPartitionsSql(s"SHOW PARTITIONS $t", Row("part=") :: Row("part=null") :: Nil)
+ checkAnswer(spark.table(t), Row(0, "") :: Row(1, null) :: Nil)
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
index ce51184..f492fc6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
@@ -61,4 +61,17 @@ class FileFormatWriterSuite
checkAnswer(spark.table("t2").sort("id"), Seq(Row(0, null), Row(1, null), Row(2, null)))
}
}
+
+ test("SPARK-33904: save and insert into a table in a namespace of spark_catalog") {
+ val ns = "spark_catalog.ns"
+ withNamespace(ns) {
+ spark.sql(s"CREATE NAMESPACE $ns")
+ val t = s"$ns.tbl"
+ withTable(t) {
+ spark.range(1).write.saveAsTable(t)
+ Seq(100).toDF().write.insertInto(t)
+ checkAnswer(spark.table(t), Seq(Row(0), Row(100)))
+ }
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowPartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowPartitionsSuite.scala
index 904c6c4..ded53cc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowPartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowPartitionsSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hive.execution.command
-import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.command.v1
/**
@@ -25,21 +25,14 @@ import org.apache.spark.sql.execution.command.v1
* V1 Hive external table catalog.
*/
class ShowPartitionsSuite extends v1.ShowPartitionsSuiteBase with CommandSuiteBase {
- test("null and empty string as partition values") {
- import testImplicits._
+ test("SPARK-33904: null and empty string as partition values") {
withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
- withTable("t") {
- val df = Seq((0, ""), (1, null)).toDF("a", "part")
- df.write
- .partitionBy("part")
- .format("hive")
- .mode(SaveMode.Overwrite)
- .saveAsTable("t")
-
+ withNamespaceAndTable("ns", "tbl") { t =>
+ createNullPartTable(t, "hive")
runShowPartitionsSql(
- "SHOW PARTITIONS t",
+ s"SHOW PARTITIONS $t",
Row("part=__HIVE_DEFAULT_PARTITION__") :: Nil)
- checkAnswer(spark.table("t"),
+ checkAnswer(spark.table(t),
Row(0, "__HIVE_DEFAULT_PARTITION__") ::
Row(1, "__HIVE_DEFAULT_PARTITION__") :: Nil)
}