You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/22 00:00:26 UTC

[1/2] spark git commit: [SPARK-13080][SQL] Implement new Catalog API using Hive

Repository: spark
Updated Branches:
  refs/heads/master 7eb83fefd -> 6c3832b26


http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 752c037..5801051 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.ParseUtils._
@@ -39,7 +40,6 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.SparkQl
 import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
-import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.AnalysisException
@@ -55,7 +55,7 @@ private[hive] case object NativePlaceholder extends LogicalPlan {
 }
 
 private[hive] case class CreateTableAsSelect(
-    tableDesc: HiveTable,
+    tableDesc: CatalogTable,
     child: LogicalPlan,
     allowExisting: Boolean) extends UnaryNode with Command {
 
@@ -63,14 +63,14 @@ private[hive] case class CreateTableAsSelect(
   override lazy val resolved: Boolean =
     tableDesc.specifiedDatabase.isDefined &&
     tableDesc.schema.nonEmpty &&
-    tableDesc.serde.isDefined &&
-    tableDesc.inputFormat.isDefined &&
-    tableDesc.outputFormat.isDefined &&
+    tableDesc.storage.serde.isDefined &&
+    tableDesc.storage.inputFormat.isDefined &&
+    tableDesc.storage.outputFormat.isDefined &&
     childrenResolved
 }
 
 private[hive] case class CreateViewAsSelect(
-    tableDesc: HiveTable,
+    tableDesc: CatalogTable,
     child: LogicalPlan,
     allowExisting: Boolean,
     replace: Boolean,
@@ -193,7 +193,7 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
       view: ASTNode,
       viewNameParts: ASTNode,
       query: ASTNode,
-      schema: Seq[HiveColumn],
+      schema: Seq[CatalogColumn],
       properties: Map[String, String],
       allowExist: Boolean,
       replace: Boolean): CreateViewAsSelect = {
@@ -201,18 +201,20 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
 
     val originalText = query.source
 
-    val tableDesc = HiveTable(
+    val tableDesc = CatalogTable(
       specifiedDatabase = dbName,
       name = viewName,
+      tableType = CatalogTableType.VIRTUAL_VIEW,
       schema = schema,
-      partitionColumns = Seq.empty[HiveColumn],
+      storage = CatalogStorageFormat(
+        locationUri = None,
+        inputFormat = None,
+        outputFormat = None,
+        serde = None,
+        serdeProperties = Map.empty[String, String]
+      ),
       properties = properties,
-      serdeProperties = Map[String, String](),
-      tableType = VirtualView,
-      location = None,
-      inputFormat = None,
-      outputFormat = None,
-      serde = None,
+      viewOriginalText = Some(originalText),
       viewText = Some(originalText))
 
     // We need to keep the original SQL string so that if `spark.sql.nativeView` is
@@ -314,8 +316,8 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
           val schema = maybeColumns.map { cols =>
             // We can't specify column types when create view, so fill it with null first, and
             // update it after the schema has been resolved later.
-            nodeToColumns(cols, lowerCase = true).map(_.copy(hiveType = null))
-          }.getOrElse(Seq.empty[HiveColumn])
+            nodeToColumns(cols, lowerCase = true).map(_.copy(dataType = null))
+          }.getOrElse(Seq.empty[CatalogColumn])
 
           val properties = scala.collection.mutable.Map.empty[String, String]
 
@@ -369,19 +371,23 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
         val TableIdentifier(tblName, dbName) = extractTableIdent(tableNameParts)
 
         // TODO add bucket support
-        var tableDesc: HiveTable = HiveTable(
+        var tableDesc: CatalogTable = CatalogTable(
           specifiedDatabase = dbName,
           name = tblName,
-          schema = Seq.empty[HiveColumn],
-          partitionColumns = Seq.empty[HiveColumn],
-          properties = Map[String, String](),
-          serdeProperties = Map[String, String](),
-          tableType = if (externalTable.isDefined) ExternalTable else ManagedTable,
-          location = None,
-          inputFormat = None,
-          outputFormat = None,
-          serde = None,
-          viewText = None)
+          tableType =
+            if (externalTable.isDefined) {
+              CatalogTableType.EXTERNAL_TABLE
+            } else {
+              CatalogTableType.MANAGED_TABLE
+            },
+          storage = CatalogStorageFormat(
+            locationUri = None,
+            inputFormat = None,
+            outputFormat = None,
+            serde = None,
+            serdeProperties = Map.empty[String, String]
+          ),
+          schema = Seq.empty[CatalogColumn])
 
         // default storage type abbreviation (e.g. RCFile, ORC, PARQUET etc.)
         val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT)
@@ -392,9 +398,10 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
             outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
         }
 
-        hiveSerDe.inputFormat.foreach(f => tableDesc = tableDesc.copy(inputFormat = Some(f)))
-        hiveSerDe.outputFormat.foreach(f => tableDesc = tableDesc.copy(outputFormat = Some(f)))
-        hiveSerDe.serde.foreach(f => tableDesc = tableDesc.copy(serde = Some(f)))
+        tableDesc = tableDesc.withNewStorage(
+          inputFormat = hiveSerDe.inputFormat.orElse(tableDesc.storage.inputFormat),
+          outputFormat = hiveSerDe.outputFormat.orElse(tableDesc.storage.outputFormat),
+          serde = hiveSerDe.serde.orElse(tableDesc.storage.serde))
 
         children.collect {
           case list @ Token("TOK_TABCOLLIST", _) =>
@@ -440,13 +447,13 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
               // TODO support the nullFormat
               case _ => assert(false)
             }
-            tableDesc = tableDesc.copy(
-              serdeProperties = tableDesc.serdeProperties ++ serdeParams.asScala)
+            tableDesc = tableDesc.withNewStorage(
+              serdeProperties = tableDesc.storage.serdeProperties ++ serdeParams.asScala)
           case Token("TOK_TABLELOCATION", child :: Nil) =>
             val location = EximUtil.relativeToAbsolutePath(hiveConf, unescapeSQLString(child.text))
-            tableDesc = tableDesc.copy(location = Option(location))
+            tableDesc = tableDesc.withNewStorage(locationUri = Option(location))
           case Token("TOK_TABLESERIALIZER", child :: Nil) =>
