Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/22 00:00:27 UTC

[2/2] spark git commit: [SPARK-13080][SQL] Implement new Catalog API using Hive

[SPARK-13080][SQL] Implement new Catalog API using Hive

## What changes were proposed in this pull request?

This is a step towards merging `SQLContext` and `HiveContext`. A new internal Catalog API was introduced in #10982 and extended in #11069. This patch introduces an implementation of this API using `HiveClient`, an existing interface to Hive. It also extends `HiveClient` with additional calls to Hive that are needed to complete the catalog implementation.

*Where should I start reviewing?* The new catalog introduced is `HiveCatalog`. This class is relatively simple because it just calls `HiveClientImpl`, where most of the new logic is. I would not start with `HiveClient`, `HiveQl`, or `HiveMetastoreCatalog`, which are modified mainly because of a refactor.
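
To make the delegation concrete, here is a condensed sketch of the shape of `HiveCatalog` (simplified from the full diff below; most methods and the Hive/Thrift exception handling are omitted):

```
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.NoSuchItemException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.hive.client.HiveClient

// Every Catalog method is a thin, synchronized wrapper around the
// corresponding HiveClient call.
private[spark] class HiveCatalog(client: HiveClient) extends Catalog {

  // Run `body` against the client and surface lookup failures as
  // AnalysisExceptions (the real withClient also wraps Hive/Thrift errors).
  private def withClient[T](body: => T): T = synchronized {
    try body catch {
      case e: NoSuchItemException => throw new AnalysisException(e.getMessage)
    }
  }

  override def createDatabase(
      dbDefinition: CatalogDatabase,
      ignoreIfExists: Boolean): Unit = withClient {
    client.createDatabase(dbDefinition, ignoreIfExists)
  }

  // ... the remaining database/table/partition/function methods follow the same pattern
}
```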

*Why is this patch so big?* I had to refactor `HiveClient` to remove an intermediate representation of databases, tables, partitions, etc. After this refactor, `CatalogTable` converts directly to and from `HiveTable` (etc.). Otherwise we would have to first convert `CatalogTable` to the intermediate representation and then convert that to `HiveTable`, which is messy.
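
For illustration only, the direct conversion now looks roughly like the sketch below; the object and method names are hypothetical simplifications, and the real logic lives in `HiveClientImpl` (only partially shown in this message):

```
import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable}
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Hypothetical sketch: CatalogTable maps straight onto Hive's own Table
// class, with no intermediate representation in between.
object CatalogConversions {
  def toHiveTable(table: CatalogTable): HiveTable = {
    val hiveTable = new HiveTable(table.database, table.name)
    // ... copy schema, partition columns, storage format and table properties
    hiveTable
  }
}
```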

The new class hierarchy is as follows:
```
org.apache.spark.sql.catalyst.catalog.Catalog
  - org.apache.spark.sql.catalyst.catalog.InMemoryCatalog
  - org.apache.spark.sql.hive.HiveCatalog
```
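
In code terms the split looks roughly like this (method list heavily abbreviated; see `interface.scala` in the diff for the full API):

```
// Abbreviated view of the shared API that both implementations extend;
// only a few representative signatures are shown here.
abstract class Catalog {
  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
  def createTable(db: String, tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
  def getTable(db: String, table: String): CatalogTable
  def listTables(db: String): Seq[String]
  // ... plus the remaining database, table, partition and function operations
}
// InMemoryCatalog keeps everything in mutable hash maps, while HiveCatalog
// delegates each call to a HiveClient talking to the metastore.
```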

Note that, as of this patch, none of these classes is used anywhere yet; they will be wired in before the Spark 2.0 release.

## How was this patch tested?
All existing unit tests, plus a new `HiveCatalogSuite` that extends `CatalogTestCases`.
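
For context, the reuse works roughly as sketched below; the client construction is hypothetical (`buildTestHiveClient` is not a real helper), and the actual suite obtains its `HiveClient` through Spark's Hive test machinery:

```
// Sketch of the test wiring: CatalogTestCases only needs a way to produce
// an empty catalog, so the Hive suite plugs in a HiveCatalog.
class HiveCatalogSuite extends CatalogTestCases {
  // Hypothetical helper standing in for the real client setup.
  private val client: HiveClient = buildTestHiveClient()
  override protected def newEmptyCatalog(): Catalog = new HiveCatalog(client)
}
```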

Author: Andrew Or <an...@databricks.com>
Author: Reynold Xin <rx...@databricks.com>

Closes #11293 from rxin/hive-catalog.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c3832b2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c3832b2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c3832b2

Branch: refs/heads/master
Commit: 6c3832b26e119626205732b8fd03c8f5ba986896
Parents: 7eb83fe
Author: Andrew Or <an...@databricks.com>
Authored: Sun Feb 21 15:00:24 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sun Feb 21 15:00:24 2016 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/AnalysisException.scala    |   3 +
 .../spark/sql/catalyst/analysis/Catalog.scala   |   9 -
 .../catalyst/analysis/NoSuchItemException.scala |  52 +++
 .../sql/catalyst/catalog/InMemoryCatalog.scala  | 154 ++++----
 .../spark/sql/catalyst/catalog/interface.scala  | 190 ++++++----
 .../sql/catalyst/catalog/CatalogTestCases.scala | 284 +++++++++-----
 .../org/apache/spark/sql/hive/HiveCatalog.scala | 293 ++++++++++++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 171 +++++----
 .../org/apache/spark/sql/hive/HiveQl.scala      | 145 +++----
 .../spark/sql/hive/client/HiveClient.scala      | 210 ++++++----
 .../spark/sql/hive/client/HiveClientImpl.scala  | 379 +++++++++++++------
 .../hive/execution/CreateTableAsSelect.scala    |  20 +-
 .../sql/hive/execution/CreateViewAsSelect.scala |  20 +-
 .../hive/execution/InsertIntoHiveTable.scala    |   2 +-
 .../spark/sql/hive/HiveCatalogSuite.scala       |  49 +++
 .../sql/hive/HiveMetastoreCatalogSuite.scala    |  40 +-
 .../org/apache/spark/sql/hive/HiveQlSuite.scala |  94 ++---
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  25 +-
 .../spark/sql/hive/MultiDatabaseSuite.scala     |   4 +-
 .../spark/sql/hive/client/VersionsSuite.scala   |  37 +-
 .../spark/sql/hive/execution/PruningSuite.scala |   2 +-
 21 files changed, 1483 insertions(+), 700 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
index f999218..97f28fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
@@ -19,6 +19,9 @@ package org.apache.spark.sql
 
 import org.apache.spark.annotation.DeveloperApi
 
+
+// TODO: don't swallow original stack trace if it exists
+
 /**
  * :: DeveloperApi ::
  * Thrown when a query fails to analyze, usually because the query itself is invalid.

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 67edab5..52b284b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -20,20 +20,11 @@ package org.apache.spark.sql.catalyst.analysis
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf, TableIdentifier}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 
-/**
- * Thrown by a catalog when a table cannot be found.  The analyzer will rethrow the exception
- * as an AnalysisException with the correct position information.
- */
-class NoSuchTableException extends Exception
-
-class NoSuchDatabaseException extends Exception
 
 /**
  * An interface for looking up relations by name.  Used by an [[Analyzer]].

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
new file mode 100644
index 0000000..81399db
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.catalog.Catalog.TablePartitionSpec
+
+
+/**
+ * Thrown by a catalog when an item cannot be found. The analyzer will rethrow the exception
+ * as an [[org.apache.spark.sql.AnalysisException]] with the correct position information.
+ */
+abstract class NoSuchItemException extends Exception {
+  override def getMessage: String
+}
+
+class NoSuchDatabaseException(db: String) extends NoSuchItemException {
+  override def getMessage: String = s"Database $db not found"
+}
+
+class NoSuchTableException(db: String, table: String) extends NoSuchItemException {
+  override def getMessage: String = s"Table $table not found in database $db"
+}
+
+class NoSuchPartitionException(
+    db: String,
+    table: String,
+    spec: TablePartitionSpec)
+  extends NoSuchItemException {
+
+  override def getMessage: String = {
+    s"Partition not found in table $table database $db:\n" + spec.mkString("\n")
+  }
+}
+
+class NoSuchFunctionException(db: String, func: String) extends NoSuchItemException {
+  override def getMessage: String = s"Function $func not found in database $db"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 38be61c..cba4de3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -30,15 +30,16 @@ import org.apache.spark.sql.AnalysisException
 class InMemoryCatalog extends Catalog {
   import Catalog._
 
-  private class TableDesc(var table: Table) {
-    val partitions = new mutable.HashMap[PartitionSpec, TablePartition]
+  private class TableDesc(var table: CatalogTable) {
+    val partitions = new mutable.HashMap[TablePartitionSpec, CatalogTablePartition]
   }
 
-  private class DatabaseDesc(var db: Database) {
+  private class DatabaseDesc(var db: CatalogDatabase) {
     val tables = new mutable.HashMap[String, TableDesc]
-    val functions = new mutable.HashMap[String, Function]
+    val functions = new mutable.HashMap[String, CatalogFunction]
   }
 
+  // Database name -> description
   private val catalog = new scala.collection.mutable.HashMap[String, DatabaseDesc]
 
   private def filterPattern(names: Seq[String], pattern: String): Seq[String] = {
@@ -47,39 +48,33 @@ class InMemoryCatalog extends Catalog {
   }
 
   private def existsFunction(db: String, funcName: String): Boolean = {
-    assertDbExists(db)
+    requireDbExists(db)
     catalog(db).functions.contains(funcName)
   }
 
   private def existsTable(db: String, table: String): Boolean = {
-    assertDbExists(db)
+    requireDbExists(db)
     catalog(db).tables.contains(table)
   }
 
-  private def existsPartition(db: String, table: String, spec: PartitionSpec): Boolean = {
-    assertTableExists(db, table)
+  private def existsPartition(db: String, table: String, spec: TablePartitionSpec): Boolean = {
+    requireTableExists(db, table)
     catalog(db).tables(table).partitions.contains(spec)
   }
 
-  private def assertDbExists(db: String): Unit = {
-    if (!catalog.contains(db)) {
-      throw new AnalysisException(s"Database $db does not exist")
-    }
-  }
-
-  private def assertFunctionExists(db: String, funcName: String): Unit = {
+  private def requireFunctionExists(db: String, funcName: String): Unit = {
     if (!existsFunction(db, funcName)) {
       throw new AnalysisException(s"Function $funcName does not exist in $db database")
     }
   }
 
-  private def assertTableExists(db: String, table: String): Unit = {
+  private def requireTableExists(db: String, table: String): Unit = {
     if (!existsTable(db, table)) {
       throw new AnalysisException(s"Table $table does not exist in $db database")
     }
   }
 
-  private def assertPartitionExists(db: String, table: String, spec: PartitionSpec): Unit = {
+  private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = {
     if (!existsPartition(db, table, spec)) {
       throw new AnalysisException(s"Partition does not exist in database $db table $table: $spec")
     }
@@ -90,7 +85,7 @@ class InMemoryCatalog extends Catalog {
   // --------------------------------------------------------------------------
 
   override def createDatabase(
-      dbDefinition: Database,
+      dbDefinition: CatalogDatabase,
       ignoreIfExists: Boolean): Unit = synchronized {
     if (catalog.contains(dbDefinition.name)) {
       if (!ignoreIfExists) {
@@ -124,17 +119,20 @@ class InMemoryCatalog extends Catalog {
     }
   }
 
-  override def alterDatabase(db: String, dbDefinition: Database): Unit = synchronized {
-    assertDbExists(db)
-    assert(db == dbDefinition.name)
-    catalog(db).db = dbDefinition
+  override def alterDatabase(dbDefinition: CatalogDatabase): Unit = synchronized {
+    requireDbExists(dbDefinition.name)
+    catalog(dbDefinition.name).db = dbDefinition
   }
 
-  override def getDatabase(db: String): Database = synchronized {
-    assertDbExists(db)
+  override def getDatabase(db: String): CatalogDatabase = synchronized {
+    requireDbExists(db)
     catalog(db).db
   }
 
+  override def databaseExists(db: String): Boolean = synchronized {
+    catalog.contains(db)
+  }
+
   override def listDatabases(): Seq[String] = synchronized {
     catalog.keySet.toSeq
   }
@@ -143,15 +141,17 @@ class InMemoryCatalog extends Catalog {
     filterPattern(listDatabases(), pattern)
   }
 
+  override def setCurrentDatabase(db: String): Unit = { /* no-op */ }
+
   // --------------------------------------------------------------------------
   // Tables
   // --------------------------------------------------------------------------
 
   override def createTable(
       db: String,
-      tableDefinition: Table,
+      tableDefinition: CatalogTable,
       ignoreIfExists: Boolean): Unit = synchronized {
-    assertDbExists(db)
+    requireDbExists(db)
     if (existsTable(db, tableDefinition.name)) {
       if (!ignoreIfExists) {
         throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database")
@@ -165,7 +165,7 @@ class InMemoryCatalog extends Catalog {
       db: String,
       table: String,
       ignoreIfNotExists: Boolean): Unit = synchronized {
-    assertDbExists(db)
+    requireDbExists(db)
     if (existsTable(db, table)) {
       catalog(db).tables.remove(table)
     } else {
@@ -176,31 +176,30 @@ class InMemoryCatalog extends Catalog {
   }
 
   override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
-    assertTableExists(db, oldName)
+    requireTableExists(db, oldName)
     val oldDesc = catalog(db).tables(oldName)
     oldDesc.table = oldDesc.table.copy(name = newName)
     catalog(db).tables.put(newName, oldDesc)
     catalog(db).tables.remove(oldName)
   }
 
-  override def alterTable(db: String, table: String, tableDefinition: Table): Unit = synchronized {
-    assertTableExists(db, table)
-    assert(table == tableDefinition.name)
-    catalog(db).tables(table).table = tableDefinition
+  override def alterTable(db: String, tableDefinition: CatalogTable): Unit = synchronized {
+    requireTableExists(db, tableDefinition.name)
+    catalog(db).tables(tableDefinition.name).table = tableDefinition
   }
 
-  override def getTable(db: String, table: String): Table = synchronized {
-    assertTableExists(db, table)
+  override def getTable(db: String, table: String): CatalogTable = synchronized {
+    requireTableExists(db, table)
     catalog(db).tables(table).table
   }
 
   override def listTables(db: String): Seq[String] = synchronized {
-    assertDbExists(db)
+    requireDbExists(db)
     catalog(db).tables.keySet.toSeq
   }
 
   override def listTables(db: String, pattern: String): Seq[String] = synchronized {
-    assertDbExists(db)
+    requireDbExists(db)
     filterPattern(listTables(db), pattern)
   }
 
@@ -211,9 +210,9 @@ class InMemoryCatalog extends Catalog {
   override def createPartitions(
       db: String,
       table: String,
-      parts: Seq[TablePartition],
+      parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = synchronized {
-    assertTableExists(db, table)
+    requireTableExists(db, table)
     val existingParts = catalog(db).tables(table).partitions
     if (!ignoreIfExists) {
       val dupSpecs = parts.collect { case p if existingParts.contains(p.spec) => p.spec }
@@ -229,9 +228,9 @@ class InMemoryCatalog extends Catalog {
   override def dropPartitions(
       db: String,
       table: String,
-      partSpecs: Seq[PartitionSpec],
+      partSpecs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean): Unit = synchronized {
-    assertTableExists(db, table)
+    requireTableExists(db, table)
     val existingParts = catalog(db).tables(table).partitions
     if (!ignoreIfNotExists) {
       val missingSpecs = partSpecs.collect { case s if !existingParts.contains(s) => s }
@@ -244,30 +243,42 @@ class InMemoryCatalog extends Catalog {
     partSpecs.foreach(existingParts.remove)
   }
 
-  override def alterPartition(
+  override def renamePartitions(
       db: String,
       table: String,
-      spec: Map[String, String],
-      newPart: TablePartition): Unit = synchronized {
-    assertPartitionExists(db, table, spec)
-    val existingParts = catalog(db).tables(table).partitions
-    if (spec != newPart.spec) {
-      // Also a change in specs; remove the old one and add the new one back
-      existingParts.remove(spec)
+      specs: Seq[TablePartitionSpec],
+      newSpecs: Seq[TablePartitionSpec]): Unit = synchronized {
+    require(specs.size == newSpecs.size, "number of old and new partition specs differ")
+    specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
+      val newPart = getPartition(db, table, oldSpec).copy(spec = newSpec)
+      val existingParts = catalog(db).tables(table).partitions
+      existingParts.remove(oldSpec)
+      existingParts.put(newSpec, newPart)
+    }
+  }
+
+  override def alterPartitions(
+      db: String,
+      table: String,
+      parts: Seq[CatalogTablePartition]): Unit = synchronized {
+    parts.foreach { p =>
+      requirePartitionExists(db, table, p.spec)
+      catalog(db).tables(table).partitions.put(p.spec, p)
     }
-    existingParts.put(newPart.spec, newPart)
   }
 
   override def getPartition(
       db: String,
       table: String,
-      spec: Map[String, String]): TablePartition = synchronized {
-    assertPartitionExists(db, table, spec)
+      spec: TablePartitionSpec): CatalogTablePartition = synchronized {
+    requirePartitionExists(db, table, spec)
     catalog(db).tables(table).partitions(spec)
   }
 
-  override def listPartitions(db: String, table: String): Seq[TablePartition] = synchronized {
-    assertTableExists(db, table)
+  override def listPartitions(
+      db: String,
+      table: String): Seq[CatalogTablePartition] = synchronized {
+    requireTableExists(db, table)
     catalog(db).tables(table).partitions.values.toSeq
   }
 
@@ -275,44 +286,39 @@ class InMemoryCatalog extends Catalog {
   // Functions
   // --------------------------------------------------------------------------
 
-  override def createFunction(
-      db: String,
-      func: Function,
-      ignoreIfExists: Boolean): Unit = synchronized {
-    assertDbExists(db)
+  override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
+    requireDbExists(db)
     if (existsFunction(db, func.name)) {
-      if (!ignoreIfExists) {
-        throw new AnalysisException(s"Function $func already exists in $db database")
-      }
+      throw new AnalysisException(s"Function $func already exists in $db database")
     } else {
       catalog(db).functions.put(func.name, func)
     }
   }
 
   override def dropFunction(db: String, funcName: String): Unit = synchronized {
-    assertFunctionExists(db, funcName)
+    requireFunctionExists(db, funcName)
     catalog(db).functions.remove(funcName)
   }
 
-  override def alterFunction(
-      db: String,
-      funcName: String,
-      funcDefinition: Function): Unit = synchronized {
-    assertFunctionExists(db, funcName)
-    if (funcName != funcDefinition.name) {
-      // Also a rename; remove the old one and add the new one back
-      catalog(db).functions.remove(funcName)
-    }
+  override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized {
+    requireFunctionExists(db, oldName)
+    val newFunc = getFunction(db, oldName).copy(name = newName)
+    catalog(db).functions.remove(oldName)
+    catalog(db).functions.put(newName, newFunc)
+  }
+
+  override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = synchronized {
+    requireFunctionExists(db, funcDefinition.name)
     catalog(db).functions.put(funcDefinition.name, funcDefinition)
   }
 
-  override def getFunction(db: String, funcName: String): Function = synchronized {
-    assertFunctionExists(db, funcName)
+  override def getFunction(db: String, funcName: String): CatalogFunction = synchronized {
+    requireFunctionExists(db, funcName)
     catalog(db).functions(funcName)
   }
 
   override def listFunctions(db: String, pattern: String): Seq[String] = synchronized {
-    assertDbExists(db)
+    requireDbExists(db)
     filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 56aaa6b..dac5f02 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import javax.annotation.Nullable
+
 import org.apache.spark.sql.AnalysisException
 
 
@@ -31,41 +33,59 @@ import org.apache.spark.sql.AnalysisException
 abstract class Catalog {
   import Catalog._
 
+  protected def requireDbExists(db: String): Unit = {
+    if (!databaseExists(db)) {
+      throw new AnalysisException(s"Database $db does not exist")
+    }
+  }
+
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
 
-  def createDatabase(dbDefinition: Database, ignoreIfExists: Boolean): Unit
+  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
 
   def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
 
   /**
-   * Alter an existing database. This operation does not support renaming.
+   * Alter a database whose name matches the one specified in `dbDefinition`,
+   * assuming the database exists.
+   *
+   * Note: If the underlying implementation does not support altering a certain field,
+   * this becomes a no-op.
    */
-  def alterDatabase(db: String, dbDefinition: Database): Unit
+  def alterDatabase(dbDefinition: CatalogDatabase): Unit
 
-  def getDatabase(db: String): Database
+  def getDatabase(db: String): CatalogDatabase
+
+  def databaseExists(db: String): Boolean
 
   def listDatabases(): Seq[String]
 
   def listDatabases(pattern: String): Seq[String]
 
+  def setCurrentDatabase(db: String): Unit
+
   // --------------------------------------------------------------------------
   // Tables
   // --------------------------------------------------------------------------
 
-  def createTable(db: String, tableDefinition: Table, ignoreIfExists: Boolean): Unit
+  def createTable(db: String, tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
 
   def dropTable(db: String, table: String, ignoreIfNotExists: Boolean): Unit
 
   def renameTable(db: String, oldName: String, newName: String): Unit
 
   /**
-   * Alter an existing table. This operation does not support renaming.
+   * Alter a table whose name that matches the one specified in `tableDefinition`,
+   * assuming the table exists.
+   *
+   * Note: If the underlying implementation does not support altering a certain field,
+   * this becomes a no-op.
    */
-  def alterTable(db: String, table: String, tableDefinition: Table): Unit
+  def alterTable(db: String, tableDefinition: CatalogTable): Unit
 
-  def getTable(db: String, table: String): Table
+  def getTable(db: String, table: String): CatalogTable
 
   def listTables(db: String): Seq[String]
 
@@ -78,43 +98,62 @@ abstract class Catalog {
   def createPartitions(
       db: String,
       table: String,
-      parts: Seq[TablePartition],
+      parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit
 
   def dropPartitions(
       db: String,
       table: String,
-      parts: Seq[PartitionSpec],
+      parts: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean): Unit
 
   /**
-   * Alter an existing table partition and optionally override its spec.
+   * Override the specs of one or many existing table partitions, assuming they exist.
+   * This assumes index i of `specs` corresponds to index i of `newSpecs`.
+   */
+  def renamePartitions(
+      db: String,
+      table: String,
+      specs: Seq[TablePartitionSpec],
+      newSpecs: Seq[TablePartitionSpec]): Unit
+
+  /**
+   * Alter one or many table partitions whose specs that match those specified in `parts`,
+   * assuming the partitions exist.
+   *
+   * Note: If the underlying implementation does not support altering a certain field,
+   * this becomes a no-op.
    */
-  def alterPartition(
+  def alterPartitions(
       db: String,
       table: String,
-      spec: PartitionSpec,
-      newPart: TablePartition): Unit
+      parts: Seq[CatalogTablePartition]): Unit
 
-  def getPartition(db: String, table: String, spec: PartitionSpec): TablePartition
+  def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition
 
   // TODO: support listing by pattern
-  def listPartitions(db: String, table: String): Seq[TablePartition]
+  def listPartitions(db: String, table: String): Seq[CatalogTablePartition]
 
   // --------------------------------------------------------------------------
   // Functions
   // --------------------------------------------------------------------------
 
-  def createFunction(db: String, funcDefinition: Function, ignoreIfExists: Boolean): Unit
+  def createFunction(db: String, funcDefinition: CatalogFunction): Unit
 
   def dropFunction(db: String, funcName: String): Unit
 
+  def renameFunction(db: String, oldName: String, newName: String): Unit
+
   /**
-   * Alter an existing function and optionally override its name.
+   * Alter a function whose name that matches the one specified in `funcDefinition`,
+   * assuming the function exists.
+   *
+   * Note: If the underlying implementation does not support altering a certain field,
+   * this becomes a no-op.
    */
-  def alterFunction(db: String, funcName: String, funcDefinition: Function): Unit
+  def alterFunction(db: String, funcDefinition: CatalogFunction): Unit
 
-  def getFunction(db: String, funcName: String): Function
+  def getFunction(db: String, funcName: String): CatalogFunction
 
   def listFunctions(db: String, pattern: String): Seq[String]
 
@@ -127,33 +166,30 @@ abstract class Catalog {
  * @param name name of the function
  * @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc"
  */
-case class Function(
-  name: String,
-  className: String
-)
+case class CatalogFunction(name: String, className: String)
 
 
 /**
  * Storage format, used to describe how a partition or a table is stored.
  */
-case class StorageFormat(
-  locationUri: String,
-  inputFormat: String,
-  outputFormat: String,
-  serde: String,
-  serdeProperties: Map[String, String]
-)
+case class CatalogStorageFormat(
+    locationUri: Option[String],
+    inputFormat: Option[String],
+    outputFormat: Option[String],
+    serde: Option[String],
+    serdeProperties: Map[String, String])
 
 
 /**
  * A column in a table.
  */
-case class Column(
-  name: String,
-  dataType: String,
-  nullable: Boolean,
-  comment: String
-)
+case class CatalogColumn(
+    name: String,
+    // This may be null when used to create views. TODO: make this type-safe; this is left
+    // as a string due to issues in converting Hive varchars to and from SparkSQL strings.
+    @Nullable dataType: String,
+    nullable: Boolean = true,
+    comment: Option[String] = None)
 
 
 /**
@@ -162,10 +198,7 @@ case class Column(
  * @param spec partition spec values indexed by column name
  * @param storage storage format of the partition
  */
-case class TablePartition(
-  spec: Catalog.PartitionSpec,
-  storage: StorageFormat
-)
+case class CatalogTablePartition(spec: Catalog.TablePartitionSpec, storage: CatalogStorageFormat)
 
 
 /**
@@ -174,40 +207,65 @@ case class TablePartition(
  * Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
  * future once we have a better understanding of how we want to handle skewed columns.
  */
-case class Table(
-  name: String,
-  description: String,
-  schema: Seq[Column],
-  partitionColumns: Seq[Column],
-  sortColumns: Seq[Column],
-  storage: StorageFormat,
-  numBuckets: Int,
-  properties: Map[String, String],
-  tableType: String,
-  createTime: Long,
-  lastAccessTime: Long,
-  viewOriginalText: Option[String],
-  viewText: Option[String]) {
-
-  require(tableType == "EXTERNAL_TABLE" || tableType == "INDEX_TABLE" ||
-    tableType == "MANAGED_TABLE" || tableType == "VIRTUAL_VIEW")
+case class CatalogTable(
+    specifiedDatabase: Option[String],
+    name: String,
+    tableType: CatalogTableType,
+    storage: CatalogStorageFormat,
+    schema: Seq[CatalogColumn],
+    partitionColumns: Seq[CatalogColumn] = Seq.empty,
+    sortColumns: Seq[CatalogColumn] = Seq.empty,
+    numBuckets: Int = 0,
+    createTime: Long = System.currentTimeMillis,
+    lastAccessTime: Long = System.currentTimeMillis,
+    properties: Map[String, String] = Map.empty,
+    viewOriginalText: Option[String] = None,
+    viewText: Option[String] = None) {
+
+  /** Return the database this table was specified to belong to, assuming it exists. */
+  def database: String = specifiedDatabase.getOrElse {
+    throw new AnalysisException(s"table $name did not specify database")
+  }
+
+  /** Return the fully qualified name of this table, assuming the database was specified. */
+  def qualifiedName: String = s"$database.$name"
+
+  /** Syntactic sugar to update a field in `storage`. */
+  def withNewStorage(
+      locationUri: Option[String] = storage.locationUri,
+      inputFormat: Option[String] = storage.inputFormat,
+      outputFormat: Option[String] = storage.outputFormat,
+      serde: Option[String] = storage.serde,
+      serdeProperties: Map[String, String] = storage.serdeProperties): CatalogTable = {
+    copy(storage = CatalogStorageFormat(
+      locationUri, inputFormat, outputFormat, serde, serdeProperties))
+  }
+
+}
+
+
+case class CatalogTableType private(name: String)
+object CatalogTableType {
+  val EXTERNAL_TABLE = new CatalogTableType("EXTERNAL_TABLE")
+  val MANAGED_TABLE = new CatalogTableType("MANAGED_TABLE")
+  val INDEX_TABLE = new CatalogTableType("INDEX_TABLE")
+  val VIRTUAL_VIEW = new CatalogTableType("VIRTUAL_VIEW")
 }
 
 
 /**
  * A database defined in the catalog.
  */
-case class Database(
-  name: String,
-  description: String,
-  locationUri: String,
-  properties: Map[String, String]
-)
+case class CatalogDatabase(
+    name: String,
+    description: String,
+    locationUri: String,
+    properties: Map[String, String])
 
 
 object Catalog {
   /**
-   * Specifications of a table partition indexed by column name.
+   * Specifications of a table partition. Mapping column name to column value.
    */
-  type PartitionSpec = Map[String, String]
+  type TablePartitionSpec = Map[String, String]
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index 45c5cee..e0d1220 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import org.scalatest.BeforeAndAfterEach
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 
@@ -26,18 +28,38 @@ import org.apache.spark.sql.AnalysisException
  *
  * Implementations of the [[Catalog]] interface can create test suites by extending this.
  */
-abstract class CatalogTestCases extends SparkFunSuite {
-  private val storageFormat = StorageFormat("usa", "$", "zzz", "serde", Map())
-  private val part1 = TablePartition(Map("a" -> "1"), storageFormat)
-  private val part2 = TablePartition(Map("b" -> "2"), storageFormat)
-  private val part3 = TablePartition(Map("c" -> "3"), storageFormat)
+abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
+  private lazy val storageFormat = CatalogStorageFormat(
+    locationUri = None,
+    inputFormat = Some(tableInputFormat),
+    outputFormat = Some(tableOutputFormat),
+    serde = None,
+    serdeProperties = Map.empty)
+  private lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat)
+  private lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat)
+  private lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat)
   private val funcClass = "org.apache.spark.myFunc"
 
+  // Things subclasses should override
+  protected val tableInputFormat: String = "org.apache.park.serde.MyInputFormat"
+  protected val tableOutputFormat: String = "org.apache.park.serde.MyOutputFormat"
+  protected def newUriForDatabase(): String = "uri"
+  protected def resetState(): Unit = { }
   protected def newEmptyCatalog(): Catalog
 
+  // Clear all state after each test
+  override def afterEach(): Unit = {
+    try {
+      resetState()
+    } finally {
+      super.afterEach()
+    }
+  }
+
   /**
    * Creates a basic catalog, with the following structure:
    *
+   * default
    * db1
    * db2
    *   - tbl1
@@ -48,37 +70,65 @@ abstract class CatalogTestCases extends SparkFunSuite {
    */
   private def newBasicCatalog(): Catalog = {
     val catalog = newEmptyCatalog()
+    // When testing against a real catalog, the default database may already exist
+    catalog.createDatabase(newDb("default"), ignoreIfExists = true)
     catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
     catalog.createDatabase(newDb("db2"), ignoreIfExists = false)
-    catalog.createTable("db2", newTable("tbl1"), ignoreIfExists = false)
-    catalog.createTable("db2", newTable("tbl2"), ignoreIfExists = false)
+    catalog.createTable("db2", newTable("tbl1", "db2"), ignoreIfExists = false)
+    catalog.createTable("db2", newTable("tbl2", "db2"), ignoreIfExists = false)
     catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false)
-    catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = false)
+    catalog.createFunction("db2", newFunc("func1"))
     catalog
   }
 
-  private def newFunc(): Function = Function("funcname", funcClass)
+  private def newFunc(): CatalogFunction = CatalogFunction("funcname", funcClass)
+
+  private def newDb(name: String): CatalogDatabase = {
+    CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)
+  }
+
+  private def newTable(name: String, db: String): CatalogTable = {
+    CatalogTable(
+      specifiedDatabase = Some(db),
+      name = name,
+      tableType = CatalogTableType.EXTERNAL_TABLE,
+      storage = storageFormat,
+      schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", "string")),
+      partitionColumns = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string")))
+  }
 
-  private def newDb(name: String = "default"): Database =
-    Database(name, name + " description", "uri", Map.empty)
+  private def newFunc(name: String): CatalogFunction = CatalogFunction(name, funcClass)
 
-  private def newTable(name: String): Table =
-    Table(name, "", Seq.empty, Seq.empty, Seq.empty, null, 0, Map.empty, "EXTERNAL_TABLE", 0, 0,
-      None, None)
+  /**
+   * Whether the catalog's table partitions equal the ones given.
+   * Note: Hive sets some random serde things, so we just compare the specs here.
+   */
+  private def catalogPartitionsEqual(
+      catalog: Catalog,
+      db: String,
+      table: String,
+      parts: Seq[CatalogTablePartition]): Boolean = {
+    catalog.listPartitions(db, table).map(_.spec).toSet == parts.map(_.spec).toSet
+  }
 
-  private def newFunc(name: String): Function = Function(name, funcClass)
 
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
 
-  test("basic create, drop and list databases") {
+  test("basic create and list databases") {
     val catalog = newEmptyCatalog()
-    catalog.createDatabase(newDb(), ignoreIfExists = false)
-    assert(catalog.listDatabases().toSet == Set("default"))
-
-    catalog.createDatabase(newDb("default2"), ignoreIfExists = false)
-    assert(catalog.listDatabases().toSet == Set("default", "default2"))
+    catalog.createDatabase(newDb("default"), ignoreIfExists = true)
+    assert(catalog.databaseExists("default"))
+    assert(!catalog.databaseExists("testing"))
+    assert(!catalog.databaseExists("testing2"))
+    catalog.createDatabase(newDb("testing"), ignoreIfExists = false)
+    assert(catalog.databaseExists("testing"))
+    assert(catalog.listDatabases().toSet == Set("default", "testing"))
+    catalog.createDatabase(newDb("testing2"), ignoreIfExists = false)
+    assert(catalog.listDatabases().toSet == Set("default", "testing", "testing2"))
+    assert(catalog.databaseExists("testing2"))
+    assert(!catalog.databaseExists("does_not_exist"))
   }
 
   test("get database when a database exists") {
@@ -93,7 +143,7 @@ abstract class CatalogTestCases extends SparkFunSuite {
 
   test("list databases without pattern") {
     val catalog = newBasicCatalog()
-    assert(catalog.listDatabases().toSet == Set("db1", "db2"))
+    assert(catalog.listDatabases().toSet == Set("default", "db1", "db2"))
   }
 
   test("list databases with pattern") {
@@ -107,7 +157,7 @@ abstract class CatalogTestCases extends SparkFunSuite {
   test("drop database") {
     val catalog = newBasicCatalog()
     catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false)
-    assert(catalog.listDatabases().toSet == Set("db2"))
+    assert(catalog.listDatabases().toSet == Set("default", "db2"))
   }
 
   test("drop database when the database is not empty") {
@@ -118,6 +168,7 @@ abstract class CatalogTestCases extends SparkFunSuite {
     intercept[AnalysisException] {
       catalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
     }
+    resetState()
 
     // Throw exception if there are tables left
     val catalog2 = newBasicCatalog()
@@ -125,11 +176,12 @@ abstract class CatalogTestCases extends SparkFunSuite {
     intercept[AnalysisException] {
       catalog2.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
     }
+    resetState()
 
     // When cascade is true, it should drop them
     val catalog3 = newBasicCatalog()
     catalog3.dropDatabase("db2", ignoreIfNotExists = false, cascade = true)
-    assert(catalog3.listDatabases().toSet == Set("db1"))
+    assert(catalog3.listDatabases().toSet == Set("default", "db1"))
   }
 
   test("drop database when the database does not exist") {
@@ -144,13 +196,19 @@ abstract class CatalogTestCases extends SparkFunSuite {
 
   test("alter database") {
     val catalog = newBasicCatalog()
-    catalog.alterDatabase("db1", Database("db1", "new description", "lll", Map.empty))
-    assert(catalog.getDatabase("db1").description == "new description")
+    val db1 = catalog.getDatabase("db1")
+    // Note: alter properties here because Hive does not support altering other fields
+    catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true")))
+    val newDb1 = catalog.getDatabase("db1")
+    assert(db1.properties.isEmpty)
+    assert(newDb1.properties.size == 2)
+    assert(newDb1.properties.get("k") == Some("v3"))
+    assert(newDb1.properties.get("good") == Some("true"))
   }
 
   test("alter database should throw exception when the database does not exist") {
     intercept[AnalysisException] {
-      newBasicCatalog().alterDatabase("no_db", Database("no_db", "ddd", "lll", Map.empty))
+      newBasicCatalog().alterDatabase(newDb("does_not_exist"))
     }
   }
 
@@ -165,61 +223,56 @@ abstract class CatalogTestCases extends SparkFunSuite {
     assert(catalog.listTables("db2").toSet == Set("tbl2"))
   }
 
-  test("drop table when database / table does not exist") {
+  test("drop table when database/table does not exist") {
     val catalog = newBasicCatalog()
-
     // Should always throw exception when the database does not exist
     intercept[AnalysisException] {
       catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = false)
     }
-
     intercept[AnalysisException] {
       catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = true)
     }
-
     // Should throw exception when the table does not exist, if ignoreIfNotExists is false
     intercept[AnalysisException] {
       catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = false)
     }
-
     catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true)
   }
 
   test("rename table") {
     val catalog = newBasicCatalog()
-
     assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
     catalog.renameTable("db2", "tbl1", "tblone")
     assert(catalog.listTables("db2").toSet == Set("tblone", "tbl2"))
   }
 
-  test("rename table when database / table does not exist") {
+  test("rename table when database/table does not exist") {
     val catalog = newBasicCatalog()
-
-    intercept[AnalysisException] {  // Throw exception when the database does not exist
+    intercept[AnalysisException] {
       catalog.renameTable("unknown_db", "unknown_table", "unknown_table")
     }
-
-    intercept[AnalysisException] {  // Throw exception when the table does not exist
+    intercept[AnalysisException] {
       catalog.renameTable("db2", "unknown_table", "unknown_table")
     }
   }
 
   test("alter table") {
     val catalog = newBasicCatalog()
-    catalog.alterTable("db2", "tbl1", newTable("tbl1").copy(createTime = 10))
-    assert(catalog.getTable("db2", "tbl1").createTime == 10)
+    val tbl1 = catalog.getTable("db2", "tbl1")
+    catalog.alterTable("db2", tbl1.copy(properties = Map("toh" -> "frem")))
+    val newTbl1 = catalog.getTable("db2", "tbl1")
+    assert(!tbl1.properties.contains("toh"))
+    assert(newTbl1.properties.size == tbl1.properties.size + 1)
+    assert(newTbl1.properties.get("toh") == Some("frem"))
   }
 
-  test("alter table when database / table does not exist") {
+  test("alter table when database/table does not exist") {
     val catalog = newBasicCatalog()
-
-    intercept[AnalysisException] {  // Throw exception when the database does not exist
-      catalog.alterTable("unknown_db", "unknown_table", newTable("unknown_table"))
+    intercept[AnalysisException] {
+      catalog.alterTable("unknown_db", newTable("tbl1", "unknown_db"))
     }
-
-    intercept[AnalysisException] {  // Throw exception when the table does not exist
-      catalog.alterTable("db2", "unknown_table", newTable("unknown_table"))
+    intercept[AnalysisException] {
+      catalog.alterTable("db2", newTable("unknown_table", "db2"))
     }
   }
 
@@ -227,12 +280,11 @@ abstract class CatalogTestCases extends SparkFunSuite {
     assert(newBasicCatalog().getTable("db2", "tbl1").name == "tbl1")
   }
 
-  test("get table when database / table does not exist") {
+  test("get table when database/table does not exist") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
       catalog.getTable("unknown_db", "unknown_table")
     }
-
     intercept[AnalysisException] {
       catalog.getTable("db2", "unknown_table")
     }
@@ -246,10 +298,7 @@ abstract class CatalogTestCases extends SparkFunSuite {
 
   test("list tables with pattern") {
     val catalog = newBasicCatalog()
-
-    // Test when database does not exist
     intercept[AnalysisException] { catalog.listTables("unknown_db") }
-
     assert(catalog.listTables("db1", "*").toSet == Set.empty)
     assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2"))
     assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2"))
@@ -263,12 +312,12 @@ abstract class CatalogTestCases extends SparkFunSuite {
   test("basic create and list partitions") {
     val catalog = newEmptyCatalog()
     catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
-    catalog.createTable("mydb", newTable("mytbl"), ignoreIfExists = false)
-    catalog.createPartitions("mydb", "mytbl", Seq(part1, part2), ignoreIfExists = false)
-    assert(catalog.listPartitions("mydb", "mytbl").toSet == Set(part1, part2))
+    catalog.createTable("mydb", newTable("tbl", "mydb"), ignoreIfExists = false)
+    catalog.createPartitions("mydb", "tbl", Seq(part1, part2), ignoreIfExists = false)
+    assert(catalogPartitionsEqual(catalog, "mydb", "tbl", Seq(part1, part2)))
   }
 
-  test("create partitions when database / table does not exist") {
+  test("create partitions when database/table does not exist") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
       catalog.createPartitions("does_not_exist", "tbl1", Seq(), ignoreIfExists = false)
@@ -288,16 +337,17 @@ abstract class CatalogTestCases extends SparkFunSuite {
 
   test("drop partitions") {
     val catalog = newBasicCatalog()
-    assert(catalog.listPartitions("db2", "tbl2").toSet == Set(part1, part2))
+    assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
     catalog.dropPartitions("db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false)
-    assert(catalog.listPartitions("db2", "tbl2").toSet == Set(part2))
+    assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2)))
+    resetState()
     val catalog2 = newBasicCatalog()
-    assert(catalog2.listPartitions("db2", "tbl2").toSet == Set(part1, part2))
+    assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2)))
     catalog2.dropPartitions("db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
     assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
   }
 
-  test("drop partitions when database / table does not exist") {
+  test("drop partitions when database/table does not exist") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
       catalog.dropPartitions("does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false)
@@ -317,14 +367,14 @@ abstract class CatalogTestCases extends SparkFunSuite {
 
   test("get partition") {
     val catalog = newBasicCatalog()
-    assert(catalog.getPartition("db2", "tbl2", part1.spec) == part1)
-    assert(catalog.getPartition("db2", "tbl2", part2.spec) == part2)
+    assert(catalog.getPartition("db2", "tbl2", part1.spec).spec == part1.spec)
+    assert(catalog.getPartition("db2", "tbl2", part2.spec).spec == part2.spec)
     intercept[AnalysisException] {
       catalog.getPartition("db2", "tbl1", part3.spec)
     }
   }
 
-  test("get partition when database / table does not exist") {
+  test("get partition when database/table does not exist") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
       catalog.getPartition("does_not_exist", "tbl1", part1.spec)
@@ -334,28 +384,69 @@ abstract class CatalogTestCases extends SparkFunSuite {
     }
   }
 
-  test("alter partitions") {
+  test("rename partitions") {
+    val catalog = newBasicCatalog()
+    val newPart1 = part1.copy(spec = Map("a" -> "100", "b" -> "101"))
+    val newPart2 = part2.copy(spec = Map("a" -> "200", "b" -> "201"))
+    val newSpecs = Seq(newPart1.spec, newPart2.spec)
+    catalog.renamePartitions("db2", "tbl2", Seq(part1.spec, part2.spec), newSpecs)
+    assert(catalog.getPartition("db2", "tbl2", newPart1.spec).spec === newPart1.spec)
+    assert(catalog.getPartition("db2", "tbl2", newPart2.spec).spec === newPart2.spec)
+    // The old partitions should no longer exist
+    intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part1.spec) }
+    intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part2.spec) }
+  }
+
+  test("rename partitions when database/table does not exist") {
     val catalog = newBasicCatalog()
-    val partSameSpec = part1.copy(storage = storageFormat.copy(serde = "myserde"))
-    val partNewSpec = part1.copy(spec = Map("x" -> "10"))
-    // alter but keep spec the same
-    catalog.alterPartition("db2", "tbl2", part1.spec, partSameSpec)
-    assert(catalog.getPartition("db2", "tbl2", part1.spec) == partSameSpec)
-    // alter and change spec
-    catalog.alterPartition("db2", "tbl2", part1.spec, partNewSpec)
     intercept[AnalysisException] {
-      catalog.getPartition("db2", "tbl2", part1.spec)
+      catalog.renamePartitions("does_not_exist", "tbl1", Seq(part1.spec), Seq(part2.spec))
+    }
+    intercept[AnalysisException] {
+      catalog.renamePartitions("db2", "does_not_exist", Seq(part1.spec), Seq(part2.spec))
     }
-    assert(catalog.getPartition("db2", "tbl2", partNewSpec.spec) == partNewSpec)
   }
 
-  test("alter partition when database / table does not exist") {
+  test("alter partitions") {
+    val catalog = newBasicCatalog()
+    try{
+      // Note: Before altering table partitions in Hive, you *must* set the current database
+      // to the one that contains the table of interest. Otherwise you will end up with the
+      // most helpful error message ever: "Unable to alter partition. alter is not possible."
+      // See HIVE-2742 for more detail.
+      catalog.setCurrentDatabase("db2")
+      val newLocation = newUriForDatabase()
+      // alter but keep spec the same
+      val oldPart1 = catalog.getPartition("db2", "tbl2", part1.spec)
+      val oldPart2 = catalog.getPartition("db2", "tbl2", part2.spec)
+      catalog.alterPartitions("db2", "tbl2", Seq(
+        oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))),
+        oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation)))))
+      val newPart1 = catalog.getPartition("db2", "tbl2", part1.spec)
+      val newPart2 = catalog.getPartition("db2", "tbl2", part2.spec)
+      assert(newPart1.storage.locationUri == Some(newLocation))
+      assert(newPart2.storage.locationUri == Some(newLocation))
+      assert(oldPart1.storage.locationUri != Some(newLocation))
+      assert(oldPart2.storage.locationUri != Some(newLocation))
+      // alter but change spec, should fail because new partition specs do not exist yet
+      val badPart1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2"))
+      val badPart2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4"))
+      intercept[AnalysisException] {
+        catalog.alterPartitions("db2", "tbl2", Seq(badPart1, badPart2))
+      }
+    } finally {
+      // Remember to restore the original current database, which we assume to be "default"
+      catalog.setCurrentDatabase("default")
+    }
+  }
+
+  test("alter partitions when database/table does not exist") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
-      catalog.alterPartition("does_not_exist", "tbl1", part1.spec, part1)
+      catalog.alterPartitions("does_not_exist", "tbl1", Seq(part1))
     }
     intercept[AnalysisException] {
-      catalog.alterPartition("db2", "does_not_exist", part1.spec, part1)
+      catalog.alterPartitions("db2", "does_not_exist", Seq(part1))
     }
   }
 
@@ -366,23 +457,22 @@ abstract class CatalogTestCases extends SparkFunSuite {
   test("basic create and list functions") {
     val catalog = newEmptyCatalog()
     catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
-    catalog.createFunction("mydb", newFunc("myfunc"), ignoreIfExists = false)
+    catalog.createFunction("mydb", newFunc("myfunc"))
     assert(catalog.listFunctions("mydb", "*").toSet == Set("myfunc"))
   }
 
   test("create function when database does not exist") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
-      catalog.createFunction("does_not_exist", newFunc(), ignoreIfExists = false)
+      catalog.createFunction("does_not_exist", newFunc())
     }
   }
 
   test("create function that already exists") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
-      catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = false)
+      catalog.createFunction("db2", newFunc("func1"))
     }
-    catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = true)
   }
 
   test("drop function") {
@@ -421,31 +511,43 @@ abstract class CatalogTestCases extends SparkFunSuite {
     }
   }
 
-  test("alter function") {
+  test("rename function") {
     val catalog = newBasicCatalog()
+    val newName = "funcky"
     assert(catalog.getFunction("db2", "func1").className == funcClass)
-    // alter func but keep name
-    catalog.alterFunction("db2", "func1", newFunc("func1").copy(className = "muhaha"))
-    assert(catalog.getFunction("db2", "func1").className == "muhaha")
-    // alter func and change name
-    catalog.alterFunction("db2", "func1", newFunc("funcky"))
+    catalog.renameFunction("db2", "func1", newName)
+    intercept[AnalysisException] { catalog.getFunction("db2", "func1") }
+    assert(catalog.getFunction("db2", newName).name == newName)
+    assert(catalog.getFunction("db2", newName).className == funcClass)
+    intercept[AnalysisException] { catalog.renameFunction("db2", "does_not_exist", "me") }
+  }
+
+  test("rename function when database does not exist") {
+    val catalog = newBasicCatalog()
     intercept[AnalysisException] {
-      catalog.getFunction("db2", "func1")
+      catalog.renameFunction("does_not_exist", "func1", "func5")
     }
-    assert(catalog.getFunction("db2", "funcky").className == funcClass)
+  }
+
+  test("alter function") {
+    val catalog = newBasicCatalog()
+    assert(catalog.getFunction("db2", "func1").className == funcClass)
+    catalog.alterFunction("db2", newFunc("func1").copy(className = "muhaha"))
+    assert(catalog.getFunction("db2", "func1").className == "muhaha")
+    intercept[AnalysisException] { catalog.alterFunction("db2", newFunc("funcky")) }
   }
 
   test("alter function when database does not exist") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
-      catalog.alterFunction("does_not_exist", "func1", newFunc())
+      catalog.alterFunction("does_not_exist", newFunc())
     }
   }
 
   test("list functions") {
     val catalog = newBasicCatalog()
-    catalog.createFunction("db2", newFunc("func2"), ignoreIfExists = false)
-    catalog.createFunction("db2", newFunc("not_me"), ignoreIfExists = false)
+    catalog.createFunction("db2", newFunc("func2"))
+    catalog.createFunction("db2", newFunc("not_me"))
     assert(catalog.listFunctions("db2", "*").toSet == Set("func1", "func2", "not_me"))
     assert(catalog.listFunctions("db2", "func*").toSet == Set("func1", "func2"))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
new file mode 100644
index 0000000..21b9cfb
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.hive.ql.metadata.HiveException
+import org.apache.thrift.TException
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.NoSuchItemException
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.hive.client.HiveClient
+
+
+/**
+ * A persistent implementation of the system catalog using Hive.
+ * All public methods must be synchronized for thread-safety.
+ */
+private[spark] class HiveCatalog(client: HiveClient) extends Catalog with Logging {
+  import Catalog._
+
+  // Exceptions thrown by the hive client that we would like to wrap
+  private val clientExceptions = Set(
+    classOf[HiveException].getCanonicalName,
+    classOf[TException].getCanonicalName)
+
+  /**
+   * Whether this is an exception thrown by the hive client that should be wrapped.
+   *
+   * Due to classloader isolation issues, pattern matching won't work here so we need
+   * to compare the canonical names of the exceptions, which we assume to be stable.
+   */
+  private def isClientException(e: Throwable): Boolean = {
+    var temp: Class[_] = e.getClass
+    var found = false
+    while (temp != null && !found) {
+      found = clientExceptions.contains(temp.getCanonicalName)
+      temp = temp.getSuperclass
+    }
+    found
+  }
+
+  /**
+   * Run some code involving `client` in a [[synchronized]] block and wrap certain
+   * exceptions thrown in the process in [[AnalysisException]].
+   */
+  private def withClient[T](body: => T): T = synchronized {
+    try {
+      body
+    } catch {
+      case e: NoSuchItemException =>
+        throw new AnalysisException(e.getMessage)
+      case NonFatal(e) if isClientException(e) =>
+        throw new AnalysisException(e.getClass.getCanonicalName + ": " + e.getMessage)
+    }
+  }
+
+  private def requireDbMatches(db: String, table: CatalogTable): Unit = {
+    if (table.specifiedDatabase != Some(db)) {
+      throw new AnalysisException(
+        s"Provided database $db does not much the one specified in the " +
+        s"table definition (${table.specifiedDatabase.getOrElse("n/a")})")
+    }
+  }
+
+  private def requireTableExists(db: String, table: String): Unit = {
+    withClient { getTable(db, table) }
+  }
+
+
+  // --------------------------------------------------------------------------
+  // Databases
+  // --------------------------------------------------------------------------
+
+  override def createDatabase(
+      dbDefinition: CatalogDatabase,
+      ignoreIfExists: Boolean): Unit = withClient {
+    client.createDatabase(dbDefinition, ignoreIfExists)
+  }
+
+  override def dropDatabase(
+      db: String,
+      ignoreIfNotExists: Boolean,
+      cascade: Boolean): Unit = withClient {
+    client.dropDatabase(db, ignoreIfNotExists, cascade)
+  }
+
+  /**
+   * Alter a database whose name matches the one specified in `dbDefinition`,
+   * assuming the database exists.
+   *
+   * Note: As of now, this only supports altering database properties!
+   */
+  override def alterDatabase(dbDefinition: CatalogDatabase): Unit = withClient {
+    val existingDb = getDatabase(dbDefinition.name)
+    if (existingDb.properties == dbDefinition.properties) {
+      logWarning(s"Request to alter database ${dbDefinition.name} is a no-op because " +
+        s"the provided database properties are the same as the old ones. Hive does not " +
+        s"currently support altering other database fields.")
+    }
+    client.alterDatabase(dbDefinition)
+  }
+
+  override def getDatabase(db: String): CatalogDatabase = withClient {
+    client.getDatabase(db)
+  }
+
+  override def databaseExists(db: String): Boolean = withClient {
+    client.getDatabaseOption(db).isDefined
+  }
+
+  override def listDatabases(): Seq[String] = withClient {
+    client.listDatabases("*")
+  }
+
+  override def listDatabases(pattern: String): Seq[String] = withClient {
+    client.listDatabases(pattern)
+  }
+
+  override def setCurrentDatabase(db: String): Unit = withClient {
+    client.setCurrentDatabase(db)
+  }
+
+  // --------------------------------------------------------------------------
+  // Tables
+  // --------------------------------------------------------------------------
+
+  override def createTable(
+      db: String,
+      tableDefinition: CatalogTable,
+      ignoreIfExists: Boolean): Unit = withClient {
+    requireDbExists(db)
+    requireDbMatches(db, tableDefinition)
+    client.createTable(tableDefinition, ignoreIfExists)
+  }
+
+  override def dropTable(
+      db: String,
+      table: String,
+      ignoreIfNotExists: Boolean): Unit = withClient {
+    requireDbExists(db)
+    client.dropTable(db, table, ignoreIfNotExists)
+  }
+
+  override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
+    val newTable = client.getTable(db, oldName).copy(name = newName)
+    client.alterTable(oldName, newTable)
+  }
+
+  /**
+   * Alter a table whose name matches the one specified in `tableDefinition`,
+   * assuming the table exists.
+   *
+   * Note: As of now, this only supports altering table properties, serde properties,
+   * and num buckets!
+   */
+  override def alterTable(db: String, tableDefinition: CatalogTable): Unit = withClient {
+    requireDbMatches(db, tableDefinition)
+    requireTableExists(db, tableDefinition.name)
+    client.alterTable(tableDefinition)
+  }
+
+  override def getTable(db: String, table: String): CatalogTable = withClient {
+    client.getTable(db, table)
+  }
+
+  override def listTables(db: String): Seq[String] = withClient {
+    requireDbExists(db)
+    client.listTables(db)
+  }
+
+  override def listTables(db: String, pattern: String): Seq[String] = withClient {
+    requireDbExists(db)
+    client.listTables(db, pattern)
+  }
+
+  // --------------------------------------------------------------------------
+  // Partitions
+  // --------------------------------------------------------------------------
+
+  override def createPartitions(
+      db: String,
+      table: String,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = withClient {
+    requireTableExists(db, table)
+    client.createPartitions(db, table, parts, ignoreIfExists)
+  }
+
+  override def dropPartitions(
+      db: String,
+      table: String,
+      parts: Seq[TablePartitionSpec],
+      ignoreIfNotExists: Boolean): Unit = withClient {
+    requireTableExists(db, table)
+    // Note: Unfortunately Hive does not currently support `ignoreIfNotExists` so we
+    // need to implement it here ourselves. This is currently somewhat expensive because
+    // we make multiple synchronous calls to Hive for each partition we want to drop.
+    val partsToDrop =
+      if (ignoreIfNotExists) {
+        parts.filter { spec =>
+          try {
+            getPartition(db, table, spec)
+            true
+          } catch {
+            // Filter out the partitions that do not actually exist
+            case _: AnalysisException => false
+          }
+        }
+      } else {
+        parts
+      }
+    if (partsToDrop.nonEmpty) {
+      client.dropPartitions(db, table, partsToDrop)
+    }
+  }
+
+  override def renamePartitions(
+      db: String,
+      table: String,
+      specs: Seq[TablePartitionSpec],
+      newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
+    client.renamePartitions(db, table, specs, newSpecs)
+  }
+
+  override def alterPartitions(
+      db: String,
+      table: String,
+      newParts: Seq[CatalogTablePartition]): Unit = withClient {
+    client.alterPartitions(db, table, newParts)
+  }
+
+  override def getPartition(
+      db: String,
+      table: String,
+      spec: TablePartitionSpec): CatalogTablePartition = withClient {
+    client.getPartition(db, table, spec)
+  }
+
+  override def listPartitions(
+      db: String,
+      table: String): Seq[CatalogTablePartition] = withClient {
+    client.getAllPartitions(db, table)
+  }
+
+  // --------------------------------------------------------------------------
+  // Functions
+  // --------------------------------------------------------------------------
+
+  override def createFunction(
+      db: String,
+      funcDefinition: CatalogFunction): Unit = withClient {
+    client.createFunction(db, funcDefinition)
+  }
+
+  override def dropFunction(db: String, name: String): Unit = withClient {
+    client.dropFunction(db, name)
+  }
+
+  override def renameFunction(db: String, oldName: String, newName: String): Unit = withClient {
+    client.renameFunction(db, oldName, newName)
+  }
+
+  override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = withClient {
+    client.alterFunction(db, funcDefinition)
+  }
+
+  override def getFunction(db: String, funcName: String): CatalogFunction = withClient {
+    client.getFunction(db, funcName)
+  }
+
+  override def listFunctions(db: String, pattern: String): Seq[String] = withClient {
+    client.listFunctions(db, pattern)
+  }
+
+}
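
The `withClient`/`isClientException` pair above is the core of the error handling in this new catalog: because the Hive classes may come from an isolated classloader, exceptions cannot be matched by type, so they are identified by canonical class name while walking up the superclass chain, then rethrown as `AnalysisException`. A minimal standalone sketch of that pattern; the object and method names here are illustrative, and a plain `RuntimeException` stands in for `AnalysisException` to keep the snippet self-contained.

```scala
import scala.util.control.NonFatal

object WrapByClassName {
  // Exception types to rewrap, identified by canonical name because the classes
  // themselves may live in a different classloader than the caller's.
  private val wrapped = Set(
    "org.apache.hadoop.hive.ql.metadata.HiveException",
    "org.apache.thrift.TException")

  // Walk the superclass chain and check each canonical name against the set.
  private def shouldWrap(e: Throwable): Boolean = {
    var cls: Class[_] = e.getClass
    var found = false
    while (cls != null && !found) {
      found = wrapped.contains(cls.getCanonicalName)
      cls = cls.getSuperclass
    }
    found
  }

  // Run the body and rewrap recognized exceptions with their original class name in the message.
  def apply[T](body: => T): T =
    try body catch {
      case NonFatal(e) if shouldWrap(e) =>
        throw new RuntimeException(s"${e.getClass.getCanonicalName}: ${e.getMessage}", e)
    }
}

// Usage: WrapByClassName { client.getTable(db, table) }
```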

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c222b00..3788736 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -25,15 +25,16 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.Warehouse
+import org.apache.hadoop.hive.metastore.{TableType => HiveTableType, Warehouse}
 import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.ql.metadata._
+import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable, _}
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -96,6 +97,8 @@ private[hive] object HiveSerDe {
   }
 }
 
+
+// TODO: replace this with o.a.s.sql.hive.HiveCatalog once we merge SQLContext and HiveContext
 private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveContext)
   extends Catalog with Logging {
 
@@ -107,16 +110,16 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
   /** A fully qualified identifier for a table (i.e., database.tableName) */
   case class QualifiedTableName(database: String, name: String)
 
-  private def getQualifiedTableName(tableIdent: TableIdentifier) = {
+  private def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
     QualifiedTableName(
       tableIdent.database.getOrElse(client.currentDatabase).toLowerCase,
       tableIdent.table.toLowerCase)
   }
 
-  private def getQualifiedTableName(hiveTable: HiveTable) = {
+  private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = {
     QualifiedTableName(
-      hiveTable.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase,
-      hiveTable.name.toLowerCase)
+      t.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase,
+      t.name.toLowerCase)
   }
 
   /** A cache of Spark SQL data source tables that have been accessed. */
@@ -175,7 +178,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
 
         // It does not appear that the ql client for the metastore has a way to enumerate all the
         // SerDe properties directly...
-        val options = table.serdeProperties
+        val options = table.storage.serdeProperties
 
         val resolvedRelation =
           ResolvedDataSource(
@@ -276,53 +279,54 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
 
     val tableType = if (isExternal) {
       tableProperties.put("EXTERNAL", "TRUE")
-      ExternalTable
+      CatalogTableType.EXTERNAL_TABLE
     } else {
       tableProperties.put("EXTERNAL", "FALSE")
-      ManagedTable
+      CatalogTableType.MANAGED_TABLE
     }
 
     val maybeSerDe = HiveSerDe.sourceToSerDe(provider, hive.hiveconf)
     val dataSource = ResolvedDataSource(
       hive, userSpecifiedSchema, partitionColumns, bucketSpec, provider, options)
 
-    def newSparkSQLSpecificMetastoreTable(): HiveTable = {
-      HiveTable(
+    def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
+      CatalogTable(
         specifiedDatabase = Option(dbName),
         name = tblName,
-        schema = Nil,
-        partitionColumns = Nil,
         tableType = tableType,
-        properties = tableProperties.toMap,
-        serdeProperties = options)
+        schema = Nil,
+        storage = CatalogStorageFormat(
+          locationUri = None,
+          inputFormat = None,
+          outputFormat = None,
+          serde = None,
+          serdeProperties = options
+        ),
+        properties = tableProperties.toMap)
     }
 
-    def newHiveCompatibleMetastoreTable(relation: HadoopFsRelation, serde: HiveSerDe): HiveTable = {
-      def schemaToHiveColumn(schema: StructType): Seq[HiveColumn] = {
-        schema.map { field =>
-          HiveColumn(
-            name = field.name,
-            hiveType = HiveMetastoreTypes.toMetastoreType(field.dataType),
-            comment = "")
-        }
-      }
-
+    def newHiveCompatibleMetastoreTable(
+        relation: HadoopFsRelation,
+        serde: HiveSerDe): CatalogTable = {
       assert(partitionColumns.isEmpty)
       assert(relation.partitionColumns.isEmpty)
 
-      HiveTable(
+      CatalogTable(
         specifiedDatabase = Option(dbName),
         name = tblName,
-        schema = schemaToHiveColumn(relation.schema),
-        partitionColumns = Nil,
         tableType = tableType,
+        storage = CatalogStorageFormat(
+          locationUri = Some(relation.paths.head),
+          inputFormat = serde.inputFormat,
+          outputFormat = serde.outputFormat,
+          serde = serde.serde,
+          serdeProperties = options
+        ),
+        schema = relation.schema.map { f =>
+          CatalogColumn(f.name, HiveMetastoreTypes.toMetastoreType(f.dataType))
+        },
         properties = tableProperties.toMap,
-        serdeProperties = options,
-        location = Some(relation.paths.head),
-        viewText = None, // TODO We need to place the SQL string here.
-        inputFormat = serde.inputFormat,
-        outputFormat = serde.outputFormat,
-        serde = serde.serde)
+        viewText = None) // TODO: We need to place the SQL string here
     }
 
     // TODO: Support persisting partitioned data source relations in Hive compatible format
@@ -379,7 +383,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
         // specific way.
         try {
           logInfo(message)
-          client.createTable(table)
+          client.createTable(table, ignoreIfExists = false)
         } catch {
           case throwable: Throwable =>
             val warningMessage =
@@ -387,20 +391,20 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
                 s"it into Hive metastore in Spark SQL specific format."
             logWarning(warningMessage, throwable)
             val sparkSqlSpecificTable = newSparkSQLSpecificMetastoreTable()
-            client.createTable(sparkSqlSpecificTable)
+            client.createTable(sparkSqlSpecificTable, ignoreIfExists = false)
         }
 
       case (None, message) =>
         logWarning(message)
         val hiveTable = newSparkSQLSpecificMetastoreTable()
-        client.createTable(hiveTable)
+        client.createTable(hiveTable, ignoreIfExists = false)
     }
   }
 
   def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
     // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
     val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
-    new Path(new Path(client.getDatabase(dbName).location), tblName).toString
+    new Path(new Path(client.getDatabase(dbName).locationUri), tblName).toString
   }
 
   override def tableExists(tableIdent: TableIdentifier): Boolean = {
@@ -420,7 +424,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
       // Then, if alias is specified, wrap the table with a Subquery using the alias.
       // Otherwise, wrap the table with a Subquery using the table name.
       alias.map(a => SubqueryAlias(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
-    } else if (table.tableType == VirtualView) {
+    } else if (table.tableType == CatalogTableType.VIRTUAL_VIEW) {
       val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
       alias match {
         // because hive use things like `_c0` to build the expanded text
@@ -429,7 +433,8 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
         case Some(aliasText) => SubqueryAlias(aliasText, hive.parseSql(viewText))
       }
     } else {
-      MetastoreRelation(qualifiedTableName.database, qualifiedTableName.name, alias)(table)(hive)
+      MetastoreRelation(
+        qualifiedTableName.database, qualifiedTableName.name, alias)(table, client, hive)
     }
   }
 
@@ -602,16 +607,14 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
         val schema = if (table.schema.nonEmpty) {
           table.schema
         } else {
-          child.output.map {
-            attr => new HiveColumn(
-              attr.name,
-              HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
+          child.output.map { a =>
+            CatalogColumn(a.name, HiveMetastoreTypes.toMetastoreType(a.dataType), a.nullable)
           }
         }
 
         val desc = table.copy(schema = schema)
 
-        if (hive.convertCTAS && table.serde.isEmpty) {
+        if (hive.convertCTAS && table.storage.serde.isEmpty) {
           // Do the conversion when spark.sql.hive.convertCTAS is true and the query
           // does not specify any storage format (file format and storage handler).
           if (table.specifiedDatabase.isDefined) {
@@ -632,9 +635,9 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
             child
           )
         } else {
-          val desc = if (table.serde.isEmpty) {
+          val desc = if (table.storage.serde.isEmpty) {
             // add default serde
-            table.copy(
+            table.withNewStorage(
               serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
           } else {
             table
@@ -744,10 +747,13 @@ private[hive] case class InsertIntoHiveTable(
   }
 }
 
-private[hive] case class MetastoreRelation
-    (databaseName: String, tableName: String, alias: Option[String])
-    (val table: HiveTable)
-    (@transient private val sqlContext: SQLContext)
+private[hive] case class MetastoreRelation(
+    databaseName: String,
+    tableName: String,
+    alias: Option[String])
+    (val table: CatalogTable,
+     @transient private val client: HiveClient,
+     @transient private val sqlContext: SQLContext)
   extends LeafNode with MultiInstanceRelation with FileRelation {
 
   override def equals(other: Any): Boolean = other match {
@@ -765,7 +771,12 @@ private[hive] case class MetastoreRelation
 
   override protected def otherCopyArgs: Seq[AnyRef] = table :: sqlContext :: Nil
 
-  @transient val hiveQlTable: Table = {
+  private def toHiveColumn(c: CatalogColumn): FieldSchema = {
+    new FieldSchema(c.name, c.dataType, c.comment.orNull)
+  }
+
+  // TODO: merge this with HiveClientImpl#toHiveTable
+  @transient val hiveQlTable: HiveTable = {
     // We start by constructing an API table as Hive performs several important transformations
     // internally when converting an API table to a QL table.
     val tTable = new org.apache.hadoop.hive.metastore.api.Table()
@@ -776,27 +787,31 @@ private[hive] case class MetastoreRelation
     tTable.setParameters(tableParameters)
     table.properties.foreach { case (k, v) => tableParameters.put(k, v) }
 
-    tTable.setTableType(table.tableType.name)
+    tTable.setTableType(table.tableType match {
+      case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE.toString
+      case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE.toString
+      case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE.toString
+      case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW.toString
+    })
 
     val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
     tTable.setSd(sd)
-    sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
-    tTable.setPartitionKeys(
-      table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
+    sd.setCols(table.schema.map(toHiveColumn).asJava)
+    tTable.setPartitionKeys(table.partitionColumns.map(toHiveColumn).asJava)
 
-    table.location.foreach(sd.setLocation)
-    table.inputFormat.foreach(sd.setInputFormat)
-    table.outputFormat.foreach(sd.setOutputFormat)
+    table.storage.locationUri.foreach(sd.setLocation)
+    table.storage.inputFormat.foreach(sd.setInputFormat)
+    table.storage.outputFormat.foreach(sd.setOutputFormat)
 
     val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
-    table.serde.foreach(serdeInfo.setSerializationLib)
+    table.storage.serde.foreach(serdeInfo.setSerializationLib)
     sd.setSerdeInfo(serdeInfo)
 
     val serdeParameters = new java.util.HashMap[String, String]()
-    table.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+    table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
     serdeInfo.setParameters(serdeParameters)
 
-    new Table(tTable)
+    new HiveTable(tTable)
   }
 
   @transient override lazy val statistics: Statistics = Statistics(
@@ -821,11 +836,11 @@ private[hive] case class MetastoreRelation
 
   // When metastore partition pruning is turned off, we cache the list of all partitions to
   // mimic the behavior of Spark < 1.5
-  lazy val allPartitions = table.getAllPartitions
+  private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(table)
 
   def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
     val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
-      table.getPartitions(predicates)
+      client.getPartitionsByFilter(table, predicates)
     } else {
       allPartitions
     }
@@ -834,23 +849,22 @@ private[hive] case class MetastoreRelation
       val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
       tPartition.setDbName(databaseName)
       tPartition.setTableName(tableName)
-      tPartition.setValues(p.values.asJava)
+      tPartition.setValues(p.spec.values.toList.asJava)
 
       val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
       tPartition.setSd(sd)
-      sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
-
-      sd.setLocation(p.storage.location)
-      sd.setInputFormat(p.storage.inputFormat)
-      sd.setOutputFormat(p.storage.outputFormat)
+      sd.setCols(table.schema.map(toHiveColumn).asJava)
+      p.storage.locationUri.foreach(sd.setLocation)
+      p.storage.inputFormat.foreach(sd.setInputFormat)
+      p.storage.outputFormat.foreach(sd.setOutputFormat)
 
       val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
       sd.setSerdeInfo(serdeInfo)
       // maps and lists should be set only after all elements are ready (see HIVE-7975)
-      serdeInfo.setSerializationLib(p.storage.serde)
+      p.storage.serde.foreach(serdeInfo.setSerializationLib)
 
       val serdeParameters = new java.util.HashMap[String, String]()
-      table.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+      table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
       p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
       serdeInfo.setParameters(serdeParameters)
 
@@ -877,10 +891,10 @@ private[hive] case class MetastoreRelation
     hiveQlTable.getMetadata
   )
 
-  implicit class SchemaAttribute(f: HiveColumn) {
+  implicit class SchemaAttribute(f: CatalogColumn) {
     def toAttribute: AttributeReference = AttributeReference(
       f.name,
-      HiveMetastoreTypes.toDataType(f.hiveType),
+      HiveMetastoreTypes.toDataType(f.dataType),
       // Since data can be dumped in randomly with no validation, everything is nullable.
       nullable = true
     )(qualifiers = Seq(alias.getOrElse(tableName)))
@@ -901,19 +915,22 @@ private[hive] case class MetastoreRelation
   val columnOrdinals = AttributeMap(attributes.zipWithIndex)
 
   override def inputFiles: Array[String] = {
-    val partLocations = table.getPartitions(Nil).map(_.storage.location).toArray
+    val partLocations = client
+      .getPartitionsByFilter(table, Nil)
+      .flatMap(_.storage.locationUri)
+      .toArray
     if (partLocations.nonEmpty) {
       partLocations
     } else {
       Array(
-        table.location.getOrElse(
+        table.storage.locationUri.getOrElse(
           sys.error(s"Could not get the location of ${table.qualifiedName}.")))
     }
   }
 
 
   override def newInstance(): MetastoreRelation = {
-    MetastoreRelation(databaseName, tableName, alias)(table)(sqlContext)
+    MetastoreRelation(databaseName, tableName, alias)(table, client, sqlContext)
   }
 }
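
Much of the churn in this file comes from the flat `HiveTable` fields (`location`, `inputFormat`, `outputFormat`, `serde`, `serdeProperties`) being grouped under `CatalogTable.storage`, which is why so many call sites in `MetastoreRelation` now go through `table.storage`. A small sketch of the new storage descriptor, using only field names that appear in this diff; the concrete values are placeholders, not taken from the patch.

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat

// Placeholder values for illustration; only the field names come from the diff above.
val storage = CatalogStorageFormat(
  locationUri = Some("hdfs://nn:8020/user/hive/warehouse/db2.db/tbl1"),
  inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
  outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"),
  serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"),
  serdeProperties = Map("serialization.format" -> "1"))
```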
 

