You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by br...@apache.org on 2019/09/03 20:13:52 UTC
[spark] branch master updated: [SPARK-28628][SQL] Implement
SupportsNamespaces in V2SessionCatalog
This is an automated email from the ASF dual-hosted git repository.
brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5ea134c [SPARK-28628][SQL] Implement SupportsNamespaces in V2SessionCatalog
5ea134c is described below
commit 5ea134c3546aa0512a85cc2970d38f5e0345edde
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Tue Sep 3 13:13:27 2019 -0700
[SPARK-28628][SQL] Implement SupportsNamespaces in V2SessionCatalog
## What changes were proposed in this pull request?
This adds namespace support to V2SessionCatalog.
## How was this patch tested?
WIP: will add tests for v2 session catalog namespace methods.
Closes #25363 from rdblue/SPARK-28628-support-namespaces-in-v2-session-catalog.
Authored-by: Ryan Blue <bl...@apache.org>
Signed-off-by: Burak Yavuz <br...@gmail.com>
---
.../datasources/v2/V2SessionCatalog.scala | 136 +++++++-
.../datasources/v2/V2SessionCatalogSuite.scala | 347 +++++++++++++++++++--
2 files changed, 451 insertions(+), 32 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index 6dcebe295..6f8cf47 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -17,18 +17,20 @@
package org.apache.spark.sql.execution.datasources.v2
+import java.net.URI
import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog, TableChange}
-import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, FieldReference, IdentityTransform, LogicalExpressions, Transform}
+import org.apache.spark.sql.catalog.v2.{Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange}
+import org.apache.spark.sql.catalog.v2.NamespaceChange.{RemoveProperty, SetProperty}
+import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, FieldReference, IdentityTransform, Transform}
import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog}
+import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.internal.SessionState
import org.apache.spark.sql.sources.v2.Table
@@ -39,11 +41,16 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
* A [[TableCatalog]] that translates calls to the v1 SessionCatalog.
*/
-class V2SessionCatalog(sessionState: SessionState) extends TableCatalog {
+class V2SessionCatalog(sessionState: SessionState) extends TableCatalog with SupportsNamespaces {
+ import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+ import V2SessionCatalog._
+
def this() = {
this(SparkSession.active.sessionState)
}
+ override val defaultNamespace: Array[String] = Array("default")
+
private lazy val catalog: SessionCatalog = sessionState.catalog
private var _name: String = _
@@ -87,7 +94,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog {
val (partitionColumns, maybeBucketSpec) = V2SessionCatalog.convertTransforms(partitions)
val provider = properties.getOrDefault("provider", sessionState.conf.defaultDataSourceName)
val tableProperties = properties.asScala
- val location = Option(properties.get("location"))
+ val location = Option(properties.get(LOCATION_TABLE_PROP))
val storage = DataSource.buildStorageFormatFromOptions(tableProperties.toMap)
.copy(locationUri = location.map(CatalogUtils.stringToURI))
val tableType = if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED
@@ -102,7 +109,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog {
bucketSpec = maybeBucketSpec,
properties = tableProperties.toMap,
tracksPartitionsInCatalog = sessionState.conf.manageFilesourcePartitions,
- comment = Option(properties.get("comment")))
+ comment = Option(properties.get(COMMENT_TABLE_PROP)))
try {
catalog.createTable(tableDesc, ignoreIfExists = false)
@@ -177,10 +184,97 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog {
}
}
+ override def namespaceExists(namespace: Array[String]): Boolean = namespace match {
+ case Array(db) =>
+ catalog.databaseExists(db)
+ case _ =>
+ false
+ }
+
+ override def listNamespaces(): Array[Array[String]] = {
+ catalog.listDatabases().map(Array(_)).toArray
+ }
+
+ override def listNamespaces(namespace: Array[String]): Array[Array[String]] = {
+ namespace match {
+ case Array() =>
+ listNamespaces()
+ case Array(db) if catalog.databaseExists(db) =>
+ Array()
+ case _ =>
+ throw new NoSuchNamespaceException(namespace)
+ }
+ }
+
+ override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] = {
+ namespace match {
+ case Array(db) =>
+ catalog.getDatabaseMetadata(db).toMetadata
+
+ case _ =>
+ throw new NoSuchNamespaceException(namespace)
+ }
+ }
+
+ override def createNamespace(
+ namespace: Array[String],
+ metadata: util.Map[String, String]): Unit = namespace match {
+ case Array(db) if !catalog.databaseExists(db) =>
+ catalog.createDatabase(
+ toCatalogDatabase(db, metadata, defaultLocation = Some(catalog.getDefaultDBPath(db))),
+ ignoreIfExists = false)
+
+ case Array(_) =>
+ throw new NamespaceAlreadyExistsException(namespace)
+
+ case _ =>
+ throw new IllegalArgumentException(s"Invalid namespace name: ${namespace.quoted}")
+ }
+
+ override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = {
+ namespace match {
+ case Array(db) =>
+ // validate that this catalog's reserved properties are not removed
+ changes.foreach {
+ case remove: RemoveProperty if RESERVED_PROPERTIES.contains(remove.property) =>
+ throw new UnsupportedOperationException(
+ s"Cannot remove reserved property: ${remove.property}")
+ case _ =>
+ }
+
+ val metadata = catalog.getDatabaseMetadata(db).toMetadata
+ catalog.alterDatabase(
+ toCatalogDatabase(db, CatalogV2Util.applyNamespaceChanges(metadata, changes)))
+
+ case _ =>
+ throw new NoSuchNamespaceException(namespace)
+ }
+ }
+
+ override def dropNamespace(namespace: Array[String]): Boolean = namespace match {
+ case Array(db) if catalog.databaseExists(db) =>
+ if (catalog.listTables(db).nonEmpty) {
+ throw new IllegalStateException(s"Namespace ${namespace.quoted} is not empty")
+ }
+ catalog.dropDatabase(db, ignoreIfNotExists = false, cascade = false)
+ true
+
+ case Array(_) =>
+ // exists returned false
+ false
+
+ case _ =>
+ throw new NoSuchNamespaceException(namespace)
+ }
+
override def toString: String = s"V2SessionCatalog($name)"
}
private[sql] object V2SessionCatalog {
+ val COMMENT_TABLE_PROP: String = "comment"
+ val LOCATION_TABLE_PROP: String = "location"
+ val RESERVED_PROPERTIES: Set[String] = Set(COMMENT_TABLE_PROP, LOCATION_TABLE_PROP)
+
/**
* Convert v2 Transforms to v1 partition columns and an optional bucket spec.
*/
@@ -202,4 +296,32 @@ private[sql] object V2SessionCatalog {
(identityCols, bucketSpec)
}
+
+ private def toCatalogDatabase(
+ db: String,
+ metadata: util.Map[String, String],
+ defaultLocation: Option[URI] = None): CatalogDatabase = {
+ CatalogDatabase(
+ name = db,
+ description = metadata.getOrDefault(COMMENT_TABLE_PROP, ""),
+ locationUri = Option(metadata.get(LOCATION_TABLE_PROP))
+ .map(CatalogUtils.stringToURI)
+ .orElse(defaultLocation)
+ .getOrElse(throw new IllegalArgumentException("Missing database location")),
+ properties = metadata.asScala.toMap -- Seq("comment", "location"))
+ }
+
+ private implicit class CatalogDatabaseHelper(catalogDatabase: CatalogDatabase) {
+ def toMetadata: util.Map[String, String] = {
+ val metadata = mutable.HashMap[String, String]()
+
+ catalogDatabase.properties.foreach {
+ case (key, value) => metadata.put(key, value)
+ }
+ metadata.put(LOCATION_TABLE_PROP, catalogDatabase.locationUri.toString)
+ metadata.put(COMMENT_TABLE_PROP, catalogDatabase.description)
+
+ metadata.asJava
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
index 64460d0..275bc33 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
@@ -22,41 +22,56 @@ import java.util.Collections
import scala.collection.JavaConverters._
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.BeforeAndAfter
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalog.v2.{Catalogs, Identifier, TableCatalog, TableChange}
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalog.v2.{Catalogs, Identifier, NamespaceChange, TableChange}
+import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
-class V2SessionCatalogSuite
- extends SparkFunSuite with SharedSparkSession with BeforeAndAfter {
- import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+class V2SessionCatalogBaseSuite extends SparkFunSuite with SharedSparkSession with BeforeAndAfter {
- private val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String]
- private val schema: StructType = new StructType()
+ val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String]
+ val schema: StructType = new StructType()
.add("id", IntegerType)
.add("data", StringType)
+ val testNs: Array[String] = Array("db")
+ val defaultNs: Array[String] = Array("default")
+ val testIdent: Identifier = Identifier.of(testNs, "test_table")
+
+ def newCatalog(): V2SessionCatalog = {
+ val newCatalog = new V2SessionCatalog(spark.sessionState)
+ newCatalog.initialize("test", CaseInsensitiveStringMap.empty())
+ newCatalog
+ }
+}
+
+class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
+
+ import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+
override protected def beforeAll(): Unit = {
super.beforeAll()
- spark.sql("""CREATE DATABASE IF NOT EXISTS db""")
- spark.sql("""CREATE DATABASE IF NOT EXISTS db2""")
- spark.sql("""CREATE DATABASE IF NOT EXISTS ns""")
- spark.sql("""CREATE DATABASE IF NOT EXISTS ns2""")
+ // TODO: when there is a public API for v2 catalogs, use that instead
+ val catalog = newCatalog()
+ catalog.createNamespace(Array("db"), emptyProps)
+ catalog.createNamespace(Array("db2"), emptyProps)
+ catalog.createNamespace(Array("ns"), emptyProps)
+ catalog.createNamespace(Array("ns2"), emptyProps)
}
override protected def afterAll(): Unit = {
- spark.sql("""DROP TABLE IF EXISTS db.test_table""")
- spark.sql("""DROP DATABASE IF EXISTS db""")
- spark.sql("""DROP DATABASE IF EXISTS db2""")
- spark.sql("""DROP DATABASE IF EXISTS ns""")
- spark.sql("""DROP DATABASE IF EXISTS ns2""")
+ val catalog = newCatalog()
+ catalog.dropNamespace(Array("db"))
+ catalog.dropNamespace(Array("db2"))
+ catalog.dropNamespace(Array("ns"))
+ catalog.dropNamespace(Array("ns2"))
super.afterAll()
}
@@ -65,14 +80,6 @@ class V2SessionCatalogSuite
newCatalog().dropTable(testIdentNew)
}
- private def newCatalog(): TableCatalog = {
- val newCatalog = new V2SessionCatalog(spark.sessionState)
- newCatalog.initialize("test", CaseInsensitiveStringMap.empty())
- newCatalog
- }
-
- private val testNs = Array("db")
- private val testIdent = Identifier.of(testNs, "test_table")
private val testIdentNew = Identifier.of(testNs, "test_table_new")
test("Catalogs can load the catalog") {
@@ -753,3 +760,293 @@ class V2SessionCatalogSuite
assert(exc.message.contains("RENAME TABLE source and destination databases do not match"))
}
}
+
+class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
+
+ import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+
+ def checkMetadata(
+ expected: scala.collection.Map[String, String],
+ actual: scala.collection.Map[String, String]): Unit = {
+ // remove location and comment that are automatically added by HMS unless they are expected
+ val toRemove = V2SessionCatalog.RESERVED_PROPERTIES.filter(expected.contains)
+ assert(expected -- toRemove === actual)
+ }
+
+ test("listNamespaces: basic behavior") {
+ val catalog = newCatalog()
+ catalog.createNamespace(testNs, Map("property" -> "value").asJava)
+
+ assert(catalog.listNamespaces() === Array(testNs, defaultNs))
+ assert(catalog.listNamespaces(Array()) === Array(testNs, defaultNs))
+ assert(catalog.listNamespaces(testNs) === Array())
+
+ catalog.dropNamespace(testNs)
+ }
+
+ test("listNamespaces: fail if missing namespace") {
+ val catalog = newCatalog()
+
+ assert(catalog.namespaceExists(testNs) === false)
+
+ val exc = intercept[NoSuchNamespaceException] {
+ assert(catalog.listNamespaces(testNs) === Array())
+ }
+
+ assert(exc.getMessage.contains(testNs.quoted))
+ assert(catalog.namespaceExists(testNs) === false)
+ }
+
+ test("loadNamespaceMetadata: fail missing namespace") {
+ val catalog = newCatalog()
+
+ val exc = intercept[NoSuchNamespaceException] {
+ catalog.loadNamespaceMetadata(testNs)
+ }
+
+ assert(exc.getMessage.contains(testNs.quoted))
+ }
+
+ test("loadNamespaceMetadata: non-empty metadata") {
+ val catalog = newCatalog()
+
+ assert(catalog.namespaceExists(testNs) === false)
+
+ catalog.createNamespace(testNs, Map("property" -> "value").asJava)
+
+ val metadata = catalog.loadNamespaceMetadata(testNs)
+
+ assert(catalog.namespaceExists(testNs) === true)
+ checkMetadata(metadata.asScala, Map("property" -> "value"))
+
+ catalog.dropNamespace(testNs)
+ }
+
+ test("loadNamespaceMetadata: empty metadata") {
+ val catalog = newCatalog()
+
+ assert(catalog.namespaceExists(testNs) === false)
+
+ catalog.createNamespace(testNs, emptyProps)
+
+ val metadata = catalog.loadNamespaceMetadata(testNs)
+
+ assert(catalog.namespaceExists(testNs) === true)
+ checkMetadata(metadata.asScala, emptyProps.asScala)
+
+ catalog.dropNamespace(testNs)
+ }
+
+ test("createNamespace: basic behavior") {
+ val catalog = newCatalog()
+ val expectedPath = sqlContext.sessionState.catalog.getDefaultDBPath(testNs(0)).toString
+
+ catalog.createNamespace(testNs, Map("property" -> "value").asJava)
+
+ assert(expectedPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString)
+
+ assert(catalog.namespaceExists(testNs) === true)
+ val metadata = catalog.loadNamespaceMetadata(testNs).asScala
+ checkMetadata(metadata, Map("property" -> "value"))
+ assert(expectedPath === metadata("location"))
+
+ catalog.dropNamespace(testNs)
+ }
+
+ test("createNamespace: initialize location") {
+ val catalog = newCatalog()
+ val expectedPath = "file:/tmp/db.db"
+
+ catalog.createNamespace(testNs, Map("location" -> expectedPath).asJava)
+
+ assert(expectedPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString)
+
+ assert(catalog.namespaceExists(testNs) === true)
+ val metadata = catalog.loadNamespaceMetadata(testNs).asScala
+ checkMetadata(metadata, Map.empty)
+ assert(expectedPath === metadata("location"))
+
+ catalog.dropNamespace(testNs)
+ }
+
+ test("createNamespace: fail if namespace already exists") {
+ val catalog = newCatalog()
+
+ catalog.createNamespace(testNs, Map("property" -> "value").asJava)
+
+ val exc = intercept[NamespaceAlreadyExistsException] {
+ catalog.createNamespace(testNs, Map("property" -> "value2").asJava)
+ }
+
+ assert(exc.getMessage.contains(testNs.quoted))
+ assert(catalog.namespaceExists(testNs) === true)
+ checkMetadata(catalog.loadNamespaceMetadata(testNs).asScala, Map("property" -> "value"))
+
+ catalog.dropNamespace(testNs)
+ }
+
+ test("createNamespace: fail nested namespace") {
+ val catalog = newCatalog()
+
+ // ensure the parent exists
+ catalog.createNamespace(Array("db"), emptyProps)
+
+ val exc = intercept[IllegalArgumentException] {
+ catalog.createNamespace(Array("db", "nested"), emptyProps)
+ }
+
+ assert(exc.getMessage.contains("Invalid namespace name: db.nested"))
+
+ catalog.dropNamespace(Array("db"))
+ }
+
+ test("createTable: fail if namespace does not exist") {
+ val catalog = newCatalog()
+
+ assert(catalog.namespaceExists(testNs) === false)
+
+ val exc = intercept[NoSuchNamespaceException] {
+ catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ }
+
+ assert(exc.getMessage.contains(testNs.quoted))
+ assert(catalog.namespaceExists(testNs) === false)
+ }
+
+ test("dropNamespace: drop missing namespace") {
+ val catalog = newCatalog()
+
+ assert(catalog.namespaceExists(testNs) === false)
+
+ val ret = catalog.dropNamespace(testNs)
+
+ assert(ret === false)
+ }
+
+ test("dropNamespace: drop empty namespace") {
+ val catalog = newCatalog()
+
+ catalog.createNamespace(testNs, emptyProps)
+
+ assert(catalog.namespaceExists(testNs) === true)
+
+ val ret = catalog.dropNamespace(testNs)
+
+ assert(ret === true)
+ assert(catalog.namespaceExists(testNs) === false)
+ }
+
+ test("dropNamespace: fail if not empty") {
+ val catalog = newCatalog()
+
+ catalog.createNamespace(testNs, Map("property" -> "value").asJava)
+ catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ val exc = intercept[IllegalStateException] {
+ catalog.dropNamespace(testNs)
+ }
+
+ assert(exc.getMessage.contains(testNs.quoted))
+ assert(catalog.namespaceExists(testNs) === true)
+ checkMetadata(catalog.loadNamespaceMetadata(testNs).asScala, Map("property" -> "value"))
+
+ catalog.dropTable(testIdent)
+ catalog.dropNamespace(testNs)
+ }
+
+ test("alterNamespace: basic behavior") {
+ val catalog = newCatalog()
+
+ catalog.createNamespace(testNs, Map("property" -> "value").asJava)
+
+ catalog.alterNamespace(testNs, NamespaceChange.setProperty("property2", "value2"))
+ checkMetadata(
+ catalog.loadNamespaceMetadata(testNs).asScala,
+ Map("property" -> "value", "property2" -> "value2"))
+
+ catalog.alterNamespace(testNs,
+ NamespaceChange.removeProperty("property2"),
+ NamespaceChange.setProperty("property3", "value3"))
+ checkMetadata(
+ catalog.loadNamespaceMetadata(testNs).asScala,
+ Map("property" -> "value", "property3" -> "value3"))
+
+ catalog.alterNamespace(testNs, NamespaceChange.removeProperty("property3"))
+ checkMetadata(
+ catalog.loadNamespaceMetadata(testNs).asScala,
+ Map("property" -> "value"))
+
+ catalog.dropNamespace(testNs)
+ }
+
+ test("alterNamespace: update namespace location") {
+ val catalog = newCatalog()
+ val initialPath = sqlContext.sessionState.catalog.getDefaultDBPath(testNs(0)).toString
+ val newPath = "file:/tmp/db.db"
+
+ catalog.createNamespace(testNs, emptyProps)
+
+ assert(initialPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString)
+
+ catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", newPath))
+
+ assert(newPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString)
+
+ catalog.dropNamespace(testNs)
+ }
+
+ test("alterNamespace: update namespace comment") {
+ val catalog = newCatalog()
+ val newComment = "test db"
+
+ catalog.createNamespace(testNs, emptyProps)
+
+ assert(spark.catalog.getDatabase(testNs(0)).description.isEmpty)
+
+ catalog.alterNamespace(testNs, NamespaceChange.setProperty("comment", newComment))
+
+ assert(newComment === spark.catalog.getDatabase(testNs(0)).description)
+
+ catalog.dropNamespace(testNs)
+ }
+
+ test("alterNamespace: fail if namespace doesn't exist") {
+ val catalog = newCatalog()
+
+ assert(catalog.namespaceExists(testNs) === false)
+
+ val exc = intercept[NoSuchNamespaceException] {
+ catalog.alterNamespace(testNs, NamespaceChange.setProperty("property", "value"))
+ }
+
+ assert(exc.getMessage.contains(testNs.quoted))
+ }
+
+ test("alterNamespace: fail to remove location") {
+ val catalog = newCatalog()
+
+ catalog.createNamespace(testNs, emptyProps)
+
+ val exc = intercept[UnsupportedOperationException] {
+ catalog.alterNamespace(testNs, NamespaceChange.removeProperty("location"))
+ }
+
+ assert(exc.getMessage.contains("Cannot remove reserved property: location"))
+
+ catalog.dropNamespace(testNs)
+ }
+
+ test("alterNamespace: fail to remove comment") {
+ val catalog = newCatalog()
+
+ catalog.createNamespace(testNs, Map("comment" -> "test db").asJava)
+
+ val exc = intercept[UnsupportedOperationException] {
+ catalog.alterNamespace(testNs, NamespaceChange.removeProperty("comment"))
+ }
+
+ assert(exc.getMessage.contains("Cannot remove reserved property: comment"))
+
+ catalog.dropNamespace(testNs)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org