-            tableDesc = tableDesc.copy(
+            tableDesc = tableDesc.withNewStorage(
               serde = Option(unescapeSQLString(child.children.head.text)))
             if (child.numChildren == 2) {
               // This is based on the readProps(..) method in
@@ -459,59 +466,59 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
                     .orNull
                   (unescapeSQLString(prop), value)
               }.toMap
-              tableDesc = tableDesc.copy(serdeProperties = tableDesc.serdeProperties ++ serdeParams)
+              tableDesc = tableDesc.withNewStorage(
+                serdeProperties = tableDesc.storage.serdeProperties ++ serdeParams)
             }
           case Token("TOK_FILEFORMAT_GENERIC", child :: Nil) =>
             child.text.toLowerCase(Locale.ENGLISH) match {
               case "orc" =>
-                tableDesc = tableDesc.copy(
+                tableDesc = tableDesc.withNewStorage(
                   inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
                   outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
-                if (tableDesc.serde.isEmpty) {
-                  tableDesc = tableDesc.copy(
+                if (tableDesc.storage.serde.isEmpty) {
+                  tableDesc = tableDesc.withNewStorage(
                     serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
                 }
 
               case "parquet" =>
-                tableDesc = tableDesc.copy(
+                tableDesc = tableDesc.withNewStorage(
                   inputFormat =
                     Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
                   outputFormat =
                     Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
-                if (tableDesc.serde.isEmpty) {
-                  tableDesc = tableDesc.copy(
+                if (tableDesc.storage.serde.isEmpty) {
+                  tableDesc = tableDesc.withNewStorage(
                     serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
                 }
 
               case "rcfile" =>
-                tableDesc = tableDesc.copy(
+                tableDesc = tableDesc.withNewStorage(
                   inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
                   outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
-                if (tableDesc.serde.isEmpty) {
-                  tableDesc = tableDesc.copy(serde =
-                    Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
+                if (tableDesc.storage.serde.isEmpty) {
+                  tableDesc = tableDesc.withNewStorage(
+                    serde =
+                      Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
                 }
 
               case "textfile" =>
-                tableDesc = tableDesc.copy(
-                  inputFormat =
-                    Option("org.apache.hadoop.mapred.TextInputFormat"),
-                  outputFormat =
-                    Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
+                tableDesc = tableDesc.withNewStorage(
+                  inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
+                  outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
 
               case "sequencefile" =>
-                tableDesc = tableDesc.copy(
+                tableDesc = tableDesc.withNewStorage(
                   inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
                   outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat"))
 
               case "avro" =>
-                tableDesc = tableDesc.copy(
+                tableDesc = tableDesc.withNewStorage(
                   inputFormat =
                     Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"),
                   outputFormat =
                     Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"))
-                if (tableDesc.serde.isEmpty) {
-                  tableDesc = tableDesc.copy(
+                if (tableDesc.storage.serde.isEmpty) {
+                  tableDesc = tableDesc.withNewStorage(
                     serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))
                 }
 
@@ -522,23 +529,21 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
 
           case Token("TOK_TABLESERIALIZER",
           Token("TOK_SERDENAME", Token(serdeName, Nil) :: otherProps) :: Nil) =>
-            tableDesc = tableDesc.copy(serde = Option(unquoteString(serdeName)))
+            tableDesc = tableDesc.withNewStorage(serde = Option(unquoteString(serdeName)))
 
             otherProps match {
               case Token("TOK_TABLEPROPERTIES", list :: Nil) :: Nil =>
-                tableDesc = tableDesc.copy(
-                  serdeProperties = tableDesc.serdeProperties ++ getProperties(list))
+                tableDesc = tableDesc.withNewStorage(
+                  serdeProperties = tableDesc.storage.serdeProperties ++ getProperties(list))
               case _ =>
             }
 
           case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
             tableDesc = tableDesc.copy(properties = tableDesc.properties ++ getProperties(list))
           case list @ Token("TOK_TABLEFILEFORMAT", _) =>
-            tableDesc = tableDesc.copy(
-              inputFormat =
-                Option(unescapeSQLString(list.children.head.text)),
-              outputFormat =
-                Option(unescapeSQLString(list.children(1).text)))
+            tableDesc = tableDesc.withNewStorage(
+              inputFormat = Option(unescapeSQLString(list.children.head.text)),
+              outputFormat = Option(unescapeSQLString(list.children(1).text)))
           case Token("TOK_STORAGEHANDLER", _) =>
             throw new AnalysisException(
               "CREATE TABLE AS SELECT cannot be used for a non-native table")
@@ -678,15 +683,15 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
 
   // This is based the getColumns methods in
   // ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
-  protected def nodeToColumns(node: ASTNode, lowerCase: Boolean): Seq[HiveColumn] = {
+  protected def nodeToColumns(node: ASTNode, lowerCase: Boolean): Seq[CatalogColumn] = {
     node.children.map(_.children).collect {
       case Token(rawColName, Nil) :: colTypeNode :: comment =>
-        val colName = if (!lowerCase) rawColName
-        else rawColName.toLowerCase
-        HiveColumn(
-          cleanIdentifier(colName),
-          nodeToTypeString(colTypeNode),
-          comment.headOption.map(n => unescapeSQLString(n.text)).orNull)
+        val colName = if (!lowerCase) rawColName else rawColName.toLowerCase
+        CatalogColumn(
+          name = cleanIdentifier(colName),
+          dataType = nodeToTypeString(colTypeNode),
+          nullable = true,
+          comment.headOption.map(n => unescapeSQLString(n.text)))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index f681cc6..6a0a089 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -18,67 +18,11 @@
 package org.apache.spark.sql.hive.client
 
 import java.io.PrintStream
-import java.util.{Map => JMap}
-import javax.annotation.Nullable
 
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
+import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
 
-private[hive] case class HiveDatabase(name: String, location: String)
-
-private[hive] abstract class TableType { val name: String }
-private[hive] case object ExternalTable extends TableType { override val name = "EXTERNAL_TABLE" }
-private[hive] case object IndexTable extends TableType { override val name = "INDEX_TABLE" }
-private[hive] case object ManagedTable extends TableType { override val name = "MANAGED_TABLE" }
-private[hive] case object VirtualView extends TableType { override val name = "VIRTUAL_VIEW" }
-
-// TODO: Use this for Tables and Partitions
-private[hive] case class HiveStorageDescriptor(
-    location: String,
-    inputFormat: String,
-    outputFormat: String,
-    serde: String,
-    serdeProperties: Map[String, String])
-
-private[hive] case class HivePartition(
-    values: Seq[String],
-    storage: HiveStorageDescriptor)
-
-private[hive] case class HiveColumn(name: String, @Nullable hiveType: String, comment: String)
-private[hive] case class HiveTable(
-    specifiedDatabase: Option[String],
-    name: String,
-    schema: Seq[HiveColumn],
-    partitionColumns: Seq[HiveColumn],
-    properties: Map[String, String],
-    serdeProperties: Map[String, String],
-    tableType: TableType,
-    location: Option[String] = None,
-    inputFormat: Option[String] = None,
-    outputFormat: Option[String] = None,
-    serde: Option[String] = None,
-    viewText: Option[String] = None) {
-
-  @transient
-  private[client] var client: HiveClient = _
-
-  private[client] def withClient(ci: HiveClient): this.type = {
-    client = ci
-    this
-  }
-
-  def database: String = specifiedDatabase.getOrElse(sys.error("database not resolved"))
-
-  def isPartitioned: Boolean = partitionColumns.nonEmpty
-
-  def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this)
-
-  def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
-    client.getPartitionsByFilter(this, predicates)
-
-  // Hive does not support backticks when passing names to the client.
-  def qualifiedName: String = s"$database.$name"
-}
 
 /**
  * An externally visible interface to the Hive client.  This interface is shared across both the
@@ -106,6 +50,9 @@ private[hive] trait HiveClient {
   /** Returns the names of all tables in the given database. */
   def listTables(dbName: String): Seq[String]
 
+  /** Returns the names of tables in the given database that match the given pattern. */
+  def listTables(dbName: String, pattern: String): Seq[String]
+
   /** Returns the name of the active database. */
   def currentDatabase: String
 
@@ -113,46 +60,133 @@ private[hive] trait HiveClient {
   def setCurrentDatabase(databaseName: String): Unit
 
   /** Returns the metadata for specified database, throwing an exception if it doesn't exist */
-  def getDatabase(name: String): HiveDatabase = {
-    getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException)
+  final def getDatabase(name: String): CatalogDatabase = {
+    getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException(name))
   }
 
   /** Returns the metadata for a given database, or None if it doesn't exist. */
-  def getDatabaseOption(name: String): Option[HiveDatabase]
+  def getDatabaseOption(name: String): Option[CatalogDatabase]
+
+  /** List the names of all the databases that match the specified pattern. */
+  def listDatabases(pattern: String): Seq[String]
 
   /** Returns the specified table, or throws [[NoSuchTableException]]. */
-  def getTable(dbName: String, tableName: String): HiveTable = {
-    getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException)
+  final def getTable(dbName: String, tableName: String): CatalogTable = {
+    getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException(dbName, tableName))
   }
 
-  /** Returns the metadata for the specified table or None if it doens't exist. */
-  def getTableOption(dbName: String, tableName: String): Option[HiveTable]
+  /** Returns the metadata for the specified table or None if it doesn't exist. */
+  def getTableOption(dbName: String, tableName: String): Option[CatalogTable]
 
   /** Creates a view with the given metadata. */
-  def createView(view: HiveTable): Unit
+  def createView(view: CatalogTable): Unit
 
   /** Updates the given view with new metadata. */
-  def alertView(view: HiveTable): Unit
+  def alertView(view: CatalogTable): Unit
 
   /** Creates a table with the given metadata. */
-  def createTable(table: HiveTable): Unit
+  def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
 
-  /** Updates the given table with new metadata. */
-  def alterTable(table: HiveTable): Unit
+  /** Drop the specified table. */
+  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
+
+  /** Alter a table whose name matches the one specified in `table`, assuming it exists. */
+  final def alterTable(table: CatalogTable): Unit = alterTable(table.name, table)
+
+  /** Updates the given table with new metadata, optionally renaming the table. */
+  def alterTable(tableName: String, table: CatalogTable): Unit
 
   /** Creates a new database with the given name. */
-  def createDatabase(database: HiveDatabase): Unit
+  def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit
+
+  /**
+   * Drop the specified database, if it exists.
+   *
+   * @param name database to drop
+   * @param ignoreIfNotExists if true, do not throw error if the database does not exist
+   * @param cascade whether to remove all associated objects such as tables and functions
+   */
+  def dropDatabase(name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
+
+  /**
+   * Alter a database whose name matches the one specified in `database`, assuming it exists.
+   */
+  def alterDatabase(database: CatalogDatabase): Unit
+
+  /**
+   * Create one or many partitions in the given table.
+   */
+  def createPartitions(
+      db: String,
+      table: String,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit
+
+  /**
+   * Drop one or many partitions in the given table.
+   *
+   * Note: Unfortunately, Hive does not currently provide a way to ignore this call if the
+   * partitions do not already exist. The seemingly relevant flag `ifExists` in
+   * [[org.apache.hadoop.hive.metastore.PartitionDropOptions]] is not read anywhere.
+   */
+  def dropPartitions(
+      db: String,
+      table: String,
+      specs: Seq[Catalog.TablePartitionSpec]): Unit
 
-  /** Returns the specified paritition or None if it does not exist. */
+  /**
+   * Rename one or many existing table partitions, assuming they exist.
+   */
+  def renamePartitions(
+      db: String,
+      table: String,
+      specs: Seq[Catalog.TablePartitionSpec],
+      newSpecs: Seq[Catalog.TablePartitionSpec]): Unit
+
+  /**
+   * Alter one or more table partitions whose specs match the ones specified in `newParts`,
+   * assuming the partitions exist.
+   */
+  def alterPartitions(
+      db: String,
+      table: String,
+      newParts: Seq[CatalogTablePartition]): Unit
+
+  /** Returns the specified partition, or throws [[NoSuchPartitionException]]. */
+  final def getPartition(
+      dbName: String,
+      tableName: String,
+      spec: Catalog.TablePartitionSpec): CatalogTablePartition = {
+    getPartitionOption(dbName, tableName, spec).getOrElse {
+      throw new NoSuchPartitionException(dbName, tableName, spec)
+    }
+  }
+
+  /** Returns the specified partition or None if it does not exist. */
+  final def getPartitionOption(
+      db: String,
+      table: String,
+      spec: Catalog.TablePartitionSpec): Option[CatalogTablePartition] = {
+    getPartitionOption(getTable(db, table), spec)
+  }
+
+  /** Returns the specified partition or None if it does not exist. */
   def getPartitionOption(
-      hTable: HiveTable,
-      partitionSpec: JMap[String, String]): Option[HivePartition]
+      table: CatalogTable,
+      spec: Catalog.TablePartitionSpec): Option[CatalogTablePartition]
+
+  /** Returns all partitions for the given table. */
+  final def getAllPartitions(db: String, table: String): Seq[CatalogTablePartition] = {
+    getAllPartitions(getTable(db, table))
+  }
 
   /** Returns all partitions for the given table. */
-  def getAllPartitions(hTable: HiveTable): Seq[HivePartition]
+  def getAllPartitions(table: CatalogTable): Seq[CatalogTablePartition]
 
   /** Returns partitions filtered by predicates for the given table. */
-  def getPartitionsByFilter(hTable: HiveTable, predicates: Seq[Expression]): Seq[HivePartition]
+  def getPartitionsByFilter(
+      table: CatalogTable,
+      predicates: Seq[Expression]): Seq[CatalogTablePartition]
 
   /** Loads a static partition into an existing table. */
   def loadPartition(
@@ -181,6 +215,29 @@ private[hive] trait HiveClient {
       holdDDLTime: Boolean,
       listBucketingEnabled: Boolean): Unit
 
+  /** Create a function in an existing database. */
+  def createFunction(db: String, func: CatalogFunction): Unit
+
+  /** Drop an existing function in the database. */
+  def dropFunction(db: String, name: String): Unit
+
+  /** Rename an existing function in the database. */
+  def renameFunction(db: String, oldName: String, newName: String): Unit
+
+  /** Alter a function whose name matches the one specified in `func`, assuming it exists. */
+  def alterFunction(db: String, func: CatalogFunction): Unit
+
+  /** Return an existing function in the database, assuming it exists. */
+  final def getFunction(db: String, name: String): CatalogFunction = {
+    getFunctionOption(db, name).getOrElse(throw new NoSuchFunctionException(db, name))
+  }
+
+  /** Return an existing function in the database, or None if it doesn't exist. */
+  def getFunctionOption(db: String, name: String): Option[CatalogFunction]
+
+  /** Return the names of all functions that match the given pattern in the database. */
+  def listFunctions(db: String, pattern: String): Seq[String]
+
   /** Add a jar into class loader */
   def addJar(path: String): Unit
 
@@ -192,4 +249,5 @@ private[hive] trait HiveClient {
 
   /** Used for testing only.  Removes all metadata from this instance of Hive. */
   def reset(): Unit
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index cf1ff55..7a007d2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -18,24 +18,25 @@
 package org.apache.spark.sql.hive.client
 
 import java.io.{File, PrintStream}
-import java.util.{Map => JMap}
 
 import scala.collection.JavaConverters._
 import scala.language.reflectiveCalls
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.{TableType => HTableType}
-import org.apache.hadoop.hive.metastore.api.{Database, FieldSchema}
-import org.apache.hadoop.hive.ql.{metadata, Driver}
-import org.apache.hadoop.hive.ql.metadata.Hive
+import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
+import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceUri}
+import org.apache.hadoop.hive.ql.Driver
+import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
 import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.{Logging, SparkConf, SparkException}
-import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.util.{CircularBuffer, Utils}
@@ -234,167 +235,184 @@ private[hive] class HiveClientImpl(
     if (getDatabaseOption(databaseName).isDefined) {
       state.setCurrentDatabase(databaseName)
     } else {
-      throw new NoSuchDatabaseException
+      throw new NoSuchDatabaseException(databaseName)
     }
   }
 
-  override def createDatabase(database: HiveDatabase): Unit = withHiveState {
+  override def createDatabase(
+      database: CatalogDatabase,
+      ignoreIfExists: Boolean): Unit = withHiveState {
     client.createDatabase(
-      new Database(
+      new HiveDatabase(
         database.name,
-        "",
-        new File(database.location).toURI.toString,
-        new java.util.HashMap),
-        true)
+        database.description,
+        database.locationUri,
+        database.properties.asJava),
+        ignoreIfExists)
   }
 
-  override def getDatabaseOption(name: String): Option[HiveDatabase] = withHiveState {
+  override def dropDatabase(
+      name: String,
+      ignoreIfNotExists: Boolean,
+      cascade: Boolean): Unit = withHiveState {
+    client.dropDatabase(name, true, ignoreIfNotExists, cascade)
+  }
+
+  override def alterDatabase(database: CatalogDatabase): Unit = withHiveState {
+    client.alterDatabase(
+      database.name,
+      new HiveDatabase(
+        database.name,
+        database.description,
+        database.locationUri,
+        database.properties.asJava))
+  }
+
+  override def getDatabaseOption(name: String): Option[CatalogDatabase] = withHiveState {
     Option(client.getDatabase(name)).map { d =>
-      HiveDatabase(
+      CatalogDatabase(
         name = d.getName,
-        location = d.getLocationUri)
+        description = d.getDescription,
+        locationUri = d.getLocationUri,
+        properties = d.getParameters.asScala.toMap)
     }
   }
 
+  override def listDatabases(pattern: String): Seq[String] = withHiveState {
+    client.getDatabasesByPattern(pattern).asScala.toSeq
+  }
+
   override def getTableOption(
       dbName: String,
-      tableName: String): Option[HiveTable] = withHiveState {
-
+      tableName: String): Option[CatalogTable] = withHiveState {
     logDebug(s"Looking up $dbName.$tableName")
-
-    val hiveTable = Option(client.getTable(dbName, tableName, false))
-    val converted = hiveTable.map { h =>
-
-      HiveTable(
-        name = h.getTableName,
+    Option(client.getTable(dbName, tableName, false)).map { h =>
+      CatalogTable(
         specifiedDatabase = Option(h.getDbName),
-        schema = h.getCols.asScala.map(f => HiveColumn(f.getName, f.getType, f.getComment)),
-        partitionColumns = h.getPartCols.asScala.map(f =>
-          HiveColumn(f.getName, f.getType, f.getComment)),
-        properties = h.getParameters.asScala.toMap,
-        serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap,
+        name = h.getTableName,
         tableType = h.getTableType match {
-          case HTableType.MANAGED_TABLE => ManagedTable
-          case HTableType.EXTERNAL_TABLE => ExternalTable
-          case HTableType.VIRTUAL_VIEW => VirtualView
-          case HTableType.INDEX_TABLE => IndexTable
+          case HiveTableType.EXTERNAL_TABLE => CatalogTableType.EXTERNAL_TABLE
+          case HiveTableType.MANAGED_TABLE => CatalogTableType.MANAGED_TABLE
+          case HiveTableType.INDEX_TABLE => CatalogTableType.INDEX_TABLE
+          case HiveTableType.VIRTUAL_VIEW => CatalogTableType.VIRTUAL_VIEW
         },
-        location = shim.getDataLocation(h),
-        inputFormat = Option(h.getInputFormatClass).map(_.getName),
-        outputFormat = Option(h.getOutputFormatClass).map(_.getName),
-        serde = Option(h.getSerializationLib),
-        viewText = Option(h.getViewExpandedText)).withClient(this)
+        schema = h.getCols.asScala.map(fromHiveColumn),
+        partitionColumns = h.getPartCols.asScala.map(fromHiveColumn),
+        sortColumns = Seq(),
+        numBuckets = h.getNumBuckets,
+        createTime = h.getTTable.getCreateTime.toLong * 1000,
+        lastAccessTime = h.getLastAccessTime.toLong * 1000,
+        storage = CatalogStorageFormat(
+          locationUri = shim.getDataLocation(h),
+          inputFormat = Option(h.getInputFormatClass).map(_.getName),
+          outputFormat = Option(h.getOutputFormatClass).map(_.getName),
+          serde = Option(h.getSerializationLib),
+          serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap
+        ),
+        properties = h.getParameters.asScala.toMap,
+        viewOriginalText = Option(h.getViewOriginalText),
+        viewText = Option(h.getViewExpandedText))
     }
-    converted
   }
 
-  private def toInputFormat(name: String) =
-    Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
-
-  private def toOutputFormat(name: String) =
-    Utils.classForName(name)
-      .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
-
-  private def toQlTable(table: HiveTable): metadata.Table = {
-    val qlTable = new metadata.Table(table.database, table.name)
-
-    qlTable.setFields(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
-    qlTable.setPartCols(
-      table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
-    table.properties.foreach { case (k, v) => qlTable.setProperty(k, v) }
-    table.serdeProperties.foreach { case (k, v) => qlTable.setSerdeParam(k, v) }
-
-    // set owner
-    qlTable.setOwner(conf.getUser)
-    // set create time
-    qlTable.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
-
-    table.location.foreach { loc => shim.setDataLocation(qlTable, loc) }
-    table.inputFormat.map(toInputFormat).foreach(qlTable.setInputFormatClass)
-    table.outputFormat.map(toOutputFormat).foreach(qlTable.setOutputFormatClass)
-    table.serde.foreach(qlTable.setSerializationLib)
-
-    qlTable
+  override def createView(view: CatalogTable): Unit = withHiveState {
+    client.createTable(toHiveViewTable(view))
   }
 
-  private def toViewTable(view: HiveTable): metadata.Table = {
-    // TODO: this is duplicated with `toQlTable` except the table type stuff.
-    val tbl = new metadata.Table(view.database, view.name)
-    tbl.setTableType(HTableType.VIRTUAL_VIEW)
-    tbl.setSerializationLib(null)
-    tbl.clearSerDeInfo()
-
-    // TODO: we will save the same SQL string to original and expanded text, which is different
-    // from Hive.
-    tbl.setViewOriginalText(view.viewText.get)
-    tbl.setViewExpandedText(view.viewText.get)
-
-    tbl.setFields(view.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
-    view.properties.foreach { case (k, v) => tbl.setProperty(k, v) }
-
-    // set owner
-    tbl.setOwner(conf.getUser)
-    // set create time
-    tbl.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
-
-    tbl
+  override def alertView(view: CatalogTable): Unit = withHiveState {
+    client.alterTable(view.qualifiedName, toHiveViewTable(view))
   }
 
-  override def createView(view: HiveTable): Unit = withHiveState {
-    client.createTable(toViewTable(view))
+  override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState {
+    client.createTable(toHiveTable(table), ignoreIfExists)
   }
 
-  override def alertView(view: HiveTable): Unit = withHiveState {
-    client.alterTable(view.qualifiedName, toViewTable(view))
+  override def dropTable(
+      dbName: String,
+      tableName: String,
+      ignoreIfNotExists: Boolean): Unit = withHiveState {
+    client.dropTable(dbName, tableName, true, ignoreIfNotExists)
   }
 
-  override def createTable(table: HiveTable): Unit = withHiveState {
-    val qlTable = toQlTable(table)
-    client.createTable(qlTable)
+  override def alterTable(tableName: String, table: CatalogTable): Unit = withHiveState {
+    val hiveTable = toHiveTable(table)
+    // Do not use `table.qualifiedName` here because this may be a rename
+    val qualifiedTableName = s"${table.database}.$tableName"
+    client.alterTable(qualifiedTableName, hiveTable)
   }
 
-  override def alterTable(table: HiveTable): Unit = withHiveState {
-    val qlTable = toQlTable(table)
-    client.alterTable(table.qualifiedName, qlTable)
+  override def createPartitions(
+      db: String,
+      table: String,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = withHiveState {
+    val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
+    parts.foreach { s =>
+      addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull)
+    }
+    client.createPartitions(addPartitionDesc)
+  }
+
+  override def dropPartitions(
+      db: String,
+      table: String,
+      specs: Seq[Catalog.TablePartitionSpec]): Unit = withHiveState {
+    // TODO: figure out how to drop multiple partitions in one call
+    specs.foreach { s => client.dropPartition(db, table, s.values.toList.asJava, true) }
+  }
+
+  override def renamePartitions(
+      db: String,
+      table: String,
+      specs: Seq[Catalog.TablePartitionSpec],
+      newSpecs: Seq[Catalog.TablePartitionSpec]): Unit = withHiveState {
+    require(specs.size == newSpecs.size, "number of old and new partition specs differ")
+    val catalogTable = getTable(db, table)
+    val hiveTable = toHiveTable(catalogTable)
+    specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
+      val hivePart = getPartitionOption(catalogTable, oldSpec)
+        .map { p => toHivePartition(p.copy(spec = newSpec), hiveTable) }
+        .getOrElse { throw new NoSuchPartitionException(db, table, oldSpec) }
+      client.renamePartition(hiveTable, oldSpec.asJava, hivePart)
+    }
   }
 
-  private def toHivePartition(partition: metadata.Partition): HivePartition = {
-    val apiPartition = partition.getTPartition
-    HivePartition(
-      values = Option(apiPartition.getValues).map(_.asScala).getOrElse(Seq.empty),
-      storage = HiveStorageDescriptor(
-        location = apiPartition.getSd.getLocation,
-        inputFormat = apiPartition.getSd.getInputFormat,
-        outputFormat = apiPartition.getSd.getOutputFormat,
-        serde = apiPartition.getSd.getSerdeInfo.getSerializationLib,
-        serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap))
+  override def alterPartitions(
+      db: String,
+      table: String,
+      newParts: Seq[CatalogTablePartition]): Unit = withHiveState {
+    val hiveTable = toHiveTable(getTable(db, table))
+    client.alterPartitions(table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava)
   }
 
   override def getPartitionOption(
-      table: HiveTable,
-      partitionSpec: JMap[String, String]): Option[HivePartition] = withHiveState {
-
-    val qlTable = toQlTable(table)
-    val qlPartition = client.getPartition(qlTable, partitionSpec, false)
-    Option(qlPartition).map(toHivePartition)
+      table: CatalogTable,
+      spec: Catalog.TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
+    val hiveTable = toHiveTable(table)
+    val hivePartition = client.getPartition(hiveTable, spec.asJava, false)
+    Option(hivePartition).map(fromHivePartition)
   }
 
-  override def getAllPartitions(hTable: HiveTable): Seq[HivePartition] = withHiveState {
-    val qlTable = toQlTable(hTable)
-    shim.getAllPartitions(client, qlTable).map(toHivePartition)
+  override def getAllPartitions(table: CatalogTable): Seq[CatalogTablePartition] = withHiveState {
+    val hiveTable = toHiveTable(table)
+    shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
   }
 
   override def getPartitionsByFilter(
-      hTable: HiveTable,
-      predicates: Seq[Expression]): Seq[HivePartition] = withHiveState {
-    val qlTable = toQlTable(hTable)
-    shim.getPartitionsByFilter(client, qlTable, predicates).map(toHivePartition)
+      table: CatalogTable,
+      predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState {
+    val hiveTable = toHiveTable(table)
+    shim.getPartitionsByFilter(client, hiveTable, predicates).map(fromHivePartition)
   }
 
   override def listTables(dbName: String): Seq[String] = withHiveState {
     client.getAllTables(dbName).asScala
   }
 
+  override def listTables(dbName: String, pattern: String): Seq[String] = withHiveState {
+    client.getTablesByPattern(dbName, pattern).asScala
+  }
+
   /**
    * Runs the specified SQL query using Hive.
    */
@@ -508,6 +526,34 @@ private[hive] class HiveClientImpl(
       listBucketingEnabled)
   }
 
+  override def createFunction(db: String, func: CatalogFunction): Unit = withHiveState {
+    client.createFunction(toHiveFunction(func, db))
+  }
+
+  override def dropFunction(db: String, name: String): Unit = withHiveState {
+    client.dropFunction(db, name)
+  }
+
+  override def renameFunction(db: String, oldName: String, newName: String): Unit = withHiveState {
+    val catalogFunc = getFunction(db, oldName).copy(name = newName)
+    val hiveFunc = toHiveFunction(catalogFunc, db)
+    client.alterFunction(db, oldName, hiveFunc)
+  }
+
+  override def alterFunction(db: String, func: CatalogFunction): Unit = withHiveState {
+    client.alterFunction(db, func.name, toHiveFunction(func, db))
+  }
+
+  override def getFunctionOption(
+      db: String,
+      name: String): Option[CatalogFunction] = withHiveState {
+    Option(client.getFunction(db, name)).map(fromHiveFunction)
+  }
+
+  override def listFunctions(db: String, pattern: String): Seq[String] = withHiveState {
+    client.getFunctions(db, pattern).asScala
+  }
+
   def addJar(path: String): Unit = {
     val uri = new Path(path).toUri
     val jarURL = if (uri.getScheme == null) {
@@ -541,4 +587,97 @@ private[hive] class HiveClientImpl(
         client.dropDatabase(db, true, false, true)
       }
   }
+
+
+  /* -------------------------------------------------------- *
+   |  Helper methods for converting to and from Hive classes  |
+   * -------------------------------------------------------- */
+
+  private def toInputFormat(name: String) =
+    Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
+
+  private def toOutputFormat(name: String) =
+    Utils.classForName(name)
+      .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
+
+  private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = {
+    new HiveFunction(
+      f.name,
+      db,
+      f.className,
+      null,
+      PrincipalType.USER,
+      (System.currentTimeMillis / 1000).toInt,
+      FunctionType.JAVA,
+      List.empty[ResourceUri].asJava)
+  }
+
+  private def fromHiveFunction(hf: HiveFunction): CatalogFunction = {
+    new CatalogFunction(hf.getFunctionName, hf.getClassName)
+  }
+
+  private def toHiveColumn(c: CatalogColumn): FieldSchema = {
+    new FieldSchema(c.name, c.dataType, c.comment.orNull)
+  }
+
+  private def fromHiveColumn(hc: FieldSchema): CatalogColumn = {
+    new CatalogColumn(
+      name = hc.getName,
+      dataType = hc.getType,
+      nullable = true,
+      comment = Option(hc.getComment))
+  }
+
+  private def toHiveTable(table: CatalogTable): HiveTable = {
+    val hiveTable = new HiveTable(table.database, table.name)
+    hiveTable.setTableType(table.tableType match {
+      case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE
+      case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE
+      case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE
+      case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW
+    })
+    hiveTable.setFields(table.schema.map(toHiveColumn).asJava)
+    hiveTable.setPartCols(table.partitionColumns.map(toHiveColumn).asJava)
+    // TODO: set sort columns here too
+    hiveTable.setOwner(conf.getUser)
+    hiveTable.setNumBuckets(table.numBuckets)
+    hiveTable.setCreateTime((table.createTime / 1000).toInt)
+    hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt)
+    table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) }
+    table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass)
+    table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass)
+    table.storage.serde.foreach(hiveTable.setSerializationLib)
+    table.storage.serdeProperties.foreach { case (k, v) => hiveTable.setSerdeParam(k, v) }
+    table.properties.foreach { case (k, v) => hiveTable.setProperty(k, v) }
+    table.viewOriginalText.foreach { t => hiveTable.setViewOriginalText(t) }
+    table.viewText.foreach { t => hiveTable.setViewExpandedText(t) }
+    hiveTable
+  }
+
+  private def toHiveViewTable(view: CatalogTable): HiveTable = {
+    val tbl = toHiveTable(view)
+    tbl.setTableType(HiveTableType.VIRTUAL_VIEW)
+    tbl.setSerializationLib(null)
+    tbl.clearSerDeInfo()
+    tbl
+  }
+
+  private def toHivePartition(
+      p: CatalogTablePartition,
+      ht: HiveTable): HivePartition = {
+    new HivePartition(ht, p.spec.asJava, p.storage.locationUri.map { l => new Path(l) }.orNull)
+  }
+
+  private def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
+    val apiPartition = hp.getTPartition
+    CatalogTablePartition(
+      spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),
+      storage = CatalogStorageFormat(
+        locationUri = Option(apiPartition.getSd.getLocation),
+        inputFormat = Option(apiPartition.getSd.getInputFormat),
+        outputFormat = Option(apiPartition.getSd.getOutputFormat),
+        serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
+        serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap))
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 4c0aae6..3f81c99 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -19,10 +19,10 @@ package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, MetastoreRelation}
-import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
 
 /**
  * Create table and insert the query result into it.
@@ -33,7 +33,7 @@ import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
  */
 private[hive]
 case class CreateTableAsSelect(
-    tableDesc: HiveTable,
+    tableDesc: CatalogTable,
     query: LogicalPlan,
     allowExisting: Boolean)
   extends RunnableCommand {
@@ -51,25 +51,25 @@ case class CreateTableAsSelect(
       import org.apache.hadoop.mapred.TextInputFormat
 
       val withFormat =
-        tableDesc.copy(
+        tableDesc.withNewStorage(
           inputFormat =
-            tableDesc.inputFormat.orElse(Some(classOf[TextInputFormat].getName)),
+            tableDesc.storage.inputFormat.orElse(Some(classOf[TextInputFormat].getName)),
           outputFormat =
-            tableDesc.outputFormat
+            tableDesc.storage.outputFormat
               .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName)),
-          serde = tableDesc.serde.orElse(Some(classOf[LazySimpleSerDe].getName())))
+          serde = tableDesc.storage.serde.orElse(Some(classOf[LazySimpleSerDe].getName)))
 
       val withSchema = if (withFormat.schema.isEmpty) {
         // Hive doesn't support specifying the column list for the target table in CTAS.
         // However, we don't think Spark SQL should follow that.
-        tableDesc.copy(schema =
-        query.output.map(c =>
-          HiveColumn(c.name, HiveMetastoreTypes.toMetastoreType(c.dataType), null)))
+        tableDesc.copy(schema = query.output.map { c =>
+          CatalogColumn(c.name, HiveMetastoreTypes.toMetastoreType(c.dataType))
+        })
       } else {
         withFormat
       }
 
-      hiveContext.catalog.client.createTable(withSchema)
+      hiveContext.catalog.client.createTable(withSchema, ignoreIfExists = false)
 
       // Get the Metastore Relation
       hiveContext.catalog.lookupRelation(tableIdentifier, None) match {

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
index 5da58a7..2914d03 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
@@ -21,11 +21,11 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, SQLBuilder}
-import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
+import org.apache.spark.sql.hive.{ HiveContext, HiveMetastoreTypes, SQLBuilder}
 
 /**
  * Create Hive view on non-hive-compatible tables by specifying schema ourselves instead of
@@ -34,7 +34,7 @@ import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
 // TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is different
 // from Hive and may not work for some cases like create view on self join.
 private[hive] case class CreateViewAsSelect(
-    tableDesc: HiveTable,
+    tableDesc: CatalogTable,
     child: LogicalPlan,
     allowExisting: Boolean,
     orReplace: Boolean) extends RunnableCommand {
@@ -72,7 +72,7 @@ private[hive] case class CreateViewAsSelect(
     Seq.empty[Row]
   }
 
-  private def prepareTable(sqlContext: SQLContext): HiveTable = {
+  private def prepareTable(sqlContext: SQLContext): CatalogTable = {
     val expandedText = if (sqlContext.conf.canonicalView) {
       try rebuildViewQueryString(sqlContext) catch {
         case NonFatal(e) => wrapViewTextWithSelect
@@ -83,12 +83,16 @@ private[hive] case class CreateViewAsSelect(
 
     val viewSchema = {
       if (tableDesc.schema.isEmpty) {
-        childSchema.map { attr =>
-          HiveColumn(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
+        childSchema.map { a =>
+          CatalogColumn(a.name, HiveMetastoreTypes.toMetastoreType(a.dataType))
         }
       } else {
-        childSchema.zip(tableDesc.schema).map { case (attr, col) =>
-          HiveColumn(col.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), col.comment)
+        childSchema.zip(tableDesc.schema).map { case (a, col) =>
+          CatalogColumn(
+            col.name,
+            HiveMetastoreTypes.toMetastoreType(a.dataType),
+            nullable = true,
+            col.comment)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index feb133d..d316664 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -205,7 +205,7 @@ case class InsertIntoHiveTable(
         val oldPart =
           catalog.client.getPartitionOption(
             catalog.client.getTable(table.databaseName, table.tableName),
-            partitionSpec.asJava)
+            partitionSpec)
 
         if (oldPart.isEmpty || !ifNotExists) {
             catalog.client.loadPartition(

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
new file mode 100644
index 0000000..f73e7e2
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.util.VersionInfo
+
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.hive.client.{HiveClient, IsolatedClientLoader}
+import org.apache.spark.util.Utils
+
+
+/**
+ * Test suite for the [[HiveCatalog]].
+ */
+class HiveCatalogSuite extends CatalogTestCases {
+
+  private val client: HiveClient = {
+    IsolatedClientLoader.forVersion(
+      hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
+      hadoopVersion = VersionInfo.getVersion).createClient()
+  }
+
+  protected override val tableInputFormat: String =
+    "org.apache.hadoop.mapred.SequenceFileInputFormat"
+  protected override val tableOutputFormat: String =
+    "org.apache.hadoop.mapred.SequenceFileOutputFormat"
+
+  protected override def newUriForDatabase(): String = Utils.createTempDir().getAbsolutePath
+
+  protected override def resetState(): Unit = client.reset()
+
+  protected override def newEmptyCatalog(): Catalog = new HiveCatalog(client)
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 14a83d5..f8764d4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -21,7 +21,7 @@ import java.io.File
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{QueryTest, Row, SaveMode, SQLConf}
-import org.apache.spark.sql.hive.client.{ExternalTable, ManagedTable}
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
 import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
@@ -83,16 +83,16 @@ class DataSourceWithHiveMetastoreCatalogSuite
         }
 
         val hiveTable = catalog.client.getTable("default", "t")
-        assert(hiveTable.inputFormat === Some(inputFormat))
-        assert(hiveTable.outputFormat === Some(outputFormat))
-        assert(hiveTable.serde === Some(serde))
+        assert(hiveTable.storage.inputFormat === Some(inputFormat))
+        assert(hiveTable.storage.outputFormat === Some(outputFormat))
+        assert(hiveTable.storage.serde === Some(serde))
 
-        assert(!hiveTable.isPartitioned)
-        assert(hiveTable.tableType === ManagedTable)
+        assert(hiveTable.partitionColumns.isEmpty)
+        assert(hiveTable.tableType === CatalogTableType.MANAGED_TABLE)
 
         val columns = hiveTable.schema
         assert(columns.map(_.name) === Seq("d1", "d2"))
-        assert(columns.map(_.hiveType) === Seq("decimal(10,3)", "string"))
+        assert(columns.map(_.dataType) === Seq("decimal(10,3)", "string"))
 
         checkAnswer(table("t"), testDF)
         assert(runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
@@ -114,16 +114,17 @@ class DataSourceWithHiveMetastoreCatalogSuite
           }
 
           val hiveTable = catalog.client.getTable("default", "t")
-          assert(hiveTable.inputFormat === Some(inputFormat))
-          assert(hiveTable.outputFormat === Some(outputFormat))
-          assert(hiveTable.serde === Some(serde))
+          assert(hiveTable.storage.inputFormat === Some(inputFormat))
+          assert(hiveTable.storage.outputFormat === Some(outputFormat))
+          assert(hiveTable.storage.serde === Some(serde))
 
-          assert(hiveTable.tableType === ExternalTable)
-          assert(hiveTable.location.get === path.toURI.toString.stripSuffix(File.separator))
+          assert(hiveTable.tableType === CatalogTableType.EXTERNAL_TABLE)
+          assert(hiveTable.storage.locationUri ===
+            Some(path.toURI.toString.stripSuffix(File.separator)))
 
           val columns = hiveTable.schema
           assert(columns.map(_.name) === Seq("d1", "d2"))
-          assert(columns.map(_.hiveType) === Seq("decimal(10,3)", "string"))
+          assert(columns.map(_.dataType) === Seq("decimal(10,3)", "string"))
 
           checkAnswer(table("t"), testDF)
           assert(runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
@@ -143,17 +144,16 @@ class DataSourceWithHiveMetastoreCatalogSuite
              """.stripMargin)
 
           val hiveTable = catalog.client.getTable("default", "t")
-          assert(hiveTable.inputFormat === Some(inputFormat))
-          assert(hiveTable.outputFormat === Some(outputFormat))
-          assert(hiveTable.serde === Some(serde))
+          assert(hiveTable.storage.inputFormat === Some(inputFormat))
+          assert(hiveTable.storage.outputFormat === Some(outputFormat))
+          assert(hiveTable.storage.serde === Some(serde))
 
-          assert(hiveTable.isPartitioned === false)
-          assert(hiveTable.tableType === ExternalTable)
-          assert(hiveTable.partitionColumns.length === 0)
+          assert(hiveTable.partitionColumns.isEmpty)
+          assert(hiveTable.tableType === CatalogTableType.EXTERNAL_TABLE)
 
           val columns = hiveTable.schema
           assert(columns.map(_.name) === Seq("d1", "d2"))
-          assert(columns.map(_.hiveType) === Seq("int", "string"))
+          assert(columns.map(_.dataType) === Seq("int", "string"))
 
           checkAnswer(table("t"), Row(1, "val_1"))
           assert(runSqlHive("SELECT * FROM t") === Seq("1\tval_1"))

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
index 137dadd..e869c0e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
@@ -22,15 +22,15 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.JsonTuple
 import org.apache.spark.sql.catalyst.parser.SimpleParserConf
 import org.apache.spark.sql.catalyst.plans.logical.Generate
-import org.apache.spark.sql.hive.client.{ExternalTable, HiveColumn, HiveTable, ManagedTable}
 
 class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
   val parser = new HiveQl(SimpleParserConf())
 
-  private def extractTableDesc(sql: String): (HiveTable, Boolean) = {
+  private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
     parser.parsePlan(sql).collect {
       case CreateTableAsSelect(desc, child, allowExisting) => (desc, allowExisting)
     }.head
@@ -53,28 +53,29 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
         |AS SELECT * FROM src""".stripMargin
 
     val (desc, exists) = extractTableDesc(s1)
-    assert(exists == true)
+    assert(exists)
     assert(desc.specifiedDatabase == Some("mydb"))
     assert(desc.name == "page_view")
-    assert(desc.tableType == ExternalTable)
-    assert(desc.location == Some("/user/external/page_view"))
+    assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
+    assert(desc.storage.locationUri == Some("/user/external/page_view"))
     assert(desc.schema ==
-      HiveColumn("viewtime", "int", null) ::
-        HiveColumn("userid", "bigint", null) ::
-        HiveColumn("page_url", "string", null) ::
-        HiveColumn("referrer_url", "string", null) ::
-        HiveColumn("ip", "string", "IP Address of the User") ::
-        HiveColumn("country", "string", "country of origination") :: Nil)
+      CatalogColumn("viewtime", "int") ::
+      CatalogColumn("userid", "bigint") ::
+      CatalogColumn("page_url", "string") ::
+      CatalogColumn("referrer_url", "string") ::
+      CatalogColumn("ip", "string", comment = Some("IP Address of the User")) ::
+      CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil)
     // TODO will be SQLText
     assert(desc.viewText == Option("This is the staging page view table"))
     assert(desc.partitionColumns ==
-      HiveColumn("dt", "string", "date type") ::
-        HiveColumn("hour", "string", "hour of the day") :: Nil)
-    assert(desc.serdeProperties ==
+      CatalogColumn("dt", "string", comment = Some("date type")) ::
+      CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
+    assert(desc.storage.serdeProperties ==
       Map((serdeConstants.SERIALIZATION_FORMAT, "\054"), (serdeConstants.FIELD_DELIM, "\054")))
-    assert(desc.inputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
-    assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
-    assert(desc.serde == Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
+    assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
+    assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
+    assert(desc.storage.serde ==
+      Some("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
     assert(desc.properties == Map(("p1", "v1"), ("p2", "v2")))
   }
 
@@ -98,27 +99,27 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
         |AS SELECT * FROM src""".stripMargin
 
     val (desc, exists) = extractTableDesc(s2)
-    assert(exists == true)
+    assert(exists)
     assert(desc.specifiedDatabase == Some("mydb"))
     assert(desc.name == "page_view")
-    assert(desc.tableType == ExternalTable)
-    assert(desc.location == Some("/user/external/page_view"))
+    assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
+    assert(desc.storage.locationUri == Some("/user/external/page_view"))
     assert(desc.schema ==
-      HiveColumn("viewtime", "int", null) ::
-        HiveColumn("userid", "bigint", null) ::
-        HiveColumn("page_url", "string", null) ::
-        HiveColumn("referrer_url", "string", null) ::
-        HiveColumn("ip", "string", "IP Address of the User") ::
-        HiveColumn("country", "string", "country of origination") :: Nil)
+      CatalogColumn("viewtime", "int") ::
+      CatalogColumn("userid", "bigint") ::
+      CatalogColumn("page_url", "string") ::
+      CatalogColumn("referrer_url", "string") ::
+      CatalogColumn("ip", "string", comment = Some("IP Address of the User")) ::
+      CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil)
     // TODO will be SQLText
     assert(desc.viewText == Option("This is the staging page view table"))
     assert(desc.partitionColumns ==
-      HiveColumn("dt", "string", "date type") ::
-        HiveColumn("hour", "string", "hour of the day") :: Nil)
-    assert(desc.serdeProperties == Map())
-    assert(desc.inputFormat == Option("parquet.hive.DeprecatedParquetInputFormat"))
-    assert(desc.outputFormat == Option("parquet.hive.DeprecatedParquetOutputFormat"))
-    assert(desc.serde == Option("parquet.hive.serde.ParquetHiveSerDe"))
+      CatalogColumn("dt", "string", comment = Some("date type")) ::
+      CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
+    assert(desc.storage.serdeProperties == Map())
+    assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat"))
+    assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat"))
+    assert(desc.storage.serde == Some("parquet.hive.serde.ParquetHiveSerDe"))
     assert(desc.properties == Map(("p1", "v1"), ("p2", "v2")))
   }
 
@@ -128,14 +129,15 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
     assert(exists == false)
     assert(desc.specifiedDatabase == None)
     assert(desc.name == "page_view")
-    assert(desc.tableType == ManagedTable)
-    assert(desc.location == None)
-    assert(desc.schema == Seq.empty[HiveColumn])
+    assert(desc.tableType == CatalogTableType.MANAGED_TABLE)
+    assert(desc.storage.locationUri == None)
+    assert(desc.schema == Seq.empty[CatalogColumn])
     assert(desc.viewText == None) // TODO will be SQLText
-    assert(desc.serdeProperties == Map())
-    assert(desc.inputFormat == Option("org.apache.hadoop.mapred.TextInputFormat"))
-    assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
-    assert(desc.serde.isEmpty)
+    assert(desc.storage.serdeProperties == Map())
+    assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat"))
+    assert(desc.storage.outputFormat ==
+      Some("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
+    assert(desc.storage.serde.isEmpty)
     assert(desc.properties == Map())
   }
 
@@ -162,14 +164,14 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
     assert(exists == false)
     assert(desc.specifiedDatabase == None)
     assert(desc.name == "ctas2")
-    assert(desc.tableType == ManagedTable)
-    assert(desc.location == None)
-    assert(desc.schema == Seq.empty[HiveColumn])
+    assert(desc.tableType == CatalogTableType.MANAGED_TABLE)
+    assert(desc.storage.locationUri == None)
+    assert(desc.schema == Seq.empty[CatalogColumn])
     assert(desc.viewText == None) // TODO will be SQLText
-    assert(desc.serdeProperties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2")))
-    assert(desc.inputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
-    assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
-    assert(desc.serde == Option("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"))
+    assert(desc.storage.serdeProperties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2")))
+    assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
+    assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
+    assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"))
     assert(desc.properties == Map(("tbl_p1" -> "p11"), ("tbl_p2" -> "p22")))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index d9e4b02..0c288bd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -25,9 +25,9 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
-import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
@@ -724,20 +724,25 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     val tableName = "spark6655"
     withTable(tableName) {
       val schema = StructType(StructField("int", IntegerType, true) :: Nil)
-      val hiveTable = HiveTable(
+      val hiveTable = CatalogTable(
         specifiedDatabase = Some("default"),
         name = tableName,
+        tableType = CatalogTableType.MANAGED_TABLE,
         schema = Seq.empty,
-        partitionColumns = Seq.empty,
+        storage = CatalogStorageFormat(
+          locationUri = None,
+          inputFormat = None,
+          outputFormat = None,
+          serde = None,
+          serdeProperties = Map(
+            "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName)))
+        ),
         properties = Map(
           "spark.sql.sources.provider" -> "json",
           "spark.sql.sources.schema" -> schema.json,
-          "EXTERNAL" -> "FALSE"),
-        tableType = ManagedTable,
-        serdeProperties = Map(
-          "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName))))
+          "EXTERNAL" -> "FALSE"))
 
-      catalog.client.createTable(hiveTable)
+      catalog.client.createTable(hiveTable, ignoreIfExists = false)
 
       invalidateTable(tableName)
       val actualSchema = table(tableName).schema
@@ -916,7 +921,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     // As a proxy for verifying that the table was stored in Hive compatible format, we verify that
     // each column of the table is of native type StringType.
     assert(catalog.client.getTable("default", "not_skip_hive_metadata").schema
-      .forall(column => HiveMetastoreTypes.toDataType(column.hiveType) == StringType))
+      .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType))
 
     catalog.createDataSourceTable(
       tableIdent = TableIdentifier("skip_hive_metadata"),
@@ -930,6 +935,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     // As a proxy for verifying that the table was stored in SparkSQL format, we verify that
     // the table has a column type as array of StringType.
     assert(catalog.client.getTable("default", "skip_hive_metadata").schema
-      .forall(column => HiveMetastoreTypes.toDataType(column.hiveType) == ArrayType(StringType)))
+      .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == ArrayType(StringType)))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index c2c896e..488f298 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -26,9 +26,9 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
 
   private def checkTablePath(dbName: String, tableName: String): Unit = {
     val metastoreTable = hiveContext.catalog.client.getTable(dbName, tableName)
-    val expectedPath = hiveContext.catalog.client.getDatabase(dbName).location + "/" + tableName
+    val expectedPath = hiveContext.catalog.client.getDatabase(dbName).locationUri + "/" + tableName
 
-    assert(metastoreTable.serdeProperties("path") === expectedPath)
+    assert(metastoreTable.storage.serdeProperties("path") === expectedPath)
   }
 
   test(s"saveAsTable() to non-default database - with USE - Overwrite") {

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 1344a2c..d850d52 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.{Logging, SparkFunSuite}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.hive.HiveContext
@@ -60,8 +61,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
       hadoopVersion = VersionInfo.getVersion,
       config = buildConf(),
       ivyPath = ivyPath).createClient()
-    val db = new HiveDatabase("default", "")
-    badClient.createDatabase(db)
+    val db = new CatalogDatabase("default", "desc", "loc", Map())
+    badClient.createDatabase(db, ignoreIfExists = true)
   }
 
   private def getNestedMessages(e: Throwable): String = {
@@ -116,29 +117,27 @@ class VersionsSuite extends SparkFunSuite with Logging {
     }
 
     test(s"$version: createDatabase") {
-      val db = HiveDatabase("default", "")
-      client.createDatabase(db)
+      val db = CatalogDatabase("default", "desc", "loc", Map())
+      client.createDatabase(db, ignoreIfExists = true)
     }
 
     test(s"$version: createTable") {
       val table =
-        HiveTable(
+        CatalogTable(
           specifiedDatabase = Option("default"),
           name = "src",
-          schema = Seq(HiveColumn("key", "int", "")),
-          partitionColumns = Seq.empty,
-          properties = Map.empty,
-          serdeProperties = Map.empty,
-          tableType = ManagedTable,
-          location = None,
-          inputFormat =
-            Some(classOf[org.apache.hadoop.mapred.TextInputFormat].getName),
-          outputFormat =
-            Some(classOf[org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat[_, _]].getName),
-          serde =
-            Some(classOf[org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe].getName()))
-
-      client.createTable(table)
+          tableType = CatalogTableType.MANAGED_TABLE,
+          schema = Seq(CatalogColumn("key", "int")),
+          storage = CatalogStorageFormat(
+            locationUri = None,
+            inputFormat = Some(classOf[org.apache.hadoop.mapred.TextInputFormat].getName),
+            outputFormat = Some(
+              classOf[org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat[_, _]].getName),
+            serde = Some(classOf[org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe].getName()),
+            serdeProperties = Map.empty
+          ))
+
+      client.createTable(table, ignoreIfExists = false)
     }
 
     test(s"$version: getTable") {

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index b91248b..37c0179 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -149,7 +149,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
       val (actualScannedColumns, actualPartValues) = plan.collect {
         case p @ HiveTableScan(columns, relation, _) =>
           val columnNames = columns.map(_.name)
-          val partValues = if (relation.table.isPartitioned) {
+          val partValues = if (relation.table.partitionColumns.nonEmpty) {
             p.prunePartitions(relation.getHiveQlPartitions()).map(_.getValues)
           } else {
             Seq.empty


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/2] spark git commit: [SPARK-13080][SQL] Implement new Catalog API using Hive

Posted by rx...@apache.org.
[SPARK-13080][SQL] Implement new Catalog API using Hive

## What changes were proposed in this pull request?

This is a step towards merging `SQLContext` and `HiveContext`. A new internal Catalog API was introduced in #10982 and extended in #11069. This patch introduces an implementation of this API using `HiveClient`, an existing interface to Hive. It also extends `HiveClient` with additional calls to Hive that are needed to complete the catalog implementation.

*Where should I start reviewing?* The new catalog introduced is `HiveCatalog`. This class is relatively simple because it just calls `HiveClientImpl`, where most of the new logic is. I would not start with `HiveClient`, `HiveQl`, or `HiveMetastoreCatalog`, which are modified mainly because of a refactor.

*Why is this patch so big?* I had to refactor HiveClient to remove an intermediate representation of databases, tables, partitions, etc. After this refactor, `CatalogTable` converts directly to and from `HiveTable` (etc.). Otherwise we would have to first convert `CatalogTable` to the intermediate representation and then convert that to HiveTable, which is messy.

The new class hierarchy is as follows:
```
org.apache.spark.sql.catalyst.catalog.Catalog
  - org.apache.spark.sql.catalyst.catalog.InMemoryCatalog
  - org.apache.spark.sql.hive.HiveCatalog
```

Note that, as of this patch, none of these classes are used anywhere yet. They will be put to use in follow-up patches before the Spark 2.0 release.

## How was this patch tested?
All existing unit tests, and HiveCatalogSuite that extends CatalogTestCases.

Author: Andrew Or <an...@databricks.com>
Author: Reynold Xin <rx...@databricks.com>

Closes #11293 from rxin/hive-catalog.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c3832b2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c3832b2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c3832b2

Branch: refs/heads/master
Commit: 6c3832b26e119626205732b8fd03c8f5ba986896
Parents: 7eb83fe
Author: Andrew Or <an...@databricks.com>
Authored: Sun Feb 21 15:00:24 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sun Feb 21 15:00:24 2016 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/AnalysisException.scala    |   3 +
 .../spark/sql/catalyst/analysis/Catalog.scala   |   9 -
 .../catalyst/analysis/NoSuchItemException.scala |  52 +++
 .../sql/catalyst/catalog/InMemoryCatalog.scala  | 154 ++++----
 .../spark/sql/catalyst/catalog/interface.scala  | 190 ++++++----
 .../sql/catalyst/catalog/CatalogTestCases.scala | 284 +++++++++-----
 .../org/apache/spark/sql/hive/HiveCatalog.scala | 293 ++++++++++++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 171 +++++----
 .../org/apache/spark/sql/hive/HiveQl.scala      | 145 +++----
 .../spark/sql/hive/client/HiveClient.scala      | 210 ++++++----
 .../spark/sql/hive/client/HiveClientImpl.scala  | 379 +++++++++++++------
 .../hive/execution/CreateTableAsSelect.scala    |  20 +-
 .../sql/hive/execution/CreateViewAsSelect.scala |  20 +-
 .../hive/execution/InsertIntoHiveTable.scala    |   2 +-
 .../spark/sql/hive/HiveCatalogSuite.scala       |  49 +++
 .../sql/hive/HiveMetastoreCatalogSuite.scala    |  40 +-
 .../org/apache/spark/sql/hive/HiveQlSuite.scala |  94 ++---
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  25 +-
 .../spark/sql/hive/MultiDatabaseSuite.scala     |   4 +-
 .../spark/sql/hive/client/VersionsSuite.scala   |  37 +-
 .../spark/sql/hive/execution/PruningSuite.scala |   2 +-
 21 files changed, 1483 insertions(+), 700 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
index f999218..97f28fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
@@ -19,6 +19,9 @@ package org.apache.spark.sql
 
 import org.apache.spark.annotation.DeveloperApi
 
+
+// TODO: don't swallow original stack trace if it exists
+
 /**
  * :: DeveloperApi ::
  * Thrown when a query fails to analyze, usually because the query itself is invalid.

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 67edab5..52b284b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -20,20 +20,11 @@ package org.apache.spark.sql.catalyst.analysis
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf, TableIdentifier}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 
-/**
- * Thrown by a catalog when a table cannot be found.  The analyzer will rethrow the exception
- * as an AnalysisException with the correct position information.
- */
-class NoSuchTableException extends Exception
-
-class NoSuchDatabaseException extends Exception
 
 /**
  * An interface for looking up relations by name.  Used by an [[Analyzer]].

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
new file mode 100644
index 0000000..81399db
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.catalog.Catalog.TablePartitionSpec
+
+
+/**
+ * Thrown by a catalog when an item cannot be found. The analyzer will rethrow the exception
+ * as an [[org.apache.spark.sql.AnalysisException]] with the correct position information.
+ */
+abstract class NoSuchItemException extends Exception {
+  override def getMessage: String
+}
+
+class NoSuchDatabaseException(db: String) extends NoSuchItemException {
+  override def getMessage: String = s"Database $db not found"
+}
+
+class NoSuchTableException(db: String, table: String) extends NoSuchItemException {
+  override def getMessage: String = s"Table $table not found in database $db"
+}
+
+class NoSuchPartitionException(
+    db: String,
+    table: String,
+    spec: TablePartitionSpec)
+  extends NoSuchItemException {
+
+  override def getMessage: String = {
+    s"Partition not found in table $table database $db:\n" + spec.mkString("\n")
+  }
+}
+
+class NoSuchFunctionException(db: String, func: String) extends NoSuchItemException {
+  override def getMessage: String = s"Function $func not found in database $db"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 38be61c..cba4de3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -30,15 +30,16 @@ import org.apache.spark.sql.AnalysisException
 class InMemoryCatalog extends Catalog {
   import Catalog._
 
-  private class TableDesc(var table: Table) {
-    val partitions = new mutable.HashMap[PartitionSpec, TablePartition]
+  private class TableDesc(var table: CatalogTable) {
+    val partitions = new mutable.HashMap[TablePartitionSpec, CatalogTablePartition]
   }
 
-  private class DatabaseDesc(var db: Database) {
+  private class DatabaseDesc(var db: CatalogDatabase) {
     val tables = new mutable.HashMap[String, TableDesc]
-    val functions = new mutable.HashMap[String, Function]
+    val functions = new mutable.HashMap[String, CatalogFunction]
   }
 
+  // Database name -> description
   private val catalog = new scala.collection.mutable.HashMap[String, DatabaseDesc]
 
   private def filterPattern(names: Seq[String], pattern: String): Seq[String] = {
@@ -47,39 +48,33 @@ class InMemoryCatalog extends Catalog {
   }
 
   private def existsFunction(db: String, funcName: String): Boolean = {
-    assertDbExists(db)
+    requireDbExists(db)
     catalog(db).functions.contains(funcName)
   }
 
   private def existsTable(db: String, table: String): Boolean = {
-    assertDbExists(db)
+    requireDbExists(db)
     catalog(db).tables.contains(table)
   }
 
-  private def existsPartition(db: String, table: String, spec: PartitionSpec): Boolean = {
-    assertTableExists(db, table)
+  private def existsPartition(db: String, table: String, spec: TablePartitionSpec): Boolean = {
+    requireTableExists(db, table)
     catalog(db).tables(table).partitions.contains(spec)
   }
 
-  private def assertDbExists(db: String): Unit = {
-    if (!catalog.contains(db)) {
-      throw new AnalysisException(s"Database $db does not exist")
-    }
-  }
-
-  private def assertFunctionExists(db: String, funcName: String): Unit = {
+  private def requireFunctionExists(db: String, funcName: String): Unit = {
     if (!existsFunction(db, funcName)) {
       throw new AnalysisException(s"Function $funcName does not exist in $db database")
     }
   }
 
-  private def assertTableExists(db: String, table: String): Unit = {
+  private def requireTableExists(db: String, table: String): Unit = {
     if (!existsTable(db, table)) {
       throw new AnalysisException(s"Table $table does not exist in $db database")
     }
   }
 
-  private def assertPartitionExists(db: String, table: String, spec: PartitionSpec): Unit = {
+  private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = {
     if (!existsPartition(db, table, spec)) {
       throw new AnalysisException(s"Partition does not exist in database $db table $table: $spec")
     }
@@ -90,7 +85,7 @@ class InMemoryCatalog extends Catalog {
   // --------------------------------------------------------------------------
 
   override def createDatabase(
-      dbDefinition: Database,
+      dbDefinition: CatalogDatabase,
       ignoreIfExists: Boolean): Unit = synchronized {
     if (catalog.contains(dbDefinition.name)) {
       if (!ignoreIfExists) {
@@ -124,17 +119,20 @@ class InMemoryCatalog extends Catalog {
     }
   }
 
-  override def alterDatabase(db: String, dbDefinition: Database): Unit = synchronized {
-    assertDbExists(db)
-    assert(db == dbDefinition.name)
-    catalog(db).db = dbDefinition
+  override def alterDatabase(dbDefinition: CatalogDatabase): Unit = synchronized {
+    requireDbExists(dbDefinition.name)
+    catalog(dbDefinition.name).db = dbDefinition
   }
 
-  override def getDatabase(db: String): Database = synchronized {
-    assertDbExists(db)
+  override def getDatabase(db: String): CatalogDatabase = synchronized {
+    requireDbExists(db)
     catalog(db).db
   }
 
+  override def databaseExists(db: String): Boolean = synchronized {
+    catalog.contains(db)
+  }
+
   override def listDatabases(): Seq[String] = synchronized {
     catalog.keySet.toSeq
   }
@@ -143,15 +141,17 @@ class InMemoryCatalog extends Catalog {
     filterPattern(listDatabases(), pattern)
   }
 
+  override def setCurrentDatabase(db: String): Unit = { /* no-op */ }
+
   // --------------------------------------------------------------------------
   // Tables
   // --------------------------------------------------------------------------
 
   override def createTable(
       db: String,
-      tableDefinition: Table,
+      tableDefinition: CatalogTable,
       ignoreIfExists: Boolean): Unit = synchronized {
-    assertDbExists(db)
+    requireDbExists(db)
     if (existsTable(db, tableDefinition.name)) {
       if (!ignoreIfExists) {
         throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database")
@@ -165,7 +165,7 @@ class InMemoryCatalog extends Catalog {
       db: String,
       table: String,
       ignoreIfNotExists: Boolean): Unit = synchronized {
-    assertDbExists(db)
+    requireDbExists(db)
     if (existsTable(db, table)) {
       catalog(db).tables.remove(table)
     } else {
@@ -176,31 +176,30 @@ class InMemoryCatalog extends Catalog {
   }
 
   override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
-    assertTableExists(db, oldName)
+    requireTableExists(db, oldName)
     val oldDesc = catalog(db).tables(oldName)
     oldDesc.table = oldDesc.table.copy(name = newName)
     catalog(db).tables.put(newName, oldDesc)
     catalog(db).tables.remove(oldName)
   }
 
-  override def alterTable(db: String, table: String, tableDefinition: Table): Unit = synchronized {
-    assertTableExists(db, table)
-    assert(table == tableDefinition.name)
-    catalog(db).tables(table).table = tableDefinition
+  override def alterTable(db: String, tableDefinition: CatalogTable): Unit = synchronized {
+    requireTableExists(db, tableDefinition.name)
+    catalog(db).tables(tableDefinition.name).table = tableDefinition
   }
 
-  override def getTable(db: String, table: String): Table = synchronized {
-    assertTableExists(db, table)
+  override def getTable(db: String, table: String): CatalogTable = synchronized {
+    requireTableExists(db, table)
     catalog(db).tables(table).table
   }
 
   override def listTables(db: String): Seq[String] = synchronized {
-    assertDbExists(db)
+    requireDbExists(db)
     catalog(db).tables.keySet.toSeq
   }
 
   override def listTables(db: String, pattern: String): Seq[String] = synchronized {
-    assertDbExists(db)
+    requireDbExists(db)
     filterPattern(listTables(db), pattern)
   }
 
@@ -211,9 +210,9 @@ class InMemoryCatalog extends Catalog {
   override def createPartitions(
       db: String,
       table: String,
-      parts: Seq[TablePartition],
+      parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = synchronized {
-    assertTableExists(db, table)
+    requireTableExists(db, table)
     val existingParts = catalog(db).tables(table).partitions
     if (!ignoreIfExists) {
       val dupSpecs = parts.collect { case p if existingParts.contains(p.spec) => p.spec }
@@ -229,9 +228,9 @@ class InMemoryCatalog extends Catalog {
   override def dropPartitions(
       db: String,
       table: String,
-      partSpecs: Seq[PartitionSpec],
+      partSpecs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean): Unit = synchronized {
-    assertTableExists(db, table)
+    requireTableExists(db, table)
     val existingParts = catalog(db).tables(table).partitions
     if (!ignoreIfNotExists) {
       val missingSpecs = partSpecs.collect { case s if !existingParts.contains(s) => s }
@@ -244,30 +243,42 @@ class InMemoryCatalog extends Catalog {
     partSpecs.foreach(existingParts.remove)
   }
 
-  override def alterPartition(
+  override def renamePartitions(
       db: String,
       table: String,
-      spec: Map[String, String],
-      newPart: TablePartition): Unit = synchronized {
-    assertPartitionExists(db, table, spec)
-    val existingParts = catalog(db).tables(table).partitions
-    if (spec != newPart.spec) {
-      // Also a change in specs; remove the old one and add the new one back
-      existingParts.remove(spec)
+      specs: Seq[TablePartitionSpec],
+      newSpecs: Seq[TablePartitionSpec]): Unit = synchronized {
+    require(specs.size == newSpecs.size, "number of old and new partition specs differ")
+    specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
+      val newPart = getPartition(db, table, oldSpec).copy(spec = newSpec)
+      val existingParts = catalog(db).tables(table).partitions
+      existingParts.remove(oldSpec)
+      existingParts.put(newSpec, newPart)
+    }
+  }
+
+  override def alterPartitions(
+      db: String,
+      table: String,
+      parts: Seq[CatalogTablePartition]): Unit = synchronized {
+    parts.foreach { p =>
+      requirePartitionExists(db, table, p.spec)
+      catalog(db).tables(table).partitions.put(p.spec, p)
     }
-    existingParts.put(newPart.spec, newPart)
   }
 
   override def getPartition(
       db: String,
       table: String,
-      spec: Map[String, String]): TablePartition = synchronized {
-    assertPartitionExists(db, table, spec)
+      spec: TablePartitionSpec): CatalogTablePartition = synchronized {
+    requirePartitionExists(db, table, spec)
     catalog(db).tables(table).partitions(spec)
   }
 
-  override def listPartitions(db: String, table: String): Seq[TablePartition] = synchronized {
-    assertTableExists(db, table)
+  override def listPartitions(
+      db: String,
+      table: String): Seq[CatalogTablePartition] = synchronized {
+    requireTableExists(db, table)
     catalog(db).tables(table).partitions.values.toSeq
   }
 
@@ -275,44 +286,39 @@ class InMemoryCatalog extends Catalog {
   // Functions
   // --------------------------------------------------------------------------
 
-  override def createFunction(
-      db: String,
-      func: Function,
-      ignoreIfExists: Boolean): Unit = synchronized {
-    assertDbExists(db)
+  override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
+    requireDbExists(db)
     if (existsFunction(db, func.name)) {
-      if (!ignoreIfExists) {
-        throw new AnalysisException(s"Function $func already exists in $db database")
-      }
+      throw new AnalysisException(s"Function $func already exists in $db database")
     } else {
       catalog(db).functions.put(func.name, func)
     }
   }
 
   override def dropFunction(db: String, funcName: String): Unit = synchronized {
-    assertFunctionExists(db, funcName)
+    requireFunctionExists(db, funcName)
     catalog(db).functions.remove(funcName)
   }
 
-  override def alterFunction(
-      db: String,
-      funcName: String,
-      funcDefinition: Function): Unit = synchronized {
-    assertFunctionExists(db, funcName)
-    if (funcName != funcDefinition.name) {
-      // Also a rename; remove the old one and add the new one back
-      catalog(db).functions.remove(funcName)
-    }
+  override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized {
+    requireFunctionExists(db, oldName)
+    val newFunc = getFunction(db, oldName).copy(name = newName)
+    catalog(db).functions.remove(oldName)
+    catalog(db).functions.put(newName, newFunc)
+  }
+
+  override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = synchronized {
+    requireFunctionExists(db, funcDefinition.name)
     catalog(db).functions.put(funcDefinition.name, funcDefinition)
   }
 
-  override def getFunction(db: String, funcName: String): Function = synchronized {
-    assertFunctionExists(db, funcName)
+  override def getFunction(db: String, funcName: String): CatalogFunction = synchronized {
+    requireFunctionExists(db, funcName)
     catalog(db).functions(funcName)
   }
 
   override def listFunctions(db: String, pattern: String): Seq[String] = synchronized {
-    assertDbExists(db)
+    requireDbExists(db)
     filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 56aaa6b..dac5f02 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import javax.annotation.Nullable
+
 import org.apache.spark.sql.AnalysisException
 
 
@@ -31,41 +33,59 @@ import org.apache.spark.sql.AnalysisException
 abstract class Catalog {
   import Catalog._
 
+  protected def requireDbExists(db: String): Unit = {
+    if (!databaseExists(db)) {
+      throw new AnalysisException(s"Database $db does not exist")
+    }
+  }
+
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
 
-  def createDatabase(dbDefinition: Database, ignoreIfExists: Boolean): Unit
+  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
 
   def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
 
   /**
-   * Alter an existing database. This operation does not support renaming.
+   * Alter a database whose name matches the one specified in `dbDefinition`,
+   * assuming the database exists.
+   *
+   * Note: If the underlying implementation does not support altering a certain field,
+   * this becomes a no-op.
    */
-  def alterDatabase(db: String, dbDefinition: Database): Unit
+  def alterDatabase(dbDefinition: CatalogDatabase): Unit
 
-  def getDatabase(db: String): Database
+  def getDatabase(db: String): CatalogDatabase
+
+  def databaseExists(db: String): Boolean
 
   def listDatabases(): Seq[String]
 
   def listDatabases(pattern: String): Seq[String]
 
+  def setCurrentDatabase(db: String): Unit
+
   // --------------------------------------------------------------------------
   // Tables
   // --------------------------------------------------------------------------
 
-  def createTable(db: String, tableDefinition: Table, ignoreIfExists: Boolean): Unit
+  def createTable(db: String, tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
 
   def dropTable(db: String, table: String, ignoreIfNotExists: Boolean): Unit
 
   def renameTable(db: String, oldName: String, newName: String): Unit
 
   /**
-   * Alter an existing table. This operation does not support renaming.
+   * Alter a table whose name matches the one specified in `tableDefinition`,
+   * assuming the table exists.
+   *
+   * Note: If the underlying implementation does not support altering a certain field,
+   * this becomes a no-op.
    */
-  def alterTable(db: String, table: String, tableDefinition: Table): Unit
+  def alterTable(db: String, tableDefinition: CatalogTable): Unit
 
-  def getTable(db: String, table: String): Table
+  def getTable(db: String, table: String): CatalogTable
 
   def listTables(db: String): Seq[String]
 
@@ -78,43 +98,62 @@ abstract class Catalog {
   def createPartitions(
       db: String,
       table: String,
-      parts: Seq[TablePartition],
+      parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit
 
   def dropPartitions(
       db: String,
       table: String,
-      parts: Seq[PartitionSpec],
+      parts: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean): Unit
 
   /**
-   * Alter an existing table partition and optionally override its spec.
+   * Override the specs of one or many existing table partitions, assuming they exist.
+   * This assumes index i of `specs` corresponds to index i of `newSpecs`.
+   */
+  def renamePartitions(
+      db: String,
+      table: String,
+      specs: Seq[TablePartitionSpec],
+      newSpecs: Seq[TablePartitionSpec]): Unit
+
+  /**
+   * Alter one or many table partitions whose specs match those specified in `parts`,
+   * assuming the partitions exist.
+   *
+   * Note: If the underlying implementation does not support altering a certain field,
+   * this becomes a no-op.
    */
-  def alterPartition(
+  def alterPartitions(
       db: String,
       table: String,
-      spec: PartitionSpec,
-      newPart: TablePartition): Unit
+      parts: Seq[CatalogTablePartition]): Unit
 
-  def getPartition(db: String, table: String, spec: PartitionSpec): TablePartition
+  def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition
 
   // TODO: support listing by pattern
-  def listPartitions(db: String, table: String): Seq[TablePartition]
+  def listPartitions(db: String, table: String): Seq[CatalogTablePartition]
 
   // --------------------------------------------------------------------------
   // Functions
   // --------------------------------------------------------------------------
 
-  def createFunction(db: String, funcDefinition: Function, ignoreIfExists: Boolean): Unit
+  def createFunction(db: String, funcDefinition: CatalogFunction): Unit
 
   def dropFunction(db: String, funcName: String): Unit
 
+  def renameFunction(db: String, oldName: String, newName: String): Unit
+
   /**
-   * Alter an existing function and optionally override its name.
+   * Alter a function whose name matches the one specified in `funcDefinition`,
+   * assuming the function exists.
+   *
+   * Note: If the underlying implementation does not support altering a certain field,
+   * this becomes a no-op.
    */
-  def alterFunction(db: String, funcName: String, funcDefinition: Function): Unit
+  def alterFunction(db: String, funcDefinition: CatalogFunction): Unit
 
-  def getFunction(db: String, funcName: String): Function
+  def getFunction(db: String, funcName: String): CatalogFunction
 
   def listFunctions(db: String, pattern: String): Seq[String]
 
@@ -127,33 +166,30 @@ abstract class Catalog {
  * @param name name of the function
  * @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc"
  */
-case class Function(
-  name: String,
-  className: String
-)
+case class CatalogFunction(name: String, className: String)
 
 
 /**
  * Storage format, used to describe how a partition or a table is stored.
  */
-case class StorageFormat(
-  locationUri: String,
-  inputFormat: String,
-  outputFormat: String,
-  serde: String,
-  serdeProperties: Map[String, String]
-)
+case class CatalogStorageFormat(
+    locationUri: Option[String],
+    inputFormat: Option[String],
+    outputFormat: Option[String],
+    serde: Option[String],
+    serdeProperties: Map[String, String])
 
 
 /**
  * A column in a table.
  */
-case class Column(
-  name: String,
-  dataType: String,
-  nullable: Boolean,
-  comment: String
-)
+case class CatalogColumn(
+    name: String,
+    // This may be null when used to create views. TODO: make this type-safe; this is left
+    // as a string due to issues in converting Hive varchars to and from SparkSQL strings.
+    @Nullable dataType: String,
+    nullable: Boolean = true,
+    comment: Option[String] = None)
 
 
 /**
@@ -162,10 +198,7 @@ case class Column(
  * @param spec partition spec values indexed by column name
  * @param storage storage format of the partition
  */
-case class TablePartition(
-  spec: Catalog.PartitionSpec,
-  storage: StorageFormat
-)
+case class CatalogTablePartition(spec: Catalog.TablePartitionSpec, storage: CatalogStorageFormat)
 
 
 /**
@@ -174,40 +207,65 @@ case class TablePartition(
  * Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
  * future once we have a better understanding of how we want to handle skewed columns.
  */
-case class Table(
-  name: String,
-  description: String,
-  schema: Seq[Column],
-  partitionColumns: Seq[Column],
-  sortColumns: Seq[Column],
-  storage: StorageFormat,
-  numBuckets: Int,
-  properties: Map[String, String],
-  tableType: String,
-  createTime: Long,
-  lastAccessTime: Long,
-  viewOriginalText: Option[String],
-  viewText: Option[String]) {
-
-  require(tableType == "EXTERNAL_TABLE" || tableType == "INDEX_TABLE" ||
-    tableType == "MANAGED_TABLE" || tableType == "VIRTUAL_VIEW")
+case class CatalogTable(
+    specifiedDatabase: Option[String],
+    name: String,
+    tableType: CatalogTableType,
+    storage: CatalogStorageFormat,
+    schema: Seq[CatalogColumn],
+    partitionColumns: Seq[CatalogColumn] = Seq.empty,
+    sortColumns: Seq[CatalogColumn] = Seq.empty,
+    numBuckets: Int = 0,
+    createTime: Long = System.currentTimeMillis,
+    lastAccessTime: Long = System.currentTimeMillis,
+    properties: Map[String, String] = Map.empty,
+    viewOriginalText: Option[String] = None,
+    viewText: Option[String] = None) {
+
+  /** Return the database this table was specified to belong to, assuming it exists. */
+  def database: String = specifiedDatabase.getOrElse {
+    throw new AnalysisException(s"table $name did not specify database")
+  }
+
+  /** Return the fully qualified name of this table, assuming the database was specified. */
+  def qualifiedName: String = s"$database.$name"
+
+  /** Syntactic sugar to update a field in `storage`. */
+  def withNewStorage(
+      locationUri: Option[String] = storage.locationUri,
+      inputFormat: Option[String] = storage.inputFormat,
+      outputFormat: Option[String] = storage.outputFormat,
+      serde: Option[String] = storage.serde,
+      serdeProperties: Map[String, String] = storage.serdeProperties): CatalogTable = {
+    copy(storage = CatalogStorageFormat(
+      locationUri, inputFormat, outputFormat, serde, serdeProperties))
+  }
+
+}
+
+
+case class CatalogTableType private(name: String)
+object CatalogTableType {
+  val EXTERNAL_TABLE = new CatalogTableType("EXTERNAL_TABLE")
+  val MANAGED_TABLE = new CatalogTableType("MANAGED_TABLE")
+  val INDEX_TABLE = new CatalogTableType("INDEX_TABLE")
+  val VIRTUAL_VIEW = new CatalogTableType("VIRTUAL_VIEW")
 }
 
 
 /**
  * A database defined in the catalog.
  */
-case class Database(
-  name: String,
-  description: String,
-  locationUri: String,
-  properties: Map[String, String]
-)
+case class CatalogDatabase(
+    name: String,
+    description: String,
+    locationUri: String,
+    properties: Map[String, String])
 
 
 object Catalog {
   /**
-   * Specifications of a table partition indexed by column name.
+   * Specifications of a table partition, mapping column name to column value.
    */
-  type PartitionSpec = Map[String, String]
+  type TablePartitionSpec = Map[String, String]
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index 45c5cee..e0d1220 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import org.scalatest.BeforeAndAfterEach
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 
@@ -26,18 +28,38 @@ import org.apache.spark.sql.AnalysisException
  *
  * Implementations of the [[Catalog]] interface can create test suites by extending this.
  */
-abstract class CatalogTestCases extends SparkFunSuite {
-  private val storageFormat = StorageFormat("usa", "$", "zzz", "serde", Map())
-  private val part1 = TablePartition(Map("a" -> "1"), storageFormat)
-  private val part2 = TablePartition(Map("b" -> "2"), storageFormat)
-  private val part3 = TablePartition(Map("c" -> "3"), storageFormat)
+abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
+  private lazy val storageFormat = CatalogStorageFormat(
+    locationUri = None,
+    inputFormat = Some(tableInputFormat),
+    outputFormat = Some(tableOutputFormat),
+    serde = None,
+    serdeProperties = Map.empty)
+  private lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat)
+  private lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat)
+  private lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat)
   private val funcClass = "org.apache.spark.myFunc"
 
+  // Things subclasses should override
+  protected val tableInputFormat: String = "org.apache.park.serde.MyInputFormat"
+  protected val tableOutputFormat: String = "org.apache.park.serde.MyOutputFormat"
+  protected def newUriForDatabase(): String = "uri"
+  protected def resetState(): Unit = { }
   protected def newEmptyCatalog(): Catalog
 
+  // Clear all state after each test
+  override def afterEach(): Unit = {
+    try {
+      resetState()
+    } finally {
+      super.afterEach()
+    }
+  }
+
   /**
    * Creates a basic catalog, with the following structure:
    *
+   * default
    * db1
    * db2
    *   - tbl1
@@ -48,37 +70,65 @@ abstract class CatalogTestCases extends SparkFunSuite {
    */
   private def newBasicCatalog(): Catalog = {
     val catalog = newEmptyCatalog()
+    // When testing against a real catalog, the default database may already exist
+    catalog.createDatabase(newDb("default"), ignoreIfExists = true)
     catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
     catalog.createDatabase(newDb("db2"), ignoreIfExists = false)
-    catalog.createTable("db2", newTable("tbl1"), ignoreIfExists = false)
-    catalog.createTable("db2", newTable("tbl2"), ignoreIfExists = false)
+    catalog.createTable("db2", newTable("tbl1", "db2"), ignoreIfExists = false)
+    catalog.createTable("db2", newTable("tbl2", "db2"), ignoreIfExists = false)
     catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false)
-    catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = false)
+    catalog.createFunction("db2", newFunc("func1"))
     catalog
   }
 
-  private def newFunc(): Function = Function("funcname", funcClass)
+  private def newFunc(): CatalogFunction = CatalogFunction("funcname", funcClass)
+
+  private def newDb(name: String): CatalogDatabase = {
+    CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)
+  }
+
+  private def newTable(name: String, db: String): CatalogTable = {
+    CatalogTable(
+      specifiedDatabase = Some(db),
+      name = name,
+      tableType = CatalogTableType.EXTERNAL_TABLE,
+      storage = storageFormat,
+      schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", "string")),
+      partitionColumns = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string")))
+  }
 
-  private def newDb(name: String = "default"): Database =
-    Database(name, name + " description", "uri", Map.empty)
+  private def newFunc(name: String): CatalogFunction = CatalogFunction(name, funcClass)
 
-  private def newTable(name: String): Table =
-    Table(name, "", Seq.empty, Seq.empty, Seq.empty, null, 0, Map.empty, "EXTERNAL_TABLE", 0, 0,
-      None, None)
+  /**
+   * Whether the catalog's table partitions equal the ones given.
+   * Note: Hive sets some random serde things, so we just compare the specs here.
+   */
+  private def catalogPartitionsEqual(
+      catalog: Catalog,
+      db: String,
+      table: String,
+      parts: Seq[CatalogTablePartition]): Boolean = {
+    catalog.listPartitions(db, table).map(_.spec).toSet == parts.map(_.spec).toSet
+  }
 
-  private def newFunc(name: String): Function = Function(name, funcClass)
 
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
 
-  test("basic create, drop and list databases") {
+  test("basic create and list databases") {
     val catalog = newEmptyCatalog()
-    catalog.createDatabase(newDb(), ignoreIfExists = false)
-    assert(catalog.listDatabases().toSet == Set("default"))
-
-    catalog.createDatabase(newDb("default2"), ignoreIfExists = false)
-    assert(catalog.listDatabases().toSet == Set("default", "default2"))
+    catalog.createDatabase(newDb("default"), ignoreIfExists = true)
+    assert(catalog.databaseExists("default"))
+    assert(!catalog.databaseExists("testing"))
+    assert(!catalog.databaseExists("testing2"))
+    catalog.createDatabase(newDb("testing"), ignoreIfExists = false)
+    assert(catalog.databaseExists("testing"))
+    assert(catalog.listDatabases().toSet == Set("default", "testing"))
+    catalog.createDatabase(newDb("testing2"), ignoreIfExists = false)
+    assert(catalog.listDatabases().toSet == Set("default", "testing", "testing2"))
+    assert(catalog.databaseExists("testing2"))
+    assert(!catalog.databaseExists("does_not_exist"))
   }
 
   test("get database when a database exists") {
@@ -93,7 +143,7 @@ abstract class CatalogTestCases extends SparkFunSuite {
 
   test("list databases without pattern") {
     val catalog = newBasicCatalog()
-    assert(catalog.listDatabases().toSet == Set("db1", "db2"))
+    assert(catalog.listDatabases().toSet == Set("default", "db1", "db2"))
   }
 
   test("list databases with pattern") {
@@ -107,7 +157,7 @@ abstract class CatalogTestCases extends SparkFunSuite {
   test("drop database") {
     val catalog = newBasicCatalog()
     catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false)
-    assert(catalog.listDatabases().toSet == Set("db2"))
+    assert(catalog.listDatabases().toSet == Set("default", "db2"))
   }
 
   test("drop database when the database is not empty") {
@@ -118,6 +168,7 @@ abstract class CatalogTestCases extends SparkFunSuite {
     intercept[AnalysisException] {
       catalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
     }
+    resetState()
 
     // Throw exception if there are tables left
     val catalog2 = newBasicCatalog()
@@ -125,11 +176,12 @@ abstract class CatalogTestCases extends SparkFunSuite {
     intercept[AnalysisException] {
       catalog2.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
     }
+    resetState()
 
     // When cascade is true, it should drop them
     val catalog3 = newBasicCatalog()
     catalog3.dropDatabase("db2", ignoreIfNotExists = false, cascade = true)
-    assert(catalog3.listDatabases().toSet == Set("db1"))
+    assert(catalog3.listDatabases().toSet == Set("default", "db1"))
   }
 
   test("drop database when the database does not exist") {
@@ -144,13 +196,19 @@ abstract class CatalogTestCases extends SparkFunSuite {
 
   test("alter database") {
     val catalog = newBasicCatalog()
-    catalog.alterDatabase("db1", Database("db1", "new description", "lll", Map.empty))
-    assert(catalog.getDatabase("db1").description == "new description")
+    val db1 = catalog.getDatabase("db1")
+    // Note: alter properties here because Hive does not support altering other fields
+    catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true")))
+    val newDb1 = catalog.getDatabase("db1")
+    assert(db1.properties.isEmpty)
+    assert(newDb1.properties.size == 2)
+    assert(newDb1.properties.get("k") == Some("v3"))
+    assert(newDb1.properties.get("good") == Some("true"))
   }
 
   test("alter database should throw exception when the database does not exist") {
     intercept[AnalysisException] {
-      newBasicCatalog().alterDatabase("no_db", Database("no_db", "ddd", "lll", Map.empty))
+      newBasicCatalog().alterDatabase(newDb("does_not_exist"))
     }
   }
 
@@ -165,61 +223,56 @@ abstract class CatalogTestCases extends SparkFunSuite {
     assert(catalog.listTables("db2").toSet == Set("tbl2"))
   }
 
-  test("drop table when database / table does not exist") {
+  test("drop table when database/table does not exist") {
     val catalog = newBasicCatalog()
-
     // Should always throw exception when the database does not exist
     intercept[AnalysisException] {
       catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = false)
     }
-
     intercept[AnalysisException] {
       catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = true)
     }
-
     // Should throw exception when the table does not exist, if ignoreIfNotExists is false
     intercept[AnalysisException] {
       catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = false)
     }
-
     catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true)
   }
 
   test("rename table") {
     val catalog = newBasicCatalog()
-
     assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
     catalog.renameTable("db2", "tbl1", "tblone")
     assert(catalog.listTables("db2").toSet == Set("tblone", "tbl2"))
   }
 
-  test("rename table when database / table does not exist") {
+  test("rename table when database/table does not exist") {
     val catalog = newBasicCatalog()
-
-    intercept[AnalysisException] {  // Throw exception when the database does not exist
+    intercept[AnalysisException] {
       catalog.renameTable("unknown_db", "unknown_table", "unknown_table")
     }
-
-    intercept[AnalysisException] {  // Throw exception when the table does not exist
+    intercept[AnalysisException] {
       catalog.renameTable("db2", "unknown_table", "unknown_table")
     }
   }
 
   test("alter table") {
     val catalog = newBasicCatalog()
-    catalog.alterTable("db2", "tbl1", newTable("tbl1").copy(createTime = 10))
-    assert(catalog.getTable("db2", "tbl1").createTime == 10)
+    val tbl1 = catalog.getTable("db2", "tbl1")
+    catalog.alterTable("db2", tbl1.copy(properties = Map("toh" -> "frem")))
+    val newTbl1 = catalog.getTable("db2", "tbl1")
+    assert(!tbl1.properties.contains("toh"))
+    assert(newTbl1.properties.size == tbl1.properties.size + 1)
+    assert(newTbl1.properties.get("toh") == Some("frem"))
   }
 
-  test("alter table when database / table does not exist") {
+  test("alter table when database/table does not exist") {
     val catalog = newBasicCatalog()
-
-    intercept[AnalysisException] {  // Throw exception when the database does not exist
-      catalog.alterTable("unknown_db", "unknown_table", newTable("unknown_table"))
+    intercept[AnalysisException] {
+      catalog.alterTable("unknown_db", newTable("tbl1", "unknown_db"))
     }
-
-    intercept[AnalysisException] {  // Throw exception when the table does not exist
-      catalog.alterTable("db2", "unknown_table", newTable("unknown_table"))
+    intercept[AnalysisException] {
+      catalog.alterTable("db2", newTable("unknown_table", "db2"))
     }
   }
 
@@ -227,12 +280,11 @@ abstract class CatalogTestCases extends SparkFunSuite {
     assert(newBasicCatalog().getTable("db2", "tbl1").name == "tbl1")
   }
 
-  test("get table when database / table does not exist") {
+  test("get table when database/table does not exist") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
       catalog.getTable("unknown_db", "unknown_table")
     }
-
     intercept[AnalysisException] {
       catalog.getTable("db2", "unknown_table")
     }
@@ -246,10 +298,7 @@ abstract class CatalogTestCases extends SparkFunSuite {
 
   test("list tables with pattern") {
     val catalog = newBasicCatalog()
-
-    // Test when database does not exist
     intercept[AnalysisException] { catalog.listTables("unknown_db") }
-
     assert(catalog.listTables("db1", "*").toSet == Set.empty)
     assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2"))
     assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2"))
@@ -263,12 +312,12 @@ abstract class CatalogTestCases extends SparkFunSuite {
   test("basic create and list partitions") {
     val catalog = newEmptyCatalog()
     catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
-    catalog.createTable("mydb", newTable("mytbl"), ignoreIfExists = false)
-    catalog.createPartitions("mydb", "mytbl", Seq(part1, part2), ignoreIfExists = false)
-    assert(catalog.listPartitions("mydb", "mytbl").toSet == Set(part1, part2))
+    catalog.createTable("mydb", newTable("tbl", "mydb"), ignoreIfExists = false)
+    catalog.createPartitions("mydb", "tbl", Seq(part1, part2), ignoreIfExists = false)
+    assert(catalogPartitionsEqual(catalog, "mydb", "tbl", Seq(part1, part2)))
   }
 
-  test("create partitions when database / table does not exist") {
+  test("create partitions when database/table does not exist") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
       catalog.createPartitions("does_not_exist", "tbl1", Seq(), ignoreIfExists = false)
@@ -288,16 +337,17 @@ abstract class CatalogTestCases extends SparkFunSuite {
 
   test("drop partitions") {
     val catalog = newBasicCatalog()
-    assert(catalog.listPartitions("db2", "tbl2").toSet == Set(part1, part2))
+    assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
     catalog.dropPartitions("db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false)
-    assert(catalog.listPartitions("db2", "tbl2").toSet == Set(part2))
+    assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2)))
+    resetState()
     val catalog2 = newBasicCatalog()
-    assert(catalog2.listPartitions("db2", "tbl2").toSet == Set(part1, part2))
+    assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2)))
     catalog2.dropPartitions("db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
     assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
   }
 
-  test("drop partitions when database / table does not exist") {
+  test("drop partitions when database/table does not exist") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
       catalog.dropPartitions("does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false)
@@ -317,14 +367,14 @@ abstract class CatalogTestCases extends SparkFunSuite {
 
   test("get partition") {
     val catalog = newBasicCatalog()
-    assert(catalog.getPartition("db2", "tbl2", part1.spec) == part1)
-    assert(catalog.getPartition("db2", "tbl2", part2.spec) == part2)
+    assert(catalog.getPartition("db2", "tbl2", part1.spec).spec == part1.spec)
+    assert(catalog.getPartition("db2", "tbl2", part2.spec).spec == part2.spec)
     intercept[AnalysisException] {
       catalog.getPartition("db2", "tbl1", part3.spec)
     }
   }
 
-  test("get partition when database / table does not exist") {
+  test("get partition when database/table does not exist") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
       catalog.getPartition("does_not_exist", "tbl1", part1.spec)
@@ -334,28 +384,69 @@ abstract class CatalogTestCases extends SparkFunSuite {
     }
   }
 
-  test("alter partitions") {
+  test("rename partitions") {
+    val catalog = newBasicCatalog()
+    val newPart1 = part1.copy(spec = Map("a" -> "100", "b" -> "101"))
+    val newPart2 = part2.copy(spec = Map("a" -> "200", "b" -> "201"))
+    val newSpecs = Seq(newPart1.spec, newPart2.spec)
+    catalog.renamePartitions("db2", "tbl2", Seq(part1.spec, part2.spec), newSpecs)
+    assert(catalog.getPartition("db2", "tbl2", newPart1.spec).spec === newPart1.spec)
+    assert(catalog.getPartition("db2", "tbl2", newPart2.spec).spec === newPart2.spec)
+    // The old partitions should no longer exist
+    intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part1.spec) }
+    intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part2.spec) }
+  }
+
+  test("rename partitions when database/table does not exist") {
     val catalog = newBasicCatalog()
-    val partSameSpec = part1.copy(storage = storageFormat.copy(serde = "myserde"))
-    val partNewSpec = part1.copy(spec = Map("x" -> "10"))
-    // alter but keep spec the same
-    catalog.alterPartition("db2", "tbl2", part1.spec, partSameSpec)
-    assert(catalog.getPartition("db2", "tbl2", part1.spec) == partSameSpec)
-    // alter and change spec
-    catalog.alterPartition("db2", "tbl2", part1.spec, partNewSpec)
     intercept[AnalysisException] {
-      catalog.getPartition("db2", "tbl2", part1.spec)
+      catalog.renamePartitions("does_not_exist", "tbl1", Seq(part1.spec), Seq(part2.spec))
+    }
+    intercept[AnalysisException] {
+      catalog.renamePartitions("db2", "does_not_exist", Seq(part1.spec), Seq(part2.spec))
     }
-    assert(catalog.getPartition("db2", "tbl2", partNewSpec.spec) == partNewSpec)
   }
 
-  test("alter partition when database / table does not exist") {
+  test("alter partitions") {
+    val catalog = newBasicCatalog()
+    try {
+      // Note: Before altering table partitions in Hive, you *must* set the current database
+      // to the one that contains the table of interest. Otherwise you will end up with the
+      // most helpful error message ever: "Unable to alter partition. alter is not possible."
+      // See HIVE-2742 for more detail.
+      catalog.setCurrentDatabase("db2")
+      val newLocation = newUriForDatabase()
+      // alter but keep spec the same
+      val oldPart1 = catalog.getPartition("db2", "tbl2", part1.spec)
+      val oldPart2 = catalog.getPartition("db2", "tbl2", part2.spec)
+      catalog.alterPartitions("db2", "tbl2", Seq(
+        oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))),
+        oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation)))))
+      val newPart1 = catalog.getPartition("db2", "tbl2", part1.spec)
+      val newPart2 = catalog.getPartition("db2", "tbl2", part2.spec)
+      assert(newPart1.storage.locationUri == Some(newLocation))
+      assert(newPart2.storage.locationUri == Some(newLocation))
+      assert(oldPart1.storage.locationUri != Some(newLocation))
+      assert(oldPart2.storage.locationUri != Some(newLocation))
+      // alter but change spec, should fail because new partition specs do not exist yet
+      val badPart1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2"))
+      val badPart2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4"))
+      intercept[AnalysisException] {
+        catalog.alterPartitions("db2", "tbl2", Seq(badPart1, badPart2))
+      }
+    } finally {
+      // Remember to restore the original current database, which we assume to be "default"
+      catalog.setCurrentDatabase("default")
+    }
+  }
+
+  test("alter partitions when database/table does not exist") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
-      catalog.alterPartition("does_not_exist", "tbl1", part1.spec, part1)
+      catalog.alterPartitions("does_not_exist", "tbl1", Seq(part1))
     }
     intercept[AnalysisException] {
-      catalog.alterPartition("db2", "does_not_exist", part1.spec, part1)
+      catalog.alterPartitions("db2", "does_not_exist", Seq(part1))
     }
   }
 
@@ -366,23 +457,22 @@ abstract class CatalogTestCases extends SparkFunSuite {
   test("basic create and list functions") {
     val catalog = newEmptyCatalog()
     catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
-    catalog.createFunction("mydb", newFunc("myfunc"), ignoreIfExists = false)
+    catalog.createFunction("mydb", newFunc("myfunc"))
     assert(catalog.listFunctions("mydb", "*").toSet == Set("myfunc"))
   }
 
   test("create function when database does not exist") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
-      catalog.createFunction("does_not_exist", newFunc(), ignoreIfExists = false)
+      catalog.createFunction("does_not_exist", newFunc())
     }
   }
 
   test("create function that already exists") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
-      catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = false)
+      catalog.createFunction("db2", newFunc("func1"))
     }
-    catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = true)
   }
 
   test("drop function") {
@@ -421,31 +511,43 @@ abstract class CatalogTestCases extends SparkFunSuite {
     }
   }
 
-  test("alter function") {
+  test("rename function") {
     val catalog = newBasicCatalog()
+    val newName = "funcky"
     assert(catalog.getFunction("db2", "func1").className == funcClass)
-    // alter func but keep name
-    catalog.alterFunction("db2", "func1", newFunc("func1").copy(className = "muhaha"))
-    assert(catalog.getFunction("db2", "func1").className == "muhaha")
-    // alter func and change name
-    catalog.alterFunction("db2", "func1", newFunc("funcky"))
+    catalog.renameFunction("db2", "func1", newName)
+    intercept[AnalysisException] { catalog.getFunction("db2", "func1") }
+    assert(catalog.getFunction("db2", newName).name == newName)
+    assert(catalog.getFunction("db2", newName).className == funcClass)
+    intercept[AnalysisException] { catalog.renameFunction("db2", "does_not_exist", "me") }
+  }
+
+  test("rename function when database does not exist") {
+    val catalog = newBasicCatalog()
     intercept[AnalysisException] {
-      catalog.getFunction("db2", "func1")
+      catalog.renameFunction("does_not_exist", "func1", "func5")
     }
-    assert(catalog.getFunction("db2", "funcky").className == funcClass)
+  }
+
+  test("alter function") {
+    val catalog = newBasicCatalog()
+    assert(catalog.getFunction("db2", "func1").className == funcClass)
+    catalog.alterFunction("db2", newFunc("func1").copy(className = "muhaha"))
+    assert(catalog.getFunction("db2", "func1").className == "muhaha")
+    intercept[AnalysisException] { catalog.alterFunction("db2", newFunc("funcky")) }
   }
 
   test("alter function when database does not exist") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
-      catalog.alterFunction("does_not_exist", "func1", newFunc())
+      catalog.alterFunction("does_not_exist", newFunc())
     }
   }
 
   test("list functions") {
     val catalog = newBasicCatalog()
-    catalog.createFunction("db2", newFunc("func2"), ignoreIfExists = false)
-    catalog.createFunction("db2", newFunc("not_me"), ignoreIfExists = false)
+    catalog.createFunction("db2", newFunc("func2"))
+    catalog.createFunction("db2", newFunc("not_me"))
     assert(catalog.listFunctions("db2", "*").toSet == Set("func1", "func2", "not_me"))
     assert(catalog.listFunctions("db2", "func*").toSet == Set("func1", "func2"))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
new file mode 100644
index 0000000..21b9cfb
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.hive.ql.metadata.HiveException
+import org.apache.thrift.TException
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.NoSuchItemException
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.hive.client.HiveClient
+
+
+/**
+ * A persistent implementation of the system catalog using Hive.
+ * All public methods must be synchronized for thread-safety.
+ */
+private[spark] class HiveCatalog(client: HiveClient) extends Catalog with Logging {
+  import Catalog._
+
+  // Exceptions thrown by the hive client that we would like to wrap
+  private val clientExceptions = Set(
+    classOf[HiveException].getCanonicalName,
+    classOf[TException].getCanonicalName)
+
+  /**
+   * Returns true if `e`, or any superclass of its runtime class, is one of the Hive client
+   * exception types listed in `clientExceptions`, i.e. one that `withClient` should wrap.
+   *
+   * Classes are compared by canonical name instead of by pattern matching because of
+   * classloader isolation issues; the canonical names are assumed to be stable.
+   */
+  private def isClientException(e: Throwable): Boolean = {
+    // Lazily walk the superclass chain; `getSuperclass` yields null past the root class.
+    val classChain = Iterator
+      .iterate[Class[_]](e.getClass)(_.getSuperclass)
+      .takeWhile(_ != null)
+    // `exists` stops at the first match, like the early exit of a hand-written loop.
+    classChain.exists(cls => clientExceptions.contains(cls.getCanonicalName))
+  }
+
+  /**
+   * Run `body` while holding this catalog's lock. A [[NoSuchItemException]] or Hive client
+   * exception (see `isClientException`) is rethrown as [[AnalysisException]]; others propagate.
+   */
+  private def withClient[T](body: => T): T = synchronized {
+    try {
+      body
+    } catch {
+      case e: NoSuchItemException =>
+        throw new AnalysisException(e.getMessage)
+      case NonFatal(e) if isClientException(e) =>
+        throw new AnalysisException(e.getClass.getCanonicalName + ": " + e.getMessage)
+    }
+  }
+
+  /** Throws [[AnalysisException]] unless `table` explicitly specifies `db` as its database. */
+  private def requireDbMatches(db: String, table: CatalogTable): Unit = {
+    if (table.specifiedDatabase != Some(db)) {
+      throw new AnalysisException(s"Provided database $db does not match the one specified " +
+        s"in the table definition (${table.specifiedDatabase.getOrElse("n/a")})")
+    }
+  }
+
+  private def requireTableExists(db: String, table: String): Unit = {
+    withClient { getTable(db, table) }
+  }
+
+
+  // --------------------------------------------------------------------------
+  // Databases
+  // --------------------------------------------------------------------------
+
+  override def createDatabase(
+      dbDefinition: CatalogDatabase,
+      ignoreIfExists: Boolean): Unit = withClient {
+    client.createDatabase(dbDefinition, ignoreIfExists)
+  }
+
+  override def dropDatabase(
+      db: String,
+      ignoreIfNotExists: Boolean,
+      cascade: Boolean): Unit = withClient {
+    client.dropDatabase(db, ignoreIfNotExists, cascade)
+  }
+
+  /**
+   * Alter a database whose name matches the one specified in `dbDefinition`,
+   * assuming the database exists.
+   *
+   * Note: As of now, this only supports altering database properties!
+   */
+  override def alterDatabase(dbDefinition: CatalogDatabase): Unit = withClient {
+    val existingDb = getDatabase(dbDefinition.name)
+    if (existingDb.properties == dbDefinition.properties) {
+      logWarning(s"Request to alter database ${dbDefinition.name} is a no-op because " +
+        s"the provided database properties are the same as the old ones. Hive does not " +
+        s"currently support altering other database fields.")
+    }
+    client.alterDatabase(dbDefinition)
+  }
+
+  override def getDatabase(db: String): CatalogDatabase = withClient {
+    client.getDatabase(db)
+  }
+
+  override def databaseExists(db: String): Boolean = withClient {
+    client.getDatabaseOption(db).isDefined
+  }
+
+  override def listDatabases(): Seq[String] = withClient {
+    client.listDatabases("*")
+  }
+
+  override def listDatabases(pattern: String): Seq[String] = withClient {
+    client.listDatabases(pattern)
+  }
+
+  override def setCurrentDatabase(db: String): Unit = withClient {
+    client.setCurrentDatabase(db)
+  }
+
+  // --------------------------------------------------------------------------
+  // Tables
+  // --------------------------------------------------------------------------
+
+  override def createTable(
+      db: String,
+      tableDefinition: CatalogTable,
+      ignoreIfExists: Boolean): Unit = withClient {
+    requireDbExists(db)
+    requireDbMatches(db, tableDefinition)
+    client.createTable(tableDefinition, ignoreIfExists)
+  }
+
+  override def dropTable(
+      db: String,
+      table: String,
+      ignoreIfNotExists: Boolean): Unit = withClient {
+    requireDbExists(db)
+    client.dropTable(db, table, ignoreIfNotExists)
+  }
+
+  override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
+    val newTable = client.getTable(db, oldName).copy(name = newName)
+    client.alterTable(oldName, newTable)
+  }
+
+  /**
+   * Alter a table whose name matches the one specified in `tableDefinition`,
+   * assuming the table exists.
+   *
+   * Note: As of now, this only supports altering table properties, serde properties,
+   * and num buckets!
+   */
+  override def alterTable(db: String, tableDefinition: CatalogTable): Unit = withClient {
+    requireDbMatches(db, tableDefinition)
+    requireTableExists(db, tableDefinition.name)
+    client.alterTable(tableDefinition)
+  }
+
+  override def getTable(db: String, table: String): CatalogTable = withClient {
+    client.getTable(db, table)
+  }
+
+  override def listTables(db: String): Seq[String] = withClient {
+    requireDbExists(db)
+    client.listTables(db)
+  }
+
+  override def listTables(db: String, pattern: String): Seq[String] = withClient {
+    requireDbExists(db)
+    client.listTables(db, pattern)
+  }
+
+  // --------------------------------------------------------------------------
+  // Partitions
+  // --------------------------------------------------------------------------
+
+  override def createPartitions(
+      db: String,
+      table: String,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = withClient {
+    requireTableExists(db, table)
+    client.createPartitions(db, table, parts, ignoreIfExists)
+  }
+
+  /**
+   * Drop the given partitions of `db`.`table`, after checking that the table exists.
+   * With `ignoreIfNotExists`, specs that do not resolve to a partition are skipped.
+   */
+  override def dropPartitions(
+      db: String,
+      table: String,
+      parts: Seq[TablePartitionSpec],
+      ignoreIfNotExists: Boolean): Unit = withClient {
+    requireTableExists(db, table)
+    // Hive does not currently support `ignoreIfNotExists`, so it is emulated here by
+    // probing each spec individually first. This is somewhat expensive: it costs an extra
+    // synchronous call to Hive for every partition we are asked to drop.
+    def partitionExists(spec: TablePartitionSpec): Boolean = {
+      try {
+        getPartition(db, table, spec)
+        true
+      } catch {
+        // The lookup failed, so treat this partition as one that does not actually exist
+        case _: AnalysisException => false
+      }
+    }
+    val existing = if (ignoreIfNotExists) parts.filter(partitionExists) else parts
+    if (existing.nonEmpty) {
+      client.dropPartitions(db, table, existing)
+    }
+  }
+
+  override def renamePartitions(
+      db: String,
+      table: String,
+      specs: Seq[TablePartitionSpec],
+      newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
+    client.renamePartitions(db, table, specs, newSpecs)
+  }
+
+  override def alterPartitions(
+      db: String,
+      table: String,
+      newParts: Seq[CatalogTablePartition]): Unit = withClient {
+    client.alterPartitions(db, table, newParts)
+  }
+
+  override def getPartition(
+      db: String,
+      table: String,
+      spec: TablePartitionSpec): CatalogTablePartition = withClient {
+    client.getPartition(db, table, spec)
+  }
+
+  override def listPartitions(
+      db: String,
+      table: String): Seq[CatalogTablePartition] = withClient {
+    client.getAllPartitions(db, table)
+  }
+
+  // --------------------------------------------------------------------------
+  // Functions
+  // --------------------------------------------------------------------------
+
+  override def createFunction(
+      db: String,
+      funcDefinition: CatalogFunction): Unit = withClient {
+    client.createFunction(db, funcDefinition)
+  }
+
+  override def dropFunction(db: String, name: String): Unit = withClient {
+    client.dropFunction(db, name)
+  }
+
+  override def renameFunction(db: String, oldName: String, newName: String): Unit = withClient {
+    client.renameFunction(db, oldName, newName)
+  }
+
+  override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = withClient {
+    client.alterFunction(db, funcDefinition)
+  }
+
+  override def getFunction(db: String, funcName: String): CatalogFunction = withClient {
+    client.getFunction(db, funcName)
+  }
+
+  override def listFunctions(db: String, pattern: String): Seq[String] = withClient {
+    client.listFunctions(db, pattern)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c222b00..3788736 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -25,15 +25,16 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.Warehouse
+import org.apache.hadoop.hive.metastore.{TableType => HiveTableType, Warehouse}
 import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.ql.metadata._
+import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable, _}
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -96,6 +97,8 @@ private[hive] object HiveSerDe {
   }
 }
 
+
+// TODO: replace this with o.a.s.sql.hive.HiveCatalog once we merge SQLContext and HiveContext
 private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveContext)
   extends Catalog with Logging {
 
@@ -107,16 +110,16 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
   /** A fully qualified identifier for a table (i.e., database.tableName) */
   case class QualifiedTableName(database: String, name: String)
 
-  private def getQualifiedTableName(tableIdent: TableIdentifier) = {
+  private def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
     QualifiedTableName(
       tableIdent.database.getOrElse(client.currentDatabase).toLowerCase,
       tableIdent.table.toLowerCase)
   }
 
-  private def getQualifiedTableName(hiveTable: HiveTable) = {
+  private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = {
     QualifiedTableName(
-      hiveTable.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase,
-      hiveTable.name.toLowerCase)
+      t.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase,
+      t.name.toLowerCase)
   }
 
   /** A cache of Spark SQL data source tables that have been accessed. */
@@ -175,7 +178,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
 
         // It does not appear that the ql client for the metastore has a way to enumerate all the
         // SerDe properties directly...
-        val options = table.serdeProperties
+        val options = table.storage.serdeProperties
 
         val resolvedRelation =
           ResolvedDataSource(
@@ -276,53 +279,54 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
 
     val tableType = if (isExternal) {
       tableProperties.put("EXTERNAL", "TRUE")
-      ExternalTable
+      CatalogTableType.EXTERNAL_TABLE
     } else {
       tableProperties.put("EXTERNAL", "FALSE")
-      ManagedTable
+      CatalogTableType.MANAGED_TABLE
     }
 
     val maybeSerDe = HiveSerDe.sourceToSerDe(provider, hive.hiveconf)
     val dataSource = ResolvedDataSource(
       hive, userSpecifiedSchema, partitionColumns, bucketSpec, provider, options)
 
-    def newSparkSQLSpecificMetastoreTable(): HiveTable = {
-      HiveTable(
+    def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
+      CatalogTable(
         specifiedDatabase = Option(dbName),
         name = tblName,
-        schema = Nil,
-        partitionColumns = Nil,
         tableType = tableType,
-        properties = tableProperties.toMap,
-        serdeProperties = options)
+        schema = Nil,
+        storage = CatalogStorageFormat(
+          locationUri = None,
+          inputFormat = None,
+          outputFormat = None,
+          serde = None,
+          serdeProperties = options
+        ),
+        properties = tableProperties.toMap)
     }
 
-    def newHiveCompatibleMetastoreTable(relation: HadoopFsRelation, serde: HiveSerDe): HiveTable = {
-      def schemaToHiveColumn(schema: StructType): Seq[HiveColumn] = {
-        schema.map { field =>
-          HiveColumn(
-            name = field.name,
-            hiveType = HiveMetastoreTypes.toMetastoreType(field.dataType),
-            comment = "")
-        }
-      }
-
+    def newHiveCompatibleMetastoreTable(
+        relation: HadoopFsRelation,
+        serde: HiveSerDe): CatalogTable = {
       assert(partitionColumns.isEmpty)
       assert(relation.partitionColumns.isEmpty)
 
-      HiveTable(
+      CatalogTable(
         specifiedDatabase = Option(dbName),
         name = tblName,
-        schema = schemaToHiveColumn(relation.schema),
-        partitionColumns = Nil,
         tableType = tableType,
+        storage = CatalogStorageFormat(
+          locationUri = Some(relation.paths.head),
+          inputFormat = serde.inputFormat,
+          outputFormat = serde.outputFormat,
+          serde = serde.serde,
+          serdeProperties = options
+        ),
+        schema = relation.schema.map { f =>
+          CatalogColumn(f.name, HiveMetastoreTypes.toMetastoreType(f.dataType))
+        },
         properties = tableProperties.toMap,
-        serdeProperties = options,
-        location = Some(relation.paths.head),
-        viewText = None, // TODO We need to place the SQL string here.
-        inputFormat = serde.inputFormat,
-        outputFormat = serde.outputFormat,
-        serde = serde.serde)
+        viewText = None) // TODO: We need to place the SQL string here
     }
 
     // TODO: Support persisting partitioned data source relations in Hive compatible format
@@ -379,7 +383,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
         // specific way.
         try {
           logInfo(message)
-          client.createTable(table)
+          client.createTable(table, ignoreIfExists = false)
         } catch {
           case throwable: Throwable =>
             val warningMessage =
@@ -387,20 +391,20 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
                 s"it into Hive metastore in Spark SQL specific format."
             logWarning(warningMessage, throwable)
             val sparkSqlSpecificTable = newSparkSQLSpecificMetastoreTable()
-            client.createTable(sparkSqlSpecificTable)
+            client.createTable(sparkSqlSpecificTable, ignoreIfExists = false)
         }
 
       case (None, message) =>
         logWarning(message)
         val hiveTable = newSparkSQLSpecificMetastoreTable()
-        client.createTable(hiveTable)
+        client.createTable(hiveTable, ignoreIfExists = false)
     }
   }
 
   def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
     // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
     val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
-    new Path(new Path(client.getDatabase(dbName).location), tblName).toString
+    new Path(new Path(client.getDatabase(dbName).locationUri), tblName).toString
   }
 
   override def tableExists(tableIdent: TableIdentifier): Boolean = {
@@ -420,7 +424,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
       // Then, if alias is specified, wrap the table with a Subquery using the alias.
       // Otherwise, wrap the table with a Subquery using the table name.
       alias.map(a => SubqueryAlias(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
-    } else if (table.tableType == VirtualView) {
+    } else if (table.tableType == CatalogTableType.VIRTUAL_VIEW) {
       val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
       alias match {
         // because hive use things like `_c0` to build the expanded text
@@ -429,7 +433,8 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
         case Some(aliasText) => SubqueryAlias(aliasText, hive.parseSql(viewText))
       }
     } else {
-      MetastoreRelation(qualifiedTableName.database, qualifiedTableName.name, alias)(table)(hive)
+      MetastoreRelation(
+        qualifiedTableName.database, qualifiedTableName.name, alias)(table, client, hive)
     }
   }
 
@@ -602,16 +607,14 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
         val schema = if (table.schema.nonEmpty) {
           table.schema
         } else {
-          child.output.map {
-            attr => new HiveColumn(
-              attr.name,
-              HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
+          child.output.map { a =>
+            CatalogColumn(a.name, HiveMetastoreTypes.toMetastoreType(a.dataType), a.nullable)
           }
         }
 
         val desc = table.copy(schema = schema)
 
-        if (hive.convertCTAS && table.serde.isEmpty) {
+        if (hive.convertCTAS && table.storage.serde.isEmpty) {
           // Do the conversion when spark.sql.hive.convertCTAS is true and the query
           // does not specify any storage format (file format and storage handler).
           if (table.specifiedDatabase.isDefined) {
@@ -632,9 +635,9 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
             child
           )
         } else {
-          val desc = if (table.serde.isEmpty) {
+          val desc = if (table.storage.serde.isEmpty) {
             // add default serde
-            table.copy(
+            table.withNewStorage(
               serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
           } else {
             table
@@ -744,10 +747,13 @@ private[hive] case class InsertIntoHiveTable(
   }
 }
 
-private[hive] case class MetastoreRelation
-    (databaseName: String, tableName: String, alias: Option[String])
-    (val table: HiveTable)
-    (@transient private val sqlContext: SQLContext)
+private[hive] case class MetastoreRelation(
+    databaseName: String,
+    tableName: String,
+    alias: Option[String])
+    (val table: CatalogTable,
+     @transient private val client: HiveClient,
+     @transient private val sqlContext: SQLContext)
   extends LeafNode with MultiInstanceRelation with FileRelation {
 
   override def equals(other: Any): Boolean = other match {
@@ -765,7 +771,12 @@ private[hive] case class MetastoreRelation
 
   override protected def otherCopyArgs: Seq[AnyRef] = table :: sqlContext :: Nil
 
-  @transient val hiveQlTable: Table = {
+  private def toHiveColumn(c: CatalogColumn): FieldSchema = {
+    new FieldSchema(c.name, c.dataType, c.comment.orNull)
+  }
+
+  // TODO: merge this with HiveClientImpl#toHiveTable
+  @transient val hiveQlTable: HiveTable = {
     // We start by constructing an API table as Hive performs several important transformations
     // internally when converting an API table to a QL table.
     val tTable = new org.apache.hadoop.hive.metastore.api.Table()
@@ -776,27 +787,31 @@ private[hive] case class MetastoreRelation
     tTable.setParameters(tableParameters)
     table.properties.foreach { case (k, v) => tableParameters.put(k, v) }
 
-    tTable.setTableType(table.tableType.name)
+    tTable.setTableType(table.tableType match {
+      case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE.toString
+      case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE.toString
+      case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE.toString
+      case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW.toString
+    })
 
     val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
     tTable.setSd(sd)
-    sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
-    tTable.setPartitionKeys(
-      table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
+    sd.setCols(table.schema.map(toHiveColumn).asJava)
+    tTable.setPartitionKeys(table.partitionColumns.map(toHiveColumn).asJava)
 
-    table.location.foreach(sd.setLocation)
-    table.inputFormat.foreach(sd.setInputFormat)
-    table.outputFormat.foreach(sd.setOutputFormat)
+    table.storage.locationUri.foreach(sd.setLocation)
+    table.storage.inputFormat.foreach(sd.setInputFormat)
+    table.storage.outputFormat.foreach(sd.setOutputFormat)
 
     val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
-    table.serde.foreach(serdeInfo.setSerializationLib)
+    table.storage.serde.foreach(serdeInfo.setSerializationLib)
     sd.setSerdeInfo(serdeInfo)
 
     val serdeParameters = new java.util.HashMap[String, String]()
-    table.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+    table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
     serdeInfo.setParameters(serdeParameters)
 
-    new Table(tTable)
+    new HiveTable(tTable)
   }
 
   @transient override lazy val statistics: Statistics = Statistics(
@@ -821,11 +836,11 @@ private[hive] case class MetastoreRelation
 
   // When metastore partition pruning is turned off, we cache the list of all partitions to
   // mimic the behavior of Spark < 1.5
-  lazy val allPartitions = table.getAllPartitions
+  private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(table)
 
   def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
     val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
-      table.getPartitions(predicates)
+      client.getPartitionsByFilter(table, predicates)
     } else {
       allPartitions
     }
@@ -834,23 +849,22 @@ private[hive] case class MetastoreRelation
       val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
       tPartition.setDbName(databaseName)
       tPartition.setTableName(tableName)
-      tPartition.setValues(p.values.asJava)
+      tPartition.setValues(table.partitionColumns.map(c => p.spec(c.name)).asJava)
 
       val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
       tPartition.setSd(sd)
-      sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
-
-      sd.setLocation(p.storage.location)
-      sd.setInputFormat(p.storage.inputFormat)
-      sd.setOutputFormat(p.storage.outputFormat)
+      sd.setCols(table.schema.map(toHiveColumn).asJava)
+      p.storage.locationUri.foreach(sd.setLocation)
+      p.storage.inputFormat.foreach(sd.setInputFormat)
+      p.storage.outputFormat.foreach(sd.setOutputFormat)
 
       val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
       sd.setSerdeInfo(serdeInfo)
       // maps and lists should be set only after all elements are ready (see HIVE-7975)
-      serdeInfo.setSerializationLib(p.storage.serde)
+      p.storage.serde.foreach(serdeInfo.setSerializationLib)
 
       val serdeParameters = new java.util.HashMap[String, String]()
-      table.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+      table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
       p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
       serdeInfo.setParameters(serdeParameters)
 
@@ -877,10 +891,10 @@ private[hive] case class MetastoreRelation
     hiveQlTable.getMetadata
   )
 
-  implicit class SchemaAttribute(f: HiveColumn) {
+  implicit class SchemaAttribute(f: CatalogColumn) {
     def toAttribute: AttributeReference = AttributeReference(
       f.name,
-      HiveMetastoreTypes.toDataType(f.hiveType),
+      HiveMetastoreTypes.toDataType(f.dataType),
       // Since data can be dumped in randomly with no validation, everything is nullable.
       nullable = true
     )(qualifiers = Seq(alias.getOrElse(tableName)))
@@ -901,19 +915,22 @@ private[hive] case class MetastoreRelation
   val columnOrdinals = AttributeMap(attributes.zipWithIndex)
 
   override def inputFiles: Array[String] = {
-    val partLocations = table.getPartitions(Nil).map(_.storage.location).toArray
+    val partLocations = client
+      .getPartitionsByFilter(table, Nil)
+      .flatMap(_.storage.locationUri)
+      .toArray
     if (partLocations.nonEmpty) {
       partLocations
     } else {
       Array(
-        table.location.getOrElse(
+        table.storage.locationUri.getOrElse(
           sys.error(s"Could not get the location of ${table.qualifiedName}.")))
     }
   }
 
 
   override def newInstance(): MetastoreRelation = {
-    MetastoreRelation(databaseName, tableName, alias)(table)(sqlContext)
+    MetastoreRelation(databaseName, tableName, alias)(table, client, sqlContext)
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org