Posted to commits@spark.apache.org by we...@apache.org on 2019/07/11 01:11:02 UTC
[spark] branch master updated: [SPARK-27919][SQL] Add v2 session catalog
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ec821b4 [SPARK-27919][SQL] Add v2 session catalog
ec821b4 is described below
commit ec821b4411bf64ab587548853ade08c053c64d6a
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Thu Jul 11 09:10:30 2019 +0800
[SPARK-27919][SQL] Add v2 session catalog
## What changes were proposed in this pull request?
This fixes a problem where it was possible to create a v2 table through the default catalog that could not then be loaded through the session catalog. A v2 session catalog should be used when the v1 session catalog is responsible for a table, i.e. when the table identifier names no catalog.
* Adds a v2 catalog implementation that delegates to the analyzer's SessionCatalog
* Uses the v2 session catalog for CTAS and CreateTable when the provider is a v2 provider and no v2 catalog is in the table identifier
* Updates catalog lookup to always provide the default catalog if one is set, for consistent behavior (see the sketch below)
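Taken together, the lookup order is: a catalog named in the identifier wins, then the configured default catalog, and for v2 providers the v2 session catalog is the final fallback. A minimal, self-contained sketch of that order (the object, stub type, and values below are illustrative and not part of this patch):

    // Illustrative sketch of the resolution order; CatalogPlugin is stubbed as String.
    object ResolutionOrderSketch {
      type CatalogPlugin = String

      // Stand-ins for LookupCatalog.defaultCatalog and LookupCatalog.sessionCatalog.
      val defaultCatalog: Option[CatalogPlugin] = None
      val sessionCatalog: Option[CatalogPlugin] = Some("session")

      def resolve(explicit: Option[CatalogPlugin], providerIsV2: Boolean): Option[CatalogPlugin] =
        explicit                                              // catalog named in the identifier
          .orElse(defaultCatalog)                             // default catalog, if configured
          .orElse(if (providerIsV2) sessionCatalog else None) // v2 session catalog fallback

      def main(args: Array[String]): Unit = {
        assert(resolve(Some("prod"), providerIsV2 = true).contains("prod"))
        assert(resolve(None, providerIsV2 = true).contains("session"))
        assert(resolve(None, providerIsV2 = false).isEmpty) // handled by the v1 code path
      }
    }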
## How was this patch tested?
* Adds a new test suite for the v2 session catalog that validates the TableCatalog API
* Adds test cases in PlanResolutionSuite to validate that the v2 session catalog is used
* Adds test suite for LookupCatalog with a default catalog
Closes #24768 from rdblue/SPARK-27919-add-v2-session-catalog.
Authored-by: Ryan Blue <bl...@apache.org>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/catalog/v2/LookupCatalog.scala | 103 +++-
.../spark/sql/catalyst/analysis/Analyzer.scala | 10 +
.../analysis/UpdateAttributeNullability.scala | 2 +-
.../plans/logical/basicLogicalOperators.scala | 2 +-
.../org/apache/spark/sql/internal/SQLConf.scala | 5 +
.../catalyst/catalog/v2/LookupCatalogSuite.scala | 107 ++++
.../datasources/DataSourceResolution.scala | 52 +-
.../datasources/v2/V2SessionCatalog.scala | 255 ++++++++
.../sql/internal/BaseSessionStateBuilder.scala | 2 +-
.../execution/command/PlanResolutionSuite.scala | 123 +++-
.../datasources/v2/V2SessionCatalogSuite.scala | 683 +++++++++++++++++++++
.../sql/sources/v2/DataSourceV2SQLSuite.scala | 94 +--
.../spark/sql/hive/HiveSessionStateBuilder.scala | 2 +-
13 files changed, 1353 insertions(+), 87 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala
index 5464a74..5f7ee30 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala
@@ -17,36 +17,91 @@
package org.apache.spark.sql.catalog.v2
+import scala.util.control.NonFatal
+
import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.TableIdentifier
/**
* A trait to encapsulate catalog lookup function and helpful extractors.
*/
@Experimental
-trait LookupCatalog {
+trait LookupCatalog extends Logging {
+
+ import LookupCatalog._
+ protected def defaultCatalogName: Option[String] = None
protected def lookupCatalog(name: String): CatalogPlugin
- type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier)
+ /**
+ * Returns the default catalog. When set, this catalog is used for all identifiers that do not
+ * set a specific catalog. When this is None, the session catalog is responsible for the
+ * identifier.
+ *
+ * If this is None and a table's provider (source) is a v2 provider, the v2 session catalog will
+ * be used.
+ */
+ def defaultCatalog: Option[CatalogPlugin] = {
+ try {
+ defaultCatalogName.map(lookupCatalog)
+ } catch {
+ case NonFatal(e) =>
+ logError(s"Cannot load default v2 catalog: ${defaultCatalogName.get}", e)
+ None
+ }
+ }
/**
- * Extract catalog plugin and identifier from a multi-part identifier.
+ * This catalog is a v2 catalog that delegates to the v1 session catalog. It is used when the
+ * session catalog is responsible for an identifier, but the source requires the v2 catalog API.
+ * This happens when the source implementation extends the v2 TableProvider API and is not listed
+ * in the fallback configuration, spark.sql.sources.write.useV1SourceList.
*/
- object CatalogObjectIdentifier {
- def unapply(parts: Seq[String]): Some[CatalogObjectIdentifier] = parts match {
- case Seq(name) =>
- Some((None, Identifier.of(Array.empty, name)))
+ def sessionCatalog: Option[CatalogPlugin] = {
+ try {
+ Some(lookupCatalog(SESSION_CATALOG_NAME))
+ } catch {
+ case NonFatal(e) =>
+ logError("Cannot load v2 session catalog", e)
+ None
+ }
+ }
+
+ /**
+ * Extract catalog plugin and remaining identifier names.
+ *
+ * This does not substitute the default catalog if no catalog is set in the identifier.
+ */
+ private object CatalogAndIdentifier {
+ def unapply(parts: Seq[String]): Some[(Option[CatalogPlugin], Seq[String])] = parts match {
+ case Seq(_) =>
+ Some((None, parts))
case Seq(catalogName, tail @ _*) =>
try {
- Some((Some(lookupCatalog(catalogName)), Identifier.of(tail.init.toArray, tail.last)))
+ Some((Some(lookupCatalog(catalogName)), tail))
} catch {
case _: CatalogNotFoundException =>
- Some((None, Identifier.of(parts.init.toArray, parts.last)))
+ Some((None, parts))
}
}
}
+ type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier)
+
+ /**
+ * Extract catalog and identifier from a multi-part identifier, applying the default catalog if needed.
+ */
+ object CatalogObjectIdentifier {
+ def unapply(parts: Seq[String]): Some[CatalogObjectIdentifier] = parts match {
+ case CatalogAndIdentifier(maybeCatalog, nameParts) =>
+ Some((
+ maybeCatalog.orElse(defaultCatalog),
+ Identifier.of(nameParts.init.toArray, nameParts.last)
+ ))
+ }
+ }
+
/**
* Extract legacy table identifier from a multi-part identifier.
*
@@ -54,12 +109,12 @@ trait LookupCatalog {
*/
object AsTableIdentifier {
def unapply(parts: Seq[String]): Option[TableIdentifier] = parts match {
- case CatalogObjectIdentifier(None, ident) =>
- ident.namespace match {
- case Array() =>
- Some(TableIdentifier(ident.name))
- case Array(database) =>
- Some(TableIdentifier(ident.name, Some(database)))
+ case CatalogAndIdentifier(None, names) if defaultCatalog.isEmpty =>
+ names match {
+ case Seq(name) =>
+ Some(TableIdentifier(name))
+ case Seq(database, name) =>
+ Some(TableIdentifier(name, Some(database)))
case _ =>
None
}
@@ -67,4 +122,22 @@ trait LookupCatalog {
None
}
}
+
+ /**
+ * For temp views, extract a table identifier from a multi-part identifier if it has no catalog.
+ */
+ object AsTemporaryViewIdentifier {
+ def unapply(parts: Seq[String]): Option[TableIdentifier] = parts match {
+ case CatalogAndIdentifier(None, Seq(table)) =>
+ Some(TableIdentifier(table))
+ case CatalogAndIdentifier(None, Seq(database, table)) =>
+ Some(TableIdentifier(table, Some(database)))
+ case _ =>
+ None
+ }
+ }
+}
+
+object LookupCatalog {
+ val SESSION_CATALOG_NAME: String = "session"
}
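For reference, the extractors above classify a parsed multi-part name purely by its shape and by whether the first part names a registered catalog. Below is a self-contained sketch that mirrors (but does not reuse) the matching rules; the catalog set and default are assumptions for illustration:

    // Mirrors CatalogAndIdentifier/CatalogObjectIdentifier for illustration only.
    object ExtractorSketch {
      val catalogs = Set("prod", "test")            // assumed registered catalogs
      val defaultCatalog: Option[String] = None     // no default configured

      // Returns (catalog, namespace, name) for a parsed multi-part identifier.
      def classify(parts: Seq[String]): (Option[String], Seq[String], String) = parts match {
        case Seq(first, rest @ _*) if rest.nonEmpty && catalogs(first) =>
          (Some(first), rest.init, rest.last)       // first part names a catalog
        case _ =>
          (defaultCatalog, parts.init, parts.last)  // session catalog is responsible
      }

      def main(args: Array[String]): Unit = {
        assert(classify(Seq("prod", "db", "tbl")) == (Some("prod"), Seq("db"), "tbl"))
        assert(classify(Seq("db", "tbl")) == (None, Seq("db"), "tbl"))
      }
    }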
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 5d37e90..1d0dba2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -104,6 +104,8 @@ class Analyzer(
this(catalog, conf, conf.optimizerMaxIterations)
}
+ override protected def defaultCatalogName: Option[String] = conf.defaultV2Catalog
+
override protected def lookupCatalog(name: String): CatalogPlugin =
throw new CatalogNotFoundException("No catalog lookup function")
@@ -667,6 +669,10 @@ class Analyzer(
import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util._
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+ case u @ UnresolvedRelation(AsTemporaryViewIdentifier(ident))
+ if catalog.isTemporaryTable(ident) =>
+ u // temporary views take precedence over catalog table names
+
case u @ UnresolvedRelation(CatalogObjectIdentifier(Some(catalogPlugin), ident)) =>
loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u)
}
@@ -704,6 +710,10 @@ class Analyzer(
// Note this is compatible with the views defined by older versions of Spark (before 2.2), which
// have empty defaultDatabase and all the relations in viewText have database part defined.
def resolveRelation(plan: LogicalPlan): LogicalPlan = plan match {
+ case u @ UnresolvedRelation(AsTemporaryViewIdentifier(ident))
+ if catalog.isTemporaryTable(ident) =>
+ resolveRelation(lookupTableFromCatalog(ident, u, AnalysisContext.get.defaultDatabase))
+
case u @ UnresolvedRelation(AsTableIdentifier(ident)) if !isRunningDirectlyOnFiles(ident) =>
val defaultDatabase = AnalysisContext.get.defaultDatabase
val foundRelation = lookupTableFromCatalog(ident, u, defaultDatabase)
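The two new AsTemporaryViewIdentifier cases make temporary views shadow catalog tables of the same name. A hedged illustration, assuming an active SparkSession and a default v2 catalog that also contains a table named `v`:

    // Illustrative only: the temp view wins over any catalog table named `v`.
    spark.sql("CREATE TEMPORARY VIEW v AS SELECT 1 AS id")
    spark.sql("SELECT * FROM v").show() // resolves to the temp view, not the catalog table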
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala
index 2210e18..3eae34d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala
@@ -37,7 +37,7 @@ object UpdateAttributeNullability extends Rule[LogicalPlan] {
case p if !p.resolved => p
// Skip leaf node, as it has no child and no need to update nullability.
case p: LeafNode => p
- case p: LogicalPlan =>
+ case p: LogicalPlan if p.childrenResolved =>
val nullabilities = p.children.flatMap(c => c.output).groupBy(_.exprId).map {
// If there are multiple Attributes having the same ExprId, we need to resolve
// the conflict of nullable field. We do not really expect this to happen.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 6fd4460..72f0983 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -432,7 +432,7 @@ case class CreateTableAsSelect(
override def children: Seq[LogicalPlan] = Seq(query)
- override lazy val resolved: Boolean = {
+ override lazy val resolved: Boolean = childrenResolved && {
// the table schema is created from the query schema, so the only resolution needed is to check
// that the columns referenced by the table's partitioning exist in the query schema
val references = partitioning.flatMap(_.references).toSet
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index af67632..e2636d2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1833,6 +1833,11 @@ object SQLConf {
.stringConf
.createOptional
+ val V2_SESSION_CATALOG = buildConf("spark.sql.catalog.session")
+ .doc("Name of the default v2 catalog, used when a catalog is not identified in queries")
+ .stringConf
+ .createWithDefault("org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog")
+
val LEGACY_LOOSE_UPCAST = buildConf("spark.sql.legacy.looseUpcast")
.doc("When true, the upcast will be loose and allows string to atomic types.")
.booleanConf
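The new entry makes the session catalog implementation pluggable; the DataSourceV2SQLSuite change at the end of this patch swaps in a test catalog exactly this way. A usage sketch (the replacement class name is hypothetical; assumes an active SparkSession):

    // Any TableCatalog implementation with a no-arg constructor can be plugged in.
    // "com.example.MySessionCatalog" is hypothetical; the default is V2SessionCatalog.
    spark.conf.set("spark.sql.catalog.session", "com.example.MySessionCatalog")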
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala
index 783751f..52543d1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala
@@ -85,4 +85,111 @@ class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside {
}
}
}
+
+ test("temporary table identifier") {
+ Seq(
+ ("tbl", TableIdentifier("tbl")),
+ ("db.tbl", TableIdentifier("tbl", Some("db"))),
+ ("`db.tbl`", TableIdentifier("db.tbl")),
+ ("parquet.`file:/tmp/db.tbl`", TableIdentifier("file:/tmp/db.tbl", Some("parquet"))),
+ ("`org.apache.spark.sql.json`.`s3://buck/tmp/abc.json`",
+ TableIdentifier("s3://buck/tmp/abc.json", Some("org.apache.spark.sql.json")))).foreach {
+ case (sqlIdent: String, expectedTableIdent: TableIdentifier) =>
+ // when there is no catalog and the namespace has one part, the rule should match
+ inside(parseMultipartIdentifier(sqlIdent)) {
+ case AsTemporaryViewIdentifier(ident) =>
+ ident shouldEqual expectedTableIdent
+ }
+ }
+
+ Seq("prod.func", "prod.db.tbl", "test.db.tbl", "ns1.ns2.tbl", "test.ns1.ns2.ns3.tbl")
+ .foreach { sqlIdent =>
+ inside(parseMultipartIdentifier(sqlIdent)) {
+ case AsTemporaryViewIdentifier(_) =>
+ fail("AsTemporaryViewIdentifier should not match when " +
+ "the catalog is set or the namespace has multiple parts")
+ case _ =>
+ // expected
+ }
+ }
+ }
+}
+
+class LookupCatalogWithDefaultSuite extends SparkFunSuite with LookupCatalog with Inside {
+ import CatalystSqlParser._
+
+ private val catalogs = Seq("prod", "test").map(x => x -> new TestCatalogPlugin(x)).toMap
+
+ override def defaultCatalogName: Option[String] = Some("prod")
+
+ override def lookupCatalog(name: String): CatalogPlugin =
+ catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found"))
+
+ test("catalog object identifier") {
+ Seq(
+ ("tbl", catalogs.get("prod"), Seq.empty, "tbl"),
+ ("db.tbl", catalogs.get("prod"), Seq("db"), "tbl"),
+ ("prod.func", catalogs.get("prod"), Seq.empty, "func"),
+ ("ns1.ns2.tbl", catalogs.get("prod"), Seq("ns1", "ns2"), "tbl"),
+ ("prod.db.tbl", catalogs.get("prod"), Seq("db"), "tbl"),
+ ("test.db.tbl", catalogs.get("test"), Seq("db"), "tbl"),
+ ("test.ns1.ns2.ns3.tbl", catalogs.get("test"), Seq("ns1", "ns2", "ns3"), "tbl"),
+ ("`db.tbl`", catalogs.get("prod"), Seq.empty, "db.tbl"),
+ ("parquet.`file:/tmp/db.tbl`", catalogs.get("prod"), Seq("parquet"), "file:/tmp/db.tbl"),
+ ("`org.apache.spark.sql.json`.`s3://buck/tmp/abc.json`", catalogs.get("prod"),
+ Seq("org.apache.spark.sql.json"), "s3://buck/tmp/abc.json")).foreach {
+ case (sql, expectedCatalog, namespace, name) =>
+ inside(parseMultipartIdentifier(sql)) {
+ case CatalogObjectIdentifier(catalog, ident) =>
+ catalog shouldEqual expectedCatalog
+ ident shouldEqual Identifier.of(namespace.toArray, name)
+ }
+ }
+ }
+
+ test("table identifier") {
+ Seq(
+ "tbl",
+ "db.tbl",
+ "`db.tbl`",
+ "parquet.`file:/tmp/db.tbl`",
+ "`org.apache.spark.sql.json`.`s3://buck/tmp/abc.json`",
+ "prod.func",
+ "prod.db.tbl",
+ "ns1.ns2.tbl").foreach { sql =>
+ parseMultipartIdentifier(sql) match {
+ case AsTableIdentifier(_) =>
+ fail(s"$sql should not be resolved as TableIdentifier")
+ case _ =>
+ }
+ }
+ }
+
+ test("temporary table identifier") {
+ Seq(
+ ("tbl", TableIdentifier("tbl")),
+ ("db.tbl", TableIdentifier("tbl", Some("db"))),
+ ("`db.tbl`", TableIdentifier("db.tbl")),
+ ("parquet.`file:/tmp/db.tbl`", TableIdentifier("file:/tmp/db.tbl", Some("parquet"))),
+ ("`org.apache.spark.sql.json`.`s3://buck/tmp/abc.json`",
+ TableIdentifier("s3://buck/tmp/abc.json", Some("org.apache.spark.sql.json")))).foreach {
+ case (sqlIdent: String, expectedTableIdent: TableIdentifier) =>
+ // when there is no catalog and the namespace has one part, the rule should match
+ inside(parseMultipartIdentifier(sqlIdent)) {
+ case AsTemporaryViewIdentifier(ident) =>
+ ident shouldEqual expectedTableIdent
+ }
+ }
+
+ Seq("prod.func", "prod.db.tbl", "test.db.tbl", "ns1.ns2.tbl", "test.ns1.ns2.ns3.tbl")
+ .foreach { sqlIdent =>
+ inside(parseMultipartIdentifier(sqlIdent)) {
+ case AsTemporaryViewIdentifier(_) =>
+ fail("AsTemporaryViewIdentifier should not match when " +
+ "the catalog is set or the namespace has multiple parts")
+ case _ =>
+ // expected
+ }
+ }
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
index 26f7230..1b7bb16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
@@ -26,31 +26,32 @@ import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, LookupCatalog
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.CastSupport
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DropTableCommand}
+import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2, DataSourceV2Relation}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.TableProvider
import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType}
case class DataSourceResolution(
conf: SQLConf,
- findCatalog: String => CatalogPlugin)
- extends Rule[LogicalPlan] with CastSupport with LookupCatalog {
+ lookup: LookupCatalog)
+ extends Rule[LogicalPlan] with CastSupport {
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+ import lookup._
- override protected def lookupCatalog(name: String): CatalogPlugin = findCatalog(name)
-
- def defaultCatalog: Option[CatalogPlugin] = conf.defaultV2Catalog.map(findCatalog)
+ lazy val v2SessionCatalog: CatalogPlugin = lookup.sessionCatalog
+ .getOrElse(throw new AnalysisException("No v2 session catalog implementation is available"))
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case CreateTableStatement(
AsTableIdentifier(table), schema, partitionCols, bucketSpec, properties,
V1WriteProvider(provider), options, location, comment, ifNotExists) =>
-
+ // the source is v1, the identifier has no catalog, and there is no default v2 catalog
val tableDesc = buildCatalogTable(table, schema, partitionCols, bucketSpec, properties,
provider, options, location, comment, ifNotExists)
val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
@@ -58,18 +59,22 @@ case class DataSourceResolution(
CreateTable(tableDesc, mode, None)
case create: CreateTableStatement =>
- // the provider was not a v1 source, convert to a v2 plan
+ // the provider was not a v1 source or a v2 catalog is the default, convert to a v2 plan
val CatalogObjectIdentifier(maybeCatalog, identifier) = create.tableName
- val catalog = maybeCatalog.orElse(defaultCatalog)
- .getOrElse(throw new AnalysisException(
- s"No catalog specified for table ${identifier.quoted} and no default catalog is set"))
- .asTableCatalog
- convertCreateTable(catalog, identifier, create)
+ maybeCatalog match {
+ case Some(catalog) =>
+ // the identifier had a catalog, or there is a default v2 catalog
+ convertCreateTable(catalog.asTableCatalog, identifier, create)
+ case _ =>
+ // the identifier had no catalog and no default catalog is set, but the source is v2.
+ // use the v2 session catalog, which delegates to the global v1 session catalog
+ convertCreateTable(v2SessionCatalog.asTableCatalog, identifier, create)
+ }
case CreateTableAsSelectStatement(
AsTableIdentifier(table), query, partitionCols, bucketSpec, properties,
V1WriteProvider(provider), options, location, comment, ifNotExists) =>
-
+ // the source is v1, the identifier has no catalog, and there is no default v2 catalog
val tableDesc = buildCatalogTable(table, new StructType, partitionCols, bucketSpec,
properties, provider, options, location, comment, ifNotExists)
val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
@@ -77,13 +82,17 @@ case class DataSourceResolution(
CreateTable(tableDesc, mode, Some(query))
case create: CreateTableAsSelectStatement =>
- // the provider was not a v1 source, convert to a v2 plan
+ // the provider was not a v1 source or a v2 catalog is the default, convert to a v2 plan
val CatalogObjectIdentifier(maybeCatalog, identifier) = create.tableName
- val catalog = maybeCatalog.orElse(defaultCatalog)
- .getOrElse(throw new AnalysisException(
- s"No catalog specified for table ${identifier.quoted} and no default catalog is set"))
- .asTableCatalog
- convertCTAS(catalog, identifier, create)
+ maybeCatalog match {
+ case Some(catalog) =>
+ // the identifier had a catalog, or there is a default v2 catalog
+ convertCTAS(catalog.asTableCatalog, identifier, create)
+ case _ =>
+ // the identifier had no catalog and no default catalog is set, but the source is v2.
+ // use the v2 session catalog, which delegates to the global v1 session catalog
+ convertCTAS(v2SessionCatalog.asTableCatalog, identifier, create)
+ }
case DropTableStatement(CatalogObjectIdentifier(Some(catalog), ident), ifExists, _) =>
DropTable(catalog.asTableCatalog, ident, ifExists)
@@ -118,6 +127,9 @@ case class DataSourceResolution(
if newColumns.forall(_.name.size == 1) =>
// only top-level adds are supported using AlterTableAddColumnsCommand
AlterTableAddColumnsCommand(table, newColumns.map(convertToStructField))
+
+ case DataSourceV2Relation(CatalogTableAsV2(catalogTable), _, _) =>
+ UnresolvedCatalogRelation(catalogTable)
}
object V1WriteProvider {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
new file mode 100644
index 0000000..4cd0346
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import java.util
+import java.util.Locale
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog, TableChange}
+import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, FieldReference, IdentityTransform, LogicalExpressions, Transform}
+import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog}
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.sql.sources.v2.{Table, TableCapability}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/**
+ * A [[TableCatalog]] that translates calls to the v1 SessionCatalog.
+ */
+class V2SessionCatalog(sessionState: SessionState) extends TableCatalog {
+ def this() = {
+ this(SparkSession.active.sessionState)
+ }
+
+ private lazy val catalog: SessionCatalog = sessionState.catalog
+
+ private var _name: String = _
+
+ override def name: String = _name
+
+ override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
+ this._name = name
+ }
+
+ override def listTables(namespace: Array[String]): Array[Identifier] = {
+ namespace match {
+ case Array(db) =>
+ catalog.listTables(db).map(ident => Identifier.of(Array(db), ident.table)).toArray
+ case _ =>
+ throw new NoSuchNamespaceException(namespace)
+ }
+ }
+
+ override def loadTable(ident: Identifier): Table = {
+ val catalogTable = try {
+ catalog.getTableMetadata(ident.asTableIdentifier)
+ } catch {
+ case _: NoSuchTableException =>
+ throw new NoSuchTableException(ident)
+ }
+
+ CatalogTableAsV2(catalogTable)
+ }
+
+ override def invalidateTable(ident: Identifier): Unit = {
+ catalog.refreshTable(ident.asTableIdentifier)
+ }
+
+ override def createTable(
+ ident: Identifier,
+ schema: StructType,
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): Table = {
+
+ val (partitionColumns, maybeBucketSpec) = V2SessionCatalog.convertTransforms(partitions)
+ val provider = properties.getOrDefault("provider", sessionState.conf.defaultDataSourceName)
+ val tableProperties = properties.asScala
+ val location = Option(properties.get("location"))
+ val storage = DataSource.buildStorageFormatFromOptions(tableProperties.toMap)
+ .copy(locationUri = location.map(CatalogUtils.stringToURI))
+
+ val tableDesc = CatalogTable(
+ identifier = ident.asTableIdentifier,
+ tableType = CatalogTableType.MANAGED,
+ storage = storage,
+ schema = schema,
+ provider = Some(provider),
+ partitionColumnNames = partitionColumns,
+ bucketSpec = maybeBucketSpec,
+ properties = tableProperties.toMap,
+ tracksPartitionsInCatalog = sessionState.conf.manageFilesourcePartitions,
+ comment = Option(properties.get("comment")))
+
+ try {
+ catalog.createTable(tableDesc, ignoreIfExists = false)
+ } catch {
+ case _: TableAlreadyExistsException =>
+ throw new TableAlreadyExistsException(ident)
+ }
+
+ loadTable(ident)
+ }
+
+ override def alterTable(
+ ident: Identifier,
+ changes: TableChange*): Table = {
+ val catalogTable = try {
+ catalog.getTableMetadata(ident.asTableIdentifier)
+ } catch {
+ case _: NoSuchTableException =>
+ throw new NoSuchTableException(ident)
+ }
+
+ val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes)
+ val schema = CatalogV2Util.applySchemaChanges(catalogTable.schema, changes)
+
+ try {
+ catalog.alterTable(catalogTable.copy(properties = properties, schema = schema))
+ } catch {
+ case _: NoSuchTableException =>
+ throw new NoSuchTableException(ident)
+ }
+
+ loadTable(ident)
+ }
+
+ override def dropTable(ident: Identifier): Boolean = {
+ try {
+ if (loadTable(ident) != null) {
+ catalog.dropTable(
+ ident.asTableIdentifier,
+ ignoreIfNotExists = true,
+ purge = true /* skip HDFS trash */)
+ true
+ } else {
+ false
+ }
+ } catch {
+ case _: NoSuchTableException =>
+ false
+ }
+ }
+
+ implicit class TableIdentifierHelper(ident: Identifier) {
+ def asTableIdentifier: TableIdentifier = {
+ ident.namespace match {
+ case Array(db) =>
+ TableIdentifier(ident.name, Some(db))
+ case Array() =>
+ TableIdentifier(ident.name, Some(catalog.getCurrentDatabase))
+ case _ =>
+ throw new NoSuchTableException(ident)
+ }
+ }
+ }
+
+ override def toString: String = s"V2SessionCatalog($name)"
+}
+
+/**
+ * An implementation of catalog v2 [[Table]] to expose v1 table metadata.
+ */
+case class CatalogTableAsV2(v1Table: CatalogTable) extends Table {
+ implicit class IdentifierHelper(identifier: TableIdentifier) {
+ def quoted: String = {
+ identifier.database match {
+ case Some(db) =>
+ Seq(db, identifier.table).map(quote).mkString(".")
+ case _ =>
+ quote(identifier.table)
+
+ }
+ }
+
+ private def quote(part: String): String = {
+ if (part.contains(".") || part.contains("`")) {
+ s"`${part.replace("`", "``")}`"
+ } else {
+ part
+ }
+ }
+ }
+
+ def catalogTable: CatalogTable = v1Table
+
+ lazy val options: Map[String, String] = {
+ v1Table.storage.locationUri match {
+ case Some(uri) =>
+ v1Table.storage.properties + ("path" -> uri.toString)
+ case _ =>
+ v1Table.storage.properties
+ }
+ }
+
+ override lazy val properties: util.Map[String, String] = v1Table.properties.asJava
+
+ override lazy val schema: StructType = v1Table.schema
+
+ override lazy val partitioning: Array[Transform] = {
+ val partitions = new mutable.ArrayBuffer[Transform]()
+
+ v1Table.partitionColumnNames.foreach { col =>
+ partitions += LogicalExpressions.identity(col)
+ }
+
+ v1Table.bucketSpec.foreach { spec =>
+ partitions += LogicalExpressions.bucket(spec.numBuckets, spec.bucketColumnNames: _*)
+ }
+
+ partitions.toArray
+ }
+
+ override def name: String = v1Table.identifier.quoted
+
+ override def capabilities: util.Set[TableCapability] = new util.HashSet[TableCapability]()
+
+ override def toString: String = s"CatalogTableAsV2($name)"
+}
+
+private[sql] object V2SessionCatalog {
+ /**
+ * Convert v2 Transforms to v1 partition columns and an optional bucket spec.
+ */
+ private def convertTransforms(partitions: Seq[Transform]): (Seq[String], Option[BucketSpec]) = {
+ val identityCols = new mutable.ArrayBuffer[String]
+ var bucketSpec = Option.empty[BucketSpec]
+
+ partitions.foreach {
+ case IdentityTransform(FieldReference(Seq(col))) =>
+ identityCols += col
+
+ case BucketTransform(numBuckets, FieldReference(Seq(col))) =>
+ bucketSpec = Some(BucketSpec(numBuckets, col :: Nil, Nil))
+
+ case transform =>
+ throw new UnsupportedOperationException(
+ s"SessionCatalog does not support partition transform: $transform")
+ }
+
+ (identityCols, bucketSpec)
+ }
+}
+
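Used directly, the class behaves like any other TableCatalog, which is how the new V2SessionCatalogSuite below exercises it. A condensed usage sketch (assumes an active SparkSession, an existing database `db`, and imports for Identifier, StructType, LongType, Collections, and CaseInsensitiveStringMap; the table name is made up):

    // Condensed from V2SessionCatalogSuite below; illustrative, not exhaustive.
    val catalog = new V2SessionCatalog(spark.sessionState)
    catalog.initialize("session", CaseInsensitiveStringMap.empty())

    val ident = Identifier.of(Array("db"), "demo_table")
    catalog.createTable(ident, new StructType().add("id", LongType),
      Array.empty, Collections.emptyMap[String, String])
    assert(catalog.tableExists(ident)) // delegated to the v1 SessionCatalog
    catalog.dropTable(ident)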
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 8dc30ea..b05a5df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -170,7 +170,7 @@ abstract class BaseSessionStateBuilder(
new FindDataSourceTable(session) +:
new ResolveSQLOnFile(session) +:
new FallBackFileSourceV2(session) +:
- DataSourceResolution(conf, session.catalog(_)) +:
+ DataSourceResolution(conf, this) +:
customResolutionRules
override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 727160d..7df0dab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -21,7 +21,7 @@ import java.net.URI
import java.util.Locale
import org.apache.spark.sql.{AnalysisException, SaveMode}
-import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, Identifier, TableCatalog, TestTableCatalog}
+import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, Identifier, LookupCatalog, TableCatalog, TestTableCatalog}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.AnalysisTest
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
@@ -43,17 +43,43 @@ class PlanResolutionSuite extends AnalysisTest {
newCatalog
}
- private val lookupCatalog: String => CatalogPlugin = {
- case "testcat" =>
- testCat
- case name =>
- throw new CatalogNotFoundException(s"No such catalog: $name")
+ private val v2SessionCatalog = {
+ val newCatalog = new TestTableCatalog
+ newCatalog.initialize("session", CaseInsensitiveStringMap.empty())
+ newCatalog
+ }
+
+ private val lookupWithDefault: LookupCatalog = new LookupCatalog {
+ override protected def defaultCatalogName: Option[String] = Some("testcat")
+
+ override protected def lookupCatalog(name: String): CatalogPlugin = name match {
+ case "testcat" =>
+ testCat
+ case "session" =>
+ v2SessionCatalog
+ case _ =>
+ throw new CatalogNotFoundException(s"No such catalog: $name")
+ }
+ }
+
+ private val lookupWithoutDefault: LookupCatalog = new LookupCatalog {
+ override protected def defaultCatalogName: Option[String] = None
+
+ override protected def lookupCatalog(name: String): CatalogPlugin = name match {
+ case "testcat" =>
+ testCat
+ case "session" =>
+ v2SessionCatalog
+ case _ =>
+ throw new CatalogNotFoundException(s"No such catalog: $name")
+ }
}
- def parseAndResolve(query: String): LogicalPlan = {
+ def parseAndResolve(query: String, withDefault: Boolean = false): LogicalPlan = {
val newConf = conf.copy()
newConf.setConfString("spark.sql.default.catalog", "testcat")
- DataSourceResolution(newConf, lookupCatalog).apply(parsePlan(query))
+ DataSourceResolution(newConf, if (withDefault) lookupWithDefault else lookupWithoutDefault)
+ .apply(parsePlan(query))
}
private def parseResolveCompare(query: String, expected: LogicalPlan): Unit =
@@ -338,7 +364,46 @@ class PlanResolutionSuite extends AnalysisTest {
}
}
- test("Test v2 CreateTable with data source v2 provider") {
+ test("Test v2 CreateTable with default catalog") {
+ val sql =
+ s"""
+ |CREATE TABLE IF NOT EXISTS mydb.table_name (
+ | id bigint,
+ | description string,
+ | point struct<x: double, y: double>)
+ |USING parquet
+ |COMMENT 'table comment'
+ |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
+ |OPTIONS (path 's3://bucket/path/to/data', other 20)
+ """.stripMargin
+
+ val expectedProperties = Map(
+ "p1" -> "v1",
+ "p2" -> "v2",
+ "other" -> "20",
+ "provider" -> "parquet",
+ "location" -> "s3://bucket/path/to/data",
+ "comment" -> "table comment")
+
+ parseAndResolve(sql, withDefault = true) match {
+ case create: CreateV2Table =>
+ assert(create.catalog.name == "testcat")
+ assert(create.tableName == Identifier.of(Array("mydb"), "table_name"))
+ assert(create.tableSchema == new StructType()
+ .add("id", LongType)
+ .add("description", StringType)
+ .add("point", new StructType().add("x", DoubleType).add("y", DoubleType)))
+ assert(create.partitioning.isEmpty)
+ assert(create.properties == expectedProperties)
+ assert(create.ignoreIfExists)
+
+ case other =>
+ fail(s"Expected to parse ${classOf[CreateV2Table].getName} from query," +
+ s"got ${other.getClass.getName}: $sql")
+ }
+ }
+
+ test("Test v2 CreateTable with data source v2 provider and no default") {
val sql =
s"""
|CREATE TABLE IF NOT EXISTS mydb.page_view (
@@ -360,7 +425,7 @@ class PlanResolutionSuite extends AnalysisTest {
parseAndResolve(sql) match {
case create: CreateV2Table =>
- assert(create.catalog.name == "testcat")
+ assert(create.catalog.name == "session")
assert(create.tableName == Identifier.of(Array("mydb"), "page_view"))
assert(create.tableSchema == new StructType()
.add("id", LongType)
@@ -410,7 +475,41 @@ class PlanResolutionSuite extends AnalysisTest {
}
}
- test("Test v2 CTAS with data source v2 provider") {
+ test("Test v2 CTAS with default catalog") {
+ val sql =
+ s"""
+ |CREATE TABLE IF NOT EXISTS mydb.table_name
+ |USING parquet
+ |COMMENT 'table comment'
+ |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
+ |OPTIONS (path 's3://bucket/path/to/data', other 20)
+ |AS SELECT * FROM src
+ """.stripMargin
+
+ val expectedProperties = Map(
+ "p1" -> "v1",
+ "p2" -> "v2",
+ "other" -> "20",
+ "provider" -> "parquet",
+ "location" -> "s3://bucket/path/to/data",
+ "comment" -> "table comment")
+
+ parseAndResolve(sql, withDefault = true) match {
+ case ctas: CreateTableAsSelect =>
+ assert(ctas.catalog.name == "testcat")
+ assert(ctas.tableName == Identifier.of(Array("mydb"), "table_name"))
+ assert(ctas.properties == expectedProperties)
+ assert(ctas.writeOptions == Map("other" -> "20"))
+ assert(ctas.partitioning.isEmpty)
+ assert(ctas.ignoreIfExists)
+
+ case other =>
+ fail(s"Expected to parse ${classOf[CreateTableAsSelect].getName} from query," +
+ s"got ${other.getClass.getName}: $sql")
+ }
+ }
+
+ test("Test v2 CTAS with data source v2 provider and no default") {
val sql =
s"""
|CREATE TABLE IF NOT EXISTS mydb.page_view
@@ -430,7 +529,7 @@ class PlanResolutionSuite extends AnalysisTest {
parseAndResolve(sql) match {
case ctas: CreateTableAsSelect =>
- assert(ctas.catalog.name == "testcat")
+ assert(ctas.catalog.name == "session")
assert(ctas.tableName == Identifier.of(Array("mydb"), "page_view"))
assert(ctas.properties == expectedProperties)
assert(ctas.writeOptions.isEmpty)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
new file mode 100644
index 0000000..3822882
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
@@ -0,0 +1,683 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import java.util
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalog.v2.{Catalogs, Identifier, TableCatalog, TableChange}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class V2SessionCatalogSuite
+ extends SparkFunSuite with SharedSQLContext with BeforeAndAfter with BeforeAndAfterAll {
+ import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+
+ private val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String]
+ private val schema: StructType = new StructType()
+ .add("id", IntegerType)
+ .add("data", StringType)
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ spark.sql("""CREATE DATABASE IF NOT EXISTS db""")
+ spark.sql("""CREATE DATABASE IF NOT EXISTS ns""")
+ spark.sql("""CREATE DATABASE IF NOT EXISTS ns2""")
+ }
+
+ override protected def afterAll(): Unit = {
+ spark.sql("""DROP TABLE IF EXISTS db.test_table""")
+ spark.sql("""DROP DATABASE IF EXISTS db""")
+ spark.sql("""DROP DATABASE IF EXISTS ns""")
+ spark.sql("""DROP DATABASE IF EXISTS ns2""")
+ super.afterAll()
+ }
+
+ after {
+ newCatalog().dropTable(testIdent)
+ }
+
+ private def newCatalog(): TableCatalog = {
+ val newCatalog = new V2SessionCatalog(spark.sessionState)
+ newCatalog.initialize("test", CaseInsensitiveStringMap.empty())
+ newCatalog
+ }
+
+ private val testIdent = Identifier.of(Array("db"), "test_table")
+
+ test("Catalogs can load the catalog") {
+ val catalog = newCatalog()
+
+ val conf = new SQLConf
+ conf.setConfString("spark.sql.catalog.test", catalog.getClass.getName)
+
+ val loaded = Catalogs.load("test", conf)
+ assert(loaded.getClass == catalog.getClass)
+ }
+
+ test("listTables") {
+ val catalog = newCatalog()
+ val ident1 = Identifier.of(Array("ns"), "test_table_1")
+ val ident2 = Identifier.of(Array("ns"), "test_table_2")
+ val ident3 = Identifier.of(Array("ns2"), "test_table_1")
+
+ assert(catalog.listTables(Array("ns")).isEmpty)
+
+ catalog.createTable(ident1, schema, Array.empty, emptyProps)
+
+ assert(catalog.listTables(Array("ns")).toSet == Set(ident1))
+ assert(catalog.listTables(Array("ns2")).isEmpty)
+
+ catalog.createTable(ident3, schema, Array.empty, emptyProps)
+ catalog.createTable(ident2, schema, Array.empty, emptyProps)
+
+ assert(catalog.listTables(Array("ns")).toSet == Set(ident1, ident2))
+ assert(catalog.listTables(Array("ns2")).toSet == Set(ident3))
+
+ catalog.dropTable(ident1)
+
+ assert(catalog.listTables(Array("ns")).toSet == Set(ident2))
+
+ catalog.dropTable(ident2)
+
+ assert(catalog.listTables(Array("ns")).isEmpty)
+ assert(catalog.listTables(Array("ns2")).toSet == Set(ident3))
+
+ catalog.dropTable(ident3)
+ }
+
+ test("createTable") {
+ val catalog = newCatalog()
+
+ assert(!catalog.tableExists(testIdent))
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
+ assert(parsed == Seq("db", "test_table"))
+ assert(table.schema == schema)
+ assert(table.properties.asScala == Map())
+
+ assert(catalog.tableExists(testIdent))
+ }
+
+ test("createTable: with properties") {
+ val catalog = newCatalog()
+
+ val properties = new util.HashMap[String, String]()
+ properties.put("property", "value")
+
+ assert(!catalog.tableExists(testIdent))
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+
+ val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
+ assert(parsed == Seq("db", "test_table"))
+ assert(table.schema == schema)
+ assert(table.properties == properties)
+
+ assert(catalog.tableExists(testIdent))
+ }
+
+ test("createTable: table already exists") {
+ val catalog = newCatalog()
+
+ assert(!catalog.tableExists(testIdent))
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ val exc = intercept[TableAlreadyExistsException] {
+ catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ }
+
+ assert(exc.message.contains(table.name()))
+ assert(exc.message.contains("already exists"))
+
+ assert(catalog.tableExists(testIdent))
+ }
+
+ test("tableExists") {
+ val catalog = newCatalog()
+
+ assert(!catalog.tableExists(testIdent))
+
+ catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(catalog.tableExists(testIdent))
+
+ catalog.dropTable(testIdent)
+
+ assert(!catalog.tableExists(testIdent))
+ }
+
+ test("loadTable") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val loaded = catalog.loadTable(testIdent)
+
+ assert(table.name == loaded.name)
+ assert(table.schema == loaded.schema)
+ assert(table.properties == loaded.properties)
+ }
+
+ test("loadTable: table does not exist") {
+ val catalog = newCatalog()
+
+ val exc = intercept[NoSuchTableException] {
+ catalog.loadTable(testIdent)
+ }
+
+ assert(exc.message.contains(testIdent.quoted))
+ assert(exc.message.contains("not found"))
+ }
+
+ test("invalidateTable") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ catalog.invalidateTable(testIdent)
+
+ val loaded = catalog.loadTable(testIdent)
+
+ assert(table.name == loaded.name)
+ assert(table.schema == loaded.schema)
+ assert(table.properties == loaded.properties)
+ }
+
+ test("invalidateTable: table does not exist") {
+ val catalog = newCatalog()
+
+ assert(catalog.tableExists(testIdent) === false)
+
+ catalog.invalidateTable(testIdent)
+ }
+
+ test("alterTable: add property") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.properties.asScala == Map())
+
+ val updated = catalog.alterTable(testIdent, TableChange.setProperty("prop-1", "1"))
+ assert(updated.properties.asScala == Map("prop-1" -> "1"))
+
+ val loaded = catalog.loadTable(testIdent)
+ assert(loaded.properties.asScala == Map("prop-1" -> "1"))
+
+ assert(table.properties.asScala == Map())
+ }
+
+ test("alterTable: add property to existing") {
+ val catalog = newCatalog()
+
+ val properties = new util.HashMap[String, String]()
+ properties.put("prop-1", "1")
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+
+ assert(table.properties.asScala == Map("prop-1" -> "1"))
+
+ val updated = catalog.alterTable(testIdent, TableChange.setProperty("prop-2", "2"))
+ assert(updated.properties.asScala == Map("prop-1" -> "1", "prop-2" -> "2"))
+
+ val loaded = catalog.loadTable(testIdent)
+ assert(loaded.properties.asScala == Map("prop-1" -> "1", "prop-2" -> "2"))
+
+ assert(table.properties.asScala == Map("prop-1" -> "1"))
+ }
+
+ test("alterTable: remove existing property") {
+ val catalog = newCatalog()
+
+ val properties = new util.HashMap[String, String]()
+ properties.put("prop-1", "1")
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+
+ assert(table.properties.asScala == Map("prop-1" -> "1"))
+
+ val updated = catalog.alterTable(testIdent, TableChange.removeProperty("prop-1"))
+ assert(updated.properties.asScala == Map())
+
+ val loaded = catalog.loadTable(testIdent)
+ assert(loaded.properties.asScala == Map())
+
+ assert(table.properties.asScala == Map("prop-1" -> "1"))
+ }
+
+ test("alterTable: remove missing property") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.properties.asScala == Map())
+
+ val updated = catalog.alterTable(testIdent, TableChange.removeProperty("prop-1"))
+ assert(updated.properties.asScala == Map())
+
+ val loaded = catalog.loadTable(testIdent)
+ assert(loaded.properties.asScala == Map())
+
+ assert(table.properties.asScala == Map())
+ }
+
+ test("alterTable: add top-level column") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val updated = catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType))
+
+ assert(updated.schema == schema.add("ts", TimestampType))
+ }
+
+ test("alterTable: add required column") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val updated = catalog.alterTable(testIdent,
+ TableChange.addColumn(Array("ts"), TimestampType, false))
+
+ assert(updated.schema == schema.add("ts", TimestampType, nullable = false))
+ }
+
+ test("alterTable: add column with comment") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val updated = catalog.alterTable(testIdent,
+ TableChange.addColumn(Array("ts"), TimestampType, false, "comment text"))
+
+ val field = StructField("ts", TimestampType, nullable = false).withComment("comment text")
+ assert(updated.schema == schema.add(field))
+ }
+
+ test("alterTable: add nested column") {
+ val catalog = newCatalog()
+
+ val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
+ val tableSchema = schema.add("point", pointStruct)
+
+ val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+
+ assert(table.schema == tableSchema)
+
+ val updated = catalog.alterTable(testIdent,
+ TableChange.addColumn(Array("point", "z"), DoubleType))
+
+ val expectedSchema = schema.add("point", pointStruct.add("z", DoubleType))
+
+ assert(updated.schema == expectedSchema)
+ }
+
+ test("alterTable: add column to primitive field fails") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val exc = intercept[IllegalArgumentException] {
+ catalog.alterTable(testIdent, TableChange.addColumn(Array("data", "ts"), TimestampType))
+ }
+
+ assert(exc.getMessage.contains("Not a struct"))
+ assert(exc.getMessage.contains("data"))
+
+ // the table has not changed
+ assert(catalog.loadTable(testIdent).schema == schema)
+ }
+
+ test("alterTable: add field to missing column fails") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val exc = intercept[IllegalArgumentException] {
+ catalog.alterTable(testIdent,
+ TableChange.addColumn(Array("missing_col", "new_field"), StringType))
+ }
+
+ assert(exc.getMessage.contains("missing_col"))
+ assert(exc.getMessage.contains("Cannot find"))
+ }
+
+ test("alterTable: update column data type") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val updated = catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType))
+
+ val expectedSchema = new StructType().add("id", LongType).add("data", StringType)
+ assert(updated.schema == expectedSchema)
+ }
+
+ test("alterTable: update column data type and nullability") {
+ val catalog = newCatalog()
+
+ val originalSchema = new StructType()
+ .add("id", IntegerType, nullable = false)
+ .add("data", StringType)
+ val table = catalog.createTable(testIdent, originalSchema, Array.empty, emptyProps)
+
+ assert(table.schema == originalSchema)
+
+ val updated = catalog.alterTable(testIdent,
+ TableChange.updateColumnType(Array("id"), LongType, true))
+
+ val expectedSchema = new StructType().add("id", LongType).add("data", StringType)
+ assert(updated.schema == expectedSchema)
+ }
+
+ test("alterTable: update optional column to required fails") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val exc = intercept[IllegalArgumentException] {
+ catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType, false))
+ }
+
+ assert(exc.getMessage.contains("Cannot change optional column to required"))
+ assert(exc.getMessage.contains("id"))
+ }
+
+ test("alterTable: update missing column fails") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val exc = intercept[IllegalArgumentException] {
+ catalog.alterTable(testIdent,
+ TableChange.updateColumnType(Array("missing_col"), LongType))
+ }
+
+ assert(exc.getMessage.contains("missing_col"))
+ assert(exc.getMessage.contains("Cannot find"))
+ }
+
+ test("alterTable: add comment") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val updated = catalog.alterTable(testIdent,
+ TableChange.updateColumnComment(Array("id"), "comment text"))
+
+ val expectedSchema = new StructType()
+ .add("id", IntegerType, nullable = true, "comment text")
+ .add("data", StringType)
+ assert(updated.schema == expectedSchema)
+ }
+
+ test("alterTable: replace comment") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ catalog.alterTable(testIdent, TableChange.updateColumnComment(Array("id"), "comment text"))
+
+ val expectedSchema = new StructType()
+ .add("id", IntegerType, nullable = true, "replacement comment")
+ .add("data", StringType)
+
+ val updated = catalog.alterTable(testIdent,
+ TableChange.updateColumnComment(Array("id"), "replacement comment"))
+
+ assert(updated.schema == expectedSchema)
+ }
+
+ test("alterTable: add comment to missing column fails") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val exc = intercept[IllegalArgumentException] {
+ catalog.alterTable(testIdent,
+ TableChange.updateColumnComment(Array("missing_col"), "comment"))
+ }
+
+ assert(exc.getMessage.contains("missing_col"))
+ assert(exc.getMessage.contains("Cannot find"))
+ }
+
+ test("alterTable: rename top-level column") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val updated = catalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id"))
+
+ val expectedSchema = new StructType().add("some_id", IntegerType).add("data", StringType)
+
+ assert(updated.schema == expectedSchema)
+ }
+
+ test("alterTable: rename nested column") {
+ val catalog = newCatalog()
+
+ val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
+ val tableSchema = schema.add("point", pointStruct)
+
+ val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+
+ assert(table.schema == tableSchema)
+
+ val updated = catalog.alterTable(testIdent,
+ TableChange.renameColumn(Array("point", "x"), "first"))
+
+ val newPointStruct = new StructType().add("first", DoubleType).add("y", DoubleType)
+ val expectedSchema = schema.add("point", newPointStruct)
+
+ assert(updated.schema == expectedSchema)
+ }
+
+ test("alterTable: rename struct column") {
+ val catalog = newCatalog()
+
+ val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
+ val tableSchema = schema.add("point", pointStruct)
+
+ val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+
+ assert(table.schema == tableSchema)
+
+ val updated = catalog.alterTable(testIdent,
+ TableChange.renameColumn(Array("point"), "p"))
+
+ val newPointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
+ val expectedSchema = schema.add("p", newPointStruct)
+
+ assert(updated.schema == expectedSchema)
+ }
+
+ test("alterTable: rename missing column fails") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val exc = intercept[IllegalArgumentException] {
+ catalog.alterTable(testIdent,
+ TableChange.renameColumn(Array("missing_col"), "new_name"))
+ }
+
+ assert(exc.getMessage.contains("missing_col"))
+ assert(exc.getMessage.contains("Cannot find"))
+ }
+
+ test("alterTable: multiple changes") {
+ val catalog = newCatalog()
+
+ val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
+ val tableSchema = schema.add("point", pointStruct)
+
+ val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+
+ assert(table.schema == tableSchema)
+
+ val updated = catalog.alterTable(testIdent,
+ TableChange.renameColumn(Array("point", "x"), "first"),
+ TableChange.renameColumn(Array("point", "y"), "second"))
+
+ val newPointStruct = new StructType().add("first", DoubleType).add("second", DoubleType)
+ val expectedSchema = schema.add("point", newPointStruct)
+
+ assert(updated.schema == expectedSchema)
+ }
+
+ test("alterTable: delete top-level column") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val updated = catalog.alterTable(testIdent,
+ TableChange.deleteColumn(Array("id")))
+
+ val expectedSchema = new StructType().add("data", StringType)
+ assert(updated.schema == expectedSchema)
+ }
+
+ test("alterTable: delete nested column") {
+ val catalog = newCatalog()
+
+ val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
+ val tableSchema = schema.add("point", pointStruct)
+
+ val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+
+ assert(table.schema == tableSchema)
+
+ val updated = catalog.alterTable(testIdent,
+ TableChange.deleteColumn(Array("point", "y")))
+
+ val newPointStruct = new StructType().add("x", DoubleType)
+ val expectedSchema = schema.add("point", newPointStruct)
+
+ assert(updated.schema == expectedSchema)
+ }
+
+ test("alterTable: delete missing column fails") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val exc = intercept[IllegalArgumentException] {
+ catalog.alterTable(testIdent, TableChange.deleteColumn(Array("missing_col")))
+ }
+
+ assert(exc.getMessage.contains("missing_col"))
+ assert(exc.getMessage.contains("Cannot find"))
+ }
+
+ test("alterTable: delete missing nested column fails") {
+ val catalog = newCatalog()
+
+ val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
+ val tableSchema = schema.add("point", pointStruct)
+
+ val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+
+ assert(table.schema == tableSchema)
+
+ val exc = intercept[IllegalArgumentException] {
+ catalog.alterTable(testIdent, TableChange.deleteColumn(Array("point", "z")))
+ }
+
+ assert(exc.getMessage.contains("z"))
+ assert(exc.getMessage.contains("Cannot find"))
+ }
+
+ test("alterTable: table does not exist") {
+ val catalog = newCatalog()
+
+ val exc = intercept[NoSuchTableException] {
+ catalog.alterTable(testIdent, TableChange.setProperty("prop", "val"))
+ }
+
+ assert(exc.message.contains(testIdent.quoted))
+ assert(exc.message.contains("not found"))
+ }
+
+ test("dropTable") {
+ val catalog = newCatalog()
+
+ assert(!catalog.tableExists(testIdent))
+
+ catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(catalog.tableExists(testIdent))
+
+ val wasDropped = catalog.dropTable(testIdent)
+
+ assert(wasDropped)
+ assert(!catalog.tableExists(testIdent))
+ }
+
+ test("dropTable: table does not exist") {
+ val catalog = newCatalog()
+
+ assert(!catalog.tableExists(testIdent))
+
+ val wasDropped = catalog.dropTable(testIdent)
+
+ assert(!wasDropped)
+ assert(!catalog.tableExists(testIdent))
+ }
+}
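
The rename and delete changes exercised above reduce to rebuilding a StructType along a field path, preserving the "Cannot find" error message the tests assert on. The helpers below are a minimal sketch of those semantics only, not the V2SessionCatalog implementation; renameColumn and deleteColumn are hypothetical names.

    import org.apache.spark.sql.types.StructType

    // Rename a possibly nested column. `path` is the field path the tests pass
    // to TableChange.renameColumn, e.g. Seq("point", "x").
    def renameColumn(schema: StructType, path: Seq[String], newName: String): StructType = {
      if (!schema.fieldNames.contains(path.head)) {
        throw new IllegalArgumentException(s"Cannot find field: ${path.mkString(".")}")
      }
      StructType(schema.fields.map {
        case field if field.name == path.head =>
          (path.tail, field.dataType) match {
            case (Seq(), _) =>
              // end of the path: rename this field in place
              field.copy(name = newName)
            case (rest, struct: StructType) =>
              // recurse into the struct and rebuild it
              field.copy(dataType = renameColumn(struct, rest, newName))
            case _ =>
              throw new IllegalArgumentException(s"Cannot find field: ${path.mkString(".")}")
          }
        case field => field
      })
    }

    // Delete a possibly nested column, failing the same way when the path
    // does not resolve.
    def deleteColumn(schema: StructType, path: Seq[String]): StructType = {
      if (!schema.fieldNames.contains(path.head)) {
        throw new IllegalArgumentException(s"Cannot find field: ${path.mkString(".")}")
      }
      StructType(schema.fields.flatMap {
        case field if field.name == path.head =>
          (path.tail, field.dataType) match {
            case (Seq(), _) => None
            case (rest, struct: StructType) => Some(field.copy(dataType = deleteColumn(struct, rest)))
            case _ =>
              throw new IllegalArgumentException(s"Cannot find field: ${path.mkString(".")}")
          }
        case field => Some(field)
      })
    }
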
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
index 96345e2..0175212 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
@@ -21,9 +21,10 @@ import scala.collection.JavaConverters._
import org.scalatest.BeforeAndAfter
-import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalog.v2.Identifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{LongType, StringType, StructType}
@@ -37,7 +38,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn
before {
spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)
spark.conf.set("spark.sql.catalog.testcat2", classOf[TestInMemoryTableCatalog].getName)
- spark.conf.set("spark.sql.default.catalog", "testcat")
+ spark.conf.set("spark.sql.catalog.session", classOf[TestInMemoryTableCatalog].getName)
val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
df.createOrReplaceTempView("source")
@@ -47,8 +48,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn
after {
spark.catalog("testcat").asInstanceOf[TestInMemoryTableCatalog].clearTables()
- spark.sql("DROP TABLE source")
- spark.sql("DROP TABLE source2")
+ spark.catalog("session").asInstanceOf[TestInMemoryTableCatalog].clearTables()
}
test("CreateTable: use v2 plan because catalog is set") {
@@ -66,13 +66,13 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn
checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty)
}
- test("CreateTable: use v2 plan because provider is v2") {
+ test("CreateTable: use v2 plan and session catalog when provider is v2") {
spark.sql(s"CREATE TABLE table_name (id bigint, data string) USING $orc2")
- val testCatalog = spark.catalog("testcat").asTableCatalog
+ val testCatalog = spark.catalog("session").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
- assert(table.name == "testcat.table_name")
+ assert(table.name == "session.table_name")
assert(table.partitioning.isEmpty)
assert(table.properties == Map("provider" -> orc2).asJava)
assert(table.schema == new StructType().add("id", LongType).add("data", StringType))
@@ -137,22 +137,23 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn
checkAnswer(spark.internalCreateDataFrame(rdd2, table.schema), Seq.empty)
}
- test("CreateTable: fail analysis when default catalog is needed but missing") {
- val originalDefaultCatalog = conf.getConfString("spark.sql.default.catalog")
- try {
- conf.unsetConf("spark.sql.default.catalog")
+ test("CreateTable: use default catalog for v2 sources when default catalog is set") {
+ val sparkSession = spark.newSession()
+ sparkSession.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)
+ sparkSession.conf.set("spark.sql.default.catalog", "testcat")
+ sparkSession.sql(s"CREATE TABLE table_name (id bigint, data string) USING foo")
- val exc = intercept[AnalysisException] {
- spark.sql(s"CREATE TABLE table_name USING $orc2 AS SELECT id, data FROM source")
- }
+ val testCatalog = sparkSession.catalog("testcat").asTableCatalog
+ val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
- assert(exc.getMessage.contains("No catalog specified for table"))
- assert(exc.getMessage.contains("table_name"))
- assert(exc.getMessage.contains("no default catalog is set"))
+ assert(table.name == "testcat.table_name")
+ assert(table.partitioning.isEmpty)
+ assert(table.properties == Map("provider" -> "foo").asJava)
+ assert(table.schema == new StructType().add("id", LongType).add("data", StringType))
- } finally {
- conf.setConfString("spark.sql.default.catalog", originalDefaultCatalog)
- }
+ // check that the table is empty
+ val rdd = sparkSession.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
+ checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty)
}
test("CreateTableAsSelect: use v2 plan because catalog is set") {
@@ -172,13 +173,13 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn
checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source"))
}
- test("CreateTableAsSelect: use v2 plan because provider is v2") {
+ test("CreateTableAsSelect: use v2 plan and session catalog when provider is v2") {
spark.sql(s"CREATE TABLE table_name USING $orc2 AS SELECT id, data FROM source")
- val testCatalog = spark.catalog("testcat").asTableCatalog
+ val testCatalog = spark.catalog("session").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
- assert(table.name == "testcat.table_name")
+ assert(table.name == "session.table_name")
assert(table.partitioning.isEmpty)
assert(table.properties == Map("provider" -> orc2).asJava)
assert(table.schema == new StructType()
@@ -251,22 +252,43 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn
checkAnswer(spark.internalCreateDataFrame(rdd2, table.schema), spark.table("source"))
}
- test("CreateTableAsSelect: fail analysis when default catalog is needed but missing") {
- val originalDefaultCatalog = conf.getConfString("spark.sql.default.catalog")
- try {
- conf.unsetConf("spark.sql.default.catalog")
+ test("CreateTableAsSelect: use default catalog for v2 sources when default catalog is set") {
+ val sparkSession = spark.newSession()
+ sparkSession.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)
+ sparkSession.conf.set("spark.sql.default.catalog", "testcat")
- val exc = intercept[AnalysisException] {
- spark.sql(s"CREATE TABLE table_name USING $orc2 AS SELECT id, data FROM source")
- }
+ val df = sparkSession.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
+ df.createOrReplaceTempView("source")
- assert(exc.getMessage.contains("No catalog specified for table"))
- assert(exc.getMessage.contains("table_name"))
- assert(exc.getMessage.contains("no default catalog is set"))
+ // Setting the default catalog breaks the reference to "source": unqualified
+ // identifiers now resolve through the default catalog, so AsTableIdentifier no longer matches.
+ sparkSession.sql(s"CREATE TABLE table_name USING foo AS SELECT id, data FROM source")
- } finally {
- conf.setConfString("spark.sql.default.catalog", originalDefaultCatalog)
- }
+ val testCatalog = sparkSession.catalog("testcat").asTableCatalog
+ val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+
+ assert(table.name == "testcat.table_name")
+ assert(table.partitioning.isEmpty)
+ assert(table.properties == Map("provider" -> "foo").asJava)
+ assert(table.schema == new StructType()
+ .add("id", LongType, nullable = false)
+ .add("data", StringType))
+
+ val rdd = sparkSession.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
+ checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), sparkSession.table("source"))
+ }
+
+ test("CreateTableAsSelect: v2 session catalog can load v1 source table") {
+ val sparkSession = spark.newSession()
+ sparkSession.conf.set("spark.sql.catalog.session", classOf[V2SessionCatalog].getName)
+
+ val df = sparkSession.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
+ df.createOrReplaceTempView("source")
+
+ sparkSession.sql(s"CREATE TABLE table_name USING parquet AS SELECT id, data FROM source")
+
+ // use the catalog name to force loading with the v2 catalog
+ checkAnswer(sparkSession.sql(s"TABLE session.table_name"), sparkSession.table("source"))
}
test("DropTable: basic") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index b04b3f1..2fa1088 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -74,7 +74,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
new FindDataSourceTable(session) +:
new ResolveSQLOnFile(session) +:
new FallBackFileSourceV2(session) +:
- DataSourceResolution(conf, session.catalog(_)) +:
+ DataSourceResolution(conf, this) +:
customResolutionRules
override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
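
The constructor change above implies that DataSourceResolution now consumes the builder as a LookupCatalog rather than a bare name-to-catalog function. A minimal sketch of what the builder side has to provide, assuming the trait shown at the top of this patch; ExampleBuilder is a hypothetical name, and the default-catalog key is spelled out as a string because its Scala constant is not visible in this excerpt:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalog.v2.{CatalogPlugin, LookupCatalog}

    class ExampleBuilder(session: SparkSession) extends LookupCatalog {
      // Surface the default catalog, if configured, so unqualified identifiers
      // are routed consistently.
      override protected def defaultCatalogName: Option[String] =
        Option(session.sessionState.conf.getConfString("spark.sql.default.catalog", null))

      // Delegate catalog loading to the session's catalog registry.
      override protected def lookupCatalog(name: String): CatalogPlugin =
        session.catalog(name)
    }
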