Posted to commits@spark.apache.org by rx...@apache.org on 2016/04/27 06:29:32 UTC

[2/2] spark git commit: [SPARK-13477][SQL] Expose new user-facing Catalog interface

[SPARK-13477][SQL] Expose new user-facing Catalog interface

## What changes were proposed in this pull request?

#12625 exposed a new user-facing conf interface in `SparkSession`. This patch adds the analogous user-facing catalog interface, exposed as `SparkSession.catalog` and backed by the new `CatalogImpl`.
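
For reference, a minimal usage sketch of the new interface (method names follow the added `Catalog.scala`; the helper name is illustrative and `SparkSession` construction is elided):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper: assumes a SparkSession `spark` has already been constructed.
def printCatalogState(spark: SparkSession): Unit = {
  println(spark.catalog.currentDatabase)      // name of the current database
  spark.catalog.listDatabases().show()        // Dataset[Database]
  spark.catalog.listTables("default").show()  // Dataset[Table] for a given database
}
```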

## How was this patch tested?

See `CatalogSuite`.

Author: Andrew Or <an...@databricks.com>

Closes #12713 from andrewor14/user-facing-catalog.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8a83a56
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8a83a56
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8a83a56

Branch: refs/heads/master
Commit: d8a83a564ff3fd0281007adbf8aa3757da8a2c2b
Parents: d93976d
Author: Andrew Or <an...@databricks.com>
Authored: Tue Apr 26 21:29:25 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Apr 26 21:29:25 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/ScalaReflection.scala    |  26 +-
 .../sql/catalyst/catalog/SessionCatalog.scala   |  36 +-
 .../spark/sql/catalyst/catalog/interface.scala  |   8 +-
 .../catalyst/encoders/ExpressionEncoder.scala   |   5 +-
 .../sql/catalyst/catalog/CatalogTestCases.scala | 580 ------------------
 .../catalyst/catalog/ExternalCatalogSuite.scala | 581 +++++++++++++++++++
 .../catalyst/catalog/InMemoryCatalogSuite.scala |   2 +-
 .../catalyst/catalog/SessionCatalogSuite.scala  |   2 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |  33 +-
 .../org/apache/spark/sql/SparkSession.scala     | 248 +-------
 .../org/apache/spark/sql/catalog/Catalog.scala  | 214 +++++++
 .../apache/spark/sql/catalog/interface.scala    | 101 ++++
 .../spark/sql/execution/SparkSqlParser.scala    |   6 +-
 .../spark/sql/execution/command/cache.scala     |   4 +-
 .../spark/sql/execution/command/commands.scala  |   4 +-
 .../command/createDataSourceTables.scala        |   4 +-
 .../spark/sql/execution/command/ddl.scala       |   8 +-
 .../spark/sql/execution/command/tables.scala    |   2 +-
 .../spark/sql/execution/command/views.scala     |   2 +-
 .../apache/spark/sql/internal/CatalogImpl.scala | 352 +++++++++++
 .../spark/sql/execution/command/DDLSuite.scala  |   2 +-
 .../spark/sql/internal/CatalogSuite.scala       | 271 +++++++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |   2 +-
 .../spark/sql/hive/MetastoreRelation.scala      |   8 +-
 .../spark/sql/hive/client/HiveClientImpl.scala  |  16 +-
 .../spark/sql/hive/CachedTableSuite.scala       |   4 +-
 .../spark/sql/hive/HiveDDLCommandSuite.scala    |  18 +-
 .../sql/hive/HiveExternalCatalogSuite.scala     |   2 +-
 .../sql/hive/HiveMetastoreCatalogSuite.scala    |   6 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  10 +-
 .../spark/sql/hive/client/VersionsSuite.scala   |   2 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala |   6 +-
 32 files changed, 1665 insertions(+), 900 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index bd72313..be67605 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -22,7 +22,15 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.Utils
+
+
+/**
+ * A helper trait to create [[org.apache.spark.sql.catalyst.encoders.ExpressionEncoder]]s
+ * for classes whose fields are entirely defined by constructor params but should not be
+ * case classes.
+ */
+private[sql] trait DefinedByConstructorParams
+
 
 /**
  * A default version of ScalaReflection that uses the runtime universe.
@@ -333,7 +341,7 @@ object ScalaReflection extends ScalaReflection {
           "toScalaMap",
           keyData :: valueData :: Nil)
 
-      case t if t <:< localTypeOf[Product] =>
+      case t if definedByConstructorParams(t) =>
         val params = getConstructorParameters(t)
 
         val cls = getClassFromType(tpe)
@@ -401,7 +409,7 @@ object ScalaReflection extends ScalaReflection {
     val clsName = getClassNameFromType(tpe)
     val walkedTypePath = s"""- root class: "${clsName}"""" :: Nil
     serializerFor(inputObject, tpe, walkedTypePath) match {
-      case expressions.If(_, _, s: CreateNamedStruct) if tpe <:< localTypeOf[Product] => s
+      case expressions.If(_, _, s: CreateNamedStruct) if definedByConstructorParams(tpe) => s
       case other => CreateNamedStruct(expressions.Literal("value") :: other :: Nil)
     }
   }
@@ -491,7 +499,7 @@ object ScalaReflection extends ScalaReflection {
                 serializerFor(unwrapped, optType, newPath))
           }
 
-        case t if t <:< localTypeOf[Product] =>
+        case t if definedByConstructorParams(t) =>
           val params = getConstructorParameters(t)
           val nonNullOutput = CreateNamedStruct(params.flatMap { case (fieldName, fieldType) =>
             val fieldValue = Invoke(inputObject, fieldName, dataTypeFor(fieldType))
@@ -680,7 +688,7 @@ object ScalaReflection extends ScalaReflection {
         val Schema(valueDataType, valueNullable) = schemaFor(valueType)
         Schema(MapType(schemaFor(keyType).dataType,
           valueDataType, valueContainsNull = valueNullable), nullable = true)
-      case t if t <:< localTypeOf[Product] =>
+      case t if definedByConstructorParams(t) =>
         val params = getConstructorParameters(t)
         Schema(StructType(
           params.map { case (fieldName, fieldType) =>
@@ -712,6 +720,14 @@ object ScalaReflection extends ScalaReflection {
         throw new UnsupportedOperationException(s"Schema for type $other is not supported")
     }
   }
+
+  /**
+   * Whether the fields of the given type are defined entirely by its constructor parameters.
+   */
+  private[sql] def definedByConstructorParams(tpe: Type): Boolean = {
+    tpe <:< localTypeOf[Product] || tpe <:< localTypeOf[DefinedByConstructorParams]
+  }
+
 }
 
 /**

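A sketch (not part of the patch) of what the new `DefinedByConstructorParams` hook enables: since the trait is `private[sql]`, only classes under `org.apache.spark.sql` can mix it in, which is how non-case classes added by this patch can still get struct schemas. The `Point` class and package below are hypothetical.

```scala
package org.apache.spark.sql.demo  // must live under org.apache.spark.sql: the trait is private[sql]

import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, ScalaReflection}

// Hypothetical class: not a Product, but its fields are exactly its constructor
// parameters, so definedByConstructorParams now treats it like a case class.
class Point(val x: Int, val y: Int) extends DefinedByConstructorParams

object PointSchemaDemo {
  def main(args: Array[String]): Unit = {
    // schemaFor derives a struct schema: StructType(x: int, y: int)
    println(ScalaReflection.schemaFor[Point])
  }
}
```
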
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 402aacf..91d35de 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -613,6 +613,25 @@ class SessionCatalog(
   }
 
   /**
+   * Look up the [[ExpressionInfo]] associated with the specified function, assuming it exists.
+   */
+  private[spark] def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = {
+    // TODO: just make function registry take in FunctionIdentifier instead of duplicating this
+    val qualifiedName = name.copy(database = name.database.orElse(Some(currentDb)))
+    functionRegistry.lookupFunction(name.funcName)
+      .orElse(functionRegistry.lookupFunction(qualifiedName.unquotedString))
+      .getOrElse {
+        val db = qualifiedName.database.get
+        if (externalCatalog.functionExists(db, name.funcName)) {
+          val metadata = externalCatalog.getFunction(db, name.funcName)
+          new ExpressionInfo(metadata.className, qualifiedName.unquotedString)
+        } else {
+          failFunctionLookup(name.funcName)
+        }
+      }
+  }
+
+  /**
    * Return an [[Expression]] that represents the specified function, assuming it exists.
    *
    * For a temporary function or a permanent function that has been loaded,
@@ -646,6 +665,7 @@ class SessionCatalog(
     // The function has not been loaded to the function registry, which means
     // that the function is a permanent function (if it actually has been registered
     // in the metastore). We need to first put the function in the FunctionRegistry.
+    // TODO: why not just check whether the function exists first?
     val catalogFunction = try {
       externalCatalog.getFunction(currentDb, name.funcName)
     } catch {
@@ -662,7 +682,7 @@ class SessionCatalog(
     val builder = makeFunctionBuilder(qualifiedName.unquotedString, catalogFunction.className)
     createTempFunction(qualifiedName.unquotedString, info, builder, ignoreIfExists = false)
     // Now, we need to create the Expression.
-    return functionRegistry.lookupFunction(qualifiedName.unquotedString, children)
+    functionRegistry.lookupFunction(qualifiedName.unquotedString, children)
   }
 
   /**
@@ -687,8 +707,8 @@ class SessionCatalog(
   // -----------------
 
   /**
-   * Drop all existing databases (except "default") along with all associated tables,
-   * partitions and functions, and set the current database to "default".
+   * Drop all existing databases (except "default"), tables, partitions and functions,
+   * and set the current database to "default".
    *
    * This is mainly used for tests.
    */
@@ -697,6 +717,16 @@ class SessionCatalog(
     listDatabases().filter(_ != default).foreach { db =>
       dropDatabase(db, ignoreIfNotExists = false, cascade = true)
     }
+    listTables(default).foreach { table =>
+      dropTable(table, ignoreIfNotExists = false)
+    }
+    listFunctions(default).foreach { func =>
+      if (func.database.isDefined) {
+        dropFunction(func, ignoreIfNotExists = false)
+      } else {
+        dropTempFunction(func.funcName, ignoreIfNotExists = false)
+      }
+    }
     tempTables.clear()
     functionRegistry.clear()
     // restore built-in functions

http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 9e90987..d1e2b3f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -299,10 +299,10 @@ case class CatalogTable(
 
 case class CatalogTableType private(name: String)
 object CatalogTableType {
-  val EXTERNAL_TABLE = new CatalogTableType("EXTERNAL_TABLE")
-  val MANAGED_TABLE = new CatalogTableType("MANAGED_TABLE")
-  val INDEX_TABLE = new CatalogTableType("INDEX_TABLE")
-  val VIRTUAL_VIEW = new CatalogTableType("VIRTUAL_VIEW")
+  val EXTERNAL = new CatalogTableType("EXTERNAL")
+  val MANAGED = new CatalogTableType("MANAGED")
+  val INDEX = new CatalogTableType("INDEX")
+  val VIEW = new CatalogTableType("VIEW")
 }
 
 

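A small sketch of the renamed constants in use (the `describe` helper is illustrative, not from the patch):

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}

// Hypothetical helper mapping the renamed table types to readable labels.
def describe(table: CatalogTable): String = table.tableType match {
  case CatalogTableType.MANAGED  => "managed table"
  case CatalogTableType.EXTERNAL => "external table"
  case CatalogTableType.INDEX    => "index table"
  case CatalogTableType.VIEW     => "view"
}
```
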
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 56d29cf..5d29448 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -47,8 +47,9 @@ object ExpressionEncoder {
   def apply[T : TypeTag](): ExpressionEncoder[T] = {
     // We convert the not-serializable TypeTag into StructType and ClassTag.
     val mirror = typeTag[T].mirror
-    val cls = mirror.runtimeClass(typeTag[T].tpe)
-    val flat = !classOf[Product].isAssignableFrom(cls)
+    val tpe = typeTag[T].tpe
+    val cls = mirror.runtimeClass(tpe)
+    val flat = !ScalaReflection.definedByConstructorParams(tpe)
 
     val inputObject = BoundReference(0, ScalaReflection.dataTypeFor[T], nullable = false)
     val serializer = ScalaReflection.serializerFor[T](inputObject)

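A quick illustration of the `flat` change (behaviour inferred from the diff; the demo object is not part of the patch):

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

object FlatDemo {
  def main(args: Array[String]): Unit = {
    // Types not defined by constructor params stay "flat" (a single value column);
    // tuples and case classes are encoded as structs.
    println(ExpressionEncoder[Int]().flat)            // true
    println(ExpressionEncoder[(Int, String)]().flat)  // false
  }
}
```
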
http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
deleted file mode 100644
index f961fe3..0000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ /dev/null
@@ -1,580 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.catalog
-
-import org.scalatest.BeforeAndAfterEach
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.util.Utils
-
-
-/**
- * A reasonable complete test suite (i.e. behaviors) for a [[ExternalCatalog]].
- *
- * Implementations of the [[ExternalCatalog]] interface can create test suites by extending this.
- */
-abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
-  protected val utils: CatalogTestUtils
-  import utils._
-
-  protected def resetState(): Unit = { }
-
-  // Clear all state after each test
-  override def afterEach(): Unit = {
-    try {
-      resetState()
-    } finally {
-      super.afterEach()
-    }
-  }
-
-  // --------------------------------------------------------------------------
-  // Databases
-  // --------------------------------------------------------------------------
-
-  test("basic create and list databases") {
-    val catalog = newEmptyCatalog()
-    catalog.createDatabase(newDb("default"), ignoreIfExists = true)
-    assert(catalog.databaseExists("default"))
-    assert(!catalog.databaseExists("testing"))
-    assert(!catalog.databaseExists("testing2"))
-    catalog.createDatabase(newDb("testing"), ignoreIfExists = false)
-    assert(catalog.databaseExists("testing"))
-    assert(catalog.listDatabases().toSet == Set("default", "testing"))
-    catalog.createDatabase(newDb("testing2"), ignoreIfExists = false)
-    assert(catalog.listDatabases().toSet == Set("default", "testing", "testing2"))
-    assert(catalog.databaseExists("testing2"))
-    assert(!catalog.databaseExists("does_not_exist"))
-  }
-
-  test("get database when a database exists") {
-    val db1 = newBasicCatalog().getDatabase("db1")
-    assert(db1.name == "db1")
-    assert(db1.description.contains("db1"))
-  }
-
-  test("get database should throw exception when the database does not exist") {
-    intercept[AnalysisException] { newBasicCatalog().getDatabase("db_that_does_not_exist") }
-  }
-
-  test("list databases without pattern") {
-    val catalog = newBasicCatalog()
-    assert(catalog.listDatabases().toSet == Set("default", "db1", "db2"))
-  }
-
-  test("list databases with pattern") {
-    val catalog = newBasicCatalog()
-    assert(catalog.listDatabases("db").toSet == Set.empty)
-    assert(catalog.listDatabases("db*").toSet == Set("db1", "db2"))
-    assert(catalog.listDatabases("*1").toSet == Set("db1"))
-    assert(catalog.listDatabases("db2").toSet == Set("db2"))
-  }
-
-  test("drop database") {
-    val catalog = newBasicCatalog()
-    catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false)
-    assert(catalog.listDatabases().toSet == Set("default", "db2"))
-  }
-
-  test("drop database when the database is not empty") {
-    // Throw exception if there are functions left
-    val catalog1 = newBasicCatalog()
-    catalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false)
-    catalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false)
-    intercept[AnalysisException] {
-      catalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
-    }
-    resetState()
-
-    // Throw exception if there are tables left
-    val catalog2 = newBasicCatalog()
-    catalog2.dropFunction("db2", "func1")
-    intercept[AnalysisException] {
-      catalog2.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
-    }
-    resetState()
-
-    // When cascade is true, it should drop them
-    val catalog3 = newBasicCatalog()
-    catalog3.dropDatabase("db2", ignoreIfNotExists = false, cascade = true)
-    assert(catalog3.listDatabases().toSet == Set("default", "db1"))
-  }
-
-  test("drop database when the database does not exist") {
-    val catalog = newBasicCatalog()
-
-    intercept[AnalysisException] {
-      catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = false, cascade = false)
-    }
-
-    catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = true, cascade = false)
-  }
-
-  test("alter database") {
-    val catalog = newBasicCatalog()
-    val db1 = catalog.getDatabase("db1")
-    // Note: alter properties here because Hive does not support altering other fields
-    catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true")))
-    val newDb1 = catalog.getDatabase("db1")
-    assert(db1.properties.isEmpty)
-    assert(newDb1.properties.size == 2)
-    assert(newDb1.properties.get("k") == Some("v3"))
-    assert(newDb1.properties.get("good") == Some("true"))
-  }
-
-  test("alter database should throw exception when the database does not exist") {
-    intercept[AnalysisException] {
-      newBasicCatalog().alterDatabase(newDb("does_not_exist"))
-    }
-  }
-
-  // --------------------------------------------------------------------------
-  // Tables
-  // --------------------------------------------------------------------------
-
-  test("the table type of an external table should be EXTERNAL_TABLE") {
-    val catalog = newBasicCatalog()
-    val table =
-      newTable("external_table1", "db2").copy(tableType = CatalogTableType.EXTERNAL_TABLE)
-    catalog.createTable("db2", table, ignoreIfExists = false)
-    val actual = catalog.getTable("db2", "external_table1")
-    assert(actual.tableType === CatalogTableType.EXTERNAL_TABLE)
-  }
-
-  test("drop table") {
-    val catalog = newBasicCatalog()
-    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
-    catalog.dropTable("db2", "tbl1", ignoreIfNotExists = false)
-    assert(catalog.listTables("db2").toSet == Set("tbl2"))
-  }
-
-  test("drop table when database/table does not exist") {
-    val catalog = newBasicCatalog()
-    // Should always throw exception when the database does not exist
-    intercept[AnalysisException] {
-      catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = false)
-    }
-    intercept[AnalysisException] {
-      catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = true)
-    }
-    // Should throw exception when the table does not exist, if ignoreIfNotExists is false
-    intercept[AnalysisException] {
-      catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = false)
-    }
-    catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true)
-  }
-
-  test("rename table") {
-    val catalog = newBasicCatalog()
-    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
-    catalog.renameTable("db2", "tbl1", "tblone")
-    assert(catalog.listTables("db2").toSet == Set("tblone", "tbl2"))
-  }
-
-  test("rename table when database/table does not exist") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
-      catalog.renameTable("unknown_db", "unknown_table", "unknown_table")
-    }
-    intercept[AnalysisException] {
-      catalog.renameTable("db2", "unknown_table", "unknown_table")
-    }
-  }
-
-  test("alter table") {
-    val catalog = newBasicCatalog()
-    val tbl1 = catalog.getTable("db2", "tbl1")
-    catalog.alterTable("db2", tbl1.copy(properties = Map("toh" -> "frem")))
-    val newTbl1 = catalog.getTable("db2", "tbl1")
-    assert(!tbl1.properties.contains("toh"))
-    assert(newTbl1.properties.size == tbl1.properties.size + 1)
-    assert(newTbl1.properties.get("toh") == Some("frem"))
-  }
-
-  test("alter table when database/table does not exist") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
-      catalog.alterTable("unknown_db", newTable("tbl1", "unknown_db"))
-    }
-    intercept[AnalysisException] {
-      catalog.alterTable("db2", newTable("unknown_table", "db2"))
-    }
-  }
-
-  test("get table") {
-    assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == "tbl1")
-  }
-
-  test("get table when database/table does not exist") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
-      catalog.getTable("unknown_db", "unknown_table")
-    }
-    intercept[AnalysisException] {
-      catalog.getTable("db2", "unknown_table")
-    }
-  }
-
-  test("list tables without pattern") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] { catalog.listTables("unknown_db") }
-    assert(catalog.listTables("db1").toSet == Set.empty)
-    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
-  }
-
-  test("list tables with pattern") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] { catalog.listTables("unknown_db", "*") }
-    assert(catalog.listTables("db1", "*").toSet == Set.empty)
-    assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2"))
-    assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2"))
-    assert(catalog.listTables("db2", "*1").toSet == Set("tbl1"))
-  }
-
-  // --------------------------------------------------------------------------
-  // Partitions
-  // --------------------------------------------------------------------------
-
-  test("basic create and list partitions") {
-    val catalog = newEmptyCatalog()
-    catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
-    catalog.createTable("mydb", newTable("tbl", "mydb"), ignoreIfExists = false)
-    catalog.createPartitions("mydb", "tbl", Seq(part1, part2), ignoreIfExists = false)
-    assert(catalogPartitionsEqual(catalog, "mydb", "tbl", Seq(part1, part2)))
-  }
-
-  test("create partitions when database/table does not exist") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
-      catalog.createPartitions("does_not_exist", "tbl1", Seq(), ignoreIfExists = false)
-    }
-    intercept[AnalysisException] {
-      catalog.createPartitions("db2", "does_not_exist", Seq(), ignoreIfExists = false)
-    }
-  }
-
-  test("create partitions that already exist") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
-      catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = false)
-    }
-    catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = true)
-  }
-
-  test("drop partitions") {
-    val catalog = newBasicCatalog()
-    assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
-    catalog.dropPartitions(
-      "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false)
-    assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2)))
-    resetState()
-    val catalog2 = newBasicCatalog()
-    assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2)))
-    catalog2.dropPartitions(
-      "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
-    assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
-  }
-
-  test("drop partitions when database/table does not exist") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
-      catalog.dropPartitions(
-        "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false)
-    }
-    intercept[AnalysisException] {
-      catalog.dropPartitions(
-        "db2", "does_not_exist", Seq(), ignoreIfNotExists = false)
-    }
-  }
-
-  test("drop partitions that do not exist") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
-      catalog.dropPartitions(
-        "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false)
-    }
-    catalog.dropPartitions(
-      "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true)
-  }
-
-  test("get partition") {
-    val catalog = newBasicCatalog()
-    assert(catalog.getPartition("db2", "tbl2", part1.spec).spec == part1.spec)
-    assert(catalog.getPartition("db2", "tbl2", part2.spec).spec == part2.spec)
-    intercept[AnalysisException] {
-      catalog.getPartition("db2", "tbl1", part3.spec)
-    }
-  }
-
-  test("get partition when database/table does not exist") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
-      catalog.getPartition("does_not_exist", "tbl1", part1.spec)
-    }
-    intercept[AnalysisException] {
-      catalog.getPartition("db2", "does_not_exist", part1.spec)
-    }
-  }
-
-  test("rename partitions") {
-    val catalog = newBasicCatalog()
-    val newPart1 = part1.copy(spec = Map("a" -> "100", "b" -> "101"))
-    val newPart2 = part2.copy(spec = Map("a" -> "200", "b" -> "201"))
-    val newSpecs = Seq(newPart1.spec, newPart2.spec)
-    catalog.renamePartitions("db2", "tbl2", Seq(part1.spec, part2.spec), newSpecs)
-    assert(catalog.getPartition("db2", "tbl2", newPart1.spec).spec === newPart1.spec)
-    assert(catalog.getPartition("db2", "tbl2", newPart2.spec).spec === newPart2.spec)
-    // The old partitions should no longer exist
-    intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part1.spec) }
-    intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part2.spec) }
-  }
-
-  test("rename partitions when database/table does not exist") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
-      catalog.renamePartitions("does_not_exist", "tbl1", Seq(part1.spec), Seq(part2.spec))
-    }
-    intercept[AnalysisException] {
-      catalog.renamePartitions("db2", "does_not_exist", Seq(part1.spec), Seq(part2.spec))
-    }
-  }
-
-  test("alter partitions") {
-    val catalog = newBasicCatalog()
-    try {
-      // Note: Before altering table partitions in Hive, you *must* set the current database
-      // to the one that contains the table of interest. Otherwise you will end up with the
-      // most helpful error message ever: "Unable to alter partition. alter is not possible."
-      // See HIVE-2742 for more detail.
-      catalog.setCurrentDatabase("db2")
-      val newLocation = newUriForDatabase()
-      // alter but keep spec the same
-      val oldPart1 = catalog.getPartition("db2", "tbl2", part1.spec)
-      val oldPart2 = catalog.getPartition("db2", "tbl2", part2.spec)
-      catalog.alterPartitions("db2", "tbl2", Seq(
-        oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))),
-        oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation)))))
-      val newPart1 = catalog.getPartition("db2", "tbl2", part1.spec)
-      val newPart2 = catalog.getPartition("db2", "tbl2", part2.spec)
-      assert(newPart1.storage.locationUri == Some(newLocation))
-      assert(newPart2.storage.locationUri == Some(newLocation))
-      assert(oldPart1.storage.locationUri != Some(newLocation))
-      assert(oldPart2.storage.locationUri != Some(newLocation))
-      // alter but change spec, should fail because new partition specs do not exist yet
-      val badPart1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2"))
-      val badPart2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4"))
-      intercept[AnalysisException] {
-        catalog.alterPartitions("db2", "tbl2", Seq(badPart1, badPart2))
-      }
-    } finally {
-      // Remember to restore the original current database, which we assume to be "default"
-      catalog.setCurrentDatabase("default")
-    }
-  }
-
-  test("alter partitions when database/table does not exist") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
-      catalog.alterPartitions("does_not_exist", "tbl1", Seq(part1))
-    }
-    intercept[AnalysisException] {
-      catalog.alterPartitions("db2", "does_not_exist", Seq(part1))
-    }
-  }
-
-  // --------------------------------------------------------------------------
-  // Functions
-  // --------------------------------------------------------------------------
-
-  test("basic create and list functions") {
-    val catalog = newEmptyCatalog()
-    catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
-    catalog.createFunction("mydb", newFunc("myfunc"))
-    assert(catalog.listFunctions("mydb", "*").toSet == Set("myfunc"))
-  }
-
-  test("create function when database does not exist") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
-      catalog.createFunction("does_not_exist", newFunc())
-    }
-  }
-
-  test("create function that already exists") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
-      catalog.createFunction("db2", newFunc("func1"))
-    }
-  }
-
-  test("drop function") {
-    val catalog = newBasicCatalog()
-    assert(catalog.listFunctions("db2", "*").toSet == Set("func1"))
-    catalog.dropFunction("db2", "func1")
-    assert(catalog.listFunctions("db2", "*").isEmpty)
-  }
-
-  test("drop function when database does not exist") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
-      catalog.dropFunction("does_not_exist", "something")
-    }
-  }
-
-  test("drop function that does not exist") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
-      catalog.dropFunction("db2", "does_not_exist")
-    }
-  }
-
-  test("get function") {
-    val catalog = newBasicCatalog()
-    assert(catalog.getFunction("db2", "func1") ==
-      CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
-        Seq.empty[(String, String)]))
-    intercept[AnalysisException] {
-      catalog.getFunction("db2", "does_not_exist")
-    }
-  }
-
-  test("get function when database does not exist") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
-      catalog.getFunction("does_not_exist", "func1")
-    }
-  }
-
-  test("rename function") {
-    val catalog = newBasicCatalog()
-    val newName = "funcky"
-    assert(catalog.getFunction("db2", "func1").className == funcClass)
-    catalog.renameFunction("db2", "func1", newName)
-    intercept[AnalysisException] { catalog.getFunction("db2", "func1") }
-    assert(catalog.getFunction("db2", newName).identifier.funcName == newName)
-    assert(catalog.getFunction("db2", newName).className == funcClass)
-    intercept[AnalysisException] { catalog.renameFunction("db2", "does_not_exist", "me") }
-  }
-
-  test("rename function when database does not exist") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
-      catalog.renameFunction("does_not_exist", "func1", "func5")
-    }
-  }
-
-  test("list functions") {
-    val catalog = newBasicCatalog()
-    catalog.createFunction("db2", newFunc("func2"))
-    catalog.createFunction("db2", newFunc("not_me"))
-    assert(catalog.listFunctions("db2", "*").toSet == Set("func1", "func2", "not_me"))
-    assert(catalog.listFunctions("db2", "func*").toSet == Set("func1", "func2"))
-  }
-
-}
-
-
-/**
- * A collection of utility fields and methods for tests related to the [[ExternalCatalog]].
- */
-abstract class CatalogTestUtils {
-
-  // Unimplemented methods
-  val tableInputFormat: String
-  val tableOutputFormat: String
-  def newEmptyCatalog(): ExternalCatalog
-
-  // These fields must be lazy because they rely on fields that are not implemented yet
-  lazy val storageFormat = CatalogStorageFormat(
-    locationUri = None,
-    inputFormat = Some(tableInputFormat),
-    outputFormat = Some(tableOutputFormat),
-    serde = None,
-    serdeProperties = Map.empty)
-  lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat)
-  lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat)
-  lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat)
-  lazy val funcClass = "org.apache.spark.myFunc"
-
-  /**
-   * Creates a basic catalog, with the following structure:
-   *
-   * default
-   * db1
-   * db2
-   *   - tbl1
-   *   - tbl2
-   *     - part1
-   *     - part2
-   *   - func1
-   */
-  def newBasicCatalog(): ExternalCatalog = {
-    val catalog = newEmptyCatalog()
-    // When testing against a real catalog, the default database may already exist
-    catalog.createDatabase(newDb("default"), ignoreIfExists = true)
-    catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
-    catalog.createDatabase(newDb("db2"), ignoreIfExists = false)
-    catalog.createTable("db2", newTable("tbl1", "db2"), ignoreIfExists = false)
-    catalog.createTable("db2", newTable("tbl2", "db2"), ignoreIfExists = false)
-    catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false)
-    catalog.createFunction("db2", newFunc("func1", Some("db2")))
-    catalog
-  }
-
-  def newFunc(): CatalogFunction = newFunc("funcName")
-
-  def newUriForDatabase(): String = Utils.createTempDir().getAbsolutePath
-
-  def newDb(name: String): CatalogDatabase = {
-    CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)
-  }
-
-  def newTable(name: String, db: String): CatalogTable = newTable(name, Some(db))
-
-  def newTable(name: String, database: Option[String] = None): CatalogTable = {
-    CatalogTable(
-      identifier = TableIdentifier(name, database),
-      tableType = CatalogTableType.EXTERNAL_TABLE,
-      storage = storageFormat,
-      schema = Seq(
-        CatalogColumn("col1", "int"),
-        CatalogColumn("col2", "string"),
-        CatalogColumn("a", "int"),
-        CatalogColumn("b", "string")),
-      partitionColumnNames = Seq("a", "b"))
-  }
-
-  def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
-    CatalogFunction(FunctionIdentifier(name, database), funcClass, Seq.empty[(String, String)])
-  }
-
-  /**
-   * Whether the catalog's table partitions equal the ones given.
-   * Note: Hive sets some random serde things, so we just compare the specs here.
-   */
-  def catalogPartitionsEqual(
-      catalog: ExternalCatalog,
-      db: String,
-      table: String,
-      parts: Seq[CatalogTablePartition]): Boolean = {
-    catalog.listPartitions(db, table).map(_.spec).toSet == parts.map(_.spec).toSet
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
new file mode 100644
index 0000000..d739b17
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.util.Utils
+
+
+/**
+ * A reasonably complete test suite (i.e. behaviors) for an [[ExternalCatalog]].
+ *
+ * Implementations of the [[ExternalCatalog]] interface can create test suites by extending this.
+ */
+abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach {
+  protected val utils: CatalogTestUtils
+  import utils._
+
+  protected def resetState(): Unit = { }
+
+  // Clear all state after each test
+  override def afterEach(): Unit = {
+    try {
+      resetState()
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  // --------------------------------------------------------------------------
+  // Databases
+  // --------------------------------------------------------------------------
+
+  test("basic create and list databases") {
+    val catalog = newEmptyCatalog()
+    catalog.createDatabase(newDb("default"), ignoreIfExists = true)
+    assert(catalog.databaseExists("default"))
+    assert(!catalog.databaseExists("testing"))
+    assert(!catalog.databaseExists("testing2"))
+    catalog.createDatabase(newDb("testing"), ignoreIfExists = false)
+    assert(catalog.databaseExists("testing"))
+    assert(catalog.listDatabases().toSet == Set("default", "testing"))
+    catalog.createDatabase(newDb("testing2"), ignoreIfExists = false)
+    assert(catalog.listDatabases().toSet == Set("default", "testing", "testing2"))
+    assert(catalog.databaseExists("testing2"))
+    assert(!catalog.databaseExists("does_not_exist"))
+  }
+
+  test("get database when a database exists") {
+    val db1 = newBasicCatalog().getDatabase("db1")
+    assert(db1.name == "db1")
+    assert(db1.description.contains("db1"))
+  }
+
+  test("get database should throw exception when the database does not exist") {
+    intercept[AnalysisException] { newBasicCatalog().getDatabase("db_that_does_not_exist") }
+  }
+
+  test("list databases without pattern") {
+    val catalog = newBasicCatalog()
+    assert(catalog.listDatabases().toSet == Set("default", "db1", "db2"))
+  }
+
+  test("list databases with pattern") {
+    val catalog = newBasicCatalog()
+    assert(catalog.listDatabases("db").toSet == Set.empty)
+    assert(catalog.listDatabases("db*").toSet == Set("db1", "db2"))
+    assert(catalog.listDatabases("*1").toSet == Set("db1"))
+    assert(catalog.listDatabases("db2").toSet == Set("db2"))
+  }
+
+  test("drop database") {
+    val catalog = newBasicCatalog()
+    catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false)
+    assert(catalog.listDatabases().toSet == Set("default", "db2"))
+  }
+
+  test("drop database when the database is not empty") {
+    // Throw exception if there are functions left
+    val catalog1 = newBasicCatalog()
+    catalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false)
+    catalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false)
+    intercept[AnalysisException] {
+      catalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
+    }
+    resetState()
+
+    // Throw exception if there are tables left
+    val catalog2 = newBasicCatalog()
+    catalog2.dropFunction("db2", "func1")
+    intercept[AnalysisException] {
+      catalog2.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
+    }
+    resetState()
+
+    // When cascade is true, it should drop them
+    val catalog3 = newBasicCatalog()
+    catalog3.dropDatabase("db2", ignoreIfNotExists = false, cascade = true)
+    assert(catalog3.listDatabases().toSet == Set("default", "db1"))
+  }
+
+  test("drop database when the database does not exist") {
+    val catalog = newBasicCatalog()
+
+    intercept[AnalysisException] {
+      catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = false, cascade = false)
+    }
+
+    catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = true, cascade = false)
+  }
+
+  test("alter database") {
+    val catalog = newBasicCatalog()
+    val db1 = catalog.getDatabase("db1")
+    // Note: alter properties here because Hive does not support altering other fields
+    catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true")))
+    val newDb1 = catalog.getDatabase("db1")
+    assert(db1.properties.isEmpty)
+    assert(newDb1.properties.size == 2)
+    assert(newDb1.properties.get("k") == Some("v3"))
+    assert(newDb1.properties.get("good") == Some("true"))
+  }
+
+  test("alter database should throw exception when the database does not exist") {
+    intercept[AnalysisException] {
+      newBasicCatalog().alterDatabase(newDb("does_not_exist"))
+    }
+  }
+
+  // --------------------------------------------------------------------------
+  // Tables
+  // --------------------------------------------------------------------------
+
+  test("the table type of an external table should be EXTERNAL_TABLE") {
+    val catalog = newBasicCatalog()
+    val table =
+      newTable("external_table1", "db2").copy(tableType = CatalogTableType.EXTERNAL)
+    catalog.createTable("db2", table, ignoreIfExists = false)
+    val actual = catalog.getTable("db2", "external_table1")
+    assert(actual.tableType === CatalogTableType.EXTERNAL)
+  }
+
+  test("drop table") {
+    val catalog = newBasicCatalog()
+    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+    catalog.dropTable("db2", "tbl1", ignoreIfNotExists = false)
+    assert(catalog.listTables("db2").toSet == Set("tbl2"))
+  }
+
+  test("drop table when database/table does not exist") {
+    val catalog = newBasicCatalog()
+    // Should always throw exception when the database does not exist
+    intercept[AnalysisException] {
+      catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = false)
+    }
+    intercept[AnalysisException] {
+      catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = true)
+    }
+    // Should throw exception when the table does not exist, if ignoreIfNotExists is false
+    intercept[AnalysisException] {
+      catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = false)
+    }
+    catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true)
+  }
+
+  test("rename table") {
+    val catalog = newBasicCatalog()
+    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+    catalog.renameTable("db2", "tbl1", "tblone")
+    assert(catalog.listTables("db2").toSet == Set("tblone", "tbl2"))
+  }
+
+  test("rename table when database/table does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.renameTable("unknown_db", "unknown_table", "unknown_table")
+    }
+    intercept[AnalysisException] {
+      catalog.renameTable("db2", "unknown_table", "unknown_table")
+    }
+  }
+
+  test("alter table") {
+    val catalog = newBasicCatalog()
+    val tbl1 = catalog.getTable("db2", "tbl1")
+    catalog.alterTable("db2", tbl1.copy(properties = Map("toh" -> "frem")))
+    val newTbl1 = catalog.getTable("db2", "tbl1")
+    assert(!tbl1.properties.contains("toh"))
+    assert(newTbl1.properties.size == tbl1.properties.size + 1)
+    assert(newTbl1.properties.get("toh") == Some("frem"))
+  }
+
+  test("alter table when database/table does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.alterTable("unknown_db", newTable("tbl1", "unknown_db"))
+    }
+    intercept[AnalysisException] {
+      catalog.alterTable("db2", newTable("unknown_table", "db2"))
+    }
+  }
+
+  test("get table") {
+    assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == "tbl1")
+  }
+
+  test("get table when database/table does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.getTable("unknown_db", "unknown_table")
+    }
+    intercept[AnalysisException] {
+      catalog.getTable("db2", "unknown_table")
+    }
+  }
+
+  test("list tables without pattern") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] { catalog.listTables("unknown_db") }
+    assert(catalog.listTables("db1").toSet == Set.empty)
+    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+  }
+
+  test("list tables with pattern") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] { catalog.listTables("unknown_db", "*") }
+    assert(catalog.listTables("db1", "*").toSet == Set.empty)
+    assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2"))
+    assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2"))
+    assert(catalog.listTables("db2", "*1").toSet == Set("tbl1"))
+  }
+
+  // --------------------------------------------------------------------------
+  // Partitions
+  // --------------------------------------------------------------------------
+
+  test("basic create and list partitions") {
+    val catalog = newEmptyCatalog()
+    catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
+    catalog.createTable("mydb", newTable("tbl", "mydb"), ignoreIfExists = false)
+    catalog.createPartitions("mydb", "tbl", Seq(part1, part2), ignoreIfExists = false)
+    assert(catalogPartitionsEqual(catalog, "mydb", "tbl", Seq(part1, part2)))
+  }
+
+  test("create partitions when database/table does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.createPartitions("does_not_exist", "tbl1", Seq(), ignoreIfExists = false)
+    }
+    intercept[AnalysisException] {
+      catalog.createPartitions("db2", "does_not_exist", Seq(), ignoreIfExists = false)
+    }
+  }
+
+  test("create partitions that already exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = false)
+    }
+    catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = true)
+  }
+
+  test("drop partitions") {
+    val catalog = newBasicCatalog()
+    assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
+    catalog.dropPartitions(
+      "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false)
+    assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2)))
+    resetState()
+    val catalog2 = newBasicCatalog()
+    assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2)))
+    catalog2.dropPartitions(
+      "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
+    assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
+  }
+
+  test("drop partitions when database/table does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.dropPartitions(
+        "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false)
+    }
+    intercept[AnalysisException] {
+      catalog.dropPartitions(
+        "db2", "does_not_exist", Seq(), ignoreIfNotExists = false)
+    }
+  }
+
+  test("drop partitions that do not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.dropPartitions(
+        "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false)
+    }
+    catalog.dropPartitions(
+      "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true)
+  }
+
+  test("get partition") {
+    val catalog = newBasicCatalog()
+    assert(catalog.getPartition("db2", "tbl2", part1.spec).spec == part1.spec)
+    assert(catalog.getPartition("db2", "tbl2", part2.spec).spec == part2.spec)
+    intercept[AnalysisException] {
+      catalog.getPartition("db2", "tbl1", part3.spec)
+    }
+  }
+
+  test("get partition when database/table does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.getPartition("does_not_exist", "tbl1", part1.spec)
+    }
+    intercept[AnalysisException] {
+      catalog.getPartition("db2", "does_not_exist", part1.spec)
+    }
+  }
+
+  test("rename partitions") {
+    val catalog = newBasicCatalog()
+    val newPart1 = part1.copy(spec = Map("a" -> "100", "b" -> "101"))
+    val newPart2 = part2.copy(spec = Map("a" -> "200", "b" -> "201"))
+    val newSpecs = Seq(newPart1.spec, newPart2.spec)
+    catalog.renamePartitions("db2", "tbl2", Seq(part1.spec, part2.spec), newSpecs)
+    assert(catalog.getPartition("db2", "tbl2", newPart1.spec).spec === newPart1.spec)
+    assert(catalog.getPartition("db2", "tbl2", newPart2.spec).spec === newPart2.spec)
+    // The old partitions should no longer exist
+    intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part1.spec) }
+    intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part2.spec) }
+  }
+
+  test("rename partitions when database/table does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.renamePartitions("does_not_exist", "tbl1", Seq(part1.spec), Seq(part2.spec))
+    }
+    intercept[AnalysisException] {
+      catalog.renamePartitions("db2", "does_not_exist", Seq(part1.spec), Seq(part2.spec))
+    }
+  }
+
+  test("alter partitions") {
+    val catalog = newBasicCatalog()
+    try {
+      // Note: Before altering table partitions in Hive, you *must* set the current database
+      // to the one that contains the table of interest. Otherwise you will end up with the
+      // most helpful error message ever: "Unable to alter partition. alter is not possible."
+      // See HIVE-2742 for more detail.
+      catalog.setCurrentDatabase("db2")
+      val newLocation = newUriForDatabase()
+      // alter but keep spec the same
+      val oldPart1 = catalog.getPartition("db2", "tbl2", part1.spec)
+      val oldPart2 = catalog.getPartition("db2", "tbl2", part2.spec)
+      catalog.alterPartitions("db2", "tbl2", Seq(
+        oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))),
+        oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation)))))
+      val newPart1 = catalog.getPartition("db2", "tbl2", part1.spec)
+      val newPart2 = catalog.getPartition("db2", "tbl2", part2.spec)
+      assert(newPart1.storage.locationUri == Some(newLocation))
+      assert(newPart2.storage.locationUri == Some(newLocation))
+      assert(oldPart1.storage.locationUri != Some(newLocation))
+      assert(oldPart2.storage.locationUri != Some(newLocation))
+      // alter but change spec, should fail because new partition specs do not exist yet
+      val badPart1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2"))
+      val badPart2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4"))
+      intercept[AnalysisException] {
+        catalog.alterPartitions("db2", "tbl2", Seq(badPart1, badPart2))
+      }
+    } finally {
+      // Remember to restore the original current database, which we assume to be "default"
+      catalog.setCurrentDatabase("default")
+    }
+  }
+
+  test("alter partitions when database/table does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.alterPartitions("does_not_exist", "tbl1", Seq(part1))
+    }
+    intercept[AnalysisException] {
+      catalog.alterPartitions("db2", "does_not_exist", Seq(part1))
+    }
+  }
+
+  // --------------------------------------------------------------------------
+  // Functions
+  // --------------------------------------------------------------------------
+
+  test("basic create and list functions") {
+    val catalog = newEmptyCatalog()
+    catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
+    catalog.createFunction("mydb", newFunc("myfunc"))
+    assert(catalog.listFunctions("mydb", "*").toSet == Set("myfunc"))
+  }
+
+  test("create function when database does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.createFunction("does_not_exist", newFunc())
+    }
+  }
+
+  test("create function that already exists") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.createFunction("db2", newFunc("func1"))
+    }
+  }
+
+  test("drop function") {
+    val catalog = newBasicCatalog()
+    assert(catalog.listFunctions("db2", "*").toSet == Set("func1"))
+    catalog.dropFunction("db2", "func1")
+    assert(catalog.listFunctions("db2", "*").isEmpty)
+  }
+
+  test("drop function when database does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.dropFunction("does_not_exist", "something")
+    }
+  }
+
+  test("drop function that does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.dropFunction("db2", "does_not_exist")
+    }
+  }
+
+  test("get function") {
+    val catalog = newBasicCatalog()
+    assert(catalog.getFunction("db2", "func1") ==
+      CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
+        Seq.empty[(String, String)]))
+    intercept[AnalysisException] {
+      catalog.getFunction("db2", "does_not_exist")
+    }
+  }
+
+  test("get function when database does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.getFunction("does_not_exist", "func1")
+    }
+  }
+
+  test("rename function") {
+    val catalog = newBasicCatalog()
+    val newName = "funcky"
+    assert(catalog.getFunction("db2", "func1").className == funcClass)
+    catalog.renameFunction("db2", "func1", newName)
+    intercept[AnalysisException] { catalog.getFunction("db2", "func1") }
+    assert(catalog.getFunction("db2", newName).identifier.funcName == newName)
+    assert(catalog.getFunction("db2", newName).className == funcClass)
+    intercept[AnalysisException] { catalog.renameFunction("db2", "does_not_exist", "me") }
+  }
+
+  test("rename function when database does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.renameFunction("does_not_exist", "func1", "func5")
+    }
+  }
+
+  test("list functions") {
+    val catalog = newBasicCatalog()
+    catalog.createFunction("db2", newFunc("func2"))
+    catalog.createFunction("db2", newFunc("not_me"))
+    assert(catalog.listFunctions("db2", "*").toSet == Set("func1", "func2", "not_me"))
+    assert(catalog.listFunctions("db2", "func*").toSet == Set("func1", "func2"))
+  }
+
+}
+
+
+/**
+ * A collection of utility fields and methods for tests related to the [[ExternalCatalog]].
+ */
+abstract class CatalogTestUtils {
+
+  // Unimplemented methods
+  val tableInputFormat: String
+  val tableOutputFormat: String
+  def newEmptyCatalog(): ExternalCatalog
+
+  // These fields must be lazy because they rely on fields that are not implemented yet
+  lazy val storageFormat = CatalogStorageFormat(
+    locationUri = None,
+    inputFormat = Some(tableInputFormat),
+    outputFormat = Some(tableOutputFormat),
+    serde = None,
+    serdeProperties = Map.empty)
+  lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat)
+  lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat)
+  lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat)
+  lazy val funcClass = "org.apache.spark.myFunc"
+
+  /**
+   * Creates a basic catalog, with the following structure:
+   *
+   * default
+   * db1
+   * db2
+   *   - tbl1
+   *   - tbl2
+   *     - part1
+   *     - part2
+   *   - func1
+   */
+  def newBasicCatalog(): ExternalCatalog = {
+    val catalog = newEmptyCatalog()
+    // When testing against a real catalog, the default database may already exist
+    catalog.createDatabase(newDb("default"), ignoreIfExists = true)
+    catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
+    catalog.createDatabase(newDb("db2"), ignoreIfExists = false)
+    catalog.createTable("db2", newTable("tbl1", "db2"), ignoreIfExists = false)
+    catalog.createTable("db2", newTable("tbl2", "db2"), ignoreIfExists = false)
+    catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false)
+    catalog.createFunction("db2", newFunc("func1", Some("db2")))
+    catalog
+  }
+
+  def newFunc(): CatalogFunction = newFunc("funcName")
+
+  def newUriForDatabase(): String = Utils.createTempDir().getAbsolutePath
+
+  def newDb(name: String): CatalogDatabase = {
+    CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)
+  }
+
+  def newTable(name: String, db: String): CatalogTable = newTable(name, Some(db))
+
+  def newTable(name: String, database: Option[String] = None): CatalogTable = {
+    CatalogTable(
+      identifier = TableIdentifier(name, database),
+      tableType = CatalogTableType.EXTERNAL,
+      storage = storageFormat,
+      schema = Seq(
+        CatalogColumn("col1", "int"),
+        CatalogColumn("col2", "string"),
+        CatalogColumn("a", "int"),
+        CatalogColumn("b", "string")),
+      partitionColumnNames = Seq("a", "b"),
+      bucketColumnNames = Seq("col1"))
+  }
+
+  def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
+    CatalogFunction(FunctionIdentifier(name, database), funcClass, Seq.empty[(String, String)])
+  }
+
+  /**
+   * Whether the catalog's table partitions equal the ones given.
+   * Note: Hive sets some random serde things, so we just compare the specs here.
+   */
+  def catalogPartitionsEqual(
+      catalog: ExternalCatalog,
+      db: String,
+      table: String,
+      parts: Seq[CatalogTablePartition]): Boolean = {
+    catalog.listPartitions(db, table).map(_.spec).toSet == parts.map(_.spec).toSet
+  }
+
+}
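
A concrete suite plugs these utilities in by overriding the abstract members above; the sketch below mirrors what the InMemoryCatalog-backed suite in the next file does, with placeholder format class names standing in for real ones.

    class ExampleInMemoryCatalogSuite extends ExternalCatalogSuite {
      protected override val utils: CatalogTestUtils = new CatalogTestUtils {
        // Any strings work here for the in-memory catalog; these are placeholders.
        override val tableInputFormat: String = "org.example.ExampleInputFormat"
        override val tableOutputFormat: String = "org.example.ExampleOutputFormat"
        override def newEmptyCatalog(): ExternalCatalog = new InMemoryCatalog
      }
    }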

http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
index 63a7b2c..0605daa 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.catalog
 
 
 /** Test suite for the [[InMemoryCatalog]]. */
-class InMemoryCatalogSuite extends CatalogTestCases {
+class InMemoryCatalogSuite extends ExternalCatalogSuite {
 
   protected override val utils: CatalogTestUtils = new CatalogTestUtils {
     override val tableInputFormat: String = "org.apache.park.SequenceFileInputFormat"

http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 1933be5..ba5d8ce 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias}
 /**
  * Tests for [[SessionCatalog]] that assume that [[InMemoryCatalog]] is correctly implemented.
  *
- * Note: many of the methods here are very similar to the ones in [[CatalogTestCases]].
+ * Note: many of the methods here are very similar to the ones in [[ExternalCatalogSuite]].
  * This is because [[SessionCatalog]] and [[ExternalCatalog]] share many similar method
  * signatures but do not extend a common parent. This is largely by design but
  * unfortunately leads to very similar test code in two places.

http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 47c043a..dbbdf11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ShowTablesCommand
 import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
 import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
 import org.apache.spark.sql.sources.BaseRelation
@@ -258,7 +259,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def isCached(tableName: String): Boolean = {
-    sparkSession.isCached(tableName)
+    sparkSession.catalog.isCached(tableName)
   }
 
   /**
@@ -267,7 +268,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   private[sql] def isCached(qName: Dataset[_]): Boolean = {
-    sparkSession.isCached(qName)
+    sparkSession.cacheManager.lookupCachedData(qName).nonEmpty
   }
 
   /**
@@ -276,7 +277,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def cacheTable(tableName: String): Unit = {
-    sparkSession.cacheTable(tableName)
+    sparkSession.catalog.cacheTable(tableName)
   }
 
   /**
@@ -285,7 +286,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def uncacheTable(tableName: String): Unit = {
-    sparkSession.uncacheTable(tableName)
+    sparkSession.catalog.uncacheTable(tableName)
   }
 
   /**
@@ -293,7 +294,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def clearCache(): Unit = {
-    sparkSession.clearCache()
+    sparkSession.catalog.clearCache()
   }
 
   // scalastyle:off
@@ -507,7 +508,7 @@ class SQLContext private[sql](
    */
   @Experimental
   def createExternalTable(tableName: String, path: String): DataFrame = {
-    sparkSession.createExternalTable(tableName, path)
+    sparkSession.catalog.createExternalTable(tableName, path)
   }
 
   /**
@@ -523,7 +524,7 @@ class SQLContext private[sql](
       tableName: String,
       path: String,
       source: String): DataFrame = {
-    sparkSession.createExternalTable(tableName, path, source)
+    sparkSession.catalog.createExternalTable(tableName, path, source)
   }
 
   /**
@@ -539,7 +540,7 @@ class SQLContext private[sql](
       tableName: String,
       source: String,
       options: java.util.Map[String, String]): DataFrame = {
-    sparkSession.createExternalTable(tableName, source, options)
+    sparkSession.catalog.createExternalTable(tableName, source, options)
   }
 
   /**
@@ -556,7 +557,7 @@ class SQLContext private[sql](
       tableName: String,
       source: String,
       options: Map[String, String]): DataFrame = {
-    sparkSession.createExternalTable(tableName, source, options)
+    sparkSession.catalog.createExternalTable(tableName, source, options)
   }
 
   /**
@@ -573,7 +574,7 @@ class SQLContext private[sql](
       source: String,
       schema: StructType,
       options: java.util.Map[String, String]): DataFrame = {
-    sparkSession.createExternalTable(tableName, source, schema, options)
+    sparkSession.catalog.createExternalTable(tableName, source, schema, options)
   }
 
   /**
@@ -591,7 +592,7 @@ class SQLContext private[sql](
       source: String,
       schema: StructType,
       options: Map[String, String]): DataFrame = {
-    sparkSession.createExternalTable(tableName, source, schema, options)
+    sparkSession.catalog.createExternalTable(tableName, source, schema, options)
   }
 
   /**
@@ -611,7 +612,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def dropTempTable(tableName: String): Unit = {
-    sparkSession.dropTempTable(tableName)
+    sparkSession.catalog.dropTempTable(tableName)
   }
 
   /**
@@ -700,7 +701,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def tables(): DataFrame = {
-    sparkSession.tables()
+    Dataset.ofRows(sparkSession, ShowTablesCommand(None, None))
   }
 
   /**
@@ -712,7 +713,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def tables(databaseName: String): DataFrame = {
-    sparkSession.tables(databaseName)
+    Dataset.ofRows(sparkSession, ShowTablesCommand(Some(databaseName), None))
   }
 
   /**
@@ -730,7 +731,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def tableNames(): Array[String] = {
-    sparkSession.tableNames()
+    sparkSession.catalog.listTables().collect().map(_.name)
   }
 
   /**
@@ -740,7 +741,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def tableNames(databaseName: String): Array[String] = {
-    sparkSession.tableNames(databaseName)
+    sparkSession.catalog.listTables(databaseName).collect().map(_.name)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index a0f0bd3..6477f42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -31,16 +31,16 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.config.{CATALOG_IMPLEMENTATION, ConfigEntry}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalog.Catalog
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range}
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.ShowTablesCommand
-import org.apache.spark.sql.execution.datasources.{CreateTableUsing, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.ui.SQLListener
-import org.apache.spark.sql.internal.{RuntimeConfigImpl, SessionState, SharedState}
+import org.apache.spark.sql.internal.{CatalogImpl, RuntimeConfigImpl, SessionState, SharedState}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.{DataType, LongType, StructType}
 import org.apache.spark.sql.util.ExecutionListenerManager
@@ -191,10 +191,6 @@ class SparkSession private(
    |  Methods for accessing or mutating configurations  |
    * -------------------------------------------------- */
 
-  @transient private lazy val _conf: RuntimeConfig = {
-    new RuntimeConfigImpl(sessionState.conf, sessionState.hadoopConf)
-  }
-
   /**
    * Runtime configuration interface for Spark.
    *
@@ -205,7 +201,9 @@ class SparkSession private(
    * @group config
    * @since 2.0.0
    */
-  def conf: RuntimeConfig = _conf
+  @transient lazy val conf: RuntimeConfig = {
+    new RuntimeConfigImpl(sessionState.conf, sessionState.hadoopConf)
+  }
 
   /**
    * Set Spark SQL configuration properties.
@@ -274,61 +272,6 @@ class SparkSession private(
   }
 
 
-  /* ------------------------------------- *
-   |  Methods related to cache management  |
-   * ------------------------------------- */
-
-  /**
-   * Returns true if the table is currently cached in-memory.
-   *
-   * @group cachemgmt
-   * @since 2.0.0
-   */
-  def isCached(tableName: String): Boolean = {
-    cacheManager.lookupCachedData(table(tableName)).nonEmpty
-  }
-
-  /**
-   * Caches the specified table in-memory.
-   *
-   * @group cachemgmt
-   * @since 2.0.0
-   */
-  def cacheTable(tableName: String): Unit = {
-    cacheManager.cacheQuery(table(tableName), Some(tableName))
-  }
-
-  /**
-   * Removes the specified table from the in-memory cache.
-   *
-   * @group cachemgmt
-   * @since 2.0.0
-   */
-  def uncacheTable(tableName: String): Unit = {
-    cacheManager.uncacheQuery(table(tableName))
-  }
-
-  /**
-   * Removes all cached tables from the in-memory cache.
-   *
-   * @group cachemgmt
-   * @since 2.0.0
-   */
-  def clearCache(): Unit = {
-    cacheManager.clearCache()
-  }
-
-  /**
-   * Returns true if the [[Dataset]] is currently cached in-memory.
-   *
-   * @group cachemgmt
-   * @since 2.0.0
-   */
-  protected[sql] def isCached(qName: Dataset[_]): Boolean = {
-    cacheManager.lookupCachedData(qName).nonEmpty
-  }
-
-
   /* --------------------------------- *
    |  Methods for creating DataFrames  |
    * --------------------------------- */
@@ -605,139 +548,18 @@ class SparkSession private(
   }
 
 
-  /* --------------------------- *
-   |  Methods related to tables  |
-   * --------------------------- */
-
-  /**
-   * :: Experimental ::
-   * Creates an external table from the given path and returns the corresponding DataFrame.
-   * It will use the default data source configured by spark.sql.sources.default.
-   *
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  @Experimental
-  def createExternalTable(tableName: String, path: String): DataFrame = {
-    val dataSourceName = sessionState.conf.defaultDataSourceName
-    createExternalTable(tableName, path, dataSourceName)
-  }
-
-  /**
-   * :: Experimental ::
-   * Creates an external table from the given path based on a data source
-   * and returns the corresponding DataFrame.
-   *
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  @Experimental
-  def createExternalTable(tableName: String, path: String, source: String): DataFrame = {
-    createExternalTable(tableName, source, Map("path" -> path))
-  }
-
-  /**
-   * :: Experimental ::
-   * Creates an external table from the given path based on a data source and a set of options.
-   * Then, returns the corresponding DataFrame.
-   *
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  @Experimental
-  def createExternalTable(
-      tableName: String,
-      source: String,
-      options: java.util.Map[String, String]): DataFrame = {
-    createExternalTable(tableName, source, options.asScala.toMap)
-  }
-
-  /**
-   * :: Experimental ::
-   * (Scala-specific)
-   * Creates an external table from the given path based on a data source and a set of options.
-   * Then, returns the corresponding DataFrame.
-   *
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  @Experimental
-  def createExternalTable(
-      tableName: String,
-      source: String,
-      options: Map[String, String]): DataFrame = {
-    val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    val cmd =
-      CreateTableUsing(
-        tableIdent,
-        userSpecifiedSchema = None,
-        source,
-        temporary = false,
-        options,
-        allowExisting = false,
-        managedIfNoPath = false)
-    executePlan(cmd).toRdd
-    table(tableIdent)
-  }
-
-  /**
-   * :: Experimental ::
-   * Create an external table from the given path based on a data source, a schema and
-   * a set of options. Then, returns the corresponding DataFrame.
-   *
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  @Experimental
-  def createExternalTable(
-      tableName: String,
-      source: String,
-      schema: StructType,
-      options: java.util.Map[String, String]): DataFrame = {
-    createExternalTable(tableName, source, schema, options.asScala.toMap)
-  }
+  /* ------------------------ *
+   |  Catalog-related methods |
+   * ------------------------ */
 
   /**
-   * :: Experimental ::
-   * (Scala-specific)
-   * Create an external table from the given path based on a data source, a schema and
-   * a set of options. Then, returns the corresponding DataFrame.
+   * Interface through which the user may create, drop, alter, or query underlying
+   * databases, tables, functions, etc.
    *
    * @group ddl_ops
    * @since 2.0.0
    */
-  @Experimental
-  def createExternalTable(
-      tableName: String,
-      source: String,
-      schema: StructType,
-      options: Map[String, String]): DataFrame = {
-    val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    val cmd =
-      CreateTableUsing(
-        tableIdent,
-        userSpecifiedSchema = Some(schema),
-        source,
-        temporary = false,
-        options,
-        allowExisting = false,
-        managedIfNoPath = false)
-    executePlan(cmd).toRdd
-    table(tableIdent)
-  }
-
-  /**
-   * Drops the temporary table with the given table name in the catalog.
-   * If the table has been cached/persisted before, it's also unpersisted.
-   *
-   * @param tableName the name of the table to be unregistered.
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  def dropTempTable(tableName: String): Unit = {
-    cacheManager.tryUncacheQuery(table(tableName))
-    sessionState.catalog.dropTable(TableIdentifier(tableName), ignoreIfNotExists = true)
-  }
+  @transient lazy val catalog: Catalog = new CatalogImpl(self)
 
   /**
    * Returns the specified table as a [[DataFrame]].
@@ -749,55 +571,11 @@ class SparkSession private(
     table(sessionState.sqlParser.parseTableIdentifier(tableName))
   }
 
-  private def table(tableIdent: TableIdentifier): DataFrame = {
+  protected[sql] def table(tableIdent: TableIdentifier): DataFrame = {
     Dataset.ofRows(self, sessionState.catalog.lookupRelation(tableIdent))
   }
 
   /**
-   * Returns a [[DataFrame]] containing names of existing tables in the current database.
-   * The returned DataFrame has two columns, tableName and isTemporary (a Boolean
-   * indicating if a table is a temporary one or not).
-   *
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  def tables(): DataFrame = {
-    Dataset.ofRows(self, ShowTablesCommand(None, None))
-  }
-
-  /**
-   * Returns a [[DataFrame]] containing names of existing tables in the given database.
-   * The returned DataFrame has two columns, tableName and isTemporary (a Boolean
-   * indicating if a table is a temporary one or not).
-   *
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  def tables(databaseName: String): DataFrame = {
-    Dataset.ofRows(self, ShowTablesCommand(Some(databaseName), None))
-  }
-
-  /**
-   * Returns the names of tables in the current database as an array.
-   *
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  def tableNames(): Array[String] = {
-    tableNames(sessionState.catalog.getCurrentDatabase)
-  }
-
-  /**
-   * Returns the names of tables in the given database as an array.
-   *
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  def tableNames(databaseName: String): Array[String] = {
-    sessionState.catalog.listTables(databaseName).map(_.table).toArray
-  }
-
-  /**
    * Registers the given [[DataFrame]] as a temporary table in the catalog.
    * Temporary tables exist only during the lifetime of this instance of [[SparkSession]].
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
new file mode 100644
index 0000000..868cc3a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset}
+import org.apache.spark.sql.types.StructType
+
+
+/**
+ * Catalog interface for Spark. To access this, use `SparkSession.catalog`.
+ */
+abstract class Catalog {
+
+  /**
+   * Returns the current default database in this session.
+   *
+   * @since 2.0.0
+   */
+  def currentDatabase: String
+
+  /**
+   * Sets the current default database in this session.
+   *
+   * @since 2.0.0
+   */
+  def setCurrentDatabase(dbName: String): Unit
+
+  /**
+   * Returns a list of databases available across all sessions.
+   *
+   * @since 2.0.0
+   */
+  def listDatabases(): Dataset[Database]
+
+  /**
+   * Returns a list of tables in the current database.
+   * This includes all temporary tables.
+   *
+   * @since 2.0.0
+   */
+  def listTables(): Dataset[Table]
+
+  /**
+   * Returns a list of tables in the specified database.
+   * This includes all temporary tables.
+   *
+   * @since 2.0.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def listTables(dbName: String): Dataset[Table]
+
+  /**
+   * Returns a list of functions registered in the current database.
+   * This includes all temporary functions.
+   *
+   * @since 2.0.0
+   */
+  def listFunctions(): Dataset[Function]
+
+  /**
+   * Returns a list of functions registered in the specified database.
+   * This includes all temporary functions.
+   *
+   * @since 2.0.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def listFunctions(dbName: String): Dataset[Function]
+
+  /**
+   * Returns a list of columns for the given table in the current database.
+   *
+   * @since 2.0.0
+   */
+  @throws[AnalysisException]("table does not exist")
+  def listColumns(tableName: String): Dataset[Column]
+
+  /**
+   * Returns a list of columns for the given table in the specified database.
+   *
+   * @since 2.0.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  def listColumns(dbName: String, tableName: String): Dataset[Column]
+
+  /**
+   * :: Experimental ::
+   * Creates an external table from the given path and returns the corresponding DataFrame.
+   * It will use the default data source configured by spark.sql.sources.default.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def createExternalTable(tableName: String, path: String): DataFrame
+
+  /**
+   * :: Experimental ::
+   * Creates an external table from the given path based on a data source
+   * and returns the corresponding DataFrame.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def createExternalTable(tableName: String, path: String, source: String): DataFrame
+
+  /**
+   * :: Experimental ::
+   * Creates an external table based on the given data source and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def createExternalTable(
+      tableName: String,
+      source: String,
+      options: java.util.Map[String, String]): DataFrame
+
+  /**
+   * :: Experimental ::
+   * (Scala-specific)
+   * Creates an external table based on the given data source and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def createExternalTable(
+      tableName: String,
+      source: String,
+      options: Map[String, String]): DataFrame
+
+  /**
+   * :: Experimental ::
+   * Creates an external table based on the given data source, a schema, and a set of
+   * options. Then, returns the corresponding DataFrame.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def createExternalTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      options: java.util.Map[String, String]): DataFrame
+
+  /**
+   * :: Experimental ::
+   * (Scala-specific)
+   * Creates an external table based on the given data source, a schema, and a set of
+   * options. Then, returns the corresponding DataFrame.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def createExternalTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      options: Map[String, String]): DataFrame
+
+  /**
+   * Drops the temporary table with the given table name in the catalog.
+   * If the table has been cached/persisted before, it's also unpersisted.
+   *
+   * @param tableName the name of the table to be unregistered.
+   * @since 2.0.0
+   */
+  def dropTempTable(tableName: String): Unit
+
+  /**
+   * Returns true if the table is currently cached in-memory.
+   *
+   * @since 2.0.0
+   */
+  def isCached(tableName: String): Boolean
+
+  /**
+   * Caches the specified table in-memory.
+   *
+   * @since 2.0.0
+   */
+  def cacheTable(tableName: String): Unit
+
+  /**
+   * Removes the specified table from the in-memory cache.
+   *
+   * @since 2.0.0
+   */
+  def uncacheTable(tableName: String): Unit
+
+  /**
+   * Removes all cached tables from the in-memory cache.
+   *
+   * @since 2.0.0
+   */
+  def clearCache(): Unit
+
+}
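
As a usage sketch (not part of the patch), the interface is reached through `SparkSession.catalog`; here `spark` is assumed to be an existing SparkSession and "people" an already-registered table.

    import org.apache.spark.sql.SparkSession

    def catalogDemo(spark: SparkSession): Unit = {
      // Inspect the current database and what is visible in it.
      println(spark.catalog.currentDatabase)
      spark.catalog.listTables().show()
      spark.catalog.listFunctions().show()

      // Cache management now lives on the catalog rather than on the session.
      spark.catalog.cacheTable("people")
      assert(spark.catalog.isCached("people"))
      spark.catalog.uncacheTable("people")
    }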

http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
new file mode 100644
index 0000000..d5de6cd
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog
+
+import javax.annotation.Nullable
+
+import org.apache.spark.sql.catalyst.DefinedByConstructorParams
+
+
+// Note: all classes here are expected to be wrapped in Datasets and so must extend
+// DefinedByConstructorParams for the catalog to be able to create encoders for them.
+
+class Database(
+    val name: String,
+    @Nullable val description: String,
+    val locationUri: String)
+  extends DefinedByConstructorParams {
+
+  override def toString: String = {
+    "Database[" +
+      s"name='$name', " +
+      Option(description).map { d => s"description='$d', " }.getOrElse("") +
+      s"path='$locationUri']"
+  }
+
+}
+
+
+class Table(
+    val name: String,
+    @Nullable val database: String,
+    @Nullable val description: String,
+    val tableType: String,
+    val isTemporary: Boolean)
+  extends DefinedByConstructorParams {
+
+  override def toString: String = {
+    "Table[" +
+      s"name='$name', " +
+      Option(database).map { d => s"database='$d', " }.getOrElse("") +
+      Option(description).map { d => s"description='$d', " }.getOrElse("") +
+      s"tableType='$tableType', " +
+      s"isTemporary='$isTemporary']"
+  }
+
+}
+
+
+class Column(
+    val name: String,
+    @Nullable val description: String,
+    val dataType: String,
+    val nullable: Boolean,
+    val isPartition: Boolean,
+    val isBucket: Boolean)
+  extends DefinedByConstructorParams {
+
+  override def toString: String = {
+    "Column[" +
+      s"name='$name', " +
+      Option(description).map { d => s"description='$d', " }.getOrElse("") +
+      s"dataType='$dataType', " +
+      s"nullable='$nullable', " +
+      s"isPartition='$isPartition', " +
+      s"isBucket='$isBucket']"
+  }
+
+}
+
+
+class Function(
+    val name: String,
+    @Nullable val description: String,
+    val className: String,
+    val isTemporary: Boolean)
+  extends DefinedByConstructorParams {
+
+  override def toString: String = {
+    "Function[" +
+      s"name='$name', " +
+      Option(description).map { d => s"description='$d', " }.getOrElse("") +
+      s"className='$className', " +
+      s"isTemporary='$isTemporary']"
+  }
+
+}
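
Because these classes extend DefinedByConstructorParams, the catalog can build encoders for them, and the list* methods return typed Datasets whose rows can be collected and inspected directly. A small sketch, assuming a SparkSession `spark` and a hypothetical table "people":

    import org.apache.spark.sql.SparkSession

    def printColumns(spark: SparkSession): Unit = {
      spark.catalog.listColumns("people").collect().foreach { col =>
        // Fields correspond to the constructor parameters of Column above.
        val kind = if (col.isPartition) "partition" else "data"
        println(s"$kind column ${col.name}: ${col.dataType} (nullable=${col.nullable})")
      }
    }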

http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index ebc60ed..e04e130 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -835,9 +835,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       throw new ParseException("Operation not allowed: CREATE TABLE ... CLUSTERED BY ...", ctx)
     }
     val tableType = if (external) {
-      CatalogTableType.EXTERNAL_TABLE
+      CatalogTableType.EXTERNAL
     } else {
-      CatalogTableType.MANAGED_TABLE
+      CatalogTableType.MANAGED
     }
     val comment = Option(ctx.STRING).map(string)
     val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns)
@@ -1083,7 +1083,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     val sql = Option(source(query))
     val tableDesc = CatalogTable(
       identifier = visitTableIdentifier(name),
-      tableType = CatalogTableType.VIRTUAL_VIEW,
+      tableType = CatalogTableType.VIEW,
       schema = schema,
       storage = EmptyStorageFormat,
       properties = properties,

http://git-wip-us.apache.org/repos/asf/spark/blob/d8a83a56/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index c283bd6..ec3fada 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -32,7 +32,7 @@ case class CacheTableCommand(
     plan.foreach { logicalPlan =>
       sparkSession.registerDataFrameAsTable(Dataset.ofRows(sparkSession, logicalPlan), tableName)
     }
-    sparkSession.cacheTable(tableName)
+    sparkSession.catalog.cacheTable(tableName)
 
     if (!isLazy) {
       // Performs eager caching
@@ -62,7 +62,7 @@ case class UncacheTableCommand(tableName: String) extends RunnableCommand {
 case object ClearCacheCommand extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    sparkSession.clearCache()
+    sparkSession.catalog.clearCache()
     Seq.empty[Row]
   }
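
These commands back the SQL cache statements, so the SQL path and the new catalog API observe the same cache state; a sketch, assuming a SparkSession `spark` and an existing table "people":

    spark.sql("CACHE TABLE people")
    assert(spark.catalog.isCached("people"))
    spark.sql("UNCACHE TABLE people")
    assert(!spark.catalog.isCached("people"))
    spark.catalog.clearCache()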
 

