You are viewing a plain-text version of this content. The canonical link to the original (a hyperlink labeled "here") was lost in the plain-text conversion.
Posted to commits@spark.apache.org by an...@apache.org on 2016/03/29 01:25:22 UTC
spark git commit: [SPARK-13923][SPARK-14014][SQL] Session catalog follow-ups
Repository: spark
Updated Branches:
refs/heads/master 34c0638ee -> eebc8c1c9
[SPARK-13923][SPARK-14014][SQL] Session catalog follow-ups
## What changes were proposed in this pull request?
This patch addresses the review comments that were still outstanding on #11750 and #11918 after those pull requests were merged. For the full list of changes in this patch, see the individual commits.
## How was this patch tested?
Covered by the existing `SessionCatalogSuite` and `CatalogTestCases` test suites.
Author: Andrew Or <an...@databricks.com>
Closes #12006 from andrewor14/session-catalog-followup.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eebc8c1c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eebc8c1c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eebc8c1c
Branch: refs/heads/master
Commit: eebc8c1c95fb7752d09a5846b7cac65f7702c8f2
Parents: 34c0638
Author: Andrew Or <an...@databricks.com>
Authored: Mon Mar 28 16:25:15 2016 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Mon Mar 28 16:25:15 2016 -0700
----------------------------------------------------------------------
.../sql/catalyst/catalog/InMemoryCatalog.scala | 18 +-
.../sql/catalyst/catalog/SessionCatalog.scala | 74 ++---
.../spark/sql/catalyst/catalog/interface.scala | 14 +-
.../sql/catalyst/analysis/AnalysisTest.scala | 2 +-
.../analysis/DecimalPrecisionSuite.scala | 2 +-
.../sql/catalyst/catalog/CatalogTestCases.scala | 6 +-
.../catalyst/catalog/SessionCatalogSuite.scala | 30 +-
.../scala/org/apache/spark/sql/SQLContext.scala | 2 +-
.../spark/sql/execution/datasources/ddl.scala | 4 +-
.../org/apache/spark/sql/hive/HiveCatalog.scala | 297 ------------------
.../org/apache/spark/sql/hive/HiveContext.scala | 4 +-
.../spark/sql/hive/HiveExternalCatalog.scala | 298 +++++++++++++++++++
.../spark/sql/hive/HiveMetastoreCatalog.scala | 22 +-
.../org/apache/spark/sql/hive/HiveQl.scala | 6 +-
.../spark/sql/hive/HiveSessionCatalog.scala | 6 +-
.../spark/sql/hive/client/HiveClient.scala | 2 +-
.../spark/sql/hive/client/HiveClientImpl.scala | 11 +-
.../hive/execution/CreateTableAsSelect.scala | 6 +-
.../sql/hive/execution/CreateViewAsSelect.scala | 4 +-
.../apache/spark/sql/hive/test/TestHive.scala | 4 +-
.../spark/sql/hive/HiveCatalogSuite.scala | 49 ---
.../sql/hive/HiveExternalCatalogSuite.scala | 49 +++
.../org/apache/spark/sql/hive/HiveQlSuite.scala | 16 +-
.../apache/spark/sql/hive/ListTablesSuite.scala | 2 +-
.../sql/hive/MetastoreDataSourcesSuite.scala | 2 +-
.../spark/sql/hive/client/VersionsSuite.scala | 2 +-
26 files changed, 469 insertions(+), 463 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index e216fa5..2bbb970 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -155,7 +155,7 @@ class InMemoryCatalog extends ExternalCatalog {
tableDefinition: CatalogTable,
ignoreIfExists: Boolean): Unit = synchronized {
requireDbExists(db)
- val table = tableDefinition.name.table
+ val table = tableDefinition.identifier.table
if (tableExists(db, table)) {
if (!ignoreIfExists) {
throw new AnalysisException(s"Table '$table' already exists in database '$db'")
@@ -182,14 +182,14 @@ class InMemoryCatalog extends ExternalCatalog {
override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
requireTableExists(db, oldName)
val oldDesc = catalog(db).tables(oldName)
- oldDesc.table = oldDesc.table.copy(name = TableIdentifier(newName, Some(db)))
+ oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db)))
catalog(db).tables.put(newName, oldDesc)
catalog(db).tables.remove(oldName)
}
override def alterTable(db: String, tableDefinition: CatalogTable): Unit = synchronized {
- requireTableExists(db, tableDefinition.name.table)
- catalog(db).tables(tableDefinition.name.table).table = tableDefinition
+ requireTableExists(db, tableDefinition.identifier.table)
+ catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition
}
override def getTable(db: String, table: String): CatalogTable = synchronized {
@@ -296,10 +296,10 @@ class InMemoryCatalog extends ExternalCatalog {
override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
requireDbExists(db)
- if (functionExists(db, func.name.funcName)) {
+ if (functionExists(db, func.identifier.funcName)) {
throw new AnalysisException(s"Function '$func' already exists in '$db' database")
} else {
- catalog(db).functions.put(func.name.funcName, func)
+ catalog(db).functions.put(func.identifier.funcName, func)
}
}
@@ -310,14 +310,14 @@ class InMemoryCatalog extends ExternalCatalog {
override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized {
requireFunctionExists(db, oldName)
- val newFunc = getFunction(db, oldName).copy(name = FunctionIdentifier(newName, Some(db)))
+ val newFunc = getFunction(db, oldName).copy(identifier = FunctionIdentifier(newName, Some(db)))
catalog(db).functions.remove(oldName)
catalog(db).functions.put(newName, newFunc)
}
override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = synchronized {
- requireFunctionExists(db, funcDefinition.name.funcName)
- catalog(db).functions.put(funcDefinition.name.funcName, funcDefinition)
+ requireFunctionExists(db, funcDefinition.identifier.funcName)
+ catalog(db).functions.put(funcDefinition.identifier.funcName, funcDefinition)
}
override def getFunction(db: String, funcName: String): CatalogFunction = synchronized {
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 34265fa..a9cf807 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -17,9 +17,7 @@
package org.apache.spark.sql.catalyst.catalog
-import java.util.concurrent.ConcurrentHashMap
-
-import scala.collection.JavaConverters._
+import scala.collection.mutable
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
@@ -31,6 +29,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
* An internal catalog that is used by a Spark Session. This internal catalog serves as a
* proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
* tables and functions of the Spark Session that it belongs to.
+ *
+ * This class is not thread-safe.
*/
class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
import ExternalCatalog._
@@ -39,8 +39,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
this(externalCatalog, new SimpleCatalystConf(true))
}
- protected[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
- protected[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
+ protected[this] val tempTables = new mutable.HashMap[String, LogicalPlan]
+ protected[this] val tempFunctions = new mutable.HashMap[String, CatalogFunction]
// Note: we track current database here because certain operations do not explicitly
// specify the database (e.g. DROP TABLE my_table). In these cases we must first
@@ -122,9 +122,9 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
* If no such database is specified, create it in the current database.
*/
def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
- val db = tableDefinition.name.database.getOrElse(currentDb)
- val table = formatTableName(tableDefinition.name.table)
- val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db)))
+ val db = tableDefinition.identifier.database.getOrElse(currentDb)
+ val table = formatTableName(tableDefinition.identifier.table)
+ val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
}
@@ -138,9 +138,9 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
* this becomes a no-op.
*/
def alterTable(tableDefinition: CatalogTable): Unit = {
- val db = tableDefinition.name.database.getOrElse(currentDb)
- val table = formatTableName(tableDefinition.name.table)
- val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db)))
+ val db = tableDefinition.identifier.database.getOrElse(currentDb)
+ val table = formatTableName(tableDefinition.identifier.table)
+ val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
externalCatalog.alterTable(db, newTableDefinition)
}
@@ -164,9 +164,9 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
def createTempTable(
name: String,
tableDefinition: LogicalPlan,
- ignoreIfExists: Boolean): Unit = {
+ overrideIfExists: Boolean): Unit = {
val table = formatTableName(name)
- if (tempTables.containsKey(table) && !ignoreIfExists) {
+ if (tempTables.contains(table) && !overrideIfExists) {
throw new AnalysisException(s"Temporary table '$name' already exists.")
}
tempTables.put(table, tableDefinition)
@@ -188,10 +188,11 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
val db = oldName.database.getOrElse(currentDb)
val oldTableName = formatTableName(oldName.table)
val newTableName = formatTableName(newName.table)
- if (oldName.database.isDefined || !tempTables.containsKey(oldTableName)) {
+ if (oldName.database.isDefined || !tempTables.contains(oldTableName)) {
externalCatalog.renameTable(db, oldTableName, newTableName)
} else {
- val table = tempTables.remove(oldTableName)
+ val table = tempTables(oldTableName)
+ tempTables.remove(oldTableName)
tempTables.put(newTableName, table)
}
}
@@ -206,7 +207,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = {
val db = name.database.getOrElse(currentDb)
val table = formatTableName(name.table)
- if (name.database.isDefined || !tempTables.containsKey(table)) {
+ if (name.database.isDefined || !tempTables.contains(table)) {
externalCatalog.dropTable(db, table, ignoreIfNotExists)
} else {
tempTables.remove(table)
@@ -224,11 +225,11 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
val db = name.database.getOrElse(currentDb)
val table = formatTableName(name.table)
val relation =
- if (name.database.isDefined || !tempTables.containsKey(table)) {
+ if (name.database.isDefined || !tempTables.contains(table)) {
val metadata = externalCatalog.getTable(db, table)
CatalogRelation(db, metadata, alias)
} else {
- tempTables.get(table)
+ tempTables(table)
}
val qualifiedTable = SubqueryAlias(table, relation)
// If an alias was specified by the lookup, wrap the plan in a subquery so that
@@ -247,7 +248,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
def tableExists(name: TableIdentifier): Boolean = {
val db = name.database.getOrElse(currentDb)
val table = formatTableName(name.table)
- if (name.database.isDefined || !tempTables.containsKey(table)) {
+ if (name.database.isDefined || !tempTables.contains(table)) {
externalCatalog.tableExists(db, table)
} else {
true // it's a temporary table
@@ -266,7 +267,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
val dbTables =
externalCatalog.listTables(db, pattern).map { t => TableIdentifier(t, Some(db)) }
val regex = pattern.replaceAll("\\*", ".*").r
- val _tempTables = tempTables.keys().asScala
+ val _tempTables = tempTables.keys.toSeq
.filter { t => regex.pattern.matcher(t).matches() }
.map { t => TableIdentifier(t) }
dbTables ++ _tempTables
@@ -290,7 +291,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
* For testing only.
*/
private[catalog] def getTempTable(name: String): Option[LogicalPlan] = {
- Option(tempTables.get(name))
+ tempTables.get(name)
}
// ----------------------------------------------------------------------------
@@ -399,9 +400,9 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
* If no such database is specified, create it in the current database.
*/
def createFunction(funcDefinition: CatalogFunction): Unit = {
- val db = funcDefinition.name.database.getOrElse(currentDb)
+ val db = funcDefinition.identifier.database.getOrElse(currentDb)
val newFuncDefinition = funcDefinition.copy(
- name = FunctionIdentifier(funcDefinition.name.funcName, Some(db)))
+ identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db)))
externalCatalog.createFunction(db, newFuncDefinition)
}
@@ -424,9 +425,9 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
* this becomes a no-op.
*/
def alterFunction(funcDefinition: CatalogFunction): Unit = {
- val db = funcDefinition.name.database.getOrElse(currentDb)
+ val db = funcDefinition.identifier.database.getOrElse(currentDb)
val newFuncDefinition = funcDefinition.copy(
- name = FunctionIdentifier(funcDefinition.name.funcName, Some(db)))
+ identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db)))
externalCatalog.alterFunction(db, newFuncDefinition)
}
@@ -439,10 +440,10 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
* This assumes no database is specified in `funcDefinition`.
*/
def createTempFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = {
- require(funcDefinition.name.database.isEmpty,
+ require(funcDefinition.identifier.database.isEmpty,
"attempted to create a temporary function while specifying a database")
- val name = funcDefinition.name.funcName
- if (tempFunctions.containsKey(name) && !ignoreIfExists) {
+ val name = funcDefinition.identifier.funcName
+ if (tempFunctions.contains(name) && !ignoreIfExists) {
throw new AnalysisException(s"Temporary function '$name' already exists.")
}
tempFunctions.put(name, funcDefinition)
@@ -455,7 +456,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
// Hive has DROP FUNCTION and DROP TEMPORARY FUNCTION. We may want to consolidate
// dropFunction and dropTempFunction.
def dropTempFunction(name: String, ignoreIfNotExists: Boolean): Unit = {
- if (!tempFunctions.containsKey(name) && !ignoreIfNotExists) {
+ if (!tempFunctions.contains(name) && !ignoreIfNotExists) {
throw new AnalysisException(
s"Temporary function '$name' cannot be dropped because it does not exist!")
}
@@ -476,11 +477,12 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
throw new AnalysisException("rename does not support moving functions across databases")
}
val db = oldName.database.getOrElse(currentDb)
- if (oldName.database.isDefined || !tempFunctions.containsKey(oldName.funcName)) {
+ if (oldName.database.isDefined || !tempFunctions.contains(oldName.funcName)) {
externalCatalog.renameFunction(db, oldName.funcName, newName.funcName)
} else {
- val func = tempFunctions.remove(oldName.funcName)
- val newFunc = func.copy(name = func.name.copy(funcName = newName.funcName))
+ val func = tempFunctions(oldName.funcName)
+ val newFunc = func.copy(identifier = func.identifier.copy(funcName = newName.funcName))
+ tempFunctions.remove(oldName.funcName)
tempFunctions.put(newName.funcName, newFunc)
}
}
@@ -494,10 +496,10 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
*/
def getFunction(name: FunctionIdentifier): CatalogFunction = {
val db = name.database.getOrElse(currentDb)
- if (name.database.isDefined || !tempFunctions.containsKey(name.funcName)) {
+ if (name.database.isDefined || !tempFunctions.contains(name.funcName)) {
externalCatalog.getFunction(db, name.funcName)
} else {
- tempFunctions.get(name.funcName)
+ tempFunctions(name.funcName)
}
}
@@ -510,7 +512,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
val dbFunctions =
externalCatalog.listFunctions(db, pattern).map { f => FunctionIdentifier(f, Some(db)) }
val regex = pattern.replaceAll("\\*", ".*").r
- val _tempFunctions = tempFunctions.keys().asScala
+ val _tempFunctions = tempFunctions.keys.toSeq
.filter { f => regex.pattern.matcher(f).matches() }
.map { f => FunctionIdentifier(f) }
dbFunctions ++ _tempFunctions
@@ -520,7 +522,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
* Return a temporary function. For testing only.
*/
private[catalog] def getTempFunction(name: String): Option[CatalogFunction] = {
- Option(tempFunctions.get(name))
+ tempFunctions.get(name)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 3480313..8bb8e09 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -169,10 +169,10 @@ abstract class ExternalCatalog {
/**
* A function defined in the catalog.
*
- * @param name name of the function
+ * @param identifier name of the function
* @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc"
*/
-case class CatalogFunction(name: FunctionIdentifier, className: String)
+case class CatalogFunction(identifier: FunctionIdentifier, className: String)
/**
@@ -216,7 +216,7 @@ case class CatalogTablePartition(
* future once we have a better understanding of how we want to handle skewed columns.
*/
case class CatalogTable(
- name: TableIdentifier,
+ identifier: TableIdentifier,
tableType: CatalogTableType,
storage: CatalogStorageFormat,
schema: Seq[CatalogColumn],
@@ -230,12 +230,12 @@ case class CatalogTable(
viewText: Option[String] = None) {
/** Return the database this table was specified to belong to, assuming it exists. */
- def database: String = name.database.getOrElse {
- throw new AnalysisException(s"table $name did not specify database")
+ def database: String = identifier.database.getOrElse {
+ throw new AnalysisException(s"table $identifier did not specify database")
}
/** Return the fully qualified name of this table, assuming the database was specified. */
- def qualifiedName: String = name.unquotedString
+ def qualifiedName: String = identifier.unquotedString
/** Syntactic sugar to update a field in `storage`. */
def withNewStorage(
@@ -290,6 +290,6 @@ case class CatalogRelation(
// TODO: implement this
override def output: Seq[Attribute] = Seq.empty
- require(metadata.name.database == Some(db),
+ require(metadata.identifier.database == Some(db),
"provided database does not much the one specified in the table definition")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index 6fa4bee..34cb976 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -31,7 +31,7 @@ trait AnalysisTest extends PlanTest {
private def makeAnalyzer(caseSensitive: Boolean): Analyzer = {
val conf = new SimpleCatalystConf(caseSensitive)
val catalog = new SessionCatalog(new InMemoryCatalog, conf)
- catalog.createTempTable("TaBlE", TestRelations.testRelation, ignoreIfExists = true)
+ catalog.createTempTable("TaBlE", TestRelations.testRelation, overrideIfExists = true)
new Analyzer(catalog, EmptyFunctionRegistry, conf) {
override val extendedResolutionRules = EliminateSubqueryAliases :: Nil
}
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index 3150186..6c08ccc 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -52,7 +52,7 @@ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
private val b: Expression = UnresolvedAttribute("b")
before {
- catalog.createTempTable("table", relation, ignoreIfExists = true)
+ catalog.createTempTable("table", relation, overrideIfExists = true)
}
private def checkType(expression: Expression, expectedType: DataType): Unit = {
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index 277c2d7..959bd56 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -210,7 +210,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
}
test("get table") {
- assert(newBasicCatalog().getTable("db2", "tbl1").name.table == "tbl1")
+ assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == "tbl1")
}
test("get table when database/table does not exist") {
@@ -452,7 +452,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
assert(catalog.getFunction("db2", "func1").className == funcClass)
catalog.renameFunction("db2", "func1", newName)
intercept[AnalysisException] { catalog.getFunction("db2", "func1") }
- assert(catalog.getFunction("db2", newName).name.funcName == newName)
+ assert(catalog.getFunction("db2", newName).identifier.funcName == newName)
assert(catalog.getFunction("db2", newName).className == funcClass)
intercept[AnalysisException] { catalog.renameFunction("db2", "does_not_exist", "me") }
}
@@ -549,7 +549,7 @@ abstract class CatalogTestUtils {
def newTable(name: String, database: Option[String] = None): CatalogTable = {
CatalogTable(
- name = TableIdentifier(name, database),
+ identifier = TableIdentifier(name, database),
tableType = CatalogTableType.EXTERNAL_TABLE,
storage = storageFormat,
schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", "string")),
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 74e995c..2948c5f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -197,17 +197,17 @@ class SessionCatalogSuite extends SparkFunSuite {
val catalog = new SessionCatalog(newBasicCatalog())
val tempTable1 = Range(1, 10, 1, 10, Seq())
val tempTable2 = Range(1, 20, 2, 10, Seq())
- catalog.createTempTable("tbl1", tempTable1, ignoreIfExists = false)
- catalog.createTempTable("tbl2", tempTable2, ignoreIfExists = false)
+ catalog.createTempTable("tbl1", tempTable1, overrideIfExists = false)
+ catalog.createTempTable("tbl2", tempTable2, overrideIfExists = false)
assert(catalog.getTempTable("tbl1") == Some(tempTable1))
assert(catalog.getTempTable("tbl2") == Some(tempTable2))
assert(catalog.getTempTable("tbl3") == None)
// Temporary table already exists
intercept[AnalysisException] {
- catalog.createTempTable("tbl1", tempTable1, ignoreIfExists = false)
+ catalog.createTempTable("tbl1", tempTable1, overrideIfExists = false)
}
// Temporary table already exists but we override it
- catalog.createTempTable("tbl1", tempTable2, ignoreIfExists = true)
+ catalog.createTempTable("tbl1", tempTable2, overrideIfExists = true)
assert(catalog.getTempTable("tbl1") == Some(tempTable2))
}
@@ -243,7 +243,7 @@ class SessionCatalogSuite extends SparkFunSuite {
val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
val tempTable = Range(1, 10, 2, 10, Seq())
- sessionCatalog.createTempTable("tbl1", tempTable, ignoreIfExists = false)
+ sessionCatalog.createTempTable("tbl1", tempTable, overrideIfExists = false)
sessionCatalog.setCurrentDatabase("db2")
assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable))
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
@@ -255,7 +255,7 @@ class SessionCatalogSuite extends SparkFunSuite {
sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false)
assert(externalCatalog.listTables("db2").toSet == Set("tbl2"))
// If database is specified, temp tables are never dropped
- sessionCatalog.createTempTable("tbl1", tempTable, ignoreIfExists = false)
+ sessionCatalog.createTempTable("tbl1", tempTable, overrideIfExists = false)
sessionCatalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false)
sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false)
assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable))
@@ -299,7 +299,7 @@ class SessionCatalogSuite extends SparkFunSuite {
val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
val tempTable = Range(1, 10, 2, 10, Seq())
- sessionCatalog.createTempTable("tbl1", tempTable, ignoreIfExists = false)
+ sessionCatalog.createTempTable("tbl1", tempTable, overrideIfExists = false)
sessionCatalog.setCurrentDatabase("db2")
assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable))
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
@@ -327,7 +327,7 @@ class SessionCatalogSuite extends SparkFunSuite {
assert(newTbl1.properties.get("toh") == Some("frem"))
// Alter table without explicitly specifying database
sessionCatalog.setCurrentDatabase("db2")
- sessionCatalog.alterTable(tbl1.copy(name = TableIdentifier("tbl1")))
+ sessionCatalog.alterTable(tbl1.copy(identifier = TableIdentifier("tbl1")))
val newestTbl1 = externalCatalog.getTable("db2", "tbl1")
assert(newestTbl1 == tbl1)
}
@@ -368,7 +368,7 @@ class SessionCatalogSuite extends SparkFunSuite {
val sessionCatalog = new SessionCatalog(externalCatalog)
val tempTable1 = Range(1, 10, 1, 10, Seq())
val metastoreTable1 = externalCatalog.getTable("db2", "tbl1")
- sessionCatalog.createTempTable("tbl1", tempTable1, ignoreIfExists = false)
+ sessionCatalog.createTempTable("tbl1", tempTable1, overrideIfExists = false)
sessionCatalog.setCurrentDatabase("db2")
// If we explicitly specify the database, we'll look up the relation in that database
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2")))
@@ -406,7 +406,7 @@ class SessionCatalogSuite extends SparkFunSuite {
assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1"))))
// If database is explicitly specified, do not check temporary tables
val tempTable = Range(1, 10, 1, 10, Seq())
- catalog.createTempTable("tbl3", tempTable, ignoreIfExists = false)
+ catalog.createTempTable("tbl3", tempTable, overrideIfExists = false)
assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
// If database is not explicitly specified, check the current database
catalog.setCurrentDatabase("db2")
@@ -418,8 +418,8 @@ class SessionCatalogSuite extends SparkFunSuite {
test("list tables without pattern") {
val catalog = new SessionCatalog(newBasicCatalog())
val tempTable = Range(1, 10, 2, 10, Seq())
- catalog.createTempTable("tbl1", tempTable, ignoreIfExists = false)
- catalog.createTempTable("tbl4", tempTable, ignoreIfExists = false)
+ catalog.createTempTable("tbl1", tempTable, overrideIfExists = false)
+ catalog.createTempTable("tbl4", tempTable, overrideIfExists = false)
assert(catalog.listTables("db1").toSet ==
Set(TableIdentifier("tbl1"), TableIdentifier("tbl4")))
assert(catalog.listTables("db2").toSet ==
@@ -435,8 +435,8 @@ class SessionCatalogSuite extends SparkFunSuite {
test("list tables with pattern") {
val catalog = new SessionCatalog(newBasicCatalog())
val tempTable = Range(1, 10, 2, 10, Seq())
- catalog.createTempTable("tbl1", tempTable, ignoreIfExists = false)
- catalog.createTempTable("tbl4", tempTable, ignoreIfExists = false)
+ catalog.createTempTable("tbl1", tempTable, overrideIfExists = false)
+ catalog.createTempTable("tbl4", tempTable, overrideIfExists = false)
assert(catalog.listTables("db1", "*").toSet == catalog.listTables("db1").toSet)
assert(catalog.listTables("db2", "*").toSet == catalog.listTables("db2").toSet)
assert(catalog.listTables("db2", "tbl*").toSet ==
@@ -826,7 +826,7 @@ class SessionCatalogSuite extends SparkFunSuite {
sessionCatalog.createFunction(newFunc("func1", Some("db2")))
sessionCatalog.renameFunction(FunctionIdentifier("func1"), FunctionIdentifier("func4"))
assert(sessionCatalog.getTempFunction("func4") ==
- Some(tempFunc.copy(name = FunctionIdentifier("func4"))))
+ Some(tempFunc.copy(identifier = FunctionIdentifier("func4"))))
assert(sessionCatalog.getTempFunction("func1") == None)
assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1", "func3"))
// Then, if no such temporary function exist, rename the function in the current database
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index e413e77..c946009 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -671,7 +671,7 @@ class SQLContext private[sql](
sessionState.catalog.createTempTable(
sessionState.sqlParser.parseTableIdentifier(tableName).table,
df.logicalPlan,
- ignoreIfExists = true)
+ overrideIfExists = true)
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 24923bb..877e159 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -107,7 +107,7 @@ case class CreateTempTableUsing(
sqlContext.sessionState.catalog.createTempTable(
tableIdent.table,
Dataset.ofRows(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan,
- ignoreIfExists = true)
+ overrideIfExists = true)
Seq.empty[Row]
}
@@ -138,7 +138,7 @@ case class CreateTempTableUsingAsSelect(
sqlContext.sessionState.catalog.createTempTable(
tableIdent.table,
Dataset.ofRows(sqlContext, LogicalRelation(result)).logicalPlan,
- ignoreIfExists = true)
+ overrideIfExists = true)
Seq.empty[Row]
}
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
deleted file mode 100644
index 0722fb0..0000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.hive.ql.metadata.HiveException
-import org.apache.thrift.TException
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.NoSuchItemException
-import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.hive.client.HiveClient
-
-
-/**
- * A persistent implementation of the system catalog using Hive.
- * All public methods must be synchronized for thread-safety.
- */
-private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog with Logging {
- import ExternalCatalog._
-
- // Exceptions thrown by the hive client that we would like to wrap
- private val clientExceptions = Set(
- classOf[HiveException].getCanonicalName,
- classOf[TException].getCanonicalName)
-
- /**
- * Whether this is an exception thrown by the hive client that should be wrapped.
- *
- * Due to classloader isolation issues, pattern matching won't work here so we need
- * to compare the canonical names of the exceptions, which we assume to be stable.
- */
- private def isClientException(e: Throwable): Boolean = {
- var temp: Class[_] = e.getClass
- var found = false
- while (temp != null && !found) {
- found = clientExceptions.contains(temp.getCanonicalName)
- temp = temp.getSuperclass
- }
- found
- }
-
- /**
- * Run some code involving `client` in a [[synchronized]] block and wrap certain
- * exceptions thrown in the process in [[AnalysisException]].
- */
- private def withClient[T](body: => T): T = synchronized {
- try {
- body
- } catch {
- case e: NoSuchItemException =>
- throw new AnalysisException(e.getMessage)
- case NonFatal(e) if isClientException(e) =>
- throw new AnalysisException(e.getClass.getCanonicalName + ": " + e.getMessage)
- }
- }
-
- private def requireDbMatches(db: String, table: CatalogTable): Unit = {
- if (table.name.database != Some(db)) {
- throw new AnalysisException(
- s"Provided database $db does not much the one specified in the " +
- s"table definition (${table.name.database.getOrElse("n/a")})")
- }
- }
-
- private def requireTableExists(db: String, table: String): Unit = {
- withClient { getTable(db, table) }
- }
-
- // --------------------------------------------------------------------------
- // Databases
- // --------------------------------------------------------------------------
-
- override def createDatabase(
- dbDefinition: CatalogDatabase,
- ignoreIfExists: Boolean): Unit = withClient {
- client.createDatabase(dbDefinition, ignoreIfExists)
- }
-
- override def dropDatabase(
- db: String,
- ignoreIfNotExists: Boolean,
- cascade: Boolean): Unit = withClient {
- client.dropDatabase(db, ignoreIfNotExists, cascade)
- }
-
- /**
- * Alter a database whose name matches the one specified in `dbDefinition`,
- * assuming the database exists.
- *
- * Note: As of now, this only supports altering database properties!
- */
- override def alterDatabase(dbDefinition: CatalogDatabase): Unit = withClient {
- val existingDb = getDatabase(dbDefinition.name)
- if (existingDb.properties == dbDefinition.properties) {
- logWarning(s"Request to alter database ${dbDefinition.name} is a no-op because " +
- s"the provided database properties are the same as the old ones. Hive does not " +
- s"currently support altering other database fields.")
- }
- client.alterDatabase(dbDefinition)
- }
-
- override def getDatabase(db: String): CatalogDatabase = withClient {
- client.getDatabase(db)
- }
-
- override def databaseExists(db: String): Boolean = withClient {
- client.getDatabaseOption(db).isDefined
- }
-
- override def listDatabases(): Seq[String] = withClient {
- client.listDatabases("*")
- }
-
- override def listDatabases(pattern: String): Seq[String] = withClient {
- client.listDatabases(pattern)
- }
-
- override def setCurrentDatabase(db: String): Unit = withClient {
- client.setCurrentDatabase(db)
- }
-
- // --------------------------------------------------------------------------
- // Tables
- // --------------------------------------------------------------------------
-
- override def createTable(
- db: String,
- tableDefinition: CatalogTable,
- ignoreIfExists: Boolean): Unit = withClient {
- requireDbExists(db)
- requireDbMatches(db, tableDefinition)
- client.createTable(tableDefinition, ignoreIfExists)
- }
-
- override def dropTable(
- db: String,
- table: String,
- ignoreIfNotExists: Boolean): Unit = withClient {
- requireDbExists(db)
- client.dropTable(db, table, ignoreIfNotExists)
- }
-
- override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
- val newTable = client.getTable(db, oldName).copy(name = TableIdentifier(newName, Some(db)))
- client.alterTable(oldName, newTable)
- }
-
- /**
- * Alter a table whose name that matches the one specified in `tableDefinition`,
- * assuming the table exists.
- *
- * Note: As of now, this only supports altering table properties, serde properties,
- * and num buckets!
- */
- override def alterTable(db: String, tableDefinition: CatalogTable): Unit = withClient {
- requireDbMatches(db, tableDefinition)
- requireTableExists(db, tableDefinition.name.table)
- client.alterTable(tableDefinition)
- }
-
- override def getTable(db: String, table: String): CatalogTable = withClient {
- client.getTable(db, table)
- }
-
- override def tableExists(db: String, table: String): Boolean = withClient {
- client.getTableOption(db, table).isDefined
- }
-
- override def listTables(db: String): Seq[String] = withClient {
- requireDbExists(db)
- client.listTables(db)
- }
-
- override def listTables(db: String, pattern: String): Seq[String] = withClient {
- requireDbExists(db)
- client.listTables(db, pattern)
- }
-
- // --------------------------------------------------------------------------
- // Partitions
- // --------------------------------------------------------------------------
-
- override def createPartitions(
- db: String,
- table: String,
- parts: Seq[CatalogTablePartition],
- ignoreIfExists: Boolean): Unit = withClient {
- requireTableExists(db, table)
- client.createPartitions(db, table, parts, ignoreIfExists)
- }
-
- override def dropPartitions(
- db: String,
- table: String,
- parts: Seq[TablePartitionSpec],
- ignoreIfNotExists: Boolean): Unit = withClient {
- requireTableExists(db, table)
- // Note: Unfortunately Hive does not currently support `ignoreIfNotExists` so we
- // need to implement it here ourselves. This is currently somewhat expensive because
- // we make multiple synchronous calls to Hive for each partition we want to drop.
- val partsToDrop =
- if (ignoreIfNotExists) {
- parts.filter { spec =>
- try {
- getPartition(db, table, spec)
- true
- } catch {
- // Filter out the partitions that do not actually exist
- case _: AnalysisException => false
- }
- }
- } else {
- parts
- }
- if (partsToDrop.nonEmpty) {
- client.dropPartitions(db, table, partsToDrop)
- }
- }
-
- override def renamePartitions(
- db: String,
- table: String,
- specs: Seq[TablePartitionSpec],
- newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
- client.renamePartitions(db, table, specs, newSpecs)
- }
-
- override def alterPartitions(
- db: String,
- table: String,
- newParts: Seq[CatalogTablePartition]): Unit = withClient {
- client.alterPartitions(db, table, newParts)
- }
-
- override def getPartition(
- db: String,
- table: String,
- spec: TablePartitionSpec): CatalogTablePartition = withClient {
- client.getPartition(db, table, spec)
- }
-
- override def listPartitions(
- db: String,
- table: String): Seq[CatalogTablePartition] = withClient {
- client.getAllPartitions(db, table)
- }
-
- // --------------------------------------------------------------------------
- // Functions
- // --------------------------------------------------------------------------
-
- override def createFunction(
- db: String,
- funcDefinition: CatalogFunction): Unit = withClient {
- client.createFunction(db, funcDefinition)
- }
-
- override def dropFunction(db: String, name: String): Unit = withClient {
- client.dropFunction(db, name)
- }
-
- override def renameFunction(db: String, oldName: String, newName: String): Unit = withClient {
- client.renameFunction(db, oldName, newName)
- }
-
- override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = withClient {
- client.alterFunction(db, funcDefinition)
- }
-
- override def getFunction(db: String, funcName: String): CatalogFunction = withClient {
- client.getFunction(db, funcName)
- }
-
- override def listFunctions(db: String, pattern: String): Seq[String] = withClient {
- client.listFunctions(db, pattern)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index ca3ce43..c0b6d16 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -86,7 +86,7 @@ class HiveContext private[hive](
@transient private[hive] val executionHive: HiveClientImpl,
@transient private[hive] val metadataHive: HiveClient,
isRootContext: Boolean,
- @transient private[sql] val hiveCatalog: HiveCatalog)
+ @transient private[sql] val hiveCatalog: HiveExternalCatalog)
extends SQLContext(sc, cacheManager, listener, isRootContext, hiveCatalog) with Logging {
self =>
@@ -98,7 +98,7 @@ class HiveContext private[hive](
execHive,
metaHive,
true,
- new HiveCatalog(metaHive))
+ new HiveExternalCatalog(metaHive))
}
def this(sc: SparkContext) = {
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
new file mode 100644
index 0000000..f75509f
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.hive.ql.metadata.HiveException
+import org.apache.thrift.TException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchItemException
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.hive.client.HiveClient
+
+
+/**
+ * A persistent implementation of the system catalog using Hive.
+ * All public methods are synchronized (see `withClient`) for thread-safety.
+ */
+private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCatalog with Logging {
+  import ExternalCatalog._
+
+  // Canonical names of Hive client exceptions that we wrap in AnalysisException
+  private val clientExceptions = Set(
+    classOf[HiveException].getCanonicalName,
+    classOf[TException].getCanonicalName)
+
+  /**
+   * Whether this is an exception thrown by the hive client that should be wrapped.
+   *
+   * Due to classloader isolation issues, pattern matching won't work here so we need
+   * to compare the canonical names of the exceptions, which we assume to be stable.
+   */
+  private def isClientException(e: Throwable): Boolean = {
+    var temp: Class[_] = e.getClass
+    var found = false
+    while (temp != null && !found) {
+      found = clientExceptions.contains(temp.getCanonicalName)
+      temp = temp.getSuperclass
+    }
+    found
+  }
+
+  /**
+   * Run some code involving `client` in a [[synchronized]] block and wrap certain
+   * exceptions thrown in the process in [[AnalysisException]].
+   */
+  private def withClient[T](body: => T): T = synchronized {
+    try {
+      body
+    } catch {
+      case e: NoSuchItemException =>
+        throw new AnalysisException(e.getMessage)
+      case NonFatal(e) if isClientException(e) =>
+        throw new AnalysisException(e.getClass.getCanonicalName + ": " + e.getMessage)
+    }
+  }
+
+  private def requireDbMatches(db: String, table: CatalogTable): Unit = {
+    if (table.identifier.database != Some(db)) {
+      throw new AnalysisException(
+        s"Provided database $db does not match the one specified in the " +
+        s"table definition (${table.identifier.database.getOrElse("n/a")})")
+    }
+  }
+
+  private def requireTableExists(db: String, table: String): Unit = {
+    withClient { getTable(db, table) }
+  }
+
+  // --------------------------------------------------------------------------
+  // Databases
+  // --------------------------------------------------------------------------
+
+  override def createDatabase(
+      dbDefinition: CatalogDatabase,
+      ignoreIfExists: Boolean): Unit = withClient {
+    client.createDatabase(dbDefinition, ignoreIfExists)
+  }
+
+  override def dropDatabase(
+      db: String,
+      ignoreIfNotExists: Boolean,
+      cascade: Boolean): Unit = withClient {
+    client.dropDatabase(db, ignoreIfNotExists, cascade)
+  }
+
+  /**
+   * Alter a database whose name matches the one specified in `dbDefinition`,
+   * assuming the database exists.
+   *
+   * Note: As of now, this only supports altering database properties!
+   */
+  override def alterDatabase(dbDefinition: CatalogDatabase): Unit = withClient {
+    val existingDb = getDatabase(dbDefinition.name)
+    if (existingDb.properties == dbDefinition.properties) {
+      logWarning(s"Request to alter database ${dbDefinition.name} is a no-op because " +
+        s"the provided database properties are the same as the old ones. Hive does not " +
+        s"currently support altering other database fields.")
+    }
+    client.alterDatabase(dbDefinition)
+  }
+
+  override def getDatabase(db: String): CatalogDatabase = withClient {
+    client.getDatabase(db)
+  }
+
+  override def databaseExists(db: String): Boolean = withClient {
+    client.getDatabaseOption(db).isDefined
+  }
+
+  override def listDatabases(): Seq[String] = withClient {
+    client.listDatabases("*")
+  }
+
+  override def listDatabases(pattern: String): Seq[String] = withClient {
+    client.listDatabases(pattern)
+  }
+
+  override def setCurrentDatabase(db: String): Unit = withClient {
+    client.setCurrentDatabase(db)
+  }
+
+  // --------------------------------------------------------------------------
+  // Tables
+  // --------------------------------------------------------------------------
+
+  override def createTable(
+      db: String,
+      tableDefinition: CatalogTable,
+      ignoreIfExists: Boolean): Unit = withClient {
+    requireDbExists(db)
+    requireDbMatches(db, tableDefinition)
+    client.createTable(tableDefinition, ignoreIfExists)
+  }
+
+  override def dropTable(
+      db: String,
+      table: String,
+      ignoreIfNotExists: Boolean): Unit = withClient {
+    requireDbExists(db)
+    client.dropTable(db, table, ignoreIfNotExists)
+  }
+
+  override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
+    val newTable = client.getTable(db, oldName)
+      .copy(identifier = TableIdentifier(newName, Some(db)))
+    client.alterTable(oldName, newTable)
+  }
+
+  /**
+   * Alter a table whose name matches the one specified in `tableDefinition`,
+   * assuming the table exists.
+   *
+   * Note: As of now, this only supports altering table properties, serde properties,
+   * and num buckets!
+   */
+  override def alterTable(db: String, tableDefinition: CatalogTable): Unit = withClient {
+    requireDbMatches(db, tableDefinition)
+    requireTableExists(db, tableDefinition.identifier.table)
+    client.alterTable(tableDefinition)
+  }
+
+  override def getTable(db: String, table: String): CatalogTable = withClient {
+    client.getTable(db, table)
+  }
+
+  override def tableExists(db: String, table: String): Boolean = withClient {
+    client.getTableOption(db, table).isDefined
+  }
+
+  override def listTables(db: String): Seq[String] = withClient {
+    requireDbExists(db)
+    client.listTables(db)
+  }
+
+  override def listTables(db: String, pattern: String): Seq[String] = withClient {
+    requireDbExists(db)
+    client.listTables(db, pattern)
+  }
+
+  // --------------------------------------------------------------------------
+  // Partitions
+  // --------------------------------------------------------------------------
+
+  override def createPartitions(
+      db: String,
+      table: String,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = withClient {
+    requireTableExists(db, table)
+    client.createPartitions(db, table, parts, ignoreIfExists)
+  }
+
+  override def dropPartitions(
+      db: String,
+      table: String,
+      parts: Seq[TablePartitionSpec],
+      ignoreIfNotExists: Boolean): Unit = withClient {
+    requireTableExists(db, table)
+    // Note: Unfortunately Hive does not currently support `ignoreIfNotExists` so we
+    // need to implement it here ourselves. This is currently somewhat expensive because
+    // we make multiple synchronous calls to Hive for each partition we want to drop.
+    val partsToDrop =
+      if (ignoreIfNotExists) {
+        parts.filter { spec =>
+          try {
+            getPartition(db, table, spec)
+            true
+          } catch {
+            // Filter out the partitions that do not actually exist
+            case _: AnalysisException => false
+          }
+        }
+      } else {
+        parts
+      }
+    if (partsToDrop.nonEmpty) {
+      client.dropPartitions(db, table, partsToDrop)
+    }
+  }
+
+  override def renamePartitions(
+      db: String,
+      table: String,
+      specs: Seq[TablePartitionSpec],
+      newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
+    client.renamePartitions(db, table, specs, newSpecs)
+  }
+
+  override def alterPartitions(
+      db: String,
+      table: String,
+      newParts: Seq[CatalogTablePartition]): Unit = withClient {
+    client.alterPartitions(db, table, newParts)
+  }
+
+  override def getPartition(
+      db: String,
+      table: String,
+      spec: TablePartitionSpec): CatalogTablePartition = withClient {
+    client.getPartition(db, table, spec)
+  }
+
+  override def listPartitions(
+      db: String,
+      table: String): Seq[CatalogTablePartition] = withClient {
+    client.getAllPartitions(db, table)
+  }
+
+  // --------------------------------------------------------------------------
+  // Functions
+  // --------------------------------------------------------------------------
+
+  override def createFunction(
+      db: String,
+      funcDefinition: CatalogFunction): Unit = withClient {
+    client.createFunction(db, funcDefinition)
+  }
+
+  override def dropFunction(db: String, name: String): Unit = withClient {
+    client.dropFunction(db, name)
+  }
+
+  override def renameFunction(db: String, oldName: String, newName: String): Unit = withClient {
+    client.renameFunction(db, oldName, newName)
+  }
+
+  override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = withClient {
+    client.alterFunction(db, funcDefinition)
+  }
+
+  override def getFunction(db: String, funcName: String): CatalogFunction = withClient {
+    client.getFunction(db, funcName)
+  }
+
+  override def listFunctions(db: String, pattern: String): Seq[String] = withClient {
+    client.listFunctions(db, pattern)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c7066d7..eedd12d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -102,7 +102,7 @@ private[hive] object HiveSerDe {
* Legacy catalog for interacting with the Hive metastore.
*
* This is still used for things like creating data source tables, but in the future will be
- * cleaned up to integrate more nicely with [[HiveCatalog]].
+ * cleaned up to integrate more nicely with [[HiveExternalCatalog]].
*/
private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveContext)
extends Logging {
@@ -124,8 +124,8 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = {
QualifiedTableName(
- t.name.database.getOrElse(getCurrentDatabase).toLowerCase,
- t.name.table.toLowerCase)
+ t.identifier.database.getOrElse(getCurrentDatabase).toLowerCase,
+ t.identifier.table.toLowerCase)
}
/** A cache of Spark SQL data source tables that have been accessed. */
@@ -299,7 +299,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
CatalogTable(
- name = TableIdentifier(tblName, Option(dbName)),
+ identifier = TableIdentifier(tblName, Option(dbName)),
tableType = tableType,
schema = Nil,
storage = CatalogStorageFormat(
@@ -319,7 +319,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
assert(relation.partitionSchema.isEmpty)
CatalogTable(
- name = TableIdentifier(tblName, Option(dbName)),
+ identifier = TableIdentifier(tblName, Option(dbName)),
tableType = tableType,
storage = CatalogStorageFormat(
locationUri = Some(relation.location.paths.map(_.toUri.toString).head),
@@ -431,7 +431,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
alias match {
// because hive use things like `_c0` to build the expanded text
// currently we cannot support view from "create view v1(c1) as ..."
- case None => SubqueryAlias(table.name.table, hive.parseSql(viewText))
+ case None => SubqueryAlias(table.identifier.table, hive.parseSql(viewText))
case Some(aliasText) => SubqueryAlias(aliasText, hive.parseSql(viewText))
}
} else {
@@ -611,7 +611,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
execution.CreateViewAsSelect(
- table.copy(name = TableIdentifier(tblName, Some(dbName))),
+ table.copy(identifier = TableIdentifier(tblName, Some(dbName))),
child,
allowExisting,
replace)
@@ -633,7 +633,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
if (hive.convertCTAS && table.storage.serde.isEmpty) {
// Do the conversion when spark.sql.hive.convertCTAS is true and the query
// does not specify any storage format (file format and storage handler).
- if (table.name.database.isDefined) {
+ if (table.identifier.database.isDefined) {
throw new AnalysisException(
"Cannot specify database name in a CTAS statement " +
"when spark.sql.hive.convertCTAS is set to true.")
@@ -641,7 +641,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableUsingAsSelect(
- TableIdentifier(desc.name.table),
+ TableIdentifier(desc.identifier.table),
conf.defaultDataSourceName,
temporary = false,
Array.empty[String],
@@ -662,7 +662,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
execution.CreateTableAsSelect(
- desc.copy(name = TableIdentifier(tblName, Some(dbName))),
+ desc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
child,
allowExisting)
}
@@ -792,7 +792,7 @@ private[hive] case class MetastoreRelation(
// We start by constructing an API table as Hive performs several important transformations
// internally when converting an API table to a QL table.
val tTable = new org.apache.hadoop.hive.metastore.api.Table()
- tTable.setTableName(table.name.table)
+ tTable.setTableName(table.identifier.table)
tTable.setDbName(table.database)
val tableParameters = new java.util.HashMap[String, String]()
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index e5bcb9b..b3ec95f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -60,7 +60,7 @@ private[hive] case class CreateTableAsSelect(
override def output: Seq[Attribute] = Seq.empty[Attribute]
override lazy val resolved: Boolean =
- tableDesc.name.database.isDefined &&
+ tableDesc.identifier.database.isDefined &&
tableDesc.schema.nonEmpty &&
tableDesc.storage.serde.isDefined &&
tableDesc.storage.inputFormat.isDefined &&
@@ -183,7 +183,7 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
val tableIdentifier = extractTableIdent(viewNameParts)
val originalText = query.source
val tableDesc = CatalogTable(
- name = tableIdentifier,
+ identifier = tableIdentifier,
tableType = CatalogTableType.VIRTUAL_VIEW,
schema = schema,
storage = CatalogStorageFormat(
@@ -352,7 +352,7 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
// TODO add bucket support
var tableDesc: CatalogTable = CatalogTable(
- name = tableIdentifier,
+ identifier = tableIdentifier,
tableType =
if (externalTable.isDefined) {
CatalogTableType.EXTERNAL_TABLE
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index aa44cba..ec7bf61 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType
class HiveSessionCatalog(
- externalCatalog: HiveCatalog,
+ externalCatalog: HiveExternalCatalog,
client: HiveClient,
context: HiveContext,
conf: SQLConf)
@@ -41,11 +41,11 @@ class HiveSessionCatalog(
override def lookupRelation(name: TableIdentifier, alias: Option[String]): LogicalPlan = {
val table = formatTableName(name.table)
- if (name.database.isDefined || !tempTables.containsKey(table)) {
+ if (name.database.isDefined || !tempTables.contains(table)) {
val newName = name.copy(table = table)
metastoreCatalog.lookupRelation(newName, alias)
} else {
- val relation = tempTables.get(table)
+ val relation = tempTables(table)
val tableWithQualifiers = SubqueryAlias(table, relation)
// If an alias was specified by the lookup, wrap the plan in a subquery so that
// attributes are properly qualified with this alias.
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index f4d3035..ee56f9d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -88,7 +88,7 @@ private[hive] trait HiveClient {
def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
/** Alter a table whose name matches the one specified in `table`, assuming it exists. */
- final def alterTable(table: CatalogTable): Unit = alterTable(table.name.table, table)
+ final def alterTable(table: CatalogTable): Unit = alterTable(table.identifier.table, table)
/** Updates the given table with new metadata, optionally renaming the table. */
def alterTable(tableName: String, table: CatalogTable): Unit
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index e4e15d1..a31178e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -298,7 +298,7 @@ private[hive] class HiveClientImpl(
logDebug(s"Looking up $dbName.$tableName")
Option(client.getTable(dbName, tableName, false)).map { h =>
CatalogTable(
- name = TableIdentifier(h.getTableName, Option(h.getDbName)),
+ identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
tableType = h.getTableType match {
case HiveTableType.EXTERNAL_TABLE => CatalogTableType.EXTERNAL_TABLE
case HiveTableType.MANAGED_TABLE => CatalogTableType.MANAGED_TABLE
@@ -544,13 +544,14 @@ private[hive] class HiveClientImpl(
}
override def renameFunction(db: String, oldName: String, newName: String): Unit = withHiveState {
- val catalogFunc = getFunction(db, oldName).copy(name = FunctionIdentifier(newName, Some(db)))
+ val catalogFunc = getFunction(db, oldName)
+ .copy(identifier = FunctionIdentifier(newName, Some(db)))
val hiveFunc = toHiveFunction(catalogFunc, db)
client.alterFunction(db, oldName, hiveFunc)
}
override def alterFunction(db: String, func: CatalogFunction): Unit = withHiveState {
- client.alterFunction(db, func.name.funcName, toHiveFunction(func, db))
+ client.alterFunction(db, func.identifier.funcName, toHiveFunction(func, db))
}
override def getFunctionOption(
@@ -611,7 +612,7 @@ private[hive] class HiveClientImpl(
private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = {
new HiveFunction(
- f.name.funcName,
+ f.identifier.funcName,
db,
f.className,
null,
@@ -639,7 +640,7 @@ private[hive] class HiveClientImpl(
}
private def toHiveTable(table: CatalogTable): HiveTable = {
- val hiveTable = new HiveTable(table.database, table.name.table)
+ val hiveTable = new HiveTable(table.database, table.identifier.table)
hiveTable.setTableType(table.tableType match {
case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE
case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 5a61eef..29f7dc2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -38,7 +38,7 @@ case class CreateTableAsSelect(
allowExisting: Boolean)
extends RunnableCommand {
- private val tableIdentifier = tableDesc.name
+ private val tableIdentifier = tableDesc.identifier
override def children: Seq[LogicalPlan] = Seq(query)
@@ -93,6 +93,8 @@ case class CreateTableAsSelect(
}
override def argString: String = {
- s"[Database:${tableDesc.database}}, TableName: ${tableDesc.name.table}, InsertIntoHiveTable]"
+ s"[Database:${tableDesc.database}}, " +
+ s"TableName: ${tableDesc.identifier.table}, " +
+ s"InsertIntoHiveTable]"
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
index 9ff520d..33cd8b4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
@@ -44,7 +44,7 @@ private[hive] case class CreateViewAsSelect(
assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length)
assert(tableDesc.viewText.isDefined)
- private val tableIdentifier = tableDesc.name
+ private val tableIdentifier = tableDesc.identifier
override def run(sqlContext: SQLContext): Seq[Row] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
@@ -116,7 +116,7 @@ private[hive] case class CreateViewAsSelect(
}
val viewText = tableDesc.viewText.get
- val viewName = quote(tableDesc.name.table)
+ val viewName = quote(tableDesc.identifier.table)
s"SELECT $viewOutput FROM ($viewText) $viewName"
}
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index a1785ca..4afc8d1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -78,7 +78,7 @@ class TestHiveContext private[hive](
executionHive: HiveClientImpl,
metadataHive: HiveClient,
isRootContext: Boolean,
- hiveCatalog: HiveCatalog,
+ hiveCatalog: HiveExternalCatalog,
val warehousePath: File,
val scratchDirPath: File,
metastoreTemporaryConf: Map[String, String])
@@ -110,7 +110,7 @@ class TestHiveContext private[hive](
executionHive,
metadataHive,
true,
- new HiveCatalog(metadataHive),
+ new HiveExternalCatalog(metadataHive),
warehousePath,
scratchDirPath,
metastoreTemporaryConf)
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
deleted file mode 100644
index 427f574..0000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.util.VersionInfo
-
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.hive.client.{HiveClient, IsolatedClientLoader}
-import org.apache.spark.util.Utils
-
-/**
- * Test suite for the [[HiveCatalog]].
- */
-class HiveCatalogSuite extends CatalogTestCases {
-
- private val client: HiveClient = {
- IsolatedClientLoader.forVersion(
- hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
- hadoopVersion = VersionInfo.getVersion,
- sparkConf = new SparkConf(),
- hadoopConf = new Configuration()).createClient()
- }
-
- protected override val utils: CatalogTestUtils = new CatalogTestUtils {
- override val tableInputFormat: String = "org.apache.hadoop.mapred.SequenceFileInputFormat"
- override val tableOutputFormat: String = "org.apache.hadoop.mapred.SequenceFileOutputFormat"
- override def newEmptyCatalog(): ExternalCatalog = new HiveCatalog(client)
- }
-
- protected override def resetState(): Unit = client.reset()
-
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
new file mode 100644
index 0000000..3334c16
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.util.VersionInfo
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.hive.client.{HiveClient, IsolatedClientLoader}
+import org.apache.spark.util.Utils
+
+/**
+ * Test suite for the [[HiveExternalCatalog]].
+ */
+class HiveExternalCatalogSuite extends CatalogTestCases {
+
+ private val client: HiveClient = {
+ IsolatedClientLoader.forVersion(
+ hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
+ hadoopVersion = VersionInfo.getVersion,
+ sparkConf = new SparkConf(),
+ hadoopConf = new Configuration()).createClient()
+ }
+
+ protected override val utils: CatalogTestUtils = new CatalogTestUtils {
+ override val tableInputFormat: String = "org.apache.hadoop.mapred.SequenceFileInputFormat"
+ override val tableOutputFormat: String = "org.apache.hadoop.mapred.SequenceFileOutputFormat"
+ override def newEmptyCatalog(): ExternalCatalog = new HiveExternalCatalog(client)
+ }
+
+ protected override def resetState(): Unit = client.reset()
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
index 1c775db..0aaf576 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
@@ -54,8 +54,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
val (desc, exists) = extractTableDesc(s1)
assert(exists)
- assert(desc.name.database == Some("mydb"))
- assert(desc.name.table == "page_view")
+ assert(desc.identifier.database == Some("mydb"))
+ assert(desc.identifier.table == "page_view")
assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
assert(desc.storage.locationUri == Some("/user/external/page_view"))
assert(desc.schema ==
@@ -100,8 +100,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
val (desc, exists) = extractTableDesc(s2)
assert(exists)
- assert(desc.name.database == Some("mydb"))
- assert(desc.name.table == "page_view")
+ assert(desc.identifier.database == Some("mydb"))
+ assert(desc.identifier.table == "page_view")
assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
assert(desc.storage.locationUri == Some("/user/external/page_view"))
assert(desc.schema ==
@@ -127,8 +127,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
val s3 = """CREATE TABLE page_view AS SELECT * FROM src"""
val (desc, exists) = extractTableDesc(s3)
assert(exists == false)
- assert(desc.name.database == None)
- assert(desc.name.table == "page_view")
+ assert(desc.identifier.database == None)
+ assert(desc.identifier.table == "page_view")
assert(desc.tableType == CatalogTableType.MANAGED_TABLE)
assert(desc.storage.locationUri == None)
assert(desc.schema == Seq.empty[CatalogColumn])
@@ -162,8 +162,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
| ORDER BY key, value""".stripMargin
val (desc, exists) = extractTableDesc(s5)
assert(exists == false)
- assert(desc.name.database == None)
- assert(desc.name.table == "ctas2")
+ assert(desc.identifier.database == None)
+ assert(desc.identifier.table == "ctas2")
assert(desc.tableType == CatalogTableType.MANAGED_TABLE)
assert(desc.storage.locationUri == None)
assert(desc.schema == Seq.empty[CatalogColumn])
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
index 5272f41..e8188e5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
@@ -34,7 +34,7 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft
super.beforeAll()
// The catalog in HiveContext is a case insensitive one.
sessionState.catalog.createTempTable(
- "ListTablesSuiteTable", df.logicalPlan, ignoreIfExists = true)
+ "ListTablesSuiteTable", df.logicalPlan, overrideIfExists = true)
sql("CREATE TABLE HiveListTablesSuiteTable (key int, value string)")
sql("CREATE DATABASE IF NOT EXISTS ListTablesSuiteDB")
sql("CREATE TABLE ListTablesSuiteDB.HiveInDBListTablesSuiteTable (key int, value string)")
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 7165289..3c299da 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -722,7 +722,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
withTable(tableName) {
val schema = StructType(StructField("int", IntegerType, true) :: Nil)
val hiveTable = CatalogTable(
- name = TableIdentifier(tableName, Some("default")),
+ identifier = TableIdentifier(tableName, Some("default")),
tableType = CatalogTableType.MANAGED_TABLE,
schema = Seq.empty,
storage = CatalogStorageFormat(
http://git-wip-us.apache.org/repos/asf/spark/blob/eebc8c1c/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index d59bca4..8b07192 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -148,7 +148,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
test(s"$version: createTable") {
val table =
CatalogTable(
- name = TableIdentifier("src", Some("default")),
+ identifier = TableIdentifier("src", Some("default")),
tableType = CatalogTableType.MANAGED_TABLE,
schema = Seq(CatalogColumn("key", "int")),
storage = CatalogStorageFormat(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org