You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/04/27 06:29:31 UTC
[1/2] spark git commit: [SPARK-13477][SQL] Expose new user-facing
Catalog interface
Repository: spark
Updated Branches:
refs/heads/master d93976d86 -> d8a83a564
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 6b1d413..855e7e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -183,8 +183,8 @@ case class ShowPartitionsCommand(
* 2. If it is a datasource table.
* 3. If it is a view or index table.
*/
- if (tab.tableType == CatalogTableType.VIRTUAL_VIEW ||
- tab.tableType == CatalogTableType.INDEX_TABLE) {
+ if (tab.tableType == CatalogTableType.VIEW ||
+ tab.tableType == CatalogTableType.INDEX) {
throw new AnalysisException("SHOW PARTITIONS is not allowed on a view or index table: " +
s"${tab.qualifiedName}")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 31900b4..f670f63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -323,10 +323,10 @@ object CreateDataSourceTableUtils extends Logging {
val tableType = if (isExternal) {
tableProperties.put("EXTERNAL", "TRUE")
- CatalogTableType.EXTERNAL_TABLE
+ CatalogTableType.EXTERNAL
} else {
tableProperties.put("EXTERNAL", "FALSE")
- CatalogTableType.MANAGED_TABLE
+ CatalogTableType.MANAGED
}
val maybeSerDe = HiveSerDe.sourceToSerDe(provider, sparkSession.sessionState.conf)
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index ecde332..12167ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -204,10 +204,10 @@ case class DropTable(
// If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
// issue an exception.
catalog.getTableMetadataOption(tableName).map(_.tableType match {
- case CatalogTableType.VIRTUAL_VIEW if !isView =>
+ case CatalogTableType.VIEW if !isView =>
throw new AnalysisException(
"Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
- case o if o != CatalogTableType.VIRTUAL_VIEW && isView =>
+ case o if o != CatalogTableType.VIEW && isView =>
throw new AnalysisException(
s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
case _ =>
@@ -527,10 +527,10 @@ private[sql] object DDLUtils {
tableIdentifier: TableIdentifier,
isView: Boolean): Unit = {
catalog.getTableMetadataOption(tableIdentifier).map(_.tableType match {
- case CatalogTableType.VIRTUAL_VIEW if !isView =>
+ case CatalogTableType.VIEW if !isView =>
throw new AnalysisException(
"Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead")
- case o if o != CatalogTableType.VIRTUAL_VIEW && isView =>
+ case o if o != CatalogTableType.VIEW && isView =>
throw new AnalysisException(
s"Cannot alter a table with ALTER VIEW. Please use ALTER TABLE instead")
case _ =>
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 700a704..8d9feec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -73,7 +73,7 @@ case class CreateTableLike(
val tableToCreate = catalog.getTableMetadata(sourceTable).copy(
identifier = targetTable,
- tableType = CatalogTableType.MANAGED_TABLE,
+ tableType = CatalogTableType.MANAGED,
createTime = System.currentTimeMillis,
lastAccessTime = -1).withNewStorage(locationUri = None)
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index f42b56f..1641780 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -52,7 +52,7 @@ case class CreateViewCommand(
override def output: Seq[Attribute] = Seq.empty[Attribute]
- require(tableDesc.tableType == CatalogTableType.VIRTUAL_VIEW)
+ require(tableDesc.tableType == CatalogTableType.VIEW)
require(tableDesc.viewText.isDefined)
private val tableIdentifier = tableDesc.identifier
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
new file mode 100644
index 0000000..976c9c5
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import scala.collection.JavaConverters._
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table}
+import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.execution.datasources.CreateTableUsing
+import org.apache.spark.sql.types.StructType
+
+
+/**
+ * Internal implementation of the user-facing [[Catalog]].
+ */
+class CatalogImpl(sparkSession: SparkSession) extends Catalog {
+
+  private def sessionCatalog: SessionCatalog = sparkSession.sessionState.catalog
+
+  /** Throws an [[AnalysisException]] unless a database named `dbName` exists. */
+  private def requireDatabaseExists(dbName: String): Unit = {
+    if (!sessionCatalog.databaseExists(dbName)) {
+      throw new AnalysisException(s"Database '$dbName' does not exist.")
+    }
+  }
+
+  /** Throws an [[AnalysisException]] unless table `tableName` exists in `dbName`. */
+  private def requireTableExists(dbName: String, tableName: String): Unit = {
+    if (!sessionCatalog.tableExists(TableIdentifier(tableName, Some(dbName)))) {
+      throw new AnalysisException(s"Table '$tableName' does not exist in database '$dbName'.")
+    }
+  }
+
+  /** Encodes `data` into rows and wraps them in a [[Dataset]] over a [[LocalRelation]]. */
+  private def makeDataset[T <: DefinedByConstructorParams: TypeTag](data: Seq[T]): Dataset[T] = {
+    val enc = ExpressionEncoder[T]()
+    val encoded = data.map(d => enc.toRow(d).copy())
+    val plan = new LocalRelation(enc.schema.toAttributes, encoded)
+    val queryExecution = sparkSession.executePlan(plan)
+    new Dataset[T](sparkSession, queryExecution, enc)
+  }
+
+  /** Runs a non-temporary [[CreateTableUsing]] command, then loads the created table. */
+  private def createExternalTableInternal(
+      tableName: String,
+      source: String,
+      schema: Option[StructType],
+      options: Map[String, String]): DataFrame = {
+    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+    val cmd =
+      CreateTableUsing(
+        tableIdent,
+        userSpecifiedSchema = schema,
+        source,
+        temporary = false,
+        options,
+        allowExisting = false,
+        managedIfNoPath = false)
+    sparkSession.executePlan(cmd).toRdd
+    sparkSession.table(tableIdent)
+  }
+
+  /**
+   * Returns the current default database in this session.
+   */
+  override def currentDatabase: String = sessionCatalog.getCurrentDatabase
+
+  /**
+   * Sets the current default database in this session.
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def setCurrentDatabase(dbName: String): Unit = {
+    requireDatabaseExists(dbName)
+    sessionCatalog.setCurrentDatabase(dbName)
+  }
+
+  /**
+   * Returns a list of databases available across all sessions.
+   */
+  override def listDatabases(): Dataset[Database] = {
+    val databases = sessionCatalog.listDatabases().map { dbName =>
+      val metadata = sessionCatalog.getDatabaseMetadata(dbName)
+      new Database(
+        name = metadata.name,
+        description = metadata.description,
+        locationUri = metadata.locationUri)
+    }
+    makeDataset(databases)
+  }
+
+  /**
+   * Returns a list of tables in the current database.
+   * This includes all temporary tables.
+   */
+  override def listTables(): Dataset[Table] = listTables(currentDatabase)
+
+  /**
+   * Returns a list of tables in the specified database.
+   * This includes all temporary tables.
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listTables(dbName: String): Dataset[Table] = {
+    requireDatabaseExists(dbName)
+    val tables = sessionCatalog.listTables(dbName).map { tableIdent =>
+      val isTemp = tableIdent.database.isEmpty
+      val metadata = if (isTemp) None else Some(sessionCatalog.getTableMetadata(tableIdent))
+      new Table(
+        name = tableIdent.identifier,
+        database = metadata.flatMap(_.identifier.database).orNull,
+        description = metadata.flatMap(_.comment).orNull,
+        tableType = metadata.map(_.tableType.name).getOrElse("TEMPORARY"),
+        isTemporary = isTemp)
+    }
+    makeDataset(tables)
+  }
+
+  /**
+   * Returns a list of functions registered in the current database.
+   * This includes all temporary functions.
+   */
+  override def listFunctions(): Dataset[Function] = listFunctions(currentDatabase)
+
+  /**
+   * Returns a list of functions registered in the specified database.
+   * This includes all temporary functions.
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listFunctions(dbName: String): Dataset[Function] = {
+    requireDatabaseExists(dbName)
+    val functions = sessionCatalog.listFunctions(dbName).map { funcIdent =>
+      val metadata = sessionCatalog.lookupFunctionInfo(funcIdent)
+      new Function(
+        name = funcIdent.identifier,
+        description = null, // for now, this is always undefined
+        className = metadata.getClassName,
+        isTemporary = funcIdent.database.isEmpty)
+    }
+    makeDataset(functions)
+  }
+
+  /**
+   * Returns a list of columns for the given table in the current database.
+   */
+  @throws[AnalysisException]("table does not exist")
+  override def listColumns(tableName: String): Dataset[Column] = {
+    listColumns(currentDatabase, tableName)
+  }
+
+  /**
+   * Returns a list of columns for the given table in the specified database.
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  override def listColumns(dbName: String, tableName: String): Dataset[Column] = {
+    requireDatabaseExists(dbName)
+    requireTableExists(dbName, tableName)
+    val tableMetadata = sessionCatalog.getTableMetadata(TableIdentifier(tableName, Some(dbName)))
+    val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
+    val bucketColumnNames = tableMetadata.bucketColumnNames.toSet
+    val columns = tableMetadata.schema.map { c =>
+      new Column(
+        name = c.name,
+        description = c.comment.orNull,
+        dataType = c.dataType,
+        nullable = c.nullable,
+        isPartition = partitionColumnNames.contains(c.name),
+        isBucket = bucketColumnNames.contains(c.name))
+    }
+    makeDataset(columns)
+  }
+
+  /**
+   * :: Experimental ::
+   * Creates an external table from the given path and returns the corresponding DataFrame.
+   * It will use the default data source configured by spark.sql.sources.default.
+   *
+   * @group ddl_ops
+   * @since 2.0.0
+   */
+  @Experimental
+  override def createExternalTable(tableName: String, path: String): DataFrame = {
+    val dataSourceName = sparkSession.sessionState.conf.defaultDataSourceName
+    createExternalTable(tableName, path, dataSourceName)
+  }
+
+  /**
+   * :: Experimental ::
+   * Creates an external table from the given path based on a data source
+   * and returns the corresponding DataFrame.
+   *
+   * @group ddl_ops
+   * @since 2.0.0
+   */
+  @Experimental
+  override def createExternalTable(tableName: String, path: String, source: String): DataFrame = {
+    createExternalTable(tableName, source, Map("path" -> path))
+  }
+
+  /**
+   * :: Experimental ::
+   * Creates an external table from the given path based on a data source and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @group ddl_ops
+   * @since 2.0.0
+   */
+  @Experimental
+  override def createExternalTable(
+      tableName: String,
+      source: String,
+      options: java.util.Map[String, String]): DataFrame = {
+    createExternalTable(tableName, source, options.asScala.toMap)
+  }
+
+  /**
+   * :: Experimental ::
+   * (Scala-specific)
+   * Creates an external table from the given path based on a data source and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @group ddl_ops
+   * @since 2.0.0
+   */
+  @Experimental
+  override def createExternalTable(
+      tableName: String,
+      source: String,
+      options: Map[String, String]): DataFrame = {
+    createExternalTableInternal(tableName, source, None, options)
+  }
+
+  /**
+   * :: Experimental ::
+   * Creates an external table from the given path based on a data source, a schema and
+   * a set of options. Then, returns the corresponding DataFrame.
+   *
+   * @group ddl_ops
+   * @since 2.0.0
+   */
+  @Experimental
+  override def createExternalTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      options: java.util.Map[String, String]): DataFrame = {
+    createExternalTable(tableName, source, schema, options.asScala.toMap)
+  }
+
+  /**
+   * :: Experimental ::
+   * (Scala-specific)
+   * Creates an external table from the given path based on a data source, a schema and
+   * a set of options. Then, returns the corresponding DataFrame.
+   *
+   * @group ddl_ops
+   * @since 2.0.0
+   */
+  @Experimental
+  override def createExternalTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      options: Map[String, String]): DataFrame = {
+    createExternalTableInternal(tableName, source, Some(schema), options)
+  }
+
+  /**
+   * Drops the temporary table with the given table name in the catalog.
+   * If the table has been cached/persisted before, it's also unpersisted.
+   *
+   * @param tableName the name of the table to be unregistered.
+   * @group ddl_ops
+   * @since 2.0.0
+   */
+  override def dropTempTable(tableName: String): Unit = {
+    // Resolving a missing table throws, which would defeat `ignoreIfNotExists` below,
+    // so only try to uncache when the table exists.
+    if (sessionCatalog.tableExists(TableIdentifier(tableName))) {
+      sparkSession.cacheManager.tryUncacheQuery(sparkSession.table(tableName))
+    }
+    sessionCatalog.dropTable(TableIdentifier(tableName), ignoreIfNotExists = true)
+  }
+
+  /**
+   * Returns true if the table is currently cached in-memory.
+   *
+   * @group cachemgmt
+   * @since 2.0.0
+   */
+  override def isCached(tableName: String): Boolean = {
+    sparkSession.cacheManager.lookupCachedData(sparkSession.table(tableName)).nonEmpty
+  }
+
+  /**
+   * Caches the specified table in-memory.
+   *
+   * @group cachemgmt
+   * @since 2.0.0
+   */
+  override def cacheTable(tableName: String): Unit = {
+    sparkSession.cacheManager.cacheQuery(sparkSession.table(tableName), Some(tableName))
+  }
+
+  /**
+   * Removes the specified table from the in-memory cache.
+   *
+   * @group cachemgmt
+   * @since 2.0.0
+   */
+  override def uncacheTable(tableName: String): Unit = {
+    sparkSession.cacheManager.uncacheQuery(sparkSession.table(tableName))
+  }
+
+  /**
+   * Removes all cached tables from the in-memory cache.
+   *
+   * @group cachemgmt
+   * @since 2.0.0
+   */
+  override def clearCache(): Unit = sparkSession.cacheManager.clearCache()
+
+  /**
+   * Returns true if the [[Dataset]] is currently cached in-memory.
+   *
+   * @group cachemgmt
+   * @since 2.0.0
+   */
+  protected[sql] def isCached(qName: Dataset[_]): Boolean = {
+    sparkSession.cacheManager.lookupCachedData(qName).nonEmpty
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index e601ff1..58330c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -69,7 +69,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
catalog.createTable(CatalogTable(
identifier = name,
- tableType = CatalogTableType.EXTERNAL_TABLE,
+ tableType = CatalogTableType.EXTERNAL,
storage = CatalogStorageFormat(None, None, None, None, Map()),
schema = Seq()), ignoreIfExists = false)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
new file mode 100644
index 0000000..986d8f5
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalog.{Column, Database, Function, Table}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
+import org.apache.spark.sql.catalyst.plans.logical.Range
+import org.apache.spark.sql.test.SharedSQLContext
+
+
+/**
+ * Tests for the user-facing [[org.apache.spark.sql.catalog.Catalog]].
+ */
+class CatalogSuite
+  extends SparkFunSuite
+  with BeforeAndAfterEach
+  with SharedSQLContext {
+
+  private def sparkSession: SparkSession = sqlContext.sparkSession
+  private def sessionCatalog: SessionCatalog = sparkSession.sessionState.catalog
+
+  private val utils = new CatalogTestUtils {
+    override val tableInputFormat: String = "com.fruit.eyephone.CameraInputFormat"
+    override val tableOutputFormat: String = "com.fruit.eyephone.CameraOutputFormat"
+    override def newEmptyCatalog(): ExternalCatalog = sparkSession.sharedState.externalCatalog
+  }
+
+  private def listedDatabaseNames: Set[String] =
+    sparkSession.catalog.listDatabases().collect().map(_.name).toSet
+
+  /** Names of the tables listed for `db`, or for the current database if `db` is empty. */
+  private def listedTableNames(db: Option[String] = None): Set[String] = {
+    val listed = db match {
+      case Some(dbName) => sparkSession.catalog.listTables(dbName)
+      case None => sparkSession.catalog.listTables()
+    }
+    listed.collect().map(_.name).toSet
+  }
+
+  /** Names of the functions listed for `db`, or for the current database if `db` is empty. */
+  private def listedFunctionNames(db: Option[String] = None): Set[String] = {
+    val listed = db match {
+      case Some(dbName) => sparkSession.catalog.listFunctions(dbName)
+      case None => sparkSession.catalog.listFunctions()
+    }
+    listed.collect().map(_.name).toSet
+  }
+
+  private def createDatabase(name: String): Unit =
+    sessionCatalog.createDatabase(utils.newDb(name), ignoreIfExists = false)
+
+  private def dropDatabase(name: String): Unit =
+    sessionCatalog.dropDatabase(name, ignoreIfNotExists = false, cascade = true)
+
+  private def createTable(name: String, db: Option[String] = None): Unit =
+    sessionCatalog.createTable(utils.newTable(name, db), ignoreIfExists = false)
+
+  private def createTempTable(name: String): Unit =
+    sessionCatalog.createTempTable(name, Range(1, 2, 3, 4, Seq()), overrideIfExists = true)
+
+  private def dropTable(name: String, db: Option[String] = None): Unit =
+    sessionCatalog.dropTable(TableIdentifier(name, db), ignoreIfNotExists = false)
+
+  private def createFunction(name: String, db: Option[String] = None): Unit =
+    sessionCatalog.createFunction(utils.newFunc(name, db), ignoreIfExists = false)
+
+  private def createTempFunction(name: String): Unit = {
+    val passThrough = (args: Seq[Expression]) => args.head
+    sessionCatalog.createTempFunction(
+      name, new ExpressionInfo("className", name), passThrough, ignoreIfExists = false)
+  }
+
+  private def dropFunction(name: String, db: Option[String] = None): Unit =
+    sessionCatalog.dropFunction(FunctionIdentifier(name, db), ignoreIfNotExists = false)
+
+  private def dropTempFunction(name: String): Unit =
+    sessionCatalog.dropTempFunction(name, ignoreIfNotExists = false)
+
+  private def testListColumns(tableName: String, dbName: Option[String]): Unit = {
+    val tableMetadata = sessionCatalog.getTableMetadata(TableIdentifier(tableName, dbName))
+    val columns = dbName match {
+      case Some(db) => sparkSession.catalog.listColumns(db, tableName)
+      case None => sparkSession.catalog.listColumns(tableName)
+    }
+    assume(tableMetadata.schema.nonEmpty, "bad test")
+    assume(tableMetadata.partitionColumnNames.nonEmpty, "bad test")
+    assume(tableMetadata.bucketColumnNames.nonEmpty, "bad test")
+    assert(columns.collect().map(_.name).toSet == tableMetadata.schema.map(_.name).toSet)
+    for (col <- columns.collect()) {
+      assert(col.isPartition == tableMetadata.partitionColumnNames.contains(col.name))
+      assert(col.isBucket == tableMetadata.bucketColumnNames.contains(col.name))
+    }
+  }
+
+  override def afterEach(): Unit = {
+    try sessionCatalog.reset() finally super.afterEach()
+  }
+
+  test("current database") {
+    def assertCurrentDatabase(expected: String): Unit = {
+      assert(sparkSession.catalog.currentDatabase == expected)
+      assert(sessionCatalog.getCurrentDatabase == expected)
+    }
+    assertCurrentDatabase("default")
+    createDatabase("my_db")
+    sparkSession.catalog.setCurrentDatabase("my_db")
+    assertCurrentDatabase("my_db")
+    val e = intercept[AnalysisException] {
+      sparkSession.catalog.setCurrentDatabase("unknown_db")
+    }
+    assert(e.getMessage.contains("unknown_db"))
+  }
+
+  test("list databases") {
+    assert(listedDatabaseNames == Set("default"))
+    createDatabase("my_db1")
+    createDatabase("my_db2")
+    assert(listedDatabaseNames == Set("default", "my_db1", "my_db2"))
+    dropDatabase("my_db1")
+    assert(listedDatabaseNames == Set("default", "my_db2"))
+  }
+
+  test("list tables") {
+    assert(listedTableNames().isEmpty)
+    createTable("my_table1")
+    createTable("my_table2")
+    createTempTable("my_temp_table")
+    assert(listedTableNames() == Set("my_table1", "my_table2", "my_temp_table"))
+    dropTable("my_table1")
+    assert(listedTableNames() == Set("my_table2", "my_temp_table"))
+    dropTable("my_temp_table")
+    assert(listedTableNames() == Set("my_table2"))
+  }
+
+  test("list tables with database") {
+    assert(listedTableNames(Some("default")).isEmpty)
+    createDatabase("my_db1")
+    createDatabase("my_db2")
+    createTable("my_table1", Some("my_db1"))
+    createTable("my_table2", Some("my_db2"))
+    createTempTable("my_temp_table")
+    assert(listedTableNames(Some("default")) == Set("my_temp_table"))
+    assert(listedTableNames(Some("my_db1")) == Set("my_table1", "my_temp_table"))
+    assert(listedTableNames(Some("my_db2")) == Set("my_table2", "my_temp_table"))
+    dropTable("my_table1", Some("my_db1"))
+    assert(listedTableNames(Some("my_db1")) == Set("my_temp_table"))
+    assert(listedTableNames(Some("my_db2")) == Set("my_table2", "my_temp_table"))
+    dropTable("my_temp_table")
+    assert(listedTableNames(Some("default")).isEmpty)
+    assert(listedTableNames(Some("my_db1")).isEmpty)
+    assert(listedTableNames(Some("my_db2")) == Set("my_table2"))
+    val e = intercept[AnalysisException] {
+      sparkSession.catalog.listTables("unknown_db")
+    }
+    assert(e.getMessage.contains("unknown_db"))
+  }
+
+  test("list functions") {
+    assert(Set("+", "current_database", "window").subsetOf(listedFunctionNames()))
+    createFunction("my_func1")
+    createFunction("my_func2")
+    createTempFunction("my_temp_func")
+    val afterCreate = listedFunctionNames()
+    assert(afterCreate.contains("my_func1"))
+    assert(afterCreate.contains("my_func2"))
+    assert(afterCreate.contains("my_temp_func"))
+    dropFunction("my_func1")
+    dropTempFunction("my_temp_func")
+    val afterDrop = listedFunctionNames()
+    assert(!afterDrop.contains("my_func1"))
+    assert(afterDrop.contains("my_func2"))
+    assert(!afterDrop.contains("my_temp_func"))
+  }
+
+  test("list functions with database") {
+    assert(Set("+", "current_database", "window").subsetOf(listedFunctionNames(Some("default"))))
+    createDatabase("my_db1")
+    createDatabase("my_db2")
+    createFunction("my_func1", Some("my_db1"))
+    createFunction("my_func2", Some("my_db2"))
+    createTempFunction("my_temp_func")
+    val db1AfterCreate = listedFunctionNames(Some("my_db1"))
+    val db2AfterCreate = listedFunctionNames(Some("my_db2"))
+    assert(db1AfterCreate.contains("my_func1"))
+    assert(!db1AfterCreate.contains("my_func2"))
+    assert(db1AfterCreate.contains("my_temp_func"))
+    assert(!db2AfterCreate.contains("my_func1"))
+    assert(db2AfterCreate.contains("my_func2"))
+    assert(db2AfterCreate.contains("my_temp_func"))
+    dropFunction("my_func1", Some("my_db1"))
+    dropTempFunction("my_temp_func")
+    val db1AfterDrop = listedFunctionNames(Some("my_db1"))
+    val db2AfterDrop = listedFunctionNames(Some("my_db2"))
+    assert(!db1AfterDrop.contains("my_func1"))
+    assert(!db1AfterDrop.contains("my_temp_func"))
+    assert(db2AfterDrop.contains("my_func2"))
+    assert(!db2AfterDrop.contains("my_temp_func"))
+    val e = intercept[AnalysisException] {
+      sparkSession.catalog.listFunctions("unknown_db")
+    }
+    assert(e.getMessage.contains("unknown_db"))
+  }
+
+  test("list columns") {
+    createTable("tab1")
+    testListColumns("tab1", dbName = None)
+  }
+
+  test("list columns in database") {
+    createDatabase("db1")
+    createTable("tab1", Some("db1"))
+    testListColumns("tab1", dbName = Some("db1"))
+  }
+
+  test("Database.toString") {
+    assert(new Database("cool_db", "cool_desc", "cool_path").toString ==
+      "Database[name='cool_db', description='cool_desc', path='cool_path']")
+    assert(new Database("cool_db", null, "cool_path").toString ==
+      "Database[name='cool_db', path='cool_path']")
+  }
+
+  test("Table.toString") {
+    assert(new Table("volley", "databasa", "one", "world", isTemporary = true).toString ==
+      "Table[name='volley', database='databasa', description='one', " +
+        "tableType='world', isTemporary='true']")
+    assert(new Table("volley", null, null, "world", isTemporary = true).toString ==
+      "Table[name='volley', tableType='world', isTemporary='true']")
+  }
+
+  test("Function.toString") {
+    assert(new Function("nama", "commenta", "classNameAh", isTemporary = true).toString ==
+      "Function[name='nama', description='commenta', className='classNameAh', isTemporary='true']")
+    assert(new Function("nama", null, "classNameAh", isTemporary = false).toString ==
+      "Function[name='nama', className='classNameAh', isTemporary='false']")
+  }
+
+  test("Column.toString") {
+    assert(new Column("namama", "descaca", "datatapa",
+      nullable = true, isPartition = false, isBucket = true).toString ==
+        "Column[name='namama', description='descaca', dataType='datatapa', " +
+          "nullable='true', isPartition='false', isBucket='true']")
+    assert(new Column("namama", null, "datatapa",
+      nullable = false, isPartition = true, isBucket = true).toString ==
+        "Column[name='namama', dataType='datatapa', " +
+          "nullable='false', isPartition='true', isBucket='true']")
+  }
+
+  // TODO: add tests for the rest of them
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 01b7cfb..c4db4f3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -173,7 +173,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
// Then, if alias is specified, wrap the table with a Subquery using the alias.
// Otherwise, wrap the table with a Subquery using the table name.
alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable)
- } else if (table.tableType == CatalogTableType.VIRTUAL_VIEW) {
+ } else if (table.tableType == CatalogTableType.VIEW) {
val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
alias match {
// because hive use things like `_c0` to build the expanded text
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 367fcf1..5b580d0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -77,10 +77,10 @@ private[hive] case class MetastoreRelation(
catalogTable.properties.foreach { case (k, v) => tableParameters.put(k, v) }
tTable.setTableType(catalogTable.tableType match {
- case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE.toString
- case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE.toString
- case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE.toString
- case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW.toString
+ case CatalogTableType.EXTERNAL => HiveTableType.EXTERNAL_TABLE.toString
+ case CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE.toString
+ case CatalogTableType.INDEX => HiveTableType.INDEX_TABLE.toString
+ case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW.toString
})
val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 6a7345f..d651791 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -317,10 +317,10 @@ private[hive] class HiveClientImpl(
CatalogTable(
identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
tableType = h.getTableType match {
- case HiveTableType.EXTERNAL_TABLE => CatalogTableType.EXTERNAL_TABLE
- case HiveTableType.MANAGED_TABLE => CatalogTableType.MANAGED_TABLE
- case HiveTableType.INDEX_TABLE => CatalogTableType.INDEX_TABLE
- case HiveTableType.VIRTUAL_VIEW => CatalogTableType.VIRTUAL_VIEW
+ case HiveTableType.EXTERNAL_TABLE => CatalogTableType.EXTERNAL
+ case HiveTableType.MANAGED_TABLE => CatalogTableType.MANAGED
+ case HiveTableType.INDEX_TABLE => CatalogTableType.INDEX
+ case HiveTableType.VIRTUAL_VIEW => CatalogTableType.VIEW
},
schema = schema,
partitionColumnNames = partCols.map(_.name),
@@ -696,13 +696,13 @@ private[hive] class HiveClientImpl(
// Otherwise, Hive metastore will change the table to a MANAGED_TABLE.
// (metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1095-L1105)
hiveTable.setTableType(table.tableType match {
- case CatalogTableType.EXTERNAL_TABLE =>
+ case CatalogTableType.EXTERNAL =>
hiveTable.setProperty("EXTERNAL", "TRUE")
HiveTableType.EXTERNAL_TABLE
- case CatalogTableType.MANAGED_TABLE =>
+ case CatalogTableType.MANAGED =>
HiveTableType.MANAGED_TABLE
- case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE
- case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW
+ case CatalogTableType.INDEX => HiveTableType.INDEX_TABLE
+ case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW
})
// Note: In Hive the schema and partition columns must be disjoint sets
val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 97bd47a..4ca5619 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -166,7 +166,7 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
tempPath.delete()
table("src").write.mode(SaveMode.Overwrite).parquet(tempPath.toString)
sql("DROP TABLE IF EXISTS refreshTable")
- createExternalTable("refreshTable", tempPath.toString, "parquet")
+ sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet")
checkAnswer(
table("refreshTable"),
table("src").collect())
@@ -190,7 +190,7 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
// Drop the table and create it again.
sql("DROP TABLE refreshTable")
- createExternalTable("refreshTable", tempPath.toString, "parquet")
+ sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet")
// It is not cached.
assert(!isCached("refreshTable"), "refreshTable should not be cached.")
// Refresh the table. REFRESH TABLE command should not make a uncached
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index cff1127..ec581b6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -70,7 +70,7 @@ class HiveDDLCommandSuite extends PlanTest {
assert(exists)
assert(desc.identifier.database == Some("mydb"))
assert(desc.identifier.table == "page_view")
- assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
+ assert(desc.tableType == CatalogTableType.EXTERNAL)
assert(desc.storage.locationUri == Some("/user/external/page_view"))
assert(desc.schema ==
CatalogColumn("viewtime", "int") ::
@@ -120,7 +120,7 @@ class HiveDDLCommandSuite extends PlanTest {
assert(exists)
assert(desc.identifier.database == Some("mydb"))
assert(desc.identifier.table == "page_view")
- assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
+ assert(desc.tableType == CatalogTableType.EXTERNAL)
assert(desc.storage.locationUri == Some("/user/external/page_view"))
assert(desc.schema ==
CatalogColumn("viewtime", "int") ::
@@ -151,7 +151,7 @@ class HiveDDLCommandSuite extends PlanTest {
assert(exists == false)
assert(desc.identifier.database == None)
assert(desc.identifier.table == "page_view")
- assert(desc.tableType == CatalogTableType.MANAGED_TABLE)
+ assert(desc.tableType == CatalogTableType.MANAGED)
assert(desc.storage.locationUri == None)
assert(desc.schema == Seq.empty[CatalogColumn])
assert(desc.viewText == None) // TODO will be SQLText
@@ -187,7 +187,7 @@ class HiveDDLCommandSuite extends PlanTest {
assert(exists == false)
assert(desc.identifier.database == None)
assert(desc.identifier.table == "ctas2")
- assert(desc.tableType == CatalogTableType.MANAGED_TABLE)
+ assert(desc.tableType == CatalogTableType.MANAGED)
assert(desc.storage.locationUri == None)
assert(desc.schema == Seq.empty[CatalogColumn])
assert(desc.viewText == None) // TODO will be SQLText
@@ -318,7 +318,7 @@ class HiveDDLCommandSuite extends PlanTest {
assert(!allowExisting)
assert(desc.identifier.database.isEmpty)
assert(desc.identifier.table == "my_table")
- assert(desc.tableType == CatalogTableType.MANAGED_TABLE)
+ assert(desc.tableType == CatalogTableType.MANAGED)
assert(desc.schema == Seq(CatalogColumn("id", "int"), CatalogColumn("name", "string")))
assert(desc.partitionColumnNames.isEmpty)
assert(desc.sortColumnNames.isEmpty)
@@ -353,7 +353,7 @@ class HiveDDLCommandSuite extends PlanTest {
test("create table - external") {
val query = "CREATE EXTERNAL TABLE tab1 (id int, name string)"
val (desc, _) = extractTableDesc(query)
- assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
+ assert(desc.tableType == CatalogTableType.EXTERNAL)
}
test("create table - if not exists") {
@@ -480,7 +480,7 @@ class HiveDDLCommandSuite extends PlanTest {
assert(allowExisting)
assert(desc.identifier.database == Some("dbx"))
assert(desc.identifier.table == "my_table")
- assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
+ assert(desc.tableType == CatalogTableType.EXTERNAL)
assert(desc.schema == Seq(
CatalogColumn("id", "int"),
CatalogColumn("name", "string"),
@@ -506,7 +506,7 @@ class HiveDDLCommandSuite extends PlanTest {
assert(!exists)
assert(desc.identifier.database.isEmpty)
assert(desc.identifier.table == "view1")
- assert(desc.tableType == CatalogTableType.VIRTUAL_VIEW)
+ assert(desc.tableType == CatalogTableType.VIEW)
assert(desc.storage.locationUri.isEmpty)
assert(desc.schema == Seq.empty[CatalogColumn])
assert(desc.viewText == Option("SELECT * FROM tab1"))
@@ -530,7 +530,7 @@ class HiveDDLCommandSuite extends PlanTest {
val (desc, exists) = extractTableDesc(v1)
assert(desc.identifier.database.isEmpty)
assert(desc.identifier.table == "view1")
- assert(desc.tableType == CatalogTableType.VIRTUAL_VIEW)
+ assert(desc.tableType == CatalogTableType.VIEW)
assert(desc.storage.locationUri.isEmpty)
assert(desc.schema ==
CatalogColumn("col1", null, nullable = true, None) ::
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index cb60a2c..bf9935a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.hive.client.HiveClient
/**
* Test suite for the [[HiveExternalCatalog]].
*/
-class HiveExternalCatalogSuite extends CatalogTestCases {
+class HiveExternalCatalogSuite extends ExternalCatalogSuite {
private val client: HiveClient = {
// We create a metastore at a temp location to avoid any potential
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index d1a1490..0d6a2e7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -90,7 +90,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
assert(hiveTable.storage.serde === Some(serde))
assert(hiveTable.partitionColumnNames.isEmpty)
- assert(hiveTable.tableType === CatalogTableType.MANAGED_TABLE)
+ assert(hiveTable.tableType === CatalogTableType.MANAGED)
val columns = hiveTable.schema
assert(columns.map(_.name) === Seq("d1", "d2"))
@@ -121,7 +121,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
assert(hiveTable.storage.outputFormat === Some(outputFormat))
assert(hiveTable.storage.serde === Some(serde))
- assert(hiveTable.tableType === CatalogTableType.EXTERNAL_TABLE)
+ assert(hiveTable.tableType === CatalogTableType.EXTERNAL)
assert(hiveTable.storage.locationUri ===
Some(path.toURI.toString.stripSuffix(File.separator)))
@@ -153,7 +153,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
assert(hiveTable.storage.serde === Some(serde))
assert(hiveTable.partitionColumnNames.isEmpty)
- assert(hiveTable.tableType === CatalogTableType.EXTERNAL_TABLE)
+ assert(hiveTable.tableType === CatalogTableType.EXTERNAL)
val columns = hiveTable.schema
assert(columns.map(_.name) === Seq("d1", "d2"))
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 7cd01c9..31ba735 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -502,13 +502,13 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}
withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "json") {
- createExternalTable("createdJsonTable", tempPath.toString)
+ sparkSession.catalog.createExternalTable("createdJsonTable", tempPath.toString)
assert(table("createdJsonTable").schema === df.schema)
checkAnswer(sql("SELECT * FROM createdJsonTable"), df)
assert(
intercept[AnalysisException] {
- createExternalTable("createdJsonTable", jsonFilePath.toString)
+ sparkSession.catalog.createExternalTable("createdJsonTable", jsonFilePath.toString)
}.getMessage.contains("Table createdJsonTable already exists."),
"We should complain that createdJsonTable already exists")
}
@@ -520,7 +520,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
// Try to specify the schema.
withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "not a source name") {
val schema = StructType(StructField("b", StringType, true) :: Nil)
- createExternalTable(
+ sparkSession.catalog.createExternalTable(
"createdJsonTable",
"org.apache.spark.sql.json",
schema,
@@ -539,7 +539,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
test("path required error") {
assert(
intercept[AnalysisException] {
- createExternalTable(
+ sparkSession.catalog.createExternalTable(
"createdJsonTable",
"org.apache.spark.sql.json",
Map.empty[String, String])
@@ -725,7 +725,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
val schema = StructType(StructField("int", IntegerType, true) :: Nil)
val hiveTable = CatalogTable(
identifier = TableIdentifier(tableName, Some("default")),
- tableType = CatalogTableType.MANAGED_TABLE,
+ tableType = CatalogTableType.MANAGED,
schema = Seq.empty,
storage = CatalogStorageFormat(
locationUri = None,
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 916a470..9341b38 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -149,7 +149,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
val table =
CatalogTable(
identifier = TableIdentifier("src", Some("default")),
- tableType = CatalogTableType.MANAGED_TABLE,
+ tableType = CatalogTableType.MANAGED,
schema = Seq(CatalogColumn("key", "int")),
storage = CatalogStorageFormat(
locationUri = None,
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index fd19fcb..e23272d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -73,7 +73,7 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
hiveContext.sessionState.catalog
.getTableMetadata(TableIdentifier(tabName, Some("default")))
// It is a managed table, although it uses external in SQL
- assert(hiveTable.tableType == CatalogTableType.MANAGED_TABLE)
+ assert(hiveTable.tableType == CatalogTableType.MANAGED)
assert(tmpDir.listFiles.nonEmpty)
sql(s"DROP TABLE $tabName")
@@ -102,7 +102,7 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
hiveContext.sessionState.catalog
.getTableMetadata(TableIdentifier(tabName, Some("default")))
// This data source table is external table
- assert(hiveTable.tableType == CatalogTableType.EXTERNAL_TABLE)
+ assert(hiveTable.tableType == CatalogTableType.EXTERNAL)
assert(tmpDir.listFiles.nonEmpty)
sql(s"DROP TABLE $tabName")
@@ -166,7 +166,7 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
val hiveTable = catalog.getTableMetadata(TableIdentifier(externalTab, Some("default")))
- assert(hiveTable.tableType == CatalogTableType.EXTERNAL_TABLE)
+ assert(hiveTable.tableType == CatalogTableType.EXTERNAL)
// After data insertion, all the directory are not empty
assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org
[2/2] spark git commit: [SPARK-13477][SQL] Expose new user-facing
Catalog interface
Posted by rx...@apache.org.
[SPARK-13477][SQL] Expose new user-facing Catalog interface
## What changes were proposed in this pull request?
#12625 exposed a new user-facing conf interface in `SparkSession`. This patch adds a catalog interface.
## How was this patch tested?
See `CatalogSuite`.
Author: Andrew Or <an...@databricks.com>
Closes #12713 from andrewor14/user-facing-catalog.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8a83a56
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8a83a56
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8a83a56
Branch: refs/heads/master
Commit: d8a83a564ff3fd0281007adbf8aa3757da8a2c2b
Parents: d93976d
Author: Andrew Or <an...@databricks.com>
Authored: Tue Apr 26 21:29:25 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Apr 26 21:29:25 2016 -0700
----------------------------------------------------------------------
.../spark/sql/catalyst/ScalaReflection.scala | 26 +-
.../sql/catalyst/catalog/SessionCatalog.scala | 36 +-
.../spark/sql/catalyst/catalog/interface.scala | 8 +-
.../catalyst/encoders/ExpressionEncoder.scala | 5 +-
.../sql/catalyst/catalog/CatalogTestCases.scala | 580 ------------------
.../catalyst/catalog/ExternalCatalogSuite.scala | 581 +++++++++++++++++++
.../catalyst/catalog/InMemoryCatalogSuite.scala | 2 +-
.../catalyst/catalog/SessionCatalogSuite.scala | 2 +-
.../scala/org/apache/spark/sql/SQLContext.scala | 33 +-
.../org/apache/spark/sql/SparkSession.scala | 248 +-------
.../org/apache/spark/sql/catalog/Catalog.scala | 214 +++++++
.../apache/spark/sql/catalog/interface.scala | 101 ++++
.../spark/sql/execution/SparkSqlParser.scala | 6 +-
.../spark/sql/execution/command/cache.scala | 4 +-
.../spark/sql/execution/command/commands.scala | 4 +-
.../command/createDataSourceTables.scala | 4 +-
.../spark/sql/execution/command/ddl.scala | 8 +-
.../spark/sql/execution/command/tables.scala | 2 +-
.../spark/sql/execution/command/views.scala | 2 +-
.../apache/spark/sql/internal/CatalogImpl.scala | 352 +++++++++++
.../spark/sql/execution/command/DDLSuite.scala | 2 +-
.../spark/sql/internal/CatalogSuite.scala | 271 +++++++++
.../spark/sql/hive/HiveMetastoreCatalog.scala | 2 +-
.../spark/sql/hive/MetastoreRelation.scala | 8 +-
.../spark/sql/hive/client/HiveClientImpl.scala | 16 +-
.../spark/sql/hive/CachedTableSuite.scala | 4 +-
.../spark/sql/hive/HiveDDLCommandSuite.scala | 18 +-
.../sql/hive/HiveExternalCatalogSuite.scala | 2 +-
.../sql/hive/HiveMetastoreCatalogSuite.scala | 6 +-
.../sql/hive/MetastoreDataSourcesSuite.scala | 10 +-
.../spark/sql/hive/client/VersionsSuite.scala | 2 +-
.../spark/sql/hive/execution/HiveDDLSuite.scala | 6 +-
32 files changed, 1665 insertions(+), 900 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index bd72313..be67605 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -22,7 +22,15 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.Utils
+
+
+/**
+ * A helper trait to create [[org.apache.spark.sql.catalyst.encoders.ExpressionEncoder]]s
+ * for classes whose fields are entirely defined by constructor params but should not be
+ * case classes.
+ */
+private[sql] trait DefinedByConstructorParams
+
/**
* A default version of ScalaReflection that uses the runtime universe.
@@ -333,7 +341,7 @@ object ScalaReflection extends ScalaReflection {
"toScalaMap",
keyData :: valueData :: Nil)
- case t if t <:< localTypeOf[Product] =>
+ case t if definedByConstructorParams(t) =>
val params = getConstructorParameters(t)
val cls = getClassFromType(tpe)
@@ -401,7 +409,7 @@ object ScalaReflection extends ScalaReflection {
val clsName = getClassNameFromType(tpe)
val walkedTypePath = s"""- root class: "${clsName}"""" :: Nil
serializerFor(inputObject, tpe, walkedTypePath) match {
- case expressions.If(_, _, s: CreateNamedStruct) if tpe <:< localTypeOf[Product] => s
+ case expressions.If(_, _, s: CreateNamedStruct) if definedByConstructorParams(tpe) => s
case other => CreateNamedStruct(expressions.Literal("value") :: other :: Nil)
}
}
@@ -491,7 +499,7 @@ object ScalaReflection extends ScalaReflection {
serializerFor(unwrapped, optType, newPath))
}
- case t if t <:< localTypeOf[Product] =>
+ case t if definedByConstructorParams(t) =>
val params = getConstructorParameters(t)
val nonNullOutput = CreateNamedStruct(params.flatMap { case (fieldName, fieldType) =>
val fieldValue = Invoke(inputObject, fieldName, dataTypeFor(fieldType))
@@ -680,7 +688,7 @@ object ScalaReflection extends ScalaReflection {
val Schema(valueDataType, valueNullable) = schemaFor(valueType)
Schema(MapType(schemaFor(keyType).dataType,
valueDataType, valueContainsNull = valueNullable), nullable = true)
- case t if t <:< localTypeOf[Product] =>
+ case t if definedByConstructorParams(t) =>
val params = getConstructorParameters(t)
Schema(StructType(
params.map { case (fieldName, fieldType) =>
@@ -712,6 +720,14 @@ object ScalaReflection extends ScalaReflection {
throw new UnsupportedOperationException(s"Schema for type $other is not supported")
}
}
+
+ /**
+ * Whether the fields of the given type are defined entirely by its constructor parameters.
+ */
+ private[sql] def definedByConstructorParams(tpe: Type): Boolean = {
+ tpe <:< localTypeOf[Product] || tpe <:< localTypeOf[DefinedByConstructorParams]
+ }
+
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 402aacf..91d35de 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -613,6 +613,25 @@ class SessionCatalog(
}
/**
+ * Look up the [[ExpressionInfo]] associated with the specified function, assuming it exists.
+ */
+ private[spark] def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = {
+ // TODO: just make function registry take in FunctionIdentifier instead of duplicating this
+ val qualifiedName = name.copy(database = name.database.orElse(Some(currentDb)))
+ functionRegistry.lookupFunction(name.funcName)
+ .orElse(functionRegistry.lookupFunction(qualifiedName.unquotedString))
+ .getOrElse {
+ val db = qualifiedName.database.get
+ if (externalCatalog.functionExists(db, name.funcName)) {
+ val metadata = externalCatalog.getFunction(db, name.funcName)
+ new ExpressionInfo(metadata.className, qualifiedName.unquotedString)
+ } else {
+ failFunctionLookup(name.funcName)
+ }
+ }
+ }
+
+ /**
* Return an [[Expression]] that represents the specified function, assuming it exists.
*
* For a temporary function or a permanent function that has been loaded,
@@ -646,6 +665,7 @@ class SessionCatalog(
// The function has not been loaded to the function registry, which means
// that the function is a permanent function (if it actually has been registered
// in the metastore). We need to first put the function in the FunctionRegistry.
+ // TODO: why not just check whether the function exists first?
val catalogFunction = try {
externalCatalog.getFunction(currentDb, name.funcName)
} catch {
@@ -662,7 +682,7 @@ class SessionCatalog(
val builder = makeFunctionBuilder(qualifiedName.unquotedString, catalogFunction.className)
createTempFunction(qualifiedName.unquotedString, info, builder, ignoreIfExists = false)
// Now, we need to create the Expression.
- return functionRegistry.lookupFunction(qualifiedName.unquotedString, children)
+ functionRegistry.lookupFunction(qualifiedName.unquotedString, children)
}
/**
@@ -687,8 +707,8 @@ class SessionCatalog(
// -----------------
/**
- * Drop all existing databases (except "default") along with all associated tables,
- * partitions and functions, and set the current database to "default".
+ * Drop all existing databases (except "default"), tables, partitions and functions,
+ * and set the current database to "default".
*
* This is mainly used for tests.
*/
@@ -697,6 +717,16 @@ class SessionCatalog(
listDatabases().filter(_ != default).foreach { db =>
dropDatabase(db, ignoreIfNotExists = false, cascade = true)
}
+ listTables(default).foreach { table =>
+ dropTable(table, ignoreIfNotExists = false)
+ }
+ listFunctions(default).foreach { func =>
+ if (func.database.isDefined) {
+ dropFunction(func, ignoreIfNotExists = false)
+ } else {
+ dropTempFunction(func.funcName, ignoreIfNotExists = false)
+ }
+ }
tempTables.clear()
functionRegistry.clear()
// restore built-in functions
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 9e90987..d1e2b3f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -299,10 +299,10 @@ case class CatalogTable(
case class CatalogTableType private(name: String)
object CatalogTableType {
- val EXTERNAL_TABLE = new CatalogTableType("EXTERNAL_TABLE")
- val MANAGED_TABLE = new CatalogTableType("MANAGED_TABLE")
- val INDEX_TABLE = new CatalogTableType("INDEX_TABLE")
- val VIRTUAL_VIEW = new CatalogTableType("VIRTUAL_VIEW")
+ val EXTERNAL = new CatalogTableType("EXTERNAL")
+ val MANAGED = new CatalogTableType("MANAGED")
+ val INDEX = new CatalogTableType("INDEX")
+ val VIEW = new CatalogTableType("VIEW")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 56d29cf..5d29448 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -47,8 +47,9 @@ object ExpressionEncoder {
def apply[T : TypeTag](): ExpressionEncoder[T] = {
// We convert the not-serializable TypeTag into StructType and ClassTag.
val mirror = typeTag[T].mirror
- val cls = mirror.runtimeClass(typeTag[T].tpe)
- val flat = !classOf[Product].isAssignableFrom(cls)
+ val tpe = typeTag[T].tpe
+ val cls = mirror.runtimeClass(tpe)
+ val flat = !ScalaReflection.definedByConstructorParams(tpe)
val inputObject = BoundReference(0, ScalaReflection.dataTypeFor[T], nullable = false)
val serializer = ScalaReflection.serializerFor[T](inputObject)
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
deleted file mode 100644
index f961fe3..0000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ /dev/null
@@ -1,580 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.catalog
-
-import org.scalatest.BeforeAndAfterEach
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.util.Utils
-
-
-/**
- * A reasonable complete test suite (i.e. behaviors) for a [[ExternalCatalog]].
- *
- * Implementations of the [[ExternalCatalog]] interface can create test suites by extending this.
- */
-abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
- protected val utils: CatalogTestUtils
- import utils._
-
- protected def resetState(): Unit = { }
-
- // Clear all state after each test
- override def afterEach(): Unit = {
- try {
- resetState()
- } finally {
- super.afterEach()
- }
- }
-
- // --------------------------------------------------------------------------
- // Databases
- // --------------------------------------------------------------------------
-
- test("basic create and list databases") {
- val catalog = newEmptyCatalog()
- catalog.createDatabase(newDb("default"), ignoreIfExists = true)
- assert(catalog.databaseExists("default"))
- assert(!catalog.databaseExists("testing"))
- assert(!catalog.databaseExists("testing2"))
- catalog.createDatabase(newDb("testing"), ignoreIfExists = false)
- assert(catalog.databaseExists("testing"))
- assert(catalog.listDatabases().toSet == Set("default", "testing"))
- catalog.createDatabase(newDb("testing2"), ignoreIfExists = false)
- assert(catalog.listDatabases().toSet == Set("default", "testing", "testing2"))
- assert(catalog.databaseExists("testing2"))
- assert(!catalog.databaseExists("does_not_exist"))
- }
-
- test("get database when a database exists") {
- val db1 = newBasicCatalog().getDatabase("db1")
- assert(db1.name == "db1")
- assert(db1.description.contains("db1"))
- }
-
- test("get database should throw exception when the database does not exist") {
- intercept[AnalysisException] { newBasicCatalog().getDatabase("db_that_does_not_exist") }
- }
-
- test("list databases without pattern") {
- val catalog = newBasicCatalog()
- assert(catalog.listDatabases().toSet == Set("default", "db1", "db2"))
- }
-
- test("list databases with pattern") {
- val catalog = newBasicCatalog()
- assert(catalog.listDatabases("db").toSet == Set.empty)
- assert(catalog.listDatabases("db*").toSet == Set("db1", "db2"))
- assert(catalog.listDatabases("*1").toSet == Set("db1"))
- assert(catalog.listDatabases("db2").toSet == Set("db2"))
- }
-
- test("drop database") {
- val catalog = newBasicCatalog()
- catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false)
- assert(catalog.listDatabases().toSet == Set("default", "db2"))
- }
-
- test("drop database when the database is not empty") {
- // Throw exception if there are functions left
- val catalog1 = newBasicCatalog()
- catalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false)
- catalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false)
- intercept[AnalysisException] {
- catalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
- }
- resetState()
-
- // Throw exception if there are tables left
- val catalog2 = newBasicCatalog()
- catalog2.dropFunction("db2", "func1")
- intercept[AnalysisException] {
- catalog2.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
- }
- resetState()
-
- // When cascade is true, it should drop them
- val catalog3 = newBasicCatalog()
- catalog3.dropDatabase("db2", ignoreIfNotExists = false, cascade = true)
- assert(catalog3.listDatabases().toSet == Set("default", "db1"))
- }
-
- test("drop database when the database does not exist") {
- val catalog = newBasicCatalog()
-
- intercept[AnalysisException] {
- catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = false, cascade = false)
- }
-
- catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = true, cascade = false)
- }
-
- test("alter database") {
- val catalog = newBasicCatalog()
- val db1 = catalog.getDatabase("db1")
- // Note: alter properties here because Hive does not support altering other fields
- catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true")))
- val newDb1 = catalog.getDatabase("db1")
- assert(db1.properties.isEmpty)
- assert(newDb1.properties.size == 2)
- assert(newDb1.properties.get("k") == Some("v3"))
- assert(newDb1.properties.get("good") == Some("true"))
- }
-
- test("alter database should throw exception when the database does not exist") {
- intercept[AnalysisException] {
- newBasicCatalog().alterDatabase(newDb("does_not_exist"))
- }
- }
-
- // --------------------------------------------------------------------------
- // Tables
- // --------------------------------------------------------------------------
-
- test("the table type of an external table should be EXTERNAL_TABLE") {
- val catalog = newBasicCatalog()
- val table =
- newTable("external_table1", "db2").copy(tableType = CatalogTableType.EXTERNAL_TABLE)
- catalog.createTable("db2", table, ignoreIfExists = false)
- val actual = catalog.getTable("db2", "external_table1")
- assert(actual.tableType === CatalogTableType.EXTERNAL_TABLE)
- }
-
- test("drop table") {
- val catalog = newBasicCatalog()
- assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
- catalog.dropTable("db2", "tbl1", ignoreIfNotExists = false)
- assert(catalog.listTables("db2").toSet == Set("tbl2"))
- }
-
- test("drop table when database/table does not exist") {
- val catalog = newBasicCatalog()
- // Should always throw exception when the database does not exist
- intercept[AnalysisException] {
- catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = false)
- }
- intercept[AnalysisException] {
- catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = true)
- }
- // Should throw exception when the table does not exist, if ignoreIfNotExists is false
- intercept[AnalysisException] {
- catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = false)
- }
- catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true)
- }
-
- test("rename table") {
- val catalog = newBasicCatalog()
- assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
- catalog.renameTable("db2", "tbl1", "tblone")
- assert(catalog.listTables("db2").toSet == Set("tblone", "tbl2"))
- }
-
- test("rename table when database/table does not exist") {
- val catalog = newBasicCatalog()
- intercept[AnalysisException] {
- catalog.renameTable("unknown_db", "unknown_table", "unknown_table")
- }
- intercept[AnalysisException] {
- catalog.renameTable("db2", "unknown_table", "unknown_table")
- }
- }
-
- test("alter table") {
- val catalog = newBasicCatalog()
- val tbl1 = catalog.getTable("db2", "tbl1")
- catalog.alterTable("db2", tbl1.copy(properties = Map("toh" -> "frem")))
- val newTbl1 = catalog.getTable("db2", "tbl1")
- assert(!tbl1.properties.contains("toh"))
- assert(newTbl1.properties.size == tbl1.properties.size + 1)
- assert(newTbl1.properties.get("toh") == Some("frem"))
- }
-
- test("alter table when database/table does not exist") {
- val catalog = newBasicCatalog()
- intercept[AnalysisException] {
- catalog.alterTable("unknown_db", newTable("tbl1", "unknown_db"))
- }
- intercept[AnalysisException] {
- catalog.alterTable("db2", newTable("unknown_table", "db2"))
- }
- }
-
- test("get table") {
- assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == "tbl1")
- }
-
- test("get table when database/table does not exist") {
- val catalog = newBasicCatalog()
- intercept[AnalysisException] {
- catalog.getTable("unknown_db", "unknown_table")
- }
- intercept[AnalysisException] {
- catalog.getTable("db2", "unknown_table")
- }
- }
-
- test("list tables without pattern") {
- val catalog = newBasicCatalog()
- intercept[AnalysisException] { catalog.listTables("unknown_db") }
- assert(catalog.listTables("db1").toSet == Set.empty)
- assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
- }
-
- test("list tables with pattern") {
- val catalog = newBasicCatalog()
- intercept[AnalysisException] { catalog.listTables("unknown_db", "*") }
- assert(catalog.listTables("db1", "*").toSet == Set.empty)
- assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2"))
- assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2"))
- assert(catalog.listTables("db2", "*1").toSet == Set("tbl1"))
- }
-
- // --------------------------------------------------------------------------
- // Partitions
- // --------------------------------------------------------------------------
-
- test("basic create and list partitions") {
- val catalog = newEmptyCatalog()
- catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
- catalog.createTable("mydb", newTable("tbl", "mydb"), ignoreIfExists = false)
- catalog.createPartitions("mydb", "tbl", Seq(part1, part2), ignoreIfExists = false)
- assert(catalogPartitionsEqual(catalog, "mydb", "tbl", Seq(part1, part2)))
- }
-
- test("create partitions when database/table does not exist") {
- val catalog = newBasicCatalog()
- intercept[AnalysisException] {
- catalog.createPartitions("does_not_exist", "tbl1", Seq(), ignoreIfExists = false)
- }
- intercept[AnalysisException] {
- catalog.createPartitions("db2", "does_not_exist", Seq(), ignoreIfExists = false)
- }
- }
-
- test("create partitions that already exist") {
- val catalog = newBasicCatalog()
- intercept[AnalysisException] {
- catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = false)
- }
- catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = true)
- }
-
- test("drop partitions") {
- val catalog = newBasicCatalog()
- assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
- catalog.dropPartitions(
- "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false)
- assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2)))
- resetState()
- val catalog2 = newBasicCatalog()
- assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2)))
- catalog2.dropPartitions(
- "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
- assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
- }
-
- test("drop partitions when database/table does not exist") {
- val catalog = newBasicCatalog()
- intercept[AnalysisException] {
- catalog.dropPartitions(
- "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false)
- }
- intercept[AnalysisException] {
- catalog.dropPartitions(
- "db2", "does_not_exist", Seq(), ignoreIfNotExists = false)
- }
- }
-
- test("drop partitions that do not exist") {
- val catalog = newBasicCatalog()
- intercept[AnalysisException] {
- catalog.dropPartitions(
- "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false)
- }
- catalog.dropPartitions(
- "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true)
- }
-
- test("get partition") {
- val catalog = newBasicCatalog()
- assert(catalog.getPartition("db2", "tbl2", part1.spec).spec == part1.spec)
- assert(catalog.getPartition("db2", "tbl2", part2.spec).spec == part2.spec)
- intercept[AnalysisException] {
- catalog.getPartition("db2", "tbl1", part3.spec)
- }
- }
-
- test("get partition when database/table does not exist") {
- val catalog = newBasicCatalog()
- intercept[AnalysisException] {
- catalog.getPartition("does_not_exist", "tbl1", part1.spec)
- }
- intercept[AnalysisException] {
- catalog.getPartition("db2", "does_not_exist", part1.spec)
- }
- }
-
- test("rename partitions") {
- val catalog = newBasicCatalog()
- val newPart1 = part1.copy(spec = Map("a" -> "100", "b" -> "101"))
- val newPart2 = part2.copy(spec = Map("a" -> "200", "b" -> "201"))
- val newSpecs = Seq(newPart1.spec, newPart2.spec)
- catalog.renamePartitions("db2", "tbl2", Seq(part1.spec, part2.spec), newSpecs)
- assert(catalog.getPartition("db2", "tbl2", newPart1.spec).spec === newPart1.spec)
- assert(catalog.getPartition("db2", "tbl2", newPart2.spec).spec === newPart2.spec)
- // The old partitions should no longer exist
- intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part1.spec) }
- intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part2.spec) }
- }
-
- test("rename partitions when database/table does not exist") {
- val catalog = newBasicCatalog()
- intercept[AnalysisException] {
- catalog.renamePartitions("does_not_exist", "tbl1", Seq(part1.spec), Seq(part2.spec))
- }
- intercept[AnalysisException] {
- catalog.renamePartitions("db2", "does_not_exist", Seq(part1.spec), Seq(part2.spec))
- }
- }
-
- test("alter partitions") {
- val catalog = newBasicCatalog()
- try {
- // Note: Before altering table partitions in Hive, you *must* set the current database
- // to the one that contains the table of interest. Otherwise you will end up with the
- // most helpful error message ever: "Unable to alter partition. alter is not possible."
- // See HIVE-2742 for more detail.
- catalog.setCurrentDatabase("db2")
- val newLocation = newUriForDatabase()
- // alter but keep spec the same
- val oldPart1 = catalog.getPartition("db2", "tbl2", part1.spec)
- val oldPart2 = catalog.getPartition("db2", "tbl2", part2.spec)
- catalog.alterPartitions("db2", "tbl2", Seq(
- oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))),
- oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation)))))
- val newPart1 = catalog.getPartition("db2", "tbl2", part1.spec)
- val newPart2 = catalog.getPartition("db2", "tbl2", part2.spec)
- assert(newPart1.storage.locationUri == Some(newLocation))
- assert(newPart2.storage.locationUri == Some(newLocation))
- assert(oldPart1.storage.locationUri != Some(newLocation))
- assert(oldPart2.storage.locationUri != Some(newLocation))
- // alter but change spec, should fail because new partition specs do not exist yet
- val badPart1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2"))
- val badPart2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4"))
- intercept[AnalysisException] {
- catalog.alterPartitions("db2", "tbl2", Seq(badPart1, badPart2))
- }
- } finally {
- // Remember to restore the original current database, which we assume to be "default"
- catalog.setCurrentDatabase("default")
- }
- }
-
- test("alter partitions when database/table does not exist") {
- val catalog = newBasicCatalog()
- intercept[AnalysisException] {
- catalog.alterPartitions("does_not_exist", "tbl1", Seq(part1))
- }
- intercept[AnalysisException] {
- catalog.alterPartitions("db2", "does_not_exist", Seq(part1))
- }
- }
-
- // --------------------------------------------------------------------------
- // Functions
- // --------------------------------------------------------------------------
-
- test("basic create and list functions") {
- val catalog = newEmptyCatalog()
- catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
- catalog.createFunction("mydb", newFunc("myfunc"))
- assert(catalog.listFunctions("mydb", "*").toSet == Set("myfunc"))
- }
-
- test("create function when database does not exist") {
- val catalog = newBasicCatalog()
- intercept[AnalysisException] {
- catalog.createFunction("does_not_exist", newFunc())
- }
- }
-
- test("create function that already exists") {
- val catalog = newBasicCatalog()
- intercept[AnalysisException] {
- catalog.createFunction("db2", newFunc("func1"))
- }
- }
-
- test("drop function") {
- val catalog = newBasicCatalog()
- assert(catalog.listFunctions("db2", "*").toSet == Set("func1"))
- catalog.dropFunction("db2", "func1")
- assert(catalog.listFunctions("db2", "*").isEmpty)
- }
-
- test("drop function when database does not exist") {
- val catalog = newBasicCatalog()
- intercept[AnalysisException] {
- catalog.dropFunction("does_not_exist", "something")
- }
- }
-
- test("drop function that does not exist") {
- val catalog = newBasicCatalog()
- intercept[AnalysisException] {
- catalog.dropFunction("db2", "does_not_exist")
- }
- }
-
- test("get function") {
- val catalog = newBasicCatalog()
- assert(catalog.getFunction("db2", "func1") ==
- CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
- Seq.empty[(String, String)]))
- intercept[AnalysisException] {
- catalog.getFunction("db2", "does_not_exist")
- }
- }
-
- test("get function when database does not exist") {
- val catalog = newBasicCatalog()
- intercept[AnalysisException] {
- catalog.getFunction("does_not_exist", "func1")
- }
- }
-
- test("rename function") {
- val catalog = newBasicCatalog()
- val newName = "funcky"
- assert(catalog.getFunction("db2", "func1").className == funcClass)
- catalog.renameFunction("db2", "func1", newName)
- intercept[AnalysisException] { catalog.getFunction("db2", "func1") }
- assert(catalog.getFunction("db2", newName).identifier.funcName == newName)
- assert(catalog.getFunction("db2", newName).className == funcClass)
- intercept[AnalysisException] { catalog.renameFunction("db2", "does_not_exist", "me") }
- }
-
- test("rename function when database does not exist") {
- val catalog = newBasicCatalog()
- intercept[AnalysisException] {
- catalog.renameFunction("does_not_exist", "func1", "func5")
- }
- }
-
- test("list functions") {
- val catalog = newBasicCatalog()
- catalog.createFunction("db2", newFunc("func2"))
- catalog.createFunction("db2", newFunc("not_me"))
- assert(catalog.listFunctions("db2", "*").toSet == Set("func1", "func2", "not_me"))
- assert(catalog.listFunctions("db2", "func*").toSet == Set("func1", "func2"))
- }
-
-}
-
-
-/**
- * A collection of utility fields and methods for tests related to the [[ExternalCatalog]].
- */
-abstract class CatalogTestUtils {
-
- // Unimplemented methods
- val tableInputFormat: String
- val tableOutputFormat: String
- def newEmptyCatalog(): ExternalCatalog
-
- // These fields must be lazy because they rely on fields that are not implemented yet
- lazy val storageFormat = CatalogStorageFormat(
- locationUri = None,
- inputFormat = Some(tableInputFormat),
- outputFormat = Some(tableOutputFormat),
- serde = None,
- serdeProperties = Map.empty)
- lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat)
- lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat)
- lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat)
- lazy val funcClass = "org.apache.spark.myFunc"
-
- /**
- * Creates a basic catalog, with the following structure:
- *
- * default
- * db1
- * db2
- * - tbl1
- * - tbl2
- * - part1
- * - part2
- * - func1
- */
- def newBasicCatalog(): ExternalCatalog = {
- val catalog = newEmptyCatalog()
- // When testing against a real catalog, the default database may already exist
- catalog.createDatabase(newDb("default"), ignoreIfExists = true)
- catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
- catalog.createDatabase(newDb("db2"), ignoreIfExists = false)
- catalog.createTable("db2", newTable("tbl1", "db2"), ignoreIfExists = false)
- catalog.createTable("db2", newTable("tbl2", "db2"), ignoreIfExists = false)
- catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false)
- catalog.createFunction("db2", newFunc("func1", Some("db2")))
- catalog
- }
-
- def newFunc(): CatalogFunction = newFunc("funcName")
-
- def newUriForDatabase(): String = Utils.createTempDir().getAbsolutePath
-
- def newDb(name: String): CatalogDatabase = {
- CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)
- }
-
- def newTable(name: String, db: String): CatalogTable = newTable(name, Some(db))
-
- def newTable(name: String, database: Option[String] = None): CatalogTable = {
- CatalogTable(
- identifier = TableIdentifier(name, database),
- tableType = CatalogTableType.EXTERNAL_TABLE,
- storage = storageFormat,
- schema = Seq(
- CatalogColumn("col1", "int"),
- CatalogColumn("col2", "string"),
- CatalogColumn("a", "int"),
- CatalogColumn("b", "string")),
- partitionColumnNames = Seq("a", "b"))
- }
-
- def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
- CatalogFunction(FunctionIdentifier(name, database), funcClass, Seq.empty[(String, String)])
- }
-
- /**
- * Whether the catalog's table partitions equal the ones given.
- * Note: Hive sets some random serde things, so we just compare the specs here.
- */
- def catalogPartitionsEqual(
- catalog: ExternalCatalog,
- db: String,
- table: String,
- parts: Seq[CatalogTablePartition]): Boolean = {
- catalog.listPartitions(db, table).map(_.spec).toSet == parts.map(_.spec).toSet
- }
-
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
new file mode 100644
index 0000000..d739b17
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.util.Utils
+
+
+/**
+ * A reasonably complete test suite (i.e. behaviors) for an [[ExternalCatalog]].
+ *
+ * Implementations of the [[ExternalCatalog]] interface can create test suites by extending this.
+ */
+abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach {
+ protected val utils: CatalogTestUtils
+ import utils._
+
+ protected def resetState(): Unit = { }
+
+ // Clear all state after each test
+ override def afterEach(): Unit = {
+ try {
+ resetState()
+ } finally {
+ super.afterEach()
+ }
+ }
+
+ // --------------------------------------------------------------------------
+ // Databases
+ // --------------------------------------------------------------------------
+
+ test("basic create and list databases") {
+ val catalog = newEmptyCatalog()
+ catalog.createDatabase(newDb("default"), ignoreIfExists = true)
+ assert(catalog.databaseExists("default"))
+ assert(!catalog.databaseExists("testing"))
+ assert(!catalog.databaseExists("testing2"))
+ catalog.createDatabase(newDb("testing"), ignoreIfExists = false)
+ assert(catalog.databaseExists("testing"))
+ assert(catalog.listDatabases().toSet == Set("default", "testing"))
+ catalog.createDatabase(newDb("testing2"), ignoreIfExists = false)
+ assert(catalog.listDatabases().toSet == Set("default", "testing", "testing2"))
+ assert(catalog.databaseExists("testing2"))
+ assert(!catalog.databaseExists("does_not_exist"))
+ }
+
+ test("get database when a database exists") {
+ val db1 = newBasicCatalog().getDatabase("db1")
+ assert(db1.name == "db1")
+ assert(db1.description.contains("db1"))
+ }
+
+ test("get database should throw exception when the database does not exist") {
+ intercept[AnalysisException] { newBasicCatalog().getDatabase("db_that_does_not_exist") }
+ }
+
+ test("list databases without pattern") {
+ val catalog = newBasicCatalog()
+ assert(catalog.listDatabases().toSet == Set("default", "db1", "db2"))
+ }
+
+ test("list databases with pattern") {
+ val catalog = newBasicCatalog()
+ assert(catalog.listDatabases("db").toSet == Set.empty)
+ assert(catalog.listDatabases("db*").toSet == Set("db1", "db2"))
+ assert(catalog.listDatabases("*1").toSet == Set("db1"))
+ assert(catalog.listDatabases("db2").toSet == Set("db2"))
+ }
+
+ test("drop database") {
+ val catalog = newBasicCatalog()
+ catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false)
+ assert(catalog.listDatabases().toSet == Set("default", "db2"))
+ }
+
+ test("drop database when the database is not empty") {
+ // Throw exception if there are functions left
+ val catalog1 = newBasicCatalog()
+ catalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false)
+ catalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false)
+ intercept[AnalysisException] {
+ catalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
+ }
+ resetState()
+
+ // Throw exception if there are tables left
+ val catalog2 = newBasicCatalog()
+ catalog2.dropFunction("db2", "func1")
+ intercept[AnalysisException] {
+ catalog2.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
+ }
+ resetState()
+
+ // When cascade is true, it should drop them
+ val catalog3 = newBasicCatalog()
+ catalog3.dropDatabase("db2", ignoreIfNotExists = false, cascade = true)
+ assert(catalog3.listDatabases().toSet == Set("default", "db1"))
+ }
+
+ test("drop database when the database does not exist") {
+ val catalog = newBasicCatalog()
+
+ intercept[AnalysisException] {
+ catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = false, cascade = false)
+ }
+
+ catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = true, cascade = false)
+ }
+
+ test("alter database") {
+ val catalog = newBasicCatalog()
+ val db1 = catalog.getDatabase("db1")
+ // Note: alter properties here because Hive does not support altering other fields
+ catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true")))
+ val newDb1 = catalog.getDatabase("db1")
+ assert(db1.properties.isEmpty)
+ assert(newDb1.properties.size == 2)
+ assert(newDb1.properties.get("k") == Some("v3"))
+ assert(newDb1.properties.get("good") == Some("true"))
+ }
+
+ test("alter database should throw exception when the database does not exist") {
+ intercept[AnalysisException] {
+ newBasicCatalog().alterDatabase(newDb("does_not_exist"))
+ }
+ }
+
+ // --------------------------------------------------------------------------
+ // Tables
+ // --------------------------------------------------------------------------
+
+ test("the table type of an external table should be EXTERNAL_TABLE") {
+ val catalog = newBasicCatalog()
+ val table =
+ newTable("external_table1", "db2").copy(tableType = CatalogTableType.EXTERNAL)
+ catalog.createTable("db2", table, ignoreIfExists = false)
+ val actual = catalog.getTable("db2", "external_table1")
+ assert(actual.tableType === CatalogTableType.EXTERNAL)
+ }
+
+ test("drop table") {
+ val catalog = newBasicCatalog()
+ assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+ catalog.dropTable("db2", "tbl1", ignoreIfNotExists = false)
+ assert(catalog.listTables("db2").toSet == Set("tbl2"))
+ }
+
+ test("drop table when database/table does not exist") {
+ val catalog = newBasicCatalog()
+ // Should always throw exception when the database does not exist
+ intercept[AnalysisException] {
+ catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = false)
+ }
+ intercept[AnalysisException] {
+ catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = true)
+ }
+ // Should throw exception when the table does not exist, if ignoreIfNotExists is false
+ intercept[AnalysisException] {
+ catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = false)
+ }
+ catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true)
+ }
+
+ test("rename table") {
+ val catalog = newBasicCatalog()
+ assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+ catalog.renameTable("db2", "tbl1", "tblone")
+ assert(catalog.listTables("db2").toSet == Set("tblone", "tbl2"))
+ }
+
+ test("rename table when database/table does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.renameTable("unknown_db", "unknown_table", "unknown_table")
+ }
+ intercept[AnalysisException] {
+ catalog.renameTable("db2", "unknown_table", "unknown_table")
+ }
+ }
+
+ test("alter table") {
+ val catalog = newBasicCatalog()
+ val tbl1 = catalog.getTable("db2", "tbl1")
+ catalog.alterTable("db2", tbl1.copy(properties = Map("toh" -> "frem")))
+ val newTbl1 = catalog.getTable("db2", "tbl1")
+ assert(!tbl1.properties.contains("toh"))
+ assert(newTbl1.properties.size == tbl1.properties.size + 1)
+ assert(newTbl1.properties.get("toh") == Some("frem"))
+ }
+
+ test("alter table when database/table does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.alterTable("unknown_db", newTable("tbl1", "unknown_db"))
+ }
+ intercept[AnalysisException] {
+ catalog.alterTable("db2", newTable("unknown_table", "db2"))
+ }
+ }
+
+ test("get table") {
+ assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == "tbl1")
+ }
+
+ test("get table when database/table does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.getTable("unknown_db", "unknown_table")
+ }
+ intercept[AnalysisException] {
+ catalog.getTable("db2", "unknown_table")
+ }
+ }
+
+ test("list tables without pattern") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] { catalog.listTables("unknown_db") }
+ assert(catalog.listTables("db1").toSet == Set.empty)
+ assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+ }
+
+ test("list tables with pattern") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] { catalog.listTables("unknown_db", "*") }
+ assert(catalog.listTables("db1", "*").toSet == Set.empty)
+ assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2"))
+ assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2"))
+ assert(catalog.listTables("db2", "*1").toSet == Set("tbl1"))
+ }
+
+ // --------------------------------------------------------------------------
+ // Partitions
+ // --------------------------------------------------------------------------
+
+ test("basic create and list partitions") {
+ val catalog = newEmptyCatalog()
+ catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
+ catalog.createTable("mydb", newTable("tbl", "mydb"), ignoreIfExists = false)
+ catalog.createPartitions("mydb", "tbl", Seq(part1, part2), ignoreIfExists = false)
+ assert(catalogPartitionsEqual(catalog, "mydb", "tbl", Seq(part1, part2)))
+ }
+
+ test("create partitions when database/table does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.createPartitions("does_not_exist", "tbl1", Seq(), ignoreIfExists = false)
+ }
+ intercept[AnalysisException] {
+ catalog.createPartitions("db2", "does_not_exist", Seq(), ignoreIfExists = false)
+ }
+ }
+
+ test("create partitions that already exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = false)
+ }
+ catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = true)
+ }
+
+ test("drop partitions") {
+ val catalog = newBasicCatalog()
+ assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
+ catalog.dropPartitions(
+ "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false)
+ assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2)))
+ resetState()
+ val catalog2 = newBasicCatalog()
+ assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2)))
+ catalog2.dropPartitions(
+ "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
+ assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
+ }
+
+ test("drop partitions when database/table does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.dropPartitions(
+ "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false)
+ }
+ intercept[AnalysisException] {
+ catalog.dropPartitions(
+ "db2", "does_not_exist", Seq(), ignoreIfNotExists = false)
+ }
+ }
+
+ test("drop partitions that do not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.dropPartitions(
+ "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false)
+ }
+ catalog.dropPartitions(
+ "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true)
+ }
+
+ test("get partition") {
+ val catalog = newBasicCatalog()
+ assert(catalog.getPartition("db2", "tbl2", part1.spec).spec == part1.spec)
+ assert(catalog.getPartition("db2", "tbl2", part2.spec).spec == part2.spec)
+ intercept[AnalysisException] {
+ catalog.getPartition("db2", "tbl1", part3.spec)
+ }
+ }
+
+ test("get partition when database/table does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.getPartition("does_not_exist", "tbl1", part1.spec)
+ }
+ intercept[AnalysisException] {
+ catalog.getPartition("db2", "does_not_exist", part1.spec)
+ }
+ }
+
+ test("rename partitions") {
+ val catalog = newBasicCatalog()
+ val newPart1 = part1.copy(spec = Map("a" -> "100", "b" -> "101"))
+ val newPart2 = part2.copy(spec = Map("a" -> "200", "b" -> "201"))
+ val newSpecs = Seq(newPart1.spec, newPart2.spec)
+ catalog.renamePartitions("db2", "tbl2", Seq(part1.spec, part2.spec), newSpecs)
+ assert(catalog.getPartition("db2", "tbl2", newPart1.spec).spec === newPart1.spec)
+ assert(catalog.getPartition("db2", "tbl2", newPart2.spec).spec === newPart2.spec)
+ // The old partitions should no longer exist
+ intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part1.spec) }
+ intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part2.spec) }
+ }
+
+ test("rename partitions when database/table does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.renamePartitions("does_not_exist", "tbl1", Seq(part1.spec), Seq(part2.spec))
+ }
+ intercept[AnalysisException] {
+ catalog.renamePartitions("db2", "does_not_exist", Seq(part1.spec), Seq(part2.spec))
+ }
+ }
+
+ test("alter partitions") {
+ val catalog = newBasicCatalog()
+ try {
+ // Note: Before altering table partitions in Hive, you *must* set the current database
+ // to the one that contains the table of interest. Otherwise you will end up with the
+ // most helpful error message ever: "Unable to alter partition. alter is not possible."
+ // See HIVE-2742 for more detail.
+ catalog.setCurrentDatabase("db2")
+ val newLocation = newUriForDatabase()
+ // alter but keep spec the same
+ val oldPart1 = catalog.getPartition("db2", "tbl2", part1.spec)
+ val oldPart2 = catalog.getPartition("db2", "tbl2", part2.spec)
+ catalog.alterPartitions("db2", "tbl2", Seq(
+ oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))),
+ oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation)))))
+ val newPart1 = catalog.getPartition("db2", "tbl2", part1.spec)
+ val newPart2 = catalog.getPartition("db2", "tbl2", part2.spec)
+ assert(newPart1.storage.locationUri == Some(newLocation))
+ assert(newPart2.storage.locationUri == Some(newLocation))
+ assert(oldPart1.storage.locationUri != Some(newLocation))
+ assert(oldPart2.storage.locationUri != Some(newLocation))
+ // alter but change spec, should fail because new partition specs do not exist yet
+ val badPart1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2"))
+ val badPart2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4"))
+ intercept[AnalysisException] {
+ catalog.alterPartitions("db2", "tbl2", Seq(badPart1, badPart2))
+ }
+ } finally {
+ // Remember to restore the original current database, which we assume to be "default"
+ catalog.setCurrentDatabase("default")
+ }
+ }
+
+ test("alter partitions when database/table does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.alterPartitions("does_not_exist", "tbl1", Seq(part1))
+ }
+ intercept[AnalysisException] {
+ catalog.alterPartitions("db2", "does_not_exist", Seq(part1))
+ }
+ }
+
+ // --------------------------------------------------------------------------
+ // Functions
+ // --------------------------------------------------------------------------
+
+ test("basic create and list functions") {
+ val catalog = newEmptyCatalog()
+ catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
+ catalog.createFunction("mydb", newFunc("myfunc"))
+ assert(catalog.listFunctions("mydb", "*").toSet == Set("myfunc"))
+ }
+
+ test("create function when database does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.createFunction("does_not_exist", newFunc())
+ }
+ }
+
+ test("create function that already exists") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.createFunction("db2", newFunc("func1"))
+ }
+ }
+
+ test("drop function") {
+ val catalog = newBasicCatalog()
+ assert(catalog.listFunctions("db2", "*").toSet == Set("func1"))
+ catalog.dropFunction("db2", "func1")
+ assert(catalog.listFunctions("db2", "*").isEmpty)
+ }
+
+ test("drop function when database does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.dropFunction("does_not_exist", "something")
+ }
+ }
+
+ test("drop function that does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.dropFunction("db2", "does_not_exist")
+ }
+ }
+
+ test("get function") {
+ val catalog = newBasicCatalog()
+ assert(catalog.getFunction("db2", "func1") ==
+ CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
+ Seq.empty[(String, String)]))
+ intercept[AnalysisException] {
+ catalog.getFunction("db2", "does_not_exist")
+ }
+ }
+
+ test("get function when database does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.getFunction("does_not_exist", "func1")
+ }
+ }
+
+ test("rename function") {
+ val catalog = newBasicCatalog()
+ val newName = "funcky"
+ assert(catalog.getFunction("db2", "func1").className == funcClass)
+ catalog.renameFunction("db2", "func1", newName)
+ intercept[AnalysisException] { catalog.getFunction("db2", "func1") }
+ assert(catalog.getFunction("db2", newName).identifier.funcName == newName)
+ assert(catalog.getFunction("db2", newName).className == funcClass)
+ intercept[AnalysisException] { catalog.renameFunction("db2", "does_not_exist", "me") }
+ }
+
+ test("rename function when database does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.renameFunction("does_not_exist", "func1", "func5")
+ }
+ }
+
+ test("list functions") {
+ val catalog = newBasicCatalog()
+ catalog.createFunction("db2", newFunc("func2"))
+ catalog.createFunction("db2", newFunc("not_me"))
+ assert(catalog.listFunctions("db2", "*").toSet == Set("func1", "func2", "not_me"))
+ assert(catalog.listFunctions("db2", "func*").toSet == Set("func1", "func2"))
+ }
+
+}
+
+
+/**
+ * A collection of utility fields and methods for tests related to the [[ExternalCatalog]].
+ */
+abstract class CatalogTestUtils {
+
+ // Unimplemented methods
+ val tableInputFormat: String
+ val tableOutputFormat: String
+ def newEmptyCatalog(): ExternalCatalog
+
+ // These fields must be lazy because they rely on fields that are not implemented yet
+ lazy val storageFormat = CatalogStorageFormat(
+ locationUri = None,
+ inputFormat = Some(tableInputFormat),
+ outputFormat = Some(tableOutputFormat),
+ serde = None,
+ serdeProperties = Map.empty)
+ lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat)
+ lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat)
+ lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat)
+ lazy val funcClass = "org.apache.spark.myFunc"
+
+ /**
+ * Creates a basic catalog, with the following structure:
+ *
+ * default
+ * db1
+ * db2
+ * - tbl1
+ * - tbl2
+ * - part1
+ * - part2
+ * - func1
+ */
+ def newBasicCatalog(): ExternalCatalog = {
+ val catalog = newEmptyCatalog()
+ // When testing against a real catalog, the default database may already exist
+ catalog.createDatabase(newDb("default"), ignoreIfExists = true)
+ catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
+ catalog.createDatabase(newDb("db2"), ignoreIfExists = false)
+ catalog.createTable("db2", newTable("tbl1", "db2"), ignoreIfExists = false)
+ catalog.createTable("db2", newTable("tbl2", "db2"), ignoreIfExists = false)
+ catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false)
+ catalog.createFunction("db2", newFunc("func1", Some("db2")))
+ catalog
+ }
+
+ def newFunc(): CatalogFunction = newFunc("funcName")
+
+ def newUriForDatabase(): String = Utils.createTempDir().getAbsolutePath
+
+ def newDb(name: String): CatalogDatabase = {
+ CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)
+ }
+
+ def newTable(name: String, db: String): CatalogTable = newTable(name, Some(db))
+
+ def newTable(name: String, database: Option[String] = None): CatalogTable = {
+ CatalogTable(
+ identifier = TableIdentifier(name, database),
+ tableType = CatalogTableType.EXTERNAL,
+ storage = storageFormat,
+ schema = Seq(
+ CatalogColumn("col1", "int"),
+ CatalogColumn("col2", "string"),
+ CatalogColumn("a", "int"),
+ CatalogColumn("b", "string")),
+ partitionColumnNames = Seq("a", "b"),
+ bucketColumnNames = Seq("col1"))
+ }
+
+ def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
+ CatalogFunction(FunctionIdentifier(name, database), funcClass, Seq.empty[(String, String)])
+ }
+
+ /**
+ * Whether the catalog's table partitions equal the ones given.
+ * Note: Hive sets some random serde things, so we just compare the specs here.
+ */
+ def catalogPartitionsEqual(
+ catalog: ExternalCatalog,
+ db: String,
+ table: String,
+ parts: Seq[CatalogTablePartition]): Boolean = {
+ catalog.listPartitions(db, table).map(_.spec).toSet == parts.map(_.spec).toSet
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
index 63a7b2c..0605daa 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.catalog
/** Test suite for the [[InMemoryCatalog]]. */
-class InMemoryCatalogSuite extends CatalogTestCases {
+class InMemoryCatalogSuite extends ExternalCatalogSuite {
protected override val utils: CatalogTestUtils = new CatalogTestUtils {
override val tableInputFormat: String = "org.apache.park.SequenceFileInputFormat"
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 1933be5..ba5d8ce 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias}
/**
* Tests for [[SessionCatalog]] that assume that [[InMemoryCatalog]] is correctly implemented.
*
- * Note: many of the methods here are very similar to the ones in [[CatalogTestCases]].
+ * Note: many of the methods here are very similar to the ones in [[ExternalCatalogSuite]].
* This is because [[SessionCatalog]] and [[ExternalCatalog]] share many similar method
* signatures but do not extend a common parent. This is largely by design but
* unfortunately leads to very similar test code in two places.
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 47c043a..dbbdf11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ShowTablesCommand
import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
import org.apache.spark.sql.sources.BaseRelation
@@ -258,7 +259,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def isCached(tableName: String): Boolean = {
- sparkSession.isCached(tableName)
+ sparkSession.catalog.isCached(tableName)
}
/**
@@ -267,7 +268,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
private[sql] def isCached(qName: Dataset[_]): Boolean = {
- sparkSession.isCached(qName)
+ sparkSession.cacheManager.lookupCachedData(qName).nonEmpty
}
/**
@@ -276,7 +277,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def cacheTable(tableName: String): Unit = {
- sparkSession.cacheTable(tableName)
+ sparkSession.catalog.cacheTable(tableName)
}
/**
@@ -285,7 +286,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def uncacheTable(tableName: String): Unit = {
- sparkSession.uncacheTable(tableName)
+ sparkSession.catalog.uncacheTable(tableName)
}
/**
@@ -293,7 +294,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def clearCache(): Unit = {
- sparkSession.clearCache()
+ sparkSession.catalog.clearCache()
}
// scalastyle:off
@@ -507,7 +508,7 @@ class SQLContext private[sql](
*/
@Experimental
def createExternalTable(tableName: String, path: String): DataFrame = {
- sparkSession.createExternalTable(tableName, path)
+ sparkSession.catalog.createExternalTable(tableName, path)
}
/**
@@ -523,7 +524,7 @@ class SQLContext private[sql](
tableName: String,
path: String,
source: String): DataFrame = {
- sparkSession.createExternalTable(tableName, path, source)
+ sparkSession.catalog.createExternalTable(tableName, path, source)
}
/**
@@ -539,7 +540,7 @@ class SQLContext private[sql](
tableName: String,
source: String,
options: java.util.Map[String, String]): DataFrame = {
- sparkSession.createExternalTable(tableName, source, options)
+ sparkSession.catalog.createExternalTable(tableName, source, options)
}
/**
@@ -556,7 +557,7 @@ class SQLContext private[sql](
tableName: String,
source: String,
options: Map[String, String]): DataFrame = {
- sparkSession.createExternalTable(tableName, source, options)
+ sparkSession.catalog.createExternalTable(tableName, source, options)
}
/**
@@ -573,7 +574,7 @@ class SQLContext private[sql](
source: String,
schema: StructType,
options: java.util.Map[String, String]): DataFrame = {
- sparkSession.createExternalTable(tableName, source, schema, options)
+ sparkSession.catalog.createExternalTable(tableName, source, schema, options)
}
/**
@@ -591,7 +592,7 @@ class SQLContext private[sql](
source: String,
schema: StructType,
options: Map[String, String]): DataFrame = {
- sparkSession.createExternalTable(tableName, source, schema, options)
+ sparkSession.catalog.createExternalTable(tableName, source, schema, options)
}
/**
@@ -611,7 +612,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def dropTempTable(tableName: String): Unit = {
- sparkSession.dropTempTable(tableName)
+ sparkSession.catalog.dropTempTable(tableName)
}
/**
@@ -700,7 +701,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def tables(): DataFrame = {
- sparkSession.tables()
+ Dataset.ofRows(sparkSession, ShowTablesCommand(None, None))
}
/**
@@ -712,7 +713,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def tables(databaseName: String): DataFrame = {
- sparkSession.tables(databaseName)
+ Dataset.ofRows(sparkSession, ShowTablesCommand(Some(databaseName), None))
}
/**
@@ -730,7 +731,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def tableNames(): Array[String] = {
- sparkSession.tableNames()
+ sparkSession.catalog.listTables().collect().map(_.name)
}
/**
@@ -740,7 +741,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def tableNames(databaseName: String): Array[String] = {
- sparkSession.tableNames(databaseName)
+ sparkSession.catalog.listTables(databaseName).collect().map(_.name)
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index a0f0bd3..6477f42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -31,16 +31,16 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.config.{CATALOG_IMPLEMENTATION, ConfigEntry}
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalog.Catalog
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range}
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.ShowTablesCommand
-import org.apache.spark.sql.execution.datasources.{CreateTableUsing, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.ui.SQLListener
-import org.apache.spark.sql.internal.{RuntimeConfigImpl, SessionState, SharedState}
+import org.apache.spark.sql.internal.{CatalogImpl, RuntimeConfigImpl, SessionState, SharedState}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{DataType, LongType, StructType}
import org.apache.spark.sql.util.ExecutionListenerManager
@@ -191,10 +191,6 @@ class SparkSession private(
| Methods for accessing or mutating configurations |
* -------------------------------------------------- */
- @transient private lazy val _conf: RuntimeConfig = {
- new RuntimeConfigImpl(sessionState.conf, sessionState.hadoopConf)
- }
-
/**
* Runtime configuration interface for Spark.
*
@@ -205,7 +201,9 @@ class SparkSession private(
* @group config
* @since 2.0.0
*/
- def conf: RuntimeConfig = _conf
+ @transient lazy val conf: RuntimeConfig = {
+ new RuntimeConfigImpl(sessionState.conf, sessionState.hadoopConf)
+ }
/**
* Set Spark SQL configuration properties.
@@ -274,61 +272,6 @@ class SparkSession private(
}
- /* ------------------------------------- *
- | Methods related to cache management |
- * ------------------------------------- */
-
- /**
- * Returns true if the table is currently cached in-memory.
- *
- * @group cachemgmt
- * @since 2.0.0
- */
- def isCached(tableName: String): Boolean = {
- cacheManager.lookupCachedData(table(tableName)).nonEmpty
- }
-
- /**
- * Caches the specified table in-memory.
- *
- * @group cachemgmt
- * @since 2.0.0
- */
- def cacheTable(tableName: String): Unit = {
- cacheManager.cacheQuery(table(tableName), Some(tableName))
- }
-
- /**
- * Removes the specified table from the in-memory cache.
- *
- * @group cachemgmt
- * @since 2.0.0
- */
- def uncacheTable(tableName: String): Unit = {
- cacheManager.uncacheQuery(table(tableName))
- }
-
- /**
- * Removes all cached tables from the in-memory cache.
- *
- * @group cachemgmt
- * @since 2.0.0
- */
- def clearCache(): Unit = {
- cacheManager.clearCache()
- }
-
- /**
- * Returns true if the [[Dataset]] is currently cached in-memory.
- *
- * @group cachemgmt
- * @since 2.0.0
- */
- protected[sql] def isCached(qName: Dataset[_]): Boolean = {
- cacheManager.lookupCachedData(qName).nonEmpty
- }
-
-
/* --------------------------------- *
| Methods for creating DataFrames |
* --------------------------------- */
@@ -605,139 +548,18 @@ class SparkSession private(
}
- /* --------------------------- *
- | Methods related to tables |
- * --------------------------- */
-
- /**
- * :: Experimental ::
- * Creates an external table from the given path and returns the corresponding DataFrame.
- * It will use the default data source configured by spark.sql.sources.default.
- *
- * @group ddl_ops
- * @since 2.0.0
- */
- @Experimental
- def createExternalTable(tableName: String, path: String): DataFrame = {
- val dataSourceName = sessionState.conf.defaultDataSourceName
- createExternalTable(tableName, path, dataSourceName)
- }
-
- /**
- * :: Experimental ::
- * Creates an external table from the given path based on a data source
- * and returns the corresponding DataFrame.
- *
- * @group ddl_ops
- * @since 2.0.0
- */
- @Experimental
- def createExternalTable(tableName: String, path: String, source: String): DataFrame = {
- createExternalTable(tableName, source, Map("path" -> path))
- }
-
- /**
- * :: Experimental ::
- * Creates an external table from the given path based on a data source and a set of options.
- * Then, returns the corresponding DataFrame.
- *
- * @group ddl_ops
- * @since 2.0.0
- */
- @Experimental
- def createExternalTable(
- tableName: String,
- source: String,
- options: java.util.Map[String, String]): DataFrame = {
- createExternalTable(tableName, source, options.asScala.toMap)
- }
-
- /**
- * :: Experimental ::
- * (Scala-specific)
- * Creates an external table from the given path based on a data source and a set of options.
- * Then, returns the corresponding DataFrame.
- *
- * @group ddl_ops
- * @since 2.0.0
- */
- @Experimental
- def createExternalTable(
- tableName: String,
- source: String,
- options: Map[String, String]): DataFrame = {
- val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
- val cmd =
- CreateTableUsing(
- tableIdent,
- userSpecifiedSchema = None,
- source,
- temporary = false,
- options,
- allowExisting = false,
- managedIfNoPath = false)
- executePlan(cmd).toRdd
- table(tableIdent)
- }
-
- /**
- * :: Experimental ::
- * Create an external table from the given path based on a data source, a schema and
- * a set of options. Then, returns the corresponding DataFrame.
- *
- * @group ddl_ops
- * @since 2.0.0
- */
- @Experimental
- def createExternalTable(
- tableName: String,
- source: String,
- schema: StructType,
- options: java.util.Map[String, String]): DataFrame = {
- createExternalTable(tableName, source, schema, options.asScala.toMap)
- }
+ /* ------------------------ *
+ | Catalog-related methods |
+ * ------------------------ */
/**
- * :: Experimental ::
- * (Scala-specific)
- * Create an external table from the given path based on a data source, a schema and
- * a set of options. Then, returns the corresponding DataFrame.
+ * Interface through which the user may create, drop, alter or query underlying
+ * databases, tables, functions etc.
*
* @group ddl_ops
* @since 2.0.0
*/
- @Experimental
- def createExternalTable(
- tableName: String,
- source: String,
- schema: StructType,
- options: Map[String, String]): DataFrame = {
- val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
- val cmd =
- CreateTableUsing(
- tableIdent,
- userSpecifiedSchema = Some(schema),
- source,
- temporary = false,
- options,
- allowExisting = false,
- managedIfNoPath = false)
- executePlan(cmd).toRdd
- table(tableIdent)
- }
-
- /**
- * Drops the temporary table with the given table name in the catalog.
- * If the table has been cached/persisted before, it's also unpersisted.
- *
- * @param tableName the name of the table to be unregistered.
- * @group ddl_ops
- * @since 2.0.0
- */
- def dropTempTable(tableName: String): Unit = {
- cacheManager.tryUncacheQuery(table(tableName))
- sessionState.catalog.dropTable(TableIdentifier(tableName), ignoreIfNotExists = true)
- }
+ @transient lazy val catalog: Catalog = new CatalogImpl(self)
/**
* Returns the specified table as a [[DataFrame]].
@@ -749,55 +571,11 @@ class SparkSession private(
table(sessionState.sqlParser.parseTableIdentifier(tableName))
}
- private def table(tableIdent: TableIdentifier): DataFrame = {
+ protected[sql] def table(tableIdent: TableIdentifier): DataFrame = {
Dataset.ofRows(self, sessionState.catalog.lookupRelation(tableIdent))
}
/**
- * Returns a [[DataFrame]] containing names of existing tables in the current database.
- * The returned DataFrame has two columns, tableName and isTemporary (a Boolean
- * indicating if a table is a temporary one or not).
- *
- * @group ddl_ops
- * @since 2.0.0
- */
- def tables(): DataFrame = {
- Dataset.ofRows(self, ShowTablesCommand(None, None))
- }
-
- /**
- * Returns a [[DataFrame]] containing names of existing tables in the given database.
- * The returned DataFrame has two columns, tableName and isTemporary (a Boolean
- * indicating if a table is a temporary one or not).
- *
- * @group ddl_ops
- * @since 2.0.0
- */
- def tables(databaseName: String): DataFrame = {
- Dataset.ofRows(self, ShowTablesCommand(Some(databaseName), None))
- }
-
- /**
- * Returns the names of tables in the current database as an array.
- *
- * @group ddl_ops
- * @since 2.0.0
- */
- def tableNames(): Array[String] = {
- tableNames(sessionState.catalog.getCurrentDatabase)
- }
-
- /**
- * Returns the names of tables in the given database as an array.
- *
- * @group ddl_ops
- * @since 2.0.0
- */
- def tableNames(databaseName: String): Array[String] = {
- sessionState.catalog.listTables(databaseName).map(_.table).toArray
- }
-
- /**
* Registers the given [[DataFrame]] as a temporary table in the catalog.
* Temporary tables exist only during the lifetime of this instance of [[SparkSession]].
*/
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
new file mode 100644
index 0000000..868cc3a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset}
+import org.apache.spark.sql.types.StructType
+
+
+/**
+ * Catalog interface for Spark. To access this, use `SparkSession.catalog`.
+ */
+abstract class Catalog {
+
+ /**
+ * Returns the current default database in this session.
+ *
+ * @since 2.0.0
+ */
+ def currentDatabase: String
+
+ /**
+ * Sets the current default database in this session.
+ *
+ * @since 2.0.0
+ */
+ def setCurrentDatabase(dbName: String): Unit
+
+ /**
+ * Returns a list of databases available across all sessions.
+ *
+ * @since 2.0.0
+ */
+ def listDatabases(): Dataset[Database]
+
+ /**
+ * Returns a list of tables in the current database.
+ * This includes all temporary tables.
+ *
+ * @since 2.0.0
+ */
+ def listTables(): Dataset[Table]
+
+ /**
+ * Returns a list of tables in the specified database.
+ * This includes all temporary tables.
+ *
+ * @since 2.0.0
+ */
+ @throws[AnalysisException]("database does not exist")
+ def listTables(dbName: String): Dataset[Table]
+
+ /**
+ * Returns a list of functions registered in the current database.
+ * This includes all temporary functions.
+ *
+ * @since 2.0.0
+ */
+ def listFunctions(): Dataset[Function]
+
+ /**
+ * Returns a list of functions registered in the specified database.
+ * This includes all temporary functions.
+ *
+ * @since 2.0.0
+ */
+ @throws[AnalysisException]("database does not exist")
+ def listFunctions(dbName: String): Dataset[Function]
+
+ /**
+ * Returns a list of columns for the given table in the current database.
+ *
+ * @since 2.0.0
+ */
+ @throws[AnalysisException]("table does not exist")
+ def listColumns(tableName: String): Dataset[Column]
+
+ /**
+ * Returns a list of columns for the given table in the specified database.
+ *
+ * @since 2.0.0
+ */
+ @throws[AnalysisException]("database or table does not exist")
+ def listColumns(dbName: String, tableName: String): Dataset[Column]
+
+ /**
+ * :: Experimental ::
+ * Creates an external table from the given path and returns the corresponding DataFrame.
+ * It will use the default data source configured by spark.sql.sources.default.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def createExternalTable(tableName: String, path: String): DataFrame
+
+ /**
+ * :: Experimental ::
+ * Creates an external table from the given path based on a data source
+ * and returns the corresponding DataFrame.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def createExternalTable(tableName: String, path: String, source: String): DataFrame
+
+ /**
+ * :: Experimental ::
+ * Creates an external table from the given path based on a data source and a set of options.
+ * Then, returns the corresponding DataFrame.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def createExternalTable(
+ tableName: String,
+ source: String,
+ options: java.util.Map[String, String]): DataFrame
+
+ /**
+ * :: Experimental ::
+ * (Scala-specific)
+ * Creates an external table from the given path based on a data source and a set of options.
+ * Then, returns the corresponding DataFrame.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def createExternalTable(
+ tableName: String,
+ source: String,
+ options: Map[String, String]): DataFrame
+
+ /**
+ * :: Experimental ::
+ * Creates an external table from the given path based on a data source, a schema and
+ * a set of options. Then, returns the corresponding DataFrame.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def createExternalTable(
+ tableName: String,
+ source: String,
+ schema: StructType,
+ options: java.util.Map[String, String]): DataFrame
+
+ /**
+ * :: Experimental ::
+ * (Scala-specific)
+ * Creates an external table from the given path based on a data source, a schema and
+ * a set of options. Then, returns the corresponding DataFrame.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def createExternalTable(
+ tableName: String,
+ source: String,
+ schema: StructType,
+ options: Map[String, String]): DataFrame
+
+ /**
+ * Drops the temporary table with the given table name in the catalog.
+ * If the table has been cached/persisted before, it's also unpersisted.
+ *
+ * @param tableName the name of the table to be unregistered.
+ * @since 2.0.0
+ */
+ def dropTempTable(tableName: String): Unit
+
+ /**
+ * Returns true if the table is currently cached in-memory.
+ *
+ * @since 2.0.0
+ */
+ def isCached(tableName: String): Boolean
+
+ /**
+ * Caches the specified table in-memory.
+ *
+ * @since 2.0.0
+ */
+ def cacheTable(tableName: String): Unit
+
+ /**
+ * Removes the specified table from the in-memory cache.
+ *
+ * @since 2.0.0
+ */
+ def uncacheTable(tableName: String): Unit
+
+ /**
+ * Removes all cached tables from the in-memory cache.
+ *
+ * @since 2.0.0
+ */
+ def clearCache(): Unit
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
new file mode 100644
index 0000000..d5de6cd
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog
+
+import javax.annotation.Nullable
+
+import org.apache.spark.sql.catalyst.DefinedByConstructorParams
+
+
+// Note: all classes here are expected to be wrapped in Datasets and so must extend
+// DefinedByConstructorParams for the catalog to be able to create encoders for them.
+
+class Database(
+ val name: String,
+ @Nullable val description: String,
+ val locationUri: String)
+ extends DefinedByConstructorParams {
+
+ override def toString: String = {
+ "Database[" +
+ s"name='$name', " +
+ Option(description).map { d => s"description='$d', " }.getOrElse("") +
+ s"path='$locationUri']"
+ }
+
+}
+
+
+class Table(
+ val name: String,
+ @Nullable val database: String,
+ @Nullable val description: String,
+ val tableType: String,
+ val isTemporary: Boolean)
+ extends DefinedByConstructorParams {
+
+ override def toString: String = {
+ "Table[" +
+ s"name='$name', " +
+ Option(database).map { d => s"database='$d', " }.getOrElse("") +
+ Option(description).map { d => s"description='$d', " }.getOrElse("") +
+ s"tableType='$tableType', " +
+ s"isTemporary='$isTemporary']"
+ }
+
+}
+
+
+class Column(
+ val name: String,
+ @Nullable val description: String,
+ val dataType: String,
+ val nullable: Boolean,
+ val isPartition: Boolean,
+ val isBucket: Boolean)
+ extends DefinedByConstructorParams {
+
+ override def toString: String = {
+ "Column[" +
+ s"name='$name', " +
+ Option(description).map { d => s"description='$d', " }.getOrElse("") +
+ s"dataType='$dataType', " +
+ s"nullable='$nullable', " +
+ s"isPartition='$isPartition', " +
+ s"isBucket='$isBucket']"
+ }
+
+}
+
+
+class Function(
+ val name: String,
+ @Nullable val description: String,
+ val className: String,
+ val isTemporary: Boolean)
+ extends DefinedByConstructorParams {
+
+ override def toString: String = {
+ "Function[" +
+ s"name='$name', " +
+ Option(description).map { d => s"description='$d', " }.getOrElse("") +
+ s"className='$className', " +
+ s"isTemporary='$isTemporary']"
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index ebc60ed..e04e130 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -835,9 +835,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
throw new ParseException("Operation not allowed: CREATE TABLE ... CLUSTERED BY ...", ctx)
}
val tableType = if (external) {
- CatalogTableType.EXTERNAL_TABLE
+ CatalogTableType.EXTERNAL
} else {
- CatalogTableType.MANAGED_TABLE
+ CatalogTableType.MANAGED
}
val comment = Option(ctx.STRING).map(string)
val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns)
@@ -1083,7 +1083,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
val sql = Option(source(query))
val tableDesc = CatalogTable(
identifier = visitTableIdentifier(name),
- tableType = CatalogTableType.VIRTUAL_VIEW,
+ tableType = CatalogTableType.VIEW,
schema = schema,
storage = EmptyStorageFormat,
properties = properties,
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index c283bd6..ec3fada 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -32,7 +32,7 @@ case class CacheTableCommand(
plan.foreach { logicalPlan =>
sparkSession.registerDataFrameAsTable(Dataset.ofRows(sparkSession, logicalPlan), tableName)
}
- sparkSession.cacheTable(tableName)
+ sparkSession.catalog.cacheTable(tableName)
if (!isLazy) {
// Performs eager caching
@@ -62,7 +62,7 @@ case class UncacheTableCommand(tableName: String) extends RunnableCommand {
case object ClearCacheCommand extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
- sparkSession.clearCache()
+ sparkSession.catalog.clearCache()
Seq.empty[Row]
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org