You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/22 00:00:27 UTC
[2/2] spark git commit: [SPARK-13080][SQL] Implement new Catalog API using Hive
[SPARK-13080][SQL] Implement new Catalog API using Hive
## What changes were proposed in this pull request?
This is a step towards merging `SQLContext` and `HiveContext`. A new internal Catalog API was introduced in #10982 and extended in #11069. This patch introduces an implementation of this API using `HiveClient`, an existing interface to Hive. It also extends `HiveClient` with additional calls to Hive that are needed to complete the catalog implementation.
*Where should I start reviewing?* The new catalog introduced is `HiveCatalog`. This class is relatively simple because it just calls `HiveClientImpl`, where most of the new logic is. I would not start with `HiveClient`, `HiveQl`, or `HiveMetastoreCatalog`, which are modified mainly because of a refactor.
*Why is this patch so big?* I had to refactor HiveClient to remove an intermediate representation of databases, tables, partitions, etc. After this refactor, `CatalogTable` converts directly to and from `HiveTable` (and likewise for databases, partitions, etc.). Otherwise we would have to first convert `CatalogTable` to the intermediate representation and then convert that to `HiveTable`, which is messy.
The new class hierarchy is as follows:
```
org.apache.spark.sql.catalyst.catalog.Catalog
- org.apache.spark.sql.catalyst.catalog.InMemoryCatalog
- org.apache.spark.sql.hive.HiveCatalog
```
Note that, as of this patch, none of these classes are used anywhere yet. They will be wired in before the Spark 2.0 release.
## How was this patch tested?
All existing unit tests, plus the new `HiveCatalogSuite`, which extends `CatalogTestCases`.
Author: Andrew Or <an...@databricks.com>
Author: Reynold Xin <rx...@databricks.com>
Closes #11293 from rxin/hive-catalog.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c3832b2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c3832b2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c3832b2
Branch: refs/heads/master
Commit: 6c3832b26e119626205732b8fd03c8f5ba986896
Parents: 7eb83fe
Author: Andrew Or <an...@databricks.com>
Authored: Sun Feb 21 15:00:24 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sun Feb 21 15:00:24 2016 -0800
----------------------------------------------------------------------
.../apache/spark/sql/AnalysisException.scala | 3 +
.../spark/sql/catalyst/analysis/Catalog.scala | 9 -
.../catalyst/analysis/NoSuchItemException.scala | 52 +++
.../sql/catalyst/catalog/InMemoryCatalog.scala | 154 ++++----
.../spark/sql/catalyst/catalog/interface.scala | 190 ++++++----
.../sql/catalyst/catalog/CatalogTestCases.scala | 284 +++++++++-----
.../org/apache/spark/sql/hive/HiveCatalog.scala | 293 ++++++++++++++
.../spark/sql/hive/HiveMetastoreCatalog.scala | 171 +++++----
.../org/apache/spark/sql/hive/HiveQl.scala | 145 +++----
.../spark/sql/hive/client/HiveClient.scala | 210 ++++++----
.../spark/sql/hive/client/HiveClientImpl.scala | 379 +++++++++++++------
.../hive/execution/CreateTableAsSelect.scala | 20 +-
.../sql/hive/execution/CreateViewAsSelect.scala | 20 +-
.../hive/execution/InsertIntoHiveTable.scala | 2 +-
.../spark/sql/hive/HiveCatalogSuite.scala | 49 +++
.../sql/hive/HiveMetastoreCatalogSuite.scala | 40 +-
.../org/apache/spark/sql/hive/HiveQlSuite.scala | 94 ++---
.../sql/hive/MetastoreDataSourcesSuite.scala | 25 +-
.../spark/sql/hive/MultiDatabaseSuite.scala | 4 +-
.../spark/sql/hive/client/VersionsSuite.scala | 37 +-
.../spark/sql/hive/execution/PruningSuite.scala | 2 +-
21 files changed, 1483 insertions(+), 700 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
index f999218..97f28fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
@@ -19,6 +19,9 @@ package org.apache.spark.sql
import org.apache.spark.annotation.DeveloperApi
+
+// TODO: don't swallow original stack trace if it exists
+
/**
* :: DeveloperApi ::
* Thrown when a query fails to analyze, usually because the query itself is invalid.
http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 67edab5..52b284b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -20,20 +20,11 @@ package org.apache.spark.sql.catalyst.analysis
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf, TableIdentifier}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-/**
- * Thrown by a catalog when a table cannot be found. The analyzer will rethrow the exception
- * as an AnalysisException with the correct position information.
- */
-class NoSuchTableException extends Exception
-
-class NoSuchDatabaseException extends Exception
/**
* An interface for looking up relations by name. Used by an [[Analyzer]].
http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
new file mode 100644
index 0000000..81399db
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.catalog.Catalog.TablePartitionSpec
+
+
+/**
+ * Thrown by a catalog when an item cannot be found. The analyzer will rethrow the exception
+ * as an [[org.apache.spark.sql.AnalysisException]] with the correct position information.
+ */
+abstract class NoSuchItemException extends Exception {
+ override def getMessage: String
+}
+
+class NoSuchDatabaseException(db: String) extends NoSuchItemException {
+ override def getMessage: String = s"Database $db not found"
+}
+
+class NoSuchTableException(db: String, table: String) extends NoSuchItemException {
+ override def getMessage: String = s"Table $table not found in database $db"
+}
+
+class NoSuchPartitionException(
+ db: String,
+ table: String,
+ spec: TablePartitionSpec)
+ extends NoSuchItemException {
+
+ override def getMessage: String = {
+ s"Partition not found in table $table database $db:\n" + spec.mkString("\n")
+ }
+}
+
+class NoSuchFunctionException(db: String, func: String) extends NoSuchItemException {
+ override def getMessage: String = s"Function $func not found in database $db"
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 38be61c..cba4de3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -30,15 +30,16 @@ import org.apache.spark.sql.AnalysisException
class InMemoryCatalog extends Catalog {
import Catalog._
- private class TableDesc(var table: Table) {
- val partitions = new mutable.HashMap[PartitionSpec, TablePartition]
+ private class TableDesc(var table: CatalogTable) {
+ val partitions = new mutable.HashMap[TablePartitionSpec, CatalogTablePartition]
}
- private class DatabaseDesc(var db: Database) {
+ private class DatabaseDesc(var db: CatalogDatabase) {
val tables = new mutable.HashMap[String, TableDesc]
- val functions = new mutable.HashMap[String, Function]
+ val functions = new mutable.HashMap[String, CatalogFunction]
}
+ // Database name -> description
private val catalog = new scala.collection.mutable.HashMap[String, DatabaseDesc]
private def filterPattern(names: Seq[String], pattern: String): Seq[String] = {
@@ -47,39 +48,33 @@ class InMemoryCatalog extends Catalog {
}
private def existsFunction(db: String, funcName: String): Boolean = {
- assertDbExists(db)
+ requireDbExists(db)
catalog(db).functions.contains(funcName)
}
private def existsTable(db: String, table: String): Boolean = {
- assertDbExists(db)
+ requireDbExists(db)
catalog(db).tables.contains(table)
}
- private def existsPartition(db: String, table: String, spec: PartitionSpec): Boolean = {
- assertTableExists(db, table)
+ private def existsPartition(db: String, table: String, spec: TablePartitionSpec): Boolean = {
+ requireTableExists(db, table)
catalog(db).tables(table).partitions.contains(spec)
}
- private def assertDbExists(db: String): Unit = {
- if (!catalog.contains(db)) {
- throw new AnalysisException(s"Database $db does not exist")
- }
- }
-
- private def assertFunctionExists(db: String, funcName: String): Unit = {
+ private def requireFunctionExists(db: String, funcName: String): Unit = {
if (!existsFunction(db, funcName)) {
throw new AnalysisException(s"Function $funcName does not exist in $db database")
}
}
- private def assertTableExists(db: String, table: String): Unit = {
+ private def requireTableExists(db: String, table: String): Unit = {
if (!existsTable(db, table)) {
throw new AnalysisException(s"Table $table does not exist in $db database")
}
}
- private def assertPartitionExists(db: String, table: String, spec: PartitionSpec): Unit = {
+ private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = {
if (!existsPartition(db, table, spec)) {
throw new AnalysisException(s"Partition does not exist in database $db table $table: $spec")
}
@@ -90,7 +85,7 @@ class InMemoryCatalog extends Catalog {
// --------------------------------------------------------------------------
override def createDatabase(
- dbDefinition: Database,
+ dbDefinition: CatalogDatabase,
ignoreIfExists: Boolean): Unit = synchronized {
if (catalog.contains(dbDefinition.name)) {
if (!ignoreIfExists) {
@@ -124,17 +119,20 @@ class InMemoryCatalog extends Catalog {
}
}
- override def alterDatabase(db: String, dbDefinition: Database): Unit = synchronized {
- assertDbExists(db)
- assert(db == dbDefinition.name)
- catalog(db).db = dbDefinition
+ override def alterDatabase(dbDefinition: CatalogDatabase): Unit = synchronized {
+ requireDbExists(dbDefinition.name)
+ catalog(dbDefinition.name).db = dbDefinition
}
- override def getDatabase(db: String): Database = synchronized {
- assertDbExists(db)
+ override def getDatabase(db: String): CatalogDatabase = synchronized {
+ requireDbExists(db)
catalog(db).db
}
+ override def databaseExists(db: String): Boolean = synchronized {
+ catalog.contains(db)
+ }
+
override def listDatabases(): Seq[String] = synchronized {
catalog.keySet.toSeq
}
@@ -143,15 +141,17 @@ class InMemoryCatalog extends Catalog {
filterPattern(listDatabases(), pattern)
}
+ override def setCurrentDatabase(db: String): Unit = { /* no-op */ }
+
// --------------------------------------------------------------------------
// Tables
// --------------------------------------------------------------------------
override def createTable(
db: String,
- tableDefinition: Table,
+ tableDefinition: CatalogTable,
ignoreIfExists: Boolean): Unit = synchronized {
- assertDbExists(db)
+ requireDbExists(db)
if (existsTable(db, tableDefinition.name)) {
if (!ignoreIfExists) {
throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database")
@@ -165,7 +165,7 @@ class InMemoryCatalog extends Catalog {
db: String,
table: String,
ignoreIfNotExists: Boolean): Unit = synchronized {
- assertDbExists(db)
+ requireDbExists(db)
if (existsTable(db, table)) {
catalog(db).tables.remove(table)
} else {
@@ -176,31 +176,30 @@ class InMemoryCatalog extends Catalog {
}
override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
- assertTableExists(db, oldName)
+ requireTableExists(db, oldName)
val oldDesc = catalog(db).tables(oldName)
oldDesc.table = oldDesc.table.copy(name = newName)
catalog(db).tables.put(newName, oldDesc)
catalog(db).tables.remove(oldName)
}
- override def alterTable(db: String, table: String, tableDefinition: Table): Unit = synchronized {
- assertTableExists(db, table)
- assert(table == tableDefinition.name)
- catalog(db).tables(table).table = tableDefinition
+ override def alterTable(db: String, tableDefinition: CatalogTable): Unit = synchronized {
+ requireTableExists(db, tableDefinition.name)
+ catalog(db).tables(tableDefinition.name).table = tableDefinition
}
- override def getTable(db: String, table: String): Table = synchronized {
- assertTableExists(db, table)
+ override def getTable(db: String, table: String): CatalogTable = synchronized {
+ requireTableExists(db, table)
catalog(db).tables(table).table
}
override def listTables(db: String): Seq[String] = synchronized {
- assertDbExists(db)
+ requireDbExists(db)
catalog(db).tables.keySet.toSeq
}
override def listTables(db: String, pattern: String): Seq[String] = synchronized {
- assertDbExists(db)
+ requireDbExists(db)
filterPattern(listTables(db), pattern)
}
@@ -211,9 +210,9 @@ class InMemoryCatalog extends Catalog {
override def createPartitions(
db: String,
table: String,
- parts: Seq[TablePartition],
+ parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = synchronized {
- assertTableExists(db, table)
+ requireTableExists(db, table)
val existingParts = catalog(db).tables(table).partitions
if (!ignoreIfExists) {
val dupSpecs = parts.collect { case p if existingParts.contains(p.spec) => p.spec }
@@ -229,9 +228,9 @@ class InMemoryCatalog extends Catalog {
override def dropPartitions(
db: String,
table: String,
- partSpecs: Seq[PartitionSpec],
+ partSpecs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean): Unit = synchronized {
- assertTableExists(db, table)
+ requireTableExists(db, table)
val existingParts = catalog(db).tables(table).partitions
if (!ignoreIfNotExists) {
val missingSpecs = partSpecs.collect { case s if !existingParts.contains(s) => s }
@@ -244,30 +243,42 @@ class InMemoryCatalog extends Catalog {
partSpecs.foreach(existingParts.remove)
}
- override def alterPartition(
+ override def renamePartitions(
db: String,
table: String,
- spec: Map[String, String],
- newPart: TablePartition): Unit = synchronized {
- assertPartitionExists(db, table, spec)
- val existingParts = catalog(db).tables(table).partitions
- if (spec != newPart.spec) {
- // Also a change in specs; remove the old one and add the new one back
- existingParts.remove(spec)
+ specs: Seq[TablePartitionSpec],
+ newSpecs: Seq[TablePartitionSpec]): Unit = synchronized {
+ require(specs.size == newSpecs.size, "number of old and new partition specs differ")
+ specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
+ val newPart = getPartition(db, table, oldSpec).copy(spec = newSpec)
+ val existingParts = catalog(db).tables(table).partitions
+ existingParts.remove(oldSpec)
+ existingParts.put(newSpec, newPart)
+ }
+ }
+
+ override def alterPartitions(
+ db: String,
+ table: String,
+ parts: Seq[CatalogTablePartition]): Unit = synchronized {
+ parts.foreach { p =>
+ requirePartitionExists(db, table, p.spec)
+ catalog(db).tables(table).partitions.put(p.spec, p)
}
- existingParts.put(newPart.spec, newPart)
}
override def getPartition(
db: String,
table: String,
- spec: Map[String, String]): TablePartition = synchronized {
- assertPartitionExists(db, table, spec)
+ spec: TablePartitionSpec): CatalogTablePartition = synchronized {
+ requirePartitionExists(db, table, spec)
catalog(db).tables(table).partitions(spec)
}
- override def listPartitions(db: String, table: String): Seq[TablePartition] = synchronized {
- assertTableExists(db, table)
+ override def listPartitions(
+ db: String,
+ table: String): Seq[CatalogTablePartition] = synchronized {
+ requireTableExists(db, table)
catalog(db).tables(table).partitions.values.toSeq
}
@@ -275,44 +286,39 @@ class InMemoryCatalog extends Catalog {
// Functions
// --------------------------------------------------------------------------
- override def createFunction(
- db: String,
- func: Function,
- ignoreIfExists: Boolean): Unit = synchronized {
- assertDbExists(db)
+ override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
+ requireDbExists(db)
if (existsFunction(db, func.name)) {
- if (!ignoreIfExists) {
- throw new AnalysisException(s"Function $func already exists in $db database")
- }
+ throw new AnalysisException(s"Function $func already exists in $db database")
} else {
catalog(db).functions.put(func.name, func)
}
}
override def dropFunction(db: String, funcName: String): Unit = synchronized {
- assertFunctionExists(db, funcName)
+ requireFunctionExists(db, funcName)
catalog(db).functions.remove(funcName)
}
- override def alterFunction(
- db: String,
- funcName: String,
- funcDefinition: Function): Unit = synchronized {
- assertFunctionExists(db, funcName)
- if (funcName != funcDefinition.name) {
- // Also a rename; remove the old one and add the new one back
- catalog(db).functions.remove(funcName)
- }
+ override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized {
+ requireFunctionExists(db, oldName)
+ val newFunc = getFunction(db, oldName).copy(name = newName)
+ catalog(db).functions.remove(oldName)
+ catalog(db).functions.put(newName, newFunc)
+ }
+
+ override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = synchronized {
+ requireFunctionExists(db, funcDefinition.name)
catalog(db).functions.put(funcDefinition.name, funcDefinition)
}
- override def getFunction(db: String, funcName: String): Function = synchronized {
- assertFunctionExists(db, funcName)
+ override def getFunction(db: String, funcName: String): CatalogFunction = synchronized {
+ requireFunctionExists(db, funcName)
catalog(db).functions(funcName)
}
override def listFunctions(db: String, pattern: String): Seq[String] = synchronized {
- assertDbExists(db)
+ requireDbExists(db)
filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 56aaa6b..dac5f02 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.catalog
+import javax.annotation.Nullable
+
import org.apache.spark.sql.AnalysisException
@@ -31,41 +33,59 @@ import org.apache.spark.sql.AnalysisException
abstract class Catalog {
import Catalog._
+ protected def requireDbExists(db: String): Unit = {
+ if (!databaseExists(db)) {
+ throw new AnalysisException(s"Database $db does not exist")
+ }
+ }
+
// --------------------------------------------------------------------------
// Databases
// --------------------------------------------------------------------------
- def createDatabase(dbDefinition: Database, ignoreIfExists: Boolean): Unit
+ def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
/**
- * Alter an existing database. This operation does not support renaming.
+ * Alter a database whose name matches the one specified in `dbDefinition`,
+ * assuming the database exists.
+ *
+ * Note: If the underlying implementation does not support altering a certain field,
+ * this becomes a no-op.
*/
- def alterDatabase(db: String, dbDefinition: Database): Unit
+ def alterDatabase(dbDefinition: CatalogDatabase): Unit
- def getDatabase(db: String): Database
+ def getDatabase(db: String): CatalogDatabase
+
+ def databaseExists(db: String): Boolean
def listDatabases(): Seq[String]
def listDatabases(pattern: String): Seq[String]
+ def setCurrentDatabase(db: String): Unit
+
// --------------------------------------------------------------------------
// Tables
// --------------------------------------------------------------------------
- def createTable(db: String, tableDefinition: Table, ignoreIfExists: Boolean): Unit
+ def createTable(db: String, tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
def dropTable(db: String, table: String, ignoreIfNotExists: Boolean): Unit
def renameTable(db: String, oldName: String, newName: String): Unit
/**
- * Alter an existing table. This operation does not support renaming.
+ * Alter a table whose name matches the one specified in `tableDefinition`,
+ * assuming the table exists.
+ *
+ * Note: If the underlying implementation does not support altering a certain field,
+ * this becomes a no-op.
*/
- def alterTable(db: String, table: String, tableDefinition: Table): Unit
+ def alterTable(db: String, tableDefinition: CatalogTable): Unit
- def getTable(db: String, table: String): Table
+ def getTable(db: String, table: String): CatalogTable
def listTables(db: String): Seq[String]
@@ -78,43 +98,62 @@ abstract class Catalog {
def createPartitions(
db: String,
table: String,
- parts: Seq[TablePartition],
+ parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit
def dropPartitions(
db: String,
table: String,
- parts: Seq[PartitionSpec],
+ parts: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean): Unit
/**
- * Alter an existing table partition and optionally override its spec.
+ * Override the specs of one or many existing table partitions, assuming they exist.
+ * This assumes index i of `specs` corresponds to index i of `newSpecs`.
+ */
+ def renamePartitions(
+ db: String,
+ table: String,
+ specs: Seq[TablePartitionSpec],
+ newSpecs: Seq[TablePartitionSpec]): Unit
+
+ /**
+ * Alter one or many table partitions whose specs match those specified in `parts`,
+ * assuming the partitions exist.
+ *
+ * Note: If the underlying implementation does not support altering a certain field,
+ * this becomes a no-op.
*/
- def alterPartition(
+ def alterPartitions(
db: String,
table: String,
- spec: PartitionSpec,
- newPart: TablePartition): Unit
+ parts: Seq[CatalogTablePartition]): Unit
- def getPartition(db: String, table: String, spec: PartitionSpec): TablePartition
+ def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition
// TODO: support listing by pattern
- def listPartitions(db: String, table: String): Seq[TablePartition]
+ def listPartitions(db: String, table: String): Seq[CatalogTablePartition]
// --------------------------------------------------------------------------
// Functions
// --------------------------------------------------------------------------
- def createFunction(db: String, funcDefinition: Function, ignoreIfExists: Boolean): Unit
+ def createFunction(db: String, funcDefinition: CatalogFunction): Unit
def dropFunction(db: String, funcName: String): Unit
+ def renameFunction(db: String, oldName: String, newName: String): Unit
+
/**
- * Alter an existing function and optionally override its name.
+ * Alter a function whose name matches the one specified in `funcDefinition`,
+ * assuming the function exists.
+ *
+ * Note: If the underlying implementation does not support altering a certain field,
+ * this becomes a no-op.
*/
- def alterFunction(db: String, funcName: String, funcDefinition: Function): Unit
+ def alterFunction(db: String, funcDefinition: CatalogFunction): Unit
- def getFunction(db: String, funcName: String): Function
+ def getFunction(db: String, funcName: String): CatalogFunction
def listFunctions(db: String, pattern: String): Seq[String]
@@ -127,33 +166,30 @@ abstract class Catalog {
* @param name name of the function
* @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc"
*/
-case class Function(
- name: String,
- className: String
-)
+case class CatalogFunction(name: String, className: String)
/**
* Storage format, used to describe how a partition or a table is stored.
*/
-case class StorageFormat(
- locationUri: String,
- inputFormat: String,
- outputFormat: String,
- serde: String,
- serdeProperties: Map[String, String]
-)
+case class CatalogStorageFormat(
+ locationUri: Option[String],
+ inputFormat: Option[String],
+ outputFormat: Option[String],
+ serde: Option[String],
+ serdeProperties: Map[String, String])
/**
* A column in a table.
*/
-case class Column(
- name: String,
- dataType: String,
- nullable: Boolean,
- comment: String
-)
+case class CatalogColumn(
+ name: String,
+ // This may be null when used to create views. TODO: make this type-safe; this is left
+ // as a string due to issues in converting Hive varchars to and from SparkSQL strings.
+ @Nullable dataType: String,
+ nullable: Boolean = true,
+ comment: Option[String] = None)
/**
@@ -162,10 +198,7 @@ case class Column(
* @param spec partition spec values indexed by column name
* @param storage storage format of the partition
*/
-case class TablePartition(
- spec: Catalog.PartitionSpec,
- storage: StorageFormat
-)
+case class CatalogTablePartition(spec: Catalog.TablePartitionSpec, storage: CatalogStorageFormat)
/**
@@ -174,40 +207,65 @@ case class TablePartition(
* Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
* future once we have a better understanding of how we want to handle skewed columns.
*/
-case class Table(
- name: String,
- description: String,
- schema: Seq[Column],
- partitionColumns: Seq[Column],
- sortColumns: Seq[Column],
- storage: StorageFormat,
- numBuckets: Int,
- properties: Map[String, String],
- tableType: String,
- createTime: Long,
- lastAccessTime: Long,
- viewOriginalText: Option[String],
- viewText: Option[String]) {
-
- require(tableType == "EXTERNAL_TABLE" || tableType == "INDEX_TABLE" ||
- tableType == "MANAGED_TABLE" || tableType == "VIRTUAL_VIEW")
+case class CatalogTable(
+ specifiedDatabase: Option[String],
+ name: String,
+ tableType: CatalogTableType,
+ storage: CatalogStorageFormat,
+ schema: Seq[CatalogColumn],
+ partitionColumns: Seq[CatalogColumn] = Seq.empty,
+ sortColumns: Seq[CatalogColumn] = Seq.empty,
+ numBuckets: Int = 0,
+ createTime: Long = System.currentTimeMillis,
+ lastAccessTime: Long = System.currentTimeMillis,
+ properties: Map[String, String] = Map.empty,
+ viewOriginalText: Option[String] = None,
+ viewText: Option[String] = None) {
+
+ /** Return the database this table was specified to belong to, assuming it exists. */
+ def database: String = specifiedDatabase.getOrElse {
+ throw new AnalysisException(s"table $name did not specify database")
+ }
+
+ /** Return the fully qualified name of this table, assuming the database was specified. */
+ def qualifiedName: String = s"$database.$name"
+
+ /** Syntactic sugar to update a field in `storage`. */
+ def withNewStorage(
+ locationUri: Option[String] = storage.locationUri,
+ inputFormat: Option[String] = storage.inputFormat,
+ outputFormat: Option[String] = storage.outputFormat,
+ serde: Option[String] = storage.serde,
+ serdeProperties: Map[String, String] = storage.serdeProperties): CatalogTable = {
+ copy(storage = CatalogStorageFormat(
+ locationUri, inputFormat, outputFormat, serde, serdeProperties))
+ }
+
+}
+
+
+case class CatalogTableType private(name: String)
+object CatalogTableType {
+ val EXTERNAL_TABLE = new CatalogTableType("EXTERNAL_TABLE")
+ val MANAGED_TABLE = new CatalogTableType("MANAGED_TABLE")
+ val INDEX_TABLE = new CatalogTableType("INDEX_TABLE")
+ val VIRTUAL_VIEW = new CatalogTableType("VIRTUAL_VIEW")
}
/**
* A database defined in the catalog.
*/
-case class Database(
- name: String,
- description: String,
- locationUri: String,
- properties: Map[String, String]
-)
+case class CatalogDatabase(
+ name: String,
+ description: String,
+ locationUri: String,
+ properties: Map[String, String])
object Catalog {
/**
- * Specifications of a table partition indexed by column name.
+ * Specification of a table partition, mapping column name to column value.
*/
- type PartitionSpec = Map[String, String]
+ type TablePartitionSpec = Map[String, String]
}
http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index 45c5cee..e0d1220 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.catalog
+import org.scalatest.BeforeAndAfterEach
+
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
@@ -26,18 +28,38 @@ import org.apache.spark.sql.AnalysisException
*
* Implementations of the [[Catalog]] interface can create test suites by extending this.
*/
-abstract class CatalogTestCases extends SparkFunSuite {
- private val storageFormat = StorageFormat("usa", "$", "zzz", "serde", Map())
- private val part1 = TablePartition(Map("a" -> "1"), storageFormat)
- private val part2 = TablePartition(Map("b" -> "2"), storageFormat)
- private val part3 = TablePartition(Map("c" -> "3"), storageFormat)
+abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
+ private lazy val storageFormat = CatalogStorageFormat(
+ locationUri = None,
+ inputFormat = Some(tableInputFormat),
+ outputFormat = Some(tableOutputFormat),
+ serde = None,
+ serdeProperties = Map.empty)
+ private lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat)
+ private lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat)
+ private lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat)
private val funcClass = "org.apache.spark.myFunc"
+  // Things subclasses should override.
+  // Placeholder input/output format class names used by the default storage format; they
+  // only need to be syntactically valid class names unless a subclass's catalog validates them.
+  protected val tableInputFormat: String = "org.apache.spark.serde.MyInputFormat"
+  protected val tableOutputFormat: String = "org.apache.spark.serde.MyOutputFormat"
+ protected def newUriForDatabase(): String = "uri"
+ protected def resetState(): Unit = { }
protected def newEmptyCatalog(): Catalog
+ // Clear all state after each test
+ override def afterEach(): Unit = {
+ try {
+ resetState()
+ } finally {
+ super.afterEach()
+ }
+ }
+
/**
* Creates a basic catalog, with the following structure:
*
+ * default
* db1
* db2
* - tbl1
@@ -48,37 +70,65 @@ abstract class CatalogTestCases extends SparkFunSuite {
*/
private def newBasicCatalog(): Catalog = {
val catalog = newEmptyCatalog()
+ // When testing against a real catalog, the default database may already exist
+ catalog.createDatabase(newDb("default"), ignoreIfExists = true)
catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
catalog.createDatabase(newDb("db2"), ignoreIfExists = false)
- catalog.createTable("db2", newTable("tbl1"), ignoreIfExists = false)
- catalog.createTable("db2", newTable("tbl2"), ignoreIfExists = false)
+ catalog.createTable("db2", newTable("tbl1", "db2"), ignoreIfExists = false)
+ catalog.createTable("db2", newTable("tbl2", "db2"), ignoreIfExists = false)
catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false)
- catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = false)
+ catalog.createFunction("db2", newFunc("func1"))
catalog
}
- private def newFunc(): Function = Function("funcname", funcClass)
+ private def newFunc(): CatalogFunction = CatalogFunction("funcname", funcClass)
+
+ private def newDb(name: String): CatalogDatabase = {
+ CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)
+ }
+
+ private def newTable(name: String, db: String): CatalogTable = {
+ CatalogTable(
+ specifiedDatabase = Some(db),
+ name = name,
+ tableType = CatalogTableType.EXTERNAL_TABLE,
+ storage = storageFormat,
+ schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", "string")),
+ partitionColumns = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string")))
+ }
- private def newDb(name: String = "default"): Database =
- Database(name, name + " description", "uri", Map.empty)
+ private def newFunc(name: String): CatalogFunction = CatalogFunction(name, funcClass)
- private def newTable(name: String): Table =
- Table(name, "", Seq.empty, Seq.empty, Seq.empty, null, 0, Map.empty, "EXTERNAL_TABLE", 0, 0,
- None, None)
+ /**
+ * Whether the catalog's table partitions equal the ones given.
+ * Note: Hive sets some random serde things, so we just compare the specs here.
+ */
+ private def catalogPartitionsEqual(
+ catalog: Catalog,
+ db: String,
+ table: String,
+ parts: Seq[CatalogTablePartition]): Boolean = {
+ catalog.listPartitions(db, table).map(_.spec).toSet == parts.map(_.spec).toSet
+ }
- private def newFunc(name: String): Function = Function(name, funcClass)
// --------------------------------------------------------------------------
// Databases
// --------------------------------------------------------------------------
- test("basic create, drop and list databases") {
+ test("basic create and list databases") {
val catalog = newEmptyCatalog()
- catalog.createDatabase(newDb(), ignoreIfExists = false)
- assert(catalog.listDatabases().toSet == Set("default"))
-
- catalog.createDatabase(newDb("default2"), ignoreIfExists = false)
- assert(catalog.listDatabases().toSet == Set("default", "default2"))
+ catalog.createDatabase(newDb("default"), ignoreIfExists = true)
+ assert(catalog.databaseExists("default"))
+ assert(!catalog.databaseExists("testing"))
+ assert(!catalog.databaseExists("testing2"))
+ catalog.createDatabase(newDb("testing"), ignoreIfExists = false)
+ assert(catalog.databaseExists("testing"))
+ assert(catalog.listDatabases().toSet == Set("default", "testing"))
+ catalog.createDatabase(newDb("testing2"), ignoreIfExists = false)
+ assert(catalog.listDatabases().toSet == Set("default", "testing", "testing2"))
+ assert(catalog.databaseExists("testing2"))
+ assert(!catalog.databaseExists("does_not_exist"))
}
test("get database when a database exists") {
@@ -93,7 +143,7 @@ abstract class CatalogTestCases extends SparkFunSuite {
test("list databases without pattern") {
val catalog = newBasicCatalog()
- assert(catalog.listDatabases().toSet == Set("db1", "db2"))
+ assert(catalog.listDatabases().toSet == Set("default", "db1", "db2"))
}
test("list databases with pattern") {
@@ -107,7 +157,7 @@ abstract class CatalogTestCases extends SparkFunSuite {
test("drop database") {
val catalog = newBasicCatalog()
catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false)
- assert(catalog.listDatabases().toSet == Set("db2"))
+ assert(catalog.listDatabases().toSet == Set("default", "db2"))
}
test("drop database when the database is not empty") {
@@ -118,6 +168,7 @@ abstract class CatalogTestCases extends SparkFunSuite {
intercept[AnalysisException] {
catalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
}
+ resetState()
// Throw exception if there are tables left
val catalog2 = newBasicCatalog()
@@ -125,11 +176,12 @@ abstract class CatalogTestCases extends SparkFunSuite {
intercept[AnalysisException] {
catalog2.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
}
+ resetState()
// When cascade is true, it should drop them
val catalog3 = newBasicCatalog()
catalog3.dropDatabase("db2", ignoreIfNotExists = false, cascade = true)
- assert(catalog3.listDatabases().toSet == Set("db1"))
+ assert(catalog3.listDatabases().toSet == Set("default", "db1"))
}
test("drop database when the database does not exist") {
@@ -144,13 +196,19 @@ abstract class CatalogTestCases extends SparkFunSuite {
test("alter database") {
val catalog = newBasicCatalog()
- catalog.alterDatabase("db1", Database("db1", "new description", "lll", Map.empty))
- assert(catalog.getDatabase("db1").description == "new description")
+ val db1 = catalog.getDatabase("db1")
+ // Note: alter properties here because Hive does not support altering other fields
+ catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true")))
+ val newDb1 = catalog.getDatabase("db1")
+ assert(db1.properties.isEmpty)
+ assert(newDb1.properties.size == 2)
+ assert(newDb1.properties.get("k") == Some("v3"))
+ assert(newDb1.properties.get("good") == Some("true"))
}
test("alter database should throw exception when the database does not exist") {
intercept[AnalysisException] {
- newBasicCatalog().alterDatabase("no_db", Database("no_db", "ddd", "lll", Map.empty))
+ newBasicCatalog().alterDatabase(newDb("does_not_exist"))
}
}
@@ -165,61 +223,56 @@ abstract class CatalogTestCases extends SparkFunSuite {
assert(catalog.listTables("db2").toSet == Set("tbl2"))
}
- test("drop table when database / table does not exist") {
+ test("drop table when database/table does not exist") {
val catalog = newBasicCatalog()
-
// Should always throw exception when the database does not exist
intercept[AnalysisException] {
catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = false)
}
-
intercept[AnalysisException] {
catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = true)
}
-
// Should throw exception when the table does not exist, if ignoreIfNotExists is false
intercept[AnalysisException] {
catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = false)
}
-
catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true)
}
test("rename table") {
val catalog = newBasicCatalog()
-
assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
catalog.renameTable("db2", "tbl1", "tblone")
assert(catalog.listTables("db2").toSet == Set("tblone", "tbl2"))
}
- test("rename table when database / table does not exist") {
+ test("rename table when database/table does not exist") {
val catalog = newBasicCatalog()
-
- intercept[AnalysisException] { // Throw exception when the database does not exist
+ intercept[AnalysisException] {
catalog.renameTable("unknown_db", "unknown_table", "unknown_table")
}
-
- intercept[AnalysisException] { // Throw exception when the table does not exist
+ intercept[AnalysisException] {
catalog.renameTable("db2", "unknown_table", "unknown_table")
}
}
test("alter table") {
val catalog = newBasicCatalog()
- catalog.alterTable("db2", "tbl1", newTable("tbl1").copy(createTime = 10))
- assert(catalog.getTable("db2", "tbl1").createTime == 10)
+ val tbl1 = catalog.getTable("db2", "tbl1")
+ catalog.alterTable("db2", tbl1.copy(properties = Map("toh" -> "frem")))
+ val newTbl1 = catalog.getTable("db2", "tbl1")
+ assert(!tbl1.properties.contains("toh"))
+ assert(newTbl1.properties.size == tbl1.properties.size + 1)
+ assert(newTbl1.properties.get("toh") == Some("frem"))
}
- test("alter table when database / table does not exist") {
+ test("alter table when database/table does not exist") {
val catalog = newBasicCatalog()
-
- intercept[AnalysisException] { // Throw exception when the database does not exist
- catalog.alterTable("unknown_db", "unknown_table", newTable("unknown_table"))
+ intercept[AnalysisException] {
+ catalog.alterTable("unknown_db", newTable("tbl1", "unknown_db"))
}
-
- intercept[AnalysisException] { // Throw exception when the table does not exist
- catalog.alterTable("db2", "unknown_table", newTable("unknown_table"))
+ intercept[AnalysisException] {
+ catalog.alterTable("db2", newTable("unknown_table", "db2"))
}
}
@@ -227,12 +280,11 @@ abstract class CatalogTestCases extends SparkFunSuite {
assert(newBasicCatalog().getTable("db2", "tbl1").name == "tbl1")
}
- test("get table when database / table does not exist") {
+ test("get table when database/table does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.getTable("unknown_db", "unknown_table")
}
-
intercept[AnalysisException] {
catalog.getTable("db2", "unknown_table")
}
@@ -246,10 +298,7 @@ abstract class CatalogTestCases extends SparkFunSuite {
test("list tables with pattern") {
val catalog = newBasicCatalog()
-
- // Test when database does not exist
intercept[AnalysisException] { catalog.listTables("unknown_db") }
-
assert(catalog.listTables("db1", "*").toSet == Set.empty)
assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2"))
assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2"))
@@ -263,12 +312,12 @@ abstract class CatalogTestCases extends SparkFunSuite {
test("basic create and list partitions") {
val catalog = newEmptyCatalog()
catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
- catalog.createTable("mydb", newTable("mytbl"), ignoreIfExists = false)
- catalog.createPartitions("mydb", "mytbl", Seq(part1, part2), ignoreIfExists = false)
- assert(catalog.listPartitions("mydb", "mytbl").toSet == Set(part1, part2))
+ catalog.createTable("mydb", newTable("tbl", "mydb"), ignoreIfExists = false)
+ catalog.createPartitions("mydb", "tbl", Seq(part1, part2), ignoreIfExists = false)
+ assert(catalogPartitionsEqual(catalog, "mydb", "tbl", Seq(part1, part2)))
}
- test("create partitions when database / table does not exist") {
+ test("create partitions when database/table does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.createPartitions("does_not_exist", "tbl1", Seq(), ignoreIfExists = false)
@@ -288,16 +337,17 @@ abstract class CatalogTestCases extends SparkFunSuite {
test("drop partitions") {
val catalog = newBasicCatalog()
- assert(catalog.listPartitions("db2", "tbl2").toSet == Set(part1, part2))
+ assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
catalog.dropPartitions("db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false)
- assert(catalog.listPartitions("db2", "tbl2").toSet == Set(part2))
+ assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2)))
+ resetState()
val catalog2 = newBasicCatalog()
- assert(catalog2.listPartitions("db2", "tbl2").toSet == Set(part1, part2))
+ assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2)))
catalog2.dropPartitions("db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
}
- test("drop partitions when database / table does not exist") {
+ test("drop partitions when database/table does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.dropPartitions("does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false)
@@ -317,14 +367,14 @@ abstract class CatalogTestCases extends SparkFunSuite {
test("get partition") {
val catalog = newBasicCatalog()
- assert(catalog.getPartition("db2", "tbl2", part1.spec) == part1)
- assert(catalog.getPartition("db2", "tbl2", part2.spec) == part2)
+ assert(catalog.getPartition("db2", "tbl2", part1.spec).spec == part1.spec)
+ assert(catalog.getPartition("db2", "tbl2", part2.spec).spec == part2.spec)
intercept[AnalysisException] {
catalog.getPartition("db2", "tbl1", part3.spec)
}
}
- test("get partition when database / table does not exist") {
+ test("get partition when database/table does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.getPartition("does_not_exist", "tbl1", part1.spec)
@@ -334,28 +384,69 @@ abstract class CatalogTestCases extends SparkFunSuite {
}
}
- test("alter partitions") {
+ test("rename partitions") {
+ val catalog = newBasicCatalog()
+ val newPart1 = part1.copy(spec = Map("a" -> "100", "b" -> "101"))
+ val newPart2 = part2.copy(spec = Map("a" -> "200", "b" -> "201"))
+ val newSpecs = Seq(newPart1.spec, newPart2.spec)
+ catalog.renamePartitions("db2", "tbl2", Seq(part1.spec, part2.spec), newSpecs)
+ assert(catalog.getPartition("db2", "tbl2", newPart1.spec).spec === newPart1.spec)
+ assert(catalog.getPartition("db2", "tbl2", newPart2.spec).spec === newPart2.spec)
+ // The old partitions should no longer exist
+ intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part1.spec) }
+ intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part2.spec) }
+ }
+
+ test("rename partitions when database/table does not exist") {
val catalog = newBasicCatalog()
- val partSameSpec = part1.copy(storage = storageFormat.copy(serde = "myserde"))
- val partNewSpec = part1.copy(spec = Map("x" -> "10"))
- // alter but keep spec the same
- catalog.alterPartition("db2", "tbl2", part1.spec, partSameSpec)
- assert(catalog.getPartition("db2", "tbl2", part1.spec) == partSameSpec)
- // alter and change spec
- catalog.alterPartition("db2", "tbl2", part1.spec, partNewSpec)
intercept[AnalysisException] {
- catalog.getPartition("db2", "tbl2", part1.spec)
+ catalog.renamePartitions("does_not_exist", "tbl1", Seq(part1.spec), Seq(part2.spec))
+ }
+ intercept[AnalysisException] {
+ catalog.renamePartitions("db2", "does_not_exist", Seq(part1.spec), Seq(part2.spec))
}
- assert(catalog.getPartition("db2", "tbl2", partNewSpec.spec) == partNewSpec)
}
- test("alter partition when database / table does not exist") {
+  test("alter partitions") {
+    val catalog = newBasicCatalog()
+    try {
+      // Hive only allows altering table partitions when the current database is the one that
+      // contains the table; otherwise it fails with the unhelpful message
+      // "Unable to alter partition. alter is not possible." (see HIVE-2742).
+      catalog.setCurrentDatabase("db2")
+      val targetLocation = Some(newUriForDatabase())
+
+      // Case 1: keep each partition's spec but point its storage at a new location
+      val before = Seq(part1, part2).map(p => catalog.getPartition("db2", "tbl2", p.spec))
+      catalog.alterPartitions("db2", "tbl2",
+        before.map(_.copy(storage = storageFormat.copy(locationUri = targetLocation))))
+      val after = Seq(part1, part2).map(p => catalog.getPartition("db2", "tbl2", p.spec))
+      after.foreach(p => assert(p.storage.locationUri == targetLocation))
+      before.foreach(p => assert(p.storage.locationUri != targetLocation))
+
+      // Case 2: changing the spec must fail, because the new specs do not exist yet
+      val unknownSpecs = Seq(
+        part1.copy(spec = Map("a" -> "v1", "b" -> "v2")),
+        part2.copy(spec = Map("a" -> "v3", "b" -> "v4")))
+      intercept[AnalysisException] {
+        catalog.alterPartitions("db2", "tbl2", unknownSpecs)
+      }
+    } finally {
+      // Put the current database back to "default", which we assume was the original one
+      catalog.setCurrentDatabase("default")
+    }
+  }
+
+ test("alter partitions when database/table does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
- catalog.alterPartition("does_not_exist", "tbl1", part1.spec, part1)
+ catalog.alterPartitions("does_not_exist", "tbl1", Seq(part1))
}
intercept[AnalysisException] {
- catalog.alterPartition("db2", "does_not_exist", part1.spec, part1)
+ catalog.alterPartitions("db2", "does_not_exist", Seq(part1))
}
}
@@ -366,23 +457,22 @@ abstract class CatalogTestCases extends SparkFunSuite {
test("basic create and list functions") {
val catalog = newEmptyCatalog()
catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
- catalog.createFunction("mydb", newFunc("myfunc"), ignoreIfExists = false)
+ catalog.createFunction("mydb", newFunc("myfunc"))
assert(catalog.listFunctions("mydb", "*").toSet == Set("myfunc"))
}
test("create function when database does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
- catalog.createFunction("does_not_exist", newFunc(), ignoreIfExists = false)
+ catalog.createFunction("does_not_exist", newFunc())
}
}
test("create function that already exists") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
- catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = false)
+ catalog.createFunction("db2", newFunc("func1"))
}
- catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = true)
}
test("drop function") {
@@ -421,31 +511,43 @@ abstract class CatalogTestCases extends SparkFunSuite {
}
}
- test("alter function") {
+ test("rename function") {
val catalog = newBasicCatalog()
+ val newName = "funcky"
assert(catalog.getFunction("db2", "func1").className == funcClass)
- // alter func but keep name
- catalog.alterFunction("db2", "func1", newFunc("func1").copy(className = "muhaha"))
- assert(catalog.getFunction("db2", "func1").className == "muhaha")
- // alter func and change name
- catalog.alterFunction("db2", "func1", newFunc("funcky"))
+ catalog.renameFunction("db2", "func1", newName)
+ intercept[AnalysisException] { catalog.getFunction("db2", "func1") }
+ assert(catalog.getFunction("db2", newName).name == newName)
+ assert(catalog.getFunction("db2", newName).className == funcClass)
+ intercept[AnalysisException] { catalog.renameFunction("db2", "does_not_exist", "me") }
+ }
+
+ test("rename function when database does not exist") {
+ val catalog = newBasicCatalog()
intercept[AnalysisException] {
- catalog.getFunction("db2", "func1")
+ catalog.renameFunction("does_not_exist", "func1", "func5")
}
- assert(catalog.getFunction("db2", "funcky").className == funcClass)
+ }
+
+ test("alter function") {
+ val catalog = newBasicCatalog()
+ assert(catalog.getFunction("db2", "func1").className == funcClass)
+ catalog.alterFunction("db2", newFunc("func1").copy(className = "muhaha"))
+ assert(catalog.getFunction("db2", "func1").className == "muhaha")
+ intercept[AnalysisException] { catalog.alterFunction("db2", newFunc("funcky")) }
}
test("alter function when database does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
- catalog.alterFunction("does_not_exist", "func1", newFunc())
+ catalog.alterFunction("does_not_exist", newFunc())
}
}
test("list functions") {
val catalog = newBasicCatalog()
- catalog.createFunction("db2", newFunc("func2"), ignoreIfExists = false)
- catalog.createFunction("db2", newFunc("not_me"), ignoreIfExists = false)
+ catalog.createFunction("db2", newFunc("func2"))
+ catalog.createFunction("db2", newFunc("not_me"))
assert(catalog.listFunctions("db2", "*").toSet == Set("func1", "func2", "not_me"))
assert(catalog.listFunctions("db2", "func*").toSet == Set("func1", "func2"))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
new file mode 100644
index 0000000..21b9cfb
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.hive.ql.metadata.HiveException
+import org.apache.thrift.TException
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.NoSuchItemException
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.hive.client.HiveClient
+
+
+/**
+ * A persistent implementation of the system catalog using Hive.
+ * All public methods must be synchronized for thread-safety.
+ */
+private[spark] class HiveCatalog(client: HiveClient) extends Catalog with Logging {
+  import Catalog._
+
+  // Exceptions thrown by the hive client that we would like to wrap
+  private val clientExceptions = Set(
+    classOf[HiveException].getCanonicalName,
+    classOf[TException].getCanonicalName)
+
+  /**
+   * Whether this is an exception thrown by the hive client that should be wrapped.
+   *
+   * Due to classloader isolation issues, pattern matching won't work here so we need
+   * to compare the canonical names of the exceptions, which we assume to be stable.
+   * The exception's own class and every one of its superclasses are checked.
+   */
+  private def isClientException(e: Throwable): Boolean = {
+    Iterator.iterate[Class[_]](e.getClass)(_.getSuperclass)
+      .takeWhile(_ != null)
+      .exists(c => clientExceptions.contains(c.getCanonicalName))
+  }
+
+  /**
+   * Run some code involving `client` in a `synchronized` block and wrap certain
+   * exceptions thrown in the process in [[AnalysisException]].
+   *
+   * [[NoSuchItemException]]s and recognized Hive client exceptions (see
+   * `isClientException`) are rethrown as [[AnalysisException]]; any other
+   * exception propagates unchanged.
+   */
+  private def withClient[T](body: => T): T = synchronized {
+    try {
+      body
+    } catch {
+      case e: NoSuchItemException =>
+        throw new AnalysisException(e.getMessage)
+      case NonFatal(e) if isClientException(e) =>
+        throw new AnalysisException(e.getClass.getCanonicalName + ": " + e.getMessage)
+    }
+  }
+
+  /** Throw an [[AnalysisException]] unless `table` specifies `db` as its database. */
+  private def requireDbMatches(db: String, table: CatalogTable): Unit = {
+    if (table.specifiedDatabase != Some(db)) {
+      throw new AnalysisException(
+        s"Provided database $db does not match the one specified in the " +
+        s"table definition (${table.specifiedDatabase.getOrElse("n/a")})")
+    }
+  }
+
+  /** Check that `db`.`table` exists; relies on `getTable` throwing when it cannot be found. */
+  private def requireTableExists(db: String, table: String): Unit = {
+    withClient { getTable(db, table) }
+  }
+
+
+  // --------------------------------------------------------------------------
+  // Databases
+  // --------------------------------------------------------------------------
+
+  override def createDatabase(
+      dbDefinition: CatalogDatabase,
+      ignoreIfExists: Boolean): Unit = withClient {
+    client.createDatabase(dbDefinition, ignoreIfExists)
+  }
+
+  override def dropDatabase(
+      db: String,
+      ignoreIfNotExists: Boolean,
+      cascade: Boolean): Unit = withClient {
+    client.dropDatabase(db, ignoreIfNotExists, cascade)
+  }
+
+  /**
+   * Alter a database whose name matches the one specified in `dbDefinition`,
+   * assuming the database exists.
+   *
+   * Note: As of now, this only supports altering database properties! A warning is
+   * logged when the new properties equal the existing ones, but the request is still
+   * forwarded to the client.
+   */
+  override def alterDatabase(dbDefinition: CatalogDatabase): Unit = withClient {
+    val existingDb = getDatabase(dbDefinition.name)
+    if (existingDb.properties == dbDefinition.properties) {
+      logWarning(s"Request to alter database ${dbDefinition.name} is a no-op because " +
+        s"the provided database properties are the same as the old ones. Hive does not " +
+        s"currently support altering other database fields.")
+    }
+    client.alterDatabase(dbDefinition)
+  }
+
+  override def getDatabase(db: String): CatalogDatabase = withClient {
+    client.getDatabase(db)
+  }
+
+  /** True if the client can find a database named `db`. */
+  override def databaseExists(db: String): Boolean = withClient {
+    client.getDatabaseOption(db).isDefined
+  }
+
+  /** List all databases, i.e. those matching the pattern "*". */
+  override def listDatabases(): Seq[String] = withClient {
+    client.listDatabases("*")
+  }
+
+  override def listDatabases(pattern: String): Seq[String] = withClient {
+    client.listDatabases(pattern)
+  }
+
+  override def setCurrentDatabase(db: String): Unit = withClient {
+    client.setCurrentDatabase(db)
+  }
+
+  // --------------------------------------------------------------------------
+  // Tables
+  // --------------------------------------------------------------------------
+
+  /** Create a table in `db`; `tableDefinition` must itself name `db` as its database. */
+  override def createTable(
+      db: String,
+      tableDefinition: CatalogTable,
+      ignoreIfExists: Boolean): Unit = withClient {
+    requireDbExists(db)
+    requireDbMatches(db, tableDefinition)
+    client.createTable(tableDefinition, ignoreIfExists)
+  }
+
+  override def dropTable(
+      db: String,
+      table: String,
+      ignoreIfNotExists: Boolean): Unit = withClient {
+    requireDbExists(db)
+    client.dropTable(db, table, ignoreIfNotExists)
+  }
+
+  /**
+   * Rename `db`.`oldName` to `newName` by fetching the existing table definition and
+   * altering it under the old name. The database is checked first, as in `dropTable`.
+   */
+  override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
+    requireDbExists(db)
+    val newTable = client.getTable(db, oldName).copy(name = newName)
+    client.alterTable(oldName, newTable)
+  }
+
+  /**
+   * Alter a table whose name matches the one specified in `tableDefinition`,
+   * assuming the table exists.
+   *
+   * Note: As of now, this only supports altering table properties, serde properties,
+   * and num buckets!
+   */
+  override def alterTable(db: String, tableDefinition: CatalogTable): Unit = withClient {
+    requireDbMatches(db, tableDefinition)
+    requireTableExists(db, tableDefinition.name)
+    client.alterTable(tableDefinition)
+  }
+
+  override def getTable(db: String, table: String): CatalogTable = withClient {
+    client.getTable(db, table)
+  }
+
+  override def listTables(db: String): Seq[String] = withClient {
+    requireDbExists(db)
+    client.listTables(db)
+  }
+
+  override def listTables(db: String, pattern: String): Seq[String] = withClient {
+    requireDbExists(db)
+    client.listTables(db, pattern)
+  }
+
+  // --------------------------------------------------------------------------
+  // Partitions
+  // --------------------------------------------------------------------------
+
+  override def createPartitions(
+      db: String,
+      table: String,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = withClient {
+    requireTableExists(db, table)
+    client.createPartitions(db, table, parts, ignoreIfExists)
+  }
+
+  override def dropPartitions(
+      db: String,
+      table: String,
+      parts: Seq[TablePartitionSpec],
+      ignoreIfNotExists: Boolean): Unit = withClient {
+    requireTableExists(db, table)
+    // Note: Unfortunately Hive does not currently support `ignoreIfNotExists` so we
+    // need to implement it here ourselves. This is currently somewhat expensive because
+    // we make multiple synchronous calls to Hive for each partition we want to drop.
+    val partsToDrop =
+      if (ignoreIfNotExists) {
+        parts.filter { spec =>
+          try {
+            getPartition(db, table, spec)
+            true
+          } catch {
+            // Filter out the partitions that do not actually exist
+            case _: AnalysisException => false
+          }
+        }
+      } else {
+        parts
+      }
+    if (partsToDrop.nonEmpty) {
+      client.dropPartitions(db, table, partsToDrop)
+    }
+  }
+
+  /** Rename each partition in `specs` to the spec at the same position in `newSpecs`. */
+  override def renamePartitions(
+      db: String,
+      table: String,
+      specs: Seq[TablePartitionSpec],
+      newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
+    requireTableExists(db, table)
+    client.renamePartitions(db, table, specs, newSpecs)
+  }
+
+  /**
+   * Replace the definitions of existing partitions with `newParts`.
+   *
+   * Note: Hive requires the current database to be `db` when altering partitions
+   * (see HIVE-2742); callers are responsible for setting it.
+   */
+  override def alterPartitions(
+      db: String,
+      table: String,
+      newParts: Seq[CatalogTablePartition]): Unit = withClient {
+    requireTableExists(db, table)
+    client.alterPartitions(db, table, newParts)
+  }
+
+  override def getPartition(
+      db: String,
+      table: String,
+      spec: TablePartitionSpec): CatalogTablePartition = withClient {
+    client.getPartition(db, table, spec)
+  }
+
+  override def listPartitions(
+      db: String,
+      table: String): Seq[CatalogTablePartition] = withClient {
+    client.getAllPartitions(db, table)
+  }
+
+  // --------------------------------------------------------------------------
+  // Functions
+  // --------------------------------------------------------------------------
+
+  override def createFunction(
+      db: String,
+      funcDefinition: CatalogFunction): Unit = withClient {
+    client.createFunction(db, funcDefinition)
+  }
+
+  override def dropFunction(db: String, name: String): Unit = withClient {
+    client.dropFunction(db, name)
+  }
+
+  override def renameFunction(db: String, oldName: String, newName: String): Unit = withClient {
+    client.renameFunction(db, oldName, newName)
+  }
+
+  override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = withClient {
+    client.alterFunction(db, funcDefinition)
+  }
+
+  override def getFunction(db: String, funcName: String): CatalogFunction = withClient {
+    client.getFunction(db, funcName)
+  }
+
+  override def listFunctions(db: String, pattern: String): Seq[String] = withClient {
+    client.listFunctions(db, pattern)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c222b00..3788736 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -25,15 +25,16 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.Warehouse
+import org.apache.hadoop.hive.metastore.{TableType => HiveTableType, Warehouse}
import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.ql.metadata._
+import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable, _}
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.spark.Logging
import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
@@ -96,6 +97,8 @@ private[hive] object HiveSerDe {
}
}
+
+// TODO: replace this with o.a.s.sql.hive.HiveCatalog once we merge SQLContext and HiveContext
private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveContext)
extends Catalog with Logging {
@@ -107,16 +110,16 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
/** A fully qualified identifier for a table (i.e., database.tableName) */
case class QualifiedTableName(database: String, name: String)
- private def getQualifiedTableName(tableIdent: TableIdentifier) = {
+ private def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
QualifiedTableName(
tableIdent.database.getOrElse(client.currentDatabase).toLowerCase,
tableIdent.table.toLowerCase)
}
- private def getQualifiedTableName(hiveTable: HiveTable) = {
+ private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = {
QualifiedTableName(
- hiveTable.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase,
- hiveTable.name.toLowerCase)
+ t.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase,
+ t.name.toLowerCase)
}
/** A cache of Spark SQL data source tables that have been accessed. */
@@ -175,7 +178,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
// It does not appear that the ql client for the metastore has a way to enumerate all the
// SerDe properties directly...
- val options = table.serdeProperties
+ val options = table.storage.serdeProperties
val resolvedRelation =
ResolvedDataSource(
@@ -276,53 +279,54 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val tableType = if (isExternal) {
tableProperties.put("EXTERNAL", "TRUE")
- ExternalTable
+ CatalogTableType.EXTERNAL_TABLE
} else {
tableProperties.put("EXTERNAL", "FALSE")
- ManagedTable
+ CatalogTableType.MANAGED_TABLE
}
val maybeSerDe = HiveSerDe.sourceToSerDe(provider, hive.hiveconf)
val dataSource = ResolvedDataSource(
hive, userSpecifiedSchema, partitionColumns, bucketSpec, provider, options)
- def newSparkSQLSpecificMetastoreTable(): HiveTable = {
- HiveTable(
+ def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
+ CatalogTable(
specifiedDatabase = Option(dbName),
name = tblName,
- schema = Nil,
- partitionColumns = Nil,
tableType = tableType,
- properties = tableProperties.toMap,
- serdeProperties = options)
+ schema = Nil,
+ storage = CatalogStorageFormat(
+ locationUri = None,
+ inputFormat = None,
+ outputFormat = None,
+ serde = None,
+ serdeProperties = options
+ ),
+ properties = tableProperties.toMap)
}
- def newHiveCompatibleMetastoreTable(relation: HadoopFsRelation, serde: HiveSerDe): HiveTable = {
- def schemaToHiveColumn(schema: StructType): Seq[HiveColumn] = {
- schema.map { field =>
- HiveColumn(
- name = field.name,
- hiveType = HiveMetastoreTypes.toMetastoreType(field.dataType),
- comment = "")
- }
- }
-
+ def newHiveCompatibleMetastoreTable(
+ relation: HadoopFsRelation,
+ serde: HiveSerDe): CatalogTable = {
assert(partitionColumns.isEmpty)
assert(relation.partitionColumns.isEmpty)
- HiveTable(
+ CatalogTable(
specifiedDatabase = Option(dbName),
name = tblName,
- schema = schemaToHiveColumn(relation.schema),
- partitionColumns = Nil,
tableType = tableType,
+ storage = CatalogStorageFormat(
+ locationUri = Some(relation.paths.head),
+ inputFormat = serde.inputFormat,
+ outputFormat = serde.outputFormat,
+ serde = serde.serde,
+ serdeProperties = options
+ ),
+ schema = relation.schema.map { f =>
+ CatalogColumn(f.name, HiveMetastoreTypes.toMetastoreType(f.dataType))
+ },
properties = tableProperties.toMap,
- serdeProperties = options,
- location = Some(relation.paths.head),
- viewText = None, // TODO We need to place the SQL string here.
- inputFormat = serde.inputFormat,
- outputFormat = serde.outputFormat,
- serde = serde.serde)
+ viewText = None) // TODO: We need to place the SQL string here
}
// TODO: Support persisting partitioned data source relations in Hive compatible format
@@ -379,7 +383,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
// specific way.
try {
logInfo(message)
- client.createTable(table)
+ client.createTable(table, ignoreIfExists = false)
} catch {
case throwable: Throwable =>
val warningMessage =
@@ -387,20 +391,20 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
s"it into Hive metastore in Spark SQL specific format."
logWarning(warningMessage, throwable)
val sparkSqlSpecificTable = newSparkSQLSpecificMetastoreTable()
- client.createTable(sparkSqlSpecificTable)
+ client.createTable(sparkSqlSpecificTable, ignoreIfExists = false)
}
case (None, message) =>
logWarning(message)
val hiveTable = newSparkSQLSpecificMetastoreTable()
- client.createTable(hiveTable)
+ client.createTable(hiveTable, ignoreIfExists = false)
}
}
def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
// Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
- new Path(new Path(client.getDatabase(dbName).location), tblName).toString
+ new Path(new Path(client.getDatabase(dbName).locationUri), tblName).toString
}
override def tableExists(tableIdent: TableIdentifier): Boolean = {
@@ -420,7 +424,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
// Then, if alias is specified, wrap the table with a Subquery using the alias.
// Otherwise, wrap the table with a Subquery using the table name.
alias.map(a => SubqueryAlias(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
- } else if (table.tableType == VirtualView) {
+ } else if (table.tableType == CatalogTableType.VIRTUAL_VIEW) {
val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
alias match {
// because hive use things like `_c0` to build the expanded text
@@ -429,7 +433,8 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
case Some(aliasText) => SubqueryAlias(aliasText, hive.parseSql(viewText))
}
} else {
- MetastoreRelation(qualifiedTableName.database, qualifiedTableName.name, alias)(table)(hive)
+ MetastoreRelation(
+ qualifiedTableName.database, qualifiedTableName.name, alias)(table, client, hive)
}
}
@@ -602,16 +607,14 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val schema = if (table.schema.nonEmpty) {
table.schema
} else {
- child.output.map {
- attr => new HiveColumn(
- attr.name,
- HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
+ child.output.map { a =>
+ CatalogColumn(a.name, HiveMetastoreTypes.toMetastoreType(a.dataType), a.nullable)
}
}
val desc = table.copy(schema = schema)
- if (hive.convertCTAS && table.serde.isEmpty) {
+ if (hive.convertCTAS && table.storage.serde.isEmpty) {
// Do the conversion when spark.sql.hive.convertCTAS is true and the query
// does not specify any storage format (file format and storage handler).
if (table.specifiedDatabase.isDefined) {
@@ -632,9 +635,9 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
child
)
} else {
- val desc = if (table.serde.isEmpty) {
+ val desc = if (table.storage.serde.isEmpty) {
// add default serde
- table.copy(
+ table.withNewStorage(
serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
} else {
table
@@ -744,10 +747,13 @@ private[hive] case class InsertIntoHiveTable(
}
}
-private[hive] case class MetastoreRelation
- (databaseName: String, tableName: String, alias: Option[String])
- (val table: HiveTable)
- (@transient private val sqlContext: SQLContext)
+private[hive] case class MetastoreRelation(
+ databaseName: String,
+ tableName: String,
+ alias: Option[String])
+ (val table: CatalogTable,
+ @transient private val client: HiveClient,
+ @transient private val sqlContext: SQLContext)
extends LeafNode with MultiInstanceRelation with FileRelation {
override def equals(other: Any): Boolean = other match {
@@ -765,7 +771,12 @@ private[hive] case class MetastoreRelation
- override protected def otherCopyArgs: Seq[AnyRef] = table :: sqlContext :: Nil
+ // Must list every curried constructor argument (table, client, sqlContext) so makeCopy can rebuild the node.
+ override protected def otherCopyArgs: Seq[AnyRef] = table :: client :: sqlContext :: Nil
- @transient val hiveQlTable: Table = {
+ private def toHiveColumn(c: CatalogColumn): FieldSchema = {
+ new FieldSchema(c.name, c.dataType, c.comment.orNull)
+ }
+
+ // TODO: merge this with HiveClientImpl#toHiveTable
+ @transient val hiveQlTable: HiveTable = {
// We start by constructing an API table as Hive performs several important transformations
// internally when converting an API table to a QL table.
val tTable = new org.apache.hadoop.hive.metastore.api.Table()
@@ -776,27 +787,31 @@ private[hive] case class MetastoreRelation
tTable.setParameters(tableParameters)
table.properties.foreach { case (k, v) => tableParameters.put(k, v) }
- tTable.setTableType(table.tableType.name)
+ tTable.setTableType(table.tableType match {
+ case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE.toString
+ case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE.toString
+ case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE.toString
+ case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW.toString
+ })
val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
tTable.setSd(sd)
- sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
- tTable.setPartitionKeys(
- table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
+ sd.setCols(table.schema.map(toHiveColumn).asJava)
+ tTable.setPartitionKeys(table.partitionColumns.map(toHiveColumn).asJava)
- table.location.foreach(sd.setLocation)
- table.inputFormat.foreach(sd.setInputFormat)
- table.outputFormat.foreach(sd.setOutputFormat)
+ table.storage.locationUri.foreach(sd.setLocation)
+ table.storage.inputFormat.foreach(sd.setInputFormat)
+ table.storage.outputFormat.foreach(sd.setOutputFormat)
val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
- table.serde.foreach(serdeInfo.setSerializationLib)
+ table.storage.serde.foreach(serdeInfo.setSerializationLib)
sd.setSerdeInfo(serdeInfo)
val serdeParameters = new java.util.HashMap[String, String]()
- table.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+ table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
serdeInfo.setParameters(serdeParameters)
- new Table(tTable)
+ new HiveTable(tTable)
}
@transient override lazy val statistics: Statistics = Statistics(
@@ -821,11 +836,11 @@ private[hive] case class MetastoreRelation
// When metastore partition pruning is turned off, we cache the list of all partitions to
// mimic the behavior of Spark < 1.5
- lazy val allPartitions = table.getAllPartitions
+ private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(table)
def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
- table.getPartitions(predicates)
+ client.getPartitionsByFilter(table, predicates)
} else {
allPartitions
}
@@ -834,23 +849,22 @@ private[hive] case class MetastoreRelation
val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
tPartition.setDbName(databaseName)
tPartition.setTableName(tableName)
- tPartition.setValues(p.values.asJava)
+ tPartition.setValues(table.partitionColumns.map(c => p.spec(c.name)).asJava)
val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
tPartition.setSd(sd)
- sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
-
- sd.setLocation(p.storage.location)
- sd.setInputFormat(p.storage.inputFormat)
- sd.setOutputFormat(p.storage.outputFormat)
+ sd.setCols(table.schema.map(toHiveColumn).asJava)
+ p.storage.locationUri.foreach(sd.setLocation)
+ p.storage.inputFormat.foreach(sd.setInputFormat)
+ p.storage.outputFormat.foreach(sd.setOutputFormat)
val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
sd.setSerdeInfo(serdeInfo)
// maps and lists should be set only after all elements are ready (see HIVE-7975)
- serdeInfo.setSerializationLib(p.storage.serde)
+ p.storage.serde.foreach(serdeInfo.setSerializationLib)
val serdeParameters = new java.util.HashMap[String, String]()
- table.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+ table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
serdeInfo.setParameters(serdeParameters)
@@ -877,10 +891,10 @@ private[hive] case class MetastoreRelation
hiveQlTable.getMetadata
)
- implicit class SchemaAttribute(f: HiveColumn) {
+ implicit class SchemaAttribute(f: CatalogColumn) {
def toAttribute: AttributeReference = AttributeReference(
f.name,
- HiveMetastoreTypes.toDataType(f.hiveType),
+ HiveMetastoreTypes.toDataType(f.dataType),
// Since data can be dumped in randomly with no validation, everything is nullable.
nullable = true
)(qualifiers = Seq(alias.getOrElse(tableName)))
@@ -901,19 +915,22 @@ private[hive] case class MetastoreRelation
val columnOrdinals = AttributeMap(attributes.zipWithIndex)
override def inputFiles: Array[String] = {
- val partLocations = table.getPartitions(Nil).map(_.storage.location).toArray
+ val partLocations = client
+ .getPartitionsByFilter(table, Nil)
+ .flatMap(_.storage.locationUri)
+ .toArray
if (partLocations.nonEmpty) {
partLocations
} else {
Array(
- table.location.getOrElse(
+ table.storage.locationUri.getOrElse(
sys.error(s"Could not get the location of ${table.qualifiedName}.")))
}
}
override def newInstance(): MetastoreRelation = {
- MetastoreRelation(databaseName, tableName, alias)(table)(sqlContext)
+ MetastoreRelation(databaseName, tableName, alias)(table, client, sqlContext)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